Compare commits
5 Commits
v2026.02.0
...
v2026.02.0
| Author | SHA1 | Date | |
|---|---|---|---|
| 9e9c722a93 | |||
| a13fc5b282 | |||
| dcddd88cbc | |||
| 9e7e4054c4 | |||
| fc1c4bfaa8 |
50
test_fuzzy_and_ports.py
Normal file
50
test_fuzzy_and_ports.py
Normal file
@@ -0,0 +1,50 @@
|
|||||||
|
import unittest
|
||||||
|
from wif2ansible.parsers import parse_ports, clean_header
|
||||||
|
from wif2ansible.excel_reader import normalize_header_text, fuzzy_match
|
||||||
|
|
||||||
|
class TestFuzzyAndPorts(unittest.TestCase):
    """Checks parse_ports keyword/service handling and the header fuzzy-matching helpers."""

    def test_parse_ports_any(self):
        """'any' / 'all' (regardless of case) expand to the requested common-port list."""
        common = sorted([20, 21, 22, 23, 25, 53, 80, 110, 443, 3389])
        for wildcard in ("any", "all", "Any"):
            self.assertEqual(parse_ports(wildcard), common)

    def test_parse_ports_services(self):
        """Service names are translated to their port numbers, case-insensitively."""
        cases = [
            ("http", [80]),
            ("HTTPS", [443]),
            ("ssh, telnet", [22, 23]),
            ("DNS", [53]),
            ("smtp", [25]),
        ]
        for text, wanted in cases:
            self.assertEqual(parse_ports(text), wanted)

    def test_parse_ports_mixed(self):
        """Numeric ports and service names may be combined in one cell."""
        self.assertEqual(parse_ports("80, 443, ssh"), [22, 80, 443])

    def test_fuzzy_header_normalization(self):
        """Case, spaces, underscores and punctuation are all removed by normalization."""
        # Case, underscore-vs-space and hyphen variants collapse to the same token.
        for header in ("Server Name", "Server_Name", "server name", "Server-Name"):
            self.assertEqual(normalize_header_text(header), "servername")
        # Parentheses are dropped as well.
        self.assertEqual(normalize_header_text("Source (IP)"), "sourceip")

    def test_fuzzy_match(self):
        """A keyword matches when its normalized form occurs inside the normalized cell."""
        # Keyword "ip address" should match "IP_Address".
        self.assertTrue(fuzzy_match("ip address", "IP_Address"))
        # Partial keyword: "ip" is contained in "Source IP".
        self.assertTrue(fuzzy_match("ip", "Source IP"))

        # Not asserted here: keyword "source ip" against "Source Public IP".
        # Normalized, 'sourceip' is not a substring of 'sourcepublicip' (the
        # 'public' in the middle breaks it), so a pure substring comparison
        # cannot match it; matching on the individual words ("source" AND
        # "ip") would be needed for multi-word keywords.
