5 Commits

Author SHA1 Message Date
9e7e4054c4 Validate inventory hostnames via DNS resolution
All checks were successful
Build and Release / Build Windows Exe (push) Successful in 11s
2026-02-06 16:25:39 -05:00
fc1c4bfaa8 Support multiple IPs per server and robust mapping
All checks were successful
Build and Release / Build Windows Exe (push) Successful in 10s
2026-02-06 16:19:05 -05:00
34f936e21c Capture Server Name column and prioritize for inventory keys
All checks were successful
Build and Release / Build Windows Exe (push) Successful in 10s
2026-02-06 16:11:48 -05:00
5c95469ca3 Support SVR prefix in hostname cleanup
All checks were successful
Build and Release / Build Windows Exe (push) Successful in 11s
2026-02-06 16:06:30 -05:00
07c7ec23d9 Add prompt_journal.md
All checks were successful
Build and Release / Build Windows Exe (push) Successful in 11s
2026-02-06 15:58:20 -05:00
7 changed files with 154 additions and 31 deletions

26
prompt_journal.md Normal file
View File

@@ -0,0 +1,26 @@
# Prompt Journal
This document contains a log of the prompts used to build and refine this project.
## Session 1: Initial Refactor & Packaging
1. **Objective**: I have a ruby script that parses excel "WIF" documents that contain server names and network flows source destination ports. I need these to create an ansible inventory with attributes that detail the network connectivity required by each server in the solution. parse the WIF excel document and my existing by-hand ruby script and completely refactor them in python and increase the robustness of the parsing of network flows by using regex and allowing for shifts of cells, be aware of hidden cells and do not include a hidden cell in the output. Some cells may be visually the same cell with formatting/styling but in fact be different cells within the spreasheet - try to accomodate this. Ensure that only flows where the servers listed on the Servers tab are included in the resulting ansible inventory. the new python refactored script should allow me to take another WIF file with different servers and network flows and generate another ansible inventory file. You can see the two starting files to work with in the current project folder
2. **Packaging**: is there a way to package this
3. **Executable**: yes make me an exe
4. **Documentation**: create a readme.md that details how to run this
5. **Git Setup**: add all xls and xlsx to gitignore, add generated yml inventories to gitignore (not individually) all of them and all future. add and push this project to main at https://gitea.krisforbes.ca/krisf/wif2ansible.git. advise me of any potential ip address leakage before commiting and pushing this project
## Session 2: CI/CD & Logic Refinement
6. **CI/CD**: create me a gitea runner to generate a new release and automatically version releases each time a new push is made
7. **Push**: push it and ensure a new exe is generated by gitea runner
8. **Debug**: where is my bundled exe in my release
9. **Debug**: my release should contain an exe downloadable from the releases tab in git but it does not
10. **Debug**: error
11. **Debug**: error again
12. **Logic Update (Ports/DNS)**: when you parse a range of ports to be added to the result, only add the first, last, and middle port. examine the to_mgt_ip function in my original ruby script, this function converts the production interface public ipv4 address listed in the flows tab to the management private nic name by using DNS. please ensure this is implemented in the python version and push the new version
13. **Logic Update (Server Tab)**: thats a good point, you can also use the server's tab of the wif to ensure the name is used for the source so our ansible playbook can connect to the server. With the production ipv4, we have no ability to connect. use the server's tab of the WIF first and fallback to my ruby DNS method
14. **Release**: push a new version
15. **Output Format**: in the genrrated yaml file, i should see all: hosts : and the entires under this should not be IP addresses, they should be the server names from mapping the servers tab of the excel file
16. **Refining Output**: the server name key should not contain SRV### this is part of a reference and is not relevant except to provide mapping from the flows tab information to the servers information, please consider this SRV### information in your servers tab matching logic
17. **Documentation**: add a prompt journal to this project that cotnains a copy of all prompts used and automatically adds new prompts as i send them

View File

