Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

DNM: Azurite testing #30907

Draft
wants to merge 13 commits into
base: main
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
751 changes: 643 additions & 108 deletions Cargo.lock

Large diffs are not rendered by default.

99 changes: 58 additions & 41 deletions ci/nightly/pipeline.template.yml

Large diffs are not rendered by default.

7 changes: 5 additions & 2 deletions ci/test/pipeline.template.yml
Original file line number Diff line number Diff line change
Expand Up @@ -328,6 +328,7 @@ steps:
plugins:
- ./ci/plugins/mzcompose:
composition: cluster
args: ["--azurite"]
agents:
queue: hetzner-aarch64-8cpu-16gb

Expand Down Expand Up @@ -593,7 +594,7 @@ steps:
plugins:
- ./ci/plugins/mzcompose:
composition: zippy
args: [--scenario=KafkaSources, --actions=100]
args: [--scenario=KafkaSources, --actions=100, --azurite]

- group: "Platform checks"
key: platform-checks
Expand All @@ -613,6 +614,7 @@ steps:
[
--scenario=RestartEnvironmentdClusterdStorage,
"--seed=$BUILDKITE_JOB_ID",
--azurite,
]

- id: checks-no-restart-no-upgrade
Expand All @@ -626,7 +628,7 @@ steps:
plugins:
- ./ci/plugins/mzcompose:
composition: platform-checks
args: [--scenario=NoRestartNoUpgrade, "--seed=$BUILDKITE_JOB_ID"]
args: [--scenario=NoRestartNoUpgrade, "--seed=$BUILDKITE_JOB_ID", --azurite]

- id: source-sink-errors
label: "Source/Sink Error Reporting %N"
Expand All @@ -651,6 +653,7 @@ steps:
--scenario=KafkaUpsertUnique,
--other-tag=common-ancestor,
--ignore-other-tag-missing,
--azurite,
]
coverage: skip
agents:
Expand Down
4 changes: 3 additions & 1 deletion misc/python/materialize/checks/mzcompose_actions.py
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,7 @@ def __init__(
self.restart = restart
self.force_migrations = force_migrations
self.publish = publish
self.scenario = scenario

def execute(self, e: Executor) -> None:
c = e.mzcompose_composition()
Expand All @@ -72,7 +73,8 @@ def execute(self, e: Executor) -> None:
name=self.mz_service,
image=image,
external_metadata_store=True,
external_minio=True,
external_blob_store=True,
blob_store_is_azure=self.scenario.azurite,
environment_extra=self.environment_extra,
system_parameter_defaults=self.system_parameter_defaults,
additional_system_parameter_defaults=self.additional_system_parameter_defaults,
Expand Down
10 changes: 8 additions & 2 deletions misc/python/materialize/checks/scenarios.py
Original file line number Diff line number Diff line change
Expand Up @@ -45,10 +45,15 @@

class Scenario:
def __init__(
self, checks: list[type[Check]], executor: Executor, seed: str | None = None
self,
checks: list[type[Check]],
executor: Executor,
azurite: bool,
seed: str | None = None,
) -> None:
self._checks = checks
self.executor = executor
self.azurite = azurite
self.rng = None if seed is None else Random(seed)
self._base_version = MzVersion.parse_cargo()

Expand Down Expand Up @@ -269,10 +274,11 @@ def __init__(
self,
checks: list[type[Check]],
executor: Executor,
azurite: bool,
seed: str | None,
change_entries: list[SystemVarChangeEntry],
):
super().__init__(checks, executor, seed)
super().__init__(checks, executor, azurite, seed)
self.change_entries = change_entries

def actions(self) -> list[Action]:
Expand Down
8 changes: 6 additions & 2 deletions misc/python/materialize/checks/scenarios_upgrade.py
Original file line number Diff line number Diff line change
Expand Up @@ -162,10 +162,14 @@ class UpgradeEntireMzFourVersions(Scenario):
"""Test upgrade X-4 -> X-3 -> X-2 -> X-1 -> X"""

def __init__(
self, checks: list[type[Check]], executor: Executor, seed: str | None = None
self,
checks: list[type[Check]],
executor: Executor,
azurite: bool,
seed: str | None = None,
):
self.minor_versions = get_minor_versions()
super().__init__(checks, executor, seed)
super().__init__(checks, executor, azurite, seed)

def base_version(self) -> MzVersion:
return self.minor_versions[3]
Expand Down
8 changes: 6 additions & 2 deletions misc/python/materialize/checks/scenarios_zero_downtime.py
Original file line number Diff line number Diff line change
Expand Up @@ -257,10 +257,14 @@ class ZeroDowntimeUpgradeEntireMzFourVersions(Scenario):
"""Test 0dt upgrade from X-4 -> X-3 -> X-2 -> X-1 -> X"""

def __init__(
self, checks: list[type[Check]], executor: Executor, seed: str | None = None
self,
checks: list[type[Check]],
executor: Executor,
azurite: bool,
seed: str | None = None,
):
self.minor_versions = get_minor_versions()
super().__init__(checks, executor, seed)
super().__init__(checks, executor, azurite, seed)

def base_version(self) -> MzVersion:
return self.minor_versions[3]
Expand Down
8 changes: 7 additions & 1 deletion misc/python/materialize/cli/run.py
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,7 @@
else f"{Arch.host()}-apple-darwin"
)
DEFAULT_POSTGRES = "postgres://root@localhost:26257/materialize"
DEFAULT_BLOB = "file://mzdata/persist/blob"