|
||||||
|
|
||||||
|
if __name__ == '__main__':
|
||||||
|
unittest.main()
|
||||||
@@ -1,11 +1,16 @@
|
|||||||
import unittest
|
import unittest
|
||||||
|
import unittest.mock
|
||||||
from wif2ansible.models import Server, Flow
|
from wif2ansible.models import Server, Flow
|
||||||
from wif2ansible.inventory import generate_inventory
|
from wif2ansible.inventory import generate_inventory
|
||||||
|
|
||||||
class TestInventoryKeys(unittest.TestCase):
|
class TestInventoryKeys(unittest.TestCase):
|
||||||
def test_inventory_keys_are_hostnames(self):
|
@unittest.mock.patch('wif2ansible.inventory.is_valid_hostname')
|
||||||
|
def test_inventory_keys_are_hostnames(self, mock_resolves):
|
||||||
|
# Mock DNS to say server01 exists
|
||||||
|
mock_resolves.return_value = True
|
||||||
|
|
||||||
# Create a server with Ref, Hostname, IP
|
# Create a server with Ref, Hostname, IP
|
||||||
s1 = Server(reference="SERVER_REF_01", hostname="server01", ip_address="192.168.1.10", platform="windows")
|
s1 = Server(reference="SERVER_REF_01", hostname="server01", ip_addresses=["192.168.1.10"], platform="windows")
|
||||||
|
|
||||||
# Create a flow matching this server
|
# Create a flow matching this server
|
||||||
f1 = Flow(flow_id="1", source_ip="192.168.1.10", destination_ip="10.0.0.1", ports=[80])
|
f1 = Flow(flow_id="1", source_ip="192.168.1.10", destination_ip="10.0.0.1", ports=[80])
|
||||||
@@ -18,29 +23,44 @@ class TestInventoryKeys(unittest.TestCase):
|
|||||||
# Verify stricture
|
# Verify stricture
|
||||||
hosts = inventory['all']['hosts']
|
hosts = inventory['all']['hosts']
|
||||||
|
|
||||||
# Key should be REFERENCE "SERVER_REF_01" (or hostname/ip fallback)
|
# Key should be HOSTNAME "server01" (prioritized over Ref)
|
||||||
self.assertIn("SERVER_REF_01", hosts)
|
self.assertIn("server01", hosts)
|
||||||
self.assertNotIn("192.168.1.10", hosts)
|
self.assertNotIn("192.168.1.10", hosts)
|
||||||
|
|
||||||
# Check variables
|
# Check variables
|
||||||
host_vars = hosts["SERVER_REF_01"]
|
host_vars = hosts["server01"]
|
||||||
self.assertEqual(host_vars['ansible_host'], "192.168.1.10")
|
self.assertEqual(host_vars['ansible_host'], "192.168.1.10")
|
||||||
self.assertEqual(host_vars['ansible_connection'], "winrm")
|
self.assertEqual(host_vars['ansible_connection'], "winrm")
|
||||||
|
|
||||||
def test_clean_reference_logic(self):
|
@unittest.mock.patch('wif2ansible.inventory.is_valid_hostname')
|
||||||
from wif2ansible.parsers import clean_reference
|
def test_inventory_keys_resolution(self, mock_resolves):
|
||||||
|
# Setup mock: 'bad_name' -> False, 'good_name' -> True
|
||||||
|
def side_effect(name):
|
||||||
|
if name == "bad_name": return False
|
||||||
|
if name == "good_name": return True
|
||||||
|
return False
|
||||||
|
mock_resolves.side_effect = side_effect
|
||||||
|
|
||||||
# Test cases
|
# Server with a BAD hostname but a GOOD reference (simulated)
|
||||||
self.assertEqual(clean_reference("SRV123 MyServer"), "MyServer")
|
# Actually logic is candidates: [hostname, cleaned_ref, rev_dns]
|
||||||
self.assertEqual(clean_reference("SVR999 AnotherServer"), "AnotherServer")
|
# Let's say hostname is "bad_name" and cleaned ref is "good_name"
|
||||||
self.assertEqual(clean_reference("srv001 lowercase"), "lowercase")
|
s1 = Server(reference="SRV01 good_name", hostname="bad_name", ip_addresses=["10.10.10.10"])
|
||||||
self.assertEqual(clean_reference("SvR555 MixedCase"), "MixedCase")
|
|
||||||
self.assertEqual(clean_reference("JustName"), "JustName")
|
|
||||||
self.assertEqual(clean_reference("SRV123"), "") # Should be empty? or handle?
|
|
||||||
# If it's just SRV123, strip returns empty.
|
|
||||||
# User said "never include these in output".
|
|
||||||
# If the server is ONLY named SRV123, what then?
|
|
||||||
# Assuming there is usually a name.
|
|
||||||
|
|
||||||
|
f1 = Flow(flow_id="1", source_ip="10.10.10.10", destination_ip="1.1.1.1", ports=[80])
|
||||||
|
|
||||||
|
inventory = generate_inventory({"k":s1}, [f1])
|
||||||
|
hosts = inventory['all']['hosts']
|
||||||
|
|
||||||
|
# It should have skipped "bad_name" and picked "good_name" (from cleaned ref)
|
||||||
|
self.assertIn("good_name", hosts)
|
||||||
|
self.assertNotIn("bad_name", hosts)
|
||||||
|
|
||||||
|
def test_suffix_stripping(self):
    """clean_hostname removes only the '.prod.global.gc.ca' suffix, in any letter case."""
    from wif2ansible.parsers import clean_hostname

    # input -> expected short name; names without the suffix pass through unchanged
    expectations = [
        ("server.prod.global.gc.ca", "server"),
        ("server.PROD.GLOBAL.GC.CA", "server"),
        ("nosuffix", "nosuffix"),
        ("other.suffix.com", "other.suffix.com"),
    ]
    for raw, wanted in expectations:
        self.assertEqual(clean_hostname(raw), wanted)
