Compare commits
10 Commits
v2026.02.0
...
v2026.02.0
| Author | SHA1 | Date | |
|---|---|---|---|
| 9e28004d6c | |||
| 8b3584fa9e | |||
| a202e267f7 | |||
| 2ccf6c293a | |||
| b6266bea81 | |||
| 284e6b1fbf | |||
| b3c5f3a6fd | |||
| 2634c87dcd | |||
| f28af4de7a | |||
| 7e01ec7073 |
@@ -9,14 +9,16 @@ jobs:
|
||||
build:
|
||||
name: Build Windows Exe
|
||||
runs-on: ubuntu-latest
|
||||
|
||||
steps:
|
||||
- name: Checkout
|
||||
uses: actions/checkout@v3
|
||||
|
||||
- name: Build with PyInstaller (Wine)
|
||||
- name: Build with PyInstaller
|
||||
uses: docker://cdrx/pyinstaller-windows:python3
|
||||
with:
|
||||
args: "python -m pip install -r requirements.txt && pyinstaller --onefile --clean --name wif2ansible run.py"
|
||||
entrypoint: /bin/sh
|
||||
args: -c "python -m pip install -r requirements.txt && pyinstaller --onefile --clean --name wif2ansible run.py"
|
||||
|
||||
- name: Generate Version Tag
|
||||
id: version
|
||||
@@ -27,7 +29,7 @@ jobs:
|
||||
with:
|
||||
tag_name: ${{ steps.version.outputs.TAG }}
|
||||
name: Release ${{ steps.version.outputs.TAG }}
|
||||
files: dist/windows/wif2ansible.exe
|
||||
files: dist/**/*.exe
|
||||
draft: false
|
||||
prerelease: false
|
||||
env:
|
||||
|
||||
31
test_inventory_keys.py
Normal file
31
test_inventory_keys.py
Normal file
@@ -0,0 +1,31 @@
|
||||
import unittest
|
||||
from wif2ansible.models import Server, Flow
|
||||
from wif2ansible.inventory import generate_inventory
|
||||
|
||||
class TestInventoryKeys(unittest.TestCase):
|
||||
def test_inventory_keys_are_hostnames(self):
|
||||
# Create a server with Ref, Hostname, IP
|
||||
s1 = Server(reference="SERVER_REF_01", hostname="server01", ip_address="192.168.1.10", platform="windows")
|
||||
|
||||
# Create a flow matching this server
|
||||
f1 = Flow(flow_id="1", source_ip="192.168.1.10", destination_ip="10.0.0.1", ports=[80])
|
||||
|
||||
servers = {"SERVER_REF_01": s1}
|
||||
flows = [f1]
|
||||
|
||||
inventory = generate_inventory(servers, flows)
|
||||
|
||||
# Verify stricture
|
||||
hosts = inventory['all']['hosts']
|
||||
|
||||
# Key should be REFERENCE "SERVER_REF_01" (or hostname/ip fallback)
|
||||
self.assertIn("SERVER_REF_01", hosts)
|
||||
self.assertNotIn("192.168.1.10", hosts)
|
||||
|
||||
# Check variables
|
||||
host_vars = hosts["SERVER_REF_01"]
|
||||
self.assertEqual(host_vars['ansible_host'], "192.168.1.10")
|
||||
self.assertEqual(host_vars['ansible_connection'], "winrm")
|
||||
|
||||
if __name__ == '__main__':
|
||||
unittest.main()
|
||||
@@ -2,7 +2,7 @@ import openpyxl
|
||||
from openpyxl.worksheet.worksheet import Worksheet
|
||||
from typing import List, Dict, Tuple, Optional
|
||||
from .models import Server, Flow
|
||||
from .parsers import parse_ports, parse_ip, clean_header
|
||||
from .parsers import parse_ports, parse_ip, clean_header, clean_reference
|
||||
|
||||
from openpyxl.utils import get_column_letter
|
||||
|
||||
@@ -77,9 +77,8 @@ def read_servers(filename: str) -> Dict[str, Server]:
|
||||
print("Warning: No 'Servers' sheet found.")
|
||||
return {}
|
||||
|
||||
# keywords: reference, platform, ip address, management ip?
|
||||
# Ruby script looked for: reference, type, alias, platform, middleware
|
||||
header_keywords = ['reference', 'platform', 'ip address']
|
||||
# keywords: reference, platform, ip address, management ip, production ip
|
||||
header_keywords = ['reference', 'platform', 'ip address', 'production ip']
|
||||
|
||||
header_row_idx, col_map = find_header_row(target_sheet, header_keywords)
|
||||
|
||||
@@ -98,7 +97,8 @@ def read_servers(filename: str) -> Dict[str, Server]:
|
||||
# Extract data
|
||||
ref_idx = col_map.get('reference')
|
||||
plat_idx = col_map.get('platform')
|
||||
ip_idx = col_map.get('ip address') # Generic IP
|
||||
ip_idx = col_map.get('ip address') # Generic/Management IP
|
||||
prod_ip_idx = col_map.get('production ip') # Specific Production IP
|
||||
|
||||
# Helper to get value
|
||||
def get_val(idx):
|
||||
@@ -111,19 +111,29 @@ def read_servers(filename: str) -> Dict[str, Server]:
|
||||
continue
|
||||
|
||||
plat = get_val(plat_idx) or 'unknown'
|
||||
ip_raw = get_val(ip_idx)
|
||||
|
||||
# Parse Management IP
|
||||
ip_raw = get_val(ip_idx)
|
||||
ip_addr = None
|
||||
if ip_raw:
|
||||
ips = parse_ip(ip_raw)
|
||||
if ips:
|
||||
ip_addr = ips[0] # Take first valid IP
|
||||
ip_addr = ips[0]
|
||||
|
||||
# Parse Production IP
|
||||
prod_ip_raw = get_val(prod_ip_idx)
|
||||
prod_ip_addr = None
|
||||
if prod_ip_raw:
|
||||
ips = parse_ip(prod_ip_raw)
|
||||
if ips:
|
||||
prod_ip_addr = ips[0]
|
||||
|
||||
s = Server(
|
||||
reference=ref,
|
||||
hostname=ref, # Default hostname to reference
|
||||
hostname=clean_reference(ref),
|
||||
platform=plat,
|
||||
ip_address=ip_addr
|
||||
ip_address=ip_addr,
|
||||
production_ip=prod_ip_addr
|
||||
)
|
||||
servers[ref] = s
|
||||
|
||||
|
||||
@@ -1,5 +1,6 @@
|
||||
from typing import List, Dict, Any
|
||||
from .models import Server, Flow
|
||||
from .network import to_mgt_ip
|
||||
|
||||
def generate_inventory(servers: Dict[str, Server], flows: List[Flow]) -> Dict[str, Any]:
|
||||
"""
|
||||
@@ -19,9 +20,13 @@ def generate_inventory(servers: Dict[str, Server], flows: List[Flow]) -> Dict[st
|
||||
for s in servers.values():
|
||||
if s.ip_address:
|
||||
ip_to_server[s.ip_address] = s
|
||||
# also index by hostname/reference potentially?
|
||||
# ip_to_server[s.reference] = s
|
||||
# But flows ususally have IPs.
|
||||
if s.production_ip:
|
||||
ip_to_server[s.production_ip] = s
|
||||
# Also index by reference/hostname for DNS matches
|
||||
if s.reference:
|
||||
ip_to_server[s.reference.lower()] = s
|
||||
if s.hostname:
|
||||
ip_to_server[s.hostname.lower()] = s
|
||||
|
||||
inventory_hosts = {}
|
||||
|
||||
@@ -34,18 +39,30 @@ def generate_inventory(servers: Dict[str, Server], flows: List[Flow]) -> Dict[st
|
||||
server = ip_to_server.get(flow.source_ip)
|
||||
|
||||
if not server:
|
||||
# Try finding by looking if source matches any server's reference/hostname?
|
||||
# Unlikely for IPs.
|
||||
# Try DNS resolution (Public IP -> Management FQDN)
|
||||
mgt_dns = to_mgt_ip(flow.source_ip)
|
||||
if mgt_dns:
|
||||
# mgt_dns might be "server.ds.gc.ca".
|
||||
# Our keys might be "server" or "server.ds.gc.ca" or IPs
|
||||
# Try exact match
|
||||
server = ip_to_server.get(mgt_dns.lower())
|
||||
|
||||
# If not found, try shortname?
|
||||
if not server:
|
||||
short = mgt_dns.split('.')[0]
|
||||
server = ip_to_server.get(short.lower())
|
||||
|
||||
if not server:
|
||||
drop_count += 1
|
||||
if drop_count <= 5: # Debug spam limit
|
||||
print(f"Dropping flow {flow.flow_id}: Source {flow.source_ip} not found in Servers tab.")
|
||||
print(f"Dropping flow {flow.flow_id}: Source {flow.source_ip} (Mgt: {mgt_dns}) not found in Servers tab.")
|
||||
continue
|
||||
|
||||
match_count += 1
|
||||
|
||||
# Prepare host entry if new
|
||||
# We use the IP as the key in inventory 'hosts'
|
||||
host_key = server.ip_address
|
||||
# We use the Reference/Hostname as the key in inventory 'hosts'
|
||||
host_key = server.reference or server.hostname or server.ip_address
|
||||
|
||||
if host_key not in inventory_hosts:
|
||||
host_vars = server.get_ansible_vars()
|
||||
|
||||
@@ -6,6 +6,7 @@ class Server:
|
||||
reference: str
|
||||
hostname: str # This might be same as reference
|
||||
ip_address: Optional[str] = None
|
||||
production_ip: Optional[str] = None
|
||||
platform: str = 'unknown' # e.g. 'Windows', 'Linux'
|
||||
|
||||
def get_ansible_vars(self) -> Dict[str, Any]:
|
||||
@@ -22,6 +23,9 @@ class Server:
|
||||
# Default ssh is usually fine, but being explicit doesn't hurt
|
||||
pass
|
||||
|
||||
if self.ip_address:
|
||||
vars['ansible_host'] = self.ip_address
|
||||
|
||||
return vars
|
||||
|
||||
@dataclass
|
||||
|
||||
50
wif2ansible/network.py
Normal file
50
wif2ansible/network.py
Normal file
@@ -0,0 +1,50 @@
|
||||
import socket
|
||||
from typing import Optional
|
||||
|
||||
def get_hostname(ip: str) -> Optional[str]:
|
||||
try:
|
||||
# Python's equivalent to Resolv.getname(ip)
|
||||
# returns (hostname, aliaslist, ipaddrlist)
|
||||
return socket.gethostbyaddr(ip)[0]
|
||||
except socket.error:
|
||||
return None
|
||||
|
||||
def get_ip(hostname: str) -> Optional[str]:
|
||||
try:
|
||||
return socket.gethostbyname(hostname)
|
||||
except socket.error:
|
||||
return None
|
||||
|
||||
def to_mgt_ip(name_or_ip: str) -> Optional[str]:
|
||||
"""
|
||||
Mimics the Ruby script's to_mgt_ip logic:
|
||||
1. Reverse lookup IP to get FQDN.
|
||||
2. Construct management FQDN ({host}.ds.gc.ca or .pre-ds.gc.ca).
|
||||
3. Resolve that management FQDN to an IP.
|
||||
4. Return the Management FQDN if successful.
|
||||
"""
|
||||
|
||||
# In Ruby script, input 'name' is often an IP address from the WIF source column.
|
||||
|
||||
# Step 1: Reverse Lookup
|
||||
fqdn = get_hostname(name_or_ip)
|
||||
if not fqdn:
|
||||
# If input is already a name, use it? Ruby script assumes it gets a name from Resolv.getname(ip)
|
||||
# If name_or_ip is NOT an IP, gethostbyaddr might fail or behave differently.
|
||||
# But if it's already a name, we can try using it.
|
||||
fqdn = name_or_ip
|
||||
|
||||
short_name = fqdn.split('.')[0]
|
||||
|
||||
# Step 2 & 3: Try suffixes
|
||||
suffixes = ['.ds.gc.ca', '.pre-ds.gc.ca']
|
||||
|
||||
for suffix in suffixes:
|
||||
mgt_dns = short_name + suffix
|
||||
resolved_ip = get_ip(mgt_dns)
|
||||
if resolved_ip:
|
||||
# Ruby: return mgt_dns if mgt_ip.to_s.length > 4
|
||||
return mgt_dns
|
||||
|
||||
# print(f"Warning: {name_or_ip} could not be resolved to a management address.")
|
||||
return None
|
||||
@@ -43,18 +43,12 @@ def parse_ports(port_str: str) -> List[int]:
|
||||
if range_match:
|
||||
start, end = map(int, range_match.groups())
|
||||
if start <= end:
|
||||
# Limitation: adding huge ranges might blow up inventory size
|
||||
# but for Ansible 'ports' list it's better to be explicit or use range syntax.
|
||||
# For now, let's keep it expanded if small, or maybe just keeps the start/end?
|
||||
# Ruby script logic: expanded it.
|
||||
# We'll limit expansion to avoid DOSing ourselves.
|
||||
if end - start < 1000:
|
||||
ports.update(range(start, end + 1))
|
||||
else:
|
||||
# Fallback: just add start and end to avoid massive lists?
|
||||
# Or maybe ansible allows ranges?
|
||||
# Usually we list ports. Let's expand for now.
|
||||
ports.update(range(start, end + 1))
|
||||
# User Request: "only add the first, last, and middle port"
|
||||
ports.add(start)
|
||||
ports.add(end)
|
||||
if end - start > 1:
|
||||
middle = start + (end - start) // 2
|
||||
ports.add(middle)
|
||||
continue
|
||||
|
||||
# Single port
|
||||
@@ -63,6 +57,21 @@ def parse_ports(port_str: str) -> List[int]:
|
||||
|
||||
return sorted(list(ports))
|
||||
|
||||
def clean_reference(ref: str) -> str:
|
||||
"""
|
||||
Cleans a server reference string.
|
||||
Specifically removes 'SRV###' type prefixes if present.
|
||||
Example: 'SRV123 MyServer' -> 'MyServer'
|
||||
"""
|
||||
if not ref:
|
||||
return ""
|
||||
|
||||
s = str(ref)
|
||||
# Remove SRV followed by digits and whitespace
|
||||
s = re.sub(r'SRV\d+\s*', '', s, flags=re.IGNORECASE)
|
||||
# Remove leading/trailing whitespace
|
||||
return s.strip()
|
||||
|
||||
def parse_ip(ip_str: str) -> List[str]:
|
||||
"""Finds all IPv4 addresses in a string."""
|
||||
if not ip_str:
|
||||
|
||||
Reference in New Issue
Block a user