Initial commit of wif2ansible

This commit is contained in:
2026-02-06 15:12:49 -05:00
commit aa299df41e
13 changed files with 1025 additions and 0 deletions
View File
+242
View File
@@ -0,0 +1,242 @@
import openpyxl
from openpyxl.worksheet.worksheet import Worksheet
from typing import List, Dict, Tuple, Optional
from .models import Server, Flow
from .parsers import parse_ports, parse_ip, clean_header
from openpyxl.utils import get_column_letter
def is_row_hidden(sheet: "Worksheet", row_idx: int) -> bool:
    """Report whether row ``row_idx`` of ``sheet`` is flagged as hidden.

    A row that has no entry in ``sheet.row_dimensions`` is treated as
    visible.
    """
    row_dim = sheet.row_dimensions.get(row_idx)
    if row_dim is None:
        return False
    return row_dim.hidden
def is_col_hidden(sheet: "Worksheet", col_idx: int) -> bool:
    """Return True when column ``col_idx`` (1-based) of ``sheet`` is hidden.

    When a workbook is loaded, openpyxl represents a run of adjacent columns
    that share the same settings as a single ``ColumnDimension`` keyed by the
    letter of the FIRST column, with the covered span stored in its
    ``min``/``max`` attributes. Looking up only the column's own letter
    therefore misses every hidden column after the first one of such a run,
    so the spans are checked as well.
    """
    # Pass 1: dimensions that carry an explicit column span.
    for dim in sheet.column_dimensions.values():
        if not dim.hidden:
            continue
        first, last = dim.min, dim.max
        if first is not None and last is not None and first <= col_idx <= last:
            return True

    # Pass 2: dimensions without span information are only reachable
    # through their own column letter.
    own_dim = sheet.column_dimensions.get(get_column_letter(col_idx))
    return own_dim is not None and bool(own_dim.hidden)
def find_header_row(
    sheet: Worksheet,
    keywords: List[str],
    max_scan_rows: int = 20,
    min_matches: int = 2,
) -> Tuple[Optional[int], Dict[str, int]]:
    """
    Scan the top of ``sheet`` for the row that best matches ``keywords``.

    A keyword matches a cell when it is a substring of the cell's cleaned
    header text (see ``clean_header``); the first matching visible column
    wins. Hidden rows and hidden columns are ignored. The row matching the
    most keywords is selected, the earliest such row on a tie.

    Args:
        sheet: Worksheet to inspect.
        keywords: Lower-case keywords to look for.
        max_scan_rows: Number of leading rows to examine (default 20).
        min_matches: Minimum number of matched keywords for a row to be
            accepted as the header row (default 2).

    Returns:
        ``(row_index, column_mapping)`` where ``column_mapping`` maps each
        matched keyword to its 1-based column index, or ``(None, {})`` when
        no row reaches ``min_matches``.
    """
    # Column visibility and sheet width do not depend on the row being
    # scanned, so work them out once instead of once per row.
    visible_cols = [
        c for c in range(1, sheet.max_column + 1)
        if not is_col_hidden(sheet, c)
    ]
    # Rows past max_row hold no data and can never match; skipping them also
    # avoids creating empty cells through sheet.cell().
    last_row = min(max_scan_rows, sheet.max_row)

    best_row = None
    best_map: Dict[str, int] = {}
    for r in range(1, last_row + 1):
        if is_row_hidden(sheet, r):
            continue

        headers = [
            (c, clean_header(sheet.cell(row=r, column=c).value))
            for c in visible_cols
        ]

        current_map = {}
        for kw in keywords:
            for c, text in headers:
                if kw in text:
                    # NOTE(review): first substring hit wins, so a long note
                    # cell mentioning the keyword beats a later exact header.
                    current_map[kw] = c
                    break

        # Strictly greater: on a tie the earliest row is kept.
        if len(current_map) > len(best_map):
            best_row, best_map = r, current_map

    if len(best_map) >= min_matches:
        return best_row, best_map
    return None, {}
