# Compare commits

8 commits: v2026.02.0 ... v2026.02.0

| SHA1 |
|---|
| 9e9c722a93 |
| a13fc5b282 |
| dcddd88cbc |
| 9e7e4054c4 |
| fc1c4bfaa8 |
| 34f936e21c |
| 5c95469ca3 |
| 07c7ec23d9 |
prompt_journal.md (new file, 26 lines)

```diff
@@ -0,0 +1,26 @@
+# Prompt Journal
+
+This document contains a log of the prompts used to build and refine this project.
+
+## Session 1: Initial Refactor & Packaging
+
+1. **Objective**: I have a Ruby script that parses Excel "WIF" documents containing server names and network flows (source, destination, ports). I need these to create an Ansible inventory with attributes that detail the network connectivity required by each server in the solution. Parse the WIF Excel document and my existing by-hand Ruby script, completely refactor them in Python, and increase the robustness of the network-flow parsing by using regex and allowing for shifts of cells. Be aware of hidden cells and do not include a hidden cell in the output. Some cells may look like a single cell because of formatting/styling but in fact be different cells within the spreadsheet - try to accommodate this. Ensure that only flows involving the servers listed on the Servers tab are included in the resulting Ansible inventory. The new refactored Python script should let me take another WIF file, with different servers and network flows, and generate another Ansible inventory file. The two starting files are in the current project folder.
+2. **Packaging**: is there a way to package this
+3. **Executable**: yes, make me an exe
+4. **Documentation**: create a readme.md that details how to run this
+5. **Git Setup**: add all xls and xlsx files to .gitignore; add all generated yml inventories to .gitignore (as a pattern, not individually), current and future. Add and push this project to main at https://gitea.krisforbes.ca/krisf/wif2ansible.git. Advise me of any potential IP address leakage before committing and pushing this project.
+
+## Session 2: CI/CD & Logic Refinement
+
+6. **CI/CD**: create a Gitea runner workflow that generates a new release and automatically versions releases on each push
+7. **Push**: push it and ensure a new exe is generated by the Gitea runner
+8. **Debug**: where is my bundled exe in my release
+9. **Debug**: my release should contain an exe downloadable from the releases tab in Git, but it does not
+10. **Debug**: error
+11. **Debug**: error again
+12. **Logic Update (Ports/DNS)**: when you parse a range of ports, only add the first, last, and middle port to the result. Examine the to_mgt_ip function in my original Ruby script: it converts the production interface's public IPv4 address listed in the Flows tab to the management private NIC name by using DNS. Ensure this is implemented in the Python version and push the new version.
+13. **Logic Update (Server Tab)**: that's a good point - you can also use the Servers tab of the WIF to ensure the name is used for the source, so our Ansible playbook can connect to the server. With the production IPv4 we have no way to connect. Use the Servers tab of the WIF first and fall back to my Ruby DNS method.
+14. **Release**: push a new version
+15. **Output Format**: in the generated YAML file I should see `all:` then `hosts:`, and the entries under this should not be IP addresses - they should be the server names mapped from the Servers tab of the Excel file.
+16. **Refining Output**: the server-name key should not contain SRV###; this is part of a reference and is only relevant for mapping the Flows tab information to the Servers tab information. Take this SRV### prefix into account in your Servers tab matching logic.
+17. **Documentation**: add a prompt journal to this project that contains a copy of all prompts used, and automatically add new prompts as I send them
```
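Prompt 12 above asks that a parsed port range contribute only its first, middle, and last ports. A minimal sketch of that sampling rule (the function name `sample_range` is hypothetical, not from the repository):

```python
def sample_range(start: int, end: int) -> list[int]:
    """Collapse a port range to its first, middle, and last ports."""
    if start >= end:
        return [start]
    middle = (start + end) // 2
    # A set dedupes narrow ranges, e.g. 80-81 where middle == start
    return sorted({start, middle, end})

print(sample_range(8000, 8010))  # [8000, 8005, 8010]
print(sample_range(443, 443))    # [443]
```

This keeps huge "any port in 1-65535" style rows from flooding the inventory while still recording the range's extent.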
test_fuzzy_and_ports.py (new file, 50 lines)

```diff
@@ -0,0 +1,50 @@
+import unittest
+from wif2ansible.parsers import parse_ports, clean_header
+from wif2ansible.excel_reader import normalize_header_text, fuzzy_match
+
+class TestFuzzyAndPorts(unittest.TestCase):
+    def test_parse_ports_any(self):
+        # User requested specific list
+        expected = [20, 21, 22, 23, 25, 53, 80, 110, 443, 3389]
+        self.assertEqual(parse_ports("any"), sorted(expected))
+        self.assertEqual(parse_ports("all"), sorted(expected))
+        self.assertEqual(parse_ports("Any"), sorted(expected))
+
+    def test_parse_ports_services(self):
+        self.assertEqual(parse_ports("http"), [80])
+        self.assertEqual(parse_ports("HTTPS"), [443])
+        self.assertEqual(parse_ports("ssh, telnet"), [22, 23])
+        self.assertEqual(parse_ports("DNS"), [53])
+        self.assertEqual(parse_ports("smtp"), [25])
+
+    def test_parse_ports_mixed(self):
+        self.assertEqual(parse_ports("80, 443, ssh"), [22, 80, 443])
+
+    def test_fuzzy_header_normalization(self):
+        # Case
+        self.assertEqual(normalize_header_text("Server Name"), "servername")
+        # Underscore vs Space
+        self.assertEqual(normalize_header_text("Server_Name"), "servername")
+        self.assertEqual(normalize_header_text("server name"), "servername")
+        # Punctuation/Typos (limited)
+        self.assertEqual(normalize_header_text("Server-Name"), "servername")
+        self.assertEqual(normalize_header_text("Source (IP)"), "sourceip")
+
+    def test_fuzzy_match(self):
+        # Keyword "ip address" should match "IP_Address"
+        self.assertTrue(fuzzy_match("ip address", "IP_Address"))
+        # Partial: "ip" in "source ip" -> True
+        self.assertTrue(fuzzy_match("ip", "Source IP"))
+
+        # Known limitation: the user asked for "source ip" to find
+        # "Source Public IP", but normalize("source ip") -> "sourceip" is NOT a
+        # substring of normalize("Source Public IP") -> "sourcepublicip" (the
+        # "public" breaks containment), and fuzzy_match only checks
+        # `n_key in n_cell`. Multi-word keywords like "Source IP" would need a
+        # tokenized match ("source" AND "ip") instead.
+        pass
+
+if __name__ == '__main__':
+    unittest.main()
```
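The comment block in `test_fuzzy_match` walks through why substring containment fails for multi-word keywords. A tokenized variant, sketched here under the same normalization assumptions (this is illustrative, not the repository's actual implementation), would satisfy the "source ip" finds "Source Public IP" case:

```python
import re

def normalize(text: str) -> str:
    """Lower-case and drop every non-alphanumeric character."""
    return re.sub(r'[^a-z0-9]', '', str(text).lower())

def fuzzy_match_tokens(keyword: str, cell_value: str) -> bool:
    """Match when every keyword token appears in the normalized cell value."""
    n_cell = normalize(cell_value)
    tokens = re.split(r'[^a-z0-9]+', str(keyword).lower())
    return all(tok in n_cell for tok in tokens if tok)

print(fuzzy_match_tokens("source ip", "Source Public IP"))       # True
print(fuzzy_match_tokens("destination ip", "Source Public IP"))  # False
```

Splitting the keyword before normalizing preserves the word boundaries that the single-string normalization throws away.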
```diff
@@ -1,11 +1,16 @@
 import unittest
+import unittest.mock
 from wif2ansible.models import Server, Flow
 from wif2ansible.inventory import generate_inventory

 class TestInventoryKeys(unittest.TestCase):
-    def test_inventory_keys_are_hostnames(self):
+    @unittest.mock.patch('wif2ansible.inventory.is_valid_hostname')
+    def test_inventory_keys_are_hostnames(self, mock_resolves):
+        # Mock DNS to say server01 exists
+        mock_resolves.return_value = True
+
         # Create a server with Ref, Hostname, IP
-        s1 = Server(reference="SERVER_REF_01", hostname="server01", ip_address="192.168.1.10", platform="windows")
+        s1 = Server(reference="SERVER_REF_01", hostname="server01", ip_addresses=["192.168.1.10"], platform="windows")

         # Create a flow matching this server
         f1 = Flow(flow_id="1", source_ip="192.168.1.10", destination_ip="10.0.0.1", ports=[80])
@@ -18,14 +23,44 @@ class TestInventoryKeys(unittest.TestCase):
         # Verify structure
         hosts = inventory['all']['hosts']

-        # Key should be REFERENCE "SERVER_REF_01" (or hostname/ip fallback)
-        self.assertIn("SERVER_REF_01", hosts)
+        # Key should be HOSTNAME "server01" (prioritized over Ref)
+        self.assertIn("server01", hosts)
         self.assertNotIn("192.168.1.10", hosts)

         # Check variables
-        host_vars = hosts["SERVER_REF_01"]
+        host_vars = hosts["server01"]
         self.assertEqual(host_vars['ansible_host'], "192.168.1.10")
         self.assertEqual(host_vars['ansible_connection'], "winrm")

+    @unittest.mock.patch('wif2ansible.inventory.is_valid_hostname')
+    def test_inventory_keys_resolution(self, mock_resolves):
+        # Setup mock: 'bad_name' -> False, 'good_name' -> True
+        def side_effect(name):
+            if name == "bad_name": return False
+            if name == "good_name": return True
+            return False
+        mock_resolves.side_effect = side_effect
+
+        # Server with a BAD hostname but a GOOD reference: the candidate order
+        # is [hostname, cleaned_ref, rev_dns], so hostname is "bad_name" and
+        # the cleaned reference is "good_name".
+        s1 = Server(reference="SRV01 good_name", hostname="bad_name", ip_addresses=["10.10.10.10"])
+
+        f1 = Flow(flow_id="1", source_ip="10.10.10.10", destination_ip="1.1.1.1", ports=[80])
+
+        inventory = generate_inventory({"k": s1}, [f1])
+        hosts = inventory['all']['hosts']
+
+        # It should have skipped "bad_name" and picked "good_name" (from cleaned ref)
+        self.assertIn("good_name", hosts)
+        self.assertNotIn("bad_name", hosts)
+
+    def test_suffix_stripping(self):
+        from wif2ansible.parsers import clean_hostname
+        self.assertEqual(clean_hostname("server.prod.global.gc.ca"), "server")
+        self.assertEqual(clean_hostname("server.PROD.GLOBAL.GC.CA"), "server")
+        self.assertEqual(clean_hostname("nosuffix"), "nosuffix")
+        self.assertEqual(clean_hostname("other.suffix.com"), "other.suffix.com")
+
 if __name__ == '__main__':
     unittest.main()
```
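The patched tests above replace DNS resolution with `unittest.mock` so the suite runs offline and deterministically. The pattern can be shown in isolation; `pick_host_key` here is a hypothetical stand-in for the inventory's candidate loop, not the repository's actual function:

```python
import unittest.mock

def pick_host_key(candidates, resolves):
    """Return the first candidate name the resolver accepts, else None."""
    for cand in candidates:
        if cand and resolves(cand):
            return cand
    return None

# A Mock with side_effect lets the test dictate which names "resolve",
# and records every lookup that was attempted.
resolves = unittest.mock.Mock(side_effect=lambda name: name == "good_name")
print(pick_host_key(["bad_name", "good_name"], resolves))  # good_name
print(resolves.call_count)  # 2
```

Using `side_effect` rather than `return_value` is what allows per-name answers, exactly as `test_inventory_keys_resolution` does.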
```diff
@@ -1,4 +1,5 @@
 import openpyxl
+import re
 from openpyxl.worksheet.worksheet import Worksheet
 from typing import List, Dict, Tuple, Optional
 from .models import Server, Flow
@@ -15,6 +16,36 @@ def is_col_hidden(sheet: Worksheet, col_idx: int) -> bool:
     dim = sheet.column_dimensions.get(letter)
     return dim is not None and dim.hidden

+def normalize_header_text(text: str) -> str:
+    """
+    Normalizes header text for fuzzy matching.
+    Removes spaces, underscores, non-alphanumeric chars, and converts to lower case.
+    Example: 'Source_Public_ IP' -> 'sourcepublicip'
+    """
+    if not text: return ""
+    s = str(text).lower()
+    return re.sub(r'[^a-z0-9]', '', s)
+
+def fuzzy_match(keyword: str, cell_value: str) -> bool:
+    """
+    Checks if keyword loosely matches cell_value.
+    """
+    n_key = normalize_header_text(keyword)
+    n_cell = normalize_header_text(cell_value)
+
+    # Exact contained match after normalization
+    if n_key in n_cell:
+        return True
+
+    # Normalization handles case, underscore-vs-space, and punctuation, which
+    # covers most header variants. True typo tolerance (e.g. "Souce IP" ->
+    # 'souceip' vs 'sourceip') would need an edit distance such as
+    # Levenshtein; not implemented here.
+    return False
+
 def find_header_row(sheet: Worksheet, keywords: List[str]) -> Tuple[Optional[int], Dict[str, int]]:
     """
     Scans the first 20 rows to find the best matching header row.
@@ -33,18 +64,18 @@ def find_header_row(sheet: Worksheet, keywords: List[str]) -> Tuple[Optional[int
         if is_col_hidden(sheet, c):
             row_values.append("")  # Treat hidden column as empty
             continue
+        # Store original value for context if needed, but we match against normalized
         val = sheet.cell(row=r, column=c).value
-        row_values.append(clean_header(val))
+        row_values.append(str(val) if val else "")

     # Check matches
     current_map = {}
     for kw in keywords:
         for idx, cell_val in enumerate(row_values):
-            # match if keyword is in cell value
-            if kw in cell_val:
-                # heuristic preference: prefer cells that are not too long?
-                # e.g. "Source IP" vs "This is a note about Source IP"
+            if fuzzy_match(kw, cell_val):
                 current_map[kw] = idx + 1
+                # Don't break immediately if we want to find the *best* match?
+                # The original logic broke, picking the first match. That's usually fine for headers.
                 break

     match_count = len(current_map)
@@ -77,8 +108,8 @@ def read_servers(filename: str) -> Dict[str, Server]:
         print("Warning: No 'Servers' sheet found.")
         return {}

-    # keywords: reference, platform, ip address, management ip, production ip
-    header_keywords = ['reference', 'platform', 'ip address', 'production ip']
+    # keywords: reference, platform, ip address, management ip, production ip, server name
+    header_keywords = ['reference', 'platform', 'ip address', 'production ip', 'server name']

     header_row_idx, col_map = find_header_row(target_sheet, header_keywords)

@@ -96,6 +127,7 @@ def read_servers(filename: str) -> Dict[str, Server]:

     # Extract data
     ref_idx = col_map.get('reference')
+    name_idx = col_map.get('server name')  # User confirmed header
     plat_idx = col_map.get('platform')
     ip_idx = col_map.get('ip address')  # Generic/Management IP
     prod_ip_idx = col_map.get('production ip')  # Specific Production IP
@@ -110,30 +142,33 @@ def read_servers(filename: str) -> Dict[str, Server]:
         if not ref or ref.lower() == 'example':
             continue

+        # Hostname Logic:
+        # 1. Use 'Server Name' column if available (e.g. ITSMDEV-5009898)
+        # 2. Fallback to cleaned Reference (stripping SRV###)
+        server_name_raw = get_val(name_idx)
+        final_hostname = server_name_raw if server_name_raw else clean_reference(ref)
+
         plat = get_val(plat_idx) or 'unknown'

         # Parse Management IP
+        # Support multiple IPs
         ip_raw = get_val(ip_idx)
-        ip_addr = None
+        ip_list = []
         if ip_raw:
-            ips = parse_ip(ip_raw)
-            if ips:
-                ip_addr = ips[0]
+            ip_list = parse_ip(ip_raw)

         # Parse Production IP
         prod_ip_raw = get_val(prod_ip_idx)
-        prod_ip_addr = None
+        prod_ip_list = []
         if prod_ip_raw:
-            ips = parse_ip(prod_ip_raw)
-            if ips:
-                prod_ip_addr = ips[0]
+            prod_ip_list = parse_ip(prod_ip_raw)

         s = Server(
             reference=ref,
-            hostname=clean_reference(ref),
+            hostname=final_hostname,
             platform=plat,
-            ip_address=ip_addr,
+            ip_addresses=ip_list,
-            production_ip=prod_ip_addr
+            production_ips=prod_ip_list
         )
         servers[ref] = s

@@ -179,13 +214,23 @@ def read_flows(filename: str, server_inventory: Dict[str, Server] = None) -> Lis
         if is_col_hidden(sheet, c):
             header_row_values.append("")
             continue
-        header_row_values.append(clean_header(sheet.cell(row=header_row_idx, column=c).value))
+        # Store raw value for fuzzy matching
+        header_row_values.append(str(sheet.cell(row=header_row_idx, column=c).value or ""))

-    # Find indices
-    src_ip_indices = [i+1 for i, v in enumerate(header_row_values) if 'source' in v and 'ip' in v]
-    dst_ip_indices = [i+1 for i, v in enumerate(header_row_values) if 'destination' in v and 'ip' in v]
-    port_indices = [i+1 for i, v in enumerate(header_row_values) if 'port' in v]
-    flow_id_indices = [i+1 for i, v in enumerate(header_row_values) if 'flow' in v and '#' in v]  # "Flow #"
+    # Find indices using fuzzy_match
+    src_ip_indices = [i+1 for i, v in enumerate(header_row_values) if fuzzy_match('source', v) and fuzzy_match('ip', v)]
+    dst_ip_indices = [i+1 for i, v in enumerate(header_row_values) if fuzzy_match('destination', v) and fuzzy_match('ip', v)]
+    port_indices = [i+1 for i, v in enumerate(header_row_values) if fuzzy_match('port', v)]
+    # 'Flow #': normalize('Flow #') -> 'flow' because '#' is non-alphanumeric,
+    # so the '#' check stays on the raw value, with 'num'/'id' as fallbacks.
+    flow_id_indices = [i+1 for i, v in enumerate(header_row_values) if fuzzy_match('flow', v) and ('#' in v or 'num' in v.lower() or 'id' in v.lower())]

     if not src_ip_indices or not dst_ip_indices or not port_indices:
         print(f"Skipping {sname}: Missing essential IP/Port columns.")
@@ -195,7 +240,7 @@ def read_flows(filename: str, server_inventory: Dict[str, Server] = None) -> Lis
     for r in range(header_row_idx + 1, sheet.max_row + 1):
         if is_row_hidden(sheet, r):
             continue

         # Helper
         def get_val(idx):
             v = sheet.cell(row=r, column=idx).value
```
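Pulled out of the diff, the two new helpers behave like this; a standalone sketch mirroring the added functions, runnable without the workbook machinery:

```python
import re

def normalize_header_text(text) -> str:
    """Lower-case and strip every non-alphanumeric character."""
    if not text:
        return ""
    return re.sub(r'[^a-z0-9]', '', str(text).lower())

def fuzzy_match(keyword: str, cell_value: str) -> bool:
    """Substring containment after normalization."""
    n_key = normalize_header_text(keyword)
    return bool(n_key) and n_key in normalize_header_text(cell_value)

# Header variants that all collapse to the same normalized key:
for header in ("Server Name", "Server_Name", "Server-Name", " server  name "):
    assert fuzzy_match("server name", header)

assert fuzzy_match("ip", "Source (IP)")                  # 'ip' in 'sourceip'
assert not fuzzy_match("source ip", "Source Public IP")  # 'public' breaks containment
```

The last assertion is the multi-word limitation the test file's comments discuss: containment works per normalized string, not per word.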
```diff
@@ -1,6 +1,8 @@
 from typing import List, Dict, Any
 from .models import Server, Flow
-from .network import to_mgt_ip
+from .network import to_mgt_ip, is_valid_hostname, get_hostname
+from .parsers import clean_reference

 def generate_inventory(servers: Dict[str, Server], flows: List[Flow]) -> Dict[str, Any]:
     """
@@ -18,10 +20,14 @@ def generate_inventory(servers: Dict[str, Server], flows: List[Flow]) -> Dict[st

     ip_to_server = {}
     for s in servers.values():
-        if s.ip_address:
-            ip_to_server[s.ip_address] = s
-        if s.production_ip:
-            ip_to_server[s.production_ip] = s
+        # Index all Management IPs
+        for ip in s.ip_addresses:
+            ip_to_server[ip] = s
+
+        # Index all Production IPs
+        for ip in s.production_ips:
+            ip_to_server[ip] = s

         # Also index by reference/hostname for DNS matches
         if s.reference:
             ip_to_server[s.reference.lower()] = s
@@ -33,13 +39,20 @@ def generate_inventory(servers: Dict[str, Server], flows: List[Flow]) -> Dict[st
     # Process flows
     match_count = 0
     drop_count = 0
+    total_flows = len(flows)

-    for flow in flows:
+    print(f"Starting inventory generation for {total_flows} flows...")
+
+    for idx, flow in enumerate(flows, 1):
+        if idx % 10 == 0:
+            print(f"Processing flow {idx}/{total_flows}...")
+
         # Find source server
         server = ip_to_server.get(flow.source_ip)

         if not server:
             # Try DNS resolution (Public IP -> Management FQDN)
+            print(f"Flow {idx}: Source {flow.source_ip} not found in map. Attempting DNS resolution...")
             mgt_dns = to_mgt_ip(flow.source_ip)
             if mgt_dns:
                 # mgt_dns might be "server.ds.gc.ca".
@@ -54,18 +67,66 @@ def generate_inventory(servers: Dict[str, Server], flows: List[Flow]) -> Dict[st

         if not server:
             drop_count += 1
-            if drop_count <= 5:  # Debug spam limit
-                print(f"Dropping flow {flow.flow_id}: Source {flow.source_ip} (Mgt: {mgt_dns}) not found in Servers tab.")
+            if drop_count <= 10:  # Increased debug spam limit
+                print(f"Dropping flow {flow.flow_id} ({idx}/{total_flows}): Source {flow.source_ip} (Mgt: {mgt_dns}) resolved but not found in Servers tab.")
             continue
+        else:
+            print(f"Flow {idx}: Resolved {flow.source_ip} -> {server.hostname or server.reference}")

         match_count += 1

         # Prepare host entry if new
-        # We use the Reference/Hostname as the key in inventory 'hosts'
-        host_key = server.reference or server.hostname or server.ip_address
+        # Candidate Resolution Logic
+        # User Requirement: "gather all potential names ... check to see what actually resolves"
+        candidates = []
+
+        # 1. Server Name Column (Highest priority from Excel)
+        if server.hostname:
+            candidates.append(server.hostname)
+
+        # 2. Cleaned Reference (Fallback from Excel)
+        if server.reference:
+            candidates.append(clean_reference(server.reference))
+
+        # 3. Reverse DNS of the primary IP, in case the Excel names are stale
+        if server.primary_ip:
+            rev_name = get_hostname(server.primary_ip)
+            if rev_name:
+                candidates.append(rev_name)
+
+        # Select the first candidate that resolves
+        final_host_key = None
+        for cand in candidates:
+            if not cand: continue
+            if is_valid_hostname(cand):
+                final_host_key = cand
+                break
+
+        # Fallback: if no candidate resolves we still need a usable key so
+        # Ansible can connect. The IP is safe connectivity-wise, even though
+        # the user prefers names.
+        if not final_host_key:
+            if candidates:
+                print(f"Warning: No resolvable name found for {server.primary_ip} (Candidates: {candidates}). Using IP.")
+            final_host_key = server.primary_ip
+
+        # Final cleanup: Strip suffixes if user requested
+        from .parsers import clean_hostname
+        host_key = clean_hostname(final_host_key)

         if host_key not in inventory_hosts:
             host_vars = server.get_ansible_vars()
+
+            # Ensure proper ansible_host is set if key is not IP
+            if host_key != server.primary_ip and server.primary_ip:
+                host_vars['ansible_host'] = server.primary_ip
             host_vars['flows'] = []
             inventory_hosts[host_key] = host_vars
```
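The candidate-resolution block above reduces to a small decision function. A sketch with a stubbed resolver; `choose_host_key` and the stub are illustrative, not the repository's exact signatures:

```python
def choose_host_key(hostname, reference_name, rev_dns_name, primary_ip, resolves):
    """Pick the first candidate name that resolves; fall back to the IP."""
    for cand in (hostname, reference_name, rev_dns_name):
        if cand and resolves(cand):
            return cand
    # No name resolves: the IP at least guarantees connectivity
    return primary_ip

# Stubbed resolver for the sketch: only 'web01' is "in DNS"
resolves = lambda name: name == "web01"
print(choose_host_key("stale-name", "web01", None, "10.0.0.5", resolves))   # web01
print(choose_host_key("stale-name", "also-bad", None, "10.0.0.5", resolves))  # 10.0.0.5
```

Ordering the candidates Excel-name first, cleaned reference second, reverse DNS last matches the priority the diff establishes.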
```diff
@@ -5,8 +5,19 @@ from typing import List, Dict, Optional, Any
 class Server:
     reference: str
     hostname: str  # This might be same as reference
-    ip_address: Optional[str] = None
-    production_ip: Optional[str] = None
+    # Support multiple IPs per field (lists)
+    ip_addresses: List[str] = field(default_factory=list)
+    production_ips: List[str] = field(default_factory=list)
+
+    # Helpers for compatibility / primary IP
+    @property
+    def primary_ip(self) -> Optional[str]:
+        return self.ip_addresses[0] if self.ip_addresses else None
+
+    @property
+    def primary_prod_ip(self) -> Optional[str]:
+        return self.production_ips[0] if self.production_ips else None
+
     platform: str = 'unknown'  # e.g. 'Windows', 'Linux'

     def get_ansible_vars(self) -> Dict[str, Any]:
@@ -23,8 +34,8 @@ class Server:
             # Default ssh is usually fine, but being explicit doesn't hurt
             pass

-        if self.ip_address:
-            vars['ansible_host'] = self.ip_address
+        if self.primary_ip:
+            vars['ansible_host'] = self.primary_ip

         return vars
```
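The list-valued fields need `field(default_factory=list)` because a bare mutable default would be shared across every instance of a dataclass. A runnable sketch of the reshaped model, trimmed to the fields shown in the diff:

```python
from dataclasses import dataclass, field
from typing import List, Optional

@dataclass
class Server:
    reference: str
    hostname: str
    # default_factory gives each instance its own fresh list
    ip_addresses: List[str] = field(default_factory=list)
    production_ips: List[str] = field(default_factory=list)

    @property
    def primary_ip(self) -> Optional[str]:
        # First management IP, or None when the cell was empty
        return self.ip_addresses[0] if self.ip_addresses else None

s = Server("SRV01 web01", "web01", ip_addresses=["10.0.0.5", "10.0.0.6"])
print(s.primary_ip)                      # 10.0.0.5
print(Server("SRV02", "db01").primary_ip)  # None
```

The `primary_ip` property is what keeps single-IP call sites (like `get_ansible_vars`) working after the move to lists.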
```diff
@@ -1,6 +1,8 @@
 import socket
 from typing import Optional
+from functools import lru_cache

+@lru_cache(maxsize=1024)
 def get_hostname(ip: str) -> Optional[str]:
     try:
         # Python's equivalent to Resolv.getname(ip)
@@ -9,12 +11,21 @@ def get_hostname(ip: str) -> Optional[str]:
     except socket.error:
         return None

+@lru_cache(maxsize=1024)
 def get_ip(hostname: str) -> Optional[str]:
     try:
         return socket.gethostbyname(hostname)
     except socket.error:
         return None

+def is_valid_hostname(hostname: str) -> bool:
+    """
+    Checks if a hostname resolves to an IP.
+    """
+    if not hostname:
+        return False
+    return get_ip(hostname) is not None
+
 def to_mgt_ip(name_or_ip: str) -> Optional[str]:
     """
     Mimics the Ruby script's to_mgt_ip logic:
```
```diff
@@ -18,16 +18,34 @@ def parse_ports(port_str: str) -> List[int]:

     s = str(port_str).lower()

-    # Remove 'udp' if present to focus on port numbers,
-    # but arguably we might want to capture protocol.
-    # The Ruby script removed it. We'll strip it for port extraction.
+    # Remove 'udp' if present
     s = re.sub(r'udp', '', s)

+    # Common ports for 'any' matching
+    # User requested: "10 most commonly used ports"
+    # Selected: 20/21 (FTP), 22 (SSH), 23 (Telnet), 25 (SMTP), 53 (DNS), 80 (HTTP), 110 (POP3), 443 (HTTPS), 3389 (RDP)
+    COMMON_PORTS = [20, 21, 22, 23, 25, 53, 80, 110, 443, 3389]
+
+    # Service Name Map
+    SERVICE_MAP = {
+        'ftp': [21],
+        'ssh': [22],
+        'telnet': [23],
+        'smtp': [25],
+        'dns': [53],
+        'http': [80],
+        'pop3': [110],
+        'https': [443],
+        'rdp': [3389],
+        'ldap': [389],
+        'ldaps': [636]
+    }
+
     ports = set()

-    # Handle 'any' or 'all' - defaulting to common ports as per Ruby script
+    # Handle 'any' or 'all'
     if 'any' in s or 'all' in s:
-        return [22, 3389, 80, 443, 3306, 5432, 8443, 60000]
+        return sorted(COMMON_PORTS)

     # Split by common delimiters
     parts = re.split(r'[,\n\s]+', s)
@@ -37,8 +55,12 @@ def parse_ports(port_str: str) -> List[int]:
         if not part:
             continue

+        # Check service map
+        if part in SERVICE_MAP:
+            ports.update(SERVICE_MAP[part])
+            continue
+
         # Range handling: 8000-8010
-        # The ruby script had issues with ranges, let's do it right.
         range_match = re.match(r'^(\d+)[-](\d+)$', part)
         if range_match:
             start, end = map(int, range_match.groups())
@@ -67,11 +89,20 @@ def clean_reference(ref: str) -> str:
         return ""

     s = str(ref)
-    # Remove SRV followed by digits and whitespace
-    s = re.sub(r'SRV\d+\s*', '', s, flags=re.IGNORECASE)
+    # Remove SRV or SVR followed by digits and whitespace
+    s = re.sub(r'S(RV|VR)\d+\s*', '', s, flags=re.IGNORECASE)
     # Remove leading/trailing whitespace
     return s.strip()

+def clean_hostname(name: str) -> str:
+    """
+    Strips specific suffixes like .prod.global.gc.ca to get the shortname.
+    """
+    if not name:
+        return ""
+    # Case insensitive strip
+    return re.sub(r'\.prod\.global\.gc\.ca$', '', name, flags=re.IGNORECASE)
+
 def parse_ip(ip_str: str) -> List[str]:
     """Finds all IPv4 addresses in a string."""
     if not ip_str:
```
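The two cleanup helpers added here are small enough to demo standalone; this sketch reuses the same regexes as the diff:

```python
import re

def clean_reference(ref: str) -> str:
    """Strip SRV###/SVR### prefixes that only exist to link the two tabs."""
    if not ref:
        return ""
    # IGNORECASE also catches lower-case 'srv7db01' style references
    return re.sub(r'S(RV|VR)\d+\s*', '', str(ref), flags=re.IGNORECASE).strip()

def clean_hostname(name: str) -> str:
    """Strip the site-specific DNS suffix to get the shortname."""
    if not name:
        return ""
    return re.sub(r'\.prod\.global\.gc\.ca$', '', name, flags=re.IGNORECASE)

print(clean_reference("SRV042 web01"))            # web01
print(clean_reference("svr7db01"))                # db01
print(clean_hostname("web01.PROD.global.gc.ca"))  # web01
print(clean_hostname("other.suffix.com"))         # other.suffix.com
```

Anchoring the suffix regex with `$` is what leaves unrelated domains like `other.suffix.com` untouched.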