|
||||||
|
|
||||||
if __name__ == '__main__':
|
if __name__ == '__main__':
|
||||||
unittest.main()
|
unittest.main()
|
||||||
|
|||||||
@@ -1,4 +1,5 @@
|
|||||||
import openpyxl
|
import openpyxl
|
||||||
|
import re
|
||||||
from openpyxl.worksheet.worksheet import Worksheet
|
from openpyxl.worksheet.worksheet import Worksheet
|
||||||
from typing import List, Dict, Tuple, Optional
|
from typing import List, Dict, Tuple, Optional
|
||||||
from .models import Server, Flow
|
from .models import Server, Flow
|
||||||
@@ -15,6 +16,36 @@ def is_col_hidden(sheet: Worksheet, col_idx: int) -> bool:
|
|||||||
dim = sheet.column_dimensions.get(letter)
|
dim = sheet.column_dimensions.get(letter)
|
||||||
return dim is not None and dim.hidden
|
return dim is not None and dim.hidden
|
||||||
|
|
||||||
|
def normalize_header_text(text: str) -> str:
    """
    Normalizes header text for fuzzy matching.

    Lower-cases the text and removes every character that is not an ASCII
    letter or digit (spaces, underscores, punctuation, ...).
    Example: 'Source_Public_ IP' -> 'sourcepublicip'

    Falsy input (None, '') yields ''.
    """
    if not text:
        return ""
    return re.sub(r'[^a-z0-9]', '', str(text).lower())


def fuzzy_match(keyword: str, cell_value: str) -> bool:
    """
    Checks if keyword loosely matches cell_value.

    Both sides are normalized with normalize_header_text, so case, spaces,
    underscores and punctuation are ignored. A match is reported when either:

    1. the normalized keyword is a substring of the normalized cell
       ('ip address' vs 'IP_Address'), or
    2. the keyword has several words and every word occurs in the normalized
       cell, in any order ('source ip' vs 'Source Public IP').

    A keyword with no letters or digits ('', '#', None) never matches;
    without this guard the empty string would be "contained" in every cell.
    Misspellings such as 'Souce IP' are not tolerated (no edit-distance check).
    """
    n_key = normalize_header_text(keyword)
    if not n_key:
        # '' in anything is True in Python -- refuse to match on an empty keyword.
        return False

    n_cell = normalize_header_text(cell_value)

    # Exact contained match after normalization
    if n_key in n_cell:
        return True

    # Multi-word keyword: require every individual word to be present.
    words = [w for w in re.split(r'[^a-z0-9]+', str(keyword).lower()) if w]
    return len(words) > 1 and all(w in n_cell for w in words)
