Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
29 changes: 24 additions & 5 deletions cmd/server/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ import (
"github.com/gateixeira/live-actions/internal/middleware"
"github.com/gateixeira/live-actions/internal/services"
"github.com/gateixeira/live-actions/pkg/logger"
pkgmetrics "github.com/gateixeira/live-actions/pkg/metrics"
"github.com/gin-gonic/gin"
"go.uber.org/zap"
)
Expand Down Expand Up @@ -42,31 +43,48 @@ func SetupAndRun(staticFS embed.FS) {
}
}

sqlDB, err := database.InitDB(dbPath)
writeDB, readDB, err := database.InitDB(dbPath)
if err != nil {
logger.Logger.Error("Failed to initialize database", zap.Error(err))
os.Exit(1)
}

defer func() {
if err := sqlDB.Close(); err != nil {
logger.Logger.Error("Failed to close database connection", zap.Error(err))
if err := writeDB.Close(); err != nil {
logger.Logger.Error("Failed to close database write connection", zap.Error(err))
}
if err := readDB.Close(); err != nil {
logger.Logger.Error("Failed to close database read connection", zap.Error(err))
}
}()

db := database.NewDBWrapper(sqlDB)
db := database.NewDBWrapper(writeDB, readDB)

ctx := context.Background()

cleanupService := services.NewCleanupService(cfg, db, ctx)
metricsService := services.NewMetricsUpdateService(db, 10*time.Second, ctx)
metricsService := services.NewMetricsUpdateService(db, 2*time.Second, ctx)

handlers.InitSSEHandler()
sseHandler := handlers.GetSSEHandler()
webhookHandler := handlers.NewWebhookHandler(cfg, db)
apiHandler := handlers.NewAPIHandler(cfg, db)
metricsHandler := handlers.NewMetricsHandler()

// Register dynamic Prometheus collectors now that pools and services exist.
pmReg := pkgmetrics.GetRegistry()
pmReg.RegisterDBStats("write", writeDB)
pmReg.RegisterDBStats("read", readDB)
if os := webhookHandler.OrderingService(); os != nil {
pmReg.IngestQueueCapacity.Set(float64(os.IngestQueueCap()))
pmReg.RegisterIngestQueueDepth(func() float64 {
return float64(os.IngestQueueLen())
})
}
pmReg.RegisterSSESubscribers(func() float64 {
return float64(sseHandler.SubscriberCount())
})

r := gin.New()

r.Use(middleware.ErrorHandler())
Expand Down Expand Up @@ -101,6 +119,7 @@ func SetupAndRun(staticFS embed.FS) {
r.GET("/healthz", func(c *gin.Context) {
c.JSON(http.StatusOK, gin.H{"status": "ok"})
})
r.GET("/readyz", handlers.ReadyzHandler(writeDB, readDB, webhookHandler.OrderingService()))

// Serve the React SPA for all other routes
indexHTML, err := fs.ReadFile(staticFS, "frontend/dist/index.html")
Expand Down
68 changes: 53 additions & 15 deletions frontend/src/App.tsx
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
import { useState, useEffect, useCallback, useMemo } from 'react'
import { useState, useEffect, useCallback, useMemo, useRef } from 'react'
import { Search, ChevronDown } from 'lucide-react'
import { MetricsCards } from './components/MetricsCards'
import { DemandChart } from './components/DemandChart'
Expand All @@ -7,6 +7,8 @@ import { FailureAnalytics } from './components/FailureAnalytics'
import { LabelDemand } from './components/LabelDemand'
import { Sidebar } from './components/Sidebar'
import { useSSE } from './hooks/useSSE'
import { useThrottledLive } from './hooks/useThrottledLive'
import { usePausableLive } from './hooks/usePausableLive'
import { getMetrics, getRepositories, initCsrf } from './api/client'
import type { MetricsResponse, Period } from './api/types'

Expand Down Expand Up @@ -34,15 +36,40 @@ export default function App() {
const [activePage, setActivePage] = useState<Page>('dashboard')
const [period, setPeriod] = useState<Period>('day')
const [metricsData, setMetricsData] = useState<MetricsResponse | null>(null)
const [liveRunning, setLiveRunning] = useState<number | null>(null)
const [liveQueued, setLiveQueued] = useState<number | null>(null)
const [ready, setReady] = useState(false)
const [selectedRepo, setSelectedRepo] = useState('')
const [selectedStatus, setSelectedStatus] = useState('')
const [repos, setRepos] = useState<string[]>([])
const [repoSearchOpen, setRepoSearchOpen] = useState(false)
const [repoSearch, setRepoSearch] = useState('')

// Pause UI updates while the user is interacting with the dashboard
// content (hovering, focusing inputs, expanding rows, etc.). Updates
// continue to be captured in the background; they are committed when
// interaction ends or on the next 30s tick.
const [interacting, setInteracting] = useState(false)

// Refresh model:
// - SSE is the single source of truth for the live `running`/`queued`
// counters; updates land on the cards as soon as they arrive (paused
// while the user is interacting with the table to avoid jitter).
// - The 30s REST poll only refreshes the chart history and the
// aggregate metrics (avg_queue_time / avg_run_time / peak_demand). Its
// `current_metrics.running_jobs/queued_jobs` is used solely as the
// initial seed before the first SSE event arrives.
// - WorkflowTable refetches on workflow_update, but throttled to 30s so
// a burst of events does not trigger a refetch storm.
const [liveRunning, setLiveRunningLatest] = usePausableLive<number | null>(null, {
paused: interacting,
})
const [liveQueued, setLiveQueuedLatest] = usePausableLive<number | null>(null, {
paused: interacting,
})
const [workflowRefresh, bumpWorkflowRefresh] = useThrottledLive<number>(0, {
paused: interacting,
})
const workflowRefreshTickRef = useRef(0)

useEffect(() => {
initCsrf().then(() => setReady(true))
}, [])
Expand Down Expand Up @@ -70,15 +97,14 @@ export default function App() {
return () => clearInterval(interval)
}, [period, loadMetrics, ready])

const [workflowRefresh, setWorkflowRefresh] = useState(0)

const { connected } = useSSE({
onMetricsUpdate: (data) => {
setLiveRunning(data.running_jobs)
setLiveQueued(data.queued_jobs)
setLiveRunningLatest(data.running_jobs)
setLiveQueuedLatest(data.queued_jobs)
},
onWorkflowUpdate: () => {
setWorkflowRefresh((r) => r + 1)
workflowRefreshTickRef.current += 1
bumpWorkflowRefresh(workflowRefreshTickRef.current)
},
})

Expand Down Expand Up @@ -187,13 +213,25 @@ export default function App() {
loadMetrics(p)
}}
/>
<WorkflowTable
key={`${selectedRepo}:${selectedStatus}`}
ready={ready}
refreshSignal={workflowRefresh}
repo={selectedRepo}
status={selectedStatus}
/>
<div
onMouseEnter={() => setInteracting(true)}
onMouseLeave={() => setInteracting(false)}
onFocusCapture={() => setInteracting(true)}
onBlurCapture={(e) => {
// Only un-pause when focus leaves the table region entirely.
if (!e.currentTarget.contains(e.relatedTarget as Node | null)) {
setInteracting(false)
}
}}
>
<WorkflowTable
key={`${selectedRepo}:${selectedStatus}`}
ready={ready}
refreshSignal={workflowRefresh}
repo={selectedRepo}
status={selectedStatus}
/>
</div>
</div>
)}

