Skip to content

Commit

Permalink
WIP: Other tests
Browse files Browse the repository at this point in the history
  • Loading branch information
def- committed Jan 3, 2025
1 parent e77a42f commit f4f8281
Show file tree
Hide file tree
Showing 22 changed files with 347 additions and 171 deletions.
87 changes: 46 additions & 41 deletions ci/nightly/pipeline.template.yml

Large diffs are not rendered by default.

7 changes: 5 additions & 2 deletions ci/test/pipeline.template.yml
Original file line number Diff line number Diff line change
Expand Up @@ -328,6 +328,7 @@ steps:
plugins:
- ./ci/plugins/mzcompose:
composition: cluster
args: ["--azurite"]
agents:
queue: hetzner-aarch64-8cpu-16gb

Expand Down Expand Up @@ -593,7 +594,7 @@ steps:
plugins:
- ./ci/plugins/mzcompose:
composition: zippy
args: [--scenario=KafkaSources, --actions=100]
args: [--scenario=KafkaSources, --actions=100, --azurite]

- group: "Platform checks"
key: platform-checks
Expand All @@ -613,6 +614,7 @@ steps:
[
--scenario=RestartEnvironmentdClusterdStorage,
"--seed=$BUILDKITE_JOB_ID",
--azurite,
]

- id: checks-no-restart-no-upgrade
Expand All @@ -626,7 +628,7 @@ steps:
plugins:
- ./ci/plugins/mzcompose:
composition: platform-checks
args: [--scenario=NoRestartNoUpgrade, "--seed=$BUILDKITE_JOB_ID"]
args: [--scenario=NoRestartNoUpgrade, "--seed=$BUILDKITE_JOB_ID", --azurite]

- id: source-sink-errors
label: "Source/Sink Error Reporting %N"
Expand All @@ -651,6 +653,7 @@ steps:
--scenario=KafkaUpsertUnique,
--other-tag=common-ancestor,
--ignore-other-tag-missing,
--azurite,
]
coverage: skip
agents:
Expand Down
2 changes: 2 additions & 0 deletions misc/python/materialize/checks/mzcompose_actions.py
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,7 @@ def __init__(
self.restart = restart
self.force_migrations = force_migrations
self.publish = publish
self.scenario = scenario

def execute(self, e: Executor) -> None:
c = e.mzcompose_composition()
Expand All @@ -73,6 +74,7 @@ def execute(self, e: Executor) -> None:
image=image,
external_metadata_store=True,
external_blob_store=True,
blob_store_is_azure=self.scenario.azurite,
environment_extra=self.environment_extra,
system_parameter_defaults=self.system_parameter_defaults,
additional_system_parameter_defaults=self.additional_system_parameter_defaults,
Expand Down
10 changes: 8 additions & 2 deletions misc/python/materialize/checks/scenarios.py
Original file line number Diff line number Diff line change
Expand Up @@ -45,10 +45,15 @@

class Scenario:
def __init__(
self, checks: list[type[Check]], executor: Executor, seed: str | None = None
self,
checks: list[type[Check]],
executor: Executor,
azurite: bool,
seed: str | None = None,
) -> None:
self._checks = checks
self.executor = executor
self.azurite = azurite
self.rng = None if seed is None else Random(seed)
self._base_version = MzVersion.parse_cargo()

Expand Down Expand Up @@ -269,10 +274,11 @@ def __init__(
self,
checks: list[type[Check]],
executor: Executor,
azurite: bool,
seed: str | None,
change_entries: list[SystemVarChangeEntry],
):
super().__init__(checks, executor, seed)
super().__init__(checks, executor, azurite, seed)
self.change_entries = change_entries

def actions(self) -> list[Action]:
Expand Down
8 changes: 6 additions & 2 deletions misc/python/materialize/checks/scenarios_upgrade.py
Original file line number Diff line number Diff line change
Expand Up @@ -162,10 +162,14 @@ class UpgradeEntireMzFourVersions(Scenario):
"""Test upgrade X-4 -> X-3 -> X-2 -> X-1 -> X"""

def __init__(
self, checks: list[type[Check]], executor: Executor, seed: str | None = None
self,
checks: list[type[Check]],
executor: Executor,
azurite: bool,
seed: str | None = None,
):
self.minor_versions = get_minor_versions()
super().__init__(checks, executor, seed)
super().__init__(checks, executor, azurite, seed)

def base_version(self) -> MzVersion:
return self.minor_versions[3]
Expand Down
8 changes: 6 additions & 2 deletions misc/python/materialize/checks/scenarios_zero_downtime.py
Original file line number Diff line number Diff line change
Expand Up @@ -257,10 +257,14 @@ class ZeroDowntimeUpgradeEntireMzFourVersions(Scenario):
"""Test 0dt upgrade from X-4 -> X-3 -> X-2 -> X-1 -> X"""

def __init__(
self, checks: list[type[Check]], executor: Executor, seed: str | None = None
self,
checks: list[type[Check]],
executor: Executor,
azurite: bool,
seed: str | None = None,
):
self.minor_versions = get_minor_versions()
super().__init__(checks, executor, seed)
super().__init__(checks, executor, azurite, seed)

def base_version(self) -> MzVersion:
return self.minor_versions[3]
Expand Down
16 changes: 14 additions & 2 deletions misc/python/materialize/data_ingest/transaction_def.py
Original file line number Diff line number Diff line change
Expand Up @@ -64,11 +64,16 @@ class RestartMz(TransactionDef):
workload: "Workload"

def __init__(
self, composition: Composition, probability: float, workload: "Workload"
self,
composition: Composition,
probability: float,
workload: "Workload",
azurite: bool,
):
self.composition = composition
self.probability = probability
self.workload = workload
self.azurite = azurite

def generate(self, fields: list[Field]) -> Iterator[Transaction | None]:
if random.random() < self.probability:
Expand All @@ -83,6 +88,7 @@ def generate(self, fields: list[Field]) -> Iterator[Transaction | None]:
name=self.workload.mz_service,
ports=ports,
external_blob_store=True,
blob_store_is_azure=self.azurite,
external_metadata_store=True,
system_parameter_defaults=get_default_system_parameters(
zero_downtime=True
Expand All @@ -104,11 +110,16 @@ class ZeroDowntimeDeploy(TransactionDef):
workload: "Workload"

def __init__(
self, composition: Composition, probability: float, workload: "Workload"
self,
composition: Composition,
probability: float,
workload: "Workload",
azurite: bool,
):
self.composition = composition
self.probability = probability
self.workload = workload
self.azurite = azurite

def generate(self, fields: list[Field]) -> Iterator[Transaction | None]:
if random.random() < self.probability:
Expand All @@ -130,6 +141,7 @@ def generate(self, fields: list[Field]) -> Iterator[Transaction | None]:
name=self.workload.mz_service,
ports=ports,
external_blob_store=True,
blob_store_is_azure=self.azurite,
external_metadata_store=True,
system_parameter_defaults=get_default_system_parameters(
zero_downtime=True
Expand Down
48 changes: 35 additions & 13 deletions misc/python/materialize/data_ingest/workload.py
Original file line number Diff line number Diff line change
Expand Up @@ -46,8 +46,12 @@ class Workload:
deploy_generation: int

def __init__(
self, mz_service: str = "materailized", deploy_generation: int = 0
self,
azurite: bool,
mz_service: str = "materailized",
deploy_generation: int = 0,
) -> None:
self.azurite = azurite
self.mz_service = mz_service
self.deploy_generation = deploy_generation

Expand All @@ -62,11 +66,12 @@ def generate(self, fields: list[Field]) -> Iterator[Transaction]:
class SingleSensorUpdating(Workload):
def __init__(
self,
azurite: bool,
composition: Composition | None = None,
mz_service: str = "materialized",
deploy_generation: int = 0,
) -> None:
super().__init__(mz_service, deploy_generation)
super().__init__(azurite, mz_service, deploy_generation)
self.cycle = [
TransactionDef(
[
Expand All @@ -83,11 +88,12 @@ def __init__(
class SingleSensorUpdatingDisruptions(Workload):
def __init__(
self,
azurite: bool,
composition: Composition | None = None,
mz_service: str = "materialized",
deploy_generation: int = 0,
) -> None:
super().__init__(mz_service, deploy_generation)
super().__init__(azurite, mz_service, deploy_generation)
self.cycle = [
TransactionDef(
[
Expand All @@ -100,17 +106,22 @@ def __init__(
),
]
if composition:
self.cycle.append(RestartMz(composition, probability=0.1, workload=self))
self.cycle.append(
RestartMz(
composition, probability=0.1, workload=self, azurite=self.azurite
)
)


class SingleSensorUpdating0dtDeploy(Workload):
def __init__(
self,
azurite: bool,
composition: Composition | None = None,
mz_service: str = "materialized",
deploy_generation: int = 0,
) -> None:
super().__init__(mz_service, deploy_generation)
super().__init__(azurite, mz_service, deploy_generation)
self.cycle = [
TransactionDef(
[
Expand All @@ -124,18 +135,21 @@ def __init__(
]
if composition:
self.cycle.append(
ZeroDowntimeDeploy(composition, probability=0.1, workload=self)
ZeroDowntimeDeploy(
composition, probability=0.1, workload=self, azurite=self.azurite
)
)


class DeleteDataAtEndOfDay(Workload):
def __init__(
self,
azurite: bool,
composition: Composition | None = None,
mz_service: str = "materialized",
deploy_generation: int = 0,
) -> None:
super().__init__(mz_service, deploy_generation)
super().__init__(azurite, mz_service, deploy_generation)
insert = Insert(
count=Records.SOME,
record_size=RecordSize.SMALL,
Expand Down Expand Up @@ -163,11 +177,12 @@ def __init__(
class DeleteDataAtEndOfDayDisruptions(Workload):
def __init__(
self,
azurite: bool,
composition: Composition | None = None,
mz_service: str = "materialized",
deploy_generation: int = 0,
) -> None:
super().__init__(mz_service, deploy_generation)
super().__init__(azurite, mz_service, deploy_generation)
insert = Insert(
count=Records.SOME,
record_size=RecordSize.SMALL,
Expand All @@ -192,17 +207,22 @@ def __init__(
]

if composition:
self.cycle.append(RestartMz(composition, probability=0.1, workload=self))
self.cycle.append(
RestartMz(
composition, probability=0.1, workload=self, azurite=self.azurite
)
)


class DeleteDataAtEndOfDay0dtDeploys(Workload):
def __init__(
self,
azurite: bool,
composition: Composition | None = None,
mz_service: str = "materialized",
deploy_generation: int = 0,
) -> None:
super().__init__(mz_service, deploy_generation)
super().__init__(azurite, mz_service, deploy_generation)
insert = Insert(
count=Records.SOME,
record_size=RecordSize.SMALL,
Expand All @@ -228,16 +248,18 @@ def __init__(

if composition:
self.cycle.append(
ZeroDowntimeDeploy(composition, probability=0.1, workload=self)
ZeroDowntimeDeploy(
composition, probability=0.1, workload=self, azurite=self.azurite
)
)


# TODO: Implement
# class ProgressivelyEnrichRecords(Workload):
# def __init__(
# self, composition: Composition | None = None, mz_service: str = "materialized", deploy_generation: int = 0
# self, azurite: bool, composition: Composition | None = None, mz_service: str = "materialized", deploy_generation: int = 0
# ) -> None:
# super().__init__(mz_service, deploy_generation)
# super().__init__(azurite, mz_service, deploy_generation)
# self.cycle: list[Definition] = [
# ]

Expand Down
1 change: 1 addition & 0 deletions misc/python/materialize/mzcompose/services/azure.py
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@ def __init__(
"--blobPort",
"10000",
"--disableProductStyleUrl",
"--loose",
]

if in_memory:
Expand Down
6 changes: 6 additions & 0 deletions misc/python/materialize/parallel_workload/action.py
Original file line number Diff line number Diff line change
Expand Up @@ -1599,12 +1599,14 @@ def __init__(
self,
rng: random.Random,
composition: Composition | None,
azurite: bool,
sanity_restart: bool,
system_param_fn: Callable[[dict[str, str]], dict[str, str]] = lambda x: x,
):
super().__init__(rng, composition)
self.system_param_fn = system_param_fn
self.system_parameters = {}
self.azurite = azurite
self.sanity_restart = sanity_restart

def run(self, exe: Executor) -> bool:
Expand All @@ -1615,6 +1617,7 @@ def run(self, exe: Executor) -> bool:
Materialized(
restart="on-failure",
external_blob_store="toxiproxy",
blob_store_is_azure=self.azurite,
external_metadata_store="toxiproxy",
ports=["6975:6875", "6976:6876", "6977:6877"],
sanity_restart=self.sanity_restart,
Expand All @@ -1632,9 +1635,11 @@ def __init__(
self,
rng: random.Random,
composition: Composition | None,
azurite: bool,
sanity_restart: bool,
):
super().__init__(rng, composition)
self.azurite = azurite
self.sanity_restart = sanity_restart
self.deploy_generation = 0

Expand All @@ -1656,6 +1661,7 @@ def run(self, exe: Executor) -> bool:
Materialized(
name=mz_service,
external_blob_store="toxiproxy",
blob_store_is_azure=self.azurite,
external_metadata_store="toxiproxy",
ports=ports,
sanity_restart=self.sanity_restart,
Expand Down
Loading

0 comments on commit f4f8281

Please sign in to comment.