def read_servers(filename: str) -> Dict[str, Server]:
    """
    Read the server table from the first sheet whose name contains 'server'.

    The header row is located with ``find_header_row`` using the keywords
    'reference', 'platform' and 'ip address'. Hidden rows, rows without a
    reference, and rows whose reference is 'example' are skipped.

    Args:
        filename: Path of the workbook to load.

    Returns:
        Dict mapping each server's reference to its ``Server``. If a
        reference appears more than once, the last row wins. An empty dict is
        returned when the sheet, the header row, or the 'reference' column
        cannot be found.
    """
    wb = openpyxl.load_workbook(filename, data_only=True)

    sheet_name = next((n for n in wb.sheetnames if 'server' in n.lower()), None)
    if sheet_name is None:
        print("Warning: No 'Servers' sheet found.")
        return {}
    target_sheet = wb[sheet_name]

    # keywords: reference, platform, ip address, management ip?
    # Ruby script looked for: reference, type, alias, platform, middleware
    header_keywords = ['reference', 'platform', 'ip address']
    header_row_idx, col_map = find_header_row(target_sheet, header_keywords)
    if not header_row_idx:
        print("Error: Could not find Server table headers.")
        return {}

    # Column positions are fixed for the whole table; resolve them once.
    ref_idx = col_map.get('reference')
    plat_idx = col_map.get('platform')
    ip_idx = col_map.get('ip address')  # Generic IP
    if ref_idx is None:
        # The header row can be accepted on 'platform' + 'ip address' alone;
        # without a reference column no row could ever produce a server.
        print("Error: Server table has no 'Reference' column.")
        return {}

    def cell_text(row: int, col: Optional[int]) -> Optional[str]:
        """Return the stripped text of a cell, or None if absent or blank."""
        if col is None:
            return None
        value = target_sheet.cell(row=row, column=col).value
        if value is None:
            return None
        # 'is None' rather than truthiness so a numeric 0 is kept as "0".
        return str(value).strip() or None

    servers = {}  # Key: Reference (as primary key)
    for r in range(header_row_idx + 1, target_sheet.max_row + 1):
        if is_row_hidden(target_sheet, r):
            print(f"Skipping hidden server row {r}")
            continue

        ref = cell_text(r, ref_idx)
        if not ref or ref.lower() == 'example':
            continue

        plat = cell_text(r, plat_idx) or 'unknown'

        ip_addr = None
        ip_raw = cell_text(r, ip_idx)
        if ip_raw:
            ips = parse_ip(ip_raw)
            if ips:
                ip_addr = ips[0]  # Take first IP reported by parse_ip

        servers[ref] = Server(
            reference=ref,
            hostname=ref,  # Default hostname to reference
            platform=plat,
            ip_address=ip_addr,
        )

    return servers
