Skip to content

Commit

Permalink
WIP: Other tests
Browse files Browse the repository at this point in the history
  • Loading branch information
def- committed Jan 3, 2025
1 parent e77a42f commit 5b270fc
Show file tree
Hide file tree
Showing 16 changed files with 270 additions and 115 deletions.
2 changes: 2 additions & 0 deletions misc/python/materialize/checks/mzcompose_actions.py
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,7 @@ def __init__(
self.restart = restart
self.force_migrations = force_migrations
self.publish = publish
self.scenario = scenario

def execute(self, e: Executor) -> None:
c = e.mzcompose_composition()
Expand All @@ -73,6 +74,7 @@ def execute(self, e: Executor) -> None:
image=image,
external_metadata_store=True,
external_blob_store=True,
blob_store_is_azure=self.scenario.azurite,
environment_extra=self.environment_extra,
system_parameter_defaults=self.system_parameter_defaults,
additional_system_parameter_defaults=self.additional_system_parameter_defaults,
Expand Down
7 changes: 6 additions & 1 deletion misc/python/materialize/checks/scenarios.py
Original file line number Diff line number Diff line change
Expand Up @@ -45,10 +45,15 @@

class Scenario:
def __init__(
self, checks: list[type[Check]], executor: Executor, seed: str | None = None
self,
checks: list[type[Check]],
executor: Executor,
azurite: bool,
seed: str | None = None,
) -> None:
self._checks = checks
self.executor = executor
self.azurite = azurite
self.rng = None if seed is None else Random(seed)
self._base_version = MzVersion.parse_cargo()

Expand Down
16 changes: 14 additions & 2 deletions misc/python/materialize/data_ingest/transaction_def.py
Original file line number Diff line number Diff line change
Expand Up @@ -64,11 +64,16 @@ class RestartMz(TransactionDef):
workload: "Workload"

def __init__(
self, composition: Composition, probability: float, workload: "Workload"
self,
composition: Composition,
probability: float,
workload: "Workload",
azurite: bool,
):
self.composition = composition
self.probability = probability
self.workload = workload
self.azurite = azurite

def generate(self, fields: list[Field]) -> Iterator[Transaction | None]:
if random.random() < self.probability:
Expand All @@ -83,6 +88,7 @@ def generate(self, fields: list[Field]) -> Iterator[Transaction | None]:
name=self.workload.mz_service,
ports=ports,
external_blob_store=True,
blob_store_is_azure=self.azurite,
external_metadata_store=True,
system_parameter_defaults=get_default_system_parameters(
zero_downtime=True
Expand All @@ -104,11 +110,16 @@ class ZeroDowntimeDeploy(TransactionDef):
workload: "Workload"

def __init__(
self, composition: Composition, probability: float, workload: "Workload"
self,
composition: Composition,
probability: float,
workload: "Workload",
azurite: bool,
):
self.composition = composition
self.probability = probability
self.workload = workload
self.azurite = azurite

def generate(self, fields: list[Field]) -> Iterator[Transaction | None]:
if random.random() < self.probability:
Expand All @@ -130,6 +141,7 @@ def generate(self, fields: list[Field]) -> Iterator[Transaction | None]:
name=self.workload.mz_service,
ports=ports,
external_blob_store=True,
blob_store_is_azure=self.azurite,
external_metadata_store=True,
system_parameter_defaults=get_default_system_parameters(
zero_downtime=True
Expand Down
48 changes: 35 additions & 13 deletions misc/python/materialize/data_ingest/workload.py
Original file line number Diff line number Diff line change
Expand Up @@ -46,8 +46,12 @@ class Workload:
deploy_generation: int

def __init__(
self, mz_service: str = "materailized", deploy_generation: int = 0
self,
azurite: bool,
mz_service: str = "materailized",
deploy_generation: int = 0,
) -> None:
self.azurite = azurite
self.mz_service = mz_service
self.deploy_generation = deploy_generation

Expand All @@ -62,11 +66,12 @@ def generate(self, fields: list[Field]) -> Iterator[Transaction]:
class SingleSensorUpdating(Workload):
def __init__(
self,
azurite: bool,
composition: Composition | None = None,
mz_service: str = "materialized",
deploy_generation: int = 0,
) -> None:
super().__init__(mz_service, deploy_generation)
super().__init__(azurite, mz_service, deploy_generation)
self.cycle = [
TransactionDef(
[
Expand All @@ -83,11 +88,12 @@ def __init__(
class SingleSensorUpdatingDisruptions(Workload):
def __init__(
self,
azurite: bool,
composition: Composition | None = None,
mz_service: str = "materialized",
deploy_generation: int = 0,
) -> None:
super().__init__(mz_service, deploy_generation)
super().__init__(azurite, mz_service, deploy_generation)
self.cycle = [
TransactionDef(
[
Expand All @@ -100,17 +106,22 @@ def __init__(
),
]
if composition:
self.cycle.append(RestartMz(composition, probability=0.1, workload=self))
self.cycle.append(
RestartMz(
composition, probability=0.1, workload=self, azurite=self.azurite
)
)


class SingleSensorUpdating0dtDeploy(Workload):
def __init__(
self,
azurite: bool,
composition: Composition | None = None,
mz_service: str = "materialized",
deploy_generation: int = 0,
) -> None:
super().__init__(mz_service, deploy_generation)
super().__init__(azurite, mz_service, deploy_generation)
self.cycle = [
TransactionDef(
[
Expand All @@ -124,18 +135,21 @@ def __init__(
]
if composition:
self.cycle.append(
ZeroDowntimeDeploy(composition, probability=0.1, workload=self)
ZeroDowntimeDeploy(
composition, probability=0.1, workload=self, azurite=self.azurite
)
)


class DeleteDataAtEndOfDay(Workload):
def __init__(
self,
azurite: bool,
composition: Composition | None = None,
mz_service: str = "materialized",
deploy_generation: int = 0,
) -> None:
super().__init__(mz_service, deploy_generation)
super().__init__(azurite, mz_service, deploy_generation)
insert = Insert(
count=Records.SOME,
record_size=RecordSize.SMALL,
Expand Down Expand Up @@ -163,11 +177,12 @@ def __init__(
class DeleteDataAtEndOfDayDisruptions(Workload):
def __init__(
self,
azurite: bool,
composition: Composition | None = None,
mz_service: str = "materialized",
deploy_generation: int = 0,
) -> None:
super().__init__(mz_service, deploy_generation)
super().__init__(azurite, mz_service, deploy_generation)
insert = Insert(
count=Records.SOME,
record_size=RecordSize.SMALL,
Expand All @@ -192,17 +207,22 @@ def __init__(
]

if composition:
self.cycle.append(RestartMz(composition, probability=0.1, workload=self))
self.cycle.append(
RestartMz(
composition, probability=0.1, workload=self, azurite=self.azurite
)
)


class DeleteDataAtEndOfDay0dtDeploys(Workload):
def __init__(
self,
azurite: bool,
composition: Composition | None = None,
mz_service: str = "materialized",
deploy_generation: int = 0,
) -> None:
super().__init__(mz_service, deploy_generation)
super().__init__(azurite, mz_service, deploy_generation)
insert = Insert(
count=Records.SOME,
record_size=RecordSize.SMALL,
Expand All @@ -228,16 +248,18 @@ def __init__(

if composition:
self.cycle.append(
ZeroDowntimeDeploy(composition, probability=0.1, workload=self)
ZeroDowntimeDeploy(
composition, probability=0.1, workload=self, azurite=self.azurite
)
)


# TODO: Implement
# class ProgressivelyEnrichRecords(Workload):
# def __init__(
# self, composition: Composition | None = None, mz_service: str = "materialized", deploy_generation: int = 0
# self, azurite: bool, composition: Composition | None = None, mz_service: str = "materialized", deploy_generation: int = 0
# ) -> None:
# super().__init__(mz_service, deploy_generation)
# super().__init__(azurite, mz_service, deploy_generation)
# self.cycle: list[Definition] = [
# ]

Expand Down
1 change: 1 addition & 0 deletions misc/python/materialize/mzcompose/services/azure.py
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@ def __init__(
"--blobPort",
"10000",
"--disableProductStyleUrl",
"--loose",
]

if in_memory:
Expand Down
6 changes: 6 additions & 0 deletions misc/python/materialize/parallel_workload/action.py
Original file line number Diff line number Diff line change
Expand Up @@ -1599,12 +1599,14 @@ def __init__(
self,
rng: random.Random,
composition: Composition | None,
azurite: bool,
sanity_restart: bool,
system_param_fn: Callable[[dict[str, str]], dict[str, str]] = lambda x: x,
):
super().__init__(rng, composition)
self.system_param_fn = system_param_fn
self.system_parameters = {}
self.azurite = azurite
self.sanity_restart = sanity_restart

def run(self, exe: Executor) -> bool:
Expand All @@ -1615,6 +1617,7 @@ def run(self, exe: Executor) -> bool:
Materialized(
restart="on-failure",
external_blob_store="toxiproxy",
blob_store_is_azure=self.azurite,
external_metadata_store="toxiproxy",
ports=["6975:6875", "6976:6876", "6977:6877"],
sanity_restart=self.sanity_restart,
Expand All @@ -1632,9 +1635,11 @@ def __init__(
self,
rng: random.Random,
composition: Composition | None,
azurite: bool,
sanity_restart: bool,
):
super().__init__(rng, composition)
self.azurite = azurite
self.sanity_restart = sanity_restart
self.deploy_generation = 0

Expand All @@ -1656,6 +1661,7 @@ def run(self, exe: Executor) -> bool:
Materialized(
name=mz_service,
external_blob_store="toxiproxy",
blob_store_is_azure=self.azurite,
external_metadata_store="toxiproxy",
ports=ports,
sanity_restart=self.sanity_restart,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,7 @@ def run(
num_threads: int | None,
naughty_identifiers: bool,
composition: Composition | None,
azurite: bool,
sanity_restart: bool,
) -> None:
num_threads = num_threads or os.cpu_count() or 10
Expand Down Expand Up @@ -222,7 +223,7 @@ def run(
assert composition, "Kill scenario only works in mzcompose"
worker = Worker(
worker_rng,
[KillAction(worker_rng, composition, sanity_restart)],
[KillAction(worker_rng, composition, azurite, sanity_restart)],
[1],
end_time,
autocommit=False,
Expand All @@ -246,6 +247,7 @@ def run(
ZeroDowntimeDeployAction(
worker_rng,
composition,
azurite,
sanity_restart,
)
],
Expand Down Expand Up @@ -474,6 +476,9 @@ def parse_common_args(parser: argparse.ArgumentParser) -> None:
action="store_true",
help="Whether to initialize expensive parts like SQLsmith, sources, sinks (for fast local testing, reduces coverage)",
)
parser.add_argument(
"--azurite", action="store_true", help="Use Azurite as blob store instead of S3"
)


def main() -> int:
Expand Down Expand Up @@ -525,6 +530,7 @@ def main() -> int:
args.threads,
args.naughty_identifiers,
composition=None, # only works in mzcompose
azurite=args.azurite,
sanity_restart=False, # only works in mzcompose
)
return 0
Expand Down
Loading

0 comments on commit 5b270fc

Please sign in to comment.