|
||||||
|
|
||||||
def find_header_row(sheet: Worksheet, keywords: List[str]) -> Tuple[Optional[int], Dict[str, int]]:
|
def find_header_row(sheet: Worksheet, keywords: List[str]) -> Tuple[Optional[int], Dict[str, int]]:
|
||||||
"""
|
"""
|
||||||
Scans the first 20 rows to find the best matching header row.
|
Scans the first 20 rows to find the best matching header row.
|
||||||
@@ -33,18 +64,18 @@ def find_header_row(sheet: Worksheet, keywords: List[str]) -> Tuple[Optional[int
|
|||||||
if is_col_hidden(sheet, c):
|
if is_col_hidden(sheet, c):
|
||||||
row_values.append("") # Treat hidden column as empty
|
row_values.append("") # Treat hidden column as empty
|
||||||
continue
|
continue
|
||||||
|
# Store original value for context if needed, but we match against normalized
|
||||||
val = sheet.cell(row=r, column=c).value
|
val = sheet.cell(row=r, column=c).value
|
||||||
row_values.append(clean_header(val))
|
row_values.append(str(val) if val else "")
|
||||||
|
|
||||||
# Check matches
|
# Check matches
|
||||||
current_map = {}
|
current_map = {}
|
||||||
for kw in keywords:
|
for kw in keywords:
|
||||||
for idx, cell_val in enumerate(row_values):
|
for idx, cell_val in enumerate(row_values):
|
||||||
# match if keyword is in cell value
|
if fuzzy_match(kw, cell_val):
|
||||||
if kw in cell_val:
|
|
||||||
# heuristic preference: prefer cells that are not too long?
|
|
||||||
# e.g. "Source IP" vs "This is a note about Source IP"
|
|
||||||
current_map[kw] = idx + 1
|
current_map[kw] = idx + 1
|
||||||
|
# Don't break immediately if we want to find the *best* match?
|
||||||
|
# The original logic broke, picking the first match. That's usually fine for headers.
|
||||||
break
|
break
|
||||||
|
|
||||||
match_count = len(current_map)
|
match_count = len(current_map)
|
||||||
@@ -120,27 +151,24 @@ def read_servers(filename: str) -> Dict[str, Server]:
|
|||||||
plat = get_val(plat_idx) or 'unknown'
|
plat = get_val(plat_idx) or 'unknown'
|
||||||
|
|
||||||
# Parse Management IP
|
# Parse Management IP
|
||||||
|
# Support multiple IPs
|
||||||
ip_raw = get_val(ip_idx)
|
ip_raw = get_val(ip_idx)
|
||||||
ip_addr = None
|
ip_list = []
|
||||||
if ip_raw:
|
if ip_raw:
|
||||||
ips = parse_ip(ip_raw)
|
ip_list = parse_ip(ip_raw)
|
||||||
if ips:
|
|
||||||
ip_addr = ips[0]
|
|
||||||
|
|
||||||
# Parse Production IP
|
# Parse Production IP
|
||||||
prod_ip_raw = get_val(prod_ip_idx)
|
prod_ip_raw = get_val(prod_ip_idx)
|
||||||
prod_ip_addr = None
|
prod_ip_list = []
|
||||||
if prod_ip_raw:
|
if prod_ip_raw:
|
||||||
ips = parse_ip(prod_ip_raw)
|
prod_ip_list = parse_ip(prod_ip_raw)
|
||||||
if ips:
|
|
||||||
prod_ip_addr = ips[0]
|
|
||||||
|
|
||||||
s = Server(
|
s = Server(
|
||||||
reference=ref,
|
reference=ref,
|
||||||
hostname=final_hostname,
|
hostname=final_hostname,
|
||||||
platform=plat,
|
platform=plat,
|
||||||
ip_address=ip_addr,
|
ip_addresses=ip_list,
|
||||||
production_ip=prod_ip_addr
|
production_ips=prod_ip_list
|
||||||
)
|
)
|
||||||
servers[ref] = s
|
servers[ref] = s
|
||||||
|
|
||||||
@@ -186,13 +214,23 @@ def read_flows(filename: str, server_inventory: Dict[str, Server] = None) -> Lis
|
|||||||
if is_col_hidden(sheet, c):
|
if is_col_hidden(sheet, c):
|
||||||
header_row_values.append("")
|
header_row_values.append("")
|
||||||
continue
|
continue
|
||||||
header_row_values.append(clean_header(sheet.cell(row=header_row_idx, column=c).value))
|
# Store raw value for fuzzy matching
|
||||||
|
header_row_values.append(str(sheet.cell(row=header_row_idx, column=c).value or ""))
|
||||||
|
|
||||||
# Find indices
|
# Find indices using fuzzy_match
|
||||||
src_ip_indices = [i+1 for i, v in enumerate(header_row_values) if 'source' in v and 'ip' in v]
|
src_ip_indices = [i+1 for i, v in enumerate(header_row_values) if fuzzy_match('source', v) and fuzzy_match('ip', v)]
|
||||||
dst_ip_indices = [i+1 for i, v in enumerate(header_row_values) if 'destination' in v and 'ip' in v]
|
dst_ip_indices = [i+1 for i, v in enumerate(header_row_values) if fuzzy_match('destination', v) and fuzzy_match('ip', v)]
|
||||||
port_indices = [i+1 for i, v in enumerate(header_row_values) if 'port' in v]
|
port_indices = [i+1 for i, v in enumerate(header_row_values) if fuzzy_match('port', v)]
|
||||||
flow_id_indices = [i+1 for i, v in enumerate(header_row_values) if 'flow' in v and '#' in v] # "Flow #"
|
flow_id_indices = [i+1 for i, v in enumerate(header_row_values) if fuzzy_match('flow', v) and '#' in v] # '#' might be scrubbed by normalize?
|
||||||
|
|
||||||
|
# 'Flow #' normalization: 'flow' matches. '#' is non-alphanumeric.
|
||||||
|
# normalize('Flow #') -> 'flow'.
|
||||||
|
# So checking '#' directly on raw string or normalized is tricky.
|
||||||
|
# Let's check 'flow' and 'no'/'num' or just rely on 'flow' if it's the identifier.
|
||||||
|
# But 'Source Flow' might match 'flow'.
|
||||||
|
# Let's check raw value for '#' or just assume 'flow' match is good enough if filtered?
|
||||||
|
# Revert: keep '#' check on raw value (v)?
|
||||||
|
flow_id_indices = [i+1 for i, v in enumerate(header_row_values) if fuzzy_match('flow', v) and ('#' in v or 'num' in v.lower() or 'id' in v.lower())]
|
||||||
|
|
||||||
if not src_ip_indices or not dst_ip_indices or not port_indices:
|
if not src_ip_indices or not dst_ip_indices or not port_indices:
|
||||||
print(f"Skipping {sname}: Missing essential IP/Port columns.")
|
print(f"Skipping {sname}: Missing essential IP/Port columns.")
|
||||||
@@ -202,7 +240,7 @@ def read_flows(filename: str, server_inventory: Dict[str, Server] = None) -> Lis
|
|||||||
for r in range(header_row_idx + 1, sheet.max_row + 1):
|
for r in range(header_row_idx + 1, sheet.max_row + 1):
|
||||||
if is_row_hidden(sheet, r):
|
if is_row_hidden(sheet, r):
|
||||||
continue
|
continue
|
||||||
|
|
||||||
# Helper
|
# Helper
|
||||||
def get_val(idx):
|
def get_val(idx):
|
||||||
v = sheet.cell(row=r, column=idx).value
|
v = sheet.cell(row=r, column=idx).value
|
||||||
|
|||||||
@@ -1,6 +1,8 @@
|
|||||||
from typing import List, Dict, Any
|
from typing import List, Dict, Any
|
||||||
from .models import Server, Flow
|
from .models import Server, Flow
|
||||||
from .network import to_mgt_ip
|
from .models import Server, Flow
|
||||||
|
from .network import to_mgt_ip, is_valid_hostname, get_hostname
|
||||||
|
from .parsers import clean_reference
|
||||||
|
|
||||||
def generate_inventory(servers: Dict[str, Server], flows: List[Flow]) -> Dict[str, Any]:
|
def generate_inventory(servers: Dict[str, Server], flows: List[Flow]) -> Dict[str, Any]:
|
||||||
"""
|
"""
|
||||||
@@ -18,10 +20,14 @@ def generate_inventory(servers: Dict[str, Server], flows: List[Flow]) -> Dict[st
|
|||||||
|
|
||||||
ip_to_server = {}
|
ip_to_server = {}
|
||||||
for s in servers.values():
|
for s in servers.values():
|
||||||
if s.ip_address:
|
# Index all Management IPs
|
||||||
ip_to_server[s.ip_address] = s
|
for ip in s.ip_addresses:
|
||||||
if s.production_ip:
|
ip_to_server[ip] = s
|
||||||
ip_to_server[s.production_ip] = s
|
|
||||||
|
# Index all Production IPs
|
||||||
|
for ip in s.production_ips:
|
||||||
|
ip_to_server[ip] = s
|
||||||
|
|
||||||
# Also index by reference/hostname for DNS matches
|
# Also index by reference/hostname for DNS matches
|
||||||
if s.reference:
|
if s.reference:
|
||||||
ip_to_server[s.reference.lower()] = s
|
ip_to_server[s.reference.lower()] = s
|
||||||
@@ -33,13 +39,20 @@ def generate_inventory(servers: Dict[str, Server], flows: List[Flow]) -> Dict[st
|
|||||||
# Process flows
|
# Process flows
|
||||||
match_count = 0
|
match_count = 0
|
||||||
drop_count = 0
|
drop_count = 0
|
||||||
|
total_flows = len(flows)
|
||||||
|
|
||||||
for flow in flows:
|
print(f"Starting inventory generation for {total_flows} flows...")
|
||||||
|
|
||||||
|
for idx, flow in enumerate(flows, 1):
|
||||||
|
if idx % 10 == 0:
|
||||||
|
print(f"Processing flow {idx}/{total_flows}...")
|
||||||
|
|
||||||
# Find source server
|
# Find source server
|
||||||
server = ip_to_server.get(flow.source_ip)
|
server = ip_to_server.get(flow.source_ip)
|
||||||
|
|
||||||
if not server:
|
if not server:
|
||||||
# Try DNS resolution (Public IP -> Management FQDN)
|
# Try DNS resolution (Public IP -> Management FQDN)
|
||||||
|
print(f"Flow {idx}: Source {flow.source_ip} not found in map. Attempting DNS resolution...")
|
||||||
mgt_dns = to_mgt_ip(flow.source_ip)
|
mgt_dns = to_mgt_ip(flow.source_ip)
|
||||||
if mgt_dns:
|
if mgt_dns:
|
||||||
# mgt_dns might be "server.ds.gc.ca".
|
# mgt_dns might be "server.ds.gc.ca".
|
||||||
@@ -54,18 +67,66 @@ def generate_inventory(servers: Dict[str, Server], flows: List[Flow]) -> Dict[st
|
|||||||
|
|
||||||
if not server:
|
if not server:
|
||||||
drop_count += 1
|
drop_count += 1
|
||||||
if drop_count <= 5: # Debug spam limit
|
if drop_count <= 10: # Increased debug spam limit
|
||||||
print(f"Dropping flow {flow.flow_id}: Source {flow.source_ip} (Mgt: {mgt_dns}) not found in Servers tab.")
|
print(f"Dropping flow {flow.flow_id} ({idx}/{total_flows}): Source {flow.source_ip} (Mgt: {mgt_dns}) resolved but not found in Servers tab.")
|
||||||
continue
|
continue
|
||||||
|
else:
|
||||||
|
print(f"Flow {idx}: Resolved {flow.source_ip} -> {server.hostname or server.reference}")
|
||||||
|
|
||||||
match_count += 1
|
match_count += 1
|
||||||
|
|
||||||
# Prepare host entry if new
|
# Prepare host entry if new
|
||||||
# We use the Hostname (from Server Name col) -> Reference (cleaned) -> IP match
|
|
||||||
host_key = server.hostname or server.reference or server.ip_address
|
# Candidate Resolution Logic
|
||||||
|
# User Requirement: "gather all potential names ... check to see what actually resolves"
|
||||||
|
candidates = []
|
||||||
|
|
||||||
|
# 1. Server Name Column (Highest priority from Excel)
|
||||||
|
if server.hostname:
|
||||||
|
candidates.append(server.hostname)
|
||||||
|
|
||||||
|
# 2. Cleaned Reference (Fallback from Excel)
|
||||||
|
if server.reference:
|
||||||
|
candidates.append(clean_reference(server.reference))
|
||||||
|
|
||||||
|
# 3. Reverse DNS of Primary IP?
|
||||||
|
# If the Excel names are garbage, maybe the IP resolves to the "Real" DNS name.
|
||||||
|
if server.primary_ip:
|
||||||
|
# Try simple reverse lookup
|
||||||
|
rev_name = get_hostname(server.primary_ip)
|
||||||
|
if rev_name:
|
||||||
|
candidates.append(rev_name)
|
||||||
|
|
||||||
|
# Select the first candidate that resolves
|
||||||
|
|
||||||
|
final_host_key = None
|
||||||
|
for cand in candidates:
|
||||||
|
if not cand: continue
|
||||||
|
if is_valid_hostname(cand):
|
||||||
|
final_host_key = cand
|
||||||
|
break
|
||||||
|
|
||||||
|
# Fallback: strict fallback to IP if nothing resolves?
|
||||||
|
# Or best effort (first candidate)?
|
||||||
|
# User said: "You are getting it incorrect every time" -> likely implying the garbage name was used.
|
||||||
|
# But if *nothing* resolves, we must output something. The IP is safe connectivity-wise, but user wants Names.
|
||||||
|
# Let's fallback to the IP if NO name works, to ensure ansible works.
|
||||||
|
if not final_host_key:
|
||||||
|
if candidates:
|
||||||
|
# Warn?
|
||||||
|
print(f"Warning: No resolvable name found for {server.primary_ip} (Candidates: {candidates}). Using IP.")
|
||||||
|
final_host_key = server.primary_ip
|
||||||
|
|
||||||
|
# Final cleanup: Strip suffixes if user requested
|
||||||
|
from .parsers import clean_hostname
|
||||||
|
host_key = clean_hostname(final_host_key)
|
||||||
|
|
||||||
if host_key not in inventory_hosts:
|
if host_key not in inventory_hosts:
|
||||||
host_vars = server.get_ansible_vars()
|
host_vars = server.get_ansible_vars()
|
||||||
|
|
||||||
|
# Ensure proper ansible_host is set if key is not IP
|
||||||
|
if host_key != server.primary_ip and server.primary_ip:
|
||||||
|
host_vars['ansible_host'] = server.primary_ip
|
||||||
host_vars['flows'] = []
|
host_vars['flows'] = []
|
||||||
inventory_hosts[host_key] = host_vars
|
inventory_hosts[host_key] = host_vars
|
||||||
|
|
||||||
|
|||||||
@@ -5,8 +5,19 @@ from typing import List, Dict, Optional, Any
|
|||||||
class Server:
|
class Server:
|
||||||
reference: str
|
reference: str
|
||||||
hostname: str # This might be same as reference
|
hostname: str # This might be same as reference
|
||||||
ip_address: Optional[str] = None
|
# Support multiple IPs per field (lists)
|
||||||
production_ip: Optional[str] = None
|
ip_addresses: List[str] = field(default_factory=list)
|
||||||
|
production_ips: List[str] = field(default_factory=list)
|
||||||
|
|
||||||
|
# helper for compatibility/primary IP
|
||||||
|
@property
|
||||||
|
def primary_ip(self) -> Optional[str]:
|
||||||
|
return self.ip_addresses[0] if self.ip_addresses else None
|
||||||
|
|
||||||
|
@property
|
||||||
|
def primary_prod_ip(self) -> Optional[str]:
|
||||||
|
return self.production_ips[0] if self.production_ips else None
|
||||||
|
|
||||||
platform: str = 'unknown' # e.g. 'Windows', 'Linux'
|
platform: str = 'unknown' # e.g. 'Windows', 'Linux'
|
||||||
|
|
||||||
def get_ansible_vars(self) -> Dict[str, Any]:
|
def get_ansible_vars(self) -> Dict[str, Any]:
|
||||||
@@ -23,8 +34,8 @@ class Server:
|
|||||||
# Default ssh is usually fine, but being explicit doesn't hurt
|
# Default ssh is usually fine, but being explicit doesn't hurt
|
||||||
pass
|
pass
|
||||||
|
|
||||||
if self.ip_address:
|
if self.primary_ip:
|
||||||
vars['ansible_host'] = self.ip_address
|
vars['ansible_host'] = self.primary_ip
|
||||||
|
|
||||||
return vars
|
return vars
|
||||||
|
|
||||||
|
|||||||
@@ -1,6 +1,8 @@
|
|||||||
import socket
|
import socket
|
||||||
from typing import Optional
|
from typing import Optional
|
||||||
|
from functools import lru_cache
|
||||||
|
|
||||||
|
@lru_cache(maxsize=1024)
|
||||||
def get_hostname(ip: str) -> Optional[str]:
|
def get_hostname(ip: str) -> Optional[str]:
|
||||||
try:
|
try:
|
||||||
# Python's equivalent to Resolv.getname(ip)
|
# Python's equivalent to Resolv.getname(ip)
|
||||||
@@ -9,12 +11,21 @@ def get_hostname(ip: str) -> Optional[str]:
|
|||||||
except socket.error:
|
except socket.error:
|
||||||
return None
|
return None
|
||||||
|
|
||||||
|
@lru_cache(maxsize=1024)
def get_ip(hostname: str) -> Optional[str]:
    """
    Resolve hostname to an IPv4 address string.

    Returns None when the name does not resolve. Results (including
    failures) are memoized by lru_cache, so each distinct name is looked up
    at most once while it stays in the cache.
    """
    try:
        return socket.gethostbyname(hostname)
    except (socket.error, UnicodeError):
        # gethostbyname IDNA-encodes the name first; malformed names (empty
        # label such as 'a..b', label of 64+ characters) raise UnicodeError
        # rather than a socket error. Treat those as "does not resolve".
        return None
|
||||||
|
|
||||||
|
def is_valid_hostname(hostname: str) -> bool:
    """
    Report whether hostname resolves to an IP.

    Empty or None input is rejected without a lookup; otherwise the answer
    is whether get_ip returns an address for the name.
    """
    return bool(hostname) and get_ip(hostname) is not None
|
||||||
|
|
||||||
def to_mgt_ip(name_or_ip: str) -> Optional[str]:
|
def to_mgt_ip(name_or_ip: str) -> Optional[str]:
|
||||||
"""
|
"""
|
||||||
Mimics the Ruby script's to_mgt_ip logic:
|
Mimics the Ruby script's to_mgt_ip logic:
|
||||||
|
|||||||
@@ -18,16 +18,34 @@ def parse_ports(port_str: str) -> List[int]:
|
|||||||
|
|
||||||
s = str(port_str).lower()
|
s = str(port_str).lower()
|
||||||
|
|
||||||
# Remove 'udp' if present to focus on port numbers,
|
# Remove 'udp' if present
|
||||||
# but arguably we might want to capture protocol.
|
|
||||||
# The Ruby script removed it. We'll strip it for port extraction.
|
|
||||||
s = re.sub(r'udp', '', s)
|
s = re.sub(r'udp', '', s)
|
||||||
|
|
||||||
|
# Common ports for 'any' matching
|
||||||
|
# User requested: "10 most commonly used ports"
|
||||||
|
# Selected: 20/21 (FTP), 22 (SSH), 23 (Telnet), 25 (SMTP), 53 (DNS), 80 (HTTP), 110 (POP3), 443 (HTTPS), 3389 (RDP)
|
||||||
|
COMMON_PORTS = [20, 21, 22, 23, 25, 53, 80, 110, 443, 3389]
|
||||||
|
|
||||||
|
# Service Name Map
|
||||||
|
SERVICE_MAP = {
|
||||||
|
'ftp': [21],
|
||||||
|
'ssh': [22],
|
||||||
|
'telnet': [23],
|
||||||
|
'smtp': [25],
|
||||||
|
'dns': [53],
|
||||||
|
'http': [80],
|
||||||
|
'pop3': [110],
|
||||||
|
'https': [443],
|
||||||
|
'rdp': [3389],
|
||||||
|
'ldap': [389],
|
||||||
|
'ldaps': [636]
|
||||||
|
}
|
||||||
|
|
||||||
ports = set()
|
ports = set()
|
||||||
|
|
||||||
# Handle 'any' or 'all' - defaulting to common ports as per Ruby script
|
# Handle 'any' or 'all'
|
||||||
if 'any' in s or 'all' in s:
|
if 'any' in s or 'all' in s:
|
||||||
return [22, 3389, 80, 443, 3306, 5432, 8443, 60000]
|
return sorted(COMMON_PORTS)
|
||||||
|
|
||||||
# Split by common delimiters
|
# Split by common delimiters
|
||||||
parts = re.split(r'[,\n\s]+', s)
|
parts = re.split(r'[,\n\s]+', s)
|
||||||
@@ -37,8 +55,12 @@ def parse_ports(port_str: str) -> List[int]:
|
|||||||
if not part:
|
if not part:
|
||||||
continue
|
continue
|
||||||
|
|
||||||
|
# Check service map
|
||||||
|
if part in SERVICE_MAP:
|
||||||
|
ports.update(SERVICE_MAP[part])
|
||||||
|
continue
|
||||||
|
|
||||||
# Range handling: 8000-8010
|
# Range handling: 8000-8010
|
||||||
# The ruby script had issues with ranges, let's do it right.
|
|
||||||
range_match = re.match(r'^(\d+)[-](\d+)$', part)
|
range_match = re.match(r'^(\d+)[-](\d+)$', part)
|
||||||
if range_match:
|
if range_match:
|
||||||
start, end = map(int, range_match.groups())
|
start, end = map(int, range_match.groups())
|
||||||
@@ -72,6 +94,15 @@ def clean_reference(ref: str) -> str:
|
|||||||
# Remove leading/trailing whitespace
|
# Remove leading/trailing whitespace
|
||||||
return s.strip()
|
return s.strip()
|
||||||
|
|
||||||
|
def clean_hostname(name: str) -> str:
    """
    Strips specific suffixes like .prod.global.gc.ca to get shortname.

    The suffix comparison is case-insensitive. Surrounding whitespace
    (including a trailing newline) is removed first, and a trailing root dot
    after the suffix ('host.prod.global.gc.ca.') is tolerated, so neither
    leaks into the returned name. Names without the suffix are returned
    unchanged apart from the whitespace trim. Falsy input returns ''.
    """
    if not name:
        return ""
    # Trim before matching: '$' alone would match *before* a trailing newline
    # and leave it in the result.
    return re.sub(r'\.prod\.global\.gc\.ca\.?$', '', name.strip(), flags=re.IGNORECASE)
|
||||||
|
|
||||||
def parse_ip(ip_str: str) -> List[str]:
|
def parse_ip(ip_str: str) -> List[str]:
|
||||||
"""Finds all IPv4 addresses in a string."""
|
"""Finds all IPv4 addresses in a string."""
|
||||||
if not ip_str:
|
if not ip_str:
|
||||||
|
|||||||
Reference in New Issue
Block a user