diff --git a/.gitignore b/.gitignore
index 08cc6b4312c56..bc02ffc5b3150 100644
--- a/.gitignore
+++ b/.gitignore
@@ -60,3 +60,5 @@ flake.lock
 /known-docker-images.txt
 /test/sqllogictest/sqlite
 my-local-mz/
+/test/orchestratord/cluster.yaml
+uv.lock
diff --git a/ci/nightly/pipeline.template.yml b/ci/nightly/pipeline.template.yml
index 0d299f27db68d..3167323cec30c 100644
--- a/ci/nightly/pipeline.template.yml
+++ b/ci/nightly/pipeline.template.yml
@@ -2367,6 +2367,7 @@ steps:
     steps:
       - id: orchestratord-defaults
         label: "Orchestratord test (defaults from documentation)"
+        artifact_paths: ["mz_debug_*.zip"]
         depends_on: devel-docker-tags
         timeout_in_minutes: 120
         plugins:
@@ -2379,6 +2380,7 @@ steps:

       - id: orchestratord-default-properties
         label: "Orchestratord test (defaults for properties)"
+        artifact_paths: ["mz_debug_*.zip"]
         depends_on: devel-docker-tags
         timeout_in_minutes: 120
         plugins:
@@ -2391,6 +2393,7 @@ steps:

       - id: orchestratord-individual
         label: "Orchestratord test (individual properties)"
+        artifact_paths: ["mz_debug_*.zip"]
         depends_on: devel-docker-tags
         timeout_in_minutes: 120
         plugins:
@@ -2403,76 +2406,65 @@ steps:

       - id: orchestratord-combine
         label: "Orchestratord test (combine properties)"
+        artifact_paths: ["mz_debug_*.zip"]
         depends_on: build-aarch64
         timeout_in_minutes: 120
         plugins:
           - ./ci/plugins/mzcompose:
               composition: orchestratord
-              args: [--action=noop, --properties=combine, --runtime=3600, --recreate-cluster]
+              args: [--action=noop, --properties=combine, --runtime=1800, --recreate-cluster]
               ci-builder: stable
         agents:
           queue: hetzner-aarch64-16cpu-32gb

       - id: orchestratord-upgrade-individual
         label: "Orchestratord test (upgrade, individual props)"
+        artifact_paths: ["mz_debug_*.zip"]
         depends_on: devel-docker-tags
         timeout_in_minutes: 120
         plugins:
           - ./ci/plugins/mzcompose:
               composition: orchestratord
-              args: [--action=upgrade, --properties=individual, --runtime=3600, --recreate-cluster]
+              args: [--action=upgrade, --properties=individual, --runtime=1800, --recreate-cluster]
               ci-builder: stable
-        env:
-          # Old versions are not on GHCR yet
-          MZ_GHCR: 0
         agents:
-          queue: hetzner-aarch64-8cpu-16gb
-        skip: "https://github.com/MaterializeInc/materialize/pull/34214"
+          queue: hetzner-aarch64-16cpu-32gb

       - id: orchestratord-upgrade-combine
         label: "Orchestratord test (upgrade, combine props)"
+        artifact_paths: ["mz_debug_*.zip"]
         depends_on: devel-docker-tags
         timeout_in_minutes: 120
         plugins:
           - ./ci/plugins/mzcompose:
               composition: orchestratord
-              args: [--action=upgrade, --properties=combine, --runtime=3600, --recreate-cluster]
+              args: [--action=upgrade, --properties=combine, --runtime=1800, --recreate-cluster]
               ci-builder: stable
-        env:
-          # Old versions are not on GHCR yet
-          MZ_GHCR: 0
         agents:
-          queue: hetzner-aarch64-8cpu-16gb
-        skip: "https://github.com/MaterializeInc/materialize/pull/34214"
+          queue: hetzner-aarch64-16cpu-32gb

       - id: orchestratord-upgrade-chain-individual
         label: "Orchestratord test (upgrade chain, individual props)"
+        artifact_paths: ["mz_debug_*.zip"]
         depends_on: devel-docker-tags
         timeout_in_minutes: 120
         plugins:
           - ./ci/plugins/mzcompose:
               composition: orchestratord
-              args: [--action=upgrade-chain, --properties=individual, --runtime=3600, --recreate-cluster]
+              args: [--action=upgrade-chain, --properties=individual, --runtime=1800, --recreate-cluster]
               ci-builder: stable
-        env:
-          # Old versions are not on GHCR yet
-          MZ_GHCR: 0
         agents:
-          queue: hetzner-aarch64-8cpu-16gb
-        skip: "https://github.com/MaterializeInc/materialize/pull/34214"
+          queue: hetzner-aarch64-16cpu-32gb

       - id: orchestratord-upgrade-chain-combine
         label: "Orchestratord test (upgrade chain, combine props)"
+        artifact_paths: ["mz_debug_*.zip"]
         depends_on: devel-docker-tags
         timeout_in_minutes: 120
         plugins:
           - ./ci/plugins/mzcompose:
               composition: orchestratord
-              args: [--action=upgrade-chain, --properties=combine, --runtime=3600, --recreate-cluster]
+              args: [--action=upgrade-chain, --properties=combine, --runtime=1800, --recreate-cluster]
               ci-builder: stable
-        env:
-          # Old versions are not on GHCR yet
-          MZ_GHCR: 0
         agents:
           queue: hetzner-aarch64-16cpu-32gb
-        skip: "https://github.com/MaterializeInc/materialize/pull/34214"
diff --git a/src/cloud-resources/src/crd/materialize.rs b/src/cloud-resources/src/crd/materialize.rs
index 8f617a5736553..39fb037a9ac21 100644
--- a/src/cloud-resources/src/crd/materialize.rs
+++ b/src/cloud-resources/src/crd/materialize.rs
@@ -66,6 +66,23 @@ pub mod v1alpha1 {
         #[default]
         WaitUntilReady,

+        /// Create a new generation of pods, leaving the old generation as the serving generation
+        /// until the user manually promotes the new generation.
+        ///
+        /// Users can promote the new generation at any time, even if the new generation pods are
+        /// not fully caught up, by setting `forcePromote` to the same value as `requestRollout` in
+        /// the Materialize spec.
+        ///
+        /// {{< warning >}}
+        /// Do not leave new generations unpromoted indefinitely.
+        ///
+        /// The new generation keeps open read holds which prevent compaction. Once promoted or
+        /// cancelled, those read holds are released. If left unpromoted for an extended time, this
+        /// data can build up and cause extreme deletion load on the metadata backend database
+        /// when the rollout is finally promoted or cancelled.
+        /// {{< /warning >}}
+        ManuallyPromote,
+
         /// {{< warning >}}
         /// THIS WILL CAUSE YOUR MATERIALIZE INSTANCE TO BE UNAVAILABLE FOR SOME TIME!!!
         ///
@@ -429,6 +446,20 @@ pub mod v1alpha1 {
             false
         }

+        pub fn is_ready_to_promote(&self, resources_hash: &str) -> bool {
+            let Some(status) = self.status.as_ref() else {
+                return false;
+            };
+            if status.conditions.is_empty() {
+                return false;
+            }
+            status
+                .conditions
+                .iter()
+                .any(|condition| condition.reason == "ReadyToPromote")
+                && &status.resources_hash == resources_hash
+        }
+
         pub fn is_promoting(&self) -> bool {
             let Some(status) = self.status.as_ref() else {
                 return false;
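Reviewer note: to make the new `ManuallyPromote` strategy concrete, here is a minimal sketch of the promotion flow it enables, assuming a single Materialize CR in the `materialize-environment` namespace as used by the test harness below. The helper name and the use of `kubectl patch` are illustrative, not part of this change:

```python
# Hypothetical helper illustrating manual promotion; not part of this diff.
import json
import subprocess


def promote_pending_generation(namespace: str = "materialize-environment") -> None:
    # Fetch the Materialize custom resource as JSON.
    out = subprocess.check_output(
        ["kubectl", "get", "materializes", "-n", namespace, "-o", "json"]
    )
    mz = json.loads(out)["items"][0]
    conditions = mz.get("status", {}).get("conditions", [])
    # The controller signals readiness via a condition with reason ReadyToPromote.
    if not any(c.get("reason") == "ReadyToPromote" for c in conditions):
        raise RuntimeError("new generation is not ready to promote yet")
    # Promotion is requested by setting forcePromote to the requestRollout value.
    patch = {"spec": {"forcePromote": mz["spec"]["requestRollout"]}}
    subprocess.run(
        [
            "kubectl",
            "patch",
            "materializes",
            mz["metadata"]["name"],
            "-n",
            namespace,
            "--type=merge",
            "-p",
            json.dumps(patch),
        ],
        check=True,
    )
```

Per the doc comment above, promotion is also allowed before the new generation has caught up; the read-hold warning is exactly why a parked generation should not be left unpromoted indefinitely.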
diff --git a/src/orchestratord/src/controller/materialize.rs b/src/orchestratord/src/controller/materialize.rs
index 3cab752159c51..a69da3ff67d13 100644
--- a/src/orchestratord/src/controller/materialize.rs
+++ b/src/orchestratord/src/controller/materialize.rs
@@ -604,34 +604,38 @@ impl k8s_controller::Context for Context {
                     // replace_status, but this is fine because we already
                     // extracted all of the information we want from the spec
                     // earlier.
-                    let mz = self
-                        .update_status(
-                            &mz_api,
-                            mz,
-                            MaterializeStatus {
-                                active_generation,
-                                // don't update the reconciliation id yet,
-                                // because the rollout hasn't yet completed. if
-                                // we fail later on, we want to ensure that the
-                                // rollout gets retried.
-                                last_completed_rollout_request: status.last_completed_rollout_request,
-                                resource_id: status.resource_id,
-                                resources_hash: String::new(),
-                                conditions: vec![Condition {
-                                    type_: "UpToDate".into(),
-                                    status: "Unknown".into(),
-                                    last_transition_time: Time(chrono::offset::Utc::now()),
-                                    message: format!(
-                                        "Applying changes for generation {desired_generation}"
-                                    ),
-                                    observed_generation: mz.meta().generation,
-                                    reason: "Applying".into(),
-                                }],
-                            },
-                            active_generation != desired_generation,
-                        )
-                        .await?;
-                    let mz = &mz;
+                    let mz = if mz.is_ready_to_promote(&resources_hash) {
+                        mz
+                    } else {
+                        &self
+                            .update_status(
+                                &mz_api,
+                                mz,
+                                MaterializeStatus {
+                                    active_generation,
+                                    // don't update the reconciliation id yet,
+                                    // because the rollout hasn't yet completed. if
+                                    // we fail later on, we want to ensure that the
+                                    // rollout gets retried.
+                                    last_completed_rollout_request: status
+                                        .last_completed_rollout_request,
+                                    resource_id: status.resource_id,
+                                    resources_hash: String::new(),
+                                    conditions: vec![Condition {
+                                        type_: "UpToDate".into(),
+                                        status: "Unknown".into(),
+                                        last_transition_time: Time(chrono::offset::Utc::now()),
+                                        message: format!(
+                                            "Applying changes for generation {desired_generation}"
+                                        ),
+                                        observed_generation: mz.meta().generation,
+                                        reason: "Applying".into(),
+                                    }],
+                                },
+                                active_generation != desired_generation,
+                            )
+                            .await?
+                    };

                     let status = mz.status();
                     if mz.spec.rollout_strategy
@@ -655,6 +659,37 @@ impl k8s_controller::Context for Context {
                             Ok(Some(action))
                         }
                         Ok(None) => {
+                            if mz.spec.rollout_strategy == MaterializeRolloutStrategy::ManuallyPromote
+                                && !mz.should_force_promote()
+                            {
+                                trace!(
+                                    "Ready to promote, but not promoting because the instance is configured with the ManuallyPromote rollout strategy."
+                                );
+                                self.update_status(
+                                    &mz_api,
+                                    mz,
+                                    MaterializeStatus {
+                                        active_generation,
+                                        last_completed_rollout_request: status
+                                            .last_completed_rollout_request,
+                                        resource_id: status.resource_id,
+                                        resources_hash,
+                                        conditions: vec![Condition {
+                                            type_: "UpToDate".into(),
+                                            status: "Unknown".into(),
+                                            last_transition_time: Time(chrono::offset::Utc::now()),
+                                            message: format!(
+                                                "Ready to promote generation {desired_generation}"
+                                            ),
+                                            observed_generation: mz.meta().generation,
+                                            reason: "ReadyToPromote".into(),
+                                        }],
+                                    },
+                                    active_generation != desired_generation,
+                                )
+                                .await?;
+                                return Ok(None);
+                            }
                             // do this last, so that we keep traffic pointing at
                             // the previous environmentd until the new one is
                             // fully ready
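Reviewer note: the parked rollout is represented entirely in the CR status. A sketch of the condition the controller writes above, which `is_ready_to_promote` keys off on the next reconcile (message and generation values are illustrative):

```python
# Illustrative shape of the status condition while a ManuallyPromote rollout
# is parked; field values other than type/status/reason are made up.
parked_condition = {
    "type": "UpToDate",
    "status": "Unknown",          # neither up to date nor failed yet
    "reason": "ReadyToPromote",   # what is_ready_to_promote() matches on
    "message": "Ready to promote generation 2",  # generation is hypothetical
}
# is_ready_to_promote() additionally requires the stored resources hash to
# equal the freshly computed one, so a spec change after parking forces
# another apply cycle before promotion can happen.
```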
diff --git a/test/orchestratord/cluster.yaml b/test/orchestratord/cluster.yaml.tmpl
similarity index 86%
rename from test/orchestratord/cluster.yaml
rename to test/orchestratord/cluster.yaml.tmpl
index bd7f1aa35cedc..1bca4c489bdaf 100644
--- a/test/orchestratord/cluster.yaml
+++ b/test/orchestratord/cluster.yaml.tmpl
@@ -11,6 +11,14 @@
 kind: Cluster
 apiVersion: kind.x-k8s.io/v1alpha4
+# Allow access to the registry caches from both inside and outside Kubernetes
+containerdConfigPatches:
+  - |-
+    [plugins."io.containerd.grpc.v1.cri".registry.mirrors."docker.io"]
+      endpoint = ["http://proxy-docker-hub:5000"]
+  - |-
+    [plugins."io.containerd.grpc.v1.cri".registry.mirrors."ghcr.io"]
+      endpoint = ["http://proxy-ghcr:5000"]
 # Constrain the node port range to something relatively small, then forward all
 # those ports from the host. This makes services running in Kubernetes
 # accessible at localhost:$NODEPORT without requiring manual port forwarding.
@@ -23,6 +31,9 @@ kubeadmConfigPatches:
 nodes:
   - role: control-plane
     image: kindest/node:v1.32.5
+    extraMounts:
+      - containerPath: /var/lib/kubelet/config.json
+        hostPath: "$DOCKER_CONFIG/config.json"
     extraPortMappings:
       - containerPort: 32000
         hostPort: 32000
@@ -160,10 +171,17 @@ nodes:
       materialize.cloud/availability-zone: "1"
       topology.kubernetes.io/zone: "1"
      workload: "materialize-instance"
+    extraMounts:
+      - containerPath: /var/lib/kubelet/config.json
+        hostPath: "$DOCKER_CONFIG/config.json"
   - role: worker
     image: kindest/node:v1.32.5
     labels:
       materialize.cloud/scratch-fs: "true"
+      materialize.cloud/disk: "true"
       materialize.cloud/availability-zone: "2"
       topology.kubernetes.io/zone: "2"
       workload: "materialize-instance"
+    extraMounts:
+      - containerPath: /var/lib/kubelet/config.json
+        hostPath: "$DOCKER_CONFIG/config.json"
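Reviewer note: the `endpoint` mirrors only help if the two proxy containers (started in `setup()` further down) are on the `kind` docker network before the cluster is created. A hedged sketch of a pre-flight check; the container names match `setup()`, everything else is illustrative:

```python
# Illustrative pre-flight check that the pull-through caches are running.
import subprocess


def proxies_running() -> bool:
    for name in ("proxy-docker-hub", "proxy-ghcr"):
        result = subprocess.run(
            ["docker", "inspect", "-f", "{{.State.Running}}", name],
            capture_output=True,
            text=True,
        )
        if result.returncode != 0 or result.stdout.strip() != "true":
            return False
    return True
```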
diff --git a/test/orchestratord/mzcompose.py b/test/orchestratord/mzcompose.py
index d49cbd2c1f26b..5d8c2da26f029 100644
--- a/test/orchestratord/mzcompose.py
+++ b/test/orchestratord/mzcompose.py
@@ -30,8 +30,7 @@
 import yaml
 from semver.version import Version

-from materialize import MZ_ROOT, ci_util, git, spawn, ui
-from materialize.docker import MZ_GHCR_DEFAULT
+from materialize import MZ_ROOT, ci_util, git, spawn
 from materialize.mz_version import MzVersion
 from materialize.mzcompose.composition import (
     Composition,
@@ -41,6 +40,7 @@
 from materialize.mzcompose.services.balancerd import Balancerd
 from materialize.mzcompose.services.clusterd import Clusterd
 from materialize.mzcompose.services.environmentd import Environmentd
+from materialize.mzcompose.services.mz_debug import MzDebug
 from materialize.mzcompose.services.orchestratord import Orchestratord
 from materialize.mzcompose.services.testdrive import Testdrive
 from materialize.util import all_subclasses
@@ -55,9 +55,26 @@
     Environmentd(),
     Clusterd(),
     Balancerd(),
+    MzDebug(),
 ]


+def run_mz_debug() -> None:
+    # TODO: Re-enable once mz-debug stops hanging in CI. Use capture rather
+    # than runv because the output is too noisy.
+    # spawn.capture(
+    #     [
+    #         "./mz-debug",
+    #         "self-managed",
+    #         "--k8s-namespace",
+    #         "materialize-environment",
+    #         "--mz-instance-name",
+    #         "12345678-1234-1234-1234-123456789012",
+    #     ]
+    # )
+    pass
+
+
 def get_tag(tag: str | None = None) -> str:
     # We can't use the mzbuild tag because it has a different fingerprint for
     # environmentd/clusterd/balancerd and the orchestratord depends on them
@@ -242,7 +259,9 @@ def all_modifications() -> list[type[Modification]]:
 class LicenseKey(Modification):
     @classmethod
     def values(cls, version: MzVersion) -> list[Any]:
-        return ["valid", "invalid", "del"]
+        # TODO: Re-enable "del" when database-issues#9928 is fixed
+        # return ["valid", "invalid", "del"]
+        return ["valid", "invalid"]

     @classmethod
     def failed_reconciliation_values(cls) -> list[Any]:
@@ -498,14 +517,9 @@ def validate(self, mods: dict[type[Modification], Any]) -> None:
         def check() -> None:
             environmentd = get_environmentd_data()
             image = environmentd["items"][0]["spec"]["containers"][0]["image"]
-            image_registry = (
-                "ghcr.io/materializeinc/materialize"
-                if ui.env_is_truthy("MZ_GHCR", MZ_GHCR_DEFAULT)
-                else "materialize"
-            )
-            expected = f"{image_registry}/environmentd:{self.value}"
+            expected = f"materialize/environmentd:{self.value}"
             assert (
-                image == expected
+                image == expected or image == f"ghcr.io/materializeinc/{expected}"
             ), f"Expected environmentd image {expected}, but found {image}"

         retry(check, 240)
@@ -1070,11 +1084,11 @@ def check_pods() -> None:
 class AuthenticatorKind(Modification):
     @classmethod
     def values(cls, version: MzVersion) -> list[Any]:
-        # Test None, Password (v0.147.7+), and Sasl (v0.147.16+)
+        # Test None, Password (v0.147.7+), and Sasl (v26.0.0+)
         result = ["None"]
         if version >= MzVersion.parse_mz("v0.147.7"):
             result.append("Password")
-        if version >= MzVersion.parse_mz("v0.147.16"):
+        if version >= MzVersion.parse_mz("v26.0.0"):
             result.append("Sasl")
         return result
@@ -1100,13 +1114,13 @@ def validate(self, mods: dict[type[Modification], Any]) -> None:
         if self.value == "Password" and version <= MzVersion.parse_mz("v0.147.6"):
             return
-        if self.value == "Sasl" and version < MzVersion.parse_mz("v0.147.16"):
+        if self.value == "Sasl" and version < MzVersion.parse_mz("v26.0.0"):
             return

         port = (
             6875
             if (version >= MzVersion.parse_mz("v0.147.0") and self.value == "Password")
-            or (version >= MzVersion.parse_mz("v0.147.16") and self.value == "Sasl")
+            or (version >= MzVersion.parse_mz("v26.0.0") and self.value == "Sasl")
             else 6877
         )
         for i in range(120):
@@ -1232,6 +1246,27 @@ def validate(self, mods: dict[type[Modification], Any]) -> None:
         os.killpg(os.getpgid(process.pid), signal.SIGTERM)


+class RolloutStrategy(Modification):
+    @classmethod
+    def values(cls, version: MzVersion) -> list[Any]:
+        return [
+            "WaitUntilReady",
+            "ManuallyPromote",
+            "ImmediatelyPromoteCausingDowntime",
+        ]
+
+    @classmethod
+    def default(cls) -> Any:
+        return "WaitUntilReady"
+
+    def modify(self, definition: dict[str, Any]) -> None:
+        definition["materialize"]["spec"]["rolloutStrategy"] = self.value
+
+    def validate(self, mods: dict[type[Modification], Any]) -> None:
+        # This is validated in post_run_check
+        return
+
+
 class Properties(Enum):
     Defaults = "defaults"
     Individual = "individual"
@@ -1252,10 +1287,23 @@ def workflow_defaults(c: Composition, parser: WorkflowArgumentParser) -> None:
     )
     args = parser.parse_args()

-    current_version = get_tag(args.tag)
+    c.up(Service("mz-debug", idle=True))
+    c.invoke("cp", "mz-debug:/usr/local/bin/mz-debug", ".")
+
+    current_version = get_version(args.tag)

     # Following https://materialize.com/docs/installation/install-on-local-kind/
-    for version in reversed(get_self_managed_versions() + [get_version(args.tag)]):
+    # The orchestratord test can't run against future versions, so ignore those
+    versions = reversed(
+        [
+            version
+            for version in get_self_managed_versions()
+            if version < current_version
+        ]
+        + [current_version]
+    )
+    for version in versions:
+        print(f"--- Running with defaults against {version}")
         dir = "my-local-mz"
         if os.path.exists(dir):
             shutil.rmtree(dir)
@@ -1392,9 +1440,6 @@ def workflow_defaults(c: Composition, parser: WorkflowArgumentParser) -> None:
             materialize_setup = list(yaml.load_all(f, Loader=yaml.Loader))

         assert len(materialize_setup) == 3
-        print(version)
-        print(current_version)
-        print(version == current_version)
         if version == current_version:
             materialize_setup[2]["spec"][
                 "environmentdImageRef"
@@ -1493,6 +1538,78 @@ def workflow_defaults(c: Composition, parser: WorkflowArgumentParser) -> None:
                 ]
             )
             raise ValueError("Never completed")
+        run_mz_debug()
+
+
+class ModSource:
+    def __init__(self, mod_classes: list[type[Modification]]):
+        self.mod_classes = mod_classes
+
+    def next_mods(self, version: MzVersion) -> list[Modification]:
+        raise NotImplementedError
+
+
+class DefaultModSource(ModSource):
+    def __init__(self, mod_classes: list[type[Modification]]):
+        super().__init__(mod_classes)
+        self.state = 0
+
+    def next_mods(self, version: MzVersion) -> list[Modification]:
+        if self.state == 0:
+            self.state += 1
+            return [cls(cls.default()) for cls in self.mod_classes]
+        elif self.state == 1:
+            self.state += 1
+            return [NumMaterializeEnvironments(2)]
+        else:
+            raise StopIteration
+
+
+class IndividualModSource(ModSource):
+    def __init__(self, mod_classes: list[type[Modification]]):
+        super().__init__(mod_classes)
+        self._iters_by_version: dict[MzVersion, Iterator[list[Modification]]] = {}
+
+    def _iter_values_for_version(
+        self, version: MzVersion
+    ) -> Iterator[list[Modification]]:
+        for cls in self.mod_classes:
+            for value in cls.values(version):
+                yield [cls(value)]
+
+    def next_mods(self, version: MzVersion) -> list[Modification]:
+        it = self._iters_by_version.setdefault(
+            version, self._iter_values_for_version(version)
+        )
+        try:
+            return next(it)
+        except StopIteration:
+            del self._iters_by_version[version]
+            raise
+
+
+class CombineModSource(ModSource):
+    def __init__(self, mod_classes: list[type[Modification]], rng: random.Random):
+        super().__init__(mod_classes)
+        self.rng = rng
+
+    def next_mods(self, version: MzVersion) -> list[Modification]:
+        return [
+            cls(self.rng.choice(cls.good_values(version))) for cls in self.mod_classes
+        ]
+
+
+def make_mod_source(
+    properties: Properties, mod_classes: list[type[Modification]], rng: random.Random
+) -> ModSource:
+    if properties == Properties.Defaults:
+        return DefaultModSource(mod_classes)
+    elif properties == Properties.Individual:
+        return IndividualModSource(mod_classes)
+    elif properties == Properties.Combine:
+        return CombineModSource(mod_classes, rng)
+    else:
+        raise ValueError(f"Unhandled properties: {properties}")


 def workflow_default(c: Composition, parser: WorkflowArgumentParser) -> None:
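Reviewer note: `ModSource` replaces the old `get_mods` generator with an explicit protocol. `next_mods` may depend on the version under test and signals exhaustion by raising `StopIteration`, which the driver loop in `workflow_default` below catches as its terminator. A condensed usage sketch in the context of this module (seed and version literal are illustrative):

```python
# Condensed driver loop over a ModSource, mirroring workflow_default below.
# Assumes the module-level imports of this file (random, MzVersion, etc.).
source = make_mod_source(Properties.Individual, all_modifications(), random.Random(1))
try:
    while True:
        mods = source.next_mods(MzVersion.parse_mz("v26.0.0"))
        print([type(mod).__name__ for mod in mods])
except StopIteration:
    pass  # exhausted: every (class, value) pair was yielded exactly once
```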
@@ -1534,7 +1651,8 @@ def workflow_default(c: Composition, parser: WorkflowArgumentParser) -> None:
         "0.29.0"
     ), f"kind >= v0.29.0 required, while you are on {kind_version}"

-    c.up(Service("testdrive", idle=True))
+    c.up(Service("testdrive", idle=True), Service("mz-debug", idle=True))
+    c.invoke("cp", "mz-debug:/usr/local/bin/mz-debug", ".")

     cluster = "kind"
     clusters = spawn.capture(["kind", "get", "clusters"]).strip().split("\n")
@@ -1577,7 +1695,6 @@ def workflow_default(c: Composition, parser: WorkflowArgumentParser) -> None:
     definition["secret"] = materialize_setup[1]
     definition["materialize"] = materialize_setup[2]

-    current_version = get_version(args.tag)
     if args.orchestratord_override:
         definition["operator"]["operator"]["image"]["tag"] = get_tag(args.tag)
     # TODO: database-issues#9696, makes environmentd -> clusterd connections fail
@@ -1629,91 +1746,59 @@ def workflow_default(c: Composition, parser: WorkflowArgumentParser) -> None:

     action = Action(args.action)
     properties = Properties(args.properties)
-
-    def get_mods() -> Iterator[list[Modification]]:
-        if properties == Properties.Defaults:
-            yield [mod_class(mod_class.default()) for mod_class in mod_classes]
-            yield [NumMaterializeEnvironments(2)]
-        elif properties == Properties.Individual:
-            for mod_class in mod_classes:
-                for value in mod_class.values(current_version):
-                    if value in mod_class.values(current_version):
-                        yield [mod_class(value)]
-        elif properties == Properties.Combine:
-            assert args.runtime
-            while time.time() < end_time:
-                yield [
-                    mod_class(rng.choice(mod_class.good_values(current_version)))
-                    for mod_class in mod_classes
-                ]
-        else:
-            raise ValueError(f"Unhandled properties value {properties}")
-
-    mods_it = get_mods()
+    mod_source = make_mod_source(properties, mod_classes, rng)

     try:
         if action == Action.Noop:
-            for mods in mods_it:
+            end_time = (
+                datetime.datetime.now()
+                + datetime.timedelta(seconds=args.runtime or 3600)
+            ).timestamp()
+            # Bound the loop by runtime: CombineModSource never raises
+            # StopIteration, so `while True` would not terminate for it.
+            while time.time() < end_time:
+                mods = mod_source.next_mods(get_version(args.tag))
                 if args.tag:
                     mods.append(EnvironmentdImageRef(str(args.tag)))
                 run_scenario([mods], definition)
+
         elif action == Action.Upgrade:
-            assert not ui.env_is_truthy(
-                "MZ_GHCR", MZ_GHCR_DEFAULT
-            ), "Manually set MZ_GHCR=0 as an environment variable for upgrade testing"
-            assert args.runtime
             end_time = (
-                datetime.datetime.now() + datetime.timedelta(seconds=args.runtime)
+                datetime.datetime.now()
+                + datetime.timedelta(seconds=args.runtime or 3600)
             ).timestamp()
             versions = get_all_self_managed_versions()
+
             while time.time() < end_time:
                 current_version = rng.choice(versions[:-1])
-                selected_versions = [
-                    current_version,
-                    get_upgrade_target(rng, current_version, versions),
-                ]
-                try:
-                    mod = next(mods_it)
-                except StopIteration:
-                    mods_it = get_mods()
-                    mod = next(mods_it)
+                target_version = get_upgrade_target(rng, current_version, versions)
+                mods = mod_source.next_mods(current_version)
                 scenario = [
-                    [EnvironmentdImageRef(str(version))] + mod
-                    for version in selected_versions
+                    [EnvironmentdImageRef(str(v))] + mods
+                    for v in (current_version, target_version)
                 ]
                 run_scenario(scenario, definition)
+
         elif action == Action.UpgradeChain:
-            assert not ui.env_is_truthy(
-                "MZ_GHCR", MZ_GHCR_DEFAULT
-            ), "Manually set MZ_GHCR=0 as an environment variable for upgrade testing"
-            assert args.runtime
             end_time = (
-                datetime.datetime.now() + datetime.timedelta(seconds=args.runtime)
+                datetime.datetime.now()
+                + datetime.timedelta(seconds=args.runtime or 3600)
             ).timestamp()
             versions = get_all_self_managed_versions()
+
             while time.time() < end_time:
                 current_version = rng.choice(versions)
-                selected_versions = [current_version]
+                chain = [current_version]
                 next_version = current_version
+
                 try:
-                    for i in range(len(versions)):
+                    for _ in range(len(versions)):
                         next_version = get_upgrade_target(rng, next_version, versions)
-                        selected_versions.append(next_version)
+                        chain.append(next_version)
                 except ValueError:
                     # We can't upgrade any further, just run the test as far as it goes
                     pass
-                try:
-                    mod = next(mods_it)
-                except StopIteration:
-                    mods_it = get_mods()
-                    mod = next(mods_it)
+
+                mods = mod_source.next_mods(current_version)
                 scenario = [
-                    [EnvironmentdImageRef(str(version))] + mod for version in versions
+                    [EnvironmentdImageRef(str(version))] + mods for version in chain
                 ]
-                assert len(scenario) == len(
-                    versions
-                ), f"Expected scenario with {len(versions)} steps, but only found: {scenario}"
                 run_scenario(scenario, definition)
+
         else:
             raise ValueError(f"Unhandled action {action}")
     except StopIteration:
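Reviewer note: for intuition on the chain construction above, `get_upgrade_target` is called until it raises `ValueError`, and the partial chain is used as-is. A toy model of that loop (the version list and "next later version" rule are made up; the real target selection is randomized):

```python
# Toy model of the chain-building loop; versions and target rule are made up.
versions = ["v25.1.0", "v25.2.0", "v26.0.0"]


def toy_upgrade_target(current: str) -> str:
    later = [v for v in versions if v > current]
    if not later:
        raise ValueError("no later version to upgrade to")
    return later[0]


chain = ["v25.1.0"]
try:
    for _ in range(len(versions)):
        chain.append(toy_upgrade_target(chain[-1]))
except ValueError:
    pass  # ran out of targets; run the scenario as far as the chain goes
assert chain == ["v25.1.0", "v25.2.0", "v26.0.0"]
```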
@@ -1722,6 +1807,62 @@ def get_mods() -> Iterator[list[Modification]]:
         pass


 def setup(cluster: str):
     spawn.runv(["kind", "delete", "cluster", "--name", cluster])
+
+    try:
+        spawn.runv(["docker", "network", "create", "kind"])
+    except subprocess.CalledProcessError:
+        pass  # the network may already exist
+    try:
+        spawn.runv(
+            [
+                "docker",
+                "run",
+                "-d",
+                "--name",
+                "proxy-docker-hub",
+                "--restart=always",
+                "--net=kind",
+                "-v",
+                f"{MZ_ROOT}/misc/kind/cache/docker-hub:/var/lib/registry",
+                "-e",
+                "REGISTRY_PROXY_REMOTEURL=https://registry-1.docker.io",
+                "registry:2",
+            ]
+        )
+    except subprocess.CalledProcessError:
+        pass  # the proxy container may already be running
+    try:
+        spawn.runv(
+            [
+                "docker",
+                "run",
+                "-d",
+                "--name",
+                "proxy-ghcr",
+                "--restart=always",
+                "--net=kind",
+                "-v",
+                f"{MZ_ROOT}/misc/kind/cache/ghcr:/var/lib/registry",
+                "-e",
+                "REGISTRY_PROXY_REMOTEURL=https://ghcr.io",
+                "registry:2",
+            ]
+        )
+    except subprocess.CalledProcessError:
+        pass  # the proxy container may already be running
+
+    with (
+        open(MZ_ROOT / "test" / "orchestratord" / "cluster.yaml.tmpl") as in_file,
+        open(MZ_ROOT / "test" / "orchestratord" / "cluster.yaml", "w") as out_file,
+    ):
+        text = in_file.read()
+        out_file.write(
+            text.replace(
+                "$DOCKER_CONFIG",
+                os.getenv("DOCKER_CONFIG", f'{os.environ["HOME"]}/.docker'),
+            )
+        )
+
     spawn.runv(
         [
             "kind",
@@ -1806,18 +1947,14 @@ def run_scenario(
             mod.modify(definition)
             if mod.value in mod.failed_reconciliation_values():
                 expect_fail = True
-        if not initialize:
-            definition["materialize"]["spec"][
-                "rolloutStrategy"
-            ] = "ImmediatelyPromoteCausingDowntime"
-        definition["materialize"]["spec"]["requestRollout"] = str(uuid.uuid4())
-        run(definition, expect_fail)
         if initialize:
             init(definition)
             run(definition, expect_fail)
             initialize = False  # only initialize once
         else:
-            upgrade(definition, expect_fail)
+            upgrade_operator_helm_chart(definition, expect_fail)
+            definition["materialize"]["spec"]["requestRollout"] = str(uuid.uuid4())
+            run(definition, expect_fail)
         mod_dict = {mod.__class__: mod.value for mod in mods}
         for subclass in all_subclasses(Modification):
             if subclass not in mod_dict:
@@ -1831,6 +1968,9 @@ def run_scenario(
             f"Reproduce with bin/mzcompose --find orchestratord run default --recreate-cluster --scenario='{scenario_json}'"
         )
         raise
+    finally:
+        if not expect_fail:
+            run_mz_debug()


 def init(definition: dict[str, Any]) -> None:
@@ -1866,7 +2006,7 @@ def init(definition: dict[str, Any]) -> None:
             stderr=subprocess.DEVNULL,
         )

-    for i in range(120):
+    for i in range(240):
         try:
             spawn.capture(
                 [
@@ -1890,7 +2030,7 @@ def init(definition: dict[str, Any]) -> None:
     raise ValueError("Never completed")


-def upgrade(definition: dict[str, Any], expect_fail: bool) -> None:
+def upgrade_operator_helm_chart(definition: dict[str, Any], expect_fail: bool) -> None:
     spawn.runv(
         [
             "helm",
@@ -1907,7 +2047,6 @@ def upgrade(definition: dict[str, Any], expect_fail: bool) -> None:
         stdout=subprocess.DEVNULL,
         stderr=subprocess.DEVNULL,
     )
-    post_run_check(definition, expect_fail)


 def run(definition: dict[str, Any], expect_fail: bool) -> None:
@@ -1926,12 +2065,47 @@ def run(definition: dict[str, Any], expect_fail: bool) -> None:
     except subprocess.CalledProcessError as e:
         print(f"Failed to apply: {e.stdout}\nSTDERR:{e.stderr}")
         raise
-    post_run_check(definition, expect_fail)

+    if definition["materialize"]["spec"].get("rolloutStrategy") == "ManuallyPromote":
+        # First wait for it to become ready to promote, but not yet promoted
+        for _ in range(900):
+            time.sleep(1)
+            if is_ready_to_manually_promote():
+                break
+        else:
+            spawn.runv(
+                [
+                    "kubectl",
+                    "get",
+                    "materializes",
+                    "-n",
+                    "materialize-environment",
+                    "-o",
+                    "yaml",
+                ],
+            )
+            raise RuntimeError("Never became ready for manual promotion")

-def post_run_check(definition: dict[str, Any], expect_fail: bool) -> None:
-    for i in range(60):
-        try:
+        # Wait to see that it doesn't promote on its own
+        time.sleep(30)
+        if not is_ready_to_manually_promote():
+            spawn.runv(
+                [
+                    "kubectl",
+                    "get",
+                    "materializes",
+                    "-n",
+                    "materialize-environment",
+                    "-o",
+                    "yaml",
+                ],
+            )
+            raise RuntimeError(
+                "Stopped being ready for manual promotion before promoting"
+            )

+        # Manually promote it
+        mz = json.loads(
@@ -1939,14 +2113,97 @@ def post_run_check(definition: dict[str, Any], expect_fail: bool) -> None:
             spawn.capture(
                 [
                     "kubectl",
                     "get",
                     "materializes",
                     "-n",
                     "materialize-environment",
+                    "-o",
+                    "json",
                 ],
                 stderr=subprocess.DEVNULL,
             )
-            break
+        )["items"][0]
+        definition["materialize"]["spec"]["forcePromote"] = mz["spec"]["requestRollout"]
+        try:
+            spawn.runv(
+                ["kubectl", "apply", "-f", "-"],
+                stdin=yaml.dump(definition["materialize"]).encode(),
+            )
+        except subprocess.CalledProcessError as e:
+            print(f"Failed to apply: {e.stdout}\nSTDERR:{e.stderr}")
+            raise
+
+    post_run_check(definition, expect_fail)
+
+
{e.stdout}\nSTDERR:{e.stderr}") + raise + + post_run_check(definition, expect_fail) + + +def is_ready_to_manually_promote(): + data = json.loads( + spawn.capture( + [ + "kubectl", + "get", + "materializes", + "-n", + "materialize-environment", + "-o", + "json", + ], + stderr=subprocess.DEVNULL, + ) + ) + conditions = data["items"][0].get("status", {}).get("conditions") + return ( + conditions is not None + and conditions[0]["type"] == "UpToDate" + and conditions[0]["status"] == "Unknown" + and conditions[0]["reason"] == "ReadyToPromote" + ) + + +def post_run_check(definition: dict[str, Any], expect_fail: bool) -> None: + for i in range(900): + time.sleep(1) + try: + data = json.loads( + spawn.capture( + [ + "kubectl", + "get", + "materializes", + "-n", + "materialize-environment", + "-o", + "json", + ], + stderr=subprocess.DEVNULL, + ) + ) + status = data["items"][0].get("status") + if not status: + continue + if expect_fail: + break + if ( + not status["conditions"] + or status["conditions"][0]["type"] != "UpToDate" + or status["conditions"][0]["status"] != "True" + ): + continue + if ( + status["lastCompletedRolloutRequest"] + == data["items"][0]["spec"]["requestRollout"] + ): + break except subprocess.CalledProcessError: pass - time.sleep(1) else: + spawn.runv( + [ + "kubectl", + "get", + "materializes", + "-n", + "materialize-environment", + "-o", + "yaml", + ], + ) raise ValueError("Never completed") for i in range(480): @@ -2005,5 +2262,3 @@ def post_run_check(definition: dict[str, Any], expect_fail: bool) -> None: ] ) raise ValueError("Never completed") - # Wait a bit for the status to stabilize - time.sleep(60)