# sets entitlements on the built binary, e.g. environmentd, so you can inspect it with Instruments
MACOS_ENTITLEMENTS_DATA = """
Expand Down Expand Up @@ -88,6 +89,11 @@ def main() -> int:
help="Postgres/CockroachDB connection string",
default=os.getenv("MZDEV_POSTGRES", DEFAULT_POSTGRES),
)
parser.add_argument(
"--blob",
help="Blob storage connection string",
default=os.getenv("MZDEV_BLOB", DEFAULT_BLOB),
)
parser.add_argument(
"--release",
help="Build artifacts in release mode, with optimizations",
Expand Down Expand Up @@ -276,7 +282,7 @@ def main() -> int:
f"--orchestrator-process-scratch-directory={scratch}",
"--secrets-controller=local-file",
f"--persist-consensus-url={args.postgres}?options=--search_path=consensus",
f"--persist-blob-url=file://{mzdata}/persist/blob",
f"--persist-blob-url={args.blob}",
f"--timestamp-oracle-url={args.postgres}?options=--search_path=tsoracle",
f"--environment-id={environment_id}",
"--bootstrap-role=materialize",
Expand Down
20 changes: 16 additions & 4 deletions misc/python/materialize/data_ingest/transaction_def.py
Original file line number Diff line number Diff line change
Expand Up @@ -64,11 +64,16 @@ class RestartMz(TransactionDef):
workload: "Workload"

def __init__(
self, composition: Composition, probability: float, workload: "Workload"
self,
composition: Composition,
probability: float,
workload: "Workload",
azurite: bool,
):
self.composition = composition
self.probability = probability
self.workload = workload
self.azurite = azurite

def generate(self, fields: list[Field]) -> Iterator[Transaction | None]:
if random.random() < self.probability:
Expand All @@ -82,7 +87,8 @@ def generate(self, fields: list[Field]) -> Iterator[Transaction | None]:
Materialized(
name=self.workload.mz_service,
ports=ports,
external_minio=True,
external_blob_store=True,
blob_store_is_azure=self.azurite,
external_metadata_store=True,
system_parameter_defaults=get_default_system_parameters(
zero_downtime=True
Expand All @@ -104,11 +110,16 @@ class ZeroDowntimeDeploy(TransactionDef):
workload: "Workload"

def __init__(
self, composition: Composition, probability: float, workload: "Workload"
self,
composition: Composition,
probability: float,
workload: "Workload",
azurite: bool,
):
self.composition = composition
self.probability = probability
self.workload = workload
self.azurite = azurite

def generate(self, fields: list[Field]) -> Iterator[Transaction | None]:
if random.random() < self.probability:
Expand All @@ -129,7 +140,8 @@ def generate(self, fields: list[Field]) -> Iterator[Transaction | None]:
Materialized(
name=self.workload.mz_service,
ports=ports,
external_minio=True,
external_blob_store=True,
blob_store_is_azure=self.azurite,
external_metadata_store=True,
system_parameter_defaults=get_default_system_parameters(
zero_downtime=True
Expand Down
48 changes: 35 additions & 13 deletions misc/python/materialize/data_ingest/workload.py
Original file line number Diff line number Diff line change
Expand Up @@ -46,8 +46,12 @@ class Workload:
deploy_generation: int

def __init__(
self, mz_service: str = "materailized", deploy_generation: int = 0
self,
azurite: bool,
mz_service: str = "materailized",
deploy_generation: int = 0,
) -> None:
self.azurite = azurite
self.mz_service = mz_service
self.deploy_generation = deploy_generation

Expand All @@ -62,11 +66,12 @@ def generate(self, fields: list[Field]) -> Iterator[Transaction]:
class SingleSensorUpdating(Workload):
def __init__(
self,
azurite: bool,
composition: Composition | None = None,
mz_service: str = "materialized",
deploy_generation: int = 0,
) -> None:
super().__init__(mz_service, deploy_generation)
super().__init__(azurite, mz_service, deploy_generation)
self.cycle = [
TransactionDef(
[
Expand All @@ -83,11 +88,12 @@ def __init__(
class SingleSensorUpdatingDisruptions(Workload):
def __init__(
self,
azurite: bool,
composition: Composition | None = None,
mz_service: str = "materialized",
deploy_generation: int = 0,
) -> None:
super().__init__(mz_service, deploy_generation)
super().__init__(azurite, mz_service, deploy_generation)
self.cycle = [
TransactionDef(
[
Expand All @@ -100,17 +106,22 @@ def __init__(
),
]
if composition:
self.cycle.append(RestartMz(composition, probability=0.1, workload=self))
self.cycle.append(
RestartMz(
composition, probability=0.1, workload=self, azurite=self.azurite
)
)


class SingleSensorUpdating0dtDeploy(Workload):
def __init__(
self,
azurite: bool,
composition: Composition | None = None,
mz_service: str = "materialized",
deploy_generation: int = 0,
) -> None:
super().__init__(mz_service, deploy_generation)
super().__init__(azurite, mz_service, deploy_generation)
self.cycle = [
TransactionDef(
[
Expand All @@ -124,18 +135,21 @@ def __init__(
]
if composition:
self.cycle.append(
ZeroDowntimeDeploy(composition, probability=0.1, workload=self)
ZeroDowntimeDeploy(
composition, probability=0.1, workload=self, azurite=self.azurite
)
)


class DeleteDataAtEndOfDay(Workload):
def __init__(
self,
azurite: bool,
composition: Composition | None = None,
mz_service: str = "materialized",
deploy_generation: int = 0,
) -> None:
super().__init__(mz_service, deploy_generation)
super().__init__(azurite, mz_service, deploy_generation)
insert = Insert(
count=Records.SOME,
record_size=RecordSize.SMALL,
Expand Down Expand Up @@ -163,11 +177,12 @@ def __init__(
class DeleteDataAtEndOfDayDisruptions(Workload):
def __init__(
self,
azurite: bool,
composition: Composition | None = None,
mz_service: str = "materialized",
deploy_generation: int = 0,
) -> None:
super().__init__(mz_service, deploy_generation)
super().__init__(azurite, mz_service, deploy_generation)
insert = Insert(
count=Records.SOME,
record_size=RecordSize.SMALL,
Expand All @@ -192,17 +207,22 @@ def __init__(
]

if composition:
self.cycle.append(RestartMz(composition, probability=0.1, workload=self))
self.cycle.append(
RestartMz(
composition, probability=0.1, workload=self, azurite=self.azurite
)
)


class DeleteDataAtEndOfDay0dtDeploys(Workload):
def __init__(
self,
azurite: bool,
composition: Composition | None = None,
mz_service: str = "materialized",
deploy_generation: int = 0,
) -> None:
super().__init__(mz_service, deploy_generation)
super().__init__(azurite, mz_service, deploy_generation)
insert = Insert(
count=Records.SOME,
record_size=RecordSize.SMALL,
Expand All @@ -228,16 +248,18 @@ def __init__(

if composition:
self.cycle.append(
ZeroDowntimeDeploy(composition, probability=0.1, workload=self)
ZeroDowntimeDeploy(
composition, probability=0.1, workload=self, azurite=self.azurite
)
)


# TODO: Implement
# class ProgressivelyEnrichRecords(Workload):
# def __init__(
# self, composition: Composition | None = None, mz_service: str = "materialized", deploy_generation: int = 0
# self, azurite: bool, composition: Composition | None = None, mz_service: str = "materialized", deploy_generation: int = 0
# ) -> None:
# super().__init__(mz_service, deploy_generation)
# super().__init__(azurite, mz_service, deploy_generation)
# self.cycle: list[Definition] = [
# ]

Expand Down
Loading
Loading