Expand Down
52 changes: 52 additions & 0 deletions frontend/src/hooks/usePausableLive.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,52 @@
import { useCallback, useEffect, useRef, useState } from 'react'

interface Options {
/**
* When true, writes are buffered and not committed to React state until
* `paused` flips back to false. While unpaused, writes are committed
* synchronously (no throttling).
*/
paused?: boolean
}

/**
* usePausableLive exposes a `[committed, setLatest]` pair where `setLatest`
* commits to React state immediately, EXCEPT while `paused` is true. While
* paused, the latest value is buffered in a ref; it is committed when `paused`
* flips back to false.
*
* This is used for high-frequency SSE-driven counters that should feel "live"
* but must not cause flicker while the user is actively interacting with the
* page (hovering rows, focusing inputs, etc.).
*/
export function usePausableLive<T>(
initial: T,
opts: Options = {},
): [T, (v: T) => void] {
const { paused = false } = opts
const [committed, setCommitted] = useState<T>(initial)
const latestRef = useRef<T>(initial)
const hasBufferedRef = useRef(false)
const pausedRef = useRef(paused)

useEffect(() => {
pausedRef.current = paused
Comment on lines +31 to +33
if (!paused && hasBufferedRef.current) {
hasBufferedRef.current = false
setCommitted((prev) =>
Object.is(latestRef.current, prev) ? prev : latestRef.current,
)
}
}, [paused])

const setLatest = useCallback((v: T) => {
latestRef.current = v
if (pausedRef.current) {
hasBufferedRef.current = true
return
}
setCommitted((prev) => (Object.is(v, prev) ? prev : v))
}, [])

return [committed, setLatest]
}
58 changes: 58 additions & 0 deletions frontend/src/hooks/useThrottledLive.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,58 @@
import { useCallback, useEffect, useRef, useState } from 'react'

interface Options {
/** Commit interval in milliseconds. Defaults to 30s. */
intervalMs?: number
/** When true, commits are deferred until it becomes false again. */
paused?: boolean
}

