diff --git a/README.md b/README.md index 9ea4af5..e126aed 100644 --- a/README.md +++ b/README.md @@ -2,8 +2,6 @@ [![Go](https://img.shields.io/badge/Go-1.24+-blue?logo=go)](https://golang.org/) [![Docker](https://img.shields.io/badge/Docker-Container-blue?logo=docker)](https://www.docker.com/) -[![Beta](https://img.shields.io/badge/Status-Beta-yellow?style=for-the-badge)](https://github.com/gateixeira/live-actions/issues) - # Live Actions - GitHub Actions Monitoring 🚀 > ⚠️ **Beta Software Notice**: Live Actions is currently in beta. While functional and actively developed, expect potential instabilities. Please report issues and provide feedback to help us improve! @@ -100,6 +98,13 @@ The UI updates in real time via Server-Sent Events — no manual refresh needed. | `TLS_ENABLED` | `false` | Enable HTTPS cookie flags | | `DATA_RETENTION_DAYS` | `30` | How long to keep historical data | | `CLEANUP_INTERVAL_HOURS` | `24` | How often to run data cleanup | +| `WEBHOOK_TRANSPORT` | `http` | How webhooks reach the app: `http` (public endpoint) or `websocket` (relay, no public endpoint needed) | +| `GITHUB_TOKEN` | *(required for `websocket`)* | Token with `admin:repo_hook` (per-repo) or `admin:org_hook` (per-org) scope. `gh auth token` works. | +| `GITHUB_REPO` | | `owner/repo` to subscribe to. Mutually exclusive with `GITHUB_ORG` and `GITHUB_ENTERPRISE`. | +| `GITHUB_ORG` | | Org login to subscribe to. Mutually exclusive with `GITHUB_REPO` and `GITHUB_ENTERPRISE`. | +| `GITHUB_ENTERPRISE` | | Enterprise slug to subscribe to. Mutually exclusive with `GITHUB_REPO` and `GITHUB_ORG`. See enterprise caveat below. | +| `GITHUB_EVENTS` | `workflow_run,workflow_job` | Comma-separated event types for the WebSocket subscription (use `*` for all). | +| `GITHUB_HOST` | `github.com` | GitHub host. Use your `.ghe.com` subdomain for Enterprise Cloud with data residency, or your GHES hostname for GitHub Enterprise Server. 
| ## GitHub Webhook Configuration @@ -133,6 +138,69 @@ ngrok http 8080 Update your GitHub webhook URL to the ngrok HTTPS URL (e.g., `https://a1b2c3d4.ngrok.io/webhook`). +### WebSocket transport (no public endpoint) + +If the app runs somewhere GitHub can't reach (behind NAT, on a laptop, in a +private VPC), set `WEBHOOK_TRANSPORT=websocket` and the app will subscribe +to GitHub's relay over an outbound WebSocket connection. This mirrors the +approach used by the [`gh webhook`](https://github.com/cli/gh-webhook) CLI: +on startup a temporary webhook is created on the target repo or org with +`active=false`, the relay is dialed over `wss://`, the hook is activated, +and deliveries arrive as JSON frames on the open connection. In this mode +the operator does not need to register the HTTP endpoint as a webhook on the +GitHub side, but the app keeps `POST /webhook` available either way so you +can still send manual replays from `curl`. + +**Quick start (per-repo):** + +```bash +export WEBHOOK_TRANSPORT=websocket +export GITHUB_TOKEN=$(gh auth token) # needs admin:repo_hook scope +export GITHUB_REPO=owner/repo +export GITHUB_EVENTS=workflow_run,workflow_job +make run +``` + +**Per-org:** + +```bash +export WEBHOOK_TRANSPORT=websocket +export GITHUB_TOKEN=$(gh auth token) # needs admin:org_hook scope +export GITHUB_ORG=my-org +make run +``` + +**Per-enterprise:** + +```bash +export WEBHOOK_TRANSPORT=websocket +export GITHUB_TOKEN=$(gh auth token) # needs manage_webhooks (or site_admin) scope +export GITHUB_ENTERPRISE=my-enterprise +make run +``` + +> **Note on enterprise mode:** the upstream `gh webhook` CLI only supports +> repo and org hooks; enterprise support here uses the same protocol against +> `POST /enterprises/{slug}/hooks` but the relay (`webhook.gh.io`) is not +> known to be exercised at the enterprise level. It may or may not return a +> usable `ws_url` depending on whether your account has the feature enabled +> at that scope. 
Test in a non-critical environment first. + +**Caveats:** + +- The relay endpoint (`webhook.gh.io`) is GitHub-managed and undocumented; + the protocol can change without notice. +- Each subscription is single-subscriber: only one process at a time can + consume a given hook's WebSocket stream. +- Per-frame HMAC verification is skipped because the relay itself is + authenticated by `GITHUB_TOKEN` at connection time. +- The temporary hook is best-effort deleted on graceful shutdown. A hard + kill (`SIGKILL`, OOM) will leak the hook in the repo or org settings; + delete it manually under *Settings → Webhooks* if that happens. +- Deliveries arriving over WebSocket are not visible in GitHub's "Recent + Deliveries" UI as redeliverable, since the hook stays inactive between + reconnects. + ## API Endpoints | Endpoint | Description | diff --git a/cmd/server/server.go b/cmd/server/server.go index 5e0c2ec..f0af347 100644 --- a/cmd/server/server.go +++ b/cmd/server/server.go @@ -3,10 +3,12 @@ package server import ( "context" "embed" + "errors" "io/fs" "net/http" "os" "path/filepath" + "strings" "time" "github.com/gateixeira/live-actions/handlers" @@ -14,12 +16,23 @@ import ( "github.com/gateixeira/live-actions/internal/database" "github.com/gateixeira/live-actions/internal/middleware" "github.com/gateixeira/live-actions/internal/services" + "github.com/gateixeira/live-actions/internal/services/ghws" "github.com/gateixeira/live-actions/pkg/logger" pkgmetrics "github.com/gateixeira/live-actions/pkg/metrics" "github.com/gin-gonic/gin" "go.uber.org/zap" ) +// ingestAdapter bridges the *handlers.WebhookHandler.Ingest signature into +// the ghws.Ingester interface so the ghws package has no compile-time +// dependency on the handlers package. 
+type ingestAdapter struct{ h *handlers.WebhookHandler } + +func (a ingestAdapter) Ingest(eventType, deliveryID string, body []byte) ghws.IngestResult { + r := a.h.Ingest(eventType, deliveryID, body) + return ghws.IngestResult{Status: r.Status, Message: r.Message} +} + // SetupAndRun configures the router and starts the server func SetupAndRun(staticFS embed.FS) { cfg, err := config.NewConfig() @@ -144,6 +157,42 @@ func SetupAndRun(staticFS embed.FS) { go metricsService.Start() go gracefulShutdown.Start() + // Optional: open a WebSocket relay subscription so deliveries can flow + // without a publicly reachable HTTP endpoint. + var ( + wsCancel context.CancelFunc + wsDone = make(chan struct{}) + ) + if cfg.Vars.WebhookTransport == "websocket" { + sub, err := ghws.NewSubscriber(ghws.Config{ + Token: cfg.Vars.GitHubToken, + Host: cfg.Vars.GitHubHost, + Repo: cfg.Vars.GitHubRepo, + Org: cfg.Vars.GitHubOrg, + Enterprise: cfg.Vars.GitHubEnterprise, + Events: splitEvents(cfg.Vars.GitHubEvents), + Secret: cfg.Vars.WebhookSecret, + }, ingestAdapter{h: webhookHandler}) + if err != nil { + logger.Logger.Fatal("Invalid WebSocket subscriber config", zap.Error(err)) + } + var wsCtx context.Context + wsCtx, wsCancel = context.WithCancel(context.Background()) + go func() { + defer close(wsDone) + if err := sub.Run(wsCtx); err != nil && !errors.Is(err, context.Canceled) { + logger.Logger.Error("WebSocket subscriber exited with error", zap.Error(err)) + } + }() + logger.Logger.Info("WebSocket transport enabled", + zap.String("repo", cfg.Vars.GitHubRepo), + zap.String("org", cfg.Vars.GitHubOrg), + zap.String("enterprise", cfg.Vars.GitHubEnterprise), + zap.String("events", cfg.Vars.GitHubEvents)) + } else { + close(wsDone) + } + logger.Logger.Info("Starting server", zap.String("port", cfg.Vars.Port), zap.String("environment", cfg.Vars.Environment), @@ -163,6 +212,10 @@ func SetupAndRun(staticFS embed.FS) { gracefulShutdown.Wait() // Stop services + if wsCancel != nil { + wsCancel() + 
<-wsDone + } webhookHandler.Shutdown() cleanupService.Stop() metricsService.Stop() @@ -180,3 +233,17 @@ func spaFallbackHandler(indexHTML []byte) gin.HandlerFunc { c.Data(http.StatusOK, "text/html; charset=utf-8", indexHTML) } } + +// splitEvents parses a comma-separated GITHUB_EVENTS value into a slice +// suitable for ghws.Config. Whitespace is trimmed and empty entries are +// dropped so trailing/duplicate commas are forgiving. +func splitEvents(s string) []string { + parts := strings.Split(s, ",") + out := make([]string, 0, len(parts)) + for _, p := range parts { + if t := strings.TrimSpace(p); t != "" { + out = append(out, t) + } + } + return out +} diff --git a/go.mod b/go.mod index 0d66ad3..abd7132 100644 --- a/go.mod +++ b/go.mod @@ -4,8 +4,8 @@ go 1.24.0 require ( github.com/gin-gonic/gin v1.9.1 + github.com/gorilla/websocket v1.5.3 github.com/prometheus/client_golang v1.22.0 - github.com/prometheus/client_model v0.6.1 github.com/stretchr/testify v1.10.0 go.uber.org/zap v1.27.0 modernc.org/sqlite v1.45.0 @@ -35,6 +35,7 @@ require ( github.com/ncruces/go-strftime v1.0.0 // indirect github.com/pelletier/go-toml/v2 v2.2.3 // indirect github.com/pmezard/go-difflib v1.0.1-0.20181226105442-5d4384ee4fb2 // indirect + github.com/prometheus/client_model v0.6.1 // indirect github.com/prometheus/common v0.62.0 // indirect github.com/prometheus/procfs v0.15.1 // indirect github.com/remyoudompheng/bigfft v0.0.0-20230129092748-24d4a6f8daec // indirect diff --git a/go.sum b/go.sum index 49b0698..d42eb6d 100644 --- a/go.sum +++ b/go.sum @@ -37,6 +37,8 @@ github.com/google/pprof v0.0.0-20250317173921-a4b03ec1a45e h1:ijClszYn+mADRFY17k github.com/google/pprof v0.0.0-20250317173921-a4b03ec1a45e/go.mod h1:boTsfXsheKC2y+lKOCMpSfarhxDeIzfZG1jqGcPl3cA= github.com/google/uuid v1.6.0 h1:NIvaJDMOsjHA8n1jAhLSgzrAzy1Hgr+hNrb57e+94F0= github.com/google/uuid v1.6.0/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo= +github.com/gorilla/websocket v1.5.3 
h1:saDtZ6Pbx/0u+bgYQ3q96pZgCzfhKXGPqt7kZ72aNNg= +github.com/gorilla/websocket v1.5.3/go.mod h1:YR8l580nyteQvAITg2hZ9XVh4b55+EU/adAjf1fMHhE= github.com/hashicorp/golang-lru/v2 v2.0.7 h1:a+bsQ5rvGLjzHuww6tVxozPZFVghXaHOwFs4luLUK2k= github.com/hashicorp/golang-lru/v2 v2.0.7/go.mod h1:QeFd9opnmA6QUJc5vARoKUSoFhyfM2/ZepoAG6RGpeM= github.com/json-iterator/go v1.1.12 h1:PV8peI4a0ysnczrg+LtxykD8LfKY9ML6u2jnxaEnrnM= diff --git a/handlers/webhook_handler.go b/handlers/webhook_handler.go index 5b923c5..debf562 100644 --- a/handlers/webhook_handler.go +++ b/handlers/webhook_handler.go @@ -114,6 +114,136 @@ func ValidateGitHubWebhook(config *config.Config) gin.HandlerFunc { } } +// IngestResult is the structured outcome of accepting one webhook delivery, +// independent of the transport (HTTP POST or WebSocket relay frame). +// `Status` mirrors the HTTP status code semantics so a transport can map +// it directly into its own response (HTTP status code, WS ack frame, etc.). +// - 202: accepted and enqueued for processing +// - 200: ignored (no handler registered for this event type) +// - 400: payload rejected (bad JSON, missing fields, unknown status) +// - 503: ingest queue saturated; the caller should retry the delivery +// - 500: internal failure not classified above +type IngestResult struct { + Status int + Message string +} + +// Ingest performs the transport-agnostic part of accepting a webhook +// delivery: payload decoding, validation, extraction of ordering metadata +// and enqueue onto the ordering service. Signature verification is the +// caller's responsibility (HTTP middleware verifies HMAC; the WebSocket +// relay is authenticated by token at connection time and does not need +// per-frame verification). 
+func (h *WebhookHandler) Ingest(eventType, deliveryID string, body []byte) IngestResult { + if eventType == "" { + return IngestResult{Status: http.StatusBadRequest, Message: "Missing event type"} + } + if deliveryID == "" { + return IngestResult{Status: http.StatusBadRequest, Message: "Missing delivery ID"} + } + + // GitHub may send either application/json or application/x-www-form-urlencoded; + // only the form variant needs payload= unwrapping. + jsonData := body + if bodyStr := string(body); strings.HasPrefix(bodyStr, "payload=") { + decodedBody, err := url.QueryUnescape(bodyStr) + if err != nil { + logger.Logger.Error("Failed to decode URL-encoded payload", zap.Error(err)) + return IngestResult{Status: http.StatusBadRequest, Message: "Failed to decode URL-encoded payload"} + } + const prefix = "payload=" + if !strings.HasPrefix(decodedBody, prefix) { + logger.Logger.Error("URL-encoded payload does not start with expected prefix", + zap.String("expected_prefix", prefix), + zap.String("payload_start", decodedBody[:min(len(decodedBody), 50)])) + return IngestResult{Status: http.StatusBadRequest, Message: "Invalid URL-encoded payload format"} + } + jsonData = []byte(decodedBody[len(prefix):]) + } + + var payload map[string]interface{} + if err := json.Unmarshal(jsonData, &payload); err != nil { + logger.Logger.Error("Failed to parse JSON payload", + zap.Error(err), + zap.String("payload_start", string(jsonData[:min(len(jsonData), 100)]))) + return IngestResult{Status: http.StatusBadRequest, Message: "Invalid JSON payload"} + } + + handler, exists := h.handlers[eventType] + if !exists { + logger.Logger.Warn("No handler registered for event type", zap.String("event_type", eventType)) + metrics.GetRegistry().WebhookEventsTotal.WithLabelValues(eventType, "ignored").Inc() + return IngestResult{Status: http.StatusOK, Message: "Event type not supported"} + } + + extractedTime, err := handler.ExtractEventTimestamp(jsonData) + if err != nil { + 
logger.Logger.Error("Failed to extract event timestamp", + zap.Error(err), + zap.String("event_type", eventType), + zap.String("delivery_id", deliveryID)) + metrics.GetRegistry().WebhookEventsTotal.WithLabelValues(eventType, "rejected_invalid").Inc() + return IngestResult{Status: http.StatusBadRequest, Message: "Failed to extract event timestamp"} + } + + orderingKey, err := handler.ExtractOrderingKey(jsonData) + if err != nil { + logger.Logger.Error("Failed to extract ordering key", + zap.Error(err), + zap.String("event_type", eventType), + zap.String("delivery_id", deliveryID)) + metrics.GetRegistry().WebhookEventsTotal.WithLabelValues(eventType, "rejected_invalid").Inc() + return IngestResult{Status: http.StatusBadRequest, Message: "Failed to extract ordering key"} + } + + statusPriority, err := handler.GetStatusPriority(jsonData) + if err != nil { + logger.Logger.Error("Failed to extract status priority", + zap.Error(err), + zap.String("event_type", eventType), + zap.String("delivery_id", deliveryID)) + metrics.GetRegistry().WebhookEventsTotal.WithLabelValues(eventType, "rejected_invalid").Inc() + return IngestResult{Status: http.StatusBadRequest, Message: "Failed to extract status priority"} + } + + orderedEvent := &models.OrderedEvent{ + Sequence: models.EventSequence{ + EventID: deliveryID, + Timestamp: extractedTime, + DeliveryID: deliveryID, + ReceivedAt: time.Now(), + }, + EventType: eventType, + RawPayload: jsonData, + OrderingKey: orderingKey, + StatusPriority: statusPriority, + } + + if err := h.orderingService.AddEvent(orderedEvent); err != nil { + if errors.Is(err, services.ErrIngestQueueFull) { + logger.Logger.Error("Webhook ingest queue full; rejecting delivery", + zap.String("delivery_id", deliveryID), + zap.String("event_type", eventType)) + metrics.GetRegistry().WebhookEventsTotal.WithLabelValues(eventType, "rejected_queue_full").Inc() + return IngestResult{Status: http.StatusServiceUnavailable, Message: "Server overloaded; manual redelivery 
required"} + } + logger.Logger.Error("Failed to add event to ordering service", zap.Error(err)) + metrics.GetRegistry().WebhookEventsTotal.WithLabelValues(eventType, "rejected_invalid").Inc() + return IngestResult{Status: http.StatusInternalServerError, Message: "Failed to process event"} + } + + metrics.GetRegistry().WebhookEventsTotal.WithLabelValues(eventType, "accepted").Inc() + + logger.Logger.Debug("Event queued for ordered processing", + zap.String("event_type", orderedEvent.EventType), + zap.String("delivery_id", orderedEvent.Sequence.DeliveryID), + zap.String("ordering_key", orderedEvent.OrderingKey), + zap.Int("status_priority", orderedEvent.StatusPriority), + ) + + return IngestResult{Status: http.StatusAccepted, Message: "Event queued for processing"} +} + // Handle processes incoming webhook events func (h *WebhookHandler) Handle() gin.HandlerFunc { return func(c *gin.Context) { @@ -124,14 +254,12 @@ func (h *WebhookHandler) Handle() gin.HandlerFunc { return } - // Parse event type from context eventTypeVal, exists := c.Get("eventType") if !exists { logger.Logger.Error("Event type not found in context") c.JSON(http.StatusBadRequest, gin.H{"error": "Missing event type"}) return } - eventTypeStr, ok := eventTypeVal.(string) if !ok { logger.Logger.Error("Event type is not a string") @@ -140,129 +268,16 @@ func (h *WebhookHandler) Handle() gin.HandlerFunc { } deliveryID := c.GetHeader(GitHubDeliveryHeader) - if deliveryID == "" { - logger.Logger.Error("Missing X-GitHub-Delivery header") - c.JSON(http.StatusBadRequest, gin.H{"error": "Missing delivery ID"}) - return - } - // Handle different payload formats - var jsonData []byte - bodyStr := string(body) - - // Check if this is a URL-encoded payload - if strings.HasPrefix(bodyStr, "payload=") { - // URL-encoded payload - extract the JSON part - decodedBody, err := url.QueryUnescape(bodyStr) - if err != nil { - logger.Logger.Error("Failed to decode URL-encoded payload", zap.Error(err)) - 
c.JSON(http.StatusBadRequest, gin.H{"error": "Failed to decode URL-encoded payload"}) - return - } - - const prefix = "payload=" - if !strings.HasPrefix(decodedBody, prefix) { - logger.Logger.Error("URL-encoded payload does not start with expected prefix", - zap.String("expected_prefix", prefix), - zap.String("payload_start", decodedBody[:min(len(decodedBody), 50)])) - c.JSON(http.StatusBadRequest, gin.H{"error": "Invalid URL-encoded payload format"}) - return - } - jsonData = []byte(decodedBody[len(prefix):]) - } else { - // Direct JSON payload - jsonData = body + result := h.Ingest(eventTypeStr, deliveryID, body) + switch { + case result.Status == http.StatusAccepted: + c.JSON(result.Status, gin.H{"status": "queued", "message": result.Message}) + case result.Status == http.StatusOK: + c.JSON(result.Status, gin.H{"status": "ignored", "message": result.Message}) + default: + c.JSON(result.Status, gin.H{"error": result.Message}) } - - // Validate that we have valid JSON - var payload map[string]interface{} - if err := json.Unmarshal(jsonData, &payload); err != nil { - logger.Logger.Error("Failed to parse JSON payload", - zap.Error(err), - zap.String("payload_start", string(jsonData[:min(len(jsonData), 100)]))) - c.JSON(http.StatusBadRequest, gin.H{"error": "Invalid JSON payload"}) - return - } - - handler, exists := h.handlers[eventTypeStr] - if !exists { - logger.Logger.Warn("No handler registered for event type", zap.String("event_type", eventTypeStr)) - metrics.GetRegistry().WebhookEventsTotal.WithLabelValues(eventTypeStr, "ignored").Inc() - c.JSON(http.StatusOK, gin.H{"status": "ignored", "message": "Event type not supported"}) - return - } - extractedTime, err := handler.ExtractEventTimestamp(jsonData) - - if err != nil { - logger.Logger.Error("Failed to extract event timestamp", - zap.Error(err), - zap.String("event_type", eventTypeStr), - zap.String("delivery_id", deliveryID)) - metrics.GetRegistry().WebhookEventsTotal.WithLabelValues(eventTypeStr, 
"rejected_invalid").Inc() - c.JSON(http.StatusBadRequest, gin.H{"error": "Failed to extract event timestamp"}) - return - } - - orderingKey, err := handler.ExtractOrderingKey(jsonData) - if err != nil { - logger.Logger.Error("Failed to extract ordering key", - zap.Error(err), - zap.String("event_type", eventTypeStr), - zap.String("delivery_id", deliveryID)) - metrics.GetRegistry().WebhookEventsTotal.WithLabelValues(eventTypeStr, "rejected_invalid").Inc() - c.JSON(http.StatusBadRequest, gin.H{"error": "Failed to extract ordering key"}) - return - } - - statusPriority, err := handler.GetStatusPriority(jsonData) - if err != nil { - logger.Logger.Error("Failed to extract status priority", - zap.Error(err), - zap.String("event_type", eventTypeStr), - zap.String("delivery_id", deliveryID)) - metrics.GetRegistry().WebhookEventsTotal.WithLabelValues(eventTypeStr, "rejected_invalid").Inc() - c.JSON(http.StatusBadRequest, gin.H{"error": "Failed to extract status priority"}) - return - } - - orderedEvent := &models.OrderedEvent{ - Sequence: models.EventSequence{ - EventID: deliveryID, - Timestamp: extractedTime, - DeliveryID: deliveryID, - ReceivedAt: time.Now(), - }, - EventType: eventTypeStr, - RawPayload: jsonData, - OrderingKey: orderingKey, - StatusPriority: statusPriority, - } - - if err := h.orderingService.AddEvent(orderedEvent); err != nil { - if errors.Is(err, services.ErrIngestQueueFull) { - logger.Logger.Error("Webhook ingest queue full; rejecting delivery", - zap.String("delivery_id", deliveryID), - zap.String("event_type", eventTypeStr)) - metrics.GetRegistry().WebhookEventsTotal.WithLabelValues(eventTypeStr, "rejected_queue_full").Inc() - c.JSON(http.StatusServiceUnavailable, gin.H{"error": "Server overloaded; manual redelivery required"}) - return - } - logger.Logger.Error("Failed to add event to ordering service", zap.Error(err)) - metrics.GetRegistry().WebhookEventsTotal.WithLabelValues(eventTypeStr, "rejected_invalid").Inc() - 
c.JSON(http.StatusInternalServerError, gin.H{"error": "Failed to process event"}) - return - } - - metrics.GetRegistry().WebhookEventsTotal.WithLabelValues(eventTypeStr, "accepted").Inc() - - logger.Logger.Debug("Event queued for ordered processing", - zap.String("event_type", orderedEvent.EventType), - zap.String("delivery_id", orderedEvent.Sequence.DeliveryID), - zap.String("ordering_key", orderedEvent.OrderingKey), - zap.Int("status_priority", orderedEvent.StatusPriority), - ) - - c.JSON(http.StatusAccepted, gin.H{"status": "queued", "message": "Event queued for processing"}) } } diff --git a/handlers/workflow_job_handler_test.go b/handlers/workflow_job_handler_test.go index f1a2711..ebcf03a 100644 --- a/handlers/workflow_job_handler_test.go +++ b/handlers/workflow_job_handler_test.go @@ -90,7 +90,6 @@ func TestWorkflowJobHandler_HandleEvent_Success(t *testing.T) { job.RunID == 67890 }), mock.AnythingOfType("time.Time")).Return(true, nil) - // Execute the handler err = handler.HandleEvent(eventData, sequence) @@ -154,7 +153,6 @@ func TestWorkflowJobHandler_HandleEvent_DatabaseGetJobError(t *testing.T) { job.Status == models.JobStatus("queued") }), mock.AnythingOfType("time.Time")).Return(true, nil) - // Execute the handler err = handler.HandleEvent(eventData, sequence) @@ -282,7 +280,6 @@ func TestWorkflowJobHandler_HandleEvent_DifferentActions(t *testing.T) { job.Status == tc.expectedStatus }), mock.AnythingOfType("time.Time")).Return(true, nil) - // Execute the handler err = handler.HandleEvent(eventData, sequence) @@ -366,7 +363,6 @@ func TestWorkflowJobHandler_HandleEvent_StatusTransitions(t *testing.T) { job.Status == tc.expectedStatus }), mock.AnythingOfType("time.Time")).Return(true, nil) - // Execute the handler err = handler.HandleEvent(eventData, sequence) @@ -422,7 +418,6 @@ func TestWorkflowJobHandler_HandleEvent_WithStartedAtTime(t *testing.T) { job.Status == models.JobStatus("in_progress") }), mock.AnythingOfType("time.Time")).Return(true, nil) - 
// Execute the handler err = handler.HandleEvent(eventData, sequence) @@ -486,7 +481,6 @@ func TestWorkflowJobHandler_HandleEvent_MinimalRequiredFields(t *testing.T) { job.Status == models.JobStatus("queued") }), mock.AnythingOfType("time.Time")).Return(true, nil) - // Execute the handler err = handler.HandleEvent(eventData, sequence) diff --git a/internal/config/config.go b/internal/config/config.go index b1811e0..ed353b4 100644 --- a/internal/config/config.go +++ b/internal/config/config.go @@ -17,6 +17,20 @@ type Vars struct { DataRetentionDays int CleanupIntervalHours int StaleJobThresholdHours int + + // WebhookTransport selects how GitHub deliveries reach this server. + // "http" (default): the server only listens on POST /webhook and + // relies on GitHub being able to reach it. + // "websocket": additionally opens a long-lived WebSocket + // relay (see internal/services/ghws). The HTTP + // endpoint stays registered either way. + WebhookTransport string + GitHubToken string + GitHubHost string + GitHubRepo string // owner/repo + GitHubOrg string + GitHubEnterprise string // enterprise slug + GitHubEvents string // comma-separated list, default "workflow_job,workflow_run" } type Config struct { @@ -26,15 +40,23 @@ type Config struct { // NewConfig creates and initializes a new application config. 
func NewConfig() (*Config, error) { vars := Vars{ - WebhookSecret: os.Getenv("WEBHOOK_SECRET"), - Port: getEnvOrDefault("PORT", "8080"), - DatabasePath: getEnvOrDefault("DATABASE_PATH", "./data/live-actions.db"), - LogLevel: getEnvOrDefault("LOG_LEVEL", "info"), - TLSEnabled: getEnvOrDefault("TLS_ENABLED", "false") == "true", - Environment: getEnvOrDefault("ENVIRONMENT", "development"), - DataRetentionDays: getEnvOrDefaultInt("DATA_RETENTION_DAYS", 30), // Default 1 month - CleanupIntervalHours: getEnvOrDefaultInt("CLEANUP_INTERVAL_HOURS", 24), // Daily cleanup + WebhookSecret: os.Getenv("WEBHOOK_SECRET"), + Port: getEnvOrDefault("PORT", "8080"), + DatabasePath: getEnvOrDefault("DATABASE_PATH", "./data/live-actions.db"), + LogLevel: getEnvOrDefault("LOG_LEVEL", "info"), + TLSEnabled: getEnvOrDefault("TLS_ENABLED", "false") == "true", + Environment: getEnvOrDefault("ENVIRONMENT", "development"), + DataRetentionDays: getEnvOrDefaultInt("DATA_RETENTION_DAYS", 30), // Default 1 month + CleanupIntervalHours: getEnvOrDefaultInt("CLEANUP_INTERVAL_HOURS", 24), // Daily cleanup StaleJobThresholdHours: getEnvOrDefaultInt("STALE_JOB_THRESHOLD_HOURS", 24), // Jobs queued/in_progress longer than this are considered stale + + WebhookTransport: getEnvOrDefault("WEBHOOK_TRANSPORT", "http"), + GitHubToken: os.Getenv("GITHUB_TOKEN"), + GitHubHost: getEnvOrDefault("GITHUB_HOST", "github.com"), + GitHubRepo: os.Getenv("GITHUB_REPO"), + GitHubOrg: os.Getenv("GITHUB_ORG"), + GitHubEnterprise: os.Getenv("GITHUB_ENTERPRISE"), + GitHubEvents: getEnvOrDefault("GITHUB_EVENTS", "workflow_job,workflow_run"), } config := &Config{Vars: vars} @@ -46,6 +68,34 @@ func NewConfig() (*Config, error) { } } + switch vars.WebhookTransport { + case "http", "websocket": + default: + return nil, fmt.Errorf("WEBHOOK_TRANSPORT must be \"http\" or \"websocket\", got %q", vars.WebhookTransport) + } + + if vars.WebhookTransport == "websocket" { + if vars.GitHubToken == "" { + return nil, fmt.Errorf("GITHUB_TOKEN 
is required when WEBHOOK_TRANSPORT=websocket") + } + set := 0 + if vars.GitHubRepo != "" { + set++ + } + if vars.GitHubOrg != "" { + set++ + } + if vars.GitHubEnterprise != "" { + set++ + } + if set == 0 { + return nil, fmt.Errorf("one of GITHUB_REPO, GITHUB_ORG, or GITHUB_ENTERPRISE is required when WEBHOOK_TRANSPORT=websocket") + } + if set > 1 { + return nil, fmt.Errorf("set only one of GITHUB_REPO, GITHUB_ORG, and GITHUB_ENTERPRISE") + } + } + return config, nil } diff --git a/internal/database/db.go b/internal/database/db.go index 6da270f..e7281ef 100644 --- a/internal/database/db.go +++ b/internal/database/db.go @@ -11,8 +11,8 @@ import ( "strings" "github.com/gateixeira/live-actions/pkg/logger" - _ "modernc.org/sqlite" "go.uber.org/zap" + _ "modernc.org/sqlite" ) //go:embed migrations/*.up.sql diff --git a/internal/database/pool_smoke_test.go b/internal/database/pool_smoke_test.go index 2595c67..58e97d3 100644 --- a/internal/database/pool_smoke_test.go +++ b/internal/database/pool_smoke_test.go @@ -1,21 +1,21 @@ package database import ( -"path/filepath" -"testing" + "path/filepath" + "testing" -"github.com/gateixeira/live-actions/pkg/logger" + "github.com/gateixeira/live-actions/pkg/logger" ) func TestInitDB_PragmasAppliedToBothPools(t *testing.T) { -logger.InitLogger("error") -dir := t.TempDir() -w, r, err := InitDB(filepath.Join(dir, "t.db")) -if err != nil { -t.Fatalf("InitDB: %v", err) -} -defer w.Close() -defer r.Close() + logger.InitLogger("error") + dir := t.TempDir() + w, r, err := InitDB(filepath.Join(dir, "t.db")) + if err != nil { + t.Fatalf("InitDB: %v", err) + } + defer w.Close() + defer r.Close() cases := map[string]string{ "journal_mode": "wal", @@ -26,19 +26,19 @@ defer r.Close() "temp_store": "2", // MEMORY "mmap_size": "268435456", } -for p, want := range cases { -var wv, rv string -if err := w.QueryRow("PRAGMA " + p).Scan(&wv); err != nil { -t.Fatalf("write PRAGMA %s: %v", p, err) -} -if err := r.QueryRow("PRAGMA " + p).Scan(&rv); err != 
nil { -t.Fatalf("read PRAGMA %s: %v", p, err) -} -if wv != want { -t.Errorf("write PRAGMA %s = %q, want %q", p, wv, want) -} -if rv != want { -t.Errorf("read PRAGMA %s = %q, want %q", p, rv, want) -} -} + for p, want := range cases { + var wv, rv string + if err := w.QueryRow("PRAGMA " + p).Scan(&wv); err != nil { + t.Fatalf("write PRAGMA %s: %v", p, err) + } + if err := r.QueryRow("PRAGMA " + p).Scan(&rv); err != nil { + t.Fatalf("read PRAGMA %s: %v", p, err) + } + if wv != want { + t.Errorf("write PRAGMA %s = %q, want %q", p, wv, want) + } + if rv != want { + t.Errorf("read PRAGMA %s = %q, want %q", p, rv, want) + } + } } diff --git a/internal/services/event_ordering_service.go b/internal/services/event_ordering_service.go index 1188e43..f1d2672 100644 --- a/internal/services/event_ordering_service.go +++ b/internal/services/event_ordering_service.go @@ -62,12 +62,12 @@ type EventOrderingService struct { batchSize int // ingest worker (buffers AddEvent calls and batch-INSERTs them) - ingestCh chan *models.OrderedEvent - ingestBatchSize int - ingestBatchWait time.Duration - enqueueTimeout time.Duration - ingestDrainWait time.Duration - ingestDoneCh chan struct{} + ingestCh chan *models.OrderedEvent + ingestBatchSize int + ingestBatchWait time.Duration + enqueueTimeout time.Duration + ingestDrainWait time.Duration + ingestDoneCh chan struct{} mutex sync.Mutex wg sync.WaitGroup @@ -78,19 +78,19 @@ type EventOrderingService struct { func NewEventOrderingService(db database.DatabaseInterface, processFunc func(*models.OrderedEvent) error) *EventOrderingService { ctx, cancel := context.WithCancel(context.Background()) return &EventOrderingService{ - db: db, - processFunc: processFunc, - flushInterval: 5 * time.Second, - maxAge: 10 * time.Second, - batchSize: 500, - ingestCh: make(chan *models.OrderedEvent, defaultIngestChannelSize), - ingestBatchSize: defaultIngestBatchSize, - ingestBatchWait: defaultIngestBatchWait, - enqueueTimeout: defaultEnqueueTimeout, - 
+ ingestDrainWait: 5 * time.Second, + ingestDoneCh: make(chan struct{}), + ctx: ctx, + cancel: cancel, } } diff --git a/internal/services/ghws/subscriber.go b/internal/services/ghws/subscriber.go new file mode 100644 index 0000000..b48c664 --- /dev/null +++ b/internal/services/ghws/subscriber.go @@ -0,0 +1,418 @@ +// Package ghws implements a WebSocket-based webhook transport for live-actions. +// +// It mirrors the protocol used by https://github.com/cli/gh-webhook: +// +// 1. Create a webhook on a repo or org via the REST API with name="cli" +// and active=false. The response includes a one-time ws_url. +// 2. Dial that ws_url over TLS with Authorization: <token>. +// 3. Activate the webhook so GitHub starts emitting deliveries. +// 4. Read JSON frames of the form {"Header": http.Header, "Body": []byte} +// from the relay, hand the body off to the local Ingester, and write +// back a JSON ack frame describing the outcome. +// 5. On disconnect (CloseAbnormalClosure / network error) redial with +// exponential backoff up to maxBackoff. +// +// The relay endpoint is GitHub-managed and undocumented; this transport is +// best suited to development or to single-instance deployments behind NAT +// where a public HTTP endpoint isn't available. Production-scale ingest +// should still use the HTTP path. 
+package ghws + +import ( + "bytes" + "context" + "encoding/json" + "errors" + "fmt" + "io" + "net/http" + "strings" + "time" + + "github.com/gateixeira/live-actions/pkg/logger" + "github.com/gateixeira/live-actions/pkg/metrics" + "github.com/gorilla/websocket" + "go.uber.org/zap" +) + +// Ingester is the seam between this transport and the rest of the system. +// In production it is satisfied by *handlers.WebhookHandler.Ingest, but the +// package itself only depends on the function signature so it can be tested +// against a fake. +type Ingester interface { + Ingest(eventType, deliveryID string, body []byte) IngestResult +} + +// IngestResult mirrors handlers.IngestResult to avoid an import cycle. +type IngestResult struct { + Status int + Message string +} + +// Config describes a single relay subscription. Token, Events, and exactly +// one of Repo, Org, or Enterprise are required; Host defaults to github.com. +type Config struct { + // Token is a GitHub PAT or OAuth token with admin:repo_hook (Repo) or + // admin:org_hook (Org) scope. + Token string + // Host is the GitHub API host. Use "github.com" for github.com, or a + // GHES hostname. + Host string + // Repo is "owner/repo". Mutually exclusive with Org and Enterprise. + Repo string + // Org is "org-name". Mutually exclusive with Repo and Enterprise. + Org string + // Enterprise is the enterprise slug. Mutually exclusive with Repo and + // Org. Note: GitHub's relay support for enterprise-level hooks is not + // exercised by the upstream `gh webhook` CLI, so behaviour here is + // best-effort and may require an account that has the relay feature + // enabled at the enterprise level. + Enterprise string + // Events are the webhook event types to subscribe to. Use ["*"] for all. + Events []string + // Secret is the optional webhook signing secret. Currently informational; + // the transport does not verify per-frame signatures because the relay + // is already authenticated by Token. 
+ Secret string +} + +// Validate returns an error if the configuration is unusable. +func (c *Config) Validate() error { + if c.Token == "" { + return errors.New("GitHub token is required") + } + set := 0 + if c.Repo != "" { + set++ + } + if c.Org != "" { + set++ + } + if c.Enterprise != "" { + set++ + } + if set == 0 { + return errors.New("one of Repo, Org, or Enterprise is required") + } + if set > 1 { + return errors.New("only one of Repo, Org, or Enterprise may be set") + } + if len(c.Events) == 0 { + return errors.New("at least one event type is required") + } + if c.Host == "" { + c.Host = "github.com" + } + return nil +} + +// apiBase returns the REST API base for the configured host. Tests can +// override this by setting Subscriber.apiBaseOverride. +func (s *Subscriber) apiBase() string { + if s.apiBaseOverride != "" { + return s.apiBaseOverride + } + return s.cfg.apiBase() +} + +// apiBase returns the REST API base for the configured host. Three shapes +// are supported: +// +// - github.com → https://api.github.com (public GitHub) +// - {tenant}.ghe.com → https://api.{tenant}.ghe.com (Enterprise Cloud with data residency) +// - everything else → https://{host}/api/v3 (GitHub Enterprise Server) +func (c *Config) apiBase() string { + switch { + case c.Host == "github.com": + return "https://api.github.com" + case strings.HasSuffix(c.Host, ".ghe.com"): + return "https://api." + c.Host + default: + return "https://" + c.Host + "/api/v3" + } +} + +// Subscriber owns the lifecycle of one relay subscription. +type Subscriber struct { + cfg Config + ingester Ingester + + httpClient *http.Client + dialer *websocket.Dialer + + // apiBaseOverride, when non-empty, replaces the GitHub REST base URL. + // Tests use this to point at an httptest server. + apiBaseOverride string + + // Tunable retry/backoff knobs for tests. + initialBackoff time.Duration + maxBackoff time.Duration +} + +// NewSubscriber returns a ready-to-Run subscriber. 
cfg is validated and +// normalised; an error is returned only if cfg is invalid. +func NewSubscriber(cfg Config, ingester Ingester) (*Subscriber, error) { + if err := cfg.Validate(); err != nil { + return nil, err + } + return &Subscriber{ + cfg: cfg, + ingester: ingester, + httpClient: &http.Client{Timeout: 30 * time.Second}, + dialer: websocket.DefaultDialer, + initialBackoff: 1 * time.Second, + maxBackoff: 60 * time.Second, + }, nil +} + +// Run blocks until ctx is cancelled. It creates a hook, opens the WebSocket, +// reads frames, and dispatches them to Ingester. On disconnect it retries +// with exponential backoff. Each redial creates a fresh hook (the relay +// ws_url is single-use) and the previous hook is best-effort deleted. +func (s *Subscriber) Run(ctx context.Context) error { + backoff := s.initialBackoff + for { + if ctx.Err() != nil { + return ctx.Err() + } + + err := s.runOnce(ctx) + if ctx.Err() != nil { + return ctx.Err() + } + if err == nil { + // runOnce only returns nil on a clean close; treat as exit. + return nil + } + + logger.Logger.Warn("WebSocket subscriber disconnected; will retry", + zap.Duration("backoff", backoff), + zap.Error(err)) + + timer := time.NewTimer(backoff) + select { + case <-ctx.Done(): + timer.Stop() + return ctx.Err() + case <-timer.C: + } + backoff *= 2 + if backoff > s.maxBackoff { + backoff = s.maxBackoff + } + } +} + +// runOnce performs a single create-hook → dial → read-loop cycle. It returns +// nil only on a websocket.CloseNormalClosure; any other condition is wrapped +// and returned for the caller to back off and retry. 
+func (s *Subscriber) runOnce(ctx context.Context) error { + hook, err := s.createHook(ctx) + if err != nil { + return fmt.Errorf("create hook: %w", err) + } + defer s.deleteHook(context.Background(), hook) + + conn, _, err := s.dialer.DialContext(ctx, hook.WsURL, http.Header{ + "Authorization": []string{s.cfg.Token}, + }) + if err != nil { + return fmt.Errorf("dial relay: %w", err) + } + defer conn.Close() + + if err := s.activateHook(ctx, hook); err != nil { + return fmt.Errorf("activate hook: %w", err) + } + + logger.Logger.Info("WebSocket subscriber connected to GitHub relay", + zap.String("repo", s.cfg.Repo), + zap.String("org", s.cfg.Org), + zap.String("enterprise", s.cfg.Enterprise), + zap.Int("hook_id", hook.ID)) + + // Tear the connection down when ctx is cancelled. + stop := make(chan struct{}) + defer close(stop) + go func() { + select { + case <-ctx.Done(): + _ = conn.Close() + case <-stop: + } + }() + + for { + var frame wsFrame + if err := conn.ReadJSON(&frame); err != nil { + if websocket.IsCloseError(err, websocket.CloseNormalClosure) { + return nil + } + return fmt.Errorf("read frame: %w", err) + } + + eventType := http.Header(frame.Header).Get("X-GitHub-Event") + deliveryID := http.Header(frame.Header).Get("X-GitHub-Delivery") + result := s.ingester.Ingest(eventType, deliveryID, frame.Body) + + metrics.GetRegistry().WebhookEventsTotal. + WithLabelValues(eventType, "ws_"+wsOutcome(result.Status)).Inc() + + ack := wsAck{ + Status: result.Status, + Body: []byte(result.Message), + } + if err := conn.WriteJSON(ack); err != nil { + return fmt.Errorf("write ack: %w", err) + } + } +} + +// wsFrame matches the JSON shape pushed down by the relay. The relay encodes +// http.Header as a map[string][]string, which decodes cleanly into this type. +type wsFrame struct { + Header map[string][]string `json:"Header"` + Body []byte `json:"Body"` +} + +// wsAck is what we write back per frame. 
The relay forwards Status to GitHub +// so deliveries appear in the repo's hook log with the right status code. +type wsAck struct { + Status int `json:"Status"` + Header map[string][]string `json:"Header,omitempty"` + Body []byte `json:"Body,omitempty"` +} + +func wsOutcome(status int) string { + switch { + case status >= 200 && status < 300: + return "accepted" + case status == http.StatusServiceUnavailable: + return "queue_full" + default: + return "rejected" + } +} + +// hookResponse is the subset of the create-hook response we need. +type hookResponse struct { + ID int `json:"id"` + URL string `json:"url"` + WsURL string `json:"ws_url"` +} + +// hookConfig is the JSON sub-object for the GitHub hook config. +type hookConfig struct { + ContentType string `json:"content_type"` + InsecureSSL string `json:"insecure_ssl"` + URL string `json:"url,omitempty"` + Secret string `json:"secret,omitempty"` +} + +type createHookRequest struct { + Name string `json:"name"` + Events []string `json:"events"` + Active bool `json:"active"` + Config hookConfig `json:"config"` +} + +func (s *Subscriber) hookPath() string { + switch { + case s.cfg.Enterprise != "": + return "/enterprises/" + s.cfg.Enterprise + "/hooks" + case s.cfg.Org != "": + return "/orgs/" + s.cfg.Org + "/hooks" + default: + return "/repos/" + s.cfg.Repo + "/hooks" + } +} + +// createHook posts a new dev webhook and returns the relay coordinates. +// The hook is created with active=false so we can finish wiring up the +// WebSocket before GitHub starts delivering. 
+func (s *Subscriber) createHook(ctx context.Context) (*hookResponse, error) { + body, _ := json.Marshal(createHookRequest{ + Name: "cli", + Events: s.cfg.Events, + Active: false, + Config: hookConfig{ + ContentType: "json", + InsecureSSL: "0", + Secret: s.cfg.Secret, + }, + }) + req, err := http.NewRequestWithContext(ctx, http.MethodPost, + s.apiBase()+s.hookPath(), bytes.NewReader(body)) + if err != nil { + return nil, err + } + s.applyAuth(req) + resp, err := s.httpClient.Do(req) + if err != nil { + return nil, err + } + defer resp.Body.Close() + if resp.StatusCode/100 != 2 { + b, _ := io.ReadAll(resp.Body) + return nil, fmt.Errorf("create hook: %s: %s", resp.Status, strings.TrimSpace(string(b))) + } + var hr hookResponse + if err := json.NewDecoder(resp.Body).Decode(&hr); err != nil { + return nil, fmt.Errorf("decode hook response: %w", err) + } + if hr.WsURL == "" { + return nil, errors.New("create hook: response missing ws_url; the relay feature may not be enabled for this account") + } + return &hr, nil +} + +// activateHook flips the hook's active flag once the WebSocket is connected. +func (s *Subscriber) activateHook(ctx context.Context, hook *hookResponse) error { + req, err := http.NewRequestWithContext(ctx, http.MethodPatch, hook.URL, + strings.NewReader(`{"active": true}`)) + if err != nil { + return err + } + s.applyAuth(req) + resp, err := s.httpClient.Do(req) + if err != nil { + return err + } + defer resp.Body.Close() + if resp.StatusCode/100 != 2 { + b, _ := io.ReadAll(resp.Body) + return fmt.Errorf("activate hook: %s: %s", resp.Status, strings.TrimSpace(string(b))) + } + return nil +} + +// deleteHook is best-effort; failures only result in a stale hook in the +// repo's settings, not a delivery problem. Errors are logged at debug level. 
+func (s *Subscriber) deleteHook(ctx context.Context, hook *hookResponse) { + if hook == nil || hook.URL == "" { + return + } + dctx, cancel := context.WithTimeout(ctx, 10*time.Second) + defer cancel() + req, err := http.NewRequestWithContext(dctx, http.MethodDelete, hook.URL, nil) + if err != nil { + logger.Logger.Debug("delete hook: build request", zap.Error(err)) + return + } + s.applyAuth(req) + resp, err := s.httpClient.Do(req) + if err != nil { + logger.Logger.Debug("delete hook: request failed", zap.Error(err)) + return + } + resp.Body.Close() +} + +func (s *Subscriber) applyAuth(req *http.Request) { + // REST calls go through the standard GitHub API, which requires the + // "Bearer" (or "token") prefix. The WebSocket dial uses the bare token + // form because that's what the relay expects (see Run/runOnce). + req.Header.Set("Authorization", "Bearer "+s.cfg.Token) + req.Header.Set("Accept", "application/vnd.github+json") + req.Header.Set("X-GitHub-Api-Version", "2022-11-28") +} diff --git a/internal/services/ghws/subscriber_test.go b/internal/services/ghws/subscriber_test.go new file mode 100644 index 0000000..6e71874 --- /dev/null +++ b/internal/services/ghws/subscriber_test.go @@ -0,0 +1,276 @@ +package ghws + +import ( + "context" + "encoding/json" + "net/http" + "net/http/httptest" + "net/url" + "strings" + "sync" + "testing" + "time" + + "github.com/gateixeira/live-actions/pkg/logger" + "github.com/gorilla/websocket" +) + +func init() { + // The package logs at Info on connect/disconnect; initialise the global + // logger so tests don't NPE. + logger.InitLogger("error") +} + +// fakeIngester captures dispatched frames and returns a canned result. 
+type fakeIngester struct { + mu sync.Mutex + calls []ingestCall + result IngestResult +} + +type ingestCall struct { + eventType string + deliveryID string + body []byte +} + +func (f *fakeIngester) Ingest(eventType, deliveryID string, body []byte) IngestResult { + f.mu.Lock() + defer f.mu.Unlock() + f.calls = append(f.calls, ingestCall{eventType, deliveryID, append([]byte(nil), body...)}) + return f.result +} + +// TestSubscriber_HappyPath exercises the full create-hook → dial → +// activate → frame → ack → close cycle against an httptest server that +// upgrades to WebSocket and speaks the relay protocol. +func TestSubscriber_HappyPath(t *testing.T) { + t.Parallel() + + upgrader := websocket.Upgrader{} + var ( + wsConnReady = make(chan struct{}) + activated = make(chan struct{}) + ackCh = make(chan wsAck, 1) + deletedCh = make(chan struct{}, 1) + ) + + mux := http.NewServeMux() + mux.HandleFunc("/repos/owner/repo/hooks", func(w http.ResponseWriter, r *http.Request) { + if r.Method != http.MethodPost { + t.Errorf("unexpected method %s on create-hook", r.Method) + http.Error(w, "bad", http.StatusBadRequest) + return + } + if got := r.Header.Get("Authorization"); got != "Bearer tok" { + t.Errorf("create-hook Authorization = %q, want %q", got, "Bearer tok") + } + // We don't know srv.URL until later; the test patches the response + // before serving by closing over a pointer to it. 
+ writeHookResponse(t, w, r.Host) + }) + mux.HandleFunc("/repos/owner/repo/hooks/1", func(w http.ResponseWriter, r *http.Request) { + switch r.Method { + case http.MethodPatch: + close(activated) + w.WriteHeader(http.StatusOK) + case http.MethodDelete: + select { + case deletedCh <- struct{}{}: + default: + } + w.WriteHeader(http.StatusNoContent) + default: + http.Error(w, "bad", http.StatusBadRequest) + } + }) + mux.HandleFunc("/ws", func(w http.ResponseWriter, r *http.Request) { + if got := r.Header.Get("Authorization"); got != "tok" { + t.Errorf("ws Authorization = %q, want %q", got, "tok") + } + c, err := upgrader.Upgrade(w, r, nil) + if err != nil { + t.Errorf("ws upgrade: %v", err) + return + } + defer c.Close() + close(wsConnReady) + + // Wait until activate has happened, then push a frame. + <-activated + + body, _ := json.Marshal(map[string]string{"hello": "world"}) + if err := c.WriteJSON(wsFrame{ + Header: map[string][]string{ + "X-Github-Event": {"push"}, + "X-Github-Delivery": {"deliv-1"}, + }, + Body: body, + }); err != nil { + t.Errorf("write frame: %v", err) + return + } + + var ack wsAck + if err := c.ReadJSON(&ack); err != nil { + t.Errorf("read ack: %v", err) + return + } + ackCh <- ack + + // Close cleanly so runOnce returns nil and Run exits. + _ = c.WriteMessage(websocket.CloseMessage, + websocket.FormatCloseMessage(websocket.CloseNormalClosure, "bye")) + }) + + srv := httptest.NewServer(mux) + defer srv.Close() + + ing := &fakeIngester{result: IngestResult{Status: 200, Message: "queued"}} + sub, err := NewSubscriber(Config{ + Token: "tok", + Repo: "owner/repo", + Events: []string{"*"}, + }, ing) + if err != nil { + t.Fatalf("NewSubscriber: %v", err) + } + sub.apiBaseOverride = srv.URL + sub.httpClient = srv.Client() + sub.initialBackoff = 10 * time.Millisecond + sub.maxBackoff = 10 * time.Millisecond + + // The handler responds with ws_url pointing at the same host but with + // the ws:// scheme; patch it in by overriding the response writer. 
+ hookHostHolder.set(srv.Listener.Addr().String()) + + ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second) + defer cancel() + + done := make(chan error, 1) + go func() { done <- sub.Run(ctx) }() + + select { + case <-wsConnReady: + case <-time.After(2 * time.Second): + t.Fatal("timed out waiting for ws connection") + } + + select { + case ack := <-ackCh: + if ack.Status != 200 { + t.Errorf("ack.Status = %d, want 200", ack.Status) + } + if string(ack.Body) != "queued" { + t.Errorf("ack.Body = %q, want %q", string(ack.Body), "queued") + } + case <-time.After(2 * time.Second): + t.Fatal("timed out waiting for ack") + } + + select { + case err := <-done: + if err != nil { + t.Fatalf("Run returned %v, want nil after normal close", err) + } + case <-time.After(2 * time.Second): + cancel() + t.Fatal("timed out waiting for Run to exit") + } + + select { + case <-deletedCh: + case <-time.After(time.Second): + t.Error("hook was not deleted on shutdown") + } + + ing.mu.Lock() + defer ing.mu.Unlock() + if len(ing.calls) != 1 { + t.Fatalf("Ingest called %d times, want 1", len(ing.calls)) + } + if ing.calls[0].eventType != "push" || ing.calls[0].deliveryID != "deliv-1" { + t.Errorf("Ingest call = %+v, want push/deliv-1", ing.calls[0]) + } +} + +// TestSubscriber_CreateHookMissingWsURL ensures a confused relay response is +// surfaced as a clear error rather than dialing an empty URL. 
+func TestSubscriber_CreateHookMissingWsURL(t *testing.T) { + t.Parallel() + + srv := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + w.Header().Set("Content-Type", "application/json") + _, _ = w.Write([]byte(`{"id": 7, "url": "https://api.example/hooks/7"}`)) + })) + defer srv.Close() + + sub, err := NewSubscriber(Config{ + Token: "tok", + Repo: "owner/repo", + Events: []string{"*"}, + }, &fakeIngester{}) + if err != nil { + t.Fatalf("NewSubscriber: %v", err) + } + sub.apiBaseOverride = srv.URL + sub.httpClient = srv.Client() + + _, err = sub.createHook(context.Background()) + if err == nil || !strings.Contains(err.Error(), "ws_url") { + t.Fatalf("createHook err = %v, want message about ws_url", err) + } +} + +// TestConfig_apiBase covers the three GitHub deployment shapes the +// subscriber needs to talk to: public github.com, GHE.com data residency +// subdomains, and GHES instances on arbitrary hostnames. +func TestConfig_apiBase(t *testing.T) { + t.Parallel() + + cases := []struct { + host string + want string + }{ + {"github.com", "https://api.github.com"}, + {"octocorp.ghe.com", "https://api.octocorp.ghe.com"}, + {"another-tenant.ghe.com", "https://api.another-tenant.ghe.com"}, + {"ghes.example.com", "https://ghes.example.com/api/v3"}, + } + for _, tc := range cases { + c := Config{Host: tc.host} + if got := c.apiBase(); got != tc.want { + t.Errorf("apiBase(%q) = %q, want %q", tc.host, got, tc.want) + } + } +} + +// hookHostHolder lets the create-hook handler embed the test server's host + +// in its ws_url response without a package-level race. 
+type hostHolder struct { + mu sync.Mutex + host string +} + +func (h *hostHolder) set(s string) { h.mu.Lock(); h.host = s; h.mu.Unlock() } +func (h *hostHolder) get() string { h.mu.Lock(); defer h.mu.Unlock(); return h.host } + +var hookHostHolder = &hostHolder{} + +func writeHookResponse(t *testing.T, w http.ResponseWriter, fallbackHost string) { + t.Helper() + host := hookHostHolder.get() + if host == "" { + host = fallbackHost + } + u := url.URL{Scheme: "ws", Host: host, Path: "/ws"} + hookURL := "http://" + host + "/repos/owner/repo/hooks/1" + resp := map[string]any{ + "id": 1, + "url": hookURL, + "ws_url": u.String(), + } + w.Header().Set("Content-Type", "application/json") + _ = json.NewEncoder(w).Encode(resp) +} diff --git a/models/models.go b/models/models.go index 97ec6dd..3b2c018 100644 --- a/models/models.go +++ b/models/models.go @@ -152,19 +152,19 @@ type FailingJob struct { // FailureAnalytics contains summary failure metrics. type FailureAnalytics struct { - TotalCompleted int `json:"total_completed"` - TotalFailed int `json:"total_failed"` - TotalCancelled int `json:"total_cancelled"` - FailureRate float64 `json:"failure_rate"` - TopFailingJobs []FailingJob `json:"top_failing_jobs"` + TotalCompleted int `json:"total_completed"` + TotalFailed int `json:"total_failed"` + TotalCancelled int `json:"total_cancelled"` + FailureRate float64 `json:"failure_rate"` + TopFailingJobs []FailingJob `json:"top_failing_jobs"` } // FailureTrendPoint represents failure counts at a point in time. type FailureTrendPoint struct { - Timestamp int64 `json:"timestamp"` - Failures int `json:"failures"` - Successes int `json:"successes"` - Cancelled int `json:"cancelled"` + Timestamp int64 `json:"timestamp"` + Failures int `json:"failures"` + Successes int `json:"successes"` + Cancelled int `json:"cancelled"` } // LabelDemandSummary represents aggregate demand stats for a single runner label. 
diff --git a/pkg/metrics/metrics.go b/pkg/metrics/metrics.go index 2286dd3..3cc7dc4 100644 --- a/pkg/metrics/metrics.go +++ b/pkg/metrics/metrics.go @@ -142,25 +142,25 @@ func (r *Registry) ResetJobsByLabel() { // once at startup. Errors from duplicate registration are ignored so tests // that re-init the global registry do not panic. func (r *Registry) RegisterIngestQueueDepth(depth func() float64) { -g := prometheus.NewGaugeFunc(prometheus.GaugeOpts{ -Name: "live_actions_ingest_queue_depth", -Help: "Current number of events buffered in the in-memory ingest queue", -}, depth) -_ = prometheus.Register(g) + g := prometheus.NewGaugeFunc(prometheus.GaugeOpts{ + Name: "live_actions_ingest_queue_depth", + Help: "Current number of events buffered in the in-memory ingest queue", + }, depth) + _ = prometheus.Register(g) } // RegisterSSESubscribers registers a GaugeFunc that reports the number of // connected SSE clients by calling the supplied closure on every scrape. func (r *Registry) RegisterSSESubscribers(count func() float64) { -g := prometheus.NewGaugeFunc(prometheus.GaugeOpts{ -Name: "live_actions_sse_subscribers", -Help: "Current number of connected SSE clients", -}, count) -_ = prometheus.Register(g) + g := prometheus.NewGaugeFunc(prometheus.GaugeOpts{ + Name: "live_actions_sse_subscribers", + Help: "Current number of connected SSE clients", + }, count) + _ = prometheus.Register(g) } // RegisterDBStats wires the standard database/sql DBStats collector for the // supplied pool, namespaced by the given pool name (e.g. "write", "read"). func (r *Registry) RegisterDBStats(name string, db *sql.DB) { -_ = prometheus.Register(collectors.NewDBStatsCollector(db, name)) + _ = prometheus.Register(collectors.NewDBStatsCollector(db, name)) }