"""Emergency disk-space cleaner for Linux hosts.

Scans mounted block devices, and for any volume above THRESHOLD_PERCENT:
  A. halves the largest open log file in place, then
  B. deletes the oldest rotated logs until usage drops below the threshold.
Volumes above WARNING_THRESHOLD_PERCENT only trigger a notification email.
Emails are rate-limited per mountpoint via a small JSON state file.
"""

import glob
import json
import os
import re
import shutil
import smtplib
import socket
import stat
import sys
import time
from email.message import EmailMessage

# --- Configuration ---
THRESHOLD_PERCENT = 95.0
WARNING_THRESHOLD_PERCENT = 80.0

# Regex to match somewhat standard log timestamps (e.g. YYYY-MM-DD, MMM DD, ISO8601)
LOG_TIMESTAMP_REGEX = r'(\d{4}-\d{2}-\d{2}|\w{3}\s+\d{1,2}\s+\d{2}:\d{2}:\d{2})'

# Email settings - placeholders
SMTP_SERVER = "127.0.0.1"
SMTP_PORT = 25
SMTP_TIMEOUT_SECONDS = 30  # never let a dead mail relay hang the cleanup run
EMAIL_FROM = "diskcleaner@example.com"
EMAIL_TO = ["admins@example.com"]

# NOTE(review): /tmp is world-writable; consider moving the state file to a
# directory owned by the account that runs this script.
STATE_FILE = "/tmp/disk_cleaner_state.json"
RATE_LIMIT_SECONDS = 8 * 3600

# Filesystem types that never represent reclaimable on-disk storage.
_PSEUDO_FSTYPES = frozenset(
    ('proc', 'sysfs', 'devtmpfs', 'devpts', 'tmpfs', 'cgroup', 'squashfs')
)

# /proc/mounts encodes space, tab, newline and backslash as 3-digit octal escapes.
_MOUNT_ESCAPE_RE = re.compile(r'\\([0-7]{3})')


def load_state():
    """Load the rate-limit state (mountpoint -> last-email timestamp).

    Returns an empty dict when the file is missing, unreadable, or does not
    contain a JSON object, so callers can always use dict methods on the result.
    """
    try:
        if os.path.exists(STATE_FILE):
            with open(STATE_FILE, 'r') as f:
                state = json.load(f)
            if isinstance(state, dict):
                return state
            print("Warning: Could not load state file: unexpected content")
    except Exception as e:
        print(f"Warning: Could not load state file: {e}")
    return {}


def save_state(state):
    """Persist the rate-limit state as JSON; failures are logged, not raised."""
    try:
        with open(STATE_FILE, 'w') as f:
            json.dump(state, f)
    except Exception as e:
        print(f"Warning: Could not save state file: {e}")


def should_send_email(mountpoint, state):
    """Return True if an email for this mountpoint is outside the cooldown.

    The cooldown is RATE_LIMIT_SECONDS (8 hours). A missing or corrupt
    timestamp is treated as "never sent" so a damaged state file cannot
    silence alerts or crash the run.
    """
    try:
        last_sent = float(state.get(mountpoint, 0))
    except (TypeError, ValueError):
        return True
    return time.time() - last_sent >= RATE_LIMIT_SECONDS


def record_email_sent(mountpoint, state):
    """Stamp the mountpoint with the current time and persist the state."""
    state[mountpoint] = time.time()
    save_state(state)


