2 Commits

Author SHA1 Message Date
9e9c722a93 Implement fuzzy header matching and enhanced port parsing
All checks were successful
Build and Release / Build Windows Exe (push) Successful in 10s
2026-02-06 17:01:53 -05:00
a13fc5b282 Strip .prod.global.gc.ca from hostnames
All checks were successful
Build and Release / Build Windows Exe (push) Successful in 11s
2026-02-06 16:59:56 -05:00
5 changed files with 154 additions and 19 deletions

50
test_fuzzy_and_ports.py Normal file
View File

@@ -0,0 +1,50 @@
import unittest

from wif2ansible.excel_reader import fuzzy_match, normalize_header_text
from wif2ansible.parsers import parse_ports


class TestFuzzyAndPorts(unittest.TestCase):
    """Tests for service/port parsing and fuzzy header matching."""

    # The agreed "10 most commonly used ports" that 'any'/'all' expands to.
    COMMON_PORTS = [20, 21, 22, 23, 25, 53, 80, 110, 443, 3389]

    def test_parse_ports_any(self):
        # 'any'/'all' must expand to the common-ports list, case-insensitively.
        for token in ("any", "all", "Any"):
            self.assertEqual(parse_ports(token), sorted(self.COMMON_PORTS))

    def test_parse_ports_services(self):
        # Well-known service names resolve through the service map.
        self.assertEqual(parse_ports("http"), [80])
        self.assertEqual(parse_ports("HTTPS"), [443])
        self.assertEqual(parse_ports("ssh, telnet"), [22, 23])
        self.assertEqual(parse_ports("DNS"), [53])
        self.assertEqual(parse_ports("smtp"), [25])

    def test_parse_ports_mixed(self):
        # Numeric ports and service names may be mixed in one cell.
        self.assertEqual(parse_ports("80, 443, ssh"), [22, 80, 443])

    def test_fuzzy_header_normalization(self):
        # Case, underscores, spaces, hyphens and punctuation all collapse
        # to the same normalized key.
        self.assertEqual(normalize_header_text("Server Name"), "servername")
        self.assertEqual(normalize_header_text("Server_Name"), "servername")
        self.assertEqual(normalize_header_text("server name"), "servername")
        self.assertEqual(normalize_header_text("Server-Name"), "servername")
        self.assertEqual(normalize_header_text("Source (IP)"), "sourceip")

    def test_fuzzy_match(self):
        # Keyword matches after normalization ('ip address' vs 'IP_Address').
        self.assertTrue(fuzzy_match("ip address", "IP_Address"))
        # Substring containment: 'ip' is inside normalized 'Source IP'.
        self.assertTrue(fuzzy_match("ip", "Source IP"))
        # NOTE(review): plain substring matching cannot make 'source ip'
        # match 'Source Public IP' ('sourceip' is not a substring of
        # 'sourcepublicip').  Not asserted here; if fuzzy_match grows
        # per-word matching, add the positive assertion for it.


if __name__ == '__main__':
    unittest.main()

View File

@@ -55,5 +55,12 @@ class TestInventoryKeys(unittest.TestCase):
self.assertIn("good_name", hosts) self.assertIn("good_name", hosts)
self.assertNotIn("bad_name", hosts) self.assertNotIn("bad_name", hosts)
def test_suffix_stripping(self):
from wif2ansible.parsers import clean_hostname
self.assertEqual(clean_hostname("server.prod.global.gc.ca"), "server")
self.assertEqual(clean_hostname("server.PROD.GLOBAL.GC.CA"), "server")
self.assertEqual(clean_hostname("nosuffix"), "nosuffix")
self.assertEqual(clean_hostname("other.suffix.com"), "other.suffix.com")
if __name__ == '__main__': if __name__ == '__main__':
unittest.main() unittest.main()

View File

@@ -1,4 +1,5 @@
import openpyxl import openpyxl
import re
from openpyxl.worksheet.worksheet import Worksheet from openpyxl.worksheet.worksheet import Worksheet
from typing import List, Dict, Tuple, Optional from typing import List, Dict, Tuple, Optional
from .models import Server, Flow from .models import Server, Flow
@@ -15,6 +16,36 @@ def is_col_hidden(sheet: Worksheet, col_idx: int) -> bool:
dim = sheet.column_dimensions.get(letter) dim = sheet.column_dimensions.get(letter)
return dim is not None and dim.hidden return dim is not None and dim.hidden
def normalize_header_text(text: str) -> str:
    """
    Normalize header text for fuzzy matching.

    Lower-cases the value and strips every non-alphanumeric character
    (spaces, underscores, hyphens, parentheses, ...), so that e.g.
    'Source_Public_ IP' -> 'sourcepublicip'.

    Returns "" for None/empty input.
    """
    if not text:
        return ""
    return re.sub(r'[^a-z0-9]', '', str(text).lower())


