Compare commits
1 Commits
v2026.02.0
...
v2026.02.0
| Author | SHA1 | Date | |
|---|---|---|---|
| 9e7e4054c4 |
@@ -1,9 +1,14 @@
|
|||||||
import unittest
|
import unittest
|
||||||
|
import unittest.mock
|
||||||
from wif2ansible.models import Server, Flow
|
from wif2ansible.models import Server, Flow
|
||||||
from wif2ansible.inventory import generate_inventory
|
from wif2ansible.inventory import generate_inventory
|
||||||
|
|
||||||
class TestInventoryKeys(unittest.TestCase):
|
class TestInventoryKeys(unittest.TestCase):
|
||||||
def test_inventory_keys_are_hostnames(self):
|
@unittest.mock.patch('wif2ansible.inventory.is_valid_hostname')
|
||||||
|
def test_inventory_keys_are_hostnames(self, mock_resolves):
|
||||||
|
# Mock DNS to say server01 exists
|
||||||
|
mock_resolves.return_value = True
|
||||||
|
|
||||||
# Create a server with Ref, Hostname, IP
|
# Create a server with Ref, Hostname, IP
|
||||||
s1 = Server(reference="SERVER_REF_01", hostname="server01", ip_addresses=["192.168.1.10"], platform="windows")
|
s1 = Server(reference="SERVER_REF_01", hostname="server01", ip_addresses=["192.168.1.10"], platform="windows")
|
||||||
|
|
||||||
@@ -27,20 +32,28 @@ class TestInventoryKeys(unittest.TestCase):
|
|||||||
self.assertEqual(host_vars['ansible_host'], "192.168.1.10")
|
self.assertEqual(host_vars['ansible_host'], "192.168.1.10")
|
||||||
self.assertEqual(host_vars['ansible_connection'], "winrm")
|
self.assertEqual(host_vars['ansible_connection'], "winrm")
|
||||||
|
|
||||||
def test_clean_reference_logic(self):
|
@unittest.mock.patch('wif2ansible.inventory.is_valid_hostname')
|
||||||
from wif2ansible.parsers import clean_reference
|
def test_inventory_keys_resolution(self, mock_resolves):
|
||||||
|
# Setup mock: 'bad_name' -> False, 'good_name' -> True
|
||||||
|
def side_effect(name):
|
||||||
|
if name == "bad_name": return False
|
||||||
|
if name == "good_name": return True
|
||||||
|
return False
|
||||||
|
mock_resolves.side_effect = side_effect
|
||||||
|
|
||||||
# Test cases
|
# Server with a BAD hostname but a GOOD reference (simulated)
|
||||||
self.assertEqual(clean_reference("SRV123 MyServer"), "MyServer")
|
# Actually logic is candidates: [hostname, cleaned_ref, rev_dns]
|
||||||
self.assertEqual(clean_reference("SVR999 AnotherServer"), "AnotherServer")
|
# Let's say hostname is "bad_name" and cleaned ref is "good_name"
|
||||||
self.assertEqual(clean_reference("srv001 lowercase"), "lowercase")
|
s1 = Server(reference="SRV01 good_name", hostname="bad_name", ip_addresses=["10.10.10.10"])
|
||||||
self.assertEqual(clean_reference("SvR555 MixedCase"), "MixedCase")
|
|
||||||
self.assertEqual(clean_reference("JustName"), "JustName")
|
|
||||||
self.assertEqual(clean_reference("SRV123"), "") # Should be empty? or handle?
|
|
||||||
# If it's just SRV123, strip returns empty.
|
|
||||||
# User said "never include these in output".
|
|
||||||
# If the server is ONLY named SRV123, what then?
|
|
||||||
# Assuming there is usually a name.
|
|
||||||
|
|
||||||
|
f1 = Flow(flow_id="1", source_ip="10.10.10.10", destination_ip="1.1.1.1", ports=[80])
|
||||||
|
|
||||||
|
inventory = generate_inventory({"k":s1}, [f1])
|
||||||
|
hosts = inventory['all']['hosts']
|
||||||
|
|
||||||
|
# It should have skipped "bad_name" and picked "good_name" (from cleaned ref)
|
||||||
|
self.assertIn("good_name", hosts)
|
||||||
|
self.assertNotIn("bad_name", hosts)
|
||||||
|
|
||||||
if __name__ == '__main__':
|
if __name__ == '__main__':
|
||||||
unittest.main()
|
unittest.main()
|
||||||
|
|||||||
@@ -1,6 +1,8 @@
|
|||||||
from typing import List, Dict, Any
|
from typing import List, Dict, Any
|
||||||
from .models import Server, Flow
|
from .models import Server, Flow
|
||||||
from .network import to_mgt_ip
|
from .models import Server, Flow
|
||||||
|
from .network import to_mgt_ip, is_valid_hostname, get_hostname
|
||||||
|
from .parsers import clean_reference
|
||||||
|
|
||||||
def generate_inventory(servers: Dict[str, Server], flows: List[Flow]) -> Dict[str, Any]:
|
def generate_inventory(servers: Dict[str, Server], flows: List[Flow]) -> Dict[str, Any]:
|
||||||
"""
|
"""
|
||||||
@@ -65,8 +67,48 @@ def generate_inventory(servers: Dict[str, Server], flows: List[Flow]) -> Dict[st
|
|||||||
match_count += 1
|
match_count += 1
|
||||||
|
|
||||||
# Prepare host entry if new
|
# Prepare host entry if new
|
||||||
# We use the Hostname (from Server Name col) -> Reference (cleaned) -> IP match
|
|
||||||
host_key = server.hostname or server.reference or server.primary_ip
|
# Candidate Resolution Logic
|
||||||
|
# User Requirement: "gather all potential names ... check to see what actually resolves"
|
||||||
|
candidates = []
|
||||||
|
|
||||||
|
# 1. Server Name Column (Highest priority from Excel)
|
||||||
|
if server.hostname:
|
||||||
|
candidates.append(server.hostname)
|
||||||
|
|
||||||
|
# 2. Cleaned Reference (Fallback from Excel)
|
||||||
|
if server.reference:
|
||||||
|
candidates.append(clean_reference(server.reference))
|
||||||
|
|
||||||
|
# 3. Reverse DNS of Primary IP?
|
||||||
|
# If the Excel names are garbage, maybe the IP resolves to the "Real" DNS name.
|
||||||
|
if server.primary_ip:
|
||||||
|
# Try simple reverse lookup
|
||||||
|
rev_name = get_hostname(server.primary_ip)
|
||||||
|
if rev_name:
|
||||||
|
candidates.append(rev_name)
|
||||||
|
|
||||||
|
# Select the first candidate that resolves
|
||||||
|
|
||||||
|
final_host_key = None
|
||||||
|
for cand in candidates:
|
||||||
|
if not cand: continue
|
||||||
|
if is_valid_hostname(cand):
|
||||||
|
final_host_key = cand
|
||||||
|
break
|
||||||
|
|
||||||
|
# Fallback: strict fallback to IP if nothing resolves?
|
||||||
|
# Or best effort (first candidate)?
|
||||||
|
# User said: "You are getting it incorrect every time" -> likely implying the garbage name was used.
|
||||||
|
# But if *nothing* resolves, we must output something. The IP is safe connectivity-wise, but user wants Names.
|
||||||
|
# Let's fallback to the IP if NO name works, to ensure ansible works.
|
||||||
|
if not final_host_key:
|
||||||
|
if candidates:
|
||||||
|
# Warn?
|
||||||
|
print(f"Warning: No resolvable name found for {server.primary_ip} (Candidates: {candidates}). Using IP.")
|
||||||
|
final_host_key = server.primary_ip
|
||||||
|
|
||||||
|
host_key = final_host_key
|
||||||
|
|
||||||
if host_key not in inventory_hosts:
|
if host_key not in inventory_hosts:
|
||||||
host_vars = server.get_ansible_vars()
|
host_vars = server.get_ansible_vars()
|
||||||
|
|||||||
@@ -15,6 +15,14 @@ def get_ip(hostname: str) -> Optional[str]:
|
|||||||
except socket.error:
|
except socket.error:
|
||||||
return None
|
return None
|
||||||
|
|
||||||
|
def is_valid_hostname(hostname: str) -> bool:
|
||||||
|
"""
|
||||||
|
Checks if a hostname resolves to an IP.
|
||||||
|
"""
|
||||||
|
if not hostname:
|
||||||
|
return False
|
||||||
|
return get_ip(hostname) is not None
|
||||||
|
|
||||||
def to_mgt_ip(name_or_ip: str) -> Optional[str]:
|
def to_mgt_ip(name_or_ip: str) -> Optional[str]:
|
||||||
"""
|
"""
|
||||||
Mimics the Ruby script's to_mgt_ip logic:
|
Mimics the Ruby script's to_mgt_ip logic:
|
||||||
|
|||||||
Reference in New Issue
Block a user