def send_email(subject, body, mountpoint=None):
    """Send a plain-text notification, rate-limited per mountpoint.

    When ``mountpoint`` is given, the email is suppressed if another email for
    the same mountpoint was sent within the cooldown; otherwise the send time
    is recorded after a successful delivery. Delivery errors are logged only.

    NOTE(review): warning, critical and action emails all share one cooldown
    key per mountpoint, so an earlier email can suppress a later, more
    important one.
    """
    state = None
    # If mountpoint is provided, check rate limit
    if mountpoint:
        state = load_state()
        if not should_send_email(mountpoint, state):
            print(f"Rate limit active for {mountpoint}. Suppressing email: {subject}")
            return

    msg = EmailMessage()
    msg.set_content(body)
    msg['Subject'] = subject
    msg['From'] = EMAIL_FROM
    msg['To'] = ", ".join(EMAIL_TO)

    try:
        # In a real scenario, might need login/auth
        with smtplib.SMTP(SMTP_SERVER, SMTP_PORT, timeout=SMTP_TIMEOUT_SECONDS) as s:
            s.send_message(msg)
        print(f"Email sent: {subject}")
        if mountpoint:
            record_email_sent(mountpoint, state)
    except Exception as e:
        print(f"Failed to send email: {e}")


def _unescape_mount_field(field):
    """Decode the octal escapes (e.g. ``\\040`` for space) used in /proc/mounts."""
    return _MOUNT_ESCAPE_RE.sub(lambda m: chr(int(m.group(1), 8)), field)


def get_partitions():
    """Parse /proc/mounts and return the mounted block-device filesystems.

    Returns a list of ``(device, mountpoint)`` tuples, without duplicates.
    Pseudo-filesystems and anything whose device is not under ``/dev/`` are
    skipped. Returns an empty list when /proc/mounts does not exist
    (non-Linux or test environments).
    """
    partitions = []
    seen = set()
    try:
        with open('/proc/mounts', 'r') as f:
            for line in f:
                parts = line.strip().split()
                if len(parts) < 3:
                    continue
                device = _unescape_mount_field(parts[0])
                mountpoint = _unescape_mount_field(parts[1])
                fstype = parts[2]
                # Filter out pseudo-filesystems
                if fstype in _PSEUDO_FSTYPES:
                    continue
                # rudimentary check: usually we want physical devices or LVM
                if not device.startswith('/dev/'):
                    continue
                entry = (device, mountpoint)
                if entry not in seen:
                    seen.add(entry)
                    partitions.append(entry)
    except FileNotFoundError:
        # Fallback for non-Linux or testing environments without /proc mocks
        return []
    except (OSError, ValueError) as e:
        print(f"Error reading /proc/mounts: {e}")
    return partitions


def get_process_name_pid(pid):
    """Return the short process name from /proc/<pid>/comm, or "unknown"."""
    try:
        with open(f'/proc/{pid}/comm', 'r') as f:
            return f.read().strip()
    except (OSError, ValueError):
        # The process may have exited, or we lack permission to read it.
        return "unknown"


def get_open_files_flat():
    """Walk /proc and collect every regular file currently held open.

    Returns a list of dicts: ``{'path': str, 'pid': int, 'size': int}``.
    The same path appears once per file descriptor that references it.
    Processes or descriptors that vanish mid-scan are skipped.
    """
    open_files = []
    try:
        proc_entries = os.listdir('/proc')
    except OSError:
        return []

    # Iterate over all PIDs in /proc
    for pid_dir in proc_entries:
        if not pid_dir.isdigit():
            continue
        pid = int(pid_dir)
        fd_dir = f'/proc/{pid}/fd'
        try:
            # os.listdir might fail if process vanishes or is not ours to read
            fds = os.listdir(fd_dir)
        except OSError:
            continue

        for fd in fds:
            try:
                # Resolve the symlink to get the real file path
                real_path = os.readlink(os.path.join(fd_dir, fd))
                # Check if it's a regular file (not a pipe/socket)
                if real_path.startswith('/') and os.path.isfile(real_path):
                    open_files.append({
                        'path': real_path,
                        'pid': pid,
                        'size': os.path.getsize(real_path),
                    })
            except OSError:
                continue
    return open_files


