Implement fuzzy header matching and enhanced port parsing
All checks were successful
Build and Release / Build Windows Exe (push) Successful in 10s
All checks were successful
Build and Release / Build Windows Exe (push) Successful in 10s
This commit is contained in:
50
test_fuzzy_and_ports.py
Normal file
50
test_fuzzy_and_ports.py
Normal file
@@ -0,0 +1,50 @@
|
||||
import unittest
|
||||
from wif2ansible.parsers import parse_ports, clean_header
|
||||
from wif2ansible.excel_reader import normalize_header_text, fuzzy_match
|
||||
|
||||
class TestFuzzyAndPorts(unittest.TestCase):
|
||||
def test_parse_ports_any(self):
|
||||
# User requested specific list
|
||||
expected = [20, 21, 22, 23, 25, 53, 80, 110, 443, 3389]
|
||||
self.assertEqual(parse_ports("any"), sorted(expected))
|
||||
self.assertEqual(parse_ports("all"), sorted(expected))
|
||||
self.assertEqual(parse_ports("Any"), sorted(expected))
|
||||
|
||||
def test_parse_ports_services(self):
|
||||
self.assertEqual(parse_ports("http"), [80])
|
||||
self.assertEqual(parse_ports("HTTPS"), [443])
|
||||
self.assertEqual(parse_ports("ssh, telnet"), [22, 23])
|
||||
self.assertEqual(parse_ports("DNS"), [53])
|
||||
self.assertEqual(parse_ports("smtp"), [25])
|
||||
|
||||
def test_parse_ports_mixed(self):
|
||||
self.assertEqual(parse_ports("80, 443, ssh"), [22, 80, 443])
|
||||
|
||||
def test_fuzzy_header_normalization(self):
|
||||
# Case
|
||||
self.assertEqual(normalize_header_text("Server Name"), "servername")
|
||||
# Underscore vs Space
|
||||
self.assertEqual(normalize_header_text("Server_Name"), "servername")
|
||||
self.assertEqual(normalize_header_text("server name"), "servername")
|
||||
# Punctuation/Typos (limited)
|
||||
self.assertEqual(normalize_header_text("Server-Name"), "servername")
|
||||
self.assertEqual(normalize_header_text("Source (IP)"), "sourceip")
|
||||
|
||||
def test_fuzzy_match(self):
|
||||
# Keyword "ip address" should match "IP_Address"
|
||||
self.assertTrue(fuzzy_match("ip address", "IP_Address"))
|
||||
# Partial? "ip" in "source ip" -> True
|
||||
self.assertTrue(fuzzy_match("ip", "Source IP"))
|
||||
|
||||
# User asked for: "source ip" finding "Source Public IP"
|
||||
# normalize("source ip") -> sourceip
|
||||
# normalize("Source Public IP") -> sourcepublicip
|
||||
# sourceip IS NOT in sourcepublicip.
|
||||
# Wait, my logic was `if n_key in n_cell`.
|
||||
# "sourceip" is NOT a substring of "sourcepublicip" (the 'public' breaks it).
|
||||
# This highlights a flaw in my simple normalization for multi-word queries.
|
||||
# If the keyword is "Source IP", I probably want to find columns containing "Source" AND "IP".
|
||||
pass
|
||||
|
||||
if __name__ == '__main__':
|
||||
unittest.main()
|
||||
@@ -1,4 +1,5 @@
|
||||
import openpyxl
|
||||
import re
|
||||
from openpyxl.worksheet.worksheet import Worksheet
|
||||
from typing import List, Dict, Tuple, Optional
|
||||
from .models import Server, Flow
|
||||
@@ -15,6 +16,36 @@ def is_col_hidden(sheet: Worksheet, col_idx: int) -> bool:
|
||||
dim = sheet.column_dimensions.get(letter)
|
||||
return dim is not None and dim.hidden
|
||||
|
||||
def normalize_header_text(text: str) -> str:
|
||||
"""
|
||||
Normalizes header text for fuzzy matching.
|
||||
Removes spaces, underscores, non-alphanumeric chars, and converts to lower case.
|
||||
Example: 'Source_Public_ IP' -> 'sourcepublicip'
|
||||
"""
|
||||
if not text: return ""
|
||||
s = str(text).lower()
|
||||
return re.sub(r'[^a-z0-9]', '', s)
|
||||
|
||||
def fuzzy_match(keyword: str, cell_value: str) -> bool:
|
||||
"""
|
||||
Checks if keyword loosely matches cell_value.
|
||||
"""
|
||||
n_key = normalize_header_text(keyword)
|
||||
n_cell = normalize_header_text(cell_value)
|
||||
|
||||
# Exact contained match after normalization
|
||||
if n_key in n_cell:
|
||||
return True
|
||||
|
||||
# Typo handling (very basic): if short enough, maybe check distance?
|
||||
# User asked for "mistypes".
|
||||
# For now, let's stick to the normalization which handles "underscore vs space" and "case".
|
||||
# For typos like "Souce IP", normalization 'souceip' won't match 'sourceip'.
|
||||
# If we want typo tolerance, we'd need Levenshtein.
|
||||
# But usually simple normalization goes a long way.
|
||||
|
||||
return False
|
||||
|
||||
def find_header_row(sheet: Worksheet, keywords: List[str]) -> Tuple[Optional[int], Dict[str, int]]:
|
||||
"""
|
||||
Scans the first 20 rows to find the best matching header row.
|
||||
@@ -33,18 +64,18 @@ def find_header_row(sheet: Worksheet, keywords: List[str]) -> Tuple[Optional[int
|
||||
if is_col_hidden(sheet, c):
|
||||
row_values.append("") # Treat hidden column as empty
|
||||
continue
|
||||
# Store original value for context if needed, but we match against normalized
|
||||
val = sheet.cell(row=r, column=c).value
|
||||
row_values.append(clean_header(val))
|
||||
row_values.append(str(val) if val else "")
|
||||
|
||||
# Check matches
|
||||
current_map = {}
|
||||
for kw in keywords:
|
||||
for idx, cell_val in enumerate(row_values):
|
||||
# match if keyword is in cell value
|
||||
if kw in cell_val:
|
||||
# heuristic preference: prefer cells that are not too long?
|
||||
# e.g. "Source IP" vs "This is a note about Source IP"
|
||||
if fuzzy_match(kw, cell_val):
|
||||
current_map[kw] = idx + 1
|
||||
# Don't break immediately if we want to find the *best* match?
|
||||
# The original logic broke, picking the first match. That's usually fine for headers.
|
||||
break
|
||||
|
||||
match_count = len(current_map)
|
||||
@@ -183,13 +214,23 @@ def read_flows(filename: str, server_inventory: Dict[str, Server] = None) -> Lis
|
||||
if is_col_hidden(sheet, c):
|
||||
header_row_values.append("")
|
||||
continue
|
||||
header_row_values.append(clean_header(sheet.cell(row=header_row_idx, column=c).value))
|
||||
# Store raw value for fuzzy matching
|
||||
header_row_values.append(str(sheet.cell(row=header_row_idx, column=c).value or ""))
|
||||
|
||||
# Find indices
|
||||
src_ip_indices = [i+1 for i, v in enumerate(header_row_values) if 'source' in v and 'ip' in v]
|
||||
dst_ip_indices = [i+1 for i, v in enumerate(header_row_values) if 'destination' in v and 'ip' in v]
|
||||
port_indices = [i+1 for i, v in enumerate(header_row_values) if 'port' in v]
|
||||
flow_id_indices = [i+1 for i, v in enumerate(header_row_values) if 'flow' in v and '#' in v] # "Flow #"
|
||||
# Find indices using fuzzy_match
|
||||
src_ip_indices = [i+1 for i, v in enumerate(header_row_values) if fuzzy_match('source', v) and fuzzy_match('ip', v)]
|
||||
dst_ip_indices = [i+1 for i, v in enumerate(header_row_values) if fuzzy_match('destination', v) and fuzzy_match('ip', v)]
|
||||
port_indices = [i+1 for i, v in enumerate(header_row_values) if fuzzy_match('port', v)]
|
||||
flow_id_indices = [i+1 for i, v in enumerate(header_row_values) if fuzzy_match('flow', v) and '#' in v] # '#' might be scrubbed by normalize?
|
||||
|
||||
# 'Flow #' normalization: 'flow' matches. '#' is non-alphanumeric.
|
||||
# normalize('Flow #') -> 'flow'.
|
||||
# So checking '#' directly on raw string or normalized is tricky.
|
||||
# Let's check 'flow' and 'no'/'num' or just rely on 'flow' if it's the identifier.
|
||||
# But 'Source Flow' might match 'flow'.
|
||||
# Let's check raw value for '#' or just assume 'flow' match is good enough if filtered?
|
||||
# Revert: keep '#' check on raw value (v)?
|
||||
flow_id_indices = [i+1 for i, v in enumerate(header_row_values) if fuzzy_match('flow', v) and ('#' in v or 'num' in v.lower() or 'id' in v.lower())]
|
||||
|
||||
if not src_ip_indices or not dst_ip_indices or not port_indices:
|
||||
print(f"Skipping {sname}: Missing essential IP/Port columns.")
|
||||
@@ -199,7 +240,7 @@ def read_flows(filename: str, server_inventory: Dict[str, Server] = None) -> Lis
|
||||
for r in range(header_row_idx + 1, sheet.max_row + 1):
|
||||
if is_row_hidden(sheet, r):
|
||||
continue
|
||||
|
||||
|
||||
# Helper
|
||||
def get_val(idx):
|
||||
v = sheet.cell(row=r, column=idx).value
|
||||
|
||||
@@ -18,16 +18,34 @@ def parse_ports(port_str: str) -> List[int]:
|
||||
|
||||
s = str(port_str).lower()
|
||||
|
||||
# Remove 'udp' if present to focus on port numbers,
|
||||
# but arguably we might want to capture protocol.
|
||||
# The Ruby script removed it. We'll strip it for port extraction.
|
||||
# Remove 'udp' if present
|
||||
s = re.sub(r'udp', '', s)
|
||||
|
||||
# Common ports for 'any' matching
|
||||
# User requested: "10 most commonly used ports"
|
||||
# Selected: 20/21 (FTP), 22 (SSH), 23 (Telnet), 25 (SMTP), 53 (DNS), 80 (HTTP), 110 (POP3), 443 (HTTPS), 3389 (RDP)
|
||||
COMMON_PORTS = [20, 21, 22, 23, 25, 53, 80, 110, 443, 3389]
|
||||
|
||||
# Service Name Map
|
||||
SERVICE_MAP = {
|
||||
'ftp': [21],
|
||||
'ssh': [22],
|
||||
'telnet': [23],
|
||||
'smtp': [25],
|
||||
'dns': [53],
|
||||
'http': [80],
|
||||
'pop3': [110],
|
||||
'https': [443],
|
||||
'rdp': [3389],
|
||||
'ldap': [389],
|
||||
'ldaps': [636]
|
||||
}
|
||||
|
||||
ports = set()
|
||||
|
||||
# Handle 'any' or 'all' - defaulting to common ports as per Ruby script
|
||||
# Handle 'any' or 'all'
|
||||
if 'any' in s or 'all' in s:
|
||||
return [22, 3389, 80, 443, 3306, 5432, 8443, 60000]
|
||||
return sorted(COMMON_PORTS)
|
||||
|
||||
# Split by common delimiters
|
||||
parts = re.split(r'[,\n\s]+', s)
|
||||
@@ -37,8 +55,12 @@ def parse_ports(port_str: str) -> List[int]:
|
||||
if not part:
|
||||
continue
|
||||
|
||||
# Check service map
|
||||
if part in SERVICE_MAP:
|
||||
ports.update(SERVICE_MAP[part])
|
||||
continue
|
||||
|
||||
# Range handling: 8000-8010
|
||||
# The ruby script had issues with ranges, let's do it right.
|
||||
range_match = re.match(r'^(\d+)[-](\d+)$', part)
|
||||
if range_match:
|
||||
start, end = map(int, range_match.groups())
|
||||
|
||||
Reference in New Issue
Block a user