#!/usr/bin/env python3
"""
PVC Backup Script for Freeleaps Production Environment

Creates snapshots for specified PVCs and monitors their status.

Configuration is read from environment variables:

* ``BACKUP_NAMESPACE`` - namespace holding the PVCs (default ``freeleaps-prod``)
* ``SNAPSHOT_CLASS``   - VolumeSnapshotClass name (default ``csi-azuredisk-vsc``)
* ``TIMEOUT``          - seconds to wait for each snapshot (default ``300``)

The process exits with status 0 when every PVC was snapshotted successfully
and with status 1 otherwise.
"""

import os
import sys
import yaml
import time
import logging
from datetime import datetime, timezone, timedelta
from typing import Optional

from kubernetes import client, config
from kubernetes.client.rest import ApiException

# Configure logging
logging.basicConfig(
    level=logging.INFO,
    format='%(asctime)s - %(levelname)s - %(message)s',
    handlers=[
        logging.StreamHandler(sys.stdout)
    ]
)
logger = logging.getLogger(__name__)


class PVCBackupManager:
    """Create VolumeSnapshots for a fixed list of PVCs and wait for them."""

    # Coordinates of the VolumeSnapshot custom resource.
    SNAPSHOT_GROUP = "snapshot.storage.k8s.io"
    SNAPSHOT_VERSION = "v1"
    SNAPSHOT_PLURAL = "volumesnapshots"

    # Seconds between two status polls in wait_for_snapshot_ready().
    POLL_INTERVAL_SECONDS = 10

    def __init__(self):
        """Initialize the backup manager with Kubernetes client.

        Tries the in-cluster configuration first and falls back to the local
        kubeconfig. Exits the process with status 1 when neither can be
        loaded.

        Raises:
            ValueError: if the ``TIMEOUT`` environment variable is set to
                something that is not an integer.
        """
        try:
            # Load in-cluster config when running in Kubernetes
            config.load_incluster_config()
            logger.info("Loaded in-cluster Kubernetes configuration")
        except config.ConfigException:
            # Fallback to kubeconfig for local development
            try:
                config.load_kube_config()
                logger.info("Loaded kubeconfig for local development")
            except config.ConfigException:
                logger.error("Failed to load Kubernetes configuration")
                sys.exit(1)

        self.api_client = client.ApiClient()
        self.snapshot_api = client.CustomObjectsApi(self.api_client)
        self.core_api = client.CoreV1Api(self.api_client)

        # Backup configuration
        self.namespace = os.getenv("BACKUP_NAMESPACE", "freeleaps-prod")
        self.pvcs_to_backup = [
            "gitea-shared-storage",
            "data-freeleaps-prod-gitea-postgresql-ha-postgresql-0",
        ]
        self.snapshot_class = os.getenv("SNAPSHOT_CLASS", "csi-azuredisk-vsc")
        self.timeout = int(os.getenv("TIMEOUT", "300"))

    def get_pst_date(self) -> str:
        """Return the current date as ``YYYYMMDD`` at a fixed UTC-8 offset.

        NOTE: the offset is constant, so daylight saving time is not applied.
        """
        pst_tz = timezone(timedelta(hours=-8))
        return datetime.now(pst_tz).strftime("%Y%m%d")

    def generate_snapshot_name(self, pvc_name: str, timestamp: str) -> str:
        """Generate snapshot name with timestamp."""
        return f"{pvc_name}-snapshot-{timestamp}"

    def create_snapshot_yaml(self, pvc_name: str, snapshot_name: str) -> dict:
        """Build the VolumeSnapshot manifest for ``pvc_name`` as a dict.

        The manifest targets ``self.namespace`` and uses
        ``self.snapshot_class`` as the VolumeSnapshotClass.
        """
        return {
            "apiVersion": f"{self.SNAPSHOT_GROUP}/{self.SNAPSHOT_VERSION}",
            "kind": "VolumeSnapshot",
            "metadata": {
                "name": snapshot_name,
                "namespace": self.namespace,
            },
            "spec": {
                "volumeSnapshotClassName": self.snapshot_class,
                "source": {
                    "persistentVolumeClaimName": pvc_name,
                },
            },
        }

    def apply_snapshot(self, snapshot_yaml: dict) -> Optional[dict]:
        """Create the snapshot described by ``snapshot_yaml`` in the cluster.

        Snapshot names are derived from the date, so running the job a second
        time on the same day (for example a retry after a partial failure)
        hits an HTTP 409 conflict. In that case the already existing snapshot
        is fetched and returned so that the run stays idempotent.

        Returns:
            The snapshot object returned by the API, or ``None`` when the
            snapshot could neither be created nor retrieved.
        """
        snapshot_name = snapshot_yaml['metadata']['name']
        try:
            logger.info(f"Creating snapshot: {snapshot_name}")
            result = self.snapshot_api.create_namespaced_custom_object(
                group=self.SNAPSHOT_GROUP,
                version=self.SNAPSHOT_VERSION,
                namespace=self.namespace,
                plural=self.SNAPSHOT_PLURAL,
                body=snapshot_yaml,
            )
        except ApiException as e:
            if e.status != 409:
                logger.error(f"Failed to create snapshot: {e}")
                return None

            # 409 Conflict: a snapshot with this name already exists.
            logger.warning(
                f"Snapshot {snapshot_name} already exists, reusing it"
            )
            try:
                return self.snapshot_api.get_namespaced_custom_object(
                    group=self.SNAPSHOT_GROUP,
                    version=self.SNAPSHOT_VERSION,
                    namespace=self.namespace,
                    plural=self.SNAPSHOT_PLURAL,
                    name=snapshot_name,
                )
            except ApiException as lookup_error:
                logger.error(
                    f"Failed to read existing snapshot {snapshot_name}: "
                    f"{lookup_error}"
                )
                return None

        logger.info(
            f"Successfully created snapshot: {result['metadata']['name']}"
        )
        return result

    def wait_for_snapshot_ready(self, snapshot_name: str, timeout=None) -> bool:
        """Wait for snapshot to be ready with timeout.

        Polls the snapshot every ``POLL_INTERVAL_SECONDS`` seconds.

        Args:
            snapshot_name: name of the VolumeSnapshot in ``self.namespace``.
            timeout: maximum number of seconds to wait; defaults to
                ``self.timeout`` when ``None``.

        Returns:
            ``True`` once ``status.readyToUse`` is truthy. ``False`` when the
            snapshot reports ``status.error``, when the API call fails, or
            when the timeout elapses.
        """
        if timeout is None:
            timeout = self.timeout

        logger.info(f"Waiting for snapshot {snapshot_name} to be ready...")

        # A monotonic clock keeps the timeout immune to wall-clock changes.
        deadline = time.monotonic() + timeout

        while time.monotonic() < deadline:
            try:
                # Get snapshot status
                snapshot = self.snapshot_api.get_namespaced_custom_object(
                    group=self.SNAPSHOT_GROUP,
                    version=self.SNAPSHOT_VERSION,
                    namespace=self.namespace,
                    plural=self.SNAPSHOT_PLURAL,
                    name=snapshot_name,
                )
            except ApiException as e:
                logger.error(f"Error checking snapshot status: {e}")
                return False

            # "status" may be absent or null until the controller fills it in.
            status = snapshot.get('status') or {}

            # Check if snapshot is ready
            if status.get('readyToUse', False):
                logger.info(f"Snapshot {snapshot_name} is ready!")
                return True

            # Check for error conditions
            error = status.get('error')
            if error:
                logger.error(f"Snapshot {snapshot_name} failed: {error}")
                return False

            logger.info(f"Snapshot {snapshot_name} still processing...")

            # Never sleep past the deadline.
            remaining = deadline - time.monotonic()
            time.sleep(max(0.0, min(self.POLL_INTERVAL_SECONDS, remaining)))

        logger.error(
            f"Timeout waiting for snapshot {snapshot_name} to be ready"
        )
        return False

    def verify_pvc_exists(self, pvc_name: str) -> bool:
        """Verify that PVC exists in the namespace.

        Returns:
            ``True`` when the PVC can be read, ``False`` when the API reports
            404 or any other error (both cases are logged).
        """
        try:
            self.core_api.read_namespaced_persistent_volume_claim(
                name=pvc_name,
                namespace=self.namespace,
            )
        except ApiException as e:
            if e.status == 404:
                logger.error(
                    f"PVC {pvc_name} not found in namespace {self.namespace}"
                )
            else:
                logger.error(f"Error checking PVC {pvc_name}: {e}")
            return False

        logger.info(f"Found PVC: {pvc_name}")
        return True

    def run_backup(self) -> bool:
        """Main backup process.

        Snapshots every PVC in ``self.pvcs_to_backup`` one after another and
        logs a summary at the end. A failure for one PVC does not stop the
        remaining ones from being processed.

        Returns:
            ``True`` when every PVC was backed up, ``False`` otherwise.
        """
        logger.info("Starting PVC backup process...")

        timestamp = self.get_pst_date()
        successful_backups = []
        failed_backups = []

        for pvc_name in self.pvcs_to_backup:
            logger.info(f"Processing PVC: {pvc_name}")

            # Verify PVC exists
            if not self.verify_pvc_exists(pvc_name):
                failed_backups.append(pvc_name)
                continue

            snapshot_name = self.generate_snapshot_name(pvc_name, timestamp)
            snapshot_yaml = self.create_snapshot_yaml(pvc_name, snapshot_name)

            # Apply snapshot
            if not self.apply_snapshot(snapshot_yaml):
                failed_backups.append(pvc_name)
                continue

            # Wait for snapshot to be ready
            if self.wait_for_snapshot_ready(snapshot_name):
                successful_backups.append(pvc_name)
                logger.info(
                    f"Backup completed successfully for PVC: {pvc_name}"
                )
            else:
                failed_backups.append(pvc_name)
                logger.error(f"Backup failed for PVC: {pvc_name}")

        # Summary
        logger.info("=== Backup Summary ===")
        logger.info(f"Successful backups: {len(successful_backups)}")
        logger.info(f"Failed backups: {len(failed_backups)}")

        if successful_backups:
            logger.info(
                f"Successfully backed up: {', '.join(successful_backups)}"
            )

        if failed_backups:
            logger.error(f"Failed to backup: {', '.join(failed_backups)}")
            return False

        logger.info("All backups completed successfully!")
        return True


def main():
    """Main entry point: run the backup and exit with 0 on success, 1 on failure."""
    try:
        backup_manager = PVCBackupManager()
        success = backup_manager.run_backup()
    except Exception as e:
        # Top-level boundary: log with traceback and signal failure.
        logger.exception(f"Unexpected error: {e}")
        sys.exit(1)

    if success:
        logger.info("Backup job completed successfully")
        sys.exit(0)

    logger.error("Backup job completed with errors")
    sys.exit(1)


if __name__ == "__main__":
    main()