def is_log_file(file_path):
    """Heuristically decide whether a file is a text log.

    Checks:
      1. 'log' appears in the path (case insensitive).
      2. The first 4 KiB contain a timestamp matching LOG_TIMESTAMP_REGEX.

    Returns ``(is_log, reason)``.
    """
    if "log" not in file_path.lower():
        return False, "Filename does not contain 'log'"
    try:
        with open(file_path, 'r', errors='ignore') as f:
            chunk = f.read(4096)
    except OSError as e:
        return False, f"Read error: {e}"
    if re.search(LOG_TIMESTAMP_REGEX, chunk):
        return True, "Found timestamps"
    return False, "No timestamps found in header"


def shrink_file_inplace(file_path):
    """Remove the first 50% of a file's data in place.

    The second half is copied to the start of the file and the file is then
    truncated, so the inode (and any writer's open descriptor) is preserved.
    Copying continues to end-of-file rather than to the size sampled at the
    start, so data appended while the copy runs is kept.

    Returns ``(success, message)``.
    """
    try:
        file_size = os.path.getsize(file_path)
        if file_size == 0:
            return False, "File is empty"

        midpoint = file_size // 2
        chunk_size = 1024 * 1024 * 10  # 10MB chunks
        print(f"Shrinking {file_path} ({file_size} bytes). Removing first {midpoint} bytes.")

        with open(file_path, "r+b") as f:
            read_pos = midpoint
            write_pos = 0
            while True:
                f.seek(read_pos)
                data = f.read(chunk_size)
                if not data:
                    break
                # The chunk is fully in memory before it is written, and
                # write_pos always trails read_pos, so overlap is safe.
                f.seek(write_pos)
                f.write(data)
                read_pos += len(data)
                write_pos += len(data)
            f.truncate(write_pos)

        print(f"Successfully shrunk {file_path} to {write_pos} bytes.")
        return True, f"Removed first {midpoint} bytes. New size: {write_pos}"
    except OSError as e:
        return False, f"Error shrinking file: {e}"


def is_rotated_log(filename):
    """Return True if a bare filename looks like a rotated log.

    Common patterns:
      - .gz, .zip, .tar, .bz2
      - .old, .bak
      - .log.1, .log.2, ...
      - .log-20240101, ...

    This looks at the name only; callers should add their own path checks
    before deleting anything, because the extension list is deliberately broad.
    """
    # Simple extensions
    if filename.lower().endswith(('.gz', '.zip', '.tar', '.bz2', '.old', '.bak')):
        return True
    # Numeric suffixes (.1, .2, etc)
    if re.search(r'\.log\.\d+$', filename, re.IGNORECASE):
        return True
    # Date suffixes (log-YYYYMMDD, etc): 8 digits delimited by -, _ or .
    if re.search(r'[-_.]\d{8}([-_.]|$)', filename):
        return True
    return False


def find_rotated_logs(mountpoint):
    """Walk a mountpoint and collect rotated-log deletion candidates.

    A candidate must be a regular file (symlinks are not followed), have a
    name accepted by ``is_rotated_log``, live on the same device as the
    mountpoint, and contain 'log' somewhere in its full path. The last rule
    keeps unrelated archives and backups (e.g. ``/home/u/backup.tar``) from
    being deleted.

    Returns a list of ``(path, size, mtime)`` sorted oldest first.
    """
    candidates = []
    print(f"Scanning {mountpoint} for rotated logs...")
    try:
        mount_dev = os.stat(mountpoint).st_dev
    except OSError:
        return []

    for root, dirs, files in os.walk(mountpoint):
        # Don't cross filesystems
        try:
            if os.stat(root).st_dev != mount_dev:
                # Remove subdirs from traversal to prevent descending
                dirs[:] = []
                continue
        except OSError:
            continue

        for file in files:
            if not is_rotated_log(file):
                continue
            full_path = os.path.join(root, file)
            if "log" not in full_path.lower():
                continue
            try:
                info = os.lstat(full_path)
            except OSError:
                continue
            if not stat.S_ISREG(info.st_mode):
                continue
            candidates.append((full_path, info.st_size, info.st_mtime))

    # Sort old -> new
    candidates.sort(key=lambda x: x[2])
    return candidates


