1 Commits

Author SHA1 Message Date
2ccf6c293a Implement to_mgt_ip DNS logic and update port range parsing
All checks were successful
Build and Release / Build Windows Exe (push) Successful in 10s
2026-02-06 15:40:13 -05:00
3 changed files with 80 additions and 21 deletions

View File

@@ -1,5 +1,6 @@
from typing import List, Dict, Any
from .models import Server, Flow
from .network import to_mgt_ip
def generate_inventory(servers: Dict[str, Server], flows: List[Flow]) -> Dict[str, Any]:
"""
@@ -19,9 +20,11 @@ def generate_inventory(servers: Dict[str, Server], flows: List[Flow]) -> Dict[st
for s in servers.values():
if s.ip_address:
ip_to_server[s.ip_address] = s
# also index by hostname/reference potentially?
# ip_to_server[s.reference] = s
# But flows ususally have IPs.
# Also index by reference/hostname for DNS matches
if s.reference:
ip_to_server[s.reference.lower()] = s
if s.hostname:
ip_to_server[s.hostname.lower()] = s
inventory_hosts = {}
@@ -34,12 +37,24 @@ def generate_inventory(servers: Dict[str, Server], flows: List[Flow]) -> Dict[st
server = ip_to_server.get(flow.source_ip)
if not server:
# Try finding by looking if source matches any server's reference/hostname?
# Unlikely for IPs.
drop_count += 1
if drop_count <= 5: # Debug spam limit
print(f"Dropping flow {flow.flow_id}: Source {flow.source_ip} not found in Servers tab.")
continue
# Try DNS resolution (Public IP -> Management FQDN)
mgt_dns = to_mgt_ip(flow.source_ip)
if mgt_dns:
# mgt_dns might be "server.ds.gc.ca".
# Our keys might be "server" or "server.ds.gc.ca" or IPs
# Try exact match
server = ip_to_server.get(mgt_dns.lower())
# If not found, try shortname?
if not server:
short = mgt_dns.split('.')[0]
server = ip_to_server.get(short.lower())
if not server:
drop_count += 1
if drop_count <= 5: # Debug spam limit
print(f"Dropping flow {flow.flow_id}: Source {flow.source_ip} (Mgt: {mgt_dns}) not found in Servers tab.")
continue
match_count += 1

50
wif2ansible/network.py Normal file
View File

@@ -0,0 +1,50 @@
import socket
from typing import Optional
def get_hostname(ip: str) -> Optional[str]:
try:
# Python's equivalent to Resolv.getname(ip)
# returns (hostname, aliaslist, ipaddrlist)
return socket.gethostbyaddr(ip)[0]
except socket.error:
return None
def get_ip(hostname: str) -> Optional[str]:
try:
return socket.gethostbyname(hostname)
except socket.error:
return None
def to_mgt_ip(name_or_ip: str) -> Optional[str]:
"""
Mimics the Ruby script's to_mgt_ip logic:
1. Reverse lookup IP to get FQDN.
2. Construct management FQDN ({host}.ds.gc.ca or .pre-ds.gc.ca).
3. Resolve that management FQDN to an IP.
4. Return the Management FQDN if successful.
"""
# In Ruby script, input 'name' is often an IP address from the WIF source column.
# Step 1: Reverse Lookup
fqdn = get_hostname(name_or_ip)
if not fqdn:
# If input is already a name, use it? Ruby script assumes it gets a name from Resolv.getname(ip)
# If name_or_ip is NOT an IP, gethostbyaddr might fail or behave differently.
# But if it's already a name, we can try using it.
fqdn = name_or_ip
short_name = fqdn.split('.')[0]
# Step 2 & 3: Try suffixes
suffixes = ['.ds.gc.ca', '.pre-ds.gc.ca']
for suffix in suffixes:
mgt_dns = short_name + suffix
resolved_ip = get_ip(mgt_dns)
if resolved_ip:
# Ruby: return mgt_dns if mgt_ip.to_s.length > 4
return mgt_dns
# print(f"Warning: {name_or_ip} could not be resolved to a management address.")
return None

View File

@@ -43,18 +43,12 @@ def parse_ports(port_str: str) -> List[int]:
if range_match:
start, end = map(int, range_match.groups())
if start <= end:
# Limitation: adding huge ranges might blow up inventory size
# but for Ansible 'ports' list it's better to be explicit or use range syntax.
# For now, let's keep it expanded if small, or maybe just keeps the start/end?
# Ruby script logic: expanded it.
# We'll limit expansion to avoid DOSing ourselves.
if end - start < 1000:
ports.update(range(start, end + 1))
else:
# Fallback: just add start and end to avoid massive lists?
# Or maybe ansible allows ranges?
# Usually we list ports. Let's expand for now.
ports.update(range(start, end + 1))
# User Request: "only add the first, last, and middle port"
ports.add(start)
ports.add(end)
if end - start > 1:
middle = start + (end - start) // 2
ports.add(middle)
continue
# Single port