/**
* useThrottledLive buffers high-frequency updates in a ref and only commits
* them to React state every `intervalMs`. While `paused` is true, commits are
* deferred so the UI stays stable while the user is interacting with the page.
* When `paused` flips back to false, any buffered value is committed
* immediately.
*
* Calling the returned setter does NOT trigger a re-render — it only updates
* the buffered value. Re-renders happen on the timer tick (when the buffered
* value differs from the committed one) or on resume.
*
* Returns a tuple of `[committedValue, setLatest]`.
*/
export function useThrottledLive<T>(
initial: T,
opts: Options = {},
): [T, (v: T) => void] {
const { intervalMs = 30_000, paused = false } = opts
const [committed, setCommitted] = useState<T>(initial)
const latestRef = useRef<T>(initial)
const pausedRef = useRef(paused)

useEffect(() => {
pausedRef.current = paused
Comment on lines +31 to +33
if (!paused) {
// On resume, commit immediately if the buffered value differs.
setCommitted((prev) =>
Object.is(latestRef.current, prev) ? prev : latestRef.current,
)
}
}, [paused])

useEffect(() => {
const id = setInterval(() => {
if (pausedRef.current) return
setCommitted((prev) =>
Object.is(latestRef.current, prev) ? prev : latestRef.current,
)
}, intervalMs)
return () => clearInterval(id)
}, [intervalMs])

const setLatest = useCallback((v: T) => {
latestRef.current = v
}, [])

return [committed, setLatest]
}

1 change: 1 addition & 0 deletions handlers/api_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ import (
)

func setupAPITest() (*gin.Engine, *database.MockDatabase, *config.Config) {
stopLeakedSSECoalescer()
// Initialize logger for tests
logger.InitLogger("error")

Expand Down
7 changes: 7 additions & 0 deletions handlers/event_handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,3 +41,10 @@ func NewWebhookHandler(config *config.Config, db database.DatabaseInterface) *We
func (h *WebhookHandler) RegisterHandler(handler EventHandler) {
h.handlers[handler.GetEventType()] = handler
}

// OrderingService returns the underlying EventOrderingService so callers
// (e.g. the readiness handler and metrics registration) can inspect queue
// depth/capacity. May be nil before Start() / after Shutdown().
func (h *WebhookHandler) OrderingService() *services.EventOrderingService {
return h.orderingService
}
76 changes: 76 additions & 0 deletions handlers/health_handler.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,76 @@
package handlers

import (
"context"
"database/sql"
"net/http"
"time"

"github.com/gin-gonic/gin"
)

// readyzQueueFullThreshold is the queue-utilisation point at which /readyz
// starts reporting NOT READY. We deliberately fail before the queue is 100%
// full so a load balancer can pull the instance out of rotation while there
// is still headroom for in-flight deliveries.
const readyzQueueFullThreshold = 0.95

// queueStatProvider is the minimal surface /readyz needs from
// EventOrderingService; defining it as an interface here keeps this package
// from importing internal/services and makes the handler easy to test.
type queueStatProvider interface {
IngestQueueLen() int
IngestQueueCap() int
}

// ReadyzHandler returns a gin handler that reports readiness to serve traffic.
// It pings both DB pools (with a short timeout) and refuses traffic if the
// in-memory ingest queue is essentially full. /healthz remains a simple
// process-liveness probe; /readyz is the right thing for load balancers.
func ReadyzHandler(writeDB, readDB *sql.DB, queue queueStatProvider) gin.HandlerFunc {
return func(c *gin.Context) {
ctx, cancel := context.WithTimeout(c.Request.Context(), 2*time.Second)
defer cancel()

checks := gin.H{}
ready := true

if writeDB != nil {
if err := writeDB.PingContext(ctx); err != nil {
ready = false
checks["write_db"] = "error: " + err.Error()
} else {
checks["write_db"] = "ok"
}
}
if readDB != nil {
if err := readDB.PingContext(ctx); err != nil {
ready = false
checks["read_db"] = "error: " + err.Error()
} else {
checks["read_db"] = "ok"
}
}

if queue != nil {
depth := queue.IngestQueueLen()
capacity := queue.IngestQueueCap()
checks["ingest_queue_depth"] = depth
checks["ingest_queue_capacity"] = capacity
if capacity > 0 && float64(depth)/float64(capacity) >= readyzQueueFullThreshold {
ready = false
checks["ingest_queue"] = "saturated"
} else {
checks["ingest_queue"] = "ok"
}
}

status := http.StatusOK
state := "ready"
if !ready {
status = http.StatusServiceUnavailable
state = "not_ready"
}
c.JSON(status, gin.H{"status": state, "checks": checks})
}
}
Loading