def read_flows(filename: str, server_inventory: Optional[Dict[str, Server]] = None) -> List[Flow]:
    """
    Read flows from every sheet whose name contains 'flow'.

    On each sheet the header row is located with ``find_header_row``; the
    columns used are those whose cleaned header contains both 'source' and
    'ip', both 'destination' and 'ip', 'port', and (for the flow id) both
    'flow' and '#'. Hidden rows and hidden columns are ignored. Each data row
    yields one ``Flow`` per (source IP, destination IP) pair, all carrying
    the row's sorted, de-duplicated port list.

    Args:
        filename: Path of the workbook to load.
        server_inventory: Optional, for validation if needed. Currently
            unused.

    Returns:
        List of ``Flow`` objects in sheet/row order.
    """
    wb = openpyxl.load_workbook(filename, data_only=True)
    flows: List[Flow] = []

    def cell_text(sheet, row: int, col: int) -> Optional[str]:
        """Return the stripped text of a cell, or None if absent or blank."""
        value = sheet.cell(row=row, column=col).value
        if value is None:
            return None
        # 'is None' rather than truthiness so a numeric 0 (e.g. flow #0)
        # is kept as "0" instead of being treated as missing.
        return str(value).strip() or None

    def collect(sheet, row: int, columns: List[int], parser) -> list:
        """Parse the given columns of a row; unique items, first-seen order."""
        found = {}
        for col in columns:
            text = cell_text(sheet, row, col)
            if text:
                # The same address may appear in several columns (e.g. public
                # and private IP); keep a single copy so no duplicate flows
                # are produced.
                found.update(dict.fromkeys(parser(text)))
        return list(found)

    # Find all sheets with 'flow' in name
    flow_sheets = [s for s in wb.sheetnames if 'flow' in s.lower()]

    for sname in flow_sheets:
        sheet = wb[sname]
        print(f"Processing sheet: {sname}")

        # Keywords based on Ruby script: 'Source Public IP', 'Source Private IP',
        # 'Destination Public IP', 'Port'. Simplified: source, destination, port, ip
        keywords = ['source', 'destination', 'port', 'ip']
        header_row_idx, _ = find_header_row(sheet, keywords)
        if not header_row_idx:
            print(f"Warning: Could not find headers in {sname}")
            continue

        # find_header_row only reports the FIRST column per keyword (which
        # might be e.g. 'Source Ref'), so re-read the header row and select
        # every relevant column explicitly.
        headers = [
            "" if is_col_hidden(sheet, c)
            else clean_header(sheet.cell(row=header_row_idx, column=c).value)
            for c in range(1, sheet.max_column + 1)
        ]
        numbered = list(enumerate(headers, start=1))

        src_ip_cols = [c for c, h in numbered if 'source' in h and 'ip' in h]
        dst_ip_cols = [c for c, h in numbered if 'destination' in h and 'ip' in h]
        # NOTE(review): plain substring test - headers such as 'transport' or
        # 'important' also contain 'port' and would be read as port columns.
        port_cols = [c for c, h in numbered if 'port' in h]
        flow_id_cols = [c for c, h in numbered if 'flow' in h and '#' in h]  # "Flow #"

        if not src_ip_cols or not dst_ip_cols or not port_cols:
            print(f"Skipping {sname}: Missing essential IP/Port columns.")
            continue

        for r in range(header_row_idx + 1, sheet.max_row + 1):
            if is_row_hidden(sheet, r):
                continue

            fid = "unknown"
            if flow_id_cols:
                fid = cell_text(sheet, r, flow_id_cols[0]) or "unknown"

            src_ips = collect(sheet, r, src_ip_cols, parse_ip)
            dst_ips = collect(sheet, r, dst_ip_cols, parse_ip)
            ports = sorted(collect(sheet, r, port_cols, parse_ports))

            # A row needs at least one source, destination and port.
            if not src_ips or not dst_ips or not ports:
                continue

            # Cartesian product: Source x Dest.
            # NOTE(review): no protocol is passed, so every Flow keeps the
            # dataclass default even when the port cell mentions UDP.
            for s_ip in src_ips:
                for d_ip in dst_ips:
                    flows.append(
                        Flow(
                            flow_id=fid,
                            source_ip=s_ip,
                            destination_ip=d_ip,
                            # Fresh list per Flow: a shared list object would
                            # be emitted as a YAML anchor/alias on dump.
                            ports=list(ports),
                        )
                    )

    return flows
+73
View File
@@ -0,0 +1,73 @@
from typing import List, Dict, Any
from .models import Server, Flow
def generate_inventory(servers: Dict[str, "Server"], flows: List["Flow"]) -> Dict[str, Any]:
    """
    Generate the Ansible inventory dictionary.

    Each flow is attached to the server whose ``ip_address`` equals the
    flow's ``source_ip``; flows whose source is not a known server IP are
    dropped (the first five drops are printed). Only servers that own at
    least one matched flow appear in the result.

    Args:
        servers: Dict[Reference, Server]
        flows: List[Flow]

    Returns:
        ``{'all': {'hosts': {ip: host_vars}}}`` where ``host_vars`` is the
        server's ``get_ansible_vars()`` result plus a ``'flows'`` list of
        ``{'flow_id', 'dest', 'ports', 'protocol'}`` dicts. Exact duplicate
        entries for a host are emitted once.
    """
    # Lookup map IP -> Server. Only the single IP stored on the Server is
    # indexed; if several servers share an IP the last one wins.
    ip_to_server = {s.ip_address: s for s in servers.values() if s.ip_address}

    inventory_hosts: Dict[str, Any] = {}
    # host key -> fingerprints of the flow entries already emitted for it
    seen_entries: Dict[str, set] = {}

    match_count = 0
    drop_count = 0

    for flow in flows:
        server = ip_to_server.get(flow.source_ip)
        if server is None:
            drop_count += 1
            if drop_count <= 5:  # Debug spam limit
                print(f"Dropping flow {flow.flow_id}: Source {flow.source_ip} not found in Servers tab.")
            continue

        match_count += 1

        # The IP is used as the key in inventory 'hosts'.
        host_key = server.ip_address
        if host_key not in inventory_hosts:
            host_vars = server.get_ansible_vars()
            host_vars['flows'] = []
            inventory_hosts[host_key] = host_vars
            seen_entries[host_key] = set()

        # Copy the port list: if several flows share one list object, a YAML
        # dumper would emit anchors/aliases instead of plain lists.
        ports = list(flow.ports)

        fingerprint = (flow.flow_id, flow.destination_ip, tuple(ports), flow.protocol)
        if fingerprint in seen_entries[host_key]:
            continue  # exact duplicate of an entry already recorded
        seen_entries[host_key].add(fingerprint)

        inventory_hosts[host_key]['flows'].append({
            'flow_id': flow.flow_id,
            'dest': flow.destination_ip,
            'ports': ports,
            'protocol': flow.protocol,
        })

    print(f"Inventory Generation Report: Matches={match_count}, Dropped={drop_count}")

    return {
        'all': {
            'hosts': inventory_hosts
        }
    }
