Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 6 additions & 4 deletions miniupdate/config.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,10 +5,11 @@
email credentials and inventory paths.
"""

import toml
import os
from pathlib import Path
from typing import Dict, Any, Optional, List
import os

import toml


class Config:
Expand Down Expand Up @@ -55,7 +56,7 @@ def _load_config(self) -> Dict[str, Any]:
with open(self.config_path, "r", encoding="utf-8") as f:
return toml.load(f)
except Exception as e:
raise ValueError(f"Error parsing configuration file: {e}")
raise ValueError(f"Error parsing configuration file: {e}") from e

@property
def smtp_config(self) -> Dict[str, Any]:
Expand Down Expand Up @@ -178,7 +179,8 @@ def create_example_config(path: str = "config.toml.example") -> None:
"snapshot_name_prefix": "pre-update",
"cleanup_snapshots": True,
"snapshot_retention_days": 7,
"opt_out_hosts": [], # List of hosts to exclude from automatic updates (check-only mode)
# List of hosts to exclude from automatic updates (check-only mode)
"opt_out_hosts": [],
},
}

Expand Down
44 changes: 25 additions & 19 deletions miniupdate/host_checker.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,12 +4,12 @@
Provides utilities to check if hosts are reachable via ping and SSH.
"""

import subprocess
import logging
import subprocess
import time
from typing import Optional
from .ssh_manager import SSHManager

from .inventory import Host
from .ssh_manager import SSHManager

logger = logging.getLogger(__name__)

Expand Down Expand Up @@ -40,7 +40,9 @@ def ping_host(self, hostname: str, timeout: int = 5) -> bool:
try:
# Use ping command with timeout
cmd = ["ping", "-c", "1", "-W", str(timeout), hostname]
result = subprocess.run(cmd, capture_output=True, timeout=timeout + 2)
result = subprocess.run(
cmd, capture_output=True, timeout=timeout + 2, check=False
)
return result.returncode == 0
except (subprocess.TimeoutExpired, subprocess.SubprocessError):
return False
Expand All @@ -65,7 +67,9 @@ def wait_for_host_availability(
True if host becomes available, False if timeout
"""
logger.info(
f"Waiting for {host.name} to become available (timeout: {max_wait_time}s)"
"Waiting for %s to become available (timeout: %ss)",
host.name,
max_wait_time,
)

start_time = time.time()
Expand All @@ -76,34 +80,36 @@ def wait_for_host_availability(
elapsed = int(time.time() - start_time)

logger.debug(
f"Checking {host.name} availability - attempt {attempt} ({elapsed}s elapsed)"
"Checking %s availability - attempt %s (%ss elapsed)",
host.name,
attempt,
elapsed,
)

# First check ping
if not self.ping_host(host.hostname):
logger.debug(f"{host.name} not responding to ping")
logger.debug("%s not responding to ping", host.name)
time.sleep(check_interval)
continue

logger.debug(f"{host.name} responding to ping")
logger.debug("%s responding to ping", host.name)

# If SSH check is requested, verify SSH connectivity
if use_ssh:
if self._check_ssh_connectivity(host):
logger.info(
f"{host.name} is available (ping + SSH) after {elapsed}s"
"%s is available (ping + SSH) after %ss", host.name, elapsed
)
return True
else:
logger.debug(f"{host.name} ping OK but SSH not ready")
logger.debug("%s ping OK but SSH not ready", host.name)
else:
logger.info(f"{host.name} is available (ping only) after {elapsed}s")
logger.info("%s is available (ping only) after %ss", host.name, elapsed)
return True

time.sleep(check_interval)

elapsed = int(time.time() - start_time)
logger.warning(f"{host.name} did not become available within {elapsed}s")
logger.warning("%s did not become available within %ss", host.name, elapsed)
return False

def _check_ssh_connectivity(self, host: Host) -> bool:
Expand All @@ -125,7 +131,7 @@ def _check_ssh_connectivity(self, host: Host) -> bool:
return exit_code == 0
return False
except Exception as e:
logger.debug(f"SSH connectivity check failed for {host.name}: {e}")
logger.debug("SSH connectivity check failed for %s: %s", host.name, e)
return False

def reboot_host_via_ssh(self, host: Host, timeout: int = 30) -> bool:
Expand All @@ -143,21 +149,21 @@ def reboot_host_via_ssh(self, host: Host, timeout: int = 30) -> bool:
with SSHManager(self.ssh_config) as ssh_manager:
connection = ssh_manager.connect_to_host(host, timeout=timeout)
if not connection:
logger.error(f"Failed to connect to {host.name} for reboot")
logger.error("Failed to connect to %s for reboot", host.name)
return False

logger.info(f"Sending reboot command to {host.name}")
logger.info("Sending reboot command to %s", host.name)

# Send reboot command (don't wait for response as connection will drop)
exit_code, stdout, stderr = connection.execute_command(
_exit_code, _stdout, _stderr = connection.execute_command(
"shutdown -r now || reboot",
timeout=5, # Short timeout as system will reboot
)

# Command may not return exit code due to immediate reboot
logger.info(f"Reboot command sent to {host.name}")
logger.info("Reboot command sent to %s", host.name)
return True

except Exception as e:
logger.error(f"Failed to reboot {host.name} via SSH: {e}")
logger.error("Failed to reboot %s via SSH: %s", host.name, e)
return False
25 changes: 12 additions & 13 deletions miniupdate/inventory.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,11 +4,11 @@
Supports parsing YAML and INI format Ansible inventory files.
"""

import yaml
import logging
from pathlib import Path
from typing import List, Dict, Any, Optional
import configparser
import logging

import yaml


logger = logging.getLogger(__name__)
Expand Down Expand Up @@ -47,26 +47,25 @@ def parse(self) -> List[Host]:
"""Parse inventory file and return list of hosts."""
if self.inventory_path.suffix.lower() in [".yml", ".yaml"]:
return self._parse_yaml()
elif self.inventory_path.suffix.lower() in [
if self.inventory_path.suffix.lower() in [
".ini",
".cfg",
"",
] or self.inventory_path.name in ["hosts", "inventory"]:
return self._parse_ini()
else:
# Try YAML first, then INI
try:
return self._parse_yaml()
except Exception:
return self._parse_ini()
# Try YAML first, then INI
try:
return self._parse_yaml()
except Exception:
return self._parse_ini()

def _parse_yaml(self) -> List[Host]:
"""Parse YAML format inventory."""
try:
with open(self.inventory_path, "r", encoding="utf-8") as f:
inventory = yaml.safe_load(f)
except Exception as e:
raise ValueError(f"Error parsing YAML inventory: {e}")
raise ValueError(f"Error parsing YAML inventory: {e}") from e

hosts = []

Expand All @@ -80,7 +79,7 @@ def _parse_yaml(self) -> List[Host]:
if "hosts" in all_section:
hosts.extend(self._parse_yaml_hosts(all_section["hosts"]))
if "children" in all_section:
for group_name, group_data in all_section["children"].items():
for _group_name, group_data in all_section["children"].items():
if "hosts" in group_data:
hosts.extend(self._parse_yaml_hosts(group_data["hosts"]))
else:
Expand Down Expand Up @@ -122,7 +121,7 @@ def _parse_ini(self) -> List[Host]:
with open(self.inventory_path, "r", encoding="utf-8") as f:
content = f.read()
except Exception as e:
raise ValueError(f"Error reading INI inventory: {e}")
raise ValueError(f"Error reading INI inventory: {e}") from e

# Split into lines and process
lines = content.split("\n")
Expand Down
49 changes: 24 additions & 25 deletions miniupdate/os_detector.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,9 +4,9 @@
Detects the operating system and distribution of remote hosts.
"""

import re
import logging
from typing import Optional, Dict, Any, Tuple
from typing import Optional, Dict, Tuple

from .ssh_manager import SSHConnection


Expand Down Expand Up @@ -112,16 +112,16 @@ def detect_os(self) -> Optional[OSInfo]:
architecture=architecture,
)

logger.info(f"Detected OS on {self.connection.host.name}: {os_info}")
logger.info("Detected OS on %s: %s", self.connection.host.name, os_info)
return os_info

except Exception as e:
logger.error(f"Failed to detect OS on {self.connection.host.name}: {e}")
logger.error("Failed to detect OS on %s: %s", self.connection.host.name, e)
return None

def _get_uname_info(self) -> Dict[str, str]:
"""Get uname information."""
exit_code, stdout, stderr = self.connection.execute_command("uname -a")
exit_code, stdout, _stderr = self.connection.execute_command("uname -a")
if exit_code != 0:
return {}

Expand All @@ -139,7 +139,7 @@ def _get_uname_info(self) -> Dict[str, str]:

def _get_os_release_info(self) -> Dict[str, str]:
"""Get information from /etc/os-release."""
exit_code, stdout, stderr = self.connection.execute_command(
exit_code, stdout, _stderr = self.connection.execute_command(
"cat /etc/os-release 2>/dev/null || true"
)
if exit_code != 0 or not stdout.strip():
Expand All @@ -157,7 +157,7 @@ def _get_os_release_info(self) -> Dict[str, str]:

def _get_lsb_info(self) -> Dict[str, str]:
"""Get LSB information."""
exit_code, stdout, stderr = self.connection.execute_command(
exit_code, stdout, _stderr = self.connection.execute_command(
"lsb_release -a 2>/dev/null || true"
)
if exit_code != 0 or not stdout.strip():
Expand Down Expand Up @@ -237,33 +237,33 @@ def _normalize_distribution_name(self, distribution: str) -> str:
# Handle common variations
if "red hat" in distribution or "redhat" in distribution:
return "rhel"
elif "centos" in distribution:
if "centos" in distribution:
return "centos"
elif "ubuntu" in distribution:
if "ubuntu" in distribution:
return "ubuntu"
elif (
if (
"linuxmint" in distribution
or "linux mint" in distribution
or distribution == "mint"
):
return "linuxmint"
elif "debian" in distribution:
if "debian" in distribution:
return "debian"
elif "fedora" in distribution:
if "fedora" in distribution:
return "fedora"
elif "opensuse" in distribution or "suse" in distribution:
if "opensuse" in distribution or "suse" in distribution:
return "opensuse"
elif "arch" in distribution:
if "arch" in distribution:
return "arch"
elif "manjaro" in distribution:
if "manjaro" in distribution:
return "manjaro"
elif "alpine" in distribution:
if "alpine" in distribution:
return "alpine"
elif "freebsd" in distribution:
if "freebsd" in distribution:
return "freebsd"
elif "openbsd" in distribution:
if "openbsd" in distribution:
return "openbsd"
elif "darwin" in distribution or "macos" in distribution:
if "darwin" in distribution or "macos" in distribution:
return "macos"

return distribution
Expand All @@ -278,7 +278,7 @@ def _detect_package_manager(self, distribution: str) -> str:
return default_pm

# Fallback: check for available package managers
for pm_name, commands in self.PACKAGE_MANAGERS.items():
for pm_name, _commands in self.PACKAGE_MANAGERS.items():
if self._check_package_manager_exists(pm_name):
return pm_name

Expand All @@ -303,13 +303,12 @@ def _get_architecture(self, uname_info: Dict[str, str]) -> str:
# Normalize common architectures
if arch in ["x86_64", "amd64"]:
return "x86_64"
elif arch in ["i386", "i686"]:
if arch in ["i386", "i686"]:
return "i386"
elif arch.startswith("arm"):
if arch.startswith("arm"):
return "arm"
elif arch.startswith("aarch64"):
if arch.startswith("aarch64"):
return "arm64"
else:
return arch
return arch

return "unknown"
Loading