def fuzzy_match(keyword: str, cell_value: str) -> bool:
    """
    Return True if `keyword` loosely matches `cell_value`.

    Matching is case-, space-, underscore- and punctuation-insensitive
    (see normalize_header_text).  A match is either:

      * the whole normalized keyword contained in the normalized cell
        ('ip address' matches 'IP_Address'), or
      * every whitespace-separated word of the keyword contained in the
        normalized cell ('source ip' matches 'Source Public IP').

    There is no typo tolerance ('souce ip' will not match 'Source IP');
    that would need an edit-distance check (e.g. difflib/Levenshtein).
    """
    n_key = normalize_header_text(keyword)
    n_cell = normalize_header_text(cell_value)
    # An empty normalized keyword would be a substring of everything;
    # never treat it as a match.
    if not n_key:
        return False
    # Fast path: whole keyword contained after normalization.
    if n_key in n_cell:
        return True
    # Fallback for multi-word keywords: require every word to appear,
    # tolerating extra words in the header ('source ip' vs
    # 'Source Public IP').  n_key being non-empty guarantees at least
    # one non-empty token survives the filter.
    tokens = [t for t in (normalize_header_text(w) for w in str(keyword).split()) if t]
    return all(t in n_cell for t in tokens)
def find_header_row(sheet: Worksheet, keywords: List[str]) -> Tuple[Optional[int], Dict[str, int]]: def find_header_row(sheet: Worksheet, keywords: List[str]) -> Tuple[Optional[int], Dict[str, int]]:
""" """
Scans the first 20 rows to find the best matching header row. Scans the first 20 rows to find the best matching header row.
@@ -33,18 +64,18 @@ def find_header_row(sheet: Worksheet, keywords: List[str]) -> Tuple[Optional[int
if is_col_hidden(sheet, c): if is_col_hidden(sheet, c):
row_values.append("") # Treat hidden column as empty row_values.append("") # Treat hidden column as empty
continue continue
# Store original value for context if needed, but we match against normalized
val = sheet.cell(row=r, column=c).value val = sheet.cell(row=r, column=c).value
row_values.append(clean_header(val)) row_values.append(str(val) if val else "")
# Check matches # Check matches
current_map = {} current_map = {}
for kw in keywords: for kw in keywords:
for idx, cell_val in enumerate(row_values): for idx, cell_val in enumerate(row_values):
# match if keyword is in cell value if fuzzy_match(kw, cell_val):
if kw in cell_val:
# heuristic preference: prefer cells that are not too long?
# e.g. "Source IP" vs "This is a note about Source IP"
current_map[kw] = idx + 1 current_map[kw] = idx + 1
# Don't break immediately if we want to find the *best* match?
# The original logic broke, picking the first match. That's usually fine for headers.
break break
match_count = len(current_map) match_count = len(current_map)
@@ -183,13 +214,23 @@ def read_flows(filename: str, server_inventory: Dict[str, Server] = None) -> Lis
if is_col_hidden(sheet, c): if is_col_hidden(sheet, c):
header_row_values.append("") header_row_values.append("")
continue continue
header_row_values.append(clean_header(sheet.cell(row=header_row_idx, column=c).value)) # Store raw value for fuzzy matching
header_row_values.append(str(sheet.cell(row=header_row_idx, column=c).value or ""))
# Find indices # Find indices using fuzzy_match
src_ip_indices = [i+1 for i, v in enumerate(header_row_values) if 'source' in v and 'ip' in v] src_ip_indices = [i+1 for i, v in enumerate(header_row_values) if fuzzy_match('source', v) and fuzzy_match('ip', v)]
dst_ip_indices = [i+1 for i, v in enumerate(header_row_values) if 'destination' in v and 'ip' in v] dst_ip_indices = [i+1 for i, v in enumerate(header_row_values) if fuzzy_match('destination', v) and fuzzy_match('ip', v)]
port_indices = [i+1 for i, v in enumerate(header_row_values) if 'port' in v] port_indices = [i+1 for i, v in enumerate(header_row_values) if fuzzy_match('port', v)]
flow_id_indices = [i+1 for i, v in enumerate(header_row_values) if 'flow' in v and '#' in v] # "Flow #" flow_id_indices = [i+1 for i, v in enumerate(header_row_values) if fuzzy_match('flow', v) and '#' in v] # '#' might be scrubbed by normalize?
# 'Flow #' normalization: 'flow' matches. '#' is non-alphanumeric.
# normalize('Flow #') -> 'flow'.
# So checking '#' directly on raw string or normalized is tricky.
# Let's check 'flow' and 'no'/'num' or just rely on 'flow' if it's the identifier.
# But 'Source Flow' might match 'flow'.
# Let's check raw value for '#' or just assume 'flow' match is good enough if filtered?
# Revert: keep '#' check on raw value (v)?
flow_id_indices = [i+1 for i, v in enumerate(header_row_values) if fuzzy_match('flow', v) and ('#' in v or 'num' in v.lower() or 'id' in v.lower())]
if not src_ip_indices or not dst_ip_indices or not port_indices: if not src_ip_indices or not dst_ip_indices or not port_indices:
print(f"Skipping {sname}: Missing essential IP/Port columns.") print(f"Skipping {sname}: Missing essential IP/Port columns.")

View File

@@ -117,10 +117,16 @@ def generate_inventory(servers: Dict[str, Server], flows: List[Flow]) -> Dict[st
print(f"Warning: No resolvable name found for {server.primary_ip} (Candidates: {candidates}). Using IP.") print(f"Warning: No resolvable name found for {server.primary_ip} (Candidates: {candidates}). Using IP.")
final_host_key = server.primary_ip final_host_key = server.primary_ip
host_key = final_host_key # Final cleanup: Strip suffixes if user requested
from .parsers import clean_hostname
host_key = clean_hostname(final_host_key)
if host_key not in inventory_hosts: if host_key not in inventory_hosts:
host_vars = server.get_ansible_vars() host_vars = server.get_ansible_vars()
# Ensure proper ansible_host is set if key is not IP
if host_key != server.primary_ip and server.primary_ip:
host_vars['ansible_host'] = server.primary_ip
host_vars['flows'] = [] host_vars['flows'] = []
inventory_hosts[host_key] = host_vars inventory_hosts[host_key] = host_vars

View File

@@ -18,16 +18,34 @@ def parse_ports(port_str: str) -> List[int]:
s = str(port_str).lower() s = str(port_str).lower()
# Remove 'udp' if present to focus on port numbers, # Remove 'udp' if present
# but arguably we might want to capture protocol.
# The Ruby script removed it. We'll strip it for port extraction.
s = re.sub(r'udp', '', s) s = re.sub(r'udp', '', s)
# Common ports for 'any' matching
# User requested: "10 most commonly used ports"
# Selected: 20/21 (FTP), 22 (SSH), 23 (Telnet), 25 (SMTP), 53 (DNS), 80 (HTTP), 110 (POP3), 443 (HTTPS), 3389 (RDP)
COMMON_PORTS = [20, 21, 22, 23, 25, 53, 80, 110, 443, 3389]
# Service Name Map
SERVICE_MAP = {
'ftp': [21],
'ssh': [22],
'telnet': [23],
'smtp': [25],
'dns': [53],
'http': [80],
'pop3': [110],
'https': [443],
'rdp': [3389],
'ldap': [389],
'ldaps': [636]
}
ports = set() ports = set()
# Handle 'any' or 'all' - defaulting to common ports as per Ruby script # Handle 'any' or 'all'
if 'any' in s or 'all' in s: if 'any' in s or 'all' in s:
return [22, 3389, 80, 443, 3306, 5432, 8443, 60000] return sorted(COMMON_PORTS)
# Split by common delimiters # Split by common delimiters
parts = re.split(r'[,\n\s]+', s) parts = re.split(r'[,\n\s]+', s)
@@ -37,8 +55,12 @@ def parse_ports(port_str: str) -> List[int]:
if not part: if not part:
continue continue
# Check service map
if part in SERVICE_MAP:
ports.update(SERVICE_MAP[part])
continue
# Range handling: 8000-8010 # Range handling: 8000-8010
# The ruby script had issues with ranges, let's do it right.
range_match = re.match(r'^(\d+)[-](\d+)$', part) range_match = re.match(r'^(\d+)[-](\d+)$', part)
if range_match: if range_match:
start, end = map(int, range_match.groups()) start, end = map(int, range_match.groups())
@@ -72,6 +94,15 @@ def clean_reference(ref: str) -> str:
# Remove leading/trailing whitespace # Remove leading/trailing whitespace
return s.strip() return s.strip()
def clean_hostname(name: str) -> str:
    """
    Strip the '.prod.global.gc.ca' suffix (any casing) from a hostname,
    returning the short name.  Hostnames without that suffix, and
    empty/None input, are returned unchanged (None becomes "").
    """
    if not name:
        return ""
    suffix = ".prod.global.gc.ca"
    # Compare case-insensitively, but preserve the original casing of
    # whatever remains.
    if name.lower().endswith(suffix):
        return name[:-len(suffix)]
    return name
def parse_ip(ip_str: str) -> List[str]: def parse_ip(ip_str: str) -> List[str]:
"""Finds all IPv4 addresses in a string.""" """Finds all IPv4 addresses in a string."""
if not ip_str: if not ip_str: