Compare commits
4 Commits
v2026.02.0
...
v2026.02.0
| Author | SHA1 | Date | |
|---|---|---|---|
| 9e28004d6c | |||
| 8b3584fa9e | |||
| a202e267f7 | |||
| 2ccf6c293a |
31
test_inventory_keys.py
Normal file
31
test_inventory_keys.py
Normal file
@@ -0,0 +1,31 @@
|
|||||||
|
import unittest
|
||||||
|
from wif2ansible.models import Server, Flow
|
||||||
|
from wif2ansible.inventory import generate_inventory
|
||||||
|
|
||||||
|
class TestInventoryKeys(unittest.TestCase):
|
||||||
|
def test_inventory_keys_are_hostnames(self):
|
||||||
|
# Create a server with Ref, Hostname, IP
|
||||||
|
s1 = Server(reference="SERVER_REF_01", hostname="server01", ip_address="192.168.1.10", platform="windows")
|
||||||
|
|
||||||
|
# Create a flow matching this server
|
||||||
|
f1 = Flow(flow_id="1", source_ip="192.168.1.10", destination_ip="10.0.0.1", ports=[80])
|
||||||
|
|
||||||
|
servers = {"SERVER_REF_01": s1}
|
||||||
|
flows = [f1]
|
||||||
|
|
||||||
|
inventory = generate_inventory(servers, flows)
|
||||||
|
|
||||||
|
# Verify stricture
|
||||||
|
hosts = inventory['all']['hosts']
|
||||||
|
|
||||||
|
# Key should be REFERENCE "SERVER_REF_01" (or hostname/ip fallback)
|
||||||
|
self.assertIn("SERVER_REF_01", hosts)
|
||||||
|
self.assertNotIn("192.168.1.10", hosts)
|
||||||
|
|
||||||
|
# Check variables
|
||||||
|
host_vars = hosts["SERVER_REF_01"]
|
||||||
|
self.assertEqual(host_vars['ansible_host'], "192.168.1.10")
|
||||||
|
self.assertEqual(host_vars['ansible_connection'], "winrm")
|
||||||
|
|
||||||
|
if __name__ == '__main__':
|
||||||
|
unittest.main()
|
||||||
@@ -2,7 +2,7 @@ import openpyxl
|
|||||||
from openpyxl.worksheet.worksheet import Worksheet
|
from openpyxl.worksheet.worksheet import Worksheet
|
||||||
from typing import List, Dict, Tuple, Optional
|
from typing import List, Dict, Tuple, Optional
|
||||||
from .models import Server, Flow
|
from .models import Server, Flow
|
||||||
from .parsers import parse_ports, parse_ip, clean_header
|
from .parsers import parse_ports, parse_ip, clean_header, clean_reference
|
||||||
|
|
||||||
from openpyxl.utils import get_column_letter
|
from openpyxl.utils import get_column_letter
|
||||||
|
|
||||||
@@ -77,9 +77,8 @@ def read_servers(filename: str) -> Dict[str, Server]:
|
|||||||
print("Warning: No 'Servers' sheet found.")
|
print("Warning: No 'Servers' sheet found.")
|
||||||
return {}
|
return {}
|
||||||
|
|
||||||
# keywords: reference, platform, ip address, management ip?
|
# keywords: reference, platform, ip address, management ip, production ip
|
||||||
# Ruby script looked for: reference, type, alias, platform, middleware
|
header_keywords = ['reference', 'platform', 'ip address', 'production ip']
|
||||||
header_keywords = ['reference', 'platform', 'ip address']
|
|
||||||
|
|
||||||
header_row_idx, col_map = find_header_row(target_sheet, header_keywords)
|
header_row_idx, col_map = find_header_row(target_sheet, header_keywords)
|
||||||
|
|
||||||
@@ -98,7 +97,8 @@ def read_servers(filename: str) -> Dict[str, Server]:
|
|||||||
# Extract data
|
# Extract data
|
||||||
ref_idx = col_map.get('reference')
|
ref_idx = col_map.get('reference')
|
||||||
plat_idx = col_map.get('platform')
|
plat_idx = col_map.get('platform')
|
||||||
ip_idx = col_map.get('ip address') # Generic IP
|
ip_idx = col_map.get('ip address') # Generic/Management IP
|
||||||
|
prod_ip_idx = col_map.get('production ip') # Specific Production IP
|
||||||
|
|
||||||
# Helper to get value
|
# Helper to get value
|
||||||
def get_val(idx):
|
def get_val(idx):
|
||||||
@@ -111,19 +111,29 @@ def read_servers(filename: str) -> Dict[str, Server]:
|
|||||||
continue
|
continue
|
||||||
|
|
||||||
plat = get_val(plat_idx) or 'unknown'
|
plat = get_val(plat_idx) or 'unknown'
|
||||||
ip_raw = get_val(ip_idx)
|
|
||||||
|
|
||||||
|
# Parse Management IP
|
||||||
|
ip_raw = get_val(ip_idx)
|
||||||
ip_addr = None
|
ip_addr = None
|
||||||
if ip_raw:
|
if ip_raw:
|
||||||
ips = parse_ip(ip_raw)
|
ips = parse_ip(ip_raw)
|
||||||
if ips:
|
if ips:
|
||||||
ip_addr = ips[0] # Take first valid IP
|
ip_addr = ips[0]
|
||||||
|
|
||||||
|
# Parse Production IP
|
||||||
|
prod_ip_raw = get_val(prod_ip_idx)
|
||||||
|
prod_ip_addr = None
|
||||||
|
if prod_ip_raw:
|
||||||
|
ips = parse_ip(prod_ip_raw)
|
||||||
|
if ips:
|
||||||
|
prod_ip_addr = ips[0]
|
||||||
|
|
||||||
s = Server(
|
s = Server(
|
||||||
reference=ref,
|
reference=ref,
|
||||||
hostname=ref, # Default hostname to reference
|
hostname=clean_reference(ref),
|
||||||
platform=plat,
|
platform=plat,
|
||||||
ip_address=ip_addr
|
ip_address=ip_addr,
|
||||||
|
production_ip=prod_ip_addr
|
||||||
)
|
)
|
||||||
servers[ref] = s
|
servers[ref] = s
|
||||||
|
|
||||||
|
|||||||
@@ -1,5 +1,6 @@
|
|||||||
from typing import List, Dict, Any
|
from typing import List, Dict, Any
|
||||||
from .models import Server, Flow
|
from .models import Server, Flow
|
||||||
|
from .network import to_mgt_ip
|
||||||
|
|
||||||
def generate_inventory(servers: Dict[str, Server], flows: List[Flow]) -> Dict[str, Any]:
|
def generate_inventory(servers: Dict[str, Server], flows: List[Flow]) -> Dict[str, Any]:
|
||||||
"""
|
"""
|
||||||
@@ -19,9 +20,13 @@ def generate_inventory(servers: Dict[str, Server], flows: List[Flow]) -> Dict[st
|
|||||||
for s in servers.values():
|
for s in servers.values():
|
||||||
if s.ip_address:
|
if s.ip_address:
|
||||||
ip_to_server[s.ip_address] = s
|
ip_to_server[s.ip_address] = s
|
||||||
# also index by hostname/reference potentially?
|
if s.production_ip:
|
||||||
# ip_to_server[s.reference] = s
|
ip_to_server[s.production_ip] = s
|
||||||
# But flows ususally have IPs.
|
# Also index by reference/hostname for DNS matches
|
||||||
|
if s.reference:
|
||||||
|
ip_to_server[s.reference.lower()] = s
|
||||||
|
if s.hostname:
|
||||||
|
ip_to_server[s.hostname.lower()] = s
|
||||||
|
|
||||||
inventory_hosts = {}
|
inventory_hosts = {}
|
||||||
|
|
||||||
@@ -34,18 +39,30 @@ def generate_inventory(servers: Dict[str, Server], flows: List[Flow]) -> Dict[st
|
|||||||
server = ip_to_server.get(flow.source_ip)
|
server = ip_to_server.get(flow.source_ip)
|
||||||
|
|
||||||
if not server:
|
if not server:
|
||||||
# Try finding by looking if source matches any server's reference/hostname?
|
# Try DNS resolution (Public IP -> Management FQDN)
|
||||||
# Unlikely for IPs.
|
mgt_dns = to_mgt_ip(flow.source_ip)
|
||||||
|
if mgt_dns:
|
||||||
|
# mgt_dns might be "server.ds.gc.ca".
|
||||||
|
# Our keys might be "server" or "server.ds.gc.ca" or IPs
|
||||||
|
# Try exact match
|
||||||
|
server = ip_to_server.get(mgt_dns.lower())
|
||||||
|
|
||||||
|
# If not found, try shortname?
|
||||||
|
if not server:
|
||||||
|
short = mgt_dns.split('.')[0]
|
||||||
|
server = ip_to_server.get(short.lower())
|
||||||
|
|
||||||
|
if not server:
|
||||||
drop_count += 1
|
drop_count += 1
|
||||||
if drop_count <= 5: # Debug spam limit
|
if drop_count <= 5: # Debug spam limit
|
||||||
print(f"Dropping flow {flow.flow_id}: Source {flow.source_ip} not found in Servers tab.")
|
print(f"Dropping flow {flow.flow_id}: Source {flow.source_ip} (Mgt: {mgt_dns}) not found in Servers tab.")
|
||||||
continue
|
continue
|
||||||
|
|
||||||
match_count += 1
|
match_count += 1
|
||||||
|
|
||||||
# Prepare host entry if new
|
# Prepare host entry if new
|
||||||
# We use the IP as the key in inventory 'hosts'
|
# We use the Reference/Hostname as the key in inventory 'hosts'
|
||||||
host_key = server.ip_address
|
host_key = server.reference or server.hostname or server.ip_address
|
||||||
|
|
||||||
if host_key not in inventory_hosts:
|
if host_key not in inventory_hosts:
|
||||||
host_vars = server.get_ansible_vars()
|
host_vars = server.get_ansible_vars()
|
||||||
|
|||||||
@@ -6,6 +6,7 @@ class Server:
|
|||||||
reference: str
|
reference: str
|
||||||
hostname: str # This might be same as reference
|
hostname: str # This might be same as reference
|
||||||
ip_address: Optional[str] = None
|
ip_address: Optional[str] = None
|
||||||
|
production_ip: Optional[str] = None
|
||||||
platform: str = 'unknown' # e.g. 'Windows', 'Linux'
|
platform: str = 'unknown' # e.g. 'Windows', 'Linux'
|
||||||
|
|
||||||
def get_ansible_vars(self) -> Dict[str, Any]:
|
def get_ansible_vars(self) -> Dict[str, Any]:
|
||||||
@@ -22,6 +23,9 @@ class Server:
|
|||||||
# Default ssh is usually fine, but being explicit doesn't hurt
|
# Default ssh is usually fine, but being explicit doesn't hurt
|
||||||
pass
|
pass
|
||||||
|
|
||||||
|
if self.ip_address:
|
||||||
|
vars['ansible_host'] = self.ip_address
|
||||||
|
|
||||||
return vars
|
return vars
|
||||||
|
|
||||||
@dataclass
|
@dataclass
|
||||||
|
|||||||
50
wif2ansible/network.py
Normal file
50
wif2ansible/network.py
Normal file
@@ -0,0 +1,50 @@
|
|||||||
|
import socket
|
||||||
|
from typing import Optional
|
||||||
|
|
||||||
|
def get_hostname(ip: str) -> Optional[str]:
|
||||||
|
try:
|
||||||
|
# Python's equivalent to Resolv.getname(ip)
|
||||||
|
# returns (hostname, aliaslist, ipaddrlist)
|
||||||
|
return socket.gethostbyaddr(ip)[0]
|
||||||
|
except socket.error:
|
||||||
|
return None
|
||||||
|
|
||||||
|
def get_ip(hostname: str) -> Optional[str]:
|
||||||
|
try:
|
||||||
|
return socket.gethostbyname(hostname)
|
||||||
|
except socket.error:
|
||||||
|
return None
|
||||||
|
|
||||||
|
def to_mgt_ip(name_or_ip: str) -> Optional[str]:
|
||||||
|
"""
|
||||||
|
Mimics the Ruby script's to_mgt_ip logic:
|
||||||
|
1. Reverse lookup IP to get FQDN.
|
||||||
|
2. Construct management FQDN ({host}.ds.gc.ca or .pre-ds.gc.ca).
|
||||||
|
3. Resolve that management FQDN to an IP.
|
||||||
|
4. Return the Management FQDN if successful.
|
||||||
|
"""
|
||||||
|
|
||||||
|
# In Ruby script, input 'name' is often an IP address from the WIF source column.
|
||||||
|
|
||||||
|
# Step 1: Reverse Lookup
|
||||||
|
fqdn = get_hostname(name_or_ip)
|
||||||
|
if not fqdn:
|
||||||
|
# If input is already a name, use it? Ruby script assumes it gets a name from Resolv.getname(ip)
|
||||||
|
# If name_or_ip is NOT an IP, gethostbyaddr might fail or behave differently.
|
||||||
|
# But if it's already a name, we can try using it.
|
||||||
|
fqdn = name_or_ip
|
||||||
|
|
||||||
|
short_name = fqdn.split('.')[0]
|
||||||
|
|
||||||
|
# Step 2 & 3: Try suffixes
|
||||||
|
suffixes = ['.ds.gc.ca', '.pre-ds.gc.ca']
|
||||||
|
|
||||||
|
for suffix in suffixes:
|
||||||
|
mgt_dns = short_name + suffix
|
||||||
|
resolved_ip = get_ip(mgt_dns)
|
||||||
|
if resolved_ip:
|
||||||
|
# Ruby: return mgt_dns if mgt_ip.to_s.length > 4
|
||||||
|
return mgt_dns
|
||||||
|
|
||||||
|
# print(f"Warning: {name_or_ip} could not be resolved to a management address.")
|
||||||
|
return None
|
||||||
@@ -43,18 +43,12 @@ def parse_ports(port_str: str) -> List[int]:
|
|||||||
if range_match:
|
if range_match:
|
||||||
start, end = map(int, range_match.groups())
|
start, end = map(int, range_match.groups())
|
||||||
if start <= end:
|
if start <= end:
|
||||||
# Limitation: adding huge ranges might blow up inventory size
|
# User Request: "only add the first, last, and middle port"
|
||||||
# but for Ansible 'ports' list it's better to be explicit or use range syntax.
|
ports.add(start)
|
||||||
# For now, let's keep it expanded if small, or maybe just keeps the start/end?
|
ports.add(end)
|
||||||
# Ruby script logic: expanded it.
|
if end - start > 1:
|
||||||
# We'll limit expansion to avoid DOSing ourselves.
|
middle = start + (end - start) // 2
|
||||||
if end - start < 1000:
|
ports.add(middle)
|
||||||
ports.update(range(start, end + 1))
|
|
||||||
else:
|
|
||||||
# Fallback: just add start and end to avoid massive lists?
|
|
||||||
# Or maybe ansible allows ranges?
|
|
||||||
# Usually we list ports. Let's expand for now.
|
|
||||||
ports.update(range(start, end + 1))
|
|
||||||
continue
|
continue
|
||||||
|
|
||||||
# Single port
|
# Single port
|
||||||
@@ -63,6 +57,21 @@ def parse_ports(port_str: str) -> List[int]:
|
|||||||
|
|
||||||
return sorted(list(ports))
|
return sorted(list(ports))
|
||||||
|
|
||||||
|
def clean_reference(ref: str) -> str:
|
||||||
|
"""
|
||||||
|
Cleans a server reference string.
|
||||||
|
Specifically removes 'SRV###' type prefixes if present.
|
||||||
|
Example: 'SRV123 MyServer' -> 'MyServer'
|
||||||
|
"""
|
||||||
|
if not ref:
|
||||||
|
return ""
|
||||||
|
|
||||||
|
s = str(ref)
|
||||||
|
# Remove SRV followed by digits and whitespace
|
||||||
|
s = re.sub(r'SRV\d+\s*', '', s, flags=re.IGNORECASE)
|
||||||
|
# Remove leading/trailing whitespace
|
||||||
|
return s.strip()
|
||||||
|
|
||||||
def parse_ip(ip_str: str) -> List[str]:
|
def parse_ip(ip_str: str) -> List[str]:
|
||||||
"""Finds all IPv4 addresses in a string."""
|
"""Finds all IPv4 addresses in a string."""
|
||||||
if not ip_str:
|
if not ip_str:
|
||||||
|
|||||||
Reference in New Issue
Block a user