diff --git a/ci/nightly/pipeline.template.yml b/ci/nightly/pipeline.template.yml
index 63fd0a7fa508f..28e04fd66f1bb 100644
--- a/ci/nightly/pipeline.template.yml
+++ b/ci/nightly/pipeline.template.yml
@@ -175,6 +175,7 @@ steps:
           args:
             - --other-tag
            - common-ancestor
+            - --azurite
 
  - group: Kafka
    key: kafka
@@ -224,7 +225,7 @@ steps:
        plugins:
          - ./ci/plugins/mzcompose:
              composition: testdrive
-              args: [--redpanda]
+              args: [--redpanda, --azurite]
 
      - id: redpanda-testdrive-aarch64
        label: ":panda_face: :racing_car: testdrive aarch64"
@@ -235,7 +236,7 @@ steps:
        plugins:
          - ./ci/plugins/mzcompose:
              composition: testdrive
-              args: [--redpanda]
+              args: [--redpanda, --azurite]
        skip: "Disabled due to taking too long for the value provided"
 
      # TODO(def-) Remove this when old upsert implementation is removed
@@ -248,7 +249,7 @@ steps:
        plugins:
          - ./ci/plugins/mzcompose:
              composition: testdrive
-              args: [--system-param=storage_use_continual_feedback_upsert=false]
+              args: [--system-param=storage_use_continual_feedback_upsert=false, --azurite]
 
      - id: testdrive-partitions-5
        label: ":racing_car: testdrive with --kafka-default-partitions 5"
@@ -259,7 +260,7 @@ steps:
        plugins:
          - ./ci/plugins/mzcompose:
              composition: testdrive
-              args: [--kafka-default-partitions=5]
+              args: [--kafka-default-partitions=5, --azurite]
 
      - id: testdrive-replicas-4
        label: ":racing_car: testdrive 4 replicas"
@@ -270,7 +271,7 @@ steps:
        plugins:
          - ./ci/plugins/mzcompose:
              composition: testdrive
-              args: [--replicas=4]
+              args: [--replicas=4, --azurite]
 
      - id: testdrive-size-1
        label: ":racing_car: testdrive with SIZE 1"
@@ -281,7 +282,7 @@ steps:
        plugins:
          - ./ci/plugins/mzcompose:
              composition: testdrive
-              args: [--default-size=1]
+              args: [--default-size=1, --azurite]
 
      - id: testdrive-size-8
        label: ":racing_car: testdrive with SIZE 8"
@@ -292,7 +293,7 @@ steps:
        plugins:
          - ./ci/plugins/mzcompose:
              composition: testdrive
-              args: [--default-size=8]
+              args: [--default-size=8, --azurite]
 
      - id: testdrive-structured-persist-arrow
        label: ":racing_car: testdrive with Structured Persist (Only Structured)"
@@ -307,6 +308,7 @@ steps:
                  --system-param=persist_part_decode_format=arrow,
                  --system-param=persist_batch_columnar_format=both_v2,
                  --system-param=persist_encoding_enable_dictionary=true,
+                  --azurite,
                ]
 
      - id: testdrive-in-cloudtest
@@ -466,7 +468,7 @@ steps:
        plugins:
          - ./ci/plugins/mzcompose:
              composition: zippy
-              args: [--scenario=KafkaSources, --actions=10000, --max-execution-time=30m]
+              args: [--scenario=KafkaSources, --actions=10000, --max-execution-time=30m, --azurite]
 
      # TODO(def-) Remove this when old upsert implementation is removed
      - id: zippy-kafka-sources-old-upsert
@@ -478,7 +480,7 @@ steps:
        plugins:
          - ./ci/plugins/mzcompose:
              composition: zippy
-              args: [--scenario=KafkaSources, --actions=10000, --max-execution-time=30m, --system-param=storage_use_continual_feedback_upsert=false]
+              args: [--scenario=KafkaSources, --actions=10000, --max-execution-time=30m, --system-param=storage_use_continual_feedback_upsert=false, --azurite]
 
      - id: zippy-kafka-parallel-insert
        label: "Zippy Kafka Parallel Insert"
@@ -489,7 +491,7 @@ steps:
        plugins:
          - ./ci/plugins/mzcompose:
              composition: zippy
-              args: [--scenario=KafkaParallelInsert, --transaction-isolation=serializable, --actions=10000, --max-execution-time=30m]
+              args: [--scenario=KafkaParallelInsert, --transaction-isolation=serializable, --actions=10000, --max-execution-time=30m, --azurite]
 
      - id: zippy-user-tables
        label: "Zippy User Tables"
@@ -500,7 +502,7 @@ steps:
        plugins:
          - ./ci/plugins/mzcompose:
              composition: zippy
-              args: [--scenario=UserTables, --actions=10000, --max-execution-time=30m]
+              args: [--scenario=UserTables, --actions=10000, --max-execution-time=30m, --azurite]
 
      - id: zippy-postgres-cdc
        label: "Zippy Postgres CDC"
@@ -511,7 +513,7 @@ steps:
        plugins:
          - ./ci/plugins/mzcompose:
              composition: zippy
-              args: [--scenario=PostgresCdc, --actions=10000, --max-execution-time=30m]
+              args: [--scenario=PostgresCdc, --actions=10000, --max-execution-time=30m, --azurite]
 
      - id: zippy-mysql-cdc
        label: "Zippy MySQL CDC"
@@ -522,7 +524,7 @@ steps:
        plugins:
          - ./ci/plugins/mzcompose:
              composition: zippy
-              args: [--scenario=MySqlCdc, --actions=10000, --max-execution-time=30m]
+              args: [--scenario=MySqlCdc, --actions=10000, --max-execution-time=30m, --azurite]
 
      - id: zippy-debezium-postgres
        label: "Zippy Debezium Postgres"
@@ -533,7 +535,7 @@ steps:
        plugins:
          - ./ci/plugins/mzcompose:
              composition: zippy
-              args: [--scenario=DebeziumPostgres, --actions=10000, --max-execution-time=30m]
+              args: [--scenario=DebeziumPostgres, --actions=10000, --max-execution-time=30m, --azurite]
 
      - id: zippy-cluster-replicas
        label: "Zippy Cluster Replicas"
@@ -544,7 +546,7 @@ steps:
        plugins:
          - ./ci/plugins/mzcompose:
              composition: zippy
-              args: [--scenario=ClusterReplicas, --actions=10000, --max-execution-time=30m]
+              args: [--scenario=ClusterReplicas, --actions=10000, --max-execution-time=30m, --azurite]
 
      - id: zippy-crdb-latest
        label: "Zippy w/ latest CRDB"
@@ -556,7 +558,7 @@ steps:
          - ./ci/plugins/mzcompose:
              composition: zippy
              # TODO: Reenable --cockroach-tag=latest when https://github.com/cockroachdb/cockroach/issues/136678 is fixed
-              args: [--scenario=KafkaSources, --actions=10000, --max-execution-time=30m]
+              args: [--scenario=KafkaSources, --actions=10000, --max-execution-time=30m, --azurite]
 
      - id: zippy-alter-connection
        label: "Zippy w/ alter connection"
@@ -567,7 +569,7 @@ steps:
        plugins:
          - ./ci/plugins/mzcompose:
              composition: zippy
-              args: [--scenario=AlterConnectionWithKafkaSources, --actions=10000, --max-execution-time=30m]
+              args: [--scenario=AlterConnectionWithKafkaSources, --actions=10000, --max-execution-time=30m, --azurite]
 
  - group: CDC with old source syntax
    key: cdc-old-source-syntax
@@ -642,6 +644,7 @@ steps:
        plugins:
          - ./ci/plugins/mzcompose:
              composition: testdrive-old-kafka-src-syntax
+              args: [--azurite]
        agents:
          queue: hetzner-aarch64-8cpu-16gb
 
@@ -700,7 +703,7 @@ steps:
        plugins:
          - ./ci/plugins/mzcompose:
              composition: platform-checks
-              args: [--scenario=RestartEntireMz, "--seed=$BUILDKITE_JOB_ID"]
+              args: [--scenario=RestartEntireMz, "--seed=$BUILDKITE_JOB_ID", --azurite]
 
      # TODO(def-) Remove this when old upsert implementation is removed
      - id: checks-restart-entire-mz-old-upsert
@@ -715,7 +718,7 @@ steps:
        plugins:
          - ./ci/plugins/mzcompose:
              composition: platform-checks