def check_disk_usage_percent(mountpoint):
    """Return the used-space percentage of the filesystem holding ``mountpoint``.

    Returns 100.0 when usage cannot be determined, so an unreadable volume is
    reported rather than silently ignored, and 0.0 for a filesystem that
    reports zero total capacity.
    """
    try:
        usage = shutil.disk_usage(mountpoint)
    except OSError:
        return 100.0
    if usage.total == 0:
        return 0.0
    return (usage.used / usage.total) * 100


def cleanup_rotated_logs(mountpoint, hostname):
    """Delete the oldest rotated logs until usage is <= THRESHOLD_PERCENT.

    Sends one summary email if anything was deleted.

    Returns True if at least one file was deleted, else False.
    """
    candidates = find_rotated_logs(mountpoint)
    deleted_bytes = 0
    deleted_files = []
    current_usage = check_disk_usage_percent(mountpoint)

    for path, size, mtime in candidates:
        if current_usage <= THRESHOLD_PERCENT:
            break
        print(f"Deleting old rotated log: {path} ({size} bytes, mtime: {mtime})")
        try:
            os.remove(path)
        except OSError as e:
            print(f"Failed to delete {path}: {e}")
            continue
        deleted_bytes += size
        deleted_files.append(os.path.basename(path))
        # Re-check usage
        current_usage = check_disk_usage_percent(mountpoint)

    deleted_count = len(deleted_files)
    if deleted_count == 0:
        return False

    subject = f"URGENT: Rotated Log Cleanup - {hostname} - {mountpoint}"
    # Truncate list if too long
    file_list_str = ", ".join(deleted_files[:10])
    if deleted_count > 10:
        file_list_str += f" and {deleted_count - 10} others"
    body = (f"Volume {mountpoint} was full.\n"
            f"Action: Deleted {deleted_count} old rotated log files.\n"
            f"Total freed: {deleted_bytes / 1024 / 1024:.2f} MB.\n"
            f"Files: {file_list_str}\n"
            f"Current Usage: {current_usage:.1f}%")
    send_email(subject, body, mountpoint)
    return True


def _open_files_on_mount(mountpoint, all_open_files):
    """Return the open-file records living on ``mountpoint``, largest first.

    Membership is decided by device id rather than by path prefix, so
    ``/var`` does not match ``/variable/...`` or files on a filesystem
    mounted below it.
    """
    try:
        mount_dev = os.stat(mountpoint).st_dev
    except OSError:
        return []
    matches = []
    for file_info in all_open_files:
        try:
            if os.stat(file_info['path']).st_dev == mount_dev:
                matches.append(file_info)
        except OSError:
            continue
    matches.sort(key=lambda x: x['size'], reverse=True)
    return matches


def _describe_largest(candidates):
    """Format the largest candidate for an email body, or "Unknown" if none."""
    if not candidates:
        return "Unknown"
    top_cand = candidates[0]
    return f"{top_cand['path']} ({top_cand['size'] / 1024 / 1024:.2f} MB)"


def _shrink_largest_open_log(mountpoint, candidates, hostname):
    """Strategy A: halve the largest open log file and email about it.

    Stops after the first successful shrink so usage can be re-evaluated.
    Returns True if a file was shrunk.
    """
    for candidate in candidates:
        path = candidate['path']
        size = candidate['size']
        is_log, _ = is_log_file(path)
        if not is_log:
            continue
        pid = candidate['pid']
        proc_name = get_process_name_pid(pid)
        print(f"Found candidate: {path} ({size} bytes), held by {proc_name} (PID {pid})")
        success, msg = shrink_file_inplace(path)
        if success:
            subject = f"URGENT: Disk Cleanup Action - {hostname} - {mountpoint}"
            body = (f"Volume {mountpoint} was >{THRESHOLD_PERCENT}%.\n"
                    f"Identified large log file: {path}\n"
                    f"Process holding file: {proc_name} (PID {pid})\n"
                    f"Action: {msg}\n")
            send_email(subject, body, mountpoint)
            return True
    return False


def check_and_clean():
    """Check every partition and clean or warn as needed.

    Critical volumes (> THRESHOLD_PERCENT) get Strategy A (shrink the largest
    open log) and then, if still full, Strategy B (delete rotated logs).
    Warning volumes (> WARNING_THRESHOLD_PERCENT) only produce an email.
    """
    if os.name == 'nt':
        print("Note: This script is designed for Linux (/proc). Windows execution will miss process data.")

    hostname = socket.gethostname()

    # 1. Get Partitions
    partitions = get_partitions()
    if not partitions:
        print("No partitions found via /proc/mounts. (Are you on Windows?)")
        partitions = [('/dev/root', '/')]

    # 2. Classify partitions by usage
    critical_partitions = []
    warning_partitions = []
    for device, mountpoint in partitions:
        percent = check_disk_usage_percent(mountpoint)
        if percent > THRESHOLD_PERCENT:
            print(f"CRITICAL: Volume {mountpoint} ({device}) is at {percent:.1f}% usage.")
            critical_partitions.append(mountpoint)
        elif percent > WARNING_THRESHOLD_PERCENT:
            print(f"WARNING: Volume {mountpoint} ({device}) is at {percent:.1f}% usage.")
            warning_partitions.append(mountpoint)

    if not critical_partitions and not warning_partitions:
        print("All volumes are healthy.")
        return

    # 3. Found partitions. Now scan processes.
    print("High usage detected. Scanning /proc for open files...")
    all_open_files = get_open_files_flat()

    # --- PROCESS CRITICAL ---
    for mountpoint in critical_partitions:
        # An earlier cleanup on a shared device may already have freed space.
        if check_disk_usage_percent(mountpoint) <= THRESHOLD_PERCENT:
            continue

        # Strategy A: Shrink Open Files
        candidates = _open_files_on_mount(mountpoint, all_open_files)
        shrunk_something = _shrink_largest_open_log(mountpoint, candidates, hostname)

        # Check if Strategy A was enough
        if check_disk_usage_percent(mountpoint) <= THRESHOLD_PERCENT:
            print(f"Volume {mountpoint} is now safe.")
            continue

        # Strategy B: Rotated Logs Fallback
        print(f"Active log shrinking insufficient or unavailable. Checking for rotated logs on {mountpoint}...")
        cleanup_success = cleanup_rotated_logs(mountpoint, hostname)

        if not cleanup_success and not shrunk_something:
            print(f"No suitable log file found to clean on {mountpoint}.")
            subject = f"CRITICAL: Disk Full - {hostname} - {mountpoint}"
            suspected_culprit = _describe_largest(candidates)
            body = (f"Volume {mountpoint} is >{THRESHOLD_PERCENT}%.\n"
                    f"Could not find any suitable open log files or rotated logs to clean automatically.\n"
                    f"Suspected largest open file: {suspected_culprit}\n"
                    f"Usage is still {check_disk_usage_percent(mountpoint):.1f}%.")
            send_email(subject, body, mountpoint)

    # --- PROCESS WARNINGS ---
    for mountpoint in warning_partitions:
        # Find culprits but DO NOT TOUCH
        candidates = _open_files_on_mount(mountpoint, all_open_files)
        suspected_culprit = _describe_largest(candidates)
        subject = f"WARNING: Disk Usage High - {hostname} - {mountpoint}"
        body = (f"Volume {mountpoint} is >{WARNING_THRESHOLD_PERCENT}% (Current: {check_disk_usage_percent(mountpoint):.1f}%).\n"
                f"Threshold for automatic cleanup is {THRESHOLD_PERCENT}%.\n"
                f"Suspected largest open file: {suspected_culprit}\n"
                f"Please investigate.")
        send_email(subject, body, mountpoint)


if __name__ == "__main__":
    check_and_clean()