93 lines
3.6 KiB
Python
93 lines
3.6 KiB
Python
import unittest
|
|
from unittest.mock import MagicMock, patch, mock_open
|
|
import os
|
|
import time
|
|
import subprocess
|
|
import shutil
|
|
import disk_cleaner
|
|
import sys
|
|
import tempfile
|
|
|
|
def run_test():
|
|
with tempfile.TemporaryDirectory() as temp_dir:
|
|
print(f"Running test in {temp_dir}")
|
|
|
|
log_file = os.path.join(temp_dir, "test_app.log")
|
|
rotated_logs = [os.path.join(temp_dir, f"test_app.log.{i}") for i in range(1, 4)]
|
|
|
|
# 1. Create Rotated Logs
|
|
print("Creating rotated logs...")
|
|
for i, path in enumerate(rotated_logs):
|
|
with open(path, "w") as f:
|
|
f.write("Old data " * 1000)
|
|
t = time.time() - (i + 1) * 3600
|
|
os.utime(path, (t, t))
|
|
print(f"Created {os.path.basename(path)} with mtime {t}")
|
|
|
|
# 2. Mocking
|
|
usage_calls = 0
|
|
def mock_disk_usage(path):
|
|
nonlocal usage_calls
|
|
# Always return stateful usage for ANY path in this test (since we only care about the one)
|
|
usage_calls += 1
|
|
# Call 1: Init scan
|
|
# Call 2: Loop scan
|
|
# Call 3: Post-Strategy A check
|
|
# Call 4: Pre-Strategy B check (inside cleanup_rotated_logs) -> MUST BE HIGH
|
|
if usage_calls <= 4:
|
|
return MagicMock(total=1000, used=900, free=100) # 90%
|
|
# Call 5: After delete 1
|
|
elif usage_calls == 5:
|
|
return MagicMock(total=1000, used=850, free=150) # 85%
|
|
# Call 6: After delete 2 -> 75% (Stop)
|
|
else:
|
|
return MagicMock(total=1000, used=750, free=250) # 75%
|
|
|
|
disk_cleaner.shutil.disk_usage = mock_disk_usage
|
|
|
|
# Mock Partitions -> Return our temp_dir as the ONLY partition
|
|
# disk_cleaner.os.path.exists = lambda p: True # REMOVED: Breaks verification!
|
|
# We simulate that the temp_dir IS the mountpoint
|
|
disk_cleaner.get_partitions = MagicMock(return_value=[('/dev/test', temp_dir)])
|
|
disk_cleaner.get_open_files_flat = MagicMock(return_value=[])
|
|
|
|
# We don't need to mock os.walk if we point it to a real dir!
|
|
# We don't need to mock os.stat if we point to a real dir!
|
|
# We just need to make sure disk_cleaner uses the REAL functions.
|
|
# BUT, previous test run might have left disk_cleaner.os.walk mocked?
|
|
# Since we just import disk_cleaner, if it was modified in memory by previous run...
|
|
# Wait, this is a script execution. `python test_driver.py`. Fresh process.
|
|
# So disk_cleaner is clean.
|
|
|
|
# However, we DO need os.stat to behave well for the device check.
|
|
# os.stat(temp_dir).st_dev should equal os.stat(root).st_dev.
|
|
# Since they are real directories, this works natively!
|
|
|
|
# 3. Run Cleaner
|
|
print("Running check_and_clean()...")
|
|
disk_cleaner.check_and_clean()
|
|
|
|
# 4. Verify
|
|
# Rotated logs are:
|
|
# log.1 (newest)
|
|
# log.2
|
|
# log.3 (oldest)
|
|
|
|
if not os.path.exists(rotated_logs[2]):
|
|
print("SUCCESS: Oldest log (log.3) was deleted.")
|
|
else:
|
|
print("FAILURE: Oldest log (log.3) still exists.")
|
|
|
|
if not os.path.exists(rotated_logs[1]):
|
|
print("SUCCESS: 2nd Oldest log (log.2) was deleted.")
|
|
else:
|
|
print("FAILURE: 2nd Oldest log (log.2) still exists.")
|
|
|
|
if os.path.exists(rotated_logs[0]):
|
|
print("SUCCESS: Newest log (log.1) was PRESERVED.")
|
|
else:
|
|
print("FAILURE: Newest log (log.1) was unexpectedly deleted.")
|
|
|
|
if __name__ == "__main__":
|
|
run_test()
|