fix(apigateway): heartbeat, per-chunk persistence, configurable embed-job knobs #480
Merged
…-job knobs (#479)
The embed-job worker held a fixed 10-minute lease and persisted all chunks in one final atomic write. On a CPU-only embedder, a 148-op spec ran ~10.5 min per attempt, hit the reaper's lease ceiling at the last chunk, threw away the prior chunks' work, and retried from scratch forever, silently rendering "queued" in the UI.
This change:
- Adds Store.RenewLease + a worker heartbeat goroutine that re-stamps lease_expires_at every lease_duration/3 while Compute runs, so a slow embed pass no longer looks abandoned.
- Removes the worker's local LeaseDuration+30s ctx ceiling that silently defeated the heartbeat. drainQueue now uses a 1-hour processSafetyBound as the wall-clock backstop; the DB lease (heartbeat-maintained) is the authoritative deadline for normal operation.
- Threads a PersistBatch callback through ComputeOperationEmbeddings and embedInBatchesIter so each chunk's rows hit api_catalog_operation_embeddings via a new UpsertOperationEmbeddingsBatch (INSERT ON CONFLICT DO UPDATE, preserves rows outside the batch). The final atomic Upsert at job completion still does the canonical full replacement.
- Adds apigateway.embed_jobs.batch_size and lease_duration as config knobs with sensible defaults (32, 10m) and a startup warning when embed_timeout >= lease_duration.
- Updates EmbeddingStatusBadge to distinguish a pending job with attempts > 0 as "retrying (N tries)" with the last error in the tooltip, the silent-failure mode that hid the doom loop.
Includes an end-to-end integration test that proves a mid-job failure preserves prior chunks across attempts, plus a regression gate (TestWorker_HeartbeatLetsComputeOutlastLeaseDuration) that fails if a future refactor reintroduces a ctx ceiling tied to LeaseDuration.
Deferred (pre-existing, not introduced by this PR): catalog.ErrNotFound returned by Upsert / UpsertBatch is wrapped by the persister and retried up to MaxAttempts instead of terminating immediately. The behavior matches the pre-existing UpsertOperationEmbeddings path, and a deleted-mid-job spec costs ~155s of retry backoff before failing; worth a separate change to translate to a terminal sentinel.
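The heartbeat described in the commit message above can be pictured roughly as follows. This is a minimal sketch, not the PR's code: the `LeaseRenewer` interface, the local `ErrNotFound` value, `countingStore`, and the demo helpers are hypothetical stand-ins, and the choice to keep ticking on errors other than `ErrNotFound` is an assumption based on the stated stop conditions (context cancel or lease rotation).

```go
package main

import (
	"context"
	"errors"
	"fmt"
	"sync/atomic"
	"time"
)

// ErrNotFound is a hypothetical stand-in for the store's "lease rotated" signal.
var ErrNotFound = errors.New("lease not found")

// LeaseRenewer is a hypothetical, narrowed view of the job store.
type LeaseRenewer interface {
	RenewLease(ctx context.Context, id, workerID string, d time.Duration) error
}

// heartbeat re-stamps the lease every lease/3 until ctx is canceled or the
// store reports that the lease now belongs to another worker.
func heartbeat(ctx context.Context, s LeaseRenewer, id, workerID string, lease time.Duration) {
	ticker := time.NewTicker(lease / 3)
	defer ticker.Stop()
	for {
		select {
		case <-ctx.Done():
			return
		case <-ticker.C:
			// Assumption: any other error is treated as transient and
			// retried on the next tick.
			if err := s.RenewLease(ctx, id, workerID, lease); errors.Is(err, ErrNotFound) {
				return
			}
		}
	}
}

// countingStore is a fake store that only counts renewals.
type countingStore struct{ renewals atomic.Int64 }

func (c *countingStore) RenewLease(context.Context, string, string, time.Duration) error {
	c.renewals.Add(1)
	return nil
}

// runWithHeartbeat simulates a compute pass that outlasts the lease and
// returns how many times the lease was renewed while it ran.
func runWithHeartbeat(lease, computeTime time.Duration) int64 {
	store := &countingStore{}
	ctx, cancel := context.WithCancel(context.Background())
	done := make(chan struct{})
	go func() {
		defer close(done)
		heartbeat(ctx, store, "job-1", "worker-1", lease)
	}()
	time.Sleep(computeTime) // stands in for Compute
	cancel()                // mirrors the deferred cancel after Compute returns
	<-done                  // the goroutine must exit once compute is done
	return store.renewals.Load()
}

// demoHeartbeat uses a 30ms lease and a 100ms compute pass.
func demoHeartbeat() int64 {
	return runWithHeartbeat(30*time.Millisecond, 100*time.Millisecond)
}

func main() {
	fmt.Println("renewals during compute:", demoHeartbeat())
}
```

Renewing at a third of the lease window leaves room for a couple of missed ticks before the lease would actually expire.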
Codecov Report
❌ Patch coverage is
Additional details and impacted files
@@ Coverage Diff @@
## main #480 +/- ##
==========================================
- Coverage 86.21% 86.19% -0.03%
==========================================
Files 237 237
Lines 32792 32969 +177
==========================================
+ Hits 28272 28416 +144
- Misses 3272 3296 +24
- Partials 1248 1257 +9
Closes #479.
Summary
Closes the three problems #479 documented:
- apigateway.embed_jobs.batch_size (default 32) and apigateway.embed_jobs.lease_duration (default 10m) are now operator-tunable. Startup logs a warning when embed_timeout >= lease_duration.
- A worker heartbeat renews the lease every lease_duration / 3 while Compute is running, and each completed chunk is written to api_catalog_operation_embeddings immediately via a new additive UpsertOperationEmbeddingsBatch. A job that fails on chunk N now leaves chunks 0..N-1 persisted for the next attempt's dedup pass.
- A pending job with attempts > 0 renders in the catalog UI as retrying (N tries) with the last error in the tooltip, instead of an indistinguishable queued. The reproduction case (a 148-op spec that doom-looped through 6 attempts with no visible signal) is the failure mode this badge change closes.
Reproduction this fixes
Upload a ~150-op OpenAPI spec to a deployment using CPU-only Ollama embeddings. Before this PR: each attempt runs ~10m30s, hits the worker's LeaseDuration + 30s grace ctx ceiling at the last chunk, throws away the prior chunks' work, retries from scratch, and repeats forever. The UI shows queued through every attempt.
After this PR: the heartbeat keeps the lease alive past lease_duration while Compute is genuinely progressing; per-chunk persistence preserves work across retries; the badge surfaces retry state so a doom loop is visible at a glance.
Architecture changes
Backend
- embedjobs.Store.RenewLease(ctx, id, workerID, duration) added; Postgres impl issues UPDATE ... SET lease_expires_at = NOW() + interval WHERE id AND worker_id AND status='running', returns ErrNotFound on lease rotation.
- embedjobs.NewPostgresStore accepts WithLeaseDuration(d) and exposes LeaseDuration().
- embedjobs.WorkerConfig gains LeaseDuration and BatchSize; defaults are DefaultLeaseDuration = 10m and DefaultEmbedBatchSize = 32.
- Heartbeat goroutine in process() that renews the lease while Compute runs. Stops on context cancel or ErrNotFound (lease rotated to another worker).
- drainQueue's per-iteration ctx ceiling changed from LeaseDuration + 30s grace to a 1-hour processSafetyBound. The earlier ceiling silently defeated the heartbeat (DB lease alive but local ctx canceled at the lease window). The DB lease, heartbeat-maintained, is now the authoritative deadline; the 1-hour bound is a wall-clock backstop against a Compute that hangs without forward progress.
- EmbeddingComputer.Compute now takes a ComputeRequest struct (BatchSize + PersistBatch callback added).
- ComputeOperationEmbeddings takes a ComputeRequest and threads BatchSize + PersistBatch through fillFreshEmbeddings into a new embedInBatchesIter (per-chunk callback variant of embedInBatches).
- catalog.Store.UpsertOperationEmbeddingsBatch added: INSERT ... ON CONFLICT (catalog_id, spec_name, operation_id) DO UPDATE. Preserves rows outside the batch. Memory and Postgres backends both implemented.
- UpsertOperationEmbeddings at job completion still runs as the canonical full replacement, so removed operations are cleaned up.
UI
EmbeddingStatusBadge in ui/src/pages/settings/CatalogsPanel.tsx adds a sixth state. When job_status === "pending" && job_attempts > 0, it renders retrying (N tries) with the last error in the tooltip.
Config
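As a hedged illustration of the two knobs and the startup warning, here is a minimal Go sketch. The defaults (32 and 10m) and the `embed_timeout >= lease_duration` condition come from this PR's description; the `EmbedJobsConfig` struct, its field and method names, the warning text, and the non-positive-means-default rule for `BatchSize` are assumptions made for the example.

```go
package main

import (
	"fmt"
	"time"
)

// Defaults as stated in the PR description.
const (
	DefaultLeaseDuration  = 10 * time.Minute
	DefaultEmbedBatchSize = 32
)

// EmbedJobsConfig is a hypothetical shape for the embed-job settings.
type EmbedJobsConfig struct {
	BatchSize     int           // apigateway.embed_jobs.batch_size
	LeaseDuration time.Duration // apigateway.embed_jobs.lease_duration
	EmbedTimeout  time.Duration // embed_timeout (config path not shown in the PR)
}

// withDefaults fills in unset (non-positive) knobs.
func (c EmbedJobsConfig) withDefaults() EmbedJobsConfig {
	if c.BatchSize <= 0 {
		c.BatchSize = DefaultEmbedBatchSize
	}
	if c.LeaseDuration <= 0 {
		c.LeaseDuration = DefaultLeaseDuration
	}
	return c
}

// startupWarning returns a non-empty message when a single embed call could
// run for as long as the whole lease window.
func (c EmbedJobsConfig) startupWarning() string {
	if c.EmbedTimeout >= c.LeaseDuration {
		return fmt.Sprintf("embed_timeout (%s) >= lease_duration (%s)",
			c.EmbedTimeout, c.LeaseDuration)
	}
	return ""
}

// demoConfig resolves two example configs that leave both knobs unset and
// reports the default batch size plus whether each one would warn.
func demoConfig() (batch int, warnLong, warnShort bool) {
	long := EmbedJobsConfig{EmbedTimeout: 15 * time.Minute}.withDefaults()
	short := EmbedJobsConfig{EmbedTimeout: 2 * time.Minute}.withDefaults()
	return long.BatchSize, long.startupWarning() != "", short.startupWarning() != ""
}

func main() {
	batch, warnLong, warnShort := demoConfig()
	fmt.Println("default batch size:", batch)
	fmt.Println("15m timeout warns:", warnLong)
	fmt.Println("2m timeout warns:", warnShort)
}
```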
Tests
New unit tests:
- TestWorker_HeartbeatRenewsLeaseWhileComputing: heartbeat fires during a long Compute.
- TestWorker_HeartbeatStopsAfterCompute: goroutine exits on the deferred cancel.
- TestWorker_HeartbeatLetsComputeOutlastLeaseDuration: regression gate against a future ctx ceiling tied to LeaseDuration.
- TestWorker_PersistBatchForwardsToUpsertBatch: PersistBatch callback reaches Persister.UpsertBatch.
- TestWorker_PersistBatchErrorFailsCompute: DB error during incremental persist surfaces as a retryable failure.
- TestWorker_BatchSizeFlowsToComputeRequest: config flows from WorkerConfig.BatchSize to ComputeRequest.BatchSize.
- TestNewWorker_DefaultsLeaseAndBatchSize: defaults apply when caller omits both.
- TestRenewLease_HappyPath / _NotFoundOnLeaseRotation / _DBErrorWrapped / _NonPositiveDurationFallsBackToConfigured: Postgres semantics.
- TestWithLeaseDuration_StampsConfiguredValueOnClaim / _NonPositiveKeepsDefault: option plumbing.
- TestMemoryStore_UpsertOperationEmbeddingsBatch_AdditiveSemantics / _UpdatesExisting / _WithoutSpec: memory backend additive contract.
- TestUpsertOperationEmbeddingsBatch_OnConflictUpdates / _EmptyRowsShortCircuits / _FKViolation_ReturnsNotFound / _CommitError: Postgres backend.
Integration test:
- TestWorker_IntegrationResumeAfterMidJobFailure: wires the real Worker against a fake queue and an in-memory persister, drives a 3-chunk job that fails on chunk 1 on attempt 1 and succeeds on attempt 2. Asserts all rows are present in the persister after the retry, proving prior chunks survived the failure.
make verify passes (fmt, race tests, ≥80% coverage, golangci-lint with --new-from-rev, gosec, govulncheck, semgrep, dead-code, mutation ≥60%, doc-check, release-check).
Deferred (pre-existing, not introduced here)
catalog.ErrNotFound returned by Upsert and the new UpsertBatch is wrapped by the persister with fmt.Errorf("catalogEmbeddingPersister: %w", err) and retried up to MaxAttempts instead of failing terminally. The existing UpsertOperationEmbeddings path has the same behavior and predates this PR. A deleted-mid-job spec costs ~155s of retry backoff before moving to failed. Worth a separate change to translate to a terminal sentinel.
Operator notes
Existing deployments with embed_timeout already configured but no lease_duration get the 10m default for the lease; the startup warning will fire if embed_timeout >= 10m. Recommendation for CPU-only Ollama on large specs:
Test plan
- lease_duration in the deployment configmap; roll
- N/M indexed
- retrying (N tries) with the upstream error in the tooltip
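The resume behavior this plan exercises can be pictured with a small in-memory model. This is a sketch under stated assumptions, not the PR's code: `memStore`, `embedMissing`, `fakeEmbed`, and `demoResume` are hypothetical names, and the dedup pass is reduced to "skip operations that already have a row". It mirrors the integration test's shape: three chunks, a failure on chunk 1 during attempt 1, success on attempt 2.

```go
package main

import (
	"errors"
	"fmt"
)

// row is a simplified embedding row keyed by operation ID.
type row struct {
	OperationID string
	Vector      []float32
}

// memStore models an additive upsert: rows outside the batch are untouched.
type memStore struct{ rows map[string][]float32 }

func (m *memStore) upsertBatch(batch []row) {
	for _, r := range batch {
		m.rows[r.OperationID] = r.Vector
	}
}

// embedMissing embeds only operations without a stored row, in chunks of
// batchSize, persisting each chunk as soon as it completes.
func embedMissing(ops []string, batchSize int, store *memStore,
	embed func(chunk []string) ([]row, error)) error {
	var todo []string
	for _, op := range ops {
		if _, ok := store.rows[op]; !ok {
			todo = append(todo, op)
		}
	}
	for start := 0; start < len(todo); start += batchSize {
		end := start + batchSize
		if end > len(todo) {
			end = len(todo)
		}
		rows, err := embed(todo[start:end])
		if err != nil {
			return fmt.Errorf("chunk starting at %d: %w", start, err)
		}
		store.upsertBatch(rows) // per-chunk persistence
	}
	return nil
}

// fakeEmbed returns a placeholder vector for each operation.
func fakeEmbed(chunk []string) []row {
	out := make([]row, 0, len(chunk))
	for _, op := range chunk {
		out = append(out, row{OperationID: op, Vector: []float32{float32(len(op))}})
	}
	return out
}

// demoResume runs a failing first attempt and a successful retry.
func demoResume() (afterFirst, embeddedOnRetry, final int) {
	ops := []string{"op0", "op1", "op2", "op3", "op4", "op5"}
	store := &memStore{rows: map[string][]float32{}}
	calls := 0
	failing := func(chunk []string) ([]row, error) {
		calls++
		if calls == 2 { // second chunk (index 1) fails on attempt 1
			return nil, errors.New("embedder timed out")
		}
		return fakeEmbed(chunk), nil
	}
	_ = embedMissing(ops, 2, store, failing) // attempt 1: error expected
	afterFirst = len(store.rows)
	counting := func(chunk []string) ([]row, error) {
		embeddedOnRetry += len(chunk)
		return fakeEmbed(chunk), nil
	}
	if err := embedMissing(ops, 2, store, counting); err != nil {
		return afterFirst, -1, len(store.rows)
	}
	return afterFirst, embeddedOnRetry, len(store.rows)
}

func main() {
	a, e, f := demoResume()
	// prints: after attempt 1: 2 rows; retry embedded 4 ops; final: 6 rows
	fmt.Printf("after attempt 1: %d rows; retry embedded %d ops; final: %d rows\n", a, e, f)
}
```

The retry embeds four operations instead of six because the first chunk's rows were already in the store when the second attempt started.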