@@ -1,11 +1,16 @@
import unittest import unittest
import unittest.mock
from wif2ansible.models import Server, Flow from wif2ansible.models import Server, Flow
from wif2ansible.inventory import generate_inventory from wif2ansible.inventory import generate_inventory
class TestInventoryKeys(unittest.TestCase): class TestInventoryKeys(unittest.TestCase):
def test_inventory_keys_are_hostnames(self): @unittest.mock.patch('wif2ansible.inventory.is_valid_hostname')
def test_inventory_keys_are_hostnames(self, mock_resolves):
# Mock DNS to say server01 exists
mock_resolves.return_value = True
# Create a server with Ref, Hostname, IP # Create a server with Ref, Hostname, IP
s1 = Server(reference="SERVER_REF_01", hostname="server01", ip_address="192.168.1.10", platform="windows") s1 = Server(reference="SERVER_REF_01", hostname="server01", ip_addresses=["192.168.1.10"], platform="windows")
# Create a flow matching this server # Create a flow matching this server
f1 = Flow(flow_id="1", source_ip="192.168.1.10", destination_ip="10.0.0.1", ports=[80]) f1 = Flow(flow_id="1", source_ip="192.168.1.10", destination_ip="10.0.0.1", ports=[80])
@@ -18,14 +23,37 @@ class TestInventoryKeys(unittest.TestCase):
# Verify stricture # Verify stricture
hosts = inventory['all']['hosts'] hosts = inventory['all']['hosts']
# Key should be REFERENCE "SERVER_REF_01" (or hostname/ip fallback) # Key should be HOSTNAME "server01" (prioritized over Ref)
self.assertIn("SERVER_REF_01", hosts) self.assertIn("server01", hosts)
self.assertNotIn("192.168.1.10", hosts) self.assertNotIn("192.168.1.10", hosts)
# Check variables # Check variables
host_vars = hosts["SERVER_REF_01"] host_vars = hosts["server01"]
self.assertEqual(host_vars['ansible_host'], "192.168.1.10") self.assertEqual(host_vars['ansible_host'], "192.168.1.10")
self.assertEqual(host_vars['ansible_connection'], "winrm") self.assertEqual(host_vars['ansible_connection'], "winrm")
@unittest.mock.patch('wif2ansible.inventory.is_valid_hostname')
def test_inventory_keys_resolution(self, mock_resolves):
# Setup mock: 'bad_name' -> False, 'good_name' -> True
def side_effect(name):
if name == "bad_name": return False
if name == "good_name": return True
return False
mock_resolves.side_effect = side_effect
# Server with a BAD hostname but a GOOD reference (simulated)
# Actually logic is candidates: [hostname, cleaned_ref, rev_dns]
# Let's say hostname is "bad_name" and cleaned ref is "good_name"
s1 = Server(reference="SRV01 good_name", hostname="bad_name", ip_addresses=["10.10.10.10"])
f1 = Flow(flow_id="1", source_ip="10.10.10.10", destination_ip="1.1.1.1", ports=[80])
inventory = generate_inventory({"k":s1}, [f1])
hosts = inventory['all']['hosts']
# It should have skipped "bad_name" and picked "good_name" (from cleaned ref)
self.assertIn("good_name", hosts)
self.assertNotIn("bad_name", hosts)
if __name__ == '__main__': if __name__ == '__main__':
unittest.main() unittest.main()

View File

