Implement to_mgt_ip DNS logic and update port range parsing
All checks were successful
Build and Release / Build Windows Exe (push) Successful in 10s
All checks were successful
Build and Release / Build Windows Exe (push) Successful in 10s
This commit is contained in:
@@ -1,5 +1,6 @@
|
||||
from typing import List, Dict, Any
|
||||
from .models import Server, Flow
|
||||
from .network import to_mgt_ip
|
||||
|
||||
def generate_inventory(servers: Dict[str, Server], flows: List[Flow]) -> Dict[str, Any]:
|
||||
"""
|
||||
@@ -19,9 +20,11 @@ def generate_inventory(servers: Dict[str, Server], flows: List[Flow]) -> Dict[st
|
||||
for s in servers.values():
|
||||
if s.ip_address:
|
||||
ip_to_server[s.ip_address] = s
|
||||
# also index by hostname/reference potentially?
|
||||
# ip_to_server[s.reference] = s
|
||||
# But flows ususally have IPs.
|
||||
# Also index by reference/hostname for DNS matches
|
||||
if s.reference:
|
||||
ip_to_server[s.reference.lower()] = s
|
||||
if s.hostname:
|
||||
ip_to_server[s.hostname.lower()] = s
|
||||
|
||||
inventory_hosts = {}
|
||||
|
||||
@@ -34,12 +37,24 @@ def generate_inventory(servers: Dict[str, Server], flows: List[Flow]) -> Dict[st
|
||||
server = ip_to_server.get(flow.source_ip)
|
||||
|
||||
if not server:
|
||||
# Try finding by looking if source matches any server's reference/hostname?
|
||||
# Unlikely for IPs.
|
||||
drop_count += 1
|
||||
if drop_count <= 5: # Debug spam limit
|
||||
print(f"Dropping flow {flow.flow_id}: Source {flow.source_ip} not found in Servers tab.")
|
||||
continue
|
||||
# Try DNS resolution (Public IP -> Management FQDN)
|
||||
mgt_dns = to_mgt_ip(flow.source_ip)
|
||||
if mgt_dns:
|
||||
# mgt_dns might be "server.ds.gc.ca".
|
||||
# Our keys might be "server" or "server.ds.gc.ca" or IPs
|
||||
# Try exact match
|
||||
server = ip_to_server.get(mgt_dns.lower())
|
||||
|
||||
# If not found, try shortname?
|
||||
if not server:
|
||||
short = mgt_dns.split('.')[0]
|
||||
server = ip_to_server.get(short.lower())
|
||||
|
||||
if not server:
|
||||
drop_count += 1
|
||||
if drop_count <= 5: # Debug spam limit
|
||||
print(f"Dropping flow {flow.flow_id}: Source {flow.source_ip} (Mgt: {mgt_dns}) not found in Servers tab.")
|
||||
continue
|
||||
|
||||
match_count += 1
|
||||
|
||||
|
||||
50
wif2ansible/network.py
Normal file
50
wif2ansible/network.py
Normal file
@@ -0,0 +1,50 @@
|
||||
import socket
|
||||
from typing import Optional
|
||||
|
||||
def get_hostname(ip: str) -> Optional[str]:
|
||||
try:
|
||||
# Python's equivalent to Resolv.getname(ip)
|
||||
# returns (hostname, aliaslist, ipaddrlist)
|
||||
return socket.gethostbyaddr(ip)[0]
|
||||
except socket.error:
|
||||
return None
|
||||
|
||||
def get_ip(hostname: str) -> Optional[str]:
|
||||
try:
|
||||
return socket.gethostbyname(hostname)
|
||||
except socket.error:
|
||||
return None
|
||||
|
||||
def to_mgt_ip(name_or_ip: str) -> Optional[str]:
|
||||
"""
|
||||
Mimics the Ruby script's to_mgt_ip logic:
|
||||
1. Reverse lookup IP to get FQDN.
|
||||
2. Construct management FQDN ({host}.ds.gc.ca or .pre-ds.gc.ca).
|
||||
3. Resolve that management FQDN to an IP.
|
||||
4. Return the Management FQDN if successful.
|
||||
"""
|
||||
|
||||
# In Ruby script, input 'name' is often an IP address from the WIF source column.
|
||||
|
||||
# Step 1: Reverse Lookup
|
||||
fqdn = get_hostname(name_or_ip)
|
||||
if not fqdn:
|
||||
# If input is already a name, use it? Ruby script assumes it gets a name from Resolv.getname(ip)
|
||||
# If name_or_ip is NOT an IP, gethostbyaddr might fail or behave differently.
|
||||
# But if it's already a name, we can try using it.
|
||||
fqdn = name_or_ip
|
||||
|
||||
short_name = fqdn.split('.')[0]
|
||||
|
||||
# Step 2 & 3: Try suffixes
|
||||
suffixes = ['.ds.gc.ca', '.pre-ds.gc.ca']
|
||||
|
||||
for suffix in suffixes:
|
||||
mgt_dns = short_name + suffix
|
||||
resolved_ip = get_ip(mgt_dns)
|
||||
if resolved_ip:
|
||||
# Ruby: return mgt_dns if mgt_ip.to_s.length > 4
|
||||
return mgt_dns
|
||||
|
||||
# print(f"Warning: {name_or_ip} could not be resolved to a management address.")
|
||||
return None
|
||||
@@ -43,18 +43,12 @@ def parse_ports(port_str: str) -> List[int]:
|
||||
if range_match:
|
||||
start, end = map(int, range_match.groups())
|
||||
if start <= end:
|
||||
# Limitation: adding huge ranges might blow up inventory size
|
||||
# but for Ansible 'ports' list it's better to be explicit or use range syntax.
|
||||
# For now, let's keep it expanded if small, or maybe just keeps the start/end?
|
||||
# Ruby script logic: expanded it.
|
||||
# We'll limit expansion to avoid DOSing ourselves.
|
||||
if end - start < 1000:
|
||||
ports.update(range(start, end + 1))
|
||||
else:
|
||||
# Fallback: just add start and end to avoid massive lists?
|
||||
# Or maybe ansible allows ranges?
|
||||
# Usually we list ports. Let's expand for now.
|
||||
ports.update(range(start, end + 1))
|
||||
# User Request: "only add the first, last, and middle port"
|
||||
ports.add(start)
|
||||
ports.add(end)
|
||||
if end - start > 1:
|
||||
middle = start + (end - start) // 2
|
||||
ports.add(middle)
|
||||
continue
|
||||
|
||||
# Single port
|
||||
|
||||
Reference in New Issue
Block a user