-              args: [--scenario=RestartEntireMz, --system-param=storage_use_continual_feedback_upsert=false, "--seed=$BUILDKITE_JOB_ID"]
+              args: [--scenario=RestartEntireMz, --system-param=storage_use_continual_feedback_upsert=false, "--seed=$BUILDKITE_JOB_ID", --azurite]
 
      - id: checks-backup-rollback
        label: "Checks + backup + rollback to previous %N"
@@ -727,7 +730,7 @@ steps:
        plugins:
          - ./ci/plugins/mzcompose:
              composition: platform-checks
-              args: [--scenario=BackupAndRestoreToPreviousState, "--seed=$BUILDKITE_JOB_ID"]
+              args: [--scenario=BackupAndRestoreToPreviousState, "--seed=$BUILDKITE_JOB_ID", --azurite]
 
      - id: checks-parallel-drop-create-default-replica
        label: "Checks parallel + DROP/CREATE replica %N"
@@ -739,7 +742,7 @@ steps:
        plugins:
          - ./ci/plugins/mzcompose:
              composition: platform-checks
-              args: [--scenario=DropCreateDefaultReplica, --execution-mode=parallel, "--seed=$BUILDKITE_JOB_ID"]
+              args: [--scenario=DropCreateDefaultReplica, --execution-mode=parallel, "--seed=$BUILDKITE_JOB_ID", --azurite]
 
      - id: checks-parallel-restart-clusterd-compute
        label: "Checks parallel + restart compute clusterd %N"
@@ -751,7 +754,7 @@ steps:
        plugins:
          - ./ci/plugins/mzcompose:
              composition: platform-checks
-              args: [--scenario=RestartClusterdCompute, --execution-mode=parallel, "--seed=$BUILDKITE_JOB_ID"]
+              args: [--scenario=RestartClusterdCompute, --execution-mode=parallel, "--seed=$BUILDKITE_JOB_ID", --azurite]
 
      - id: checks-parallel-restart-entire-mz
        label: "Checks parallel + restart of the entire Mz %N"
@@ -763,7 +766,7 @@ steps:
        plugins:
          - ./ci/plugins/mzcompose:
              composition: platform-checks
-              args: [--scenario=RestartEntireMz, --execution-mode=parallel, "--seed=$BUILDKITE_JOB_ID"]
+              args: [--scenario=RestartEntireMz, --execution-mode=parallel, "--seed=$BUILDKITE_JOB_ID", --azurite]
 
      - id: checks-parallel-restart-environmentd-clusterd-storage
        label: "Checks parallel + restart of environmentd & storage clusterd %N"
@@ -775,7 +778,7 @@ steps:
        plugins:
          - ./ci/plugins/mzcompose:
              composition: platform-checks
-              args: [--scenario=RestartEnvironmentdClusterdStorage, --execution-mode=parallel, "--seed=$BUILDKITE_JOB_ID"]
+              args: [--scenario=RestartEnvironmentdClusterdStorage, --execution-mode=parallel, "--seed=$BUILDKITE_JOB_ID", --azurite]
 
      - id: checks-parallel-kill-clusterd-storage
        label: "Checks parallel + kill storage clusterd %N"
@@ -787,7 +790,7 @@ steps:
        plugins:
          - ./ci/plugins/mzcompose:
              composition: platform-checks
-              args: [--scenario=KillClusterdStorage, --execution-mode=parallel, "--seed=$BUILDKITE_JOB_ID"]
+              args: [--scenario=KillClusterdStorage, --execution-mode=parallel, "--seed=$BUILDKITE_JOB_ID", --azurite]
 
      - id: checks-upgrade-entire-mz
        label: "Checks upgrade, whole-Mz restart %N"
@@ -799,7 +802,7 @@ steps:
        plugins:
          - ./ci/plugins/mzcompose:
              composition: platform-checks
-              args: [--scenario=UpgradeEntireMz, "--seed=$BUILDKITE_JOB_ID"]
+              args: [--scenario=UpgradeEntireMz, "--seed=$BUILDKITE_JOB_ID", --azurite]
 
      - id: checks-preflight-check-rollback
        label: "Checks preflight-check and roll back upgrade %N"
@@ -811,7 +814,7 @@ steps:
        plugins:
          - ./ci/plugins/mzcompose:
              composition: platform-checks
-              args: [--scenario=PreflightCheckRollback, "--seed=$BUILDKITE_JOB_ID"]
+              args: [--scenario=PreflightCheckRollback, "--seed=$BUILDKITE_JOB_ID", --azurite]
 
      - id: checks-upgrade-entire-mz-two-versions
        label: "Checks upgrade across two versions %N"
@@ -823,7 +826,7 @@ steps:
        plugins:
          - ./ci/plugins/mzcompose:
              composition: platform-checks
-              args: [--scenario=UpgradeEntireMzTwoVersions, "--seed=$BUILDKITE_JOB_ID"]
+              args: [--scenario=UpgradeEntireMzTwoVersions, "--seed=$BUILDKITE_JOB_ID", --azurite]
 
      - id: checks-upgrade-entire-mz-four-versions
        label: "Checks upgrade across four versions %N"
@@ -835,7 +838,7 @@ steps:
        plugins:
          - ./ci/plugins/mzcompose:
              composition: platform-checks
-              args: [--scenario=UpgradeEntireMzFourVersions, "--seed=$BUILDKITE_JOB_ID"]
+              args: [--scenario=UpgradeEntireMzFourVersions, "--seed=$BUILDKITE_JOB_ID", --azurite]
 
      - id: checks-0dt-restart-entire-mz-forced-migrations
        label: "Checks 0dt restart of the entire Mz with forced migrations %N"
@@ -847,7 +850,7 @@ steps:
        plugins:
          - ./ci/plugins/mzcompose:
              composition: platform-checks
-              args: [--scenario=ZeroDowntimeRestartEntireMzForcedMigrations, "--seed=$BUILDKITE_JOB_ID"]
+              args: [--scenario=ZeroDowntimeRestartEntireMzForcedMigrations, "--seed=$BUILDKITE_JOB_ID", --azurite]
 
      - id: checks-0dt-upgrade-entire-mz
        label: "Checks 0dt upgrade, whole-Mz restart %N"
@@ -894,7 +897,7 @@ steps:
        plugins:
          - ./ci/plugins/mzcompose:
              composition: platform-checks
-              args: [--scenario=ZeroDowntimeBumpedVersion, "--seed=$BUILDKITE_JOB_ID"]
+              args: [--scenario=ZeroDowntimeBumpedVersion, "--seed=$BUILDKITE_JOB_ID", --azurite]
 
      - id: cloudtest-upgrade
        label: "Platform checks upgrade in Cloudtest/K8s"
@@ -1422,6 +1425,7 @@ steps:
    plugins:
      - ./ci/plugins/mzcompose:
          composition: data-ingest
+          args: [--azurite]
 
  - group: "Parallel Workload"
    key: parallel-workload
@@ -1436,7 +1440,7 @@ steps:
        plugins:
          - ./ci/plugins/mzcompose:
              composition: parallel-workload
-              args: [--runtime=1500, --complexity=dml, --threads=16]
+              args: [--runtime=1500, --complexity=dml, --threads=16, --azurite]
 
      - id: parallel-workload-ddl
        label: "Parallel Workload (DDL)"
@@ -1448,7 +1452,7 @@ steps:
        plugins:
          - ./ci/plugins/mzcompose:
              composition: parallel-workload
-              args: [--runtime=1500, --threads=8]
+              args: [--runtime=1500, --threads=8, --azurite]
 
      - id: parallel-workload-ddl-only
        label: "Parallel Workload (DDL Only)"
@@ -1460,7 +1464,7 @@ steps:
        plugins:
          - ./ci/plugins/mzcompose:
              composition: parallel-workload
-              args: [--runtime=1500, --complexity=ddl-only, --threads=2]
+              args: [--runtime=1500, --complexity=ddl-only, --threads=2, --azurite]
 
      - id: parallel-workload-many-threads
        label: "Parallel Workload (many threads)"
@@ -1473,7 +1477,7 @@ steps:
        plugins:
          - ./ci/plugins/mzcompose:
              composition: parallel-workload
-              args: [--runtime=1500, --threads=100]
+              args: [--runtime=1500, --threads=100, --azurite]
        skip: "Too unstable at the moment"
 
      - id: parallel-workload-rename-naughty
@@ -1486,7 +1490,7 @@ steps:
        plugins:
          - ./ci/plugins/mzcompose:
              composition: parallel-workload
-              args: [--runtime=1500, --scenario=rename, --naughty-identifiers, --threads=16]
+              args: [--runtime=1500, --scenario=rename, --naughty-identifiers, --threads=16, --azurite]
 
      - id: parallel-workload-rename
        label: "Parallel Workload (rename)"
@@ -1498,7 +1502,7 @@ steps:
        plugins:
          - ./ci/plugins/mzcompose:
              composition: parallel-workload
-              args: [--runtime=1500, --scenario=rename, --threads=16]
+              args: [--runtime=1500, --scenario=rename, --threads=16, --azurite]
 
      - id: parallel-workload-cancel
        label: "Parallel Workload (cancel)"
@@ -1511,7 +1515,7 @@ steps:
        plugins:
          - ./ci/plugins/mzcompose:
              composition: parallel-workload
-              args: [--runtime=1500, --scenario=cancel, --threads=16]
+              args: [--runtime=1500, --scenario=cancel, --threads=16, --azurite]
 
      - id: parallel-workload-kill
        label: "Parallel Workload (kill)"
@@ -1523,7 +1527,7 @@ steps:
        plugins:
          - ./ci/plugins/mzcompose:
              composition: parallel-workload
-              args: [--runtime=1500, --scenario=kill, --threads=8]
+              args: [--runtime=1500, --scenario=kill, --threads=8, --azurite]
 
      - id: parallel-workload-backup-restore
        label: "Parallel Workload (backup & restore)"
@@ -1536,7 +1540,7 @@ steps:
        plugins:
          - ./ci/plugins/mzcompose:
              composition: parallel-workload
-              args: [--runtime=1500, --scenario=backup-restore, --naughty-identifiers, --threads=16]
+              args: [--runtime=1500, --scenario=backup-restore, --naughty-identifiers, --threads=16, --azurite]
 
      - id: parallel-workload-0dt
        label: "Parallel Workload (0dt deploy)"
@@ -1548,7 +1552,7 @@ steps:
        plugins:
          - ./ci/plugins/mzcompose:
              composition: parallel-workload
-              args: [--runtime=1500, --scenario=0dt-deploy, --threads=16]
+              args: [--runtime=1500, --scenario=0dt-deploy, --threads=16, --azurite]
 
  - id: incident-70
    label: "Test for incident 70"
@@ -1636,6 +1640,7 @@ steps:
    plugins:
      - ./ci/plugins/mzcompose:
          composition: txn-wal-fencing
+          args: [--azurite]
    agents:
      queue: hetzner-aarch64-8cpu-16gb
 
diff --git a/ci/test/pipeline.template.yml b/ci/test/pipeline.template.yml
index 1a182c01205bd..3cf94dd973813 100644
--- a/ci/test/pipeline.template.yml
+++ b/ci/test/pipeline.template.yml
@@ -328,6 +328,7 @@ steps:
    plugins:
      - ./ci/plugins/mzcompose:
          composition: cluster
+          args: ["--azurite"]
    agents:
      queue: hetzner-aarch64-8cpu-16gb
 
@@ -593,7 +594,7 @@ steps:
    plugins:
      - ./ci/plugins/mzcompose:
          composition: zippy
-          args: [--scenario=KafkaSources, --actions=100]
+          args: [--scenario=KafkaSources, --actions=100, --azurite]
 
  - group: "Platform checks"
    key: platform-checks
@@ -613,6 +614,7 @@ steps:
                [
                  --scenario=RestartEnvironmentdClusterdStorage,
                  "--seed=$BUILDKITE_JOB_ID",
+                  --azurite,
                ]
 
      - id: checks-no-restart-no-upgrade
@@ -626,7 +628,7 @@ steps:
        plugins:
          - ./ci/plugins/mzcompose:
              composition: platform-checks
-              args: [--scenario=NoRestartNoUpgrade, "--seed=$BUILDKITE_JOB_ID"]
+              args: [--scenario=NoRestartNoUpgrade, "--seed=$BUILDKITE_JOB_ID", --azurite]
 
  - id: source-sink-errors
    label: "Source/Sink Error Reporting %N"
@@ -651,6 +653,7 @@ steps:
            --scenario=KafkaUpsertUnique,
            --other-tag=common-ancestor,
            --ignore-other-tag-missing,
+            --azurite,
          ]
    coverage: skip
    agents:
diff --git a/misc/python/materialize/checks/mzcompose_actions.py b/misc/python/materialize/checks/mzcompose_actions.py
index 162a7fc5b77a8..cc5ac2cd67bf0 100644
--- a/misc/python/materialize/checks/mzcompose_actions.py
+++ b/misc/python/materialize/checks/mzcompose_actions.py
@@ -61,6 +61,7 @@ def __init__(
        self.restart = restart
        self.force_migrations = force_migrations
        self.publish = publish
+        self.scenario = scenario
 
    def execute(self, e: Executor) -> None:
        c = e.mzcompose_composition()
@@ -73,6 +74,7 @@ def execute(self, e: Executor) -> None:
            image=image,
            external_metadata_store=True,
            external_blob_store=True,
+            blob_store_is_azure=self.scenario.azurite,
            environment_extra=self.environment_extra,
            system_parameter_defaults=self.system_parameter_defaults,
            additional_system_parameter_defaults=self.additional_system_parameter_defaults,
diff --git a/misc/python/materialize/checks/scenarios.py b/misc/python/materialize/checks/scenarios.py
index 5cadc80146cb5..a6fc5a940ca43 100644
--- a/misc/python/materialize/checks/scenarios.py
+++ b/misc/python/materialize/checks/scenarios.py
@@ -45,10 +45,15 @@
 
 class Scenario:
    def __init__(
-        self, checks: list[type[Check]], executor: Executor, seed: str | None = None
+        self,
+        checks: list[type[Check]],
+        executor: Executor,
+        azurite: bool,
+        seed: str | None = None,
    ) -> None:
        self._checks = checks
        self.executor = executor
+        self.azurite = azurite
        self.rng = None if seed is None else Random(seed)
        self._base_version = MzVersion.parse_cargo()
 
@@ -269,10 +274,11 @@ def __init__(
        self,
        checks: list[type[Check]],
        executor: Executor,
+        azurite: bool,
        seed: str | None,
        change_entries: list[SystemVarChangeEntry],
    ):
-        super().__init__(checks, executor, seed)
+        super().__init__(checks, executor, azurite, seed)
        self.change_entries = change_entries
 
    def actions(self) -> list[Action]:
diff --git a/misc/python/materialize/checks/scenarios_upgrade.py b/misc/python/materialize/checks/scenarios_upgrade.py
index 7411b7b7484f8..fe9a9bd26c470 100644
--- a/misc/python/materialize/checks/scenarios_upgrade.py
+++ b/misc/python/materialize/checks/scenarios_upgrade.py
@@ -162,10 +162,14 @@ class UpgradeEntireMzFourVersions(Scenario):
    """Test upgrade X-4 -> X-3 -> X-2 -> X-1 -> X"""
 
    def __init__(
-        self, checks: list[type[Check]], executor: Executor, seed: str | None = None
+        self,
+        checks: list[type[Check]],
+        executor: Executor,
+        azurite: bool,
+        seed: str | None = None,
    ):
        self.minor_versions = get_minor_versions()
-        super().__init__(checks, executor, seed)
+        super().__init__(checks, executor, azurite, seed)
 
    def base_version(self) -> MzVersion:
        return self.minor_versions[3]
diff --git a/misc/python/materialize/checks/scenarios_zero_downtime.py b/misc/python/materialize/checks/scenarios_zero_downtime.py
index 3f1723137cf45..29ad53565900b 100644
--- a/misc/python/materialize/checks/scenarios_zero_downtime.py
+++ b/misc/python/materialize/checks/scenarios_zero_downtime.py
@@ -257,10 +257,14 @@ class ZeroDowntimeUpgradeEntireMzFourVersions(Scenario):
    """Test 0dt upgrade from X-4 -> X-3 -> X-2 -> X-1 -> X"""
 
    def __init__(
-        self, checks: list[type[Check]], executor: Executor, seed: str | None = None
+        self,
+        checks: list[type[Check]],
+        executor: Executor,
+        azurite: bool,
+        seed: str | None = None,
    ):
        self.minor_versions = get_minor_versions()
-        super().__init__(checks, executor, seed)
+        super().__init__(checks, executor, azurite, seed)
 
    def base_version(self) -> MzVersion:
        return self.minor_versions[3]
diff --git a/misc/python/materialize/data_ingest/transaction_def.py b/misc/python/materialize/data_ingest/transaction_def.py
index 43fc63e2a4d72..74a6c8edc67ed 100644
--- a/misc/python/materialize/data_ingest/transaction_def.py
+++ b/misc/python/materialize/data_ingest/transaction_def.py
@@ -64,11 +64,16 @@ class RestartMz(TransactionDef):
    workload: "Workload"
 
    def __init__(
-        self, composition: Composition, probability: float, workload: "Workload"
+        self,
+        composition: Composition,
+        probability: float,
+        workload: "Workload",
+        azurite: bool,
    ):
        self.composition = composition
        self.probability = probability
        self.workload = workload
+        self.azurite = azurite
 
    def generate(self, fields: list[Field]) -> Iterator[Transaction | None]:
        if random.random() < self.probability:
@@ -83,6 +88,7 @@ def generate(self, fields: list[Field]) -> Iterator[Transaction | None]:
                name=self.workload.mz_service,
                ports=ports,
                external_blob_store=True,
+                blob_store_is_azure=self.azurite,
                external_metadata_store=True,
                system_parameter_defaults=get_default_system_parameters(
                    zero_downtime=True
@@ -104,11 +110,16 @@ class ZeroDowntimeDeploy(TransactionDef):
    workload: "Workload"
 
    def __init__(
-        self, composition: Composition, probability: float, workload: "Workload"
+        self,
+        composition: Composition,
+        probability: float,
+        workload: "Workload",
+        azurite: bool,
    ):
        self.composition = composition
        self.probability = probability
        self.workload = workload
+        self.azurite = azurite
 
    def generate(self, fields: list[Field]) -> Iterator[Transaction | None]:
        if random.random() < self.probability:
@@ -130,6 +141,7 @@ def generate(self, fields: list[Field]) -> Iterator[Transaction | None]:
                name=self.workload.mz_service,
                ports=ports,
                external_blob_store=True,
+                blob_store_is_azure=self.azurite,
                external_metadata_store=True,
                system_parameter_defaults=get_default_system_parameters(
                    zero_downtime=True
diff --git a/misc/python/materialize/data_ingest/workload.py b/misc/python/materialize/data_ingest/workload.py
index e6087527728c9..b0a3cbfda9349 100644
--- a/misc/python/materialize/data_ingest/workload.py
+++ b/misc/python/materialize/data_ingest/workload.py
@@ -46,8 +46,12 @@ class Workload:
    deploy_generation: int
 
    def __init__(
-        self, mz_service: str = "materailized", deploy_generation: int = 0
+        self,
+        azurite: bool,
+        mz_service: str = "materialized",
+        deploy_generation: int = 0,
    ) -> None:
+        self.azurite = azurite
        self.mz_service = mz_service
        self.deploy_generation = deploy_generation
 
@@ -62,11 +66,12 @@ def generate(self, fields: list[Field]) -> Iterator[Transaction]:
 class SingleSensorUpdating(Workload):
    def __init__(
        self,
+        azurite: bool,
        composition: Composition | None = None,
        mz_service: str = "materialized",
        deploy_generation: int = 0,
    ) -> None:
-        super().__init__(mz_service, deploy_generation)
+        super().__init__(azurite, mz_service, deploy_generation)
        self.cycle = [
            TransactionDef(
                [
@@ -83,11 +88,12 @@ class SingleSensorUpdatingDisruptions(Workload):
    def __init__(
        self,
+        azurite: bool,
        composition: Composition | None = None,
        mz_service: str = "materialized",
        deploy_generation: int = 0,
    ) -> None:
-        super().__init__(mz_service, deploy_generation)
+        super().__init__(azurite, mz_service, deploy_generation)
        self.cycle = [
            TransactionDef(
                [
@@ -100,17 +106,22 @@ def __init__(
            ),
        ]
        if composition:
-            self.cycle.append(RestartMz(composition, probability=0.1, workload=self))
+            self.cycle.append(
+                RestartMz(
+                    composition, probability=0.1, workload=self, azurite=self.azurite
+                )
+            )
 
 
 class SingleSensorUpdating0dtDeploy(Workload):
    def __init__(
        self,
+        azurite: bool,
        composition: Composition | None = None,
        mz_service: str = "materialized",
        deploy_generation: int = 0,
    ) -> None:
-        super().__init__(mz_service, deploy_generation)
+        super().__init__(azurite, mz_service, deploy_generation)
        self.cycle = [
            TransactionDef(
                [
@@ -124,18 +135,21 @@ def __init__(
        ]
        if composition:
            self.cycle.append(
-                ZeroDowntimeDeploy(composition, probability=0.1, workload=self)
+                ZeroDowntimeDeploy(
+                    composition, probability=0.1, workload=self, azurite=self.azurite
+                )
            )
 
 
 class DeleteDataAtEndOfDay(Workload):
    def __init__(
        self,
+        azurite: bool,
        composition: Composition | None = None,
        mz_service: str = "materialized",
        deploy_generation: int = 0,
    ) -> None:
-        super().__init__(mz_service, deploy_generation)
+        super().__init__(azurite, mz_service, deploy_generation)
        insert = Insert(
            count=Records.SOME,
            record_size=RecordSize.SMALL,
@@ -163,11 +177,12 @@ class DeleteDataAtEndOfDayDisruptions(Workload):
    def __init__(
        self,
+        azurite: bool,
        composition: Composition | None = None,
        mz_service: str = "materialized",
        deploy_generation: int = 0,
    ) -> None:
-        super().__init__(mz_service, deploy_generation)
+        super().__init__(azurite, mz_service, deploy_generation)
        insert = Insert(
            count=Records.SOME,
            record_size=RecordSize.SMALL,
@@ -192,17 +207,22 @@ def __init__(
        ]
 
        if composition:
-            self.cycle.append(RestartMz(composition, probability=0.1, workload=self))
+            self.cycle.append(
+                RestartMz(
+                    composition, probability=0.1, workload=self, azurite=self.azurite
+                )
+            )
 
 
 class DeleteDataAtEndOfDay0dtDeploys(Workload):
    def __init__(
        self,
+        azurite: bool,
        composition: Composition | None = None,
        mz_service: str = "materialized",
        deploy_generation: int = 0,
    ) -> None:
-        super().__init__(mz_service, deploy_generation)
+        super().__init__(azurite, mz_service, deploy_generation)
        insert = Insert(
            count=Records.SOME,
            record_size=RecordSize.SMALL,
@@ -228,16 +248,18 @@ def __init__(
        ]
 
        if composition:
            self.cycle.append(
-                ZeroDowntimeDeploy(composition, probability=0.1, workload=self)
+                ZeroDowntimeDeploy(
+                    composition, probability=0.1, workload=self, azurite=self.azurite
+                )
            )
 
 
 # TODO: Implement
 # class ProgressivelyEnrichRecords(Workload):
 #     def __init__(
-#         self, composition: Composition | None = None, mz_service: str = "materialized", deploy_generation: int = 0
+#         self, azurite: bool, composition: Composition | None = None, mz_service: str = "materialized", deploy_generation: int = 0
 #     ) -> None:
-#         super().__init__(mz_service, deploy_generation)
+#         super().__init__(azurite, mz_service, deploy_generation)
 #         self.cycle: list[Definition] = [
 #         ]
diff --git a/misc/python/materialize/mzcompose/services/azure.py b/misc/python/materialize/mzcompose/services/azure.py
index 86a10cd52dfe7..4a05b0e381792 100644
--- a/misc/python/materialize/mzcompose/services/azure.py
+++ b/misc/python/materialize/mzcompose/services/azure.py
@@ -41,6 +41,7 @@ def __init__(
            "--blobPort",
            "10000",
            "--disableProductStyleUrl",
+            "--loose",
        ]
 
        if in_memory:
diff --git a/misc/python/materialize/parallel_workload/action.py b/misc/python/materialize/parallel_workload/action.py
index caea5458030c6..2e82c40ed5b5e 100644
--- a/misc/python/materialize/parallel_workload/action.py
+++ b/misc/python/materialize/parallel_workload/action.py
@@ -1599,12 +1599,14 @@ def __init__(
        self,
        rng: random.Random,
        composition: Composition | None,
+        azurite: bool,
        sanity_restart: bool,
        system_param_fn: Callable[[dict[str, str]], dict[str, str]] = lambda x: x,
    ):
        super().__init__(rng, composition)
        self.system_param_fn = system_param_fn
        self.system_parameters = {}
+        self.azurite = azurite
        self.sanity_restart = sanity_restart
 
    def run(self, exe: Executor) -> bool:
@@ -1615,6 +1617,7 @@ def run(self, exe: Executor) -> bool:
            Materialized(
                restart="on-failure",
                external_blob_store="toxiproxy",
+                blob_store_is_azure=self.azurite,
                external_metadata_store="toxiproxy",
                ports=["6975:6875", "6976:6876", "6977:6877"],
                sanity_restart=self.sanity_restart,
@@ -1632,9 +1635,11 @@ def __init__(
        self,
        rng: random.Random,
        composition: Composition | None,
+        azurite: bool,
        sanity_restart: bool,
    ):
        super().__init__(rng, composition)
+        self.azurite = azurite
        self.sanity_restart = sanity_restart
        self.deploy_generation = 0
 
@@ -1656,6 +1661,7 @@ def run(self, exe: Executor) -> bool:
            Materialized(
                name=mz_service,
                external_blob_store="toxiproxy",
+                blob_store_is_azure=self.azurite,
                external_metadata_store="toxiproxy",
                ports=ports,
                sanity_restart=self.sanity_restart,
diff --git a/misc/python/materialize/parallel_workload/database.py b/misc/python/materialize/parallel_workload/database.py
index 97504f939bc70..0a47ba77f99af 100644
--- a/misc/python/materialize/parallel_workload/database.py
+++ b/misc/python/materialize/parallel_workload/database.py
@@ -517,7 +517,7 @@ def __init__(
            schema.name(),
            cluster.name(),
        )
-        workload = rng.choice(list(WORKLOADS))()
+        workload = rng.choice(list(WORKLOADS))(azurite=False)
        for transaction_def in workload.cycle:
            for definition in transaction_def.operations:
                if type(definition) == Insert and definition.count > MAX_ROWS:
@@ -677,7 +677,7 @@ def __init__(
            schema.name(),
            cluster.name(),
        )
-        self.generator = rng.choice(list(WORKLOADS))().generate(fields)
+        self.generator = rng.choice(list(WORKLOADS))(azurite=False).generate(fields)
        self.lock = threading.Lock()
 
    def name(self) -> str:
@@ -745,7 +745,7 @@ def __init__(
            schema.name(),
            cluster.name(),
        )
-        self.generator = rng.choice(list(WORKLOADS))().generate(fields)
+        self.generator = rng.choice(list(WORKLOADS))(azurite=False).generate(fields)
        self.lock = threading.Lock()
 
    def name(self) -> str:
diff --git a/misc/python/materialize/parallel_workload/parallel_workload.py b/misc/python/materialize/parallel_workload/parallel_workload.py
index 83266e861607a..3722782de7316 100644
--- a/misc/python/materialize/parallel_workload/parallel_workload.py
+++ b/misc/python/materialize/parallel_workload/parallel_workload.py
@@ -68,6 +68,7 @@ def run(
    num_threads: int | None,
    naughty_identifiers: bool,
    composition: Composition | None,
+    azurite: bool,
    sanity_restart: bool,
 ) -> None:
    num_threads = num_threads or os.cpu_count() or 10
@@ -222,7 +223,7 @@ def run(
            assert composition, "Kill scenario only works in mzcompose"
            worker = Worker(
                worker_rng,
-                [KillAction(worker_rng, composition, sanity_restart)],
+                [KillAction(worker_rng, composition, azurite, sanity_restart)],
                [1],
                end_time,
                autocommit=False,
@@ -246,6 +247,7 @@ def run(
                    ZeroDowntimeDeployAction(
                        worker_rng,
                        composition,
+                        azurite,
                        sanity_restart,
                    )
                ],
@@ -474,6 +476,9 @@ def parse_common_args(parser: argparse.ArgumentParser) -> None:
        action="store_true",
        help="Whether to initialize expensive parts like SQLsmith, sources, sinks (for fast local testing, reduces coverage)",
    )
+    parser.add_argument(
+        "--azurite", action="store_true", help="Use Azurite as blob store instead of S3"
+    )
 
 
 def main() -> int:
@@ -525,6 +530,7 @@ def main() -> int:
        args.threads,
        args.naughty_identifiers,
        composition=None,  # only works in mzcompose
+        azurite=args.azurite,
        sanity_restart=False,  # only works in mzcompose
    )
    return 0
diff --git a/test/cloudtest/test_upgrade.py b/test/cloudtest/test_upgrade.py
index a3cdf393e5e3b..0932afde0a163 100644
--- a/test/cloudtest/test_upgrade.py
+++ b/test/cloudtest/test_upgrade.py
@@ -91,5 +91,5 @@ def test_upgrade(aws_region: str | None, log_filter: str | None, dev: bool) -> N
            AlterConnectionHost,
        }
    )
-    scenario = CloudtestUpgrade(checks=checks, executor=executor)
+    scenario = CloudtestUpgrade(checks=checks, executor=executor, azurite=False)
    scenario.run()
diff --git a/test/data-ingest/mzcompose.py b/test/data-ingest/mzcompose.py
index 5f2506fb0c09a..b42fceb543dcf 100644
--- a/test/data-ingest/mzcompose.py
+++ b/test/data-ingest/mzcompose.py
@@ -25,6 +25,7 @@
 from materialize.data_ingest.workload import WORKLOADS, execute_workload
 from materialize.mzcompose import get_default_system_parameters
 from materialize.mzcompose.composition import Composition, WorkflowArgumentParser
+from materialize.mzcompose.services.azure import Azurite
 from materialize.mzcompose.services.clusterd import Clusterd
 from materialize.mzcompose.services.kafka import Kafka
 from materialize.mzcompose.services.materialized import Materialized
@@ -53,24 +54,10 @@
    SchemaRegistry(),
    CockroachOrPostgresMetadata(),
    Minio(setup_materialize=True),
-    # Fixed port so that we keep the same port after restarting Mz in disruptions
-    Materialized(
-        ports=["16875:6875"],
-        external_blob_store=True,
-        external_metadata_store=True,
-        system_parameter_defaults=get_default_system_parameters(zero_downtime=True),
-        additional_system_parameter_defaults={"enable_table_keys": "true"},
-        sanity_restart=False,
-    ),
-    Materialized(
-        name="materialized2",
-        ports=["26875:6875"],
-        external_blob_store=True,
-        external_metadata_store=True,
-        system_parameter_defaults=get_default_system_parameters(zero_downtime=True),
-        additional_system_parameter_defaults={"enable_table_keys": "true"},
-        sanity_restart=False,
-    ),
+    Azurite(),
+    # Overridden below
+    Materialized(),
+    Materialized(name="materialized2"),
    Clusterd(name="clusterd1", options=["--scratch-directory=/mzdata/source_data"]),
 ]
 
@@ -90,6 +77,9 @@ def workflow_default(c: Composition, parser: WorkflowArgumentParser) -> None:
        action="append",
        help="Workload(s) to run.",
    )
+    parser.add_argument(
+        "--azurite", action="store_true", help="Use Azurite as blob store instead of S3"
+    )
 
    args = parser.parse_args()
 
@@ -118,38 +108,60 @@ def workflow_default(c: Composition, parser: WorkflowArgumentParser) -> None:
 
    executor_classes = [MySqlExecutor, KafkaRoundtripExecutor, KafkaExecutor]
 
-    c.up(*services)
-    conn = c.sql_connection()
-    conn.autocommit = True
-    with conn.cursor() as cur:
-        cur.execute(
-            """CREATE CONNECTION IF NOT EXISTS kafka_conn
-            FOR KAFKA BROKER 'kafka:9092', SECURITY PROTOCOL PLAINTEXT"""
-        )
-        cur.execute(
-            """CREATE CONNECTION IF NOT EXISTS csr_conn
-            FOR CONFLUENT SCHEMA REGISTRY
-            URL 'http://schema-registry:8081'"""
-        )
-    conn.autocommit = False
-    conn.close()
+    with c.override(
+        # Fixed port so that we keep the same port after restarting Mz in disruptions
+        Materialized(
+            ports=["16875:6875"],
+            external_blob_store=True,
+            blob_store_is_azure=args.azurite,
+            external_metadata_store=True,
+            system_parameter_defaults=get_default_system_parameters(zero_downtime=True),
+            additional_system_parameter_defaults={"enable_table_keys": "true"},
+            sanity_restart=False,
+        ),
+        Materialized(
+            name="materialized2",
+            ports=["26875:6875"],
+            external_blob_store=True,
+            blob_store_is_azure=args.azurite,
+            external_metadata_store=True,
+            system_parameter_defaults=get_default_system_parameters(zero_downtime=True),
+            additional_system_parameter_defaults={"enable_table_keys": "true"},
+            sanity_restart=False,
+        ),
+    ):
+        c.up(*services)
+        conn = c.sql_connection()
+        conn.autocommit = True
+        with conn.cursor() as cur:
+            cur.execute(
+                """CREATE CONNECTION IF NOT EXISTS kafka_conn
+                FOR KAFKA BROKER 'kafka:9092', SECURITY PROTOCOL PLAINTEXT"""
+            )
+            cur.execute(
+                """CREATE CONNECTION IF NOT EXISTS csr_conn
+                FOR CONFLUENT SCHEMA REGISTRY
+                URL 'http://schema-registry:8081'"""
+            )
+        conn.autocommit = False
+        conn.close()
 
-    ports = {s: c.default_port(s) for s in services}
-    ports["materialized2"] = 26875
-    mz_service = "materialized"
-    deploy_generation = 0
+        ports = {s: c.default_port(s) for s in services}
+        ports["materialized2"] = 26875
+        mz_service = "materialized"
+        deploy_generation = 0
 
-    for i, workload_class in enumerate(workloads):
-        random.seed(args.seed)
-        print(f"--- Testing workload {workload_class.__name__}")
-        workload = workload_class(c, mz_service, deploy_generation)
-        execute_workload(
-            executor_classes,
-            workload,
-            i,
-            ports,
-            args.runtime,
-            args.verbose,
-        )
-        mz_service = workload.mz_service
-        deploy_generation = workload.deploy_generation
+        for i, workload_class in enumerate(workloads):
+            random.seed(args.seed)
+            print(f"--- Testing workload {workload_class.__name__}")
+            workload = workload_class(args.azurite, c, mz_service, deploy_generation)
+            execute_workload(
+                executor_classes,
+                workload,
+                i,
+                ports,
+                args.runtime,
+                args.verbose,
+            )
+            mz_service = workload.mz_service
+            deploy_generation = workload.deploy_generation
diff --git a/test/feature-benchmark/mzcompose.py b/test/feature-benchmark/mzcompose.py
index 414b6b0d5d652..b8f354e1a5a1c 100644
--- a/test/feature-benchmark/mzcompose.py
+++ b/test/feature-benchmark/mzcompose.py
@@ -84,6 +84,7 @@
    TerminationCondition,
 )
 from materialize.mzcompose.composition import Composition, WorkflowArgumentParser
+from materialize.mzcompose.services.azure import Azurite
 from materialize.mzcompose.services.balancerd import Balancerd
 from materialize.mzcompose.services.clusterd import Clusterd
 from materialize.mzcompose.services.cockroach import Cockroach
@@ -136,6 +137,7 @@ def make_aggregation_class() -> type[Aggregation]:
    Redpanda(),
    Cockroach(setup_materialize=True),
    Minio(setup_materialize=True),
+    Azurite(),
    KgenService(),
    Postgres(),
    MySql(),
@@ -193,7 +195,9 @@ def run_one_scenario(
            additional_system_parameter_defaults[param_name] = param_value
 
    mz_image = f"materialize/materialized:{tag}" if tag else None
-    mz = create_mz_service(mz_image, size, additional_system_parameter_defaults)
+    mz = create_mz_service(
+        mz_image, size, additional_system_parameter_defaults, args.azurite
+    )
    clusterd_image = f"materialize/clusterd:{tag}" if tag else None
    clusterd = create_clusterd_service(
        clusterd_image, size, additional_system_parameter_defaults
@@ -204,7 +208,9 @@ def run_one_scenario(
            f"Unable to find materialize image with tag {tag}, proceeding with latest instead!"
        )
        mz_image = "materialize/materialized:latest"
-        mz = create_mz_service(mz_image, size, additional_system_parameter_defaults)
+        mz = create_mz_service(
+            mz_image, size, additional_system_parameter_defaults, args.azurite
+        )
        clusterd_image = f"materialize/clusterd:{tag}" if tag else None
        clusterd = create_clusterd_service(
            clusterd_image, size, additional_system_parameter_defaults
@@ -220,6 +226,8 @@ def run_one_scenario(
            default_timeout=default_timeout,
            materialize_params={"statement_timeout": f"'{default_timeout}'"},
            metadata_store="cockroach",
+            external_blob_store=True,
+            blob_store_is_azure=args.azurite,
        )
    ):
        c.testdrive(
@@ -308,6 +316,7 @@ def create_mz_service(
    mz_image: str | None,
    default_size: int,
    additional_system_parameter_defaults: dict[str, str] | None,
+    azurite: bool,
 ) -> Materialized:
    return Materialized(
        image=mz_image,
@@ -319,6 +328,7 @@ def create_mz_service(
        external_metadata_store=True,
        metadata_store="cockroach",
        external_blob_store=True,
+        blob_store_is_azure=azurite,
        sanity_restart=False,
    )
 
@@ -462,6 +472,9 @@ def workflow_default(c: Composition, parser: WorkflowArgumentParser) -> None:
    parser.add_argument(
        "--other-size", metavar="N", type=int, default=4, help="SIZE to use for 'OTHER'"
    )
+    parser.add_argument(
+        "--azurite", action="store_true", help="Use Azurite as blob store instead of S3"
+    )
 
    args = parser.parse_args()
 
diff --git a/test/parallel-benchmark/mzcompose.py b/test/parallel-benchmark/mzcompose.py
index 5f9a476e221b7..a928e49efa4f7 100644
--- a/test/parallel-benchmark/mzcompose.py
+++ b/test/parallel-benchmark/mzcompose.py
@@ -25,6 +25,7 @@
 from materialize.mz_env_util import get_cloud_hostname
 from materialize.mzcompose import ADDITIONAL_BENCHMARKING_SYSTEM_PARAMETERS
 from materialize.mzcompose.composition import Composition, WorkflowArgumentParser
+from materialize.mzcompose.services.azure import Azurite
 from materialize.mzcompose.services.balancerd import Balancerd
 from materialize.mzcompose.services.cockroach import Cockroach
 from materialize.mzcompose.services.kafka import Kafka as KafkaService
@@ -95,13 +96,14 @@ def known_regression(scenario: str, other_tag: str) -> bool:
    Redpanda(),
    Cockroach(setup_materialize=True),
    Minio(setup_materialize=True),
+    Azurite(),
    KgenService(),
    Postgres(),
    MySql(),
    Balancerd(),
    # Overridden below
    Materialized(),
-    Testdrive(no_reset=True, seed=1, metadata_store="cockroach"),
+    Testdrive(),
    Mz(app_password=""),
 ]
 
@@ -439,11 +441,19 @@ def run_once(
            soft_assertions=False,
            external_metadata_store=True,
            external_blob_store=True,
+            blob_store_is_azure=args.azurite,
            sanity_restart=False,
            additional_system_parameter_defaults=ADDITIONAL_BENCHMARKING_SYSTEM_PARAMETERS
            | {"max_connections": "100000"},
            metadata_store="cockroach",
-        )
+        ),
+        Testdrive(
+            no_reset=True,
+            seed=1,
+            metadata_store="cockroach",
+            external_blob_store=True,
+            blob_store_is_azure=args.azurite,
+        ),
    ]
 
    target = None
@@ -778,6 +788,9 @@ def workflow_default(c: Composition, parser: WorkflowArgumentParser) -> None:
        action="store_true",
        help="Store results in SQLite instead of in memory",
    )
+    parser.add_argument(
+        "--azurite", action="store_true", help="Use Azurite as blob store instead of S3"
+    )
 
    args = parser.parse_args()
 
diff --git a/test/parallel-workload/mzcompose.py b/test/parallel-workload/mzcompose.py
index 7b16ff127c0f3..fad8f31dbee69 100644
--- a/test/parallel-workload/mzcompose.py
+++ b/test/parallel-workload/mzcompose.py
@@ -20,6 +20,7 @@
 from materialize.mzcompose.composition import Composition, WorkflowArgumentParser
 from materialize.mzcompose.service import Service
+from materialize.mzcompose.services.azure import Azurite
 from materialize.mzcompose.services.cockroach import Cockroach
 from materialize.mzcompose.services.kafka import Kafka
 from materialize.mzcompose.services.materialized import Materialized
@@ -48,6 +49,7 @@
    ),
    SchemaRegistry(),
    Minio(setup_materialize=True, additional_directories=["copytos3"]),
+    Azurite(),
    Mc(),
    Materialized(),
    Materialized(name="materialized2"),
@@ -72,7 +74,6 @@ def workflow_default(c: Composition, parser: WorkflowArgumentParser) -> None:
        "zookeeper",
        "kafka",
        "schema-registry",
-        "minio",
        "materialized",
    ]
 
@@ -84,6 +85,7 @@ def workflow_default(c: Composition, parser: WorkflowArgumentParser) -> None:
    with c.override(
        Materialized(
            external_blob_store="toxiproxy",
+            blob_store_is_azure=args.azurite,
            external_metadata_store="toxiproxy",
            ports=["6975:6875", "6976:6876", "6977:6877"],
            sanity_restart=sanity_restart,
@@ -91,7 +93,7 @@ def workflow_default(c: Composition, parser: WorkflowArgumentParser) -> None:
        ),
        Toxiproxy(seed=random.randrange(2**63)),
    ):
-        toxiproxy_start(c)
+        toxiproxy_start(c, args.azurite)
        c.up(*service_names)
        c.up("mc", persistent=True)
        c.exec(
@@ -124,6 +126,7 @@ def workflow_default(c: Composition, parser: WorkflowArgumentParser) -> None:
            args.threads,
            args.naughty_identifiers,
            c,
+            args.azurite,
            sanity_restart,
        )
        # Don't wait for potentially hanging threads that we are ignoring
@@ -137,7 +140,7 @@ def workflow_default(c: Composition, parser: WorkflowArgumentParser) -> None:
    #         return
 
 
-def toxiproxy_start(c: Composition) -> None:
+def toxiproxy_start(c: Composition, azurite: bool) -> None:
    c.up("toxiproxy")
 
    port = c.default_port("toxiproxy")
@@ -161,6 +164,16 @@ def toxiproxy_start(c: Composition) -> None:
        },
    )
    assert r.status_code == 201, r
+    r = requests.post(
+        f"http://localhost:{port}/proxies",
+        json={
+            "name": "azurite",
+            "listen": "0.0.0.0:10000",
+            "upstream": "azurite:10000",
+            "enabled": True,
+        },
+    )
+    assert r.status_code == 201, r
    r = requests.post(
        f"http://localhost:{port}/proxies/cockroach/toxics",
        json={
@@ -179,3 +192,12 @@ def toxiproxy_start(c: Composition) -> None:
        },
    )
    assert r.status_code == 200, r
+    r = requests.post(
+        f"http://localhost:{port}/proxies/azurite/toxics",
+        json={
+            "name": "azurite",
+            "type": "latency",
+            "attributes": {"latency": 0, "jitter": 100},
+        },
+    )
+    assert r.status_code == 200, r
diff --git a/test/platform-checks/mzcompose.py b/test/platform-checks/mzcompose.py
index 35f19b51818cf..e1b3a26f61572 100644
--- a/test/platform-checks/mzcompose.py
+++ b/test/platform-checks/mzcompose.py
@@ -26,6 +26,7 @@
 from materialize.checks.scenarios_upgrade import *  # noqa: F401 F403
 from materialize.checks.scenarios_zero_downtime import *  # noqa: F401 F403
 from materialize.mzcompose.composition import Composition, WorkflowArgumentParser
+from materialize.mzcompose.services.azure import Azurite
 from materialize.mzcompose.services.clusterd import Clusterd
 from materialize.mzcompose.services.cockroach import Cockroach
 from materialize.mzcompose.services.debezium import Debezium
@@ -48,19 +49,37 @@
 
 
 def create_mzs(
-    additional_system_parameter_defaults: dict[str, str] | None = None
-) -> list[Materialized]:
+    azurite: bool,
+    additional_system_parameter_defaults: dict[str, str] | None = None,
+) -> list[TestdriveService | Materialized]:
    return [
        Materialized(
            name=mz_name,
            external_metadata_store=True,
            external_blob_store=True,
+            blob_store_is_azure=azurite,
            sanity_restart=False,
            volumes_extra=["secrets:/share/secrets"],
            metadata_store="cockroach",
            additional_system_parameter_defaults=additional_system_parameter_defaults,
        )
        for mz_name in ["materialized", "mz_1", "mz_2", "mz_3", "mz_4", "mz_5"]
+    ] + [
+        TestdriveService(
+            default_timeout=TESTDRIVE_DEFAULT_TIMEOUT,
+            materialize_params={"statement_timeout": f"'{TESTDRIVE_DEFAULT_TIMEOUT}'"},
+            external_blob_store=True,
+            blob_store_is_azure=azurite,
+            no_reset=True,
+            seed=1,
+            entrypoint_extra=[
+                "--var=replicas=1",
+                f"--var=default-replica-size={Materialized.Size.DEFAULT_SIZE}-{Materialized.Size.DEFAULT_SIZE}",
+                f"--var=default-storage-size={Materialized.Size.DEFAULT_SIZE}-1",
+            ],
+            volumes_extra=["secrets:/share/secrets"],
+            metadata_store="cockroach",
+        )
    ]
 
 
@@ -73,6 +92,7 @@ def create_mzs(
        restart="on-failure:5",
    ),
    Minio(setup_materialize=True, additional_directories=["copytos3"]),
+    Azurite(),
    Mc(),
    Postgres(),
    MySql(),
@@ -117,20 +137,7 @@ def create_mzs(
    Clusterd(
        name="clusterd_compute_1"
    ),  # Started by some Scenarios, defined here only for the teardown
-    *create_mzs(),
-    TestdriveService(
-        default_timeout=TESTDRIVE_DEFAULT_TIMEOUT,
-        materialize_params={"statement_timeout": f"'{TESTDRIVE_DEFAULT_TIMEOUT}'"},
-        no_reset=True,
-        seed=1,
-        entrypoint_extra=[
-            "--var=replicas=1",
-            f"--var=default-replica-size={Materialized.Size.DEFAULT_SIZE}-{Materialized.Size.DEFAULT_SIZE}",
-            f"--var=default-storage-size={Materialized.Size.DEFAULT_SIZE}-1",
-        ],
-        volumes_extra=["secrets:/share/secrets"],
-        metadata_store="cockroach",
-    ),
+    *create_mzs(azurite=False),
    Persistcli(),
    SshBastionHost(),
 ]
@@ -157,7 +164,6 @@ def setup(c: Composition) -> None:
        "postgres",
        "mysql",
        "debezium",
-        "minio",
        "ssh-bastion-host",
    )
 
@@ -211,6 +217,9 @@ def workflow_default(c: Composition, parser: WorkflowArgumentParser) -> None:
        nargs="*",
        help="System parameters to set in Materialize, i.e. what you would set with `ALTER SYSTEM SET`",
    )
+    parser.add_argument(
+        "--azurite", action="store_true", help="Use Azurite as blob store instead of S3"
+    )
 
    args = parser.parse_args()
 
@@ -242,7 +251,7 @@ def workflow_default(c: Composition, parser: WorkflowArgumentParser) -> None:
            assert len(x) == 2, f"--system-param '{val}' should be the format <key>=<value>"
            additional_system_parameter_defaults[x[0]] = x[1]
 
-    with c.override(*create_mzs(additional_system_parameter_defaults)):
+    with c.override(*create_mzs(args.azurite, additional_system_parameter_defaults)):
        executor = MzcomposeExecutor(composition=c)
        for scenario_class in scenarios:
            assert issubclass(
@@ -263,7 +272,10 @@ def workflow_default(c: Composition, parser: WorkflowArgumentParser) -> None:
        if execution_mode in [ExecutionMode.SEQUENTIAL, ExecutionMode.PARALLEL]:
            setup(c)
            scenario = scenario_class(
-                checks=checks, executor=executor, seed=args.seed
+                checks=checks,
+                executor=executor,
+                azurite=args.azurite,
+                seed=args.seed,
            )
            scenario.run()
            teardown(c)
@@ -277,7 +289,10 @@ def workflow_default(c: Composition, parser: WorkflowArgumentParser) -> None:
                )
                setup(c)
                scenario = scenario_class(
-                    checks=[check], executor=executor, seed=args.seed
+                    checks=[check],
+                    executor=executor,
+                    azurite=args.azurite,
+                    seed=args.seed,
                )
                scenario.run()
                teardown(c)
diff --git a/test/testdrive-old-kafka-src-syntax/mzcompose.py b/test/testdrive-old-kafka-src-syntax/mzcompose.py
index 937c9671933db..77fd29709bf9d 100644
--- a/test/testdrive-old-kafka-src-syntax/mzcompose.py
+++ b/test/testdrive-old-kafka-src-syntax/mzcompose.py
@@ -18,6 +18,7 @@
 from materialize import ci_util
 from materialize.mzcompose import get_default_system_parameters
 from materialize.mzcompose.composition import Composition, WorkflowArgumentParser
+from materialize.mzcompose.services.azure import Azurite
 from materialize.mzcompose.services.fivetran_destination import FivetranDestination
 from materialize.mzcompose.services.kafka import Kafka
 from materialize.mzcompose.services.materialized import Materialized
@@ -37,6 +38,7 @@
    Postgres(),
    MySql(),
    Minio(setup_materialize=True, additional_directories=["copytos3"]),
+    Azurite(),
    Materialized(external_blob_store=True),
    FivetranDestination(volumes_extra=["tmp:/share/tmp"]),
    Testdrive(external_blob_store=True),
@@ -88,6 +90,10 @@ def workflow_default(c: Composition, parser: WorkflowArgumentParser) -> None:
        help="Rewrite results, disables junit reports",
    )
 
+    parser.add_argument(
+        "--azurite", action="store_true", help="Use Azurite as blob store instead of S3"
+    )
+
    parser.add_argument(
        "files",
        nargs="*",
@@ -98,7 +104,6 @@ def workflow_default(c: Composition, parser: WorkflowArgumentParser) -> None:
 
    dependencies = [
        "fivetran-destination",
-        "minio",
        "materialized",
        "postgres",
        "mysql",
@@ -134,6 +139,7 @@ def workflow_default(c: Composition, parser: WorkflowArgumentParser) -> None:
    materialized = Materialized(
        default_size=args.default_size,
        external_blob_store=True,
+        blob_store_is_azure=args.azurite,
        additional_system_parameter_defaults=additional_system_parameter_defaults,
    )
 
@@ -145,6 +151,7 @@ def workflow_default(c: Composition, parser: WorkflowArgumentParser) -> None:
        default_timeout=args.default_timeout,
        volumes_extra=["mzdata:/mzdata"],
        external_blob_store=True,
+        blob_store_is_azure=args.azurite,
        fivetran_destination=True,
        fivetran_destination_files_path="/share/tmp",
        entrypoint_extra=[
diff --git a/test/testdrive/mzcompose.py b/test/testdrive/mzcompose.py
index a5c025883152f..7cd31db9646f6 100644
--- a/test/testdrive/mzcompose.py
+++ b/test/testdrive/mzcompose.py
@@ -104,11 +104,10 @@ def workflow_default(c: Composition, parser: WorkflowArgumentParser) -> None:
 
    dependencies = [
        "fivetran-destination",
-        "minio",
-        "azurite",
        "materialized",
        "postgres",
        "mysql",
+        "minio",
    ]
    if args.redpanda:
        dependencies += ["redpanda"]
diff --git a/test/txn-wal-fencing/mzcompose.py b/test/txn-wal-fencing/mzcompose.py
index 5c2275dca1b60..e080485f47d73 100644
--- a/test/txn-wal-fencing/mzcompose.py
+++ b/test/txn-wal-fencing/mzcompose.py
@@ -12,6 +12,7 @@
 purpose of exercising fencing.
 """
 
+import argparse
 import random
 import time
 from concurrent import futures
@@ -19,7 +20,8 @@
 from enum import Enum
 
 from materialize import buildkite
-from materialize.mzcompose.composition import Composition
+from materialize.mzcompose.composition import Composition, WorkflowArgumentParser
+from materialize.mzcompose.services.azure import Azurite
 from materialize.mzcompose.services.materialized import Materialized
 from materialize.mzcompose.services.minio import Minio
 from materialize.mzcompose.services.postgres import CockroachOrPostgresMetadata
@@ -87,6 +89,7 @@ class SuccessfulCommit:
 
 SERVICES = [
    Minio(setup_materialize=True),
+    Azurite(),
    CockroachOrPostgresMetadata(),
    # Overriden below
    Materialized(name="mz_first"),
@@ -94,14 +97,19 @@ class SuccessfulCommit:
 ]
 
 
-def workflow_default(c: Composition) -> None:
+def workflow_default(c: Composition, parser: WorkflowArgumentParser) -> None:
+    parser.add_argument(
+        "--azurite", action="store_true", help="Use Azurite as blob store instead of S3"
+    )
+    args = parser.parse_args()
+
    workloads = buildkite.shard_list(WORKLOADS, lambda w: w.name)
    print(
        f"Workloads in shard with index {buildkite.get_parallelism_index()}: {[w.name for w in workloads]}"
    )
 
    for workload in workloads:
-        run_workload(c, workload)
+        run_workload(c, workload, args)
 
 
 def execute_operation(
@@ -161,12 +169,12 @@ def execute_operation(
    )
 
 
-def run_workload(c: Composition, workload: Workload) -> None:
+def run_workload(c: Composition, workload: Workload, args: argparse.Namespace) -> None:
    print(f"+++ Running workload {workload.name} ...")
    c.silent = True
 
    c.down(destroy_volumes=True)
-    c.up("minio", c.metadata_store())
+    c.up(c.metadata_store())
 
    mzs = {
        "mz_first": workload.txn_wal_first,
@@ -179,6 +187,7 @@ def run_workload(c: Composition, workload: Workload) -> None:
            name=mz_name,
            external_metadata_store=True,
            external_blob_store=True,
+            blob_store_is_azure=args.azurite,
            sanity_restart=False,
        )
        for mz_name in mzs
diff --git a/test/zippy/mzcompose.py b/test/zippy/mzcompose.py
index b66bec4fc4551..2ebe32f5355ae 100644
--- a/test/zippy/mzcompose.py
+++ b/test/zippy/mzcompose.py
@@ -19,6 +19,7 @@
 from enum import Enum
 
 from materialize.mzcompose.composition import Composition, WorkflowArgumentParser
+from materialize.mzcompose.services.azure import Azurite
 from materialize.mzcompose.services.balancerd import Balancerd
 from materialize.mzcompose.services.clusterd import Clusterd
 from materialize.mzcompose.services.cockroach import Cockroach
@@ -43,18 +44,36 @@
 
 
 def create_mzs(
-    additional_system_parameter_defaults: dict[str, str] | None = None
-) -> list[Materialized]:
+    azurite: bool,
+    transaction_isolation: str | bool,
+    additional_system_parameter_defaults: dict[str, str] | None = None,
+) -> list[Testdrive | Materialized]:
    return [
        Materialized(
            name=mz_name,
            external_blob_store=True,
+            blob_store_is_azure=azurite,
            external_metadata_store=True,
            sanity_restart=False,
            metadata_store="cockroach",
            additional_system_parameter_defaults=additional_system_parameter_defaults,
        )
        for mz_name in ["materialized", "materialized2"]
+    ] + [
+        Testdrive(
+            materialize_url="postgres://materialize@balancerd:6875",
+            no_reset=True,
+            seed=1,
+            # Timeout increased since Large Zippy occasionally runs into them
+            default_timeout="1200s",
+            materialize_params={
+                "statement_timeout": "'1800s'",
+                "transaction_isolation": f"'{transaction_isolation}'",
+            },
+            metadata_store="cockroach",
+            external_blob_store=True,
+            blob_store_is_azure=azurite,
+        ),
    ]
 
 
@@ -65,11 +84,11 @@
    Postgres(),
    Cockroach(),
    Minio(setup_materialize=True, additional_directories=["copytos3"]),
+    Azurite(),
    Mc(),
    Balancerd(),
-    *create_mzs(),
+    *create_mzs(azurite=False, transaction_isolation=False),
    Clusterd(name="storaged"),
-    Testdrive(metadata_store="cockroach"),
    Grafana(),
    Prometheus(),
    SshBastionHost(),
@@ -157,6 +176,10 @@ def workflow_default(c: Composition, parser: WorkflowArgumentParser) -> None:
        help="System parameters to set in Materialize, i.e. what you would set with `ALTER SYSTEM SET`",
    )
 
+    parser.add_argument(
+        "--azurite", action="store_true", help="Use Azurite as blob store instead of S3"
+    )
+
    args = parser.parse_args()
 
    scenario_class = globals()[args.scenario]
@@ -181,19 +204,11 @@ def workflow_default(c: Composition, parser: WorkflowArgumentParser) -> None:
            restart="on-failure:5",
            setup_materialize=True,
        ),
-        Testdrive(
-            materialize_url="postgres://materialize@balancerd:6875",
-            no_reset=True,
-            seed=1,
-            # Timeout increased since Large Zippy occasionally runs into them
-            default_timeout="1200s",
-            materialize_params={
-                "statement_timeout": "'1800s'",
-                "transaction_isolation": f"'{args.transaction_isolation}'",
-            },
-            metadata_store="cockroach",
+        *create_mzs(
+            args.azurite,
+            args.transaction_isolation,
+            additional_system_parameter_defaults,
        ),
-        *create_mzs(additional_system_parameter_defaults),
    ):
        c.up("materialized")