+39
View File
@@ -0,0 +1,39 @@
import sys
import yaml
import argparse
from datetime import datetime
from .excel_reader import read_servers, read_flows
from .inventory import generate_inventory
def main():
    """Command-line entry point: convert a WIF workbook to an Ansible inventory.

    Reads servers and flows from the workbook given on the command line,
    builds the inventory and writes it as YAML to ``--output``. When no
    output path is given, ``inventory_<YYYY-MM-DD_HHMM>.yml`` in the current
    directory is used.
    """
    parser = argparse.ArgumentParser(description="Convert WIF Excel to Ansible Inventory")
    parser.add_argument("wif_file", help="Path to the WIF Excel file (.xlsx)")
    parser.add_argument("--output", "-o", help="Output YAML file path", default=None)

    args = parser.parse_args()

    print(f"Reading servers from {args.wif_file}...")
    servers = read_servers(args.wif_file)
    print(f"Found {len(servers)} servers in allowlist.")

    print("Reading flows...")
    flows = read_flows(args.wif_file, servers)
    print(f"Found {len(flows)} raw flows.")

    print("Generating inventory...")
    inventory = generate_inventory(servers, flows)

    # Determine output filename
    if args.output:
        outfile = args.output
    else:
        timestamp = datetime.now().strftime("%Y-%m-%d_%H%M")
        outfile = f"inventory_{timestamp}.yml"

    # Explicit encoding keeps the file identical across platforms.
    with open(outfile, 'w', encoding='utf-8') as f:
        # safe_dump only emits standard YAML tags; an unexpected Python
        # object raises here instead of producing a '!!python/...' tag that
        # a consumer of the inventory could not load.
        yaml.safe_dump(inventory, f, default_flow_style=False)

    print(f"Successfully wrote inventory to {outfile}")


if __name__ == "__main__":
    main()
+36
View File
@@ -0,0 +1,36 @@
from dataclasses import dataclass, field
from typing import List, Dict, Optional, Any
@dataclass
class Server:
    """A server row from the workbook's server table."""

    reference: str
    hostname: str  # This might be same as reference
    ip_address: Optional[str] = None
    platform: str = 'unknown'  # e.g. 'Windows', 'Linux'

    def get_ansible_vars(self) -> Dict[str, Any]:
        """Return Ansible connection variables derived from ``platform``.

        Windows platforms get WinRM settings (NTLM transport, port 5985);
        every other platform gets an empty dict, leaving Ansible's default
        connection in place. A new dict is returned on every call.
        """
        # Tolerate a missing platform instead of failing on None.lower().
        platform = (self.platform or '').lower()
        host_vars: Dict[str, Any] = {}

        # Basic mapping - can be expanded.
        # 'darwin' contains 'win', so it must be excluded explicitly or
        # macOS hosts would be configured for WinRM.
        if 'win' in platform and 'darwin' not in platform:
            host_vars['ansible_connection'] = 'winrm'
            host_vars['ansible_winrm_transport'] = 'ntlm'
            host_vars['ansible_winrm_port'] = 5985
            # Common default, maybe safer to omit
            host_vars['ansible_winrm_server_cert_validation'] = 'ignore'

        return host_vars
