Compare commits
11 Commits
v2026.02.0
...
v2026.02.0
| Author | SHA1 | Date | |
|---|---|---|---|
| 5c95469ca3 | |||
| 07c7ec23d9 | |||
| 9e28004d6c | |||
| 8b3584fa9e | |||
| a202e267f7 | |||
| 2ccf6c293a | |||
| b6266bea81 | |||
| 284e6b1fbf | |||
| b3c5f3a6fd | |||
| 2634c87dcd | |||
| f28af4de7a |
@@ -9,14 +9,16 @@ jobs:
|
||||
build:
|
||||
name: Build Windows Exe
|
||||
runs-on: ubuntu-latest
|
||||
|
||||
steps:
|
||||
- name: Checkout
|
||||
uses: actions/checkout@v3
|
||||
|
||||
- name: Build with PyInstaller (Wine)
|
||||
- name: Build with PyInstaller
|
||||
uses: docker://cdrx/pyinstaller-windows:python3
|
||||
with:
|
||||
args: "python -m pip install -r requirements.txt && pyinstaller --onefile --clean --name wif2ansible run.py"
|
||||
entrypoint: /bin/sh
|
||||
args: -c "python -m pip install -r requirements.txt && pyinstaller --onefile --clean --name wif2ansible run.py"
|
||||
|
||||
- name: Generate Version Tag
|
||||
id: version
|
||||
@@ -27,7 +29,7 @@ jobs:
|
||||
with:
|
||||
tag_name: ${{ steps.version.outputs.TAG }}
|
||||
name: Release ${{ steps.version.outputs.TAG }}
|
||||
files: dist/wif2ansible.exe
|
||||
files: dist/**/*.exe
|
||||
draft: false
|
||||
prerelease: false
|
||||
env:
|
||||
|
||||
26
prompt_journal.md
Normal file
26
prompt_journal.md
Normal file
@@ -0,0 +1,26 @@
|
||||
# Prompt Journal
|
||||
|
||||
This document contains a log of the prompts used to build and refine this project.
|
||||
|
||||
## Session 1: Initial Refactor & Packaging
|
||||
|
||||
1. **Objective**: I have a ruby script that parses excel "WIF" documents that contain server names and network flows source destination ports. I need these to create an ansible inventory with attributes that detail the network connectivity required by each server in the solution. parse the WIF excel document and my existing by-hand ruby script and completely refactor them in python and increase the robustness of the parsing of network flows by using regex and allowing for shifts of cells, be aware of hidden cells and do not include a hidden cell in the output. Some cells may be visually the same cell with formatting/styling but in fact be different cells within the spreasheet - try to accomodate this. Ensure that only flows where the servers listed on the Servers tab are included in the resulting ansible inventory. the new python refactored script should allow me to take another WIF file with different servers and network flows and generate another ansible inventory file. You can see the two starting files to work with in the current project folder
|
||||
2. **Packaging**: is there a way to package this
|
||||
3. **Executable**: yes make me an exe
|
||||
4. **Documentation**: create a readme.md that details how to run this
|
||||
5. **Git Setup**: add all xls and xlsx to gitignore, add generated yml inventories to gitignore (not individually) all of them and all future. add and push this project to main at https://gitea.krisforbes.ca/krisf/wif2ansible.git. advise me of any potential ip address leakage before commiting and pushing this project
|
||||
|
||||
## Session 2: CI/CD & Logic Refinement
|
||||
|
||||
6. **CI/CD**: create me a gitea runner to generate a new release and automatically version releases each time a new push is made
|
||||
7. **Push**: push it and ensure a new exe is generated by gitea runner
|
||||
8. **Debug**: where is my bundled exe in my release
|
||||
9. **Debug**: my release should contain an exe downloadable from the releases tab in git but it does not
|
||||
10. **Debug**: error
|
||||
11. **Debug**: error again
|
||||
12. **Logic Update (Ports/DNS)**: when you parse a range of ports to be added to the result, only add the first, last, and middle port. examine the to_mgt_ip function in my original ruby script, this function converts the production interface public ipv4 address listed in the flows tab to the management private nic name by using DNS. please ensure this is implemented in the python version and push the new version
|
||||
13. **Logic Update (Server Tab)**: thats a good point, you can also use the server's tab of the wif to ensure the name is used for the source so our ansible playbook can connect to the server. With the production ipv4, we have no ability to connect. use the server's tab of the WIF first and fallback to my ruby DNS method
|
||||
14. **Release**: push a new version
|
||||
15. **Output Format**: in the genrrated yaml file, i should see all: hosts : and the entires under this should not be IP addresses, they should be the server names from mapping the servers tab of the excel file
|
||||
16. **Refining Output**: the server name key should not contain SRV### this is part of a reference and is not relevant except to provide mapping from the flows tab information to the servers information, please consider this SRV### information in your servers tab matching logic
|
||||
17. **Documentation**: add a prompt journal to this project that cotnains a copy of all prompts used and automatically adds new prompts as i send them
|
||||
46
test_inventory_keys.py
Normal file
46
test_inventory_keys.py
Normal file
@@ -0,0 +1,46 @@
|
||||
import unittest
|
||||
from wif2ansible.models import Server, Flow
|
||||
from wif2ansible.inventory import generate_inventory
|
||||
|
||||
class TestInventoryKeys(unittest.TestCase):
|
||||
def test_inventory_keys_are_hostnames(self):
|
||||
# Create a server with Ref, Hostname, IP
|
||||
s1 = Server(reference="SERVER_REF_01", hostname="server01", ip_address="192.168.1.10", platform="windows")
|
||||
|
||||
# Create a flow matching this server
|
||||
f1 = Flow(flow_id="1", source_ip="192.168.1.10", destination_ip="10.0.0.1", ports=[80])
|
||||
|
||||
servers = {"SERVER_REF_01": s1}
|
||||
flows = [f1]
|
||||
|
||||
inventory = generate_inventory(servers, flows)
|
||||
|
||||
# Verify stricture
|
||||
hosts = inventory['all']['hosts']
|
||||
|
||||
# Key should be REFERENCE "SERVER_REF_01" (or hostname/ip fallback)
|
||||
self.assertIn("SERVER_REF_01", hosts)
|
||||
self.assertNotIn("192.168.1.10", hosts)
|
||||
|
||||
# Check variables
|
||||
host_vars = hosts["SERVER_REF_01"]
|
||||
self.assertEqual(host_vars['ansible_host'], "192.168.1.10")
|
||||
self.assertEqual(host_vars['ansible_connection'], "winrm")
|
||||
|
||||
def test_clean_reference_logic(self):
|
||||
from wif2ansible.parsers import clean_reference
|
||||
|
||||
# Test cases
|
||||
self.assertEqual(clean_reference("SRV123 MyServer"), "MyServer")
|
||||
self.assertEqual(clean_reference("SVR999 AnotherServer"), "AnotherServer")
|
||||
self.assertEqual(clean_reference("srv001 lowercase"), "lowercase")
|
||||
self.assertEqual(clean_reference("SvR555 MixedCase"), "MixedCase")
|
||||
self.assertEqual(clean_reference("JustName"), "JustName")
|
||||
self.assertEqual(clean_reference("SRV123"), "") # Should be empty? or handle?
|
||||
# If it's just SRV123, strip returns empty.
|
||||
# User said "never include these in output".
|
||||
# If the server is ONLY named SRV123, what then?
|
||||
# Assuming there is usually a name.
|
||||
|
||||
if __name__ == '__main__':
|
||||
unittest.main()
|
||||
@@ -2,7 +2,7 @@ import openpyxl
|
||||
from openpyxl.worksheet.worksheet import Worksheet
|
||||
from typing import List, Dict, Tuple, Optional
|
||||
from .models import Server, Flow
|
||||
from .parsers import parse_ports, parse_ip, clean_header
|
||||
from .parsers import parse_ports, parse_ip, clean_header, clean_reference
|
||||
|
||||
from openpyxl.utils import get_column_letter
|
||||
|
||||
@@ -77,9 +77,8 @@ def read_servers(filename: str) -> Dict[str, Server]:
|
||||
print("Warning: No 'Servers' sheet found.")
|
||||
return {}
|
||||
|
||||
# keywords: reference, platform, ip address, management ip?
|
||||
# Ruby script looked for: reference, type, alias, platform, middleware
|
||||
header_keywords = ['reference', 'platform', 'ip address']
|
||||
# keywords: reference, platform, ip address, management ip, production ip
|
||||
header_keywords = ['reference', 'platform', 'ip address', 'production ip']
|
||||
|
||||
header_row_idx, col_map = find_header_row(target_sheet, header_keywords)
|
||||
|
||||
@@ -98,7 +97,8 @@ def read_servers(filename: str) -> Dict[str, Server]:
|
||||
# Extract data
|
||||
ref_idx = col_map.get('reference')
|
||||
plat_idx = col_map.get('platform')
|
||||
ip_idx = col_map.get('ip address') # Generic IP
|
||||
ip_idx = col_map.get('ip address') # Generic/Management IP
|
||||
prod_ip_idx = col_map.get('production ip') # Specific Production IP
|
||||
|
||||
# Helper to get value
|
||||
def get_val(idx):
|
||||
@@ -111,19 +111,29 @@ def read_servers(filename: str) -> Dict[str, Server]:
|
||||
continue
|
||||
|
||||
plat = get_val(plat_idx) or 'unknown'
|
||||
ip_raw = get_val(ip_idx)
|
||||
|
||||
# Parse Management IP
|
||||
ip_raw = get_val(ip_idx)
|
||||
ip_addr = None
|
||||
if ip_raw:
|
||||
ips = parse_ip(ip_raw)
|
||||
if ips:
|
||||
ip_addr = ips[0] # Take first valid IP
|
||||
ip_addr = ips[0]
|
||||
|
||||
# Parse Production IP
|
||||
prod_ip_raw = get_val(prod_ip_idx)
|
||||
prod_ip_addr = None
|
||||
if prod_ip_raw:
|
||||
ips = parse_ip(prod_ip_raw)
|
||||
if ips:
|
||||
prod_ip_addr = ips[0]
|
||||
|
||||
s = Server(
|
||||
reference=ref,
|
||||
hostname=ref, # Default hostname to reference
|
||||
hostname=clean_reference(ref),
|
||||
platform=plat,
|
||||
ip_address=ip_addr
|
||||
ip_address=ip_addr,
|
||||
production_ip=prod_ip_addr
|
||||
)
|
||||
servers[ref] = s
|
||||
|
||||
|
||||
@@ -1,5 +1,6 @@
|
||||
from typing import List, Dict, Any
|
||||
from .models import Server, Flow
|
||||
from .network import to_mgt_ip
|
||||
|
||||
def generate_inventory(servers: Dict[str, Server], flows: List[Flow]) -> Dict[str, Any]:
|
||||
"""
|
||||
@@ -19,9 +20,13 @@ def generate_inventory(servers: Dict[str, Server], flows: List[Flow]) -> Dict[st
|
||||
for s in servers.values():
|
||||
if s.ip_address:
|
||||
ip_to_server[s.ip_address] = s
|
||||
# also index by hostname/reference potentially?
|
||||
# ip_to_server[s.reference] = s
|
||||
# But flows ususally have IPs.
|
||||
if s.production_ip:
|
||||
ip_to_server[s.production_ip] = s
|
||||
# Also index by reference/hostname for DNS matches
|
||||
if s.reference:
|
||||
ip_to_server[s.reference.lower()] = s
|
||||
if s.hostname:
|
||||
ip_to_server[s.hostname.lower()] = s
|
||||
|
||||
inventory_hosts = {}
|
||||
|
||||
@@ -34,18 +39,30 @@ def generate_inventory(servers: Dict[str, Server], flows: List[Flow]) -> Dict[st
|
||||
server = ip_to_server.get(flow.source_ip)
|
||||
|
||||
if not server:
|
||||
# Try finding by looking if source matches any server's reference/hostname?
|
||||
# Unlikely for IPs.
|
||||
# Try DNS resolution (Public IP -> Management FQDN)
|
||||
mgt_dns = to_mgt_ip(flow.source_ip)
|
||||
if mgt_dns:
|
||||
# mgt_dns might be "server.ds.gc.ca".
|
||||
# Our keys might be "server" or "server.ds.gc.ca" or IPs
|
||||
# Try exact match
|
||||
server = ip_to_server.get(mgt_dns.lower())
|
||||
|
||||
# If not found, try shortname?
|
||||
if not server:
|
||||
short = mgt_dns.split('.')[0]
|
||||
server = ip_to_server.get(short.lower())
|
||||
|
||||
if not server:
|
||||
drop_count += 1
|
||||
if drop_count <= 5: # Debug spam limit
|
||||
print(f"Dropping flow {flow.flow_id}: Source {flow.source_ip} not found in Servers tab.")
|
||||
print(f"Dropping flow {flow.flow_id}: Source {flow.source_ip} (Mgt: {mgt_dns}) not found in Servers tab.")
|
||||
continue
|
||||
|
||||
match_count += 1
|
||||
|
||||
# Prepare host entry if new
|
||||
# We use the IP as the key in inventory 'hosts'
|
||||
host_key = server.ip_address
|
||||
# We use the Reference/Hostname as the key in inventory 'hosts'
|
||||
host_key = server.reference or server.hostname or server.ip_address
|
||||
|
||||
if host_key not in inventory_hosts:
|
||||
host_vars = server.get_ansible_vars()
|
||||
|
||||
@@ -6,6 +6,7 @@ class Server:
|
||||
reference: str
|
||||
hostname: str # This might be same as reference
|
||||
ip_address: Optional[str] = None
|
||||
production_ip: Optional[str] = None
|
||||
platform: str = 'unknown' # e.g. 'Windows', 'Linux'
|
||||
|
||||
def get_ansible_vars(self) -> Dict[str, Any]:
|
||||
@@ -22,6 +23,9 @@ class Server:
|
||||
# Default ssh is usually fine, but being explicit doesn't hurt
|
||||
pass
|
||||
|
||||
if self.ip_address:
|
||||
vars['ansible_host'] = self.ip_address
|
||||
|
||||
return vars
|
||||
|
||||
@dataclass
|
||||
|
||||
50
wif2ansible/network.py
Normal file
50
wif2ansible/network.py
Normal file
@@ -0,0 +1,50 @@
|
||||
import socket
|
||||
from typing import Optional
|
||||
|
||||
def get_hostname(ip: str) -> Optional[str]:
|
||||
try:
|
||||
# Python's equivalent to Resolv.getname(ip)
|
||||
# returns (hostname, aliaslist, ipaddrlist)
|
||||
return socket.gethostbyaddr(ip)[0]
|
||||
except socket.error:
|
||||
return None
|
||||
|
||||
def get_ip(hostname: str) -> Optional[str]:
|
||||
try:
|
||||
return socket.gethostbyname(hostname)
|
||||
except socket.error:
|
||||
return None
|
||||
|
||||
def to_mgt_ip(name_or_ip: str) -> Optional[str]:
|
||||
"""
|
||||
Mimics the Ruby script's to_mgt_ip logic:
|
||||
1. Reverse lookup IP to get FQDN.
|
||||
2. Construct management FQDN ({host}.ds.gc.ca or .pre-ds.gc.ca).
|
||||
3. Resolve that management FQDN to an IP.
|
||||
4. Return the Management FQDN if successful.
|
||||
"""
|
||||
|
||||
# In Ruby script, input 'name' is often an IP address from the WIF source column.
|
||||
|
||||
# Step 1: Reverse Lookup
|
||||
fqdn = get_hostname(name_or_ip)
|
||||
if not fqdn:
|
||||
# If input is already a name, use it? Ruby script assumes it gets a name from Resolv.getname(ip)
|
||||
# If name_or_ip is NOT an IP, gethostbyaddr might fail or behave differently.
|
||||
# But if it's already a name, we can try using it.
|
||||
fqdn = name_or_ip
|
||||
|
||||
short_name = fqdn.split('.')[0]
|
||||
|
||||
# Step 2 & 3: Try suffixes
|
||||
suffixes = ['.ds.gc.ca', '.pre-ds.gc.ca']
|
||||
|
||||
for suffix in suffixes:
|
||||
mgt_dns = short_name + suffix
|
||||
resolved_ip = get_ip(mgt_dns)
|
||||
if resolved_ip:
|
||||
# Ruby: return mgt_dns if mgt_ip.to_s.length > 4
|
||||
return mgt_dns
|
||||
|
||||
# print(f"Warning: {name_or_ip} could not be resolved to a management address.")
|
||||
return None
|
||||
@@ -43,18 +43,12 @@ def parse_ports(port_str: str) -> List[int]:
|
||||
if range_match:
|
||||
start, end = map(int, range_match.groups())
|
||||
if start <= end:
|
||||
# Limitation: adding huge ranges might blow up inventory size
|
||||
# but for Ansible 'ports' list it's better to be explicit or use range syntax.
|
||||
# For now, let's keep it expanded if small, or maybe just keeps the start/end?
|
||||
# Ruby script logic: expanded it.
|
||||
# We'll limit expansion to avoid DOSing ourselves.
|
||||
if end - start < 1000:
|
||||
ports.update(range(start, end + 1))
|
||||
else:
|
||||
# Fallback: just add start and end to avoid massive lists?
|
||||
# Or maybe ansible allows ranges?
|
||||
# Usually we list ports. Let's expand for now.
|
||||
ports.update(range(start, end + 1))
|
||||
# User Request: "only add the first, last, and middle port"
|
||||
ports.add(start)
|
||||
ports.add(end)
|
||||
if end - start > 1:
|
||||
middle = start + (end - start) // 2
|
||||
ports.add(middle)
|
||||
continue
|
||||
|
||||
# Single port
|
||||
@@ -63,6 +57,21 @@ def parse_ports(port_str: str) -> List[int]:
|
||||
|
||||
return sorted(list(ports))
|
||||
|
||||
def clean_reference(ref: str) -> str:
|
||||
"""
|
||||
Cleans a server reference string.
|
||||
Specifically removes 'SRV###' type prefixes if present.
|
||||
Example: 'SRV123 MyServer' -> 'MyServer'
|
||||
"""
|
||||
if not ref:
|
||||
return ""
|
||||
|
||||
s = str(ref)
|
||||
# Remove SRV or SVR followed by digits and whitespace
|
||||
s = re.sub(r'S(RV|VR)\d+\s*', '', s, flags=re.IGNORECASE)
|
||||
# Remove leading/trailing whitespace
|
||||
return s.strip()
|
||||
|
||||
def parse_ip(ip_str: str) -> List[str]:
|
||||
"""Finds all IPv4 addresses in a string."""
|
||||
if not ip_str:
|
||||
|
||||
Reference in New Issue
Block a user