@@ -77,8 +77,8 @@ def read_servers(filename: str) -> Dict[str, Server]:
print("Warning: No 'Servers' sheet found.") print("Warning: No 'Servers' sheet found.")
return {} return {}
# keywords: reference, platform, ip address, management ip, production ip # keywords: reference, platform, ip address, management ip, production ip, server name
header_keywords = ['reference', 'platform', 'ip address', 'production ip'] header_keywords = ['reference', 'platform', 'ip address', 'production ip', 'server name']
header_row_idx, col_map = find_header_row(target_sheet, header_keywords) header_row_idx, col_map = find_header_row(target_sheet, header_keywords)
@@ -96,6 +96,7 @@ def read_servers(filename: str) -> Dict[str, Server]:
# Extract data # Extract data
ref_idx = col_map.get('reference') ref_idx = col_map.get('reference')
name_idx = col_map.get('server name') # User confirmed header
plat_idx = col_map.get('platform') plat_idx = col_map.get('platform')
ip_idx = col_map.get('ip address') # Generic/Management IP ip_idx = col_map.get('ip address') # Generic/Management IP
prod_ip_idx = col_map.get('production ip') # Specific Production IP prod_ip_idx = col_map.get('production ip') # Specific Production IP
@@ -110,30 +111,33 @@ def read_servers(filename: str) -> Dict[str, Server]:
if not ref or ref.lower() == 'example': if not ref or ref.lower() == 'example':
continue continue
# Hostname Logic:
# 1. Use 'Server Name' column if available (e.g. ITSMDEV-5009898)
# 2. Fallback to cleaned Reference (Stripping SRV###)
server_name_raw = get_val(name_idx)
final_hostname = server_name_raw if server_name_raw else clean_reference(ref)
plat = get_val(plat_idx) or 'unknown' plat = get_val(plat_idx) or 'unknown'
# Parse Management IP # Parse Management IP
# Support multiple IPs
ip_raw = get_val(ip_idx) ip_raw = get_val(ip_idx)
ip_addr = None ip_list = []
if ip_raw: if ip_raw:
ips = parse_ip(ip_raw) ip_list = parse_ip(ip_raw)
if ips:
ip_addr = ips[0]
# Parse Production IP # Parse Production IP
prod_ip_raw = get_val(prod_ip_idx) prod_ip_raw = get_val(prod_ip_idx)
prod_ip_addr = None prod_ip_list = []
if prod_ip_raw: if prod_ip_raw:
ips = parse_ip(prod_ip_raw) prod_ip_list = parse_ip(prod_ip_raw)
if ips:
prod_ip_addr = ips[0]
s = Server( s = Server(
reference=ref, reference=ref,
hostname=clean_reference(ref), hostname=final_hostname,
platform=plat, platform=plat,
ip_address=ip_addr, ip_addresses=ip_list,
production_ip=prod_ip_addr production_ips=prod_ip_list
) )
servers[ref] = s servers[ref] = s

View File

