// freeleaps-ops/apps/gitea-webhook-ambassador/main.go
package main

import (
	"encoding/json"
	"flag"
	"fmt"
	"io"
	"log"
	"net/http"
	"os"
	"path/filepath"
	"regexp"
	"strings"
	"sync"
	"time"

	"github.com/fsnotify/fsnotify"
	"github.com/go-playground/validator/v10"
	"github.com/panjf2000/ants/v2"
	"gopkg.in/yaml.v2"
)
// Configuration holds application configuration. The `default:` tags are
// documentation only; the actual defaults are applied in loadConfig.
type Configuration struct {
	Server struct {
		Port         int    `yaml:"port" validate:"required,gt=0"`
		WebhookPath  string `yaml:"webhookPath" validate:"required"`
		SecretHeader string `yaml:"secretHeader" default:"X-Gitea-Signature"`
		SecretKey    string `yaml:"secretKey"`
	} `yaml:"server"`
	Jenkins struct {
		URL      string `yaml:"url" validate:"required,url"`
		Username string `yaml:"username"`
		Token    string `yaml:"token"`
		Timeout  int    `yaml:"timeout" default:"30"`
	} `yaml:"jenkins"`
	Gitea struct {
		SecretToken string                   `yaml:"secretToken"`
		Projects    map[string]ProjectConfig `yaml:"projects" validate:"required"` // repo full name (owner/name) -> project config
	} `yaml:"gitea"`
	Logging struct {
		Level  string `yaml:"level" default:"info" validate:"oneof=debug info warn error"`
		Format string `yaml:"format" default:"text" validate:"oneof=text json"`
		File   string `yaml:"file"`
	} `yaml:"logging"`
	Worker struct {
		PoolSize     int `yaml:"poolSize" default:"10" validate:"gt=0"`
		QueueSize    int `yaml:"queueSize" default:"100" validate:"gt=0"`
		MaxRetries   int `yaml:"maxRetries" default:"3" validate:"gte=0"`
		RetryBackoff int `yaml:"retryBackoff" default:"1" validate:"gt=0"` // seconds
	} `yaml:"worker"`
	EventCleanup struct {
		Interval    int `yaml:"interval" default:"3600"`    // seconds
		ExpireAfter int `yaml:"expireAfter" default:"7200"` // seconds
	} `yaml:"eventCleanup"`
}
// ProjectConfig represents the configuration for a specific repository
type ProjectConfig struct {
	DefaultJob     string            `yaml:"defaultJob"`           // Default Jenkins job to trigger
	BranchJobs     map[string]string `yaml:"branchJobs,omitempty"` // Branch-specific jobs
	BranchPatterns []BranchPattern   `yaml:"branchPatterns,omitempty"`
}

// BranchPattern defines a pattern-based branch-to-job mapping
type BranchPattern struct {
	Pattern string `yaml:"pattern"` // Regex pattern for branch name
	Job     string `yaml:"job"`     // Jenkins job to trigger
}
// GiteaWebhook represents the webhook payload from Gitea
type GiteaWebhook struct {
	Secret     string `json:"secret"`
	Ref        string `json:"ref"`
	Before     string `json:"before"`
	After      string `json:"after"`
	CompareURL string `json:"compare_url"`
	Commits    []struct {
		ID      string `json:"id"`
		Message string `json:"message"`
		URL     string `json:"url"`
		Author  struct {
			Name     string `json:"name"`
			Email    string `json:"email"`
			Username string `json:"username"`
		} `json:"author"`
	} `json:"commits"`
	Repository struct {
		ID    int    `json:"id"`
		Name  string `json:"name"`
		Owner struct {
			ID       int    `json:"id"`
			Login    string `json:"login"`
			FullName string `json:"full_name"`
		} `json:"owner"`
		FullName      string `json:"full_name"`
		Private       bool   `json:"private"`
		CloneURL      string `json:"clone_url"`
		SSHURL        string `json:"ssh_url"`
		HTMLURL       string `json:"html_url"`
		DefaultBranch string `json:"default_branch"`
	} `json:"repository"`
	Pusher struct {
		ID       int    `json:"id"`
		Login    string `json:"login"`
		FullName string `json:"full_name"`
		Email    string `json:"email"`
	} `json:"pusher"`
}
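
// jobRequest is one unit of work for the worker pool; attempts counts how
// many times the Jenkins trigger has been retried.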
type jobRequest struct {
	jobName    string
	parameters map[string]string
	eventID    string
	attempts   int
}

var (
	configFile  = flag.String("config", "config.yaml", "Path to configuration file")
	config      Configuration
	configMutex sync.RWMutex
	validate    = validator.New()
	jobQueue    chan jobRequest
	httpClient  *http.Client
	logger      *log.Logger
	workerPool  *ants.PoolWithFunc
	// For idempotency
	processedEvents sync.Map
	// For config reloading
	watcher *fsnotify.Watcher
)
func main() {
	flag.Parse()
	// Initialize a basic logger until the configuration is loaded
	logger = log.New(os.Stdout, "", log.LstdFlags)
	logger.Println("Starting Gitea Webhook Ambassador...")
	// Load initial configuration
	if err := loadConfig(*configFile); err != nil {
		logger.Fatalf("Failed to load configuration: %v", err)
	}
	// Configure the proper logger based on configuration
	setupLogger()
	// Start event cleanup goroutine
	go cleanupEvents()
	// Configure the HTTP client, job queue, and server settings under the
	// read lock so a concurrent reload cannot race these reads
	configMutex.RLock()
	httpClient = &http.Client{
		Timeout: time.Duration(config.Jenkins.Timeout) * time.Second,
	}
	jobQueue = make(chan jobRequest, config.Worker.QueueSize)
	webhookPath := config.Server.WebhookPath
	serverAddr := fmt.Sprintf(":%d", config.Server.Port)
	configMutex.RUnlock()
	// Initialize the worker pool and start the queue processor
	initWorkerPool()
	go processJobQueue()
	// Setup the config file watcher for auto-reload; started only after the
	// worker pool exists so an early reload cannot touch a nil pool
	setupConfigWatcher(*configFile)
	// Configure webhook handlers
	http.HandleFunc(webhookPath, handleWebhook)
	http.HandleFunc("/health", handleHealthCheck)
	// Start HTTP server
	logger.Printf("Server listening on %s", serverAddr)
	if err := http.ListenAndServe(serverAddr, nil); err != nil {
		logger.Fatalf("HTTP server error: %v", err)
	}
}
// setupLogger configures the logger based on application settings
func setupLogger() {
	configMutex.RLock()
	defer configMutex.RUnlock()
	// Determine log output
	var logOutput io.Writer = os.Stdout
	if config.Logging.File != "" {
		file, err := os.OpenFile(config.Logging.File, os.O_CREATE|os.O_WRONLY|os.O_APPEND, 0666)
		if err != nil {
			logger.Printf("Failed to open log file %s: %v, using stdout instead", config.Logging.File, err)
		} else {
			// Mirror all log output to stdout as well as the file
			logOutput = io.MultiWriter(file, os.Stdout)
		}
	}
	// Set log format based on configuration
	var prefix string
	var flags int
	if config.Logging.Format == "json" {
		// For JSON logging, formatting is handled by the custom writer
		prefix = ""
		flags = 0
		logOutput = &jsonLogWriter{out: logOutput}
	} else {
		// Text format with timestamp
		prefix = ""
		flags = log.LstdFlags | log.Lshortfile
	}
	// Create the new logger; the log level is enforced by the
	// logDebug/logInfo/logWarn helpers defined below
	logger = log.New(logOutput, prefix, flags)
	logger.Printf("Logger configured with level=%s, format=%s, output=%s",
		config.Logging.Level,
		config.Logging.Format,
		func() string {
			if config.Logging.File == "" {
				return "stdout"
			}
			return config.Logging.File
		}())
}
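
// setupConfigWatcher watches the directory containing the config file and
// triggers reloadConfig whenever the file is written or recreated.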
func setupConfigWatcher(configPath string) {
	var err error
	watcher, err = fsnotify.NewWatcher()
	if err != nil {
		logger.Fatalf("Failed to create file watcher: %v", err)
	}
	// Watch the directory containing the config file; some editors replace
	// the file on save, so the watch must catch Create as well as Write
	configDir := filepath.Dir(configPath)
	go func() {
		for {
			select {
			case event, ok := <-watcher.Events:
				if !ok {
					return
				}
				// Reload when the config file is written or recreated
				if event.Op&(fsnotify.Write|fsnotify.Create) != 0 &&
					filepath.Base(event.Name) == filepath.Base(configPath) {
					logger.Printf("Config file modified, reloading configuration")
					if err := reloadConfig(configPath); err != nil {
						logger.Printf("Error reloading config: %v", err)
					}
				}
			case err, ok := <-watcher.Errors:
				if !ok {
					return
				}
				logger.Printf("Error watching config file: %v", err)
			}
		}
	}()
	// Start watching the directory containing the config file
	err = watcher.Add(configDir)
	if err != nil {
		logger.Fatalf("Failed to watch config directory: %v", err)
	}
	logger.Printf("Watching config file for changes: %s", configPath)
}
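
// loadConfig parses the YAML configuration, applies defaults for unset
// fields, converts the legacy flat projects format if present, validates
// the result, and swaps it in under the write lock.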
func loadConfig(file string) error {
	f, err := os.Open(file)
	if err != nil {
		return fmt.Errorf("cannot open config file: %v", err)
	}
	defer f.Close()
	var newConfig Configuration
	decoder := yaml.NewDecoder(f)
	if err := decoder.Decode(&newConfig); err != nil {
		return fmt.Errorf("cannot decode config: %v", err)
	}
	// Set defaults for fields left unset in the file. Logging defaults must
	// be filled in too, or the oneof validators below reject an empty string.
	if newConfig.Server.SecretHeader == "" {
		newConfig.Server.SecretHeader = "X-Gitea-Signature"
	}
	if newConfig.Jenkins.Timeout == 0 {
		newConfig.Jenkins.Timeout = 30
	}
	if newConfig.Logging.Level == "" {
		newConfig.Logging.Level = "info"
	}
	if newConfig.Logging.Format == "" {
		newConfig.Logging.Format = "text"
	}
	if newConfig.Worker.PoolSize == 0 {
		newConfig.Worker.PoolSize = 10
	}
	if newConfig.Worker.QueueSize == 0 {
		newConfig.Worker.QueueSize = 100
	}
	if newConfig.Worker.MaxRetries == 0 {
		newConfig.Worker.MaxRetries = 3
	}
	if newConfig.Worker.RetryBackoff == 0 {
		newConfig.Worker.RetryBackoff = 1
	}
	if newConfig.EventCleanup.Interval == 0 {
		newConfig.EventCleanup.Interval = 3600
	}
	if newConfig.EventCleanup.ExpireAfter == 0 {
		newConfig.EventCleanup.ExpireAfter = 7200
	}
	// Handle the legacy configuration format, where projects was a flat
	// map[string]string of repository -> job name, to stay backward compatible
	if len(newConfig.Gitea.Projects) == 0 {
		var legacyConfig struct {
			Gitea struct {
				Projects map[string]string `yaml:"projects"`
			} `yaml:"gitea"`
		}
		// Rewind and reparse the file against the legacy schema
		if _, err := f.Seek(0, 0); err == nil {
			decoder = yaml.NewDecoder(f)
			if err := decoder.Decode(&legacyConfig); err == nil && len(legacyConfig.Gitea.Projects) > 0 {
				// Convert legacy config to the new format
				newConfig.Gitea.Projects = make(map[string]ProjectConfig)
				for repo, jobName := range legacyConfig.Gitea.Projects {
					newConfig.Gitea.Projects[repo] = ProjectConfig{
						DefaultJob: jobName,
					}
				}
				logWarn("Using legacy configuration format. Consider updating to the new format.")
			}
		}
	}
	// Validate configuration
	if err := validate.Struct(newConfig); err != nil {
		return fmt.Errorf("invalid configuration: %v", err)
	}
	configMutex.Lock()
	config = newConfig
	configMutex.Unlock()
	return nil
}
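
// reloadConfig reloads the configuration from disk and applies runtime
// changes: logger settings, HTTP client timeout, worker pool size, and
// job queue size.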
func reloadConfig(file string) error {
	if err := loadConfig(file); err != nil {
		return err
	}
	// Update logger configuration
	setupLogger()
	configMutex.RLock()
	defer configMutex.RUnlock()
	// Update HTTP client timeout
	httpClient.Timeout = time.Duration(config.Jenkins.Timeout) * time.Second
	// If the worker pool size has changed, reinitialize the worker pool
	poolSize := workerPool.Cap()
	if poolSize != config.Worker.PoolSize {
		logger.Printf("Worker pool size changed from %d to %d, reinitializing",
			poolSize, config.Worker.PoolSize)
		// Release the read lock before initWorkerPool, which takes the write lock
		configMutex.RUnlock()
		initWorkerPool()
		configMutex.RLock()
	}
	// If the queue size has changed, swap in a new channel
	if cap(jobQueue) != config.Worker.QueueSize {
		logger.Printf("Job queue size changed from %d to %d, recreating",
			cap(jobQueue), config.Worker.QueueSize)
		newQueue := make(chan jobRequest, config.Worker.QueueSize)
		// Swap and close under the write lock: producers only send while
		// holding the read lock, so none can be mid-send on the old channel
		// when it is closed
		configMutex.RUnlock()
		configMutex.Lock()
		oldQueue := jobQueue
		jobQueue = newQueue
		close(oldQueue)
		configMutex.Unlock()
		configMutex.RLock()
		// Serve the new queue, then drain whatever remains in the old one;
		// the previous processJobQueue loop exits once the closed old
		// channel is empty
		go processJobQueue()
		go func() {
			for job := range oldQueue {
				newQueue <- job
			}
		}()
	}
	logger.Printf("Configuration reloaded successfully")
	return nil
}
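
// initWorkerPool (re)creates the ants worker pool that triggers Jenkins
// jobs and retries failures with exponential backoff. The queue processor
// is started separately (in main, and after a queue swap in reloadConfig)
// so repeated pool reinitialization does not leak duplicate processors.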
func initWorkerPool() {
	configMutex.Lock()
	defer configMutex.Unlock()
	// Release existing pool if any
	if workerPool != nil {
		workerPool.Release()
	}
	var err error
	workerPool, err = ants.NewPoolWithFunc(config.Worker.PoolSize, func(i interface{}) {
		job := i.(jobRequest)
		success := triggerJenkinsJob(job)
		configMutex.RLock()
		maxRetries := config.Worker.MaxRetries
		retryBackoff := config.Worker.RetryBackoff
		configMutex.RUnlock()
		// If the job failed and we have not reached max retries, requeue it
		// after an exponential backoff
		if !success && job.attempts < maxRetries {
			job.attempts++
			backoff := time.Duration(retryBackoff<<uint(job.attempts-1)) * time.Second
			time.Sleep(backoff)
			// Hold the read lock while sending so a concurrent reload cannot
			// close the queue out from under us
			configMutex.RLock()
			select {
			case jobQueue <- job:
				logger.Printf("Retrying job %s (attempt %d/%d) after %v",
					job.jobName, job.attempts, maxRetries, backoff)
			default:
				logger.Printf("Failed to queue retry for job %s: queue full", job.jobName)
			}
			configMutex.RUnlock()
		}
	})
	if err != nil {
		logger.Fatalf("Failed to initialize worker pool: %v", err)
	}
	logger.Printf("Worker pool initialized with %d workers", config.Worker.PoolSize)
}
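
// processJobQueue forwards queued jobs to the worker pool. It captures the
// current queue once and exits when that channel is closed and drained.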
func processJobQueue() {
	// Capture the current queue under the read lock; a reload that swaps
	// the queue starts a fresh processor for the replacement channel
	configMutex.RLock()
	queue := jobQueue
	configMutex.RUnlock()
	for job := range queue {
		if err := workerPool.Invoke(job); err != nil {
			logger.Printf("Failed to process job: %v", err)
		}
	}
}
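
// handleHealthCheck reports worker pool and queue utilization as JSON.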
func handleHealthCheck(w http.ResponseWriter, r *http.Request) {
	w.Header().Set("Content-Type", "application/json")
	configMutex.RLock()
	// Guard against a nil pool so the health endpoint cannot panic during startup
	poolRunning := workerPool != nil
	var runningWorkers, poolCap int
	if poolRunning {
		runningWorkers = workerPool.Running()
		poolCap = workerPool.Cap()
	}
	queueSize := len(jobQueue)
	queueCap := cap(jobQueue)
	configMutex.RUnlock()
	health := map[string]interface{}{
		"status": "UP",
		"time":   time.Now().Format(time.RFC3339),
		"workers": map[string]interface{}{
			"running":  poolRunning,
			"active":   runningWorkers,
			"capacity": poolCap,
		},
		"queue": map[string]interface{}{
			"size":     queueSize,
			"capacity": queueCap,
		},
	}
	json.NewEncoder(w).Encode(health)
}
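
// handleWebhook validates the shared secret, deduplicates push events by
// repository and commit SHA, resolves the Jenkins job for the pushed
// branch, and enqueues a trigger request.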
func handleWebhook(w http.ResponseWriter, r *http.Request) {
	if r.Method != http.MethodPost {
		http.Error(w, "Method not allowed", http.StatusMethodNotAllowed)
		return
	}
	// Verify the shared secret if one is configured
	configMutex.RLock()
	secretHeader := config.Server.SecretHeader
	serverSecretKey := config.Server.SecretKey
	configMutex.RUnlock()
	if serverSecretKey != "" {
		receivedSecretKey := r.Header.Get(secretHeader)
		if receivedSecretKey == "" {
			http.Error(w, "Missing server secret key", http.StatusUnauthorized)
			logWarn("No secret key provided in header")
			return
		}
		if receivedSecretKey != serverSecretKey {
			http.Error(w, "Invalid server secret key", http.StatusUnauthorized)
			logWarn("Invalid server secret key provided")
			return
		}
	}
	// Read and parse the webhook payload
	body, err := io.ReadAll(r.Body)
	if err != nil {
		http.Error(w, "Failed to read request body", http.StatusInternalServerError)
		logError("Failed to read webhook body: %v", err)
		return
	}
	r.Body.Close()
	var webhook GiteaWebhook
	if err := json.Unmarshal(body, &webhook); err != nil {
		http.Error(w, "Failed to parse webhook payload", http.StatusBadRequest)
		logError("Failed to parse webhook payload: %v", err)
		return
	}
	// Generate event ID for idempotency
	eventID := webhook.Repository.FullName + "-" + webhook.After
	// Skip events we have already processed
	if _, exists := processedEvents.Load(eventID); exists {
		logInfo("Skipping already processed event: %s", eventID)
		w.WriteHeader(http.StatusOK)
		return
	}
	// Record the event; entries are expired later by the cleanupEvents goroutine
	processedEvents.Store(eventID, time.Now())
	// Look up the Jenkins job mapping for this repository
	configMutex.RLock()
	projectConfig, exists := config.Gitea.Projects[webhook.Repository.FullName]
	configMutex.RUnlock()
	if !exists {
		logInfo("No Jenkins job mapping for repository: %s", webhook.Repository.FullName)
		w.WriteHeader(http.StatusOK) // Still return OK so Gitea does not flag the hook
		return
	}
	// Extract the branch name from the ref
	branchName := strings.TrimPrefix(webhook.Ref, "refs/heads/")
	// Determine which job to trigger based on branch name
	jobName := determineJobName(projectConfig, branchName)
	if jobName == "" {
		logInfo("No job configured to trigger for repository %s, branch %s",
			webhook.Repository.FullName, branchName)
		w.WriteHeader(http.StatusOK)
		return
	}
	// Prepare parameters for the Jenkins job
	params := map[string]string{
		"BRANCH_NAME":     branchName,
		"COMMIT_SHA":      webhook.After,
		"REPOSITORY_URL":  webhook.Repository.CloneURL,
		"REPOSITORY_NAME": webhook.Repository.FullName,
		"PUSHER_NAME":     webhook.Pusher.Login,
		"PUSHER_EMAIL":    webhook.Pusher.Email,
	}
	// Queue the job; the read lock guards against the queue being swapped
	// out (and closed) by a concurrent config reload
	configMutex.RLock()
	select {
	case jobQueue <- jobRequest{
		jobName:    jobName,
		parameters: params,
		eventID:    eventID,
		attempts:   0,
	}:
		logInfo("Webhook received and queued for repository %s, branch %s, commit %s, job %s",
			webhook.Repository.FullName, branchName, webhook.After, jobName)
	default:
		logWarn("Failed to queue webhook: queue full")
		http.Error(w, "Server busy, try again later", http.StatusServiceUnavailable)
		configMutex.RUnlock()
		return
	}
	configMutex.RUnlock()
	w.WriteHeader(http.StatusAccepted)
}
// determineJobName selects the appropriate Jenkins job to trigger based on branch name
func determineJobName(config ProjectConfig, branchName string) string {
	// First check for an exact branch match
	if jobName, ok := config.BranchJobs[branchName]; ok {
		logDebug("Found exact branch match for %s: job %s", branchName, jobName)
		return jobName
	}
	// Then check for pattern-based matches
	for _, pattern := range config.BranchPatterns {
		matched, err := regexp.MatchString(pattern.Pattern, branchName)
		if err != nil {
			logError("Error matching branch pattern %s: %v", pattern.Pattern, err)
			continue
		}
		if matched {
			logDebug("Branch %s matched pattern %s: job %s", branchName, pattern.Pattern, pattern.Job)
			return pattern.Job
		}
	}
	// Fall back to the default job if available
	if config.DefaultJob != "" {
		logDebug("Using default job for branch %s: job %s", branchName, config.DefaultJob)
		return config.DefaultJob
	}
	// No job found
	logDebug("No job configured for branch %s", branchName)
	return ""
}
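
// triggerJenkinsJob fires a remote build for the given job, passing the
// webhook metadata as build parameters. It returns false on any error so
// the worker pool can schedule a retry.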
func triggerJenkinsJob(job jobRequest) bool {
	configMutex.RLock()
	jenkinsBaseURL := strings.TrimSuffix(config.Jenkins.URL, "/")
	jenkinsUser := config.Jenkins.Username
	jenkinsToken := config.Jenkins.Token
	configMutex.RUnlock()
	// Jenkins jobs can be organized in folders, with the URL format
	// /job/folder1/job/folder2/job/jobname
	jobPath := job.jobName
	if strings.Contains(jobPath, "/") {
		// Replace plain slashes with "/job/" for the Jenkins URL format
		parts := strings.Split(jobPath, "/")
		jobPath = "job/" + strings.Join(parts, "/job/")
	} else {
		jobPath = "job/" + jobPath
	}
	// Use buildWithParameters: the plain /build endpoint ignores query
	// parameters, and this ambassador always sends them
	jenkinsURL := fmt.Sprintf("%s/%s/buildWithParameters", jenkinsBaseURL, jobPath)
	logDebug("Triggering Jenkins job URL: %s", jenkinsURL)
	req, err := http.NewRequest("POST", jenkinsURL, nil)
	if err != nil {
		logError("Error creating Jenkins request for job %s: %v", job.jobName, err)
		return false
	}
	// Add auth if credentials are provided
	if jenkinsUser != "" && jenkinsToken != "" {
		req.SetBasicAuth(jenkinsUser, jenkinsToken)
	}
	// Add parameters to the URL query
	q := req.URL.Query()
	for key, value := range job.parameters {
		q.Add(key, value)
	}
	req.URL.RawQuery = q.Encode()
	// Execute request
	resp, err := httpClient.Do(req)
	if err != nil {
		logError("Error triggering Jenkins job %s: %v", job.jobName, err)
		return false
	}
	defer resp.Body.Close()
	if resp.StatusCode < 200 || resp.StatusCode >= 300 {
		bodyBytes, _ := io.ReadAll(resp.Body)
		logError("Jenkins returned error for job %s: status=%d, URL=%s, body=%s",
			job.jobName, resp.StatusCode, jenkinsURL, string(bodyBytes))
		return false
	}
	logInfo("Successfully triggered Jenkins job %s for event %s",
		job.jobName, job.eventID)
	return true
}
// Custom JSON log writer
type jsonLogWriter struct {
	out io.Writer
}

func (w *jsonLogWriter) Write(p []byte) (n int, err error) {
	message := strings.TrimSpace(string(p))
	// Derive the level from the "[LEVEL] " prefix that the log helpers
	// below prepend; plain messages default to info
	level := "info"
	if strings.HasPrefix(message, "[") {
		if end := strings.Index(message, "] "); end > 1 {
			level = strings.ToLower(message[1:end])
			message = message[end+2:]
		}
	}
	entry := map[string]interface{}{
		"timestamp": time.Now().Format(time.RFC3339),
		"level":     level,
		"message":   message,
	}
	jsonData, err := json.Marshal(entry)
	if err != nil {
		return 0, err
	}
	// Write the JSON with a trailing newline; report len(p) so callers do
	// not misread the longer JSON output as a short write
	if _, err := w.out.Write(append(jsonData, '\n')); err != nil {
		return 0, err
	}
	return len(p), nil
}
// Utility functions for level-based logging
func logDebug(format string, v ...interface{}) {
	configMutex.RLock()
	level := config.Logging.Level
	configMutex.RUnlock()
	if level == "debug" {
		logger.Printf("[DEBUG] "+format, v...)
	}
}

func logInfo(format string, v ...interface{}) {
	configMutex.RLock()
	level := config.Logging.Level
	configMutex.RUnlock()
	if level == "debug" || level == "info" {
		logger.Printf("[INFO] "+format, v...)
	}
}

func logWarn(format string, v ...interface{}) {
	configMutex.RLock()
	level := config.Logging.Level
	configMutex.RUnlock()
	if level == "debug" || level == "info" || level == "warn" {
		logger.Printf("[WARN] "+format, v...)
	}
}

func logError(format string, v ...interface{}) {
	// Error-level logs are always emitted
	logger.Printf("[ERROR] "+format, v...)
}
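
// cleanupEvents periodically removes idempotency markers older than
// EventCleanup.ExpireAfter so the processedEvents map does not grow
// without bound.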
func cleanupEvents() {
	for {
		configMutex.RLock()
		interval := time.Duration(config.EventCleanup.Interval) * time.Second
		expireAfter := time.Duration(config.EventCleanup.ExpireAfter) * time.Second
		configMutex.RUnlock()
		time.Sleep(interval)
		now := time.Now()
		processedEvents.Range(func(key, value interface{}) bool {
			if timestamp, ok := value.(time.Time); ok {
				if now.Sub(timestamp) > expireAfter {
					processedEvents.Delete(key)
					logDebug("Cleaned up expired event: %v", key)
				}
			}
			return true
		})
	}
}