@dataclass
class Flow:
    """One source-to-destination flow with its port list and protocol."""

    flow_id: str
    source_ip: str
    destination_ip: str
    ports: List[int]
    protocol: str = 'tcp'

    def __hash__(self):
        # Ports are hashed in sorted order, so the hash does not depend on
        # the order in which they were listed.
        ordered_ports = tuple(sorted(self.ports))
        identity = (
            self.flow_id,
            self.source_ip,
            self.destination_ip,
            ordered_ports,
            self.protocol,
        )
        return hash(identity)
+74
View File
@@ -0,0 +1,74 @@
import re
from typing import List
def clean_header(header: str) -> str:
    """Normalise a header cell for keyword matching.

    HTML-like tags are replaced by a space, every run of whitespace
    (including line breaks inside a cell) is collapsed to a single space, and
    the result is trimmed and lower-cased. Falsy input (``None``, ``""``,
    ``0``) yields ``""``.
    """
    if not header:
        return ""
    # Replace tags with a space rather than nothing so that
    # 'IP<br>Address' becomes 'ip address' instead of 'ipaddress'.
    text = re.sub(r'<[^>]+>', ' ', str(header))
    # Collapse whitespace so a multi-word keyword such as 'ip address'
    # still matches a header wrapped onto two lines.
    return ' '.join(text.split()).lower()
def parse_ports(port_str: str) -> List[int]:
    """
    Parse a cell containing ports, port ranges, or 'any'/'all'.

    Tokens are separated by commas, semicolons, slashes or whitespace. The
    protocol markers 'tcp' and 'udp' are stripped, so 'tcp/80' and '80/udp'
    both yield port 80. Ranges written as ``start-end`` are expanded
    inclusively. Only ports within 1-65535 are kept; ranges are clipped to
    that interval, which also bounds how large an expansion can get.

    NOTE: the protocol marker is discarded; the result carries no
    TCP/UDP information.

    Args:
        port_str: Cell content; non-string values are converted with
            ``str()``.

    Returns:
        Sorted list of unique ports. If the text contains the word 'any' or
        'all', a fixed list of common ports is returned instead (in the
        order defined below). Empty input yields ``[]``.
    """
    if not port_str:
        return []

    s = str(port_str).lower()

    # Handle 'any' or 'all' - defaulting to common ports as per Ruby script.
    # Whole words only: a substring test would also fire on 'firewall',
    # 'install' or 'company'.
    if re.search(r'\b(?:any|all)\b', s):
        return [22, 3389, 80, 443, 3306, 5432, 8443, 60000]

    # Drop protocol markers; replace with a space so the neighbouring
    # numbers are not glued together.
    s = re.sub(r'tcp|udp', ' ', s)

    ports = set()
    for part in re.split(r'[,;/\s]+', s):
        if not part:
            continue

        # Range handling: 8000-8010
        range_match = re.fullmatch(r'(\d+)-(\d+)', part)
        if range_match:
            start, end = map(int, range_match.groups())
            # Clip to the valid port interval; a reversed or fully
            # out-of-range span ends up empty and is ignored.
            start, end = max(start, 1), min(end, 65535)
            if start <= end:
                ports.update(range(start, end + 1))
            continue

        # Single port. isdecimal() (unlike isdigit()) only accepts
        # characters that int() can convert.
        if part.isdecimal():
            port = int(part)
            if 1 <= port <= 65535:
                ports.add(port)

    return sorted(ports)
def parse_ip(ip_str: str) -> List[str]:
    """Find all IPv4 addresses in a string.

    Returns the unique dotted-quad addresses in order of first appearance,
    so ``result[0]`` is the first address written in the text. Candidates
    with an octet above 255 are discarded. Empty input yields ``[]``.
    """
    if not ip_str:
        return []

    # Simple regex for IPv4 candidates; the octet range is checked below.
    candidates = re.findall(r'\b(?:\d{1,3}\.){3}\d{1,3}\b', str(ip_str))
    valid = [
        ip for ip in candidates
        if all(int(octet) <= 255 for octet in ip.split('.'))
    ]
    # dict.fromkeys de-duplicates while preserving order; a set would make
    # the order (and hence "the first IP") vary between runs.
    return list(dict.fromkeys(valid))