diff --git a/.claude/skills/kagent-dev/references/database-migrations.md b/.claude/skills/kagent-dev/references/database-migrations.md
index d63aa498bd..1da8b00cf1 100644
--- a/.claude/skills/kagent-dev/references/database-migrations.md
+++ b/.claude/skills/kagent-dev/references/database-migrations.md
@@ -133,7 +133,7 @@ Files must follow `NNNNNN_description.up.sql` / `NNNNNN_description.down.sql` wi
 
 Every `.up.sql` must have a corresponding `.down.sql` that exactly reverses it. Down migrations are used for rollbacks and by automatic rollback on migration failure. They must be **idempotent** — the two-track rollback logic (roll back core if vector fails) may call them more than once in failure scenarios.
 
-A down file that never runs is a down file you cannot trust. There are no up-only migrations — a working down has shipped with every migration since the golang-migrate adoption. Exercising every migration up → down → up against the real migration set, to prove the reversal rather than assume it, is a *Target — not yet enforced* (see [Upgrade and rollback testing](#upgrade-and-rollback-testing)).
+A down file that never runs is a down file you cannot trust. There are no up-only migrations — a working down has shipped with every migration since the golang-migrate adoption. The reversal is proven, not assumed: the upgrade round-trip applies `HEAD`'s migrations over a prior release and then reverses them back, asserting the reverted schema matches a clean install of that release and that seeded data survives (see [Upgrade and rollback testing](#upgrade-and-rollback-testing)).
 
 ## One Linear History
 
@@ -205,11 +205,11 @@ These tests catch policy violations at PR time without needing a running databas
 
 ## Upgrade and rollback testing
 
-Static analysis covers file *content*; round-trip tests cover *behavior* against a real Postgres. Beyond `runner_test.go` (rollback and concurrency), two release-to-release tests make the rollback promise real. Both are *Target — not yet enforced*.
+Static analysis covers file *content*; round-trip tests cover *behavior* against a real Postgres. Beyond `runner_test.go` (rollback and concurrency), release-to-release tests make the rollback promise real.
 
-**Previous-minor round-trip.** Seed a database at the previous minor's latest release with representative data, apply migrations up to `HEAD`, and assert the schema matches a clean `HEAD` install and the data survives; then reverse to the previous minor and assert the schema matches a clean previous-minor install and the data survives. This exercises every changed down file rather than only reviewing it.
+**Previous-release round-trip** (enforced by `TestUpgrade`, run by the `upgrade-tests` CI job). Seed a database at a prior release with representative data, apply migrations up to `HEAD`, and assert the controller rolls out without crashing, the schema matches a clean `HEAD` install, and the data survives; then reverse the migrations back to the prior release and assert the schema matches a clean install of that release and the data survives. It runs against two prior versions — the latest release reachable from `HEAD` and the previous stable line's latest patch (the `release/vX.Y.x` tip) — and `TestRollingUpgradeCompatibility` (the `rolling-upgrade-tests` job) additionally exercises the old-code/new-schema window, with the prior release's controller serving while `HEAD`'s migrations are applied.
 
-**Query-level backward compatibility.** Run the previous minor's database test suite against a `HEAD`-migrated schema, proving old code's queries run against the newer schema — the exact property [ahead-schema tolerance](#rollback-and-ahead-schema-tolerance) relies on.
+**Query-level backward compatibility** (*Target — not yet enforced*). Run the previous minor's database test suite against a `HEAD`-migrated schema, proving old code's queries run against the newer schema — the exact property [ahead-schema tolerance](#rollback-and-ahead-schema-tolerance) relies on.
 
 ## Downstream Extension Model
 
diff --git a/.github/actions/upgrade-test-setup/action.yaml b/.github/actions/upgrade-test-setup/action.yaml
new file mode 100644
index 0000000000..9ff866fb8a
--- /dev/null
+++ b/.github/actions/upgrade-test-setup/action.yaml
@@ -0,0 +1,94 @@
+name: Upgrade Test Setup
+
+description: >-
+  Shared prelude for the upgrade-tests and rolling-upgrade-tests jobs: resolve
+  the upgrade-from version (with the prev-stable == adjacent skip) and bring up
+  the build/cluster toolchain. Exposes the resolved version and skip flag so the
+  caller can gate its test step. The caller MUST run actions/checkout (with
+  fetch-depth: 0 + fetch-tags) before this action — a local action is loaded
+  from the checked-out workspace and the version resolvers need full history.
+
+inputs:
+  upgrade-from:
+    description: 'Which release to upgrade from: "adjacent" or "prev-stable".'
+    required: true
+  buildx-builder-name:
+    description: Name for the docker/setup-buildx-action builder.
+    required: true
+  buildx-version:
+    description: Buildx version for docker/setup-buildx-action.
+    required: true
+
+outputs:
+  skip:
+    description: '"true" when this leg is redundant (prev-stable == adjacent) and the caller should skip its test step.'
+    value: ${{ steps.resolve.outputs.skip }}
+  version:
+    description: The resolved upgrade-from version (empty when skip is true).
+    value: ${{ steps.resolve.outputs.version }}
+
+runs:
+  using: "composite"
+  steps:
+    # The caller must run actions/checkout (fetch-depth: 0 + fetch-tags) before
+    # this action: a local action is loaded from the checked-out workspace, so
+    # its files don't exist on the runner until checkout has run.
+    - name: Resolve upgrade-from version
+      id: resolve
+      shell: bash
+      run: |
+        ADJ="$(./scripts/upgrade-from-version.sh)"
+        if [ "${{ inputs.upgrade-from }}" = "prev-stable" ]; then
+          V="$(./scripts/prev-stable-version.sh)"
+          if [ -z "$V" ]; then
+            echo "no stable line below the current line; skipping prev-stable leg."
+            echo "skip=true" >> "$GITHUB_OUTPUT"
+            exit 0
+          fi
+          if [ "$V" = "$ADJ" ]; then
+            echo "prev-stable ($V) == adjacent; skipping (covered by the adjacent leg)."
+            echo "skip=true" >> "$GITHUB_OUTPUT"
+            exit 0
+          fi
+        else
+          V="$ADJ"
+        fi
+        echo "Upgrade-from target: $V"
+        echo "version=$V" >> "$GITHUB_OUTPUT"
+    - name: Initialize Environment
+      if: steps.resolve.outputs.skip != 'true'
+      uses: ./.github/actions/initialize-environment
+    - name: Allow unprivileged user namespaces
+      if: steps.resolve.outputs.skip != 'true'
+      shell: bash
+      run: |
+        sudo sysctl -w kernel.apparmor_restrict_unprivileged_userns=0 || true
+    - name: Set up QEMU
+      if: steps.resolve.outputs.skip != 'true'
+      uses: docker/setup-qemu-action@v4
+      with:
+        platforms: linux/amd64,linux/arm64
+    - name: Set up Docker Buildx
+      if: steps.resolve.outputs.skip != 'true'
+      uses: docker/setup-buildx-action@v4
+      with:
+        name: ${{ inputs.buildx-builder-name }}
+        version: ${{ inputs.buildx-version }}
+        platforms: linux/amd64,linux/arm64
+        use: "true"
+        driver-opts: network=host
+    - name: Set up Helm
+      if: steps.resolve.outputs.skip != 'true'
+      uses: azure/setup-helm@v5.0.0
+      with:
+        version: v3.18.0
+    - name: Install Kind
+      if: steps.resolve.outputs.skip != 'true'
+      uses: helm/kind-action@ef37e7f390d99f746eb8b610417061a60e82a6cc
+      with:
+        install_only: true
+    - name: Create Kind cluster
+      if: steps.resolve.outputs.skip != 'true'
+      shell: bash
+      run: |
+        make create-kind-cluster
diff --git a/.github/workflows/ci.yaml b/.github/workflows/ci.yaml
index b8e3768d9a..e428dc1f20 100644
--- a/.github/workflows/ci.yaml
+++ b/.github/workflows/ci.yaml
@@ -160,6 +160,102 @@ jobs:
           echo "::error::Kubectl logs -n kagent deployment/kagent-controller"
           kubectl logs -n kagent deployment/kagent-controller
 
+  upgrade-tests:
+    needs:
+      - setup
+    env:
+      VERSION: v0.0.1-test
+    runs-on: ubuntu-latest
+    strategy:
+      fail-fast: false
+      matrix:
+        # adjacent: the latest release reachable from HEAD (upgrade-from-version.sh)
+        # prev-stable: the previous stable line's latest patch (prev-stable-version.sh)
+        upgrade-from: [adjacent, prev-stable]
+    steps:
+      - name: Checkout repository
+        uses: actions/checkout@v6
+        with:
+          # Full history + tags so the version resolvers can derive the
+          # upgrade-from release, and so the local action below is on disk.
+          fetch-depth: 0
+          fetch-tags: true
+      - name: Prepare upgrade test environment
+        id: prep
+        uses: ./.github/actions/upgrade-test-setup
+        with:
+          upgrade-from: ${{ matrix.upgrade-from }}
+          buildx-builder-name: ${{ env.BUILDX_BUILDER_NAME }}
+          buildx-version: ${{ env.BUILDX_VERSION }}
+      - name: Run upgrade tests
+        if: steps.prep.outputs.skip != 'true'
+        env:
+          OPENAI_API_KEY: fake
+          BUILDX_BUILDER_NAME: ${{ env.BUILDX_BUILDER_NAME }}
+          KAGENT_HELM_EXTRA_ARGS: --cleanup-on-fail=false
+          DOCKER_BUILD_ARGS: >-
+            --cache-from=type=gha,scope=${{ needs.setup.outputs.cache-key }}-e2e
+            --cache-from=type=gha,scope=${{ env.CACHE_KEY_PREFIX }}-main-e2e
+            --platform=linux/amd64
+            --push
+        run: |
+          make run-upgrade-tests UPGRADE_FROM_VERSION="${{ steps.prep.outputs.version }}"
+      - name: fail print info
+        if: failure() && steps.prep.outputs.skip != 'true'
+        run: |
+          echo "::error::Failed to run upgrade tests"
+          kubectl describe pods -n kagent
+          kubectl get events -n kagent
+          kubectl logs -n kagent deployment/kagent-controller || true
+
+  rolling-upgrade-tests:
+    needs:
+      - setup
+    env:
+      VERSION: v0.0.1-test
+    runs-on: ubuntu-latest
+    strategy:
+      fail-fast: false
+      matrix:
+        # adjacent: the latest release reachable from HEAD (upgrade-from-version.sh)
+        # prev-stable: the previous stable line's latest patch (prev-stable-version.sh)
+        upgrade-from: [adjacent, prev-stable]
+    steps:
+      - name: Checkout repository
+        uses: actions/checkout@v6
+        with:
+          # Full history + tags so the version resolvers can derive the
+          # upgrade-from release, and so the local action below is on disk.
+          fetch-depth: 0
+          fetch-tags: true
+      - name: Prepare upgrade test environment
+        id: prep
+        uses: ./.github/actions/upgrade-test-setup
+        with:
+          upgrade-from: ${{ matrix.upgrade-from }}
+          buildx-builder-name: ${{ env.BUILDX_BUILDER_NAME }}
+          buildx-version: ${{ env.BUILDX_VERSION }}
+      - name: Run rolling upgrade tests
+        if: steps.prep.outputs.skip != 'true'
+        env:
+          OPENAI_API_KEY: fake
+          BUILDX_BUILDER_NAME: ${{ env.BUILDX_BUILDER_NAME }}
+          KAGENT_HELM_EXTRA_ARGS: --cleanup-on-fail=false
+          DOCKER_BUILD_ARGS: >-
+            --cache-from=type=gha,scope=${{ needs.setup.outputs.cache-key }}-e2e
+            --cache-from=type=gha,scope=${{ env.CACHE_KEY_PREFIX }}-main-e2e
+            --platform=linux/amd64
+            --push
+        run: |
+          make run-rolling-upgrade-tests UPGRADE_FROM_VERSION="${{ steps.prep.outputs.version }}"
+      - name: fail print info
+        if: failure() && steps.prep.outputs.skip != 'true'
+        run: |
+          echo "::error::Failed to run rolling upgrade tests"
+          kubectl describe pods -n kagent
+          kubectl get events -n kagent
+          kubectl logs -n kagent deployment/kagent-controller || true
+
   go-unit-tests:
     runs-on: ubuntu-latest
     steps:
diff --git a/Makefile b/Makefile
index af3cfd6d98..e84e55ad56 100644
--- a/Makefile
+++ b/Makefile
@@ -460,6 +460,97 @@ helm-uninstall: ## Uninstall kagent and kagent-crds Helm releases from the kind
 	helm uninstall kagent --namespace kagent --kube-context kind-$(KIND_CLUSTER_NAME) --wait
 	helm uninstall kagent-crds --namespace kagent --kube-context kind-$(KIND_CLUSTER_NAME) --wait
 
+# Upgrade test targets install the previous released kagent chart from the public
+# OCI registry, build the current images, then run the e2e assertions in
+# go/core/test/e2e/upgrade. The Go test performs the actual upgrade to the current
+# build by invoking `make helm-install-provider`. UPGRADE_FROM_VERSION defaults to
+# the latest release reachable from HEAD (scripts/upgrade-from-version.sh); CI runs
+# this against two targets via a matrix — that adjacent release and the previous
+# stable line's latest patch (scripts/prev-stable-version.sh) — and you can pin
+# either locally, e.g. `UPGRADE_FROM_VERSION=$$(./scripts/prev-stable-version.sh)`.
+# The previous install pins the bundled Postgres image to whatever the
+# upgrade-from release's own install target shipped (see PREV_DB_SET_FLAGS), so
+# the baseline matches how that release actually runs rather than a hardcoded
+# guess; the upgrade then exercises the real app/migration (and any DB image)
+# change between that release and the current build.
+#
+# Prerequisite (provided by CI as a separate step; run it locally first): a kind
+# cluster (make create-kind-cluster). agent-sandbox is not required — the
+# controller tolerates the missing CRD and these tests create no SandboxAgents.
+UPGRADE_FROM_VERSION ?= $(shell ./scripts/upgrade-from-version.sh)
+
+# The bundled-Postgres image is selected by the install target's --set flags, not
+# by the chart defaults (the chart ships a non-vector image). So the previous
+# install must use the exact pins the upgrade-from release shipped — otherwise the
+# baseline DB would differ from how that release actually runs, and the upgrade
+# would conflate a DB swap with the migration change under test. Read those flags
+# straight from that release's own helm-install-provider target (via its tagged
+# Makefile) instead of hardcoding values that drift as the bundled image changes.
+# Assumes the flags are literal (no make/env variables); the guard in
+# install-previous-release fails loudly if they can't be read.
+PREV_DB_SET_FLAGS = $(shell git show v$(UPGRADE_FROM_VERSION):Makefile 2>/dev/null | \ + grep -oE '\-\-set[[:space:]]+database\.postgres\.[^[:space:]\\]+') + +.PHONY: install-previous-release +install-previous-release: ## Install the previous released kagent + kagent-crds charts from the public OCI registry + test -n "$(UPGRADE_FROM_VERSION)" || { echo "UPGRADE_FROM_VERSION is empty; set it explicitly or ensure git tags are fetched." >&2; exit 1; } + test -n "$(strip $(PREV_DB_SET_FLAGS))" || { echo "Could not read bundled-Postgres --set flags from v$(UPGRADE_FROM_VERSION):Makefile; the upgrade-from release's install target may have moved or renamed them." >&2; exit 1; } + @echo "=== Installing previous release: $(UPGRADE_FROM_VERSION) ===" + @echo " bundled-Postgres flags (from v$(UPGRADE_FROM_VERSION) install target): $(PREV_DB_SET_FLAGS)" + helm upgrade --install kagent-crds $(HELM_REPO)/kagent/helm/kagent-crds \ + --version $(UPGRADE_FROM_VERSION) \ + --namespace kagent --create-namespace \ + --kube-context kind-$(KIND_CLUSTER_NAME) \ + --timeout 5m --wait + helm upgrade --install kagent $(HELM_REPO)/kagent/helm/kagent \ + --version $(UPGRADE_FROM_VERSION) \ + --namespace kagent --create-namespace \ + --kube-context kind-$(KIND_CLUSTER_NAME) \ + --timeout 5m --wait \ + --set ui.service.type=LoadBalancer \ + --set controller.service.type=LoadBalancer \ + --set providers.default=openAI \ + --set providers.openAI.apiKey="$${OPENAI_API_KEY:-test}" \ + $(PREV_DB_SET_FLAGS) \ + $(UPGRADE_PREV_EXTRA_ARGS) + +# run-upgrade-tests installs the previous release, builds the current images, and +# runs the DB-layer upgrade scenario in TestUpgrade: seed -> upgrade -> controller +# rollout (no crash) -> data survival -> schema-equivalence (upgraded == clean +# install) -> reverse schema to target (down files) + data survival. +# Prerequisite (provided by CI as a separate step; run it locally first): a kind +# cluster (make create-kind-cluster). 
The controller tolerates the missing +# agent-sandbox CRD (the owned-resource watch is skipped), and these tests create +# no SandboxAgents, so agent-sandbox is not required. +.PHONY: run-upgrade-tests +run-upgrade-tests: ## Install the previous release, build current images, and run the upgrade test (migration round-trip) + test -n "$(UPGRADE_FROM_VERSION)" || { echo "UPGRADE_FROM_VERSION is empty; set it explicitly or ensure git tags are fetched." >&2; exit 1; } + $(MAKE) build + $(MAKE) install-previous-release + @echo "=== Upgrade test: $(UPGRADE_FROM_VERSION) -> $(VERSION) (registry=$(DOCKER_REGISTRY)) ===" + cd go && \ + RUN_UPGRADE_TESTS=true \ + UPGRADE_FROM_VERSION=$(UPGRADE_FROM_VERSION) \ + VERSION=$(VERSION) \ + DOCKER_REGISTRY=$(DOCKER_REGISTRY) \ + KIND_CLUSTER_NAME=$(KIND_CLUSTER_NAME) \ + OPENAI_API_KEY="$${OPENAI_API_KEY:-test}" \ + go test ./core/test/e2e/upgrade -run TestUpgrade -count=1 -timeout=20m -v + +.PHONY: run-rolling-upgrade-tests +run-rolling-upgrade-tests: ## Install the previous release with 2 controller replicas, build the current images, and run the rolling upgrade e2e test + $(MAKE) build + $(MAKE) install-previous-release UPGRADE_PREV_EXTRA_ARGS="--set controller.replicas=2" + @echo "=== Rolling upgrade test: $(UPGRADE_FROM_VERSION) -> $(VERSION) (registry=$(DOCKER_REGISTRY)) ===" + cd go && \ + RUN_ROLLING_UPGRADE_TESTS=true \ + UPGRADE_FROM_VERSION=$(UPGRADE_FROM_VERSION) \ + VERSION=$(VERSION) \ + DOCKER_REGISTRY=$(DOCKER_REGISTRY) \ + KIND_CLUSTER_NAME=$(KIND_CLUSTER_NAME) \ + OPENAI_API_KEY="$${OPENAI_API_KEY:-test}" \ + go test ./core/test/e2e/upgrade -run TestRollingUpgradeCompatibility -count=1 -timeout=20m -v + .PHONY: helm-publish helm-publish: ## Package and push all Helm charts to the OCI registry helm-publish: helm-version diff --git a/go/core/test/e2e/upgrade/rolling_upgrade_test.go b/go/core/test/e2e/upgrade/rolling_upgrade_test.go new file mode 100644 index 0000000000..fc223b7e37 --- /dev/null +++ 
b/go/core/test/e2e/upgrade/rolling_upgrade_test.go @@ -0,0 +1,166 @@ +package upgrade + +import ( + "context" + "fmt" + "os" + "strings" + "testing" + "time" + + "github.com/stretchr/testify/require" +) + +func TestRollingUpgradeCompatibility(t *testing.T) { + if os.Getenv("RUN_ROLLING_UPGRADE_TESTS") != "true" { + t.Skip("set RUN_ROLLING_UPGRADE_TESTS=true to run rolling upgrade tests") + } + + env := loadUpgradeEnv(t) + targetCoreVersion := latestCoreMigrationVersion(t) + seed := fmt.Sprintf("%d", time.Now().UnixNano()) + baselineAgentID := "rolling-baseline-agent-" + seed + compatAgentID := "rolling-compat-agent-" + seed + compatUserID := "rolling-compat-user-" + seed + + t.Logf("rolling upgrade test: %s -> %s (registry=%s, kubeContext=%s)", + env.upgradeFromVersion, env.version, env.dockerRegistry, env.kubeContext) + + waitForReadyPods(t, env, postgresSelector, 3*time.Minute) + waitForPostgresAgentTable(t, env, 3*time.Minute) + + // This test is only meaningful when the target build has migrations the + // baseline has not already applied. Otherwise there is no old-code/new-schema + // compatibility window to exercise. + baselineState := pgMigrationState(t, env) + require.False(t, baselineState.dirty, "baseline Postgres migrations are dirty") + if baselineState.version >= targetCoreVersion { + t.Skipf("baseline migration version %d is already at target %d", baselineState.version, targetCoreVersion) + } + + // Keep multiple old controller pods around during the rollout. With a single + // replica the old-code/new-schema window can be too small to observe reliably. 
+ kubectl(t, env, 2*time.Minute, + "scale", "deployment/kagent-controller", + "-n", env.namespace, + "--replicas=2", + ) + kubectl(t, env, 3*time.Minute, + "rollout", "status", "deployment/kagent-controller", + "-n", env.namespace, + "--timeout=3m", + ) + oldPods := podNamesForSelector(t, env, controllerSelector) + require.NotEmpty(t, oldPods, "expected old controller pods before rolling upgrade") + + // Seed with the baseline schema before the target controller applies new + // migrations. The compatibility canary below verifies this row is still + // readable while old pods are alive against the target schema. + pgExec(t, env, fmt.Sprintf("INSERT INTO agent (id, type) VALUES (%s, 'Deployment')", pgQuote(baselineAgentID))) + + ctx, cancel := context.WithTimeout(t.Context(), 10*time.Minute) + defer cancel() + done := make(chan upgradeResult, 1) + cmd := helmUpgradeCommand(ctx, env) + go func() { + out, err := cmd.CombinedOutput() + done <- upgradeResult{out: string(out), err: err} + }() + + // Wait until the target schema has landed while at least one previous-release + // controller pod is still ready. That is the rolling deploy hazard: old code + // can keep serving briefly after a new pod has applied migrations. Surface a + // helm upgrade failure immediately instead of letting it look like a + // schema-observation timeout. + var helmErr error + var helmOut string + require.Eventually(t, func() bool { + select { + case r := <-done: + if r.err != nil { + helmErr, helmOut = r.err, r.out + return true + } + // Helm finished successfully before we caught the window; put the + // result back so the final read below still sees it, then keep polling. 
+ done <- r + default: + } + state := pgMigrationState(t, env) + return state.version == targetCoreVersion && + !state.dirty && + anyPodsReady(t, env, oldPods) + }, 6*time.Minute, 500*time.Millisecond, "target schema was not observed while old controller pods were still ready") + require.NoError(t, helmErr, "helm upgrade failed before target schema was observed:\n%s", helmOut) + + // These SQL canaries intentionally use the pre-upgrade column shape. They do + // not prove every previous-release code path works, but they catch migrations + // that break basic old read/write assumptions during a rolling deployment. + require.Equal(t, 1, + pgQueryInt(t, env, fmt.Sprintf("SELECT count(*) FROM agent WHERE id = %s AND type = 'Deployment'", pgQuote(baselineAgentID))), + "old-shape read failed after target schema was applied", + ) + pgExec(t, env, fmt.Sprintf("INSERT INTO agent (id, type) VALUES (%s, 'Deployment')", pgQuote(compatAgentID))) + pgExec(t, env, fmt.Sprintf("INSERT INTO feedback (user_id, feedback_text) VALUES (%s, 'rolling compatibility feedback')", pgQuote(compatUserID))) + require.Equal(t, 1, + pgQueryInt(t, env, fmt.Sprintf("SELECT count(*) FROM agent WHERE id = %s", pgQuote(compatAgentID))), + "old-shape agent write did not survive against target schema", + ) + require.Equal(t, 1, + pgQueryInt(t, env, fmt.Sprintf("SELECT count(*) FROM feedback WHERE user_id = %s", pgQuote(compatUserID))), + "old-shape feedback write did not survive against target schema", + ) + + result := <-done + require.NoError(t, result.err, "helm upgrade failed:\n%s", result.out) + + kubectl(t, env, 3*time.Minute, + "rollout", "status", "deployment/kagent-controller", + "-n", env.namespace, + "--timeout=3m", + ) + finalState := pgMigrationState(t, env) + require.False(t, finalState.dirty, "post-rollout Postgres migrations are dirty") + require.Equal(t, targetCoreVersion, finalState.version, "final migration version") +} + +type upgradeResult struct { + out string + err error +} + 
+func podNamesForSelector(t *testing.T, env upgradeEnv, selector string) []string { + t.Helper() + + out := kubectl(t, env, time.Minute, + "get", "pods", + "-n", env.namespace, + "-l", selector, + "-o", "jsonpath={range .items[*]}{.metadata.name}{\"\\n\"}{end}", + ) + lines := strings.Split(strings.TrimSpace(out), "\n") + pods := make([]string, 0, len(lines)) + for _, line := range lines { + pod := strings.TrimSpace(line) + if pod != "" { + pods = append(pods, pod) + } + } + return pods +} + +func anyPodsReady(t *testing.T, env upgradeEnv, pods []string) bool { + t.Helper() + + for _, pod := range pods { + out, err := kubectlOutput(t, env, 10*time.Second, + "get", "pod", pod, + "-n", env.namespace, + "-o", fmt.Sprintf("jsonpath={.status.containerStatuses[?(@.name==%q)].ready}", controllerContainer), + ) + if err == nil && strings.TrimSpace(out) == "true" { + return true + } + } + return false +} diff --git a/go/core/test/e2e/upgrade/roundtrip_test.go b/go/core/test/e2e/upgrade/roundtrip_test.go new file mode 100644 index 0000000000..012e1b66b6 --- /dev/null +++ b/go/core/test/e2e/upgrade/roundtrip_test.go @@ -0,0 +1,266 @@ +package upgrade + +import ( + "bufio" + "context" + "database/sql" + "errors" + "fmt" + "os/exec" + "regexp" + "strconv" + "strings" + "testing" + "time" + + "github.com/golang-migrate/migrate/v4" + migratepgx "github.com/golang-migrate/migrate/v4/database/pgx/v5" + "github.com/golang-migrate/migrate/v4/source/iofs" + _ "github.com/jackc/pgx/v5/stdlib" + migrations "github.com/kagent-dev/kagent/go/core/pkg/migrations" + "github.com/stretchr/testify/require" +) + +const postgresServiceName = "kagent-postgresql" + +// migrationTrack mirrors one migration source: the FS subdirectory it owns +// and its golang-migrate tracking table. Registration order is core, then +// vector; rollback reverses it (vector, then core) so a track is never reversed +// while a later-registered track still depends on its schema. 
+type migrationTrack struct { + name string + dir string + trackingTable string +} + +var migrationTracks = []migrationTrack{ + {name: "core", dir: "core", trackingTable: "schema_migrations"}, + {name: "vector", dir: "vector", trackingTable: "vector_schema_migrations"}, +} + +// pgTrackVersion returns the current applied version of a golang-migrate +// tracking table, or 0 when the table does not exist (e.g. a disabled track). +func pgTrackVersion(t *testing.T, env upgradeEnv, table string) int { + t.Helper() + + raw := pgQuery(t, env, fmt.Sprintf( + "SELECT CASE WHEN to_regclass('public.%s') IS NULL THEN 0 ELSE (SELECT COALESCE(MAX(version), 0) FROM public.%s) END", + table, table)) + return parseInt(t, raw, table+" version") +} + +// pgExecDB runs a statement against an arbitrary database on the bundled +// Postgres (pgExec always targets the kagent database). Used to create and drop +// the throwaway database that holds the clean-install reference schema. +func pgExecDB(t *testing.T, env upgradeEnv, database, query string) { + t.Helper() + + pod := podNameForSelector(t, env, postgresSelector) + kubectl(t, env, time.Minute, + "exec", "-n", env.namespace, pod, "-c", postgresContainer, "--", + "psql", "-v", "ON_ERROR_STOP=1", "-U", "kagent", "-d", database, "-tAc", query, + ) +} + +// pgSchemaDump returns the normalized schema-only dump of a database, suitable +// for structural equality comparison between a migrated and a clean install. 
+func pgSchemaDump(t *testing.T, env upgradeEnv, database string) string { + t.Helper() + + pod := podNameForSelector(t, env, postgresSelector) + out := kubectl(t, env, 2*time.Minute, + "exec", "-n", env.namespace, pod, "-c", postgresContainer, "--", + "pg_dump", "--schema-only", "--no-owner", "--no-privileges", + "-U", "kagent", "-d", database, + ) + return normalizeSchemaDump(out) +} + +// normalizeSchemaDump strips the non-structural noise from a pg_dump so two +// dumps of the same logical schema compare equal: comments, blank lines, the +// session-setup preamble (SET / SELECT pg_catalog.set_config), and psql +// meta-commands. The latter matters because pg_dump on PostgreSQL 17+ wraps the +// output in `\restrict`/`\unrestrict` lines carrying a per-invocation random +// token, so two dumps of an identical schema would otherwise never compare equal. +func normalizeSchemaDump(dump string) string { + lines := strings.Split(dump, "\n") + kept := make([]string, 0, len(lines)) + for _, line := range lines { + trimmed := strings.TrimRight(line, " \t") + if trimmed == "" { + continue + } + if strings.HasPrefix(trimmed, "--") { + continue + } + if strings.HasPrefix(trimmed, "SET ") { + continue + } + if strings.HasPrefix(trimmed, "SELECT pg_catalog.set_config") { + continue + } + // psql meta-commands (\restrict, \unrestrict, \connect): non-structural, + // and the restrict tokens are randomized per dump. + if strings.HasPrefix(trimmed, `\`) { + continue + } + kept = append(kept, trimmed) + } + return strings.Join(kept, "\n") +} + +// buildCleanInstallSchema provisions a throwaway database, applies the embedded +// migrations up to the latest version via the real RunUp code path, and +// returns its normalized schema. This is the "clean HEAD install" reference the +// design's round-trip gate compares an upgraded database against. 
+func buildCleanInstallSchema(t *testing.T, env upgradeEnv, dbName string, vectorEnabled bool) string { + t.Helper() + + pgExecDB(t, env, "kagent", fmt.Sprintf("DROP DATABASE IF EXISTS %s WITH (FORCE)", dbName)) + pgExecDB(t, env, "kagent", "CREATE DATABASE "+dbName) + // Best-effort drop with its own context: t.Context() is already canceled by + // the time cleanups run, so a normal kubectl call here would always error. + t.Cleanup(func() { dropDatabaseBestEffort(env, dbName) }) + + localPort, stop := startPortForward(t, env, postgresServiceName, 5432) + defer stop() + + url := fmt.Sprintf("postgres://kagent:kagent@127.0.0.1:%d/%s?sslmode=disable", localPort, dbName) + require.NoError(t, migrations.RunUp(url, migrations.FS, vectorEnabled), + "apply embedded migrations to clean reference database %s", dbName) + + return pgSchemaDump(t, env, dbName) +} + +// dropDatabaseBestEffort removes a scratch database, ignoring all errors. It +// uses its own background context so it still runs during test teardown, after +// t.Context() has been canceled, and never fails the test. +func dropDatabaseBestEffort(env upgradeEnv, database string) { + ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second) + defer cancel() + + podOut, err := exec.CommandContext(ctx, "kubectl", + "--context", env.kubeContext, "get", "pods", + "-n", env.namespace, "-l", postgresSelector, + "-o", "jsonpath={.items[0].metadata.name}", + ).Output() + if err != nil { + return + } + pod := strings.TrimSpace(string(podOut)) + if pod == "" { + return + } + _ = exec.CommandContext(ctx, "kubectl", + "--context", env.kubeContext, "exec", "-n", env.namespace, pod, "-c", postgresContainer, "--", + "psql", "-U", "kagent", "-d", "kagent", "-tAc", + fmt.Sprintf("DROP DATABASE IF EXISTS %s WITH (FORCE)", database), + ).Run() +} + +// migrateTrackTo drives one track to a target version using the embedded +// migration files, standing in for `kagent db migrate goto` until that CLI +// exists. 
golang-migrate's Migrate moves up or down to the target; a target of +// 0 means roll the track all the way down. +func migrateTrackTo(t *testing.T, url string, track migrationTrack, target int) { + t.Helper() + + src, err := iofs.New(migrations.FS, track.dir) + require.NoError(t, err, "open embedded %s migrations", track.name) + + db, err := sql.Open("pgx", url) + require.NoError(t, err, "open db for %s track", track.name) + + driver, err := migratepgx.WithInstance(db, &migratepgx.Config{MigrationsTable: track.trackingTable}) + require.NoError(t, err, "build migrate driver for %s track", track.name) + + m, err := migrate.NewWithInstance("iofs", src, "pgx", driver) + require.NoError(t, err, "build migrator for %s track", track.name) + defer m.Close() + + if target == 0 { + err = m.Down() + } else { + err = m.Migrate(uint(target)) + } + if err != nil && !errors.Is(err, migrate.ErrNoChange) { + require.NoError(t, err, "migrate %s track to version %d", track.name, target) + } +} + +// scaleController scales the controller deployment and, for a scale to zero, +// waits until its pods are gone so a booting pod cannot re-apply migrations +// during a schema reversal (the design's scale-to-zero reversal recipe). 
+func scaleController(t *testing.T, env upgradeEnv, replicas int) { + t.Helper() + + kubectl(t, env, 2*time.Minute, + "scale", "deployment/kagent-controller", + "-n", env.namespace, + fmt.Sprintf("--replicas=%d", replicas), + ) + + if replicas == 0 { + require.Eventually(t, func() bool { + return len(podNamesForSelector(t, env, controllerSelector)) == 0 + }, 2*time.Minute, 2*time.Second, "controller pods did not terminate after scale to zero") + return + } + + kubectl(t, env, 3*time.Minute, + "rollout", "status", "deployment/kagent-controller", + "-n", env.namespace, + "--timeout=3m", + ) +} + +var forwardingPortRE = regexp.MustCompile(`Forwarding from 127\.0\.0\.1:(\d+)`) + +// startPortForward opens a kubectl port-forward to a service and returns the +// chosen local port plus a stop function. Used so the test process can drive +// golang-migrate against the in-cluster Postgres directly. +func startPortForward(t *testing.T, env upgradeEnv, service string, remotePort int) (int, func()) { + t.Helper() + + ctx, cancel := context.WithCancel(context.Background()) + cmd := exec.CommandContext(ctx, "kubectl", + "--context", env.kubeContext, + "port-forward", + "-n", env.namespace, + "svc/"+service, + fmt.Sprintf(":%d", remotePort), + ) + stdout, err := cmd.StdoutPipe() + require.NoError(t, err, "pipe port-forward stdout") + require.NoError(t, cmd.Start(), "start port-forward") + + stop := func() { + cancel() + _ = cmd.Wait() + } + + portCh := make(chan int, 1) + go func() { + scanner := bufio.NewScanner(stdout) + for scanner.Scan() { + if match := forwardingPortRE.FindStringSubmatch(scanner.Text()); match != nil { + p, convErr := strconv.Atoi(match[1]) + if convErr == nil { + select { + case portCh <- p: + default: + } + } + } + } + }() + + select { + case port := <-portCh: + return port, stop + case <-time.After(15 * time.Second): + stop() + t.Fatalf("port-forward to svc/%s did not become ready", service) + return 0, func() {} + } +} diff --git 
a/go/core/test/e2e/upgrade/upgrade_test.go b/go/core/test/e2e/upgrade/upgrade_test.go new file mode 100644 index 0000000000..f63b3dc2fa --- /dev/null +++ b/go/core/test/e2e/upgrade/upgrade_test.go @@ -0,0 +1,529 @@ +package upgrade + +import ( + "bytes" + "context" + "fmt" + "io/fs" + "os" + "os/exec" + "path/filepath" + "runtime" + "slices" + "strconv" + "strings" + "testing" + "time" + + migrations "github.com/kagent-dev/kagent/go/core/pkg/migrations" + "github.com/stretchr/testify/require" +) + +const ( + postgresSelector = "app.kubernetes.io/name=kagent,app.kubernetes.io/component=database" + controllerSelector = "app.kubernetes.io/name=kagent,app.kubernetes.io/component=controller" + + postgresContainer = "postgresql" + controllerContainer = "controller" +) + +type upgradeEnv struct { + repoRoot string + upgradeFromVersion string + version string + dockerRegistry string + kindClusterName string + namespace string + kubeContext string + openAIAPIKey string +} + +type postgresMigrationState struct { + version int + dirty bool +} + +func TestUpgrade(t *testing.T) { + if os.Getenv("RUN_UPGRADE_TESTS") != "true" { + t.Skip("set RUN_UPGRADE_TESTS=true to run upgrade tests") + } + + env := loadUpgradeEnv(t) + seed := fmt.Sprintf("%d", time.Now().UnixNano()) + seedAgentID := "upgrade-seed-agent-" + seed + seedUserID := "upgrade-seed-user-" + seed + seedSessionID := "upgrade-seed-session-" + seed + seedEventID := "upgrade-seed-event-" + seed + seedTaskID := "upgrade-seed-task-" + seed + seedPushID := "upgrade-seed-push-" + seed + seedToolID := "upgrade-seed-tool-" + seed + seedToolServerName := "upgrade-seed-toolserver-" + seed + seedGroupKind := "upgrade.seed/v1/Canary" + seedCanaryCounts := map[string]int{} + // The controller image embeds the migration files. Comparing the DB + // state to this version proves the upgraded pod actually applied the + // migration set shipped in the target build. 
+ targetCoreVersion := latestCoreMigrationVersion(t) + + t.Logf("upgrade test: %s -> %s (registry=%s, kubeContext=%s)", + env.upgradeFromVersion, env.version, env.dockerRegistry, env.kubeContext) + + var pgBaselineState postgresMigrationState + var baselineVectorVersion int + // cleanTargetSchema is the previous release's freshly-installed schema. The + // rollback round-trip below asserts the reversed database matches it exactly. + var cleanTargetSchema string + // cleanHeadSchema is an independent clean install of the current build's + // migrations. The post-upgrade database must match it exactly. + var cleanHeadSchema string + + if !t.Run("seed baseline data before upgrade", func(t *testing.T) { + waitForReadyPods(t, env, postgresSelector, 3*time.Minute) + waitForPostgresAgentTable(t, env, 3*time.Minute) + + // A dirty baseline means a previous migration failed; continuing would + // make any post-upgrade failure ambiguous rather than a regression signal. + pgBaselineState = pgMigrationState(t, env) + require.False(t, pgBaselineState.dirty, "baseline Postgres migrations are dirty") + baselineVectorVersion = pgTrackVersion(t, env, "vector_schema_migrations") + t.Logf("baseline Postgres schema_migrations version: %d dirty=%t vector=%d (target=%d)", + pgBaselineState.version, pgBaselineState.dirty, baselineVectorVersion, targetCoreVersion) + + // Capture the clean target schema before any seeding or upgrade. The + // freshly-installed previous release is, by definition, a clean target + // install, so its schema is the reversal reference. schema-only dumps + // exclude row data, so seeding afterward does not perturb it. + cleanTargetSchema = pgSchemaDump(t, env, "kagent") + + // Seed a small cross-section of stable tables. These rows are not + // meant to validate every future migration's semantics; they are canaries + // for accidental table drops, destructive rewrites, and key/index changes + // that lose existing customer data during an upgrade. 
+ pgExec(t, env, fmt.Sprintf("INSERT INTO agent (id, type) VALUES (%s, 'Deployment')", pgQuote(seedAgentID))) + pgExec(t, env, fmt.Sprintf("INSERT INTO session (id, user_id, name, agent_id, source) VALUES (%s, %s, 'upgrade canary session', %s, 'upgrade-test')", + pgQuote(seedSessionID), pgQuote(seedUserID), pgQuote(seedAgentID))) + pgExec(t, env, fmt.Sprintf("INSERT INTO event (id, user_id, session_id, data) VALUES (%s, %s, %s, '{}')", + pgQuote(seedEventID), pgQuote(seedUserID), pgQuote(seedSessionID))) + pgExec(t, env, fmt.Sprintf("INSERT INTO task (id, session_id, data) VALUES (%s, %s, '{}')", + pgQuote(seedTaskID), pgQuote(seedSessionID))) + pgExec(t, env, fmt.Sprintf("INSERT INTO push_notification (id, task_id, data) VALUES (%s, %s, '{}')", + pgQuote(seedPushID), pgQuote(seedTaskID))) + pgExec(t, env, fmt.Sprintf("INSERT INTO feedback (user_id, feedback_text) VALUES (%s, 'pre-upgrade feedback')", pgQuote(seedUserID))) + pgExec(t, env, fmt.Sprintf("INSERT INTO tool (id, server_name, group_kind, description) VALUES (%s, %s, %s, 'upgrade canary tool')", + pgQuote(seedToolID), pgQuote(seedToolServerName), pgQuote(seedGroupKind))) + pgExec(t, env, fmt.Sprintf("INSERT INTO toolserver (name, group_kind, description) VALUES (%s, %s, 'upgrade canary toolserver')", + pgQuote(seedToolServerName), pgQuote(seedGroupKind))) + + seedAgents := pgQueryInt(t, env, fmt.Sprintf("SELECT count(*) FROM agent WHERE id = %s", pgQuote(seedAgentID))) + require.GreaterOrEqual(t, seedAgents, 1, "expected seeded agent row") + t.Logf("seeded agent rows: %d", seedAgents) + + seedCanaryCounts = map[string]int{ + "agent": pgQueryInt(t, env, fmt.Sprintf("SELECT count(*) FROM agent WHERE id = %s", pgQuote(seedAgentID))), + "session": pgQueryInt(t, env, fmt.Sprintf("SELECT count(*) FROM session WHERE id = %s AND user_id = %s", pgQuote(seedSessionID), pgQuote(seedUserID))), + "event": pgQueryInt(t, env, fmt.Sprintf("SELECT count(*) FROM event WHERE id = %s AND user_id = %s", pgQuote(seedEventID), 
pgQuote(seedUserID))), + "task": pgQueryInt(t, env, fmt.Sprintf("SELECT count(*) FROM task WHERE id = %s", pgQuote(seedTaskID))), + "push_notification": pgQueryInt(t, env, fmt.Sprintf("SELECT count(*) FROM push_notification WHERE id = %s", pgQuote(seedPushID))), + "feedback": pgQueryInt(t, env, fmt.Sprintf("SELECT count(*) FROM feedback WHERE user_id = %s", pgQuote(seedUserID))), + "tool": pgQueryInt(t, env, fmt.Sprintf("SELECT count(*) FROM tool WHERE id = %s AND server_name = %s AND group_kind = %s", pgQuote(seedToolID), pgQuote(seedToolServerName), pgQuote(seedGroupKind))), + "toolserver": pgQueryInt(t, env, fmt.Sprintf("SELECT count(*) FROM toolserver WHERE name = %s AND group_kind = %s", pgQuote(seedToolServerName), pgQuote(seedGroupKind))), + } + for table, count := range seedCanaryCounts { + require.GreaterOrEqual(t, count, 1, "expected seeded %s canary row", table) + } + }) { + return + } + + if !t.Run("upgrade with helm", func(t *testing.T) { + ctx, cancel := context.WithTimeout(t.Context(), 10*time.Minute) + defer cancel() + + cmd := helmUpgradeCommand(ctx, env) + out, err := cmd.CombinedOutput() + require.NoError(t, err, "helm upgrade to current build failed:\n%s", string(out)) + }) { + return + } + + if !t.Run("verify controller rollout", func(t *testing.T) { + kubectl(t, env, 3*time.Minute, + "rollout", "status", "deployment/kagent-controller", + "-n", env.namespace, + "--timeout=3m", + ) + + // Wait for Postgres to be fully ready before rolling out a fresh controller pod. + // The controller can crash on startup if Postgres isn't accepting connections yet + // (e.g. due to a concurrent Postgres restart during the helm upgrade), which + // would leave a non-zero restart count on the upgraded pod. + waitForReadyPods(t, env, postgresSelector, 2*time.Minute) + + // Restart the controller now that Postgres is confirmed healthy, then verify + // the fresh pod starts without any crashes. 
+ kubectl(t, env, time.Minute, + "rollout", "restart", "deployment/kagent-controller", + "-n", env.namespace, + ) + kubectl(t, env, 3*time.Minute, + "rollout", "status", "deployment/kagent-controller", + "-n", env.namespace, + "--timeout=3m", + ) + + pod := newestPodNameForSelector(t, env, controllerSelector) + restarts := podContainerRestartCount(t, env, pod, controllerContainer) + require.Zero(t, restarts, "kagent-controller pod %s restarted after upgrade", pod) + t.Logf("kagent-controller %s restarts=%d", pod, restarts) + }) { + return + } + + t.Run("verify seeded data survived migrations", func(t *testing.T) { + // These checks are migration plumbing checks: the version cannot regress, + // the migration table must be clean, and the upgraded controller must + // have reached the latest core migration embedded in this test build. + pgPostState := pgMigrationState(t, env) + require.False(t, pgPostState.dirty, "post-upgrade Postgres migrations are dirty") + require.GreaterOrEqual(t, pgPostState.version, pgBaselineState.version, + "Postgres migration version regressed") + require.Equal(t, targetCoreVersion, pgPostState.version, + "Postgres migrations did not reach the target embedded migration version") + t.Logf("Postgres schema_migrations version: %d -> %d dirty=%t", + pgBaselineState.version, pgPostState.version, pgPostState.dirty) + + // Keep the schema invariant intentionally broad and cheap: core + // tables should still exist before we ask more specific questions about + // the seeded rows below. 
+ requirePostgresTablesExist(t, env, + "agent", + "session", + "event", + "task", + "push_notification", + "feedback", + "tool", + "toolserver", + ) + + postAgents := pgQueryInt(t, env, + fmt.Sprintf("SELECT count(*) FROM agent WHERE id = %s AND workload_type = 'deployment'", pgQuote(seedAgentID))) + require.GreaterOrEqual(t, postAgents, 1, + "seeded agent row missing or not backfilled to workload_type='deployment' after upgrade") + + postFeedback := pgQueryInt(t, env, + fmt.Sprintf("SELECT count(*) FROM feedback WHERE user_id = %s", pgQuote(seedUserID))) + require.GreaterOrEqual(t, postFeedback, 1, "seeded feedback row did not survive the upgrade migrations") + + postCanaryCounts := map[string]int{ + "agent": pgQueryInt(t, env, fmt.Sprintf("SELECT count(*) FROM agent WHERE id = %s", pgQuote(seedAgentID))), + "session": pgQueryInt(t, env, fmt.Sprintf("SELECT count(*) FROM session WHERE id = %s AND user_id = %s", pgQuote(seedSessionID), pgQuote(seedUserID))), + "event": pgQueryInt(t, env, fmt.Sprintf("SELECT count(*) FROM event WHERE id = %s AND user_id = %s", pgQuote(seedEventID), pgQuote(seedUserID))), + "task": pgQueryInt(t, env, fmt.Sprintf("SELECT count(*) FROM task WHERE id = %s", pgQuote(seedTaskID))), + "push_notification": pgQueryInt(t, env, fmt.Sprintf("SELECT count(*) FROM push_notification WHERE id = %s", pgQuote(seedPushID))), + "feedback": postFeedback, + "tool": pgQueryInt(t, env, fmt.Sprintf("SELECT count(*) FROM tool WHERE id = %s AND server_name = %s AND group_kind = %s", pgQuote(seedToolID), pgQuote(seedToolServerName), pgQuote(seedGroupKind))), + "toolserver": pgQueryInt(t, env, fmt.Sprintf("SELECT count(*) FROM toolserver WHERE name = %s AND group_kind = %s", pgQuote(seedToolServerName), pgQuote(seedGroupKind))), + } + for table, before := range seedCanaryCounts { + // The generic canaries only assert non-regression. 
Future migrations + // that intentionally transform data should still add targeted + // assertions for their expected post-upgrade shape. + require.GreaterOrEqual(t, postCanaryCounts[table], before, + "%s canary row count decreased across upgrade", table) + } + }) + + vectorEnabled := baselineVectorVersion > 0 + + if !t.Run("verify upgraded schema matches a clean install", func(t *testing.T) { + // Build an independent clean install of the current build's migrations + // and require the upgraded database to be structurally identical. This + // catches upgrade paths that leave residue a fresh install would not. + cleanHeadSchema = buildCleanInstallSchema(t, env, "clean_head_"+seed, vectorEnabled) + upgradedSchema := pgSchemaDump(t, env, "kagent") + + if cleanHeadSchema == cleanTargetSchema { + t.Log("no schema change between target and HEAD; the round-trip is a structural no-op until a new migration lands") + } + require.Equal(t, cleanHeadSchema, upgradedSchema, + "upgraded schema diverged from a clean install of the current build") + }) { + return + } + + t.Run("reverse schema to target", func(t *testing.T) { + // Scale the controller to zero so no booting pod re-applies migrations + // while we reverse the schema (the design's scale-to-zero recipe). + scaleController(t, env, 0) + + localPort, stop := startPortForward(t, env, postgresServiceName, 5432) + defer stop() + dbURL := fmt.Sprintf("postgres://kagent:kagent@127.0.0.1:%d/kagent?sslmode=disable", localPort) + + // Reverse each track to the target release's version, in reverse + // registration order (vector before core). This stands in for + // `kagent db migrate goto --release ` until that CLI exists; it + // exercises every down file between HEAD and the target. 
+ targets := map[string]int{ + "core": pgBaselineState.version, + "vector": baselineVectorVersion, + } + for _, track := range slices.Backward(migrationTracks) { + migrateTrackTo(t, dbURL, track, targets[track.name]) + } + + // Migration bookkeeping is back at the target versions and clean. + reverted := pgMigrationState(t, env) + require.False(t, reverted.dirty, "reverted Postgres migrations are dirty") + require.Equal(t, pgBaselineState.version, reverted.version, "core track not reversed to target version") + require.Equal(t, baselineVectorVersion, pgTrackVersion(t, env, "vector_schema_migrations"), + "vector track not reversed to target version") + + // Schema matches a clean target install, and the seeded rows survived + // the down migrations. + require.Equal(t, cleanTargetSchema, pgSchemaDump(t, env, "kagent"), + "reversed schema diverged from a clean target install") + require.GreaterOrEqual(t, pgQueryInt(t, env, fmt.Sprintf("SELECT count(*) FROM agent WHERE id = %s", pgQuote(seedAgentID))), 1, + "seeded agent row did not survive the rollback") + require.GreaterOrEqual(t, pgQueryInt(t, env, fmt.Sprintf("SELECT count(*) FROM feedback WHERE user_id = %s", pgQuote(seedUserID))), 1, + "seeded feedback row did not survive the rollback") + }) +} + +// helmUpgradeCommand returns the command that upgrades the cluster from the +// previously-installed release to the current local build. It reuses the repo's +// helm-install-provider target, which packages the local charts and runs +// `helm upgrade --install` of kagent-crds and kagent against the locally-built +// images (registry=DOCKER_REGISTRY, tag=VERSION). 
+func helmUpgradeCommand(ctx context.Context, env upgradeEnv) *exec.Cmd { + cmd := exec.CommandContext(ctx, "make", "-C", env.repoRoot, "helm-install-provider") + cmd.Dir = env.repoRoot + cmd.Env = append(os.Environ(), + "VERSION="+env.version, + "DOCKER_REGISTRY="+env.dockerRegistry, + "KIND_CLUSTER_NAME="+env.kindClusterName, + "OPENAI_API_KEY="+env.openAIAPIKey, + "KAGENT_DEFAULT_MODEL_PROVIDER=openAI", + ) + return cmd +} + +func loadUpgradeEnv(t *testing.T) upgradeEnv { + t.Helper() + + _, filename, _, ok := runtime.Caller(0) + require.True(t, ok, "determine test file path") + + // go/core/test/e2e/upgrade/upgrade_test.go -> repo root is five levels up. + repoRoot, err := filepath.Abs(filepath.Join(filepath.Dir(filename), "..", "..", "..", "..", "..")) + require.NoError(t, err) + + clusterName := envOrDefault("KIND_CLUSTER_NAME", "kagent") + return upgradeEnv{ + repoRoot: repoRoot, + upgradeFromVersion: requireEnv(t, "UPGRADE_FROM_VERSION"), + version: requireEnv(t, "VERSION"), + dockerRegistry: envOrDefault("DOCKER_REGISTRY", "localhost:5001"), + kindClusterName: clusterName, + namespace: envOrDefault("NAMESPACE", "kagent"), + kubeContext: envOrDefault("KUBE_CONTEXT", "kind-"+clusterName), + openAIAPIKey: envOrDefault("OPENAI_API_KEY", "fake"), + } +} + +func requireEnv(t *testing.T, key string) string { + t.Helper() + + val := os.Getenv(key) + require.NotEmpty(t, val, "%s must be set", key) + return val +} + +func envOrDefault(key, fallback string) string { + if val := os.Getenv(key); val != "" { + return val + } + return fallback +} + +func waitForReadyPods(t *testing.T, env upgradeEnv, selector string, timeout time.Duration) { + t.Helper() + + kubectl(t, env, timeout, + "wait", "--for=condition=Ready", "pod", + "-l", selector, + "-n", env.namespace, + fmt.Sprintf("--timeout=%s", timeout), + ) +} + +func waitForPostgresAgentTable(t *testing.T, env upgradeEnv, timeout time.Duration) { + t.Helper() + + require.Eventually(t, func() bool { + return pgQuery(t, 
env, "SELECT to_regclass('public.agent') IS NOT NULL") == "t" + }, timeout, 5*time.Second, "agent table did not appear in the baseline Postgres schema") +} + +func pgExec(t *testing.T, env upgradeEnv, query string) { + t.Helper() + _ = pgQuery(t, env, query) +} + +func pgQueryInt(t *testing.T, env upgradeEnv, query string) int { + t.Helper() + return parseInt(t, pgQuery(t, env, query), query) +} + +func pgQuery(t *testing.T, env upgradeEnv, query string) string { + t.Helper() + + pod := podNameForSelector(t, env, postgresSelector) + out := kubectl(t, env, time.Minute, + "exec", "-n", env.namespace, pod, "-c", postgresContainer, "--", + "psql", "-v", "ON_ERROR_STOP=1", "-U", "kagent", "-d", "kagent", "-tAc", query, + ) + return strings.TrimSpace(out) +} + +func pgMigrationState(t *testing.T, env upgradeEnv) postgresMigrationState { + t.Helper() + + raw := pgQuery(t, env, "SELECT CASE WHEN to_regclass('public.schema_migrations') IS NULL THEN '0,false' ELSE (SELECT concat(COALESCE(MAX(version), 0), ',', COALESCE(bool_or(dirty), false)) FROM public.schema_migrations) END") + parts := strings.Split(raw, ",") + require.Len(t, parts, 2, "parse schema_migrations state %q", raw) + return postgresMigrationState{ + version: parseInt(t, parts[0], "schema_migrations version"), + dirty: parseBool(t, parts[1], "schema_migrations dirty"), + } +} + +func requirePostgresTablesExist(t *testing.T, env upgradeEnv, tables ...string) { + t.Helper() + + for _, table := range tables { + exists := pgQuery(t, env, fmt.Sprintf("SELECT to_regclass(%s) IS NOT NULL", pgQuote("public."+table))) + require.Equal(t, "t", exists, "expected public.%s to exist after upgrade", table) + } +} + +func podNameForSelector(t *testing.T, env upgradeEnv, selector string) string { + t.Helper() + + out := kubectl(t, env, time.Minute, + "get", "pods", + "-n", env.namespace, + "-l", selector, + "-o", "jsonpath={.items[0].metadata.name}", + ) + pod := strings.TrimSpace(out) + require.NotEmpty(t, pod, "no pod matched 
selector %q in namespace %s", selector, env.namespace) + return pod +} + +func newestPodNameForSelector(t *testing.T, env upgradeEnv, selector string) string { + t.Helper() + + out := kubectl(t, env, time.Minute, + "get", "pods", + "-n", env.namespace, + "-l", selector, + "--sort-by=.metadata.creationTimestamp", + "-o", "jsonpath={.items[-1].metadata.name}", + ) + pod := strings.TrimSpace(out) + require.NotEmpty(t, pod, "no pod matched selector %q in namespace %s", selector, env.namespace) + return pod +} + +func podContainerRestartCount(t *testing.T, env upgradeEnv, pod, container string) int { + t.Helper() + + out := kubectl(t, env, time.Minute, + "get", "pod", pod, + "-n", env.namespace, + "-o", fmt.Sprintf("jsonpath={.status.containerStatuses[?(@.name==%q)].restartCount}", container), + ) + if strings.TrimSpace(out) == "" { + return 0 + } + return parseInt(t, out, "container restart count") +} + +func kubectl(t *testing.T, env upgradeEnv, timeout time.Duration, args ...string) string { + t.Helper() + + out, err := kubectlOutput(t, env, timeout, args...) + require.NoError(t, err, "kubectl %s failed:\n%s", strings.Join(append([]string{"--context", env.kubeContext}, args...), " "), out) + return out +} + +func kubectlOutput(t *testing.T, env upgradeEnv, timeout time.Duration, args ...string) (string, error) { + t.Helper() + + ctx, cancel := context.WithTimeout(t.Context(), timeout) + defer cancel() + + fullArgs := append([]string{"--context", env.kubeContext}, args...) + cmd := exec.CommandContext(ctx, "kubectl", fullArgs...) 
+ cmd.Dir = env.repoRoot + + var stdout, stderr bytes.Buffer + cmd.Stdout = &stdout + cmd.Stderr = &stderr + err := cmd.Run() + if err != nil { + return stdout.String(), fmt.Errorf("kubectl %s: %w\nstderr: %s", strings.Join(fullArgs, " "), err, stderr.String()) + } + return stdout.String(), nil +} + +func parseInt(t *testing.T, raw, description string) int { + t.Helper() + + n, err := strconv.Atoi(strings.TrimSpace(raw)) + require.NoError(t, err, "parse integer from %s output %q", description, raw) + return n +} + +func parseBool(t *testing.T, raw, description string) bool { + t.Helper() + + switch strings.TrimSpace(raw) { + case "t", "true": + return true + case "f", "false": + return false + default: + require.Failf(t, "parse bool", "parse boolean from %s output %q", description, raw) + return false + } +} + +func latestCoreMigrationVersion(t *testing.T) int { + t.Helper() + + entries, err := fs.ReadDir(migrations.FS, "core") + require.NoError(t, err, "read embedded core migrations") + + maxVersion := 0 + for _, entry := range entries { + name := entry.Name() + if entry.IsDir() || !strings.HasSuffix(name, ".up.sql") { + continue + } + versionPart, _, ok := strings.Cut(name, "_") + require.True(t, ok, "migration file %q should start with a version prefix", name) + version := parseInt(t, versionPart, name) + if version > maxVersion { + maxVersion = version + } + } + require.NotZero(t, maxVersion, "expected at least one embedded core migration") + return maxVersion +} + +func pgQuote(s string) string { + return "'" + strings.ReplaceAll(s, "'", "''") + "'" +} diff --git a/scripts/prev-stable-version.sh b/scripts/prev-stable-version.sh new file mode 100755 index 0000000000..0ca15fca95 --- /dev/null +++ b/scripts/prev-stable-version.sh @@ -0,0 +1,66 @@ +#!/usr/bin/env bash +# prev-stable-version.sh — prints the latest released patch of the stable line +# immediately BELOW the line currently being built: the highest +# vMAJOR.MINOR.PATCH tag on the newest 
release/vMAJOR.MINOR.x branch whose +# MAJOR.MINOR is strictly less than the current line. This is the rollback-window +# floor the upgrade/contraction tests target. +# +# The current line comes from the base/target branch: +# - release/vX.Y.x -> current line is X.Y, so this resolves to the newest +# release line below X.Y (release/v0.9.x -> 0.8.x). +# - main (or any non-release branch) -> the unreleased next minor, which sorts +# above every release line, so this resolves to the newest +# release line overall (-> 0.9.x). +# Source order for the current ref: CURRENT_REF override, GITHUB_BASE_REF (PR +# target), GITHUB_REF_NAME (push), then the checked-out branch. +# +# Prints nothing and exits 0 when no stable line exists below the current one +# (e.g. building the oldest release line), so the caller can skip the leg. +# Uses `git ls-remote`, so it needs network to the remote but not the branch +# checked out locally. Output has no leading 'v'. Override the remote with REMOTE. +set -euo pipefail + +remote="${REMOTE:-origin}" + +# Current line MAJOR.MINOR, or empty for main / any non-release line. +current_ref="${CURRENT_REF:-${GITHUB_BASE_REF:-${GITHUB_REF_NAME:-$(git rev-parse --abbrev-ref HEAD 2>/dev/null || true)}}}" +current_minor="" +if [[ "${current_ref}" =~ ^release/v([0-9]+\.[0-9]+)\.x$ ]]; then + current_minor="${BASH_REMATCH[1]}" +fi + +# All release lines on the remote, ascending by version (MAJOR.MINOR only). +lines="$(git ls-remote --heads "$remote" 'refs/heads/release/v*' 2>/dev/null \ + | sed -nE 's#.*refs/heads/release/v([0-9]+\.[0-9]+)\.x$#\1#p' \ + | sort -V)" +if [ -z "${lines}" ]; then + echo "ERROR: no release/vMAJOR.MINOR.x branch found on ${remote}" >&2 + exit 1 +fi + +# ver_lt A B -> success when A < B by version sort. +ver_lt() { [ "$1" != "$2" ] && [ "$(printf '%s\n%s\n' "$1" "$2" | sort -V | head -1)" = "$1" ]; } + +# Highest line strictly below the current line. 
With no current_minor (main / +# next), every line qualifies, so this lands on the newest line overall. +prev_minor="" +for l in ${lines}; do + if [ -z "${current_minor}" ] || ver_lt "$l" "${current_minor}"; then + prev_minor="$l" + fi +done +if [ -z "${prev_minor}" ]; then + # No stable line below the current one; let the caller skip the prev-stable leg. + exit 0 +fi + +esc="${prev_minor//./\\.}" +latest="$(git ls-remote --tags "$remote" 2>/dev/null \ + | { grep -oE "refs/tags/v${esc}\.[0-9]+$" || true; } \ + | sed 's#refs/tags/v##' | sort -V | tail -1)" +if [ -z "${latest}" ]; then + echo "ERROR: no v${prev_minor}.PATCH release tags found on ${remote} (fetch tags?)" >&2 + exit 1 +fi + +echo "${latest}" diff --git a/scripts/upgrade-from-version.sh b/scripts/upgrade-from-version.sh new file mode 100755 index 0000000000..cdc72c9ffc --- /dev/null +++ b/scripts/upgrade-from-version.sh @@ -0,0 +1,13 @@ +#!/usr/bin/env bash +# upgrade-from-version.sh prints the version the upgrade test upgrades FROM: +# the latest released tag reachable from HEAD (the adjacent previous release in +# the current line). Pre-release tags (e.g. v0.8.0-beta1) are excluded. Output +# has no leading 'v'. +set -euo pipefail + +tag="$(git describe --tags --abbrev=0 --match='v[0-9]*' --exclude='*-*' 2>/dev/null || true)" +if [ -z "${tag}" ]; then + echo "ERROR: no release tag reachable from HEAD (fetch tags?)" >&2 + exit 1 +fi +echo "${tag#v}"
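
Reviewer sketch: the line-selection rule in `prev-stable-version.sh` ("highest release line strictly below the current one") can be tried without network access. The snippet below applies the same loop to a hard-coded list of lines instead of `git ls-remote` output. The helper name `pick_prev_line` and the sample lines are assumptions made for this illustration only; they are not part of the scripts in this change.

```shell
#!/usr/bin/env bash
set -euo pipefail

# ver_lt A B -> success when A sorts strictly before B by version sort
# (same helper as in prev-stable-version.sh).
ver_lt() { [ "$1" != "$2" ] && [ "$(printf '%s\n%s\n' "$1" "$2" | sort -V | head -1)" = "$1" ]; }

# pick_prev_line CURRENT LINE... (hypothetical helper, not in the PR):
# prints the highest LINE strictly below CURRENT. An empty CURRENT stands
# for main / any non-release branch, so every line qualifies.
pick_prev_line() {
  local current="$1"; shift
  local prev="" l
  # Sort ascending by version so the last qualifying line is the highest.
  for l in $(printf '%s\n' "$@" | sort -V); do
    if [ -z "${current}" ] || ver_lt "$l" "${current}"; then
      prev="$l"
    fi
  done
  printf '%s\n' "${prev}"
}

pick_prev_line "0.9" 0.7 0.10 0.8 0.9   # prints 0.8
pick_prev_line ""    0.7 0.10 0.8 0.9   # prints 0.10
```

With the current line at 0.9 the sketch prints 0.8. With an empty current line (the main case) it prints 0.10, the newest line overall; a plain lexical sort would have placed 0.10 before 0.7, which is why both the script and this sketch rely on `sort -V`.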