@@ -1,6 +1,8 @@
from typing import List, Dict, Any from typing import List, Dict, Any
from .models import Server, Flow from .models import Server, Flow
from .network import to_mgt_ip from .models import Server, Flow
from .network import to_mgt_ip, is_valid_hostname, get_hostname
from .parsers import clean_reference
def generate_inventory(servers: Dict[str, Server], flows: List[Flow]) -> Dict[str, Any]: def generate_inventory(servers: Dict[str, Server], flows: List[Flow]) -> Dict[str, Any]:
""" """
@@ -18,10 +20,14 @@ def generate_inventory(servers: Dict[str, Server], flows: List[Flow]) -> Dict[st
ip_to_server = {} ip_to_server = {}
for s in servers.values(): for s in servers.values():
if s.ip_address: # Index all Management IPs
ip_to_server[s.ip_address] = s for ip in s.ip_addresses:
if s.production_ip: ip_to_server[ip] = s
ip_to_server[s.production_ip] = s
# Index all Production IPs
for ip in s.production_ips:
ip_to_server[ip] = s
# Also index by reference/hostname for DNS matches # Also index by reference/hostname for DNS matches
if s.reference: if s.reference:
ip_to_server[s.reference.lower()] = s ip_to_server[s.reference.lower()] = s
@@ -61,8 +67,48 @@ def generate_inventory(servers: Dict[str, Server], flows: List[Flow]) -> Dict[st
match_count += 1 match_count += 1
# Prepare host entry if new # Prepare host entry if new
# We use the Reference/Hostname as the key in inventory 'hosts'
host_key = server.reference or server.hostname or server.ip_address # Candidate Resolution Logic
# User Requirement: "gather all potential names ... check to see what actually resolves"
candidates = []
# 1. Server Name Column (Highest priority from Excel)
if server.hostname:
candidates.append(server.hostname)
# 2. Cleaned Reference (Fallback from Excel)
if server.reference:
candidates.append(clean_reference(server.reference))
# 3. Reverse DNS of Primary IP?
# If the Excel names are garbage, maybe the IP resolves to the "Real" DNS name.
if server.primary_ip:
# Try simple reverse lookup
rev_name = get_hostname(server.primary_ip)
if rev_name:
candidates.append(rev_name)
# Select the first candidate that resolves
final_host_key = None
for cand in candidates:
if not cand: continue
if is_valid_hostname(cand):
final_host_key = cand
break
# Fallback: strict fallback to IP if nothing resolves?
# Or best effort (first candidate)?
# User said: "You are getting it incorrect every time" -> likely implying the garbage name was used.
# But if *nothing* resolves, we must output something. The IP is safe connectivity-wise, but user wants Names.
# Let's fallback to the IP if NO name works, to ensure ansible works.
if not final_host_key:
if candidates:
# Warn?
print(f"Warning: No resolvable name found for {server.primary_ip} (Candidates: {candidates}). Using IP.")
final_host_key = server.primary_ip
host_key = final_host_key
if host_key not in inventory_hosts: if host_key not in inventory_hosts:
host_vars = server.get_ansible_vars() host_vars = server.get_ansible_vars()

View File

@@ -5,8 +5,19 @@ from typing import List, Dict, Optional, Any
class Server: class Server:
reference: str reference: str
hostname: str # This might be same as reference hostname: str # This might be same as reference
ip_address: Optional[str] = None # Support multiple IPs per field (lists)
production_ip: Optional[str] = None ip_addresses: List[str] = field(default_factory=list)
production_ips: List[str] = field(default_factory=list)
# helper for compatibility/primary IP
@property
def primary_ip(self) -> Optional[str]:
return self.ip_addresses[0] if self.ip_addresses else None
@property
def primary_prod_ip(self) -> Optional[str]:
return self.production_ips[0] if self.production_ips else None
platform: str = 'unknown' # e.g. 'Windows', 'Linux' platform: str = 'unknown' # e.g. 'Windows', 'Linux'
def get_ansible_vars(self) -> Dict[str, Any]: def get_ansible_vars(self) -> Dict[str, Any]:
@@ -23,8 +34,8 @@ class Server:
# Default ssh is usually fine, but being explicit doesn't hurt # Default ssh is usually fine, but being explicit doesn't hurt
pass pass
if self.ip_address: if self.primary_ip:
vars['ansible_host'] = self.ip_address vars['ansible_host'] = self.primary_ip
return vars return vars

View File

@@ -15,6 +15,14 @@ def get_ip(hostname: str) -> Optional[str]:
except socket.error: except socket.error:
return None return None
def is_valid_hostname(hostname: str) -> bool:
"""
Checks if a hostname resolves to an IP.
"""
if not hostname:
return False
return get_ip(hostname) is not None
def to_mgt_ip(name_or_ip: str) -> Optional[str]: def to_mgt_ip(name_or_ip: str) -> Optional[str]:
""" """
Mimics the Ruby script's to_mgt_ip logic: Mimics the Ruby script's to_mgt_ip logic:

View File

@@ -67,8 +67,8 @@ def clean_reference(ref: str) -> str:
return "" return ""
s = str(ref) s = str(ref)
# Remove SRV followed by digits and whitespace # Remove SRV or SVR followed by digits and whitespace
s = re.sub(r'SRV\d+\s*', '', s, flags=re.IGNORECASE) s = re.sub(r'S(RV|VR)\d+\s*', '', s, flags=re.IGNORECASE)
# Remove leading/trailing whitespace # Remove leading/trailing whitespace
return s.strip() return s.strip()