7 Commits

Author SHA1 Message Date
9e9c722a93 Implement fuzzy header matching and enhanced port parsing
All checks were successful
Build and Release / Build Windows Exe (push) Successful in 10s
2026-02-06 17:01:53 -05:00
a13fc5b282 Strip .prod.global.gc.ca from hostnames
All checks were successful
Build and Release / Build Windows Exe (push) Successful in 11s
2026-02-06 16:59:56 -05:00
dcddd88cbc Implement DNS caching and verbose logging
All checks were successful
Build and Release / Build Windows Exe (push) Successful in 10s
2026-02-06 16:33:13 -05:00
9e7e4054c4 Validate inventory hostnames via DNS resolution
All checks were successful
Build and Release / Build Windows Exe (push) Successful in 11s
2026-02-06 16:25:39 -05:00
fc1c4bfaa8 Support multiple IPs per server and robust mapping
All checks were successful
Build and Release / Build Windows Exe (push) Successful in 10s
2026-02-06 16:19:05 -05:00
34f936e21c Capture Server Name column and prioritize for inventory keys
All checks were successful
Build and Release / Build Windows Exe (push) Successful in 10s
2026-02-06 16:11:48 -05:00
5c95469ca3 Support SVR prefix in hostname cleanup
All checks were successful
Build and Release / Build Windows Exe (push) Successful in 11s
2026-02-06 16:06:30 -05:00
7 changed files with 296 additions and 52 deletions

50
test_fuzzy_and_ports.py Normal file
View File

@@ -0,0 +1,50 @@
import unittest
from wif2ansible.parsers import parse_ports, clean_header
from wif2ansible.excel_reader import normalize_header_text, fuzzy_match
class TestFuzzyAndPorts(unittest.TestCase):
    """Checks for service-aware port parsing and fuzzy header matching."""

    def test_parse_ports_any(self):
        """'any' / 'all' (in any case) expand to the fixed common-port list."""
        common_ports = sorted([20, 21, 22, 23, 25, 53, 80, 110, 443, 3389])
        for wildcard in ("any", "all", "Any"):
            with self.subTest(wildcard=wildcard):
                self.assertEqual(parse_ports(wildcard), common_ports)

    def test_parse_ports_services(self):
        """Service names are translated to their port numbers."""
        service_cases = (
            ("http", [80]),
            ("HTTPS", [443]),
            ("ssh, telnet", [22, 23]),
            ("DNS", [53]),
            ("smtp", [25]),
        )
        for raw, ports in service_cases:
            with self.subTest(raw=raw):
                self.assertEqual(parse_ports(raw), ports)

    def test_parse_ports_mixed(self):
        """Numeric ports and service names can be combined in one value."""
        self.assertEqual(parse_ports("80, 443, ssh"), [22, 80, 443])

    def test_fuzzy_header_normalization(self):
        """Case, spaces, underscores and punctuation are all normalized away."""
        normalization_cases = (
            ("Server Name", "servername"),   # case
            ("Server_Name", "servername"),   # underscore vs space
            ("server name", "servername"),
            ("Server-Name", "servername"),   # punctuation
            ("Source (IP)", "sourceip"),
        )
        for header, normalized in normalization_cases:
            with self.subTest(header=header):
                self.assertEqual(normalize_header_text(header), normalized)

    def test_fuzzy_match(self):
        """Keywords match headers that contain them after normalization."""
        # Keyword "ip address" should match "IP_Address".
        self.assertTrue(fuzzy_match("ip address", "IP_Address"))
        # A short keyword is found inside a longer header.
        self.assertTrue(fuzzy_match("ip", "Source IP"))
        # The multi-word case "source ip" vs "Source Public IP" is left
        # unasserted here: normalized substring containment alone does not
        # hold for it ("sourceip" is not a substring of "sourcepublicip").


if __name__ == '__main__':
    unittest.main()

View File

@@ -1,11 +1,16 @@
import unittest import unittest
import unittest.mock
from wif2ansible.models import Server, Flow from wif2ansible.models import Server, Flow
from wif2ansible.inventory import generate_inventory from wif2ansible.inventory import generate_inventory
class TestInventoryKeys(unittest.TestCase): class TestInventoryKeys(unittest.TestCase):
def test_inventory_keys_are_hostnames(self): @unittest.mock.patch('wif2ansible.inventory.is_valid_hostname')
def test_inventory_keys_are_hostnames(self, mock_resolves):
# Mock DNS to say server01 exists
mock_resolves.return_value = True
# Create a server with Ref, Hostname, IP # Create a server with Ref, Hostname, IP
s1 = Server(reference="SERVER_REF_01", hostname="server01", ip_address="192.168.1.10", platform="windows") s1 = Server(reference="SERVER_REF_01", hostname="server01", ip_addresses=["192.168.1.10"], platform="windows")
# Create a flow matching this server # Create a flow matching this server
f1 = Flow(flow_id="1", source_ip="192.168.1.10", destination_ip="10.0.0.1", ports=[80]) f1 = Flow(flow_id="1", source_ip="192.168.1.10", destination_ip="10.0.0.1", ports=[80])
@@ -18,14 +23,44 @@ class TestInventoryKeys(unittest.TestCase):
# Verify stricture # Verify stricture
hosts = inventory['all']['hosts'] hosts = inventory['all']['hosts']
# Key should be REFERENCE "SERVER_REF_01" (or hostname/ip fallback) # Key should be HOSTNAME "server01" (prioritized over Ref)
self.assertIn("SERVER_REF_01", hosts) self.assertIn("server01", hosts)
self.assertNotIn("192.168.1.10", hosts) self.assertNotIn("192.168.1.10", hosts)
# Check variables # Check variables
host_vars = hosts["SERVER_REF_01"] host_vars = hosts["server01"]
self.assertEqual(host_vars['ansible_host'], "192.168.1.10") self.assertEqual(host_vars['ansible_host'], "192.168.1.10")
self.assertEqual(host_vars['ansible_connection'], "winrm") self.assertEqual(host_vars['ansible_connection'], "winrm")
@unittest.mock.patch('wif2ansible.inventory.get_hostname', return_value=None)
@unittest.mock.patch('wif2ansible.inventory.is_valid_hostname')
def test_inventory_keys_resolution(self, mock_resolves, mock_reverse_dns):
    """
    The inventory key is the first candidate name that resolves.

    The server's hostname ("bad_name") is reported as unresolvable while the
    cleaned reference ("good_name") resolves, so the latter must be used.
    """
    # Only "good_name" resolves; every other name (including "bad_name") fails.
    mock_resolves.side_effect = lambda name: name == "good_name"

    # generate_inventory also offers the reverse-DNS name of the primary IP
    # as a candidate. That lookup is patched out (mock_reverse_dns returns
    # None) so the test never touches the real resolver.

    # Candidates are [hostname, cleaned reference, reverse DNS]:
    # hostname is "bad_name", cleaned reference is "good_name".
    s1 = Server(reference="SRV01 good_name", hostname="bad_name", ip_addresses=["10.10.10.10"])
    f1 = Flow(flow_id="1", source_ip="10.10.10.10", destination_ip="1.1.1.1", ports=[80])

    inventory = generate_inventory({"k": s1}, [f1])
    hosts = inventory['all']['hosts']

    # "bad_name" must be skipped in favour of "good_name" (from cleaned ref).
    self.assertIn("good_name", hosts)
    self.assertNotIn("bad_name", hosts)
def test_suffix_stripping(self):
    """clean_hostname drops only the .prod.global.gc.ca suffix, ignoring case."""
    from wif2ansible.parsers import clean_hostname

    expectations = (
        ("server.prod.global.gc.ca", "server"),
        ("server.PROD.GLOBAL.GC.CA", "server"),
        ("nosuffix", "nosuffix"),
        ("other.suffix.com", "other.suffix.com"),
    )
    for raw, cleaned in expectations:
        with self.subTest(raw=raw):
            self.assertEqual(clean_hostname(raw), cleaned)
if __name__ == '__main__': if __name__ == '__main__':
unittest.main() unittest.main()

View File

@@ -1,4 +1,5 @@
import openpyxl import openpyxl
import re
from openpyxl.worksheet.worksheet import Worksheet from openpyxl.worksheet.worksheet import Worksheet
from typing import List, Dict, Tuple, Optional from typing import List, Dict, Tuple, Optional
from .models import Server, Flow from .models import Server, Flow
@@ -15,6 +16,36 @@ def is_col_hidden(sheet: Worksheet, col_idx: int) -> bool:
dim = sheet.column_dimensions.get(letter) dim = sheet.column_dimensions.get(letter)
return dim is not None and dim.hidden return dim is not None and dim.hidden
def normalize_header_text(text: str) -> str:
    """
    Normalizes header text for fuzzy matching.

    Lower-cases the text and removes every character that is not an ASCII
    letter or digit (spaces, underscores, punctuation).
    Example: 'Source_Public_ IP' -> 'sourcepublicip'

    Falsy input (None, empty string) yields "".
    """
    if not text:
        return ""
    return re.sub(r'[^a-z0-9]', '', str(text).lower())


def _header_words(text: str) -> List[str]:
    """Splits header text into lower-cased alphanumeric words."""
    if not text:
        return []
    return re.findall(r'[a-z0-9]+', str(text).lower())


def fuzzy_match(keyword: str, cell_value: str) -> bool:
    """
    Checks if keyword loosely matches cell_value.

    Two strategies are tried in order:
      1. Containment after normalization, which absorbs differences in case,
         spacing, underscores and punctuation
         ('ip address' matches 'IP_Address').
      2. For multi-word keywords only: every word of the keyword must appear
         as a whole word of the cell, in any order
         ('source ip' matches 'Source Public IP').

    A keyword with no letters or digits never matches. Typos are not
    tolerated ('Souce IP' does not match 'source ip').
    """
    n_key = normalize_header_text(keyword)
    if not n_key:
        # "" is a substring of everything; without this guard an empty
        # keyword would match every cell, including blank ones.
        return False

    n_cell = normalize_header_text(cell_value)

    # Exact contained match after normalization
    if n_key in n_cell:
        return True

    # Multi-word keywords: tolerate extra words between the keyword's words.
    # Whole-word comparison keeps 'ip' from matching inside 'shipping'.
    key_words = _header_words(keyword)
    if len(key_words) < 2:
        return False
    cell_words = set(_header_words(cell_value))
    return all(word in cell_words for word in key_words)
def find_header_row(sheet: Worksheet, keywords: List[str]) -> Tuple[Optional[int], Dict[str, int]]: def find_header_row(sheet: Worksheet, keywords: List[str]) -> Tuple[Optional[int], Dict[str, int]]:
""" """
Scans the first 20 rows to find the best matching header row. Scans the first 20 rows to find the best matching header row.
@@ -33,18 +64,18 @@ def find_header_row(sheet: Worksheet, keywords: List[str]) -> Tuple[Optional[int
if is_col_hidden(sheet, c): if is_col_hidden(sheet, c):
row_values.append("") # Treat hidden column as empty row_values.append("") # Treat hidden column as empty
continue continue
# Store original value for context if needed, but we match against normalized
val = sheet.cell(row=r, column=c).value val = sheet.cell(row=r, column=c).value
row_values.append(clean_header(val)) row_values.append(str(val) if val else "")
# Check matches # Check matches
current_map = {} current_map = {}
for kw in keywords: for kw in keywords:
for idx, cell_val in enumerate(row_values): for idx, cell_val in enumerate(row_values):
# match if keyword is in cell value if fuzzy_match(kw, cell_val):
if kw in cell_val:
# heuristic preference: prefer cells that are not too long?
# e.g. "Source IP" vs "This is a note about Source IP"
current_map[kw] = idx + 1 current_map[kw] = idx + 1
# Don't break immediately if we want to find the *best* match?
# The original logic broke, picking the first match. That's usually fine for headers.
break break
match_count = len(current_map) match_count = len(current_map)
@@ -77,8 +108,8 @@ def read_servers(filename: str) -> Dict[str, Server]:
print("Warning: No 'Servers' sheet found.") print("Warning: No 'Servers' sheet found.")
return {} return {}
# keywords: reference, platform, ip address, management ip, production ip # keywords: reference, platform, ip address, management ip, production ip, server name
header_keywords = ['reference', 'platform', 'ip address', 'production ip'] header_keywords = ['reference', 'platform', 'ip address', 'production ip', 'server name']
header_row_idx, col_map = find_header_row(target_sheet, header_keywords) header_row_idx, col_map = find_header_row(target_sheet, header_keywords)
@@ -96,6 +127,7 @@ def read_servers(filename: str) -> Dict[str, Server]:
# Extract data # Extract data
ref_idx = col_map.get('reference') ref_idx = col_map.get('reference')
name_idx = col_map.get('server name') # User confirmed header
plat_idx = col_map.get('platform') plat_idx = col_map.get('platform')
ip_idx = col_map.get('ip address') # Generic/Management IP ip_idx = col_map.get('ip address') # Generic/Management IP
prod_ip_idx = col_map.get('production ip') # Specific Production IP prod_ip_idx = col_map.get('production ip') # Specific Production IP
@@ -110,30 +142,33 @@ def read_servers(filename: str) -> Dict[str, Server]:
if not ref or ref.lower() == 'example': if not ref or ref.lower() == 'example':
continue continue
# Hostname Logic:
# 1. Use 'Server Name' column if available (e.g. ITSMDEV-5009898)
# 2. Fallback to cleaned Reference (Stripping SRV###)
server_name_raw = get_val(name_idx)
final_hostname = server_name_raw if server_name_raw else clean_reference(ref)
plat = get_val(plat_idx) or 'unknown' plat = get_val(plat_idx) or 'unknown'
# Parse Management IP # Parse Management IP
# Support multiple IPs
ip_raw = get_val(ip_idx) ip_raw = get_val(ip_idx)
ip_addr = None ip_list = []
if ip_raw: if ip_raw:
ips = parse_ip(ip_raw) ip_list = parse_ip(ip_raw)
if ips:
ip_addr = ips[0]
# Parse Production IP # Parse Production IP
prod_ip_raw = get_val(prod_ip_idx) prod_ip_raw = get_val(prod_ip_idx)
prod_ip_addr = None prod_ip_list = []
if prod_ip_raw: if prod_ip_raw:
ips = parse_ip(prod_ip_raw) prod_ip_list = parse_ip(prod_ip_raw)
if ips:
prod_ip_addr = ips[0]
s = Server( s = Server(
reference=ref, reference=ref,
hostname=clean_reference(ref), hostname=final_hostname,
platform=plat, platform=plat,
ip_address=ip_addr, ip_addresses=ip_list,
production_ip=prod_ip_addr production_ips=prod_ip_list
) )
servers[ref] = s servers[ref] = s
@@ -179,13 +214,23 @@ def read_flows(filename: str, server_inventory: Dict[str, Server] = None) -> Lis
if is_col_hidden(sheet, c): if is_col_hidden(sheet, c):
header_row_values.append("") header_row_values.append("")
continue continue
header_row_values.append(clean_header(sheet.cell(row=header_row_idx, column=c).value)) # Store raw value for fuzzy matching
header_row_values.append(str(sheet.cell(row=header_row_idx, column=c).value or ""))
# Find indices # Find indices using fuzzy_match
src_ip_indices = [i+1 for i, v in enumerate(header_row_values) if 'source' in v and 'ip' in v] src_ip_indices = [i+1 for i, v in enumerate(header_row_values) if fuzzy_match('source', v) and fuzzy_match('ip', v)]
dst_ip_indices = [i+1 for i, v in enumerate(header_row_values) if 'destination' in v and 'ip' in v] dst_ip_indices = [i+1 for i, v in enumerate(header_row_values) if fuzzy_match('destination', v) and fuzzy_match('ip', v)]
port_indices = [i+1 for i, v in enumerate(header_row_values) if 'port' in v] port_indices = [i+1 for i, v in enumerate(header_row_values) if fuzzy_match('port', v)]
flow_id_indices = [i+1 for i, v in enumerate(header_row_values) if 'flow' in v and '#' in v] # "Flow #" flow_id_indices = [i+1 for i, v in enumerate(header_row_values) if fuzzy_match('flow', v) and '#' in v] # '#' might be scrubbed by normalize?
# 'Flow #' normalization: 'flow' matches. '#' is non-alphanumeric.
# normalize('Flow #') -> 'flow'.
# So checking '#' directly on raw string or normalized is tricky.
# Let's check 'flow' and 'no'/'num' or just rely on 'flow' if it's the identifier.
# But 'Source Flow' might match 'flow'.
# Let's check raw value for '#' or just assume 'flow' match is good enough if filtered?
# Revert: keep '#' check on raw value (v)?
flow_id_indices = [i+1 for i, v in enumerate(header_row_values) if fuzzy_match('flow', v) and ('#' in v or 'num' in v.lower() or 'id' in v.lower())]
if not src_ip_indices or not dst_ip_indices or not port_indices: if not src_ip_indices or not dst_ip_indices or not port_indices:
print(f"Skipping {sname}: Missing essential IP/Port columns.") print(f"Skipping {sname}: Missing essential IP/Port columns.")
@@ -195,7 +240,7 @@ def read_flows(filename: str, server_inventory: Dict[str, Server] = None) -> Lis
for r in range(header_row_idx + 1, sheet.max_row + 1): for r in range(header_row_idx + 1, sheet.max_row + 1):
if is_row_hidden(sheet, r): if is_row_hidden(sheet, r):
continue continue
# Helper # Helper
def get_val(idx): def get_val(idx):
v = sheet.cell(row=r, column=idx).value v = sheet.cell(row=r, column=idx).value

View File

@@ -1,6 +1,8 @@
from typing import List, Dict, Any from typing import List, Dict, Any
from .models import Server, Flow from .models import Server, Flow
from .network import to_mgt_ip from .models import Server, Flow
from .network import to_mgt_ip, is_valid_hostname, get_hostname
from .parsers import clean_reference
def generate_inventory(servers: Dict[str, Server], flows: List[Flow]) -> Dict[str, Any]: def generate_inventory(servers: Dict[str, Server], flows: List[Flow]) -> Dict[str, Any]:
""" """
@@ -18,10 +20,14 @@ def generate_inventory(servers: Dict[str, Server], flows: List[Flow]) -> Dict[st
ip_to_server = {} ip_to_server = {}
for s in servers.values(): for s in servers.values():
if s.ip_address: # Index all Management IPs
ip_to_server[s.ip_address] = s for ip in s.ip_addresses:
if s.production_ip: ip_to_server[ip] = s
ip_to_server[s.production_ip] = s
# Index all Production IPs
for ip in s.production_ips:
ip_to_server[ip] = s
# Also index by reference/hostname for DNS matches # Also index by reference/hostname for DNS matches
if s.reference: if s.reference:
ip_to_server[s.reference.lower()] = s ip_to_server[s.reference.lower()] = s
@@ -33,13 +39,20 @@ def generate_inventory(servers: Dict[str, Server], flows: List[Flow]) -> Dict[st
# Process flows # Process flows
match_count = 0 match_count = 0
drop_count = 0 drop_count = 0
total_flows = len(flows)
for flow in flows: print(f"Starting inventory generation for {total_flows} flows...")
for idx, flow in enumerate(flows, 1):
if idx % 10 == 0:
print(f"Processing flow {idx}/{total_flows}...")
# Find source server # Find source server
server = ip_to_server.get(flow.source_ip) server = ip_to_server.get(flow.source_ip)
if not server: if not server:
# Try DNS resolution (Public IP -> Management FQDN) # Try DNS resolution (Public IP -> Management FQDN)
print(f"Flow {idx}: Source {flow.source_ip} not found in map. Attempting DNS resolution...")
mgt_dns = to_mgt_ip(flow.source_ip) mgt_dns = to_mgt_ip(flow.source_ip)
if mgt_dns: if mgt_dns:
# mgt_dns might be "server.ds.gc.ca". # mgt_dns might be "server.ds.gc.ca".
@@ -54,18 +67,66 @@ def generate_inventory(servers: Dict[str, Server], flows: List[Flow]) -> Dict[st
if not server: if not server:
drop_count += 1 drop_count += 1
if drop_count <= 5: # Debug spam limit if drop_count <= 10: # Increased debug spam limit
print(f"Dropping flow {flow.flow_id}: Source {flow.source_ip} (Mgt: {mgt_dns}) not found in Servers tab.") print(f"Dropping flow {flow.flow_id} ({idx}/{total_flows}): Source {flow.source_ip} (Mgt: {mgt_dns}) resolved but not found in Servers tab.")
continue continue
else:
print(f"Flow {idx}: Resolved {flow.source_ip} -> {server.hostname or server.reference}")
match_count += 1 match_count += 1
# Prepare host entry if new # Prepare host entry if new
# We use the Reference/Hostname as the key in inventory 'hosts'
host_key = server.reference or server.hostname or server.ip_address # Candidate Resolution Logic
# User Requirement: "gather all potential names ... check to see what actually resolves"
candidates = []
# 1. Server Name Column (Highest priority from Excel)
if server.hostname:
candidates.append(server.hostname)
# 2. Cleaned Reference (Fallback from Excel)
if server.reference:
candidates.append(clean_reference(server.reference))
# 3. Reverse DNS of Primary IP?
# If the Excel names are garbage, maybe the IP resolves to the "Real" DNS name.
if server.primary_ip:
# Try simple reverse lookup
rev_name = get_hostname(server.primary_ip)
if rev_name:
candidates.append(rev_name)
# Select the first candidate that resolves
final_host_key = None
for cand in candidates:
if not cand: continue
if is_valid_hostname(cand):
final_host_key = cand
break
# Fallback: strict fallback to IP if nothing resolves?
# Or best effort (first candidate)?
# User said: "You are getting it incorrect every time" -> likely implying the garbage name was used.
# But if *nothing* resolves, we must output something. The IP is safe connectivity-wise, but user wants Names.
# Let's fallback to the IP if NO name works, to ensure ansible works.
if not final_host_key:
if candidates:
# Warn?
print(f"Warning: No resolvable name found for {server.primary_ip} (Candidates: {candidates}). Using IP.")
final_host_key = server.primary_ip
# Final cleanup: Strip suffixes if user requested
from .parsers import clean_hostname
host_key = clean_hostname(final_host_key)
if host_key not in inventory_hosts: if host_key not in inventory_hosts:
host_vars = server.get_ansible_vars() host_vars = server.get_ansible_vars()
# Ensure proper ansible_host is set if key is not IP
if host_key != server.primary_ip and server.primary_ip:
host_vars['ansible_host'] = server.primary_ip
host_vars['flows'] = [] host_vars['flows'] = []
inventory_hosts[host_key] = host_vars inventory_hosts[host_key] = host_vars

View File

@@ -5,8 +5,19 @@ from typing import List, Dict, Optional, Any
class Server: class Server:
reference: str reference: str
hostname: str # This might be same as reference hostname: str # This might be same as reference
ip_address: Optional[str] = None # Support multiple IPs per field (lists)
production_ip: Optional[str] = None ip_addresses: List[str] = field(default_factory=list)
production_ips: List[str] = field(default_factory=list)
# helper for compatibility/primary IP
@property
def primary_ip(self) -> Optional[str]:
    """First entry of ip_addresses, or None when the list is empty."""
    if not self.ip_addresses:
        return None
    return self.ip_addresses[0]
@property
def primary_prod_ip(self) -> Optional[str]:
    """First entry of production_ips, or None when the list is empty."""
    if not self.production_ips:
        return None
    return self.production_ips[0]
platform: str = 'unknown' # e.g. 'Windows', 'Linux' platform: str = 'unknown' # e.g. 'Windows', 'Linux'
def get_ansible_vars(self) -> Dict[str, Any]: def get_ansible_vars(self) -> Dict[str, Any]:
@@ -23,8 +34,8 @@ class Server:
# Default ssh is usually fine, but being explicit doesn't hurt # Default ssh is usually fine, but being explicit doesn't hurt
pass pass
if self.ip_address: if self.primary_ip:
vars['ansible_host'] = self.ip_address vars['ansible_host'] = self.primary_ip
return vars return vars

View File

@@ -1,6 +1,8 @@
import socket import socket
from typing import Optional from typing import Optional
from functools import lru_cache
@lru_cache(maxsize=1024)
def get_hostname(ip: str) -> Optional[str]: def get_hostname(ip: str) -> Optional[str]:
try: try:
# Python's equivalent to Resolv.getname(ip) # Python's equivalent to Resolv.getname(ip)
@@ -9,12 +11,21 @@ def get_hostname(ip: str) -> Optional[str]:
except socket.error: except socket.error:
return None return None
@lru_cache(maxsize=1024)
def get_ip(hostname: str) -> Optional[str]:
    """
    Resolves a hostname to an IPv4 address string.

    Returns None when the name cannot be resolved. Results, including
    failed lookups, are memoized by lru_cache for up to 1024 distinct names.
    """
    try:
        return socket.gethostbyname(hostname)
    except (OSError, ValueError):
        # OSError covers resolver failures (socket.error is an alias of it).
        # ValueError covers UnicodeError, which the hostname encoding step
        # can raise for malformed names (e.g. an empty or over-long label);
        # such names are simply "not resolvable" rather than fatal.
        return None
def is_valid_hostname(hostname: str) -> bool:
    """
    Reports whether a hostname resolves to an IP.

    Empty or None hostnames are rejected without attempting a lookup.
    """
    return bool(hostname) and get_ip(hostname) is not None
def to_mgt_ip(name_or_ip: str) -> Optional[str]: def to_mgt_ip(name_or_ip: str) -> Optional[str]:
""" """
Mimics the Ruby script's to_mgt_ip logic: Mimics the Ruby script's to_mgt_ip logic:

View File

@@ -18,16 +18,34 @@ def parse_ports(port_str: str) -> List[int]:
s = str(port_str).lower() s = str(port_str).lower()
# Remove 'udp' if present to focus on port numbers, # Remove 'udp' if present
# but arguably we might want to capture protocol.
# The Ruby script removed it. We'll strip it for port extraction.
s = re.sub(r'udp', '', s) s = re.sub(r'udp', '', s)
# Common ports for 'any' matching
# User requested: "10 most commonly used ports"
# Selected: 20/21 (FTP), 22 (SSH), 23 (Telnet), 25 (SMTP), 53 (DNS), 80 (HTTP), 110 (POP3), 443 (HTTPS), 3389 (RDP)
COMMON_PORTS = [20, 21, 22, 23, 25, 53, 80, 110, 443, 3389]
# Service Name Map
SERVICE_MAP = {
'ftp': [21],
'ssh': [22],
'telnet': [23],
'smtp': [25],
'dns': [53],
'http': [80],
'pop3': [110],
'https': [443],
'rdp': [3389],
'ldap': [389],
'ldaps': [636]
}
ports = set() ports = set()
# Handle 'any' or 'all' - defaulting to common ports as per Ruby script # Handle 'any' or 'all'
if 'any' in s or 'all' in s: if 'any' in s or 'all' in s:
return [22, 3389, 80, 443, 3306, 5432, 8443, 60000] return sorted(COMMON_PORTS)
# Split by common delimiters # Split by common delimiters
parts = re.split(r'[,\n\s]+', s) parts = re.split(r'[,\n\s]+', s)
@@ -37,8 +55,12 @@ def parse_ports(port_str: str) -> List[int]:
if not part: if not part:
continue continue
# Check service map
if part in SERVICE_MAP:
ports.update(SERVICE_MAP[part])
continue
# Range handling: 8000-8010 # Range handling: 8000-8010
# The ruby script had issues with ranges, let's do it right.
range_match = re.match(r'^(\d+)[-](\d+)$', part) range_match = re.match(r'^(\d+)[-](\d+)$', part)
if range_match: if range_match:
start, end = map(int, range_match.groups()) start, end = map(int, range_match.groups())
@@ -67,11 +89,20 @@ def clean_reference(ref: str) -> str:
return "" return ""
s = str(ref) s = str(ref)
# Remove SRV followed by digits and whitespace # Remove SRV or SVR followed by digits and whitespace
s = re.sub(r'SRV\d+\s*', '', s, flags=re.IGNORECASE) s = re.sub(r'S(RV|VR)\d+\s*', '', s, flags=re.IGNORECASE)
# Remove leading/trailing whitespace # Remove leading/trailing whitespace
return s.strip() return s.strip()
def clean_hostname(name: str, suffix: str = ".prod.global.gc.ca") -> str:
    """
    Strips a trailing domain suffix (by default .prod.global.gc.ca) to get
    the short name.

    Surrounding whitespace is removed first. The suffix comparison is case
    insensitive and tolerates a single trailing root dot
    ("host.prod.global.gc.ca."). Names that do not end with the suffix are
    returned unchanged apart from the whitespace trim.

    Args:
        name: Hostname or FQDN; falsy values yield "".
        suffix: Suffix to remove, including its leading dot. An empty suffix
            disables stripping.

    Returns:
        The name without the suffix.
    """
    if not name:
        return ""
    host = str(name).strip()
    if not suffix:
        return host

    # Compare against the name without a trailing root dot, if present.
    bare = host[:-1] if host.endswith(".") else host
    if len(bare) >= len(suffix) and bare[-len(suffix):].lower() == suffix.lower():
        return bare[:len(bare) - len(suffix)]
    return host
def parse_ip(ip_str: str) -> List[str]: def parse_ip(ip_str: str) -> List[str]:
"""Finds all IPv4 addresses in a string.""" """Finds all IPv4 addresses in a string."""
if not ip_str: if not ip_str: