Compare commits
6 Commits
v2026.02.0
...
v2026.02.0
| Author | SHA1 | Date | |
|---|---|---|---|
| a13fc5b282 | |||
| dcddd88cbc | |||
| 9e7e4054c4 | |||
| fc1c4bfaa8 | |||
| 34f936e21c | |||
| 5c95469ca3 |
@@ -1,11 +1,16 @@
|
||||
import unittest
|
||||
import unittest.mock
|
||||
from wif2ansible.models import Server, Flow
|
||||
from wif2ansible.inventory import generate_inventory
|
||||
|
||||
class TestInventoryKeys(unittest.TestCase):
|
||||
def test_inventory_keys_are_hostnames(self):
|
||||
@unittest.mock.patch('wif2ansible.inventory.is_valid_hostname')
|
||||
def test_inventory_keys_are_hostnames(self, mock_resolves):
|
||||
# Mock DNS to say server01 exists
|
||||
mock_resolves.return_value = True
|
||||
|
||||
# Create a server with Ref, Hostname, IP
|
||||
s1 = Server(reference="SERVER_REF_01", hostname="server01", ip_address="192.168.1.10", platform="windows")
|
||||
s1 = Server(reference="SERVER_REF_01", hostname="server01", ip_addresses=["192.168.1.10"], platform="windows")
|
||||
|
||||
# Create a flow matching this server
|
||||
f1 = Flow(flow_id="1", source_ip="192.168.1.10", destination_ip="10.0.0.1", ports=[80])
|
||||
@@ -18,14 +23,44 @@ class TestInventoryKeys(unittest.TestCase):
|
||||
# Verify stricture
|
||||
hosts = inventory['all']['hosts']
|
||||
|
||||
# Key should be REFERENCE "SERVER_REF_01" (or hostname/ip fallback)
|
||||
self.assertIn("SERVER_REF_01", hosts)
|
||||
# Key should be HOSTNAME "server01" (prioritized over Ref)
|
||||
self.assertIn("server01", hosts)
|
||||
self.assertNotIn("192.168.1.10", hosts)
|
||||
|
||||
# Check variables
|
||||
host_vars = hosts["SERVER_REF_01"]
|
||||
host_vars = hosts["server01"]
|
||||
self.assertEqual(host_vars['ansible_host'], "192.168.1.10")
|
||||
self.assertEqual(host_vars['ansible_connection'], "winrm")
|
||||
|
||||
@unittest.mock.patch('wif2ansible.inventory.is_valid_hostname')
|
||||
def test_inventory_keys_resolution(self, mock_resolves):
|
||||
# Setup mock: 'bad_name' -> False, 'good_name' -> True
|
||||
def side_effect(name):
|
||||
if name == "bad_name": return False
|
||||
if name == "good_name": return True
|
||||
return False
|
||||
mock_resolves.side_effect = side_effect
|
||||
|
||||
# Server with a BAD hostname but a GOOD reference (simulated)
|
||||
# Actually logic is candidates: [hostname, cleaned_ref, rev_dns]
|
||||
# Let's say hostname is "bad_name" and cleaned ref is "good_name"
|
||||
s1 = Server(reference="SRV01 good_name", hostname="bad_name", ip_addresses=["10.10.10.10"])
|
||||
|
||||
f1 = Flow(flow_id="1", source_ip="10.10.10.10", destination_ip="1.1.1.1", ports=[80])
|
||||
|
||||
inventory = generate_inventory({"k":s1}, [f1])
|
||||
hosts = inventory['all']['hosts']
|
||||
|
||||
# It should have skipped "bad_name" and picked "good_name" (from cleaned ref)
|
||||
self.assertIn("good_name", hosts)
|
||||
self.assertNotIn("bad_name", hosts)
|
||||
|
||||
def test_suffix_stripping(self):
|
||||
from wif2ansible.parsers import clean_hostname
|
||||
self.assertEqual(clean_hostname("server.prod.global.gc.ca"), "server")
|
||||
self.assertEqual(clean_hostname("server.PROD.GLOBAL.GC.CA"), "server")
|
||||
self.assertEqual(clean_hostname("nosuffix"), "nosuffix")
|
||||
self.assertEqual(clean_hostname("other.suffix.com"), "other.suffix.com")
|
||||
|
||||
if __name__ == '__main__':
|
||||
unittest.main()
|
||||
|
||||
@@ -77,8 +77,8 @@ def read_servers(filename: str) -> Dict[str, Server]:
|
||||
print("Warning: No 'Servers' sheet found.")
|
||||
return {}
|
||||
|
||||
# keywords: reference, platform, ip address, management ip, production ip
|
||||
header_keywords = ['reference', 'platform', 'ip address', 'production ip']
|
||||
# keywords: reference, platform, ip address, management ip, production ip, server name
|
||||
header_keywords = ['reference', 'platform', 'ip address', 'production ip', 'server name']
|
||||
|
||||
header_row_idx, col_map = find_header_row(target_sheet, header_keywords)
|
||||
|
||||
@@ -96,6 +96,7 @@ def read_servers(filename: str) -> Dict[str, Server]:
|
||||
|
||||
# Extract data
|
||||
ref_idx = col_map.get('reference')
|
||||
name_idx = col_map.get('server name') # User confirmed header
|
||||
plat_idx = col_map.get('platform')
|
||||
ip_idx = col_map.get('ip address') # Generic/Management IP
|
||||
prod_ip_idx = col_map.get('production ip') # Specific Production IP
|
||||
@@ -110,30 +111,33 @@ def read_servers(filename: str) -> Dict[str, Server]:
|
||||
if not ref or ref.lower() == 'example':
|
||||
continue
|
||||
|
||||
# Hostname Logic:
|
||||
# 1. Use 'Server Name' column if available (e.g. ITSMDEV-5009898)
|
||||
# 2. Fallback to cleaned Reference (Stripping SRV###)
|
||||
server_name_raw = get_val(name_idx)
|
||||
final_hostname = server_name_raw if server_name_raw else clean_reference(ref)
|
||||
|
||||
plat = get_val(plat_idx) or 'unknown'
|
||||
|
||||
# Parse Management IP
|
||||
# Support multiple IPs
|
||||
ip_raw = get_val(ip_idx)
|
||||
ip_addr = None
|
||||
ip_list = []
|
||||
if ip_raw:
|
||||
ips = parse_ip(ip_raw)
|
||||
if ips:
|
||||
ip_addr = ips[0]
|
||||
ip_list = parse_ip(ip_raw)
|
||||
|
||||
# Parse Production IP
|
||||
prod_ip_raw = get_val(prod_ip_idx)
|
||||
prod_ip_addr = None
|
||||
prod_ip_list = []
|
||||
if prod_ip_raw:
|
||||
ips = parse_ip(prod_ip_raw)
|
||||
if ips:
|
||||
prod_ip_addr = ips[0]
|
||||
prod_ip_list = parse_ip(prod_ip_raw)
|
||||
|
||||
s = Server(
|
||||
reference=ref,
|
||||
hostname=clean_reference(ref),
|
||||
hostname=final_hostname,
|
||||
platform=plat,
|
||||
ip_address=ip_addr,
|
||||
production_ip=prod_ip_addr
|
||||
ip_addresses=ip_list,
|
||||
production_ips=prod_ip_list
|
||||
)
|
||||
servers[ref] = s
|
||||
|
||||
|
||||
@@ -1,6 +1,8 @@
|
||||
from typing import List, Dict, Any
|
||||
from .models import Server, Flow
|
||||
from .network import to_mgt_ip
|
||||
from .models import Server, Flow
|
||||
from .network import to_mgt_ip, is_valid_hostname, get_hostname
|
||||
from .parsers import clean_reference
|
||||
|
||||
def generate_inventory(servers: Dict[str, Server], flows: List[Flow]) -> Dict[str, Any]:
|
||||
"""
|
||||
@@ -18,10 +20,14 @@ def generate_inventory(servers: Dict[str, Server], flows: List[Flow]) -> Dict[st
|
||||
|
||||
ip_to_server = {}
|
||||
for s in servers.values():
|
||||
if s.ip_address:
|
||||
ip_to_server[s.ip_address] = s
|
||||
if s.production_ip:
|
||||
ip_to_server[s.production_ip] = s
|
||||
# Index all Management IPs
|
||||
for ip in s.ip_addresses:
|
||||
ip_to_server[ip] = s
|
||||
|
||||
# Index all Production IPs
|
||||
for ip in s.production_ips:
|
||||
ip_to_server[ip] = s
|
||||
|
||||
# Also index by reference/hostname for DNS matches
|
||||
if s.reference:
|
||||
ip_to_server[s.reference.lower()] = s
|
||||
@@ -33,13 +39,20 @@ def generate_inventory(servers: Dict[str, Server], flows: List[Flow]) -> Dict[st
|
||||
# Process flows
|
||||
match_count = 0
|
||||
drop_count = 0
|
||||
total_flows = len(flows)
|
||||
|
||||
print(f"Starting inventory generation for {total_flows} flows...")
|
||||
|
||||
for idx, flow in enumerate(flows, 1):
|
||||
if idx % 10 == 0:
|
||||
print(f"Processing flow {idx}/{total_flows}...")
|
||||
|
||||
for flow in flows:
|
||||
# Find source server
|
||||
server = ip_to_server.get(flow.source_ip)
|
||||
|
||||
if not server:
|
||||
# Try DNS resolution (Public IP -> Management FQDN)
|
||||
print(f"Flow {idx}: Source {flow.source_ip} not found in map. Attempting DNS resolution...")
|
||||
mgt_dns = to_mgt_ip(flow.source_ip)
|
||||
if mgt_dns:
|
||||
# mgt_dns might be "server.ds.gc.ca".
|
||||
@@ -54,18 +67,66 @@ def generate_inventory(servers: Dict[str, Server], flows: List[Flow]) -> Dict[st
|
||||
|
||||
if not server:
|
||||
drop_count += 1
|
||||
if drop_count <= 5: # Debug spam limit
|
||||
print(f"Dropping flow {flow.flow_id}: Source {flow.source_ip} (Mgt: {mgt_dns}) not found in Servers tab.")
|
||||
if drop_count <= 10: # Increased debug spam limit
|
||||
print(f"Dropping flow {flow.flow_id} ({idx}/{total_flows}): Source {flow.source_ip} (Mgt: {mgt_dns}) resolved but not found in Servers tab.")
|
||||
continue
|
||||
else:
|
||||
print(f"Flow {idx}: Resolved {flow.source_ip} -> {server.hostname or server.reference}")
|
||||
|
||||
match_count += 1
|
||||
|
||||
# Prepare host entry if new
|
||||
# We use the Reference/Hostname as the key in inventory 'hosts'
|
||||
host_key = server.reference or server.hostname or server.ip_address
|
||||
|
||||
# Candidate Resolution Logic
|
||||
# User Requirement: "gather all potential names ... check to see what actually resolves"
|
||||
candidates = []
|
||||
|
||||
# 1. Server Name Column (Highest priority from Excel)
|
||||
if server.hostname:
|
||||
candidates.append(server.hostname)
|
||||
|
||||
# 2. Cleaned Reference (Fallback from Excel)
|
||||
if server.reference:
|
||||
candidates.append(clean_reference(server.reference))
|
||||
|
||||
# 3. Reverse DNS of Primary IP?
|
||||
# If the Excel names are garbage, maybe the IP resolves to the "Real" DNS name.
|
||||
if server.primary_ip:
|
||||
# Try simple reverse lookup
|
||||
rev_name = get_hostname(server.primary_ip)
|
||||
if rev_name:
|
||||
candidates.append(rev_name)
|
||||
|
||||
# Select the first candidate that resolves
|
||||
|
||||
final_host_key = None
|
||||
for cand in candidates:
|
||||
if not cand: continue
|
||||
if is_valid_hostname(cand):
|
||||
final_host_key = cand
|
||||
break
|
||||
|
||||
# Fallback: strict fallback to IP if nothing resolves?
|
||||
# Or best effort (first candidate)?
|
||||
# User said: "You are getting it incorrect every time" -> likely implying the garbage name was used.
|
||||
# But if *nothing* resolves, we must output something. The IP is safe connectivity-wise, but user wants Names.
|
||||
# Let's fallback to the IP if NO name works, to ensure ansible works.
|
||||
if not final_host_key:
|
||||
if candidates:
|
||||
# Warn?
|
||||
print(f"Warning: No resolvable name found for {server.primary_ip} (Candidates: {candidates}). Using IP.")
|
||||
final_host_key = server.primary_ip
|
||||
|
||||
# Final cleanup: Strip suffixes if user requested
|
||||
from .parsers import clean_hostname
|
||||
host_key = clean_hostname(final_host_key)
|
||||
|
||||
if host_key not in inventory_hosts:
|
||||
host_vars = server.get_ansible_vars()
|
||||
|
||||
# Ensure proper ansible_host is set if key is not IP
|
||||
if host_key != server.primary_ip and server.primary_ip:
|
||||
host_vars['ansible_host'] = server.primary_ip
|
||||
host_vars['flows'] = []
|
||||
inventory_hosts[host_key] = host_vars
|
||||
|
||||
|
||||
@@ -5,8 +5,19 @@ from typing import List, Dict, Optional, Any
|
||||
class Server:
|
||||
reference: str
|
||||
hostname: str # This might be same as reference
|
||||
ip_address: Optional[str] = None
|
||||
production_ip: Optional[str] = None
|
||||
# Support multiple IPs per field (lists)
|
||||
ip_addresses: List[str] = field(default_factory=list)
|
||||
production_ips: List[str] = field(default_factory=list)
|
||||
|
||||
# helper for compatibility/primary IP
|
||||
@property
|
||||
def primary_ip(self) -> Optional[str]:
|
||||
return self.ip_addresses[0] if self.ip_addresses else None
|
||||
|
||||
@property
|
||||
def primary_prod_ip(self) -> Optional[str]:
|
||||
return self.production_ips[0] if self.production_ips else None
|
||||
|
||||
platform: str = 'unknown' # e.g. 'Windows', 'Linux'
|
||||
|
||||
def get_ansible_vars(self) -> Dict[str, Any]:
|
||||
@@ -23,8 +34,8 @@ class Server:
|
||||
# Default ssh is usually fine, but being explicit doesn't hurt
|
||||
pass
|
||||
|
||||
if self.ip_address:
|
||||
vars['ansible_host'] = self.ip_address
|
||||
if self.primary_ip:
|
||||
vars['ansible_host'] = self.primary_ip
|
||||
|
||||
return vars
|
||||
|
||||
|
||||
@@ -1,6 +1,8 @@
|
||||
import socket
|
||||
from typing import Optional
|
||||
from functools import lru_cache
|
||||
|
||||
@lru_cache(maxsize=1024)
|
||||
def get_hostname(ip: str) -> Optional[str]:
|
||||
try:
|
||||
# Python's equivalent to Resolv.getname(ip)
|
||||
@@ -9,12 +11,21 @@ def get_hostname(ip: str) -> Optional[str]:
|
||||
except socket.error:
|
||||
return None
|
||||
|
||||
@lru_cache(maxsize=1024)
|
||||
def get_ip(hostname: str) -> Optional[str]:
|
||||
try:
|
||||
return socket.gethostbyname(hostname)
|
||||
except socket.error:
|
||||
return None
|
||||
|
||||
def is_valid_hostname(hostname: str) -> bool:
|
||||
"""
|
||||
Checks if a hostname resolves to an IP.
|
||||
"""
|
||||
if not hostname:
|
||||
return False
|
||||
return get_ip(hostname) is not None
|
||||
|
||||
def to_mgt_ip(name_or_ip: str) -> Optional[str]:
|
||||
"""
|
||||
Mimics the Ruby script's to_mgt_ip logic:
|
||||
|
||||
@@ -67,11 +67,20 @@ def clean_reference(ref: str) -> str:
|
||||
return ""
|
||||
|
||||
s = str(ref)
|
||||
# Remove SRV followed by digits and whitespace
|
||||
s = re.sub(r'SRV\d+\s*', '', s, flags=re.IGNORECASE)
|
||||
# Remove SRV or SVR followed by digits and whitespace
|
||||
s = re.sub(r'S(RV|VR)\d+\s*', '', s, flags=re.IGNORECASE)
|
||||
# Remove leading/trailing whitespace
|
||||
return s.strip()
|
||||
|
||||
def clean_hostname(name: str) -> str:
|
||||
"""
|
||||
Strips specific suffixes like .prod.global.gc.ca to get shortname.
|
||||
"""
|
||||
if not name:
|
||||
return ""
|
||||
# Case insensitive strip
|
||||
return re.sub(r'\.prod\.global\.gc\.ca$', '', name, flags=re.IGNORECASE)
|
||||
|
||||
def parse_ip(ip_str: str) -> List[str]:
|
||||
"""Finds all IPv4 addresses in a string."""
|
||||
if not ip_str:
|
||||
|
||||
Reference in New Issue
Block a user