3 Commits

Author SHA1 Message Date
9e7e4054c4 Validate inventory hostnames via DNS resolution
All checks were successful
Build and Release / Build Windows Exe (push) Successful in 11s
2026-02-06 16:25:39 -05:00
fc1c4bfaa8 Support multiple IPs per server and robust mapping
All checks were successful
Build and Release / Build Windows Exe (push) Successful in 10s
2026-02-06 16:19:05 -05:00
34f936e21c Capture Server Name column and prioritize for inventory keys
All checks were successful
Build and Release / Build Windows Exe (push) Successful in 10s
2026-02-06 16:11:48 -05:00
5 changed files with 124 additions and 42 deletions

View File

@@ -1,11 +1,16 @@
import unittest import unittest
import unittest.mock
from wif2ansible.models import Server, Flow from wif2ansible.models import Server, Flow
from wif2ansible.inventory import generate_inventory from wif2ansible.inventory import generate_inventory
class TestInventoryKeys(unittest.TestCase): class TestInventoryKeys(unittest.TestCase):
def test_inventory_keys_are_hostnames(self): @unittest.mock.patch('wif2ansible.inventory.is_valid_hostname')
def test_inventory_keys_are_hostnames(self, mock_resolves):
# Mock DNS to say server01 exists
mock_resolves.return_value = True
# Create a server with Ref, Hostname, IP # Create a server with Ref, Hostname, IP
s1 = Server(reference="SERVER_REF_01", hostname="server01", ip_address="192.168.1.10", platform="windows") s1 = Server(reference="SERVER_REF_01", hostname="server01", ip_addresses=["192.168.1.10"], platform="windows")
# Create a flow matching this server # Create a flow matching this server
f1 = Flow(flow_id="1", source_ip="192.168.1.10", destination_ip="10.0.0.1", ports=[80]) f1 = Flow(flow_id="1", source_ip="192.168.1.10", destination_ip="10.0.0.1", ports=[80])
@@ -18,29 +23,37 @@ class TestInventoryKeys(unittest.TestCase):
# Verify stricture # Verify stricture
hosts = inventory['all']['hosts'] hosts = inventory['all']['hosts']
# Key should be REFERENCE "SERVER_REF_01" (or hostname/ip fallback) # Key should be HOSTNAME "server01" (prioritized over Ref)
self.assertIn("SERVER_REF_01", hosts) self.assertIn("server01", hosts)
self.assertNotIn("192.168.1.10", hosts) self.assertNotIn("192.168.1.10", hosts)
# Check variables # Check variables
host_vars = hosts["SERVER_REF_01"] host_vars = hosts["server01"]
self.assertEqual(host_vars['ansible_host'], "192.168.1.10") self.assertEqual(host_vars['ansible_host'], "192.168.1.10")
self.assertEqual(host_vars['ansible_connection'], "winrm") self.assertEqual(host_vars['ansible_connection'], "winrm")
def test_clean_reference_logic(self): @unittest.mock.patch('wif2ansible.inventory.is_valid_hostname')
from wif2ansible.parsers import clean_reference def test_inventory_keys_resolution(self, mock_resolves):
# Setup mock: 'bad_name' -> False, 'good_name' -> True
def side_effect(name):
if name == "bad_name": return False
if name == "good_name": return True
return False
mock_resolves.side_effect = side_effect
# Test cases # Server with a BAD hostname but a GOOD reference (simulated)
self.assertEqual(clean_reference("SRV123 MyServer"), "MyServer") # Actually logic is candidates: [hostname, cleaned_ref, rev_dns]
self.assertEqual(clean_reference("SVR999 AnotherServer"), "AnotherServer") # Let's say hostname is "bad_name" and cleaned ref is "good_name"
self.assertEqual(clean_reference("srv001 lowercase"), "lowercase") s1 = Server(reference="SRV01 good_name", hostname="bad_name", ip_addresses=["10.10.10.10"])
self.assertEqual(clean_reference("SvR555 MixedCase"), "MixedCase")
self.assertEqual(clean_reference("JustName"), "JustName") f1 = Flow(flow_id="1", source_ip="10.10.10.10", destination_ip="1.1.1.1", ports=[80])
self.assertEqual(clean_reference("SRV123"), "") # Should be empty? or handle?
# If it's just SRV123, strip returns empty. inventory = generate_inventory({"k":s1}, [f1])
# User said "never include these in output". hosts = inventory['all']['hosts']
# If the server is ONLY named SRV123, what then?
# Assuming there is usually a name. # It should have skipped "bad_name" and picked "good_name" (from cleaned ref)
self.assertIn("good_name", hosts)
self.assertNotIn("bad_name", hosts)
if __name__ == '__main__': if __name__ == '__main__':
unittest.main() unittest.main()

View File

@@ -77,8 +77,8 @@ def read_servers(filename: str) -> Dict[str, Server]:
print("Warning: No 'Servers' sheet found.") print("Warning: No 'Servers' sheet found.")
return {} return {}
# keywords: reference, platform, ip address, management ip, production ip # keywords: reference, platform, ip address, management ip, production ip, server name
header_keywords = ['reference', 'platform', 'ip address', 'production ip'] header_keywords = ['reference', 'platform', 'ip address', 'production ip', 'server name']
header_row_idx, col_map = find_header_row(target_sheet, header_keywords) header_row_idx, col_map = find_header_row(target_sheet, header_keywords)
@@ -96,6 +96,7 @@ def read_servers(filename: str) -> Dict[str, Server]:
# Extract data # Extract data
ref_idx = col_map.get('reference') ref_idx = col_map.get('reference')
name_idx = col_map.get('server name') # User confirmed header
plat_idx = col_map.get('platform') plat_idx = col_map.get('platform')
ip_idx = col_map.get('ip address') # Generic/Management IP ip_idx = col_map.get('ip address') # Generic/Management IP
prod_ip_idx = col_map.get('production ip') # Specific Production IP prod_ip_idx = col_map.get('production ip') # Specific Production IP
@@ -110,30 +111,33 @@ def read_servers(filename: str) -> Dict[str, Server]:
if not ref or ref.lower() == 'example': if not ref or ref.lower() == 'example':
continue continue
# Hostname Logic:
# 1. Use 'Server Name' column if available (e.g. ITSMDEV-5009898)
# 2. Fallback to cleaned Reference (Stripping SRV###)
server_name_raw = get_val(name_idx)
final_hostname = server_name_raw if server_name_raw else clean_reference(ref)
plat = get_val(plat_idx) or 'unknown' plat = get_val(plat_idx) or 'unknown'
# Parse Management IP # Parse Management IP
# Support multiple IPs
ip_raw = get_val(ip_idx) ip_raw = get_val(ip_idx)
ip_addr = None ip_list = []
if ip_raw: if ip_raw:
ips = parse_ip(ip_raw) ip_list = parse_ip(ip_raw)
if ips:
ip_addr = ips[0]
# Parse Production IP # Parse Production IP
prod_ip_raw = get_val(prod_ip_idx) prod_ip_raw = get_val(prod_ip_idx)
prod_ip_addr = None prod_ip_list = []
if prod_ip_raw: if prod_ip_raw:
ips = parse_ip(prod_ip_raw) prod_ip_list = parse_ip(prod_ip_raw)
if ips:
prod_ip_addr = ips[0]
s = Server( s = Server(
reference=ref, reference=ref,
hostname=clean_reference(ref), hostname=final_hostname,
platform=plat, platform=plat,
ip_address=ip_addr, ip_addresses=ip_list,
production_ip=prod_ip_addr production_ips=prod_ip_list
) )
servers[ref] = s servers[ref] = s

View File

@@ -1,6 +1,8 @@
from typing import List, Dict, Any from typing import List, Dict, Any
from .models import Server, Flow from .models import Server, Flow
from .network import to_mgt_ip from .models import Server, Flow
from .network import to_mgt_ip, is_valid_hostname, get_hostname
from .parsers import clean_reference
def generate_inventory(servers: Dict[str, Server], flows: List[Flow]) -> Dict[str, Any]: def generate_inventory(servers: Dict[str, Server], flows: List[Flow]) -> Dict[str, Any]:
""" """
@@ -18,10 +20,14 @@ def generate_inventory(servers: Dict[str, Server], flows: List[Flow]) -> Dict[st
ip_to_server = {} ip_to_server = {}
for s in servers.values(): for s in servers.values():
if s.ip_address: # Index all Management IPs
ip_to_server[s.ip_address] = s for ip in s.ip_addresses:
if s.production_ip: ip_to_server[ip] = s
ip_to_server[s.production_ip] = s
# Index all Production IPs
for ip in s.production_ips:
ip_to_server[ip] = s
# Also index by reference/hostname for DNS matches # Also index by reference/hostname for DNS matches
if s.reference: if s.reference:
ip_to_server[s.reference.lower()] = s ip_to_server[s.reference.lower()] = s
@@ -61,8 +67,48 @@ def generate_inventory(servers: Dict[str, Server], flows: List[Flow]) -> Dict[st
match_count += 1 match_count += 1
# Prepare host entry if new # Prepare host entry if new
# We use the Reference/Hostname as the key in inventory 'hosts'
host_key = server.reference or server.hostname or server.ip_address # Candidate Resolution Logic
# User Requirement: "gather all potential names ... check to see what actually resolves"
candidates = []
# 1. Server Name Column (Highest priority from Excel)
if server.hostname:
candidates.append(server.hostname)
# 2. Cleaned Reference (Fallback from Excel)
if server.reference:
candidates.append(clean_reference(server.reference))
# 3. Reverse DNS of Primary IP?
# If the Excel names are garbage, maybe the IP resolves to the "Real" DNS name.
if server.primary_ip:
# Try simple reverse lookup
rev_name = get_hostname(server.primary_ip)
if rev_name:
candidates.append(rev_name)
# Select the first candidate that resolves
final_host_key = None
for cand in candidates:
if not cand: continue
if is_valid_hostname(cand):
final_host_key = cand
break
# Fallback: strict fallback to IP if nothing resolves?
# Or best effort (first candidate)?
# User said: "You are getting it incorrect every time" -> likely implying the garbage name was used.
# But if *nothing* resolves, we must output something. The IP is safe connectivity-wise, but user wants Names.
# Let's fallback to the IP if NO name works, to ensure ansible works.
if not final_host_key:
if candidates:
# Warn?
print(f"Warning: No resolvable name found for {server.primary_ip} (Candidates: {candidates}). Using IP.")
final_host_key = server.primary_ip
host_key = final_host_key
if host_key not in inventory_hosts: if host_key not in inventory_hosts:
host_vars = server.get_ansible_vars() host_vars = server.get_ansible_vars()

View File

@@ -5,8 +5,19 @@ from typing import List, Dict, Optional, Any
class Server: class Server:
reference: str reference: str
hostname: str # This might be same as reference hostname: str # This might be same as reference
ip_address: Optional[str] = None # Support multiple IPs per field (lists)
production_ip: Optional[str] = None ip_addresses: List[str] = field(default_factory=list)
production_ips: List[str] = field(default_factory=list)
# helper for compatibility/primary IP
@property
def primary_ip(self) -> Optional[str]:
return self.ip_addresses[0] if self.ip_addresses else None
@property
def primary_prod_ip(self) -> Optional[str]:
return self.production_ips[0] if self.production_ips else None
platform: str = 'unknown' # e.g. 'Windows', 'Linux' platform: str = 'unknown' # e.g. 'Windows', 'Linux'
def get_ansible_vars(self) -> Dict[str, Any]: def get_ansible_vars(self) -> Dict[str, Any]:
@@ -23,8 +34,8 @@ class Server:
# Default ssh is usually fine, but being explicit doesn't hurt # Default ssh is usually fine, but being explicit doesn't hurt
pass pass
if self.ip_address: if self.primary_ip:
vars['ansible_host'] = self.ip_address vars['ansible_host'] = self.primary_ip
return vars return vars

View File

@@ -15,6 +15,14 @@ def get_ip(hostname: str) -> Optional[str]:
except socket.error: except socket.error:
return None return None
def is_valid_hostname(hostname: str) -> bool:
"""
Checks if a hostname resolves to an IP.
"""
if not hostname:
return False
return get_ip(hostname) is not None
def to_mgt_ip(name_or_ip: str) -> Optional[str]: def to_mgt_ip(name_or_ip: str) -> Optional[str]:
""" """
Mimics the Ruby script's to_mgt_ip logic: Mimics the Ruby script's to_mgt_ip logic: