""" Jenkins service Provides interaction with Jenkins """ import aiohttp import structlog from typing import Optional, Dict, Any from app.config import get_settings logger = structlog.get_logger() class JenkinsService: """Jenkins service class""" def __init__(self): self.settings = get_settings() self.base_url = self.settings.jenkins.url self.username = self.settings.jenkins.username self.token = self.settings.jenkins.token self.timeout = self.settings.jenkins.timeout async def test_connection(self) -> bool: """Test Jenkins connection""" try: async with aiohttp.ClientSession() as session: auth = aiohttp.BasicAuth(self.username, self.token) async with session.get( f"{self.base_url}/api/json", auth=auth, timeout=aiohttp.ClientTimeout(total=self.timeout) ) as response: if response.status == 200: logger.info("Jenkins connection test successful") return True else: logger.warning(f"Jenkins connection test failed with status {response.status}") return False except Exception as e: logger.error(f"Jenkins connection test failed: {str(e)}") return False async def trigger_job(self, job_name: str, parameters: Optional[Dict[str, Any]] = None) -> bool: """Trigger Jenkins job""" try: async with aiohttp.ClientSession() as session: auth = aiohttp.BasicAuth(self.username, self.token) # Build request URL url = f"{self.base_url}/job/{job_name}/build" # If parameters, use parameterized build if parameters: url = f"{self.base_url}/job/{job_name}/buildWithParameters" async with session.post( url, auth=auth, params=parameters or {}, timeout=aiohttp.ClientTimeout(total=self.timeout) ) as response: if response.status in [200, 201]: logger.info(f"Successfully triggered Jenkins job: {job_name}") return True else: logger.error(f"Failed to trigger Jenkins job {job_name}: {response.status}") return False except Exception as e: logger.error(f"Error triggering Jenkins job {job_name}: {str(e)}") return False async def get_job_info(self, job_name: str) -> Optional[Dict[str, Any]]: """Get job info""" try: async with aiohttp.ClientSession() as session: auth = aiohttp.BasicAuth(self.username, self.token) async with session.get( f"{self.base_url}/job/{job_name}/api/json", auth=auth, timeout=aiohttp.ClientTimeout(total=self.timeout) ) as response: if response.status == 200: return await response.json() else: logger.warning(f"Failed to get job info for {job_name}: {response.status}") return None except Exception as e: logger.error(f"Error getting job info for {job_name}: {str(e)}") return None # Global service instance _jenkins_service = None def get_jenkins_service() -> JenkinsService: """Get Jenkins service instance""" global _jenkins_service if _jenkins_service is None: _jenkins_service = JenkinsService() return _jenkins_service