diff --git a/quixstreams/app.py b/quixstreams/app.py index 935033325..6b988889a 100644 --- a/quixstreams/app.py +++ b/quixstreams/app.py @@ -994,15 +994,13 @@ def _on_assign(self, _, topic_partitions: List[TopicPartition]): ) committed_offsets[tp.partition][tp.topic] = tp.offset - # Match the assigned TP with a stream ID via DataFrameRegistry + # Match the assigned TP with a state ID via DataFrameRegistry for tp in non_changelog_tps: - stream_ids = self._dataframe_registry.get_stream_ids( - topic_name=tp.topic - ) - # Assign store partitions for the given stream ids - for stream_id in stream_ids: + state_ids = self._dataframe_registry.get_state_ids(topic_name=tp.topic) + # Assign store partitions for the given state ids + for state_id in state_ids: self._state_manager.on_partition_assign( - stream_id=stream_id, + state_id=state_id, partition=tp.partition, committed_offsets=committed_offsets[tp.partition], ) @@ -1044,12 +1042,10 @@ def _revoke_state_partitions(self, topic_partitions: List[TopicPartition]): ] for tp in non_changelog_tps: if self._state_manager.stores: - stream_ids = self._dataframe_registry.get_stream_ids( - topic_name=tp.topic - ) - for stream_id in stream_ids: + state_ids = self._dataframe_registry.get_state_ids(topic_name=tp.topic) + for state_id in state_ids: self._state_manager.on_partition_revoke( - stream_id=stream_id, partition=tp.partition + state_id=state_id, partition=tp.partition ) def _setup_signal_handlers(self): diff --git a/quixstreams/checkpointing/checkpoint.py b/quixstreams/checkpointing/checkpoint.py index 481c97683..5dab15489 100644 --- a/quixstreams/checkpointing/checkpoint.py +++ b/quixstreams/checkpointing/checkpoint.py @@ -148,28 +148,26 @@ def __init__( self._producer.begin_transaction() def get_store_transaction( - self, stream_id: str, partition: int, store_name: str = DEFAULT_STATE_STORE_NAME + self, state_id: str, partition: int, store_name: str = DEFAULT_STATE_STORE_NAME ) -> PartitionTransaction: """ Get a PartitionTransaction for the given store, topic and partition. It will return already started transaction if there's one. - :param stream_id: stream id + :param state_id: state id :param partition: partition number :param store_name: store name :return: instance of `PartitionTransaction` """ - transaction = self._store_transactions.get((stream_id, partition, store_name)) + transaction = self._store_transactions.get((state_id, partition, store_name)) if transaction is not None: return transaction - store = self._state_manager.get_store( - stream_id=stream_id, store_name=store_name - ) + store = self._state_manager.get_store(state_id=state_id, store_name=store_name) transaction = store.start_partition_transaction(partition=partition) - self._store_transactions[(stream_id, partition, store_name)] = transaction + self._store_transactions[(state_id, partition, store_name)] = transaction return transaction def close(self): @@ -227,13 +225,11 @@ def commit(self): # Step 2. Produce the changelogs for ( - stream_id, + state_id, partition, store_name, ), transaction in self._store_transactions.items(): - topics = self._dataframe_registry.get_topics_for_stream_id( - stream_id=stream_id - ) + topics = self._dataframe_registry.get_topics_for_state_id(state_id=state_id) processed_offsets = { topic: offset for (topic, partition_), offset in self._tp_offsets.items() diff --git a/quixstreams/dataframe/dataframe.py b/quixstreams/dataframe/dataframe.py index e956506e7..e84cc0f1c 100644 --- a/quixstreams/dataframe/dataframe.py +++ b/quixstreams/dataframe/dataframe.py @@ -135,7 +135,7 @@ def __init__( registry: DataFrameRegistry, processing_context: ProcessingContext, stream: Optional[Stream] = None, - stream_id: Optional[str] = None, + state_id: Optional[str] = None, ): if not topics: raise ValueError("At least one Topic must be passed") @@ -146,15 +146,15 @@ def __init__( ) self._stream: Stream = stream or Stream() - self._stream_id: str = stream_id or topic_manager.stream_id_from_topics( + self._state_id: str = state_id or topic_manager.state_id_from_topics( self.topics ) self._topic_manager = topic_manager self._registry = registry self._processing_context = processing_context self._producer = processing_context.producer - self._registry.register_stream_id( - stream_id=self.stream_id, topic_names=[t.name for t in self._topics] + self._registry.register_state_id( + state_id=self.state_id, topic_names=[t.name for t in self._topics] ) @property @@ -166,20 +166,20 @@ def stream(self) -> Stream: return self._stream @property - def stream_id(self) -> str: + def state_id(self) -> str: """ An identifier of the data stream this StreamingDataFrame manipulates in the application. It is used as a common prefix for state stores and group-by topics. - A new `stream_id` is set when StreamingDataFrames are merged via `.merge()` + A new `state_id` is set when StreamingDataFrames are merged via `.merge()` or grouped via `.group_by()`. - StreamingDataFrames with different `stream_id` cannot access the same state stores. + StreamingDataFrames with different `state_id` cannot access the same state stores. - By default, a topic name or a combination of topic names are used as `stream_id`. + By default, a topic name or a combination of topic names are used as `state_id`. """ - return self._stream_id + return self._state_id @property def topics(self) -> tuple[Topic, ...]: @@ -286,7 +286,7 @@ def func(d: dict, state: State): stateful_func = _as_stateful( func=with_metadata_func, processing_context=self._processing_context, - stream_id=self.stream_id, + state_id=self.state_id, ) stream = self.stream.add_apply(stateful_func, expand=expand, metadata=True) # type: ignore[call-overload] else: @@ -395,7 +395,7 @@ def func(values: list, state: State): stateful_func = _as_stateful( func=with_metadata_func, processing_context=self._processing_context, - stream_id=self.stream_id, + state_id=self.state_id, ) return self._add_update(stateful_func, metadata=True) else: @@ -497,7 +497,7 @@ def func(d: dict, state: State): stateful_func = _as_stateful( func=with_metadata_func, processing_context=self._processing_context, - stream_id=self.stream_id, + state_id=self.state_id, ) stream = self.stream.add_filter(stateful_func, metadata=True) else: @@ -603,7 +603,7 @@ def func(d: dict, state: State): groupby_topic = self._topic_manager.repartition_topic( operation=operation, - stream_id=self.stream_id, + state_id=self.state_id, config=repartition_config, key_serializer=key_serializer, value_serializer=value_serializer, @@ -631,7 +631,7 @@ def _callback(value, _, timestamp, headers): stream = self.stream.add_transform(_callback, expand=False) groupby_sdf = self.__dataframe_clone__( - stream=stream, stream_id=f"{self.stream_id}--groupby--{operation}" + stream=stream, state_id=f"{self.state_id}--groupby--{operation}" ) self._registry.register_groupby( source_sdf=self, new_sdf=groupby_sdf, register_new_root=False @@ -1683,7 +1683,7 @@ def _add_update( def _register_store(self): """ - Register the default store for the current stream_id in StateStoreManager. + Register the default store for the current state_id in StateStoreManager. """ self.ensure_topics_copartitioned() @@ -1691,7 +1691,7 @@ def _register_store(self): changelog_topic_config = self._topic_manager.derive_topic_config(self._topics) self._processing_context.state_manager.register_store( - stream_id=self.stream_id, changelog_config=changelog_topic_config + state_id=self.state_id, changelog_config=changelog_topic_config ) def _groupby_key( @@ -1711,21 +1711,22 @@ def __dataframe_clone__( self, *topics: Topic, stream: Optional[Stream] = None, - stream_id: Optional[str] = None, + state_id: Optional[str] = None, ) -> "StreamingDataFrame": """ Clone the StreamingDataFrame with a new `stream`, `topics`, - and optional `stream_id` parameters. + and optional `state_id` parameters. :param topics: one or more `Topic` objects :param stream: instance of `Stream`, optional. + :param state_id: str, optional. :return: a new `StreamingDataFrame`. """ clone = self.__class__( *(topics or self._topics), stream=stream, - stream_id=stream_id, + state_id=state_id, processing_context=self._processing_context, topic_manager=self._topic_manager, registry=self._registry, @@ -1840,13 +1841,13 @@ def wrapper( def _as_stateful( func: Callable[[Any, Any, int, Any, State], T], processing_context: ProcessingContext, - stream_id: str, + state_id: str, ) -> Callable[[Any, Any, int, Any], T]: @functools.wraps(func) def wrapper(value: Any, key: Any, timestamp: int, headers: Any) -> Any: ctx = message_context() transaction = processing_context.checkpoint.get_store_transaction( - stream_id=stream_id, + state_id=state_id, partition=ctx.partition, ) # Pass a State object with an interface limited to the key updates only diff --git a/quixstreams/dataframe/registry.py b/quixstreams/dataframe/registry.py index 2c4590150..26847c24b 100644 --- a/quixstreams/dataframe/registry.py +++ b/quixstreams/dataframe/registry.py @@ -24,8 +24,8 @@ def __init__(self) -> None: self._registry: dict[str, Stream] = {} self._topics: list[Topic] = [] self._repartition_origins: set[str] = set() - self._topics_to_stream_ids: dict[str, set[str]] = {} - self._stream_ids_to_topics: dict[str, set[str]] = {} + self._topics_to_state_ids: dict[str, set[str]] = {} + self._state_ids_to_topics: dict[str, set[str]] = {} @property def consumer_topics(self) -> list[Topic]: @@ -71,19 +71,19 @@ def register_groupby( :param source_sdf: the SDF used by `sdf.group_by()` :param new_sdf: the SDF generated by `sdf.group_by()`. """ - if source_sdf.stream_id in self._repartition_origins: + if source_sdf.state_id in self._repartition_origins: raise GroupByNestingLimit( "Subsequent (nested) `SDF.group_by()` operations are not allowed." ) - if new_sdf.stream_id in self._repartition_origins: + if new_sdf.state_id in self._repartition_origins: raise GroupByDuplicate( "A `SDF.group_by()` operation appears to be the same as another, " "either from using the same column or name parameter; " "adjust by setting a unique name with `SDF.group_by(name=)` " ) - self._repartition_origins.add(new_sdf.stream_id) + self._repartition_origins.add(new_sdf.state_id) if register_new_root: try: @@ -113,34 +113,34 @@ def compose_all( executors[topic] = root_executors[root_stream] return executors - def register_stream_id(self, stream_id: str, topic_names: list[str]): + def register_state_id(self, state_id: str, topic_names: list[str]): """ - Register a mapping between the stream_id and topic names. + Register a mapping between the state_id and topic names. This mapping is later used to match topics to state stores during assignment and commits. - The same stream id can be registered multiple times. - :param stream_id: stream id of StreamingDataFrame + The same state id can be registered multiple times. + :param state_id: state id of StreamingDataFrame :param topic_names: list of topics to map the stream id with """ for topic_name in topic_names: - self._topics_to_stream_ids.setdefault(topic_name, set()).add(stream_id) - self._stream_ids_to_topics.setdefault(stream_id, set()).add(topic_name) + self._topics_to_state_ids.setdefault(topic_name, set()).add(state_id) + self._state_ids_to_topics.setdefault(state_id, set()).add(topic_name) - def get_stream_ids(self, topic_name: str) -> list[str]: + def get_state_ids(self, topic_name: str) -> list[str]: """ - Get a list of stream ids for the given topic name + Get a list of state ids for the given topic name :param topic_name: a name of the topic - :return: a list of stream ids + :return: a list of state ids """ - return list(self._topics_to_stream_ids[topic_name]) + return list(self._topics_to_state_ids[topic_name]) - def get_topics_for_stream_id(self, stream_id: str) -> list[str]: + def get_topics_for_state_id(self, state_id: str) -> list[str]: """ Get a list of topics for the given stream id. - :param stream_id: stream id + :param state_id: state id :return: a list of topic names """ - return list(self._stream_ids_to_topics[stream_id]) + return list(self._state_ids_to_topics[state_id]) diff --git a/quixstreams/dataframe/windows/base.py b/quixstreams/dataframe/windows/base.py index d3da8ac0a..1df8ccb20 100644 --- a/quixstreams/dataframe/windows/base.py +++ b/quixstreams/dataframe/windows/base.py @@ -74,7 +74,7 @@ def register_store(self) -> None: # Create a config for the changelog topic based on the underlying SDF topics changelog_config = TopicManager.derive_topic_config(self._dataframe.topics) self._dataframe.processing_context.state_manager.register_windowed_store( - stream_id=self._dataframe.stream_id, + state_id=self._dataframe.state_id, store_name=self._name, changelog_config=changelog_config, ) @@ -88,7 +88,7 @@ def _apply_window( windowed_func = _as_windowed( func=func, - stream_id=self._dataframe.stream_id, + state_id=self._dataframe.state_id, processing_context=self._dataframe.processing_context, store_name=name, ) @@ -400,7 +400,7 @@ def _as_windowed( func: TransformRecordCallbackExpandedWindowed, processing_context: "ProcessingContext", store_name: str, - stream_id: str, + state_id: str, ) -> TransformExpandedCallback: @functools.wraps(func) def wrapper( @@ -410,7 +410,7 @@ def wrapper( transaction = cast( WindowedPartitionTransaction, processing_context.checkpoint.get_store_transaction( - stream_id=stream_id, partition=ctx.partition, store_name=store_name + state_id=state_id, partition=ctx.partition, store_name=store_name ), ) if key is None: diff --git a/quixstreams/models/topics/manager.py b/quixstreams/models/topics/manager.py index e72ffa84d..4b4d2b79b 100644 --- a/quixstreams/models/topics/manager.py +++ b/quixstreams/models/topics/manager.py @@ -203,7 +203,7 @@ def register(self, topic: Topic) -> Topic: def repartition_topic( self, operation: str, - stream_id: str, + state_id: str, config: TopicConfig, value_deserializer: Optional[DeserializerType] = "json", key_deserializer: Optional[DeserializerType] = "json", @@ -214,7 +214,7 @@ def repartition_topic( Create an internal repartition topic. :param operation: name of the GroupBy operation (column name or user-defined). - :param stream_id: stream id. + :param state_id: state id. :param config: a config for the repartition topic. :param value_deserializer: a deserializer type for values; default - JSON :param key_deserializer: a deserializer type for keys; default - JSON @@ -224,7 +224,7 @@ def repartition_topic( :return: `Topic` object (which is also stored on the TopicManager) """ topic = Topic( - name=self._internal_name("repartition", stream_id, operation), + name=self._internal_name("repartition", state_id, operation), value_deserializer=value_deserializer, key_deserializer=key_deserializer, value_serializer=value_serializer, @@ -238,12 +238,12 @@ def repartition_topic( def changelog_topic( self, - stream_id: Optional[str], + state_id: Optional[str], store_name: str, config: TopicConfig, ) -> Topic: """ - Create and register a changelog topic for the given "stream_id" and store name. + Create and register a changelog topic for the given "state_id" and store name. If the topic already exists, validate that the partition count is the same as requested. @@ -252,7 +252,7 @@ def changelog_topic( generate changelog topics. To turn off changelogs, init an Application with "use_changelog_topics"=`False`. - :param stream_id: stream id + :param state_id: state id :param store_name: name of the store this changelog belongs to (default, rolling10s, etc.) :param config: the changelog topic configuration @@ -263,7 +263,7 @@ def changelog_topic( config.extra_config.update({"cleanup.policy": "compact"}) topic = Topic( - name=self._internal_name("changelog", stream_id, store_name), + name=self._internal_name("changelog", state_id, store_name), key_serializer="bytes", value_serializer="bytes", key_deserializer="bytes", @@ -279,7 +279,7 @@ def changelog_topic( f'got {topic.broker_config.num_partitions}"' ) - self._changelog_topics.setdefault(stream_id, {})[store_name] = topic + self._changelog_topics.setdefault(state_id, {})[store_name] = topic return topic @classmethod @@ -333,9 +333,9 @@ def derive_topic_config(cls, topics: Iterable[Topic]) -> TopicConfig: }, ) - def stream_id_from_topics(self, topics: Sequence[Topic]) -> str: + def state_id_from_topics(self, topics: Sequence[Topic]) -> str: """ - Generate a stream_id by combining names of the provided topics. + Generate a state_id by combining names of the provided topics. """ if not topics: raise ValueError("At least one Topic must be passed") diff --git a/quixstreams/platforms/quix/topic_manager.py b/quixstreams/platforms/quix/topic_manager.py index 8a164e2cd..e88358277 100644 --- a/quixstreams/platforms/quix/topic_manager.py +++ b/quixstreams/platforms/quix/topic_manager.py @@ -60,9 +60,9 @@ def __init__( ) self._quix_config_builder = quix_config_builder - def stream_id_from_topics(self, topics: Sequence[Topic]) -> str: + def state_id_from_topics(self, topics: Sequence[Topic]) -> str: """ - Generate a stream_id by combining names of the provided topics. + Generate a state_id by combining names of the provided topics. """ if not topics: raise ValueError("At least one Topic must be passed") @@ -71,7 +71,7 @@ def stream_id_from_topics(self, topics: Sequence[Topic]) -> str: # for backwards compatibility return topics[0].name - # Use the "quix_name" to generate stream_id. + # Use the "quix_name" to generate state_id. # In Quix Cloud, the "quix_name" can differ from the actual broker topic name return "--".join(sorted(t.quix_name for t in topics)) diff --git a/quixstreams/sources/base/manager.py b/quixstreams/sources/base/manager.py index 47a0c1a3a..ccda566e1 100644 --- a/quixstreams/sources/base/manager.py +++ b/quixstreams/sources/base/manager.py @@ -137,7 +137,7 @@ def _recover_state(self, source: StatefulSource) -> StorePartition: ) state_manager.register_store( - stream_id=None, + state_id=None, store_name=source.store_name, store_type=MemoryStore, changelog_config=TopicConfig( @@ -156,7 +156,7 @@ def _recover_state(self, source: StatefulSource) -> StorePartition: self._consumer.assign([changelog_tp]) store_partitions = state_manager.on_partition_assign( - stream_id=None, + state_id=None, partition=source.assigned_store_partition, committed_offsets={}, ) diff --git a/quixstreams/state/base/store.py b/quixstreams/state/base/store.py index 1423ea631..79b41e369 100644 --- a/quixstreams/state/base/store.py +++ b/quixstreams/state/base/store.py @@ -23,12 +23,12 @@ class Store(ABC): def __init__( self, name: str, - stream_id: Optional[str], + state_id: Optional[str], ) -> None: super().__init__() self._name = name - self._stream_id = stream_id + self._state_id = state_id self._partitions: Dict[int, StorePartition] = {} @abstractmethod @@ -36,11 +36,11 @@ def create_new_partition(self, partition: int) -> StorePartition: pass @property - def stream_id(self) -> Optional[str]: + def state_id(self) -> Optional[str]: """ Topic name """ - return self._stream_id + return self._state_id @property def name(self) -> str: @@ -68,7 +68,7 @@ def assign_partition(self, partition: int) -> StorePartition: if store_partition is not None: logger.debug( f'Partition "{partition}" for store "{self._name}" ' - f'(stream "{self._stream_id}") ' + f'(state "{self._state_id}") ' f"is already assigned" ) return store_partition @@ -80,7 +80,7 @@ def assign_partition(self, partition: int) -> StorePartition: 'Assigned store partition "%s[%s]" (stream "%s")', self._name, partition, - self._stream_id, + self._state_id, ) return store_partition @@ -99,7 +99,7 @@ def revoke_partition(self, partition: int): 'Revoked store partition "%s[%s]" (stream "%s")', self._name, partition, - self._stream_id, + self._state_id, ) def start_partition_transaction(self, partition: int) -> PartitionTransaction: @@ -115,7 +115,7 @@ def start_partition_transaction(self, partition: int) -> PartitionTransaction: # Requested partition has not been assigned. Something went completely wrong raise PartitionNotAssignedError( f'Store partition "{self._name}[{partition}]" ' - f'(stream "{self._stream_id}") is not assigned' + f'(state "{self._state_id}") is not assigned' ) return store_partition.begin() @@ -124,10 +124,10 @@ def close(self): """ Close store and revoke all store partitions """ - logger.debug(f'Closing store "{self.name}" (stream "{self.stream_id}")') + logger.debug(f'Closing store "{self.name}" (state "{self.state_id}")') for partition in list(self._partitions.keys()): self.revoke_partition(partition) - logger.debug(f'Closed store "{self.name}" (stream "{self.stream_id}")') + logger.debug(f'Closed store "{self.name}" (state "{self.state_id}")') def __enter__(self): return self diff --git a/quixstreams/state/manager.py b/quixstreams/state/manager.py index 63ad61848..d1e0b0195 100644 --- a/quixstreams/state/manager.py +++ b/quixstreams/state/manager.py @@ -80,7 +80,7 @@ def _init_state_dir(self) -> None: def stores(self) -> Dict[Optional[str], Dict[str, Store]]: """ Map of registered state stores - :return: dict in format {stream_id: {store_name: store}} + :return: dict in format {state_id: {store_name: store}} """ return self._stores @@ -125,25 +125,25 @@ def stop_recovery(self) -> None: return self._recovery_manager.stop_recovery() def get_store( - self, stream_id: str, store_name: str = DEFAULT_STATE_STORE_NAME + self, state_id: str, store_name: str = DEFAULT_STATE_STORE_NAME ) -> Store: """ - Get a store for given name and stream id + Get a store for given name and state id - :param stream_id: stream id + :param state_id: state id :param store_name: store name :return: instance of `Store` """ - store = self._stores.get(stream_id, {}).get(store_name) + store = self._stores.get(state_id, {}).get(store_name) if store is None: raise StoreNotRegisteredError( - f'Store "{store_name}" (stream_id "{stream_id}") is not registered' + f'Store "{store_name}" (state_id "{state_id}") is not registered' ) return store def _setup_changelogs( self, - stream_id: Optional[str], + state_id: Optional[str], store_name: str, topic_config: Optional[TopicConfig], ) -> Optional[ChangelogProducerFactory]: @@ -155,7 +155,7 @@ def _setup_changelogs( return None changelog_topic = self._recovery_manager.register_changelog( - stream_id=stream_id, store_name=store_name, topic_config=topic_config + state_id=state_id, store_name=store_name, topic_config=topic_config ) return ChangelogProducerFactory( changelog_name=changelog_topic.name, @@ -164,7 +164,7 @@ def _setup_changelogs( def register_store( self, - stream_id: Optional[str], + state_id: Optional[str], store_name: str = DEFAULT_STATE_STORE_NAME, store_type: Optional[StoreTypes] = None, changelog_config: Optional[TopicConfig] = None, @@ -175,23 +175,23 @@ def register_store( During processing, the StateStoreManager will react to rebalancing callbacks and assign/revoke the partitions for registered stores. - :param stream_id: stream id + :param state_id: state id :param store_name: store name :param store_type: the storage type used for this store. Default to StateStoreManager `default_store_type` :param changelog_config: changelog topic config. Note: the compaction will be enabled for the changelog topic. """ - if self._stores.get(stream_id, {}).get(store_name) is None: + if self._stores.get(state_id, {}).get(store_name) is None: changelog_producer_factory = self._setup_changelogs( - stream_id, store_name, topic_config=changelog_config + state_id, store_name, topic_config=changelog_config ) store_type = store_type or self.default_store_type if store_type == RocksDBStore: factory: Store = RocksDBStore( name=store_name, - stream_id=stream_id, + state_id=state_id, base_dir=str(self._state_dir), changelog_producer_factory=changelog_producer_factory, options=self._rocksdb_options, @@ -199,17 +199,17 @@ def register_store( elif store_type == MemoryStore: factory = MemoryStore( name=store_name, - stream_id=stream_id, + state_id=state_id, changelog_producer_factory=changelog_producer_factory, ) else: raise ValueError(f"invalid store type: {store_type}") - self._stores.setdefault(stream_id, {})[store_name] = factory + self._stores.setdefault(state_id, {})[store_name] = factory def register_windowed_store( self, - stream_id: str, + state_id: str, store_name: str, changelog_config: Optional[TopicConfig] = None, ) -> None: @@ -219,26 +219,26 @@ def register_windowed_store( During processing, the StateStoreManager will react to rebalancing callbacks and assign/revoke the partitions for registered stores. - Each window store can be registered only once for each stream_id. + Each window store can be registered only once for each state_id. - :param stream_id: stream id + :param state_id: stream id :param store_name: store name :param changelog_config: changelog topic config """ - store = self._stores.get(stream_id, {}).get(store_name) + store = self._stores.get(state_id, {}).get(store_name) if store: raise WindowedStoreAlreadyRegisteredError( "This window range and type combination already exists; " "to use this window, provide a unique name via the `name` parameter." ) - self._stores.setdefault(stream_id, {})[store_name] = WindowedRocksDBStore( + self._stores.setdefault(state_id, {})[store_name] = WindowedRocksDBStore( name=store_name, - stream_id=stream_id, + state_id=state_id, base_dir=str(self._state_dir), changelog_producer_factory=self._setup_changelogs( - stream_id=stream_id, + state_id=state_id, store_name=store_name, topic_config=changelog_config, ), @@ -264,27 +264,27 @@ def clear_stores(self) -> None: def on_partition_assign( self, - stream_id: Optional[str], + state_id: Optional[str], partition: int, committed_offsets: dict[str, int], ) -> Dict[str, StorePartition]: """ - Assign store partitions for each registered store for the given stream_id + Assign store partitions for each registered store for the given state_id and partition number, and return a list of assigned `StorePartition` objects. - :param stream_id: stream id + :param state_id: state id :param partition: Kafka topic partition number :param committed_offsets: a dict with latest committed offsets of all assigned topics for this partition number. :return: list of assigned `StorePartition` """ store_partitions = {} - for name, store in self._stores.get(stream_id, {}).items(): + for name, store in self._stores.get(state_id, {}).items(): store_partition = store.assign_partition(partition) store_partitions[name] = store_partition if self._recovery_manager and store_partitions: self._recovery_manager.assign_partition( - topic=stream_id, + topic=state_id, partition=partition, committed_offsets=committed_offsets, store_partitions=store_partitions, @@ -293,17 +293,17 @@ def on_partition_assign( def on_partition_revoke( self, - stream_id: str, + state_id: str, partition: int, ) -> None: """ Revoke store partitions for each registered store - for the given stream_id and partition number. + for the given state_id and partition number. - :param stream_id: stream id + :param state_id: state id :param partition: partition number """ - if stores := self._stores.get(stream_id, {}).values(): + if stores := self._stores.get(state_id, {}).values(): if self._recovery_manager: self._recovery_manager.revoke_partition(partition_num=partition) for store in stores: diff --git a/quixstreams/state/memory/store.py b/quixstreams/state/memory/store.py index a2ba9e374..8a0817428 100644 --- a/quixstreams/state/memory/store.py +++ b/quixstreams/state/memory/store.py @@ -24,16 +24,16 @@ class MemoryStore(Store): def __init__( self, name: str, - stream_id: Optional[str], + state_id: Optional[str], changelog_producer_factory: Optional[ChangelogProducerFactory] = None, ) -> None: """ :param name: a unique store name - :param stream_id: a topic name for this store + :param state_id: a state ID for this store :param changelog_producer_factory: a ChangelogProducerFactory instance if using changelogs topics. """ - super().__init__(name, stream_id) + super().__init__(name, state_id) self._changelog_producer_factory = changelog_producer_factory diff --git a/quixstreams/state/recovery.py b/quixstreams/state/recovery.py index 44b26491e..34ec815fc 100644 --- a/quixstreams/state/recovery.py +++ b/quixstreams/state/recovery.py @@ -344,19 +344,19 @@ def recovering(self) -> bool: def register_changelog( self, - stream_id: Optional[str], + state_id: Optional[str], store_name: str, topic_config: TopicConfig, ) -> Topic: """ Register a changelog Topic with the TopicManager. - :param stream_id: stream id + :param state_id: state id :param store_name: name of the store :param topic_config: a TopicConfig to use """ return self._topic_manager.changelog_topic( - stream_id=stream_id, + state_id=state_id, store_name=store_name, config=topic_config, ) diff --git a/quixstreams/state/rocksdb/store.py b/quixstreams/state/rocksdb/store.py index b3ec4c158..a03d20d56 100644 --- a/quixstreams/state/rocksdb/store.py +++ b/quixstreams/state/rocksdb/store.py @@ -26,24 +26,24 @@ class RocksDBStore(Store): def __init__( self, name: str, - stream_id: Optional[str], + state_id: Optional[str], base_dir: str, changelog_producer_factory: Optional[ChangelogProducerFactory] = None, options: Optional[RocksDBOptionsType] = None, ): """ :param name: a unique store name - :param stream_id: a topic name for this store + :param state_id: a state ID for this store :param base_dir: path to a directory with the state :param changelog_producer_factory: a ChangelogProducerFactory instance if using changelogs :param options: RocksDB options. If `None`, the default options will be used. """ - super().__init__(name, stream_id) + super().__init__(name, state_id) partitions_dir = Path(base_dir).absolute() / self._name - if self._stream_id: - partitions_dir = partitions_dir / self._stream_id + if self._state_id: + partitions_dir = partitions_dir / self._state_id self._partitions_dir = partitions_dir self._changelog_producer_factory = changelog_producer_factory diff --git a/quixstreams/state/rocksdb/windowed/store.py b/quixstreams/state/rocksdb/windowed/store.py index 293ff434f..fda4319e7 100644 --- a/quixstreams/state/rocksdb/windowed/store.py +++ b/quixstreams/state/rocksdb/windowed/store.py @@ -17,14 +17,14 @@ class WindowedRocksDBStore(RocksDBStore): def __init__( self, name: str, - stream_id: str, + state_id: str, base_dir: str, changelog_producer_factory: Optional[ChangelogProducerFactory] = None, options: Optional[RocksDBOptionsType] = None, ): """ :param name: a unique store name - :param stream_id: a topic name for this store + :param state_id: a state ID for this store :param base_dir: path to a directory with the state :param changelog_producer_factory: a ChangelogProducerFactory instance if using changelogs @@ -32,7 +32,7 @@ def __init__( """ super().__init__( name=name, - stream_id=stream_id, + state_id=state_id, base_dir=base_dir, changelog_producer_factory=changelog_producer_factory, options=options, diff --git a/tests/test_quixstreams/test_app.py b/tests/test_quixstreams/test_app.py index 74d80ba7e..7bd41e33b 100644 --- a/tests/test_quixstreams/test_app.py +++ b/tests/test_quixstreams/test_app.py @@ -1098,14 +1098,14 @@ class TestApplicationWithState: def _validate_state( self, stores, - stream_id, + state_id, partition_index, state_manager_factory, consumer_group, state_dir, validator, ): - store = stores[stream_id] + store = stores[state_id] partition = store.partitions[partition_index] with partition.begin() as tx: validator(tx) @@ -1117,13 +1117,13 @@ def _validate_state( state_manager = state_manager_factory( group_id=consumer_group, state_dir=state_dir ) - state_manager.register_store(stream_id, "default") + state_manager.register_store(state_id, "default") state_manager.on_partition_assign( - stream_id=stream_id, + state_id=state_id, partition=partition_index, - committed_offsets={stream_id: -1001}, + committed_offsets={state_id: -1001}, ) - store = state_manager.get_store(stream_id=stream_id, store_name="default") + store = state_manager.get_store(state_id=state_id, store_name="default") with store.start_partition_transaction(partition=partition_index) as tx: validator(tx) @@ -1178,7 +1178,7 @@ def count(_, state: State): stores = {} def revoke_partition(store, partition): - stores[store.stream_id] = store + stores[store.state_id] = store # Stop app when the future is resolved executor.submit(_stop_app_on_future, app, total_consumed, 15.0) @@ -1192,7 +1192,7 @@ def validate_state(tx): self._validate_state( stores, - sdf.stream_id, + sdf.state_id, partition_num, state_manager_factory, consumer_group, @@ -1249,13 +1249,13 @@ def count_and_fail(_, state: State): state_manager = state_manager_factory( group_id=consumer_group, state_dir=state_dir ) - state_manager.register_store(sdf.stream_id, "default") + state_manager.register_store(sdf.state_id, "default") state_manager.on_partition_assign( - stream_id=sdf.stream_id, + state_id=sdf.state_id, partition=0, committed_offsets={}, ) - store = state_manager.get_store(stream_id=sdf.stream_id, store_name="default") + store = state_manager.get_store(state_id=sdf.state_id, store_name="default") with store.start_partition_transaction(partition=0) as tx: assert tx.get("total", prefix=key) is None @@ -1313,7 +1313,7 @@ def fail(_): stores = {} def revoke_partition(store, partition): - stores[store.stream_id] = store + stores[store.state_id] = store # Stop app when the future is resolved executor.submit(_stop_app_on_future, app, total_consumed, 10.0) @@ -1328,7 +1328,7 @@ def validate_state(tx): self._validate_state( stores, - sdf.stream_id, + sdf.state_id, partition_num, state_manager_factory, consumer_group, @@ -1368,12 +1368,12 @@ def test_clear_state( with state_manager: state_manager.register_store(topic_in_name, "default") state_manager.on_partition_assign( - stream_id=topic_in_name, + state_id=topic_in_name, partition=0, committed_offsets={topic_in_name: -1001}, ) store = state_manager.get_store( - stream_id=topic_in_name, store_name="default" + state_id=topic_in_name, store_name="default" ) with store.start_partition_transaction(partition=0) as tx: # All keys in state must be prefixed with the message key @@ -1386,12 +1386,12 @@ def test_clear_state( with state_manager: state_manager.register_store(topic_in_name, "default") state_manager.on_partition_assign( - stream_id=topic_in_name, + state_id=topic_in_name, partition=0, committed_offsets={topic_in_name: -1001}, ) store = state_manager.get_store( - stream_id=topic_in_name, store_name="default" + state_id=topic_in_name, store_name="default" ) with store.start_partition_transaction(partition=0) as tx: assert tx.get("my_state", prefix=prefix) is None @@ -1462,7 +1462,7 @@ def _validate_transaction_state(tx, partition, count): def validate_state(stores): for p_num, count in partition_msg_count.items(): - store = stores[sdf.stream_id] + store = stores[sdf.state_id] partition = store.partitions[p_num] assert partition.get_changelog_offset() == count - 1 with partition.begin() as tx: @@ -1474,15 +1474,15 @@ def validate_state(stores): group_id=consumer_group, state_dir=state_dir, ) as state_manager: - state_manager.register_store(sdf.stream_id, store_name) + state_manager.register_store(sdf.state_id, store_name) for p_num, count in partition_msg_count.items(): state_manager.on_partition_assign( - stream_id=sdf.stream_id, + state_id=sdf.state_id, partition=p_num, committed_offsets={topic.name: -1001}, ) store = state_manager.get_store( - stream_id=sdf.stream_id, store_name=store_name + state_id=sdf.state_id, store_name=store_name ) partition = store.partitions[p_num] assert partition.get_changelog_offset() == count - 1 @@ -1507,7 +1507,7 @@ def validate_state(stores): stores = {} def revoke_partition(store, partition): - stores[store.stream_id] = store + stores[store.state_id] = store # run app to populate state with data done = Future() @@ -1630,15 +1630,15 @@ def validate_state(): with state_manager_factory( group_id=consumer_group, state_dir=state_dir ) as state_manager: - state_manager.register_windowed_store(sdf.stream_id, actual_store_name) + state_manager.register_windowed_store(sdf.state_id, actual_store_name) for p_num, windows in expected_window_updates.items(): state_manager.on_partition_assign( - stream_id=sdf.stream_id, + state_id=sdf.state_id, partition=p_num, committed_offsets={topic.name: -1001}, ) store = state_manager.get_store( - stream_id=sdf.stream_id, + state_id=sdf.state_id, store_name=actual_store_name, ) @@ -1793,7 +1793,7 @@ def _validate_transaction_state(tx): assert state.get("latest") == value def validate_state(stores): - store = stores[sdf.stream_id] + store = stores[sdf.state_id] partition = store.partitions[0] with partition.begin() as tx: _validate_transaction_state(tx) @@ -1811,9 +1811,9 @@ def validate_state(stores): committed_offset = consumer.committed( [TopicPartition(topic=topic_name, partition=0)] )[0].offset - state_manager.register_store(sdf.stream_id, store_name) + state_manager.register_store(sdf.state_id, store_name) partition = state_manager.on_partition_assign( - stream_id=sdf.stream_id, + state_id=sdf.state_id, partition=0, committed_offsets={topic_name: committed_offset}, )["default"] @@ -1830,7 +1830,7 @@ def validate_state(stores): stores = {} def revoke_partition(store, partition): - stores[store.stream_id] = store + stores[store.state_id] = store # Run the application to apply changes to state done = Future() @@ -2501,7 +2501,7 @@ def on_message_processed(*_): def _validate_state( self, - stream_ids, + state_ids, stores, partition_num, message_key, @@ -2510,8 +2510,8 @@ def _validate_state( consumer_group, state_dir, ): - for stream_id in stream_ids: - store = stores[stream_id] + for state_id in state_ids: + store = stores[state_id] partition = store.partitions[partition_num] with partition.begin() as tx: assert tx.get("total", prefix=message_key) == messages_per_topic @@ -2523,15 +2523,13 @@ def _validate_state( state_manager = state_manager_factory( group_id=consumer_group, state_dir=state_dir ) - state_manager.register_store(stream_id, "default") + state_manager.register_store(state_id, "default") state_manager.on_partition_assign( - stream_id=stream_id, + state_id=state_id, partition=partition_num, committed_offsets={}, ) - store = state_manager.get_store( - stream_id=stream_id, store_name="default" - ) + store = state_manager.get_store(state_id=state_id, store_name="default") with store.start_partition_transaction(partition=partition_num) as tx: # All keys in state must be prefixed with the message key assert tx.get("total", prefix=message_key) == messages_per_topic @@ -2609,7 +2607,7 @@ def count(_, state: State): stores = {} def revoke_partition(store, partition): - stores[store.stream_id] = store + stores[store.state_id] = store done = Future() # Stop app when the future is resolved @@ -2620,7 +2618,7 @@ def revoke_partition(store, partition): assert processed_count == total_messages self._validate_state( - [sdf_a.stream_id, sdf_b.stream_id], + [sdf_a.state_id, sdf_b.state_id], stores, partition_num, message_key, @@ -2722,7 +2720,7 @@ def get_app(): stores = {} def revoke_partition(store, partition): - stores[store.stream_id] = store + stores[store.state_id] = store with patch("quixstreams.state.base.Store.revoke_partition", revoke_partition): app.run() @@ -2730,7 +2728,7 @@ def revoke_partition(store, partition): assert processed_count == total_messages self._validate_state( - [sdf.stream_id for sdf in sdfs], + [sdf.state_id for sdf in sdfs], stores, partition_num, message_key, @@ -2812,14 +2810,14 @@ def count(_, state: State): stores = {} def revoke_partition(store_, partition): - stores[store_.stream_id] = store_ + stores[store_.state_id] = store_ with patch("quixstreams.state.base.Store.revoke_partition", revoke_partition): app.run() assert processed_count == total_messages - store = stores[sdf_concat.stream_id] + store = stores[sdf_concat.state_id] partition = store.partitions[partition_num] with partition.begin() as tx: assert tx.get("total", prefix=message_key) == total_messages diff --git a/tests/test_quixstreams/test_checkpointing.py b/tests/test_quixstreams/test_checkpointing.py index 8c8f6d7be..12e911ad7 100644 --- a/tests/test_quixstreams/test_checkpointing.py +++ b/tests/test_quixstreams/test_checkpointing.py @@ -148,7 +148,7 @@ def test_commit_with_state_no_changelog_success( topic_name, _ = topic_factory() dataframe_registry = DataFrameRegistry() - dataframe_registry.register_stream_id(topic_name, [topic_name]) + dataframe_registry.register_state_id(topic_name, [topic_name]) state_manager = state_manager_factory(producer=rowproducer_mock) checkpoint = checkpoint_factory( consumer_=consumer, @@ -200,7 +200,7 @@ def test_commit_with_state_with_changelog_success( producer=row_producer, recovery_manager=recovery_manager ) dataframe_registry = DataFrameRegistry() - dataframe_registry.register_stream_id(topic_name, [topic_name]) + dataframe_registry.register_state_id(topic_name, [topic_name]) checkpoint = checkpoint_factory( consumer_=consumer, @@ -253,7 +253,7 @@ def test_commit_with_state_and_changelog_no_updates_success( producer=row_producer, recovery_manager=recovery_manager ) dataframe_registry = DataFrameRegistry() - dataframe_registry.register_stream_id(topic_name, [topic_name]) + dataframe_registry.register_state_id(topic_name, [topic_name]) checkpoint = checkpoint_factory( consumer_=consumer, @@ -322,7 +322,7 @@ def test_commit_has_failed_transactions_fails( state_manager = state_manager_factory(producer=rowproducer_mock) dataframe_registry = DataFrameRegistry() topic_name = "topic" - dataframe_registry.register_stream_id(topic_name, [topic_name]) + dataframe_registry.register_state_id(topic_name, [topic_name]) checkpoint = checkpoint_factory( consumer_=consumer_mock, state_manager_=state_manager, @@ -377,7 +377,7 @@ def test_commit_producer_flush_fails( state_manager = state_manager_factory(producer=rowproducer_mock) topic_name = "topic" dataframe_registry = DataFrameRegistry() - dataframe_registry.register_stream_id(topic_name, [topic_name]) + dataframe_registry.register_state_id(topic_name, [topic_name]) checkpoint = checkpoint_factory( consumer_=consumer_mock, state_manager_=state_manager, @@ -415,7 +415,7 @@ def test_commit_consumer_commit_fails( state_manager = state_manager_factory(producer=rowproducer_mock) topic_name = "topic" dataframe_registry = DataFrameRegistry() - dataframe_registry.register_stream_id(topic_name, [topic_name]) + dataframe_registry.register_state_id(topic_name, [topic_name]) checkpoint = checkpoint_factory( consumer_=consumer_mock, diff --git a/tests/test_quixstreams/test_dataframe/test_dataframe.py b/tests/test_quixstreams/test_dataframe/test_dataframe.py index f845fd7af..b9f887a74 100644 --- a/tests/test_quixstreams/test_dataframe/test_dataframe.py +++ b/tests/test_quixstreams/test_dataframe/test_dataframe.py @@ -780,7 +780,7 @@ def stateful_func(value_: dict, state: State) -> int: sdf = sdf.apply(stateful_func, stateful=True) state_manager.on_partition_assign( - stream_id=topic.name, partition=0, committed_offsets={topic.name: -1001} + state_id=topic.name, partition=0, committed_offsets={topic.name: -1001} ) values = [ {"number": 1}, @@ -819,7 +819,7 @@ def stateful_func(value_: dict, state: State): sdf = sdf.update(stateful_func, stateful=True) state_manager.on_partition_assign( - stream_id=topic.name, partition=0, committed_offsets={topic.name: -1001} + state_id=topic.name, partition=0, committed_offsets={topic.name: -1001} ) result = None values = [ @@ -859,7 +859,7 @@ def stateful_func(value_: dict, state: State): sdf = sdf.filter(lambda v, state: state.get("max") >= 3, stateful=True) state_manager.on_partition_assign( - stream_id=topic.name, partition=0, committed_offsets={topic.name: -1001} + state_id=topic.name, partition=0, committed_offsets={topic.name: -1001} ) values = [ {"number": 1}, @@ -900,7 +900,7 @@ def stateful_func(value_: dict, state: State): sdf = sdf[sdf.apply(lambda v, state: state.get("max") >= 3, stateful=True)] state_manager.on_partition_assign( - stream_id=topic.name, partition=0, committed_offsets={topic.name: -1001} + state_id=topic.name, partition=0, committed_offsets={topic.name: -1001} ) values = [ {"number": 1}, @@ -971,7 +971,7 @@ def test_tumbling_window_current( ) state_manager.on_partition_assign( - stream_id=topic.name, partition=0, committed_offsets={topic.name: -1001} + state_id=topic.name, partition=0, committed_offsets={topic.name: -1001} ) records = [ @@ -1047,7 +1047,7 @@ def on_late( ) state_manager.on_partition_assign( - stream_id=topic.name, partition=0, committed_offsets={topic.name: -1001} + state_id=topic.name, partition=0, committed_offsets={topic.name: -1001} ) records = [ # Create window [0, 10) @@ -1100,7 +1100,7 @@ def test_tumbling_window_final( sdf = sdf.tumbling_window(duration_ms=10, grace_ms=0).sum().final() state_manager.on_partition_assign( - stream_id=topic.name, partition=0, committed_offsets={topic.name: -1001} + state_id=topic.name, partition=0, committed_offsets={topic.name: -1001} ) records = [ # Create window [0, 10) @@ -1162,7 +1162,7 @@ def test_tumbling_window_none_key_messages( sdf = sdf.tumbling_window(duration_ms=10).sum().current() state_manager.on_partition_assign( - stream_id=topic.name, partition=0, committed_offsets={topic.name: -1001} + state_id=topic.name, partition=0, committed_offsets={topic.name: -1001} ) records = [ # Create window [0,10) @@ -1212,7 +1212,7 @@ def test_tumbling_window_two_windows( ) state_manager.on_partition_assign( - stream_id=topic.name, partition=0, committed_offsets={topic.name: -1001} + state_id=topic.name, partition=0, committed_offsets={topic.name: -1001} ) records = [ @@ -1325,7 +1325,7 @@ def test_hopping_window_current( sdf = sdf.hopping_window(duration_ms=10, step_ms=5).sum().current() state_manager.on_partition_assign( - stream_id=topic.name, partition=0, committed_offsets={topic.name: -1001} + state_id=topic.name, partition=0, committed_offsets={topic.name: -1001} ) records = [ # Create window [0,10) @@ -1376,7 +1376,7 @@ def test_hopping_window_current_out_of_order_late( sdf = sdf.hopping_window(duration_ms=10, step_ms=5).sum().current() state_manager.on_partition_assign( - stream_id=topic.name, partition=0, committed_offsets={topic.name: -1001} + state_id=topic.name, partition=0, committed_offsets={topic.name: -1001} ) records = [ # Create window [0,10) @@ -1420,7 +1420,7 @@ def test_hopping_window_final( sdf = sdf.hopping_window(duration_ms=10, step_ms=5).sum().final() state_manager.on_partition_assign( - stream_id=topic.name, partition=0, committed_offsets={topic.name: -1001} + state_id=topic.name, partition=0, committed_offsets={topic.name: -1001} ) records = [ @@ -1487,7 +1487,7 @@ def test_hopping_window_none_key_messages( sdf = sdf.hopping_window(duration_ms=10, step_ms=5).sum().current() state_manager.on_partition_assign( - stream_id=topic.name, partition=0, committed_offsets={topic.name: -1001} + state_id=topic.name, partition=0, committed_offsets={topic.name: -1001} ) records = [ # Create window [0,10) @@ -1534,7 +1534,7 @@ def test_sliding_window_current( ) state_manager.on_partition_assign( - stream_id=topic.name, partition=0, committed_offsets={topic.name: -1001} + state_id=topic.name, partition=0, committed_offsets={topic.name: -1001} ) records = [ @@ -1607,7 +1607,7 @@ def on_late( ) state_manager.on_partition_assign( - stream_id=topic.name, partition=0, committed_offsets={topic.name: -1001} + state_id=topic.name, partition=0, committed_offsets={topic.name: -1001} ) records = [ # Create window [0, 1] @@ -2557,7 +2557,7 @@ def test_concat_same_topic_success(self, topic_manager_factory, dataframe_factor # Branching is not exclusive, and it duplicates data in this case. # Check that we receive the results from both branches sdf_concatenated = sdf_branch1.concat(sdf_branch2) - assert sdf_concatenated.stream_id == sdf.stream_id + assert sdf_concatenated.state_id == sdf.state_id assert sdf_concatenated.test(value=1, key=b"key1", timestamp=1) == [ (2, b"key1", 1, None), (3, b"key1", 1, None), @@ -2585,7 +2585,7 @@ def accumulate(value: dict, state: State): sdf_concatenated = sdf1.concat(sdf2).apply(accumulate, stateful=True) state_manager.on_partition_assign( - stream_id=sdf_concatenated.stream_id, + state_id=sdf_concatenated.state_id, partition=0, committed_offsets={}, ) diff --git a/tests/test_quixstreams/test_dataframe/test_registry.py b/tests/test_quixstreams/test_dataframe/test_registry.py index 058236b22..56856762c 100644 --- a/tests/test_quixstreams/test_dataframe/test_registry.py +++ b/tests/test_quixstreams/test_dataframe/test_registry.py @@ -15,12 +15,12 @@ def test_register_root_multi_topic_sdf_fails(self): ): registry.register_root(sdf_mock) - def test_register_stream_id_success(self): + def test_register_state_id_success(self): registry = DataFrameRegistry() - registry.register_stream_id(stream_id="id", topic_names=["topic1", "topic2"]) + registry.register_state_id(state_id="id", topic_names=["topic1", "topic2"]) - assert sorted(registry.get_topics_for_stream_id("id")) == sorted( + assert sorted(registry.get_topics_for_state_id("id")) == sorted( ["topic1", "topic2"] ) - assert registry.get_stream_ids("topic1") == ["id"] - assert registry.get_stream_ids("topic2") == ["id"] + assert registry.get_state_ids("topic1") == ["id"] + assert registry.get_state_ids("topic2") == ["id"] diff --git a/tests/test_quixstreams/test_dataframe/test_windows/test_hopping.py b/tests/test_quixstreams/test_dataframe/test_windows/test_hopping.py index 6a0b1fd5f..a830a310d 100644 --- a/tests/test_quixstreams/test_dataframe/test_windows/test_hopping.py +++ b/tests/test_quixstreams/test_dataframe/test_windows/test_hopping.py @@ -77,7 +77,7 @@ def test_multiaggregation( window.final() assert window.name == "hopping_window_10_5" - store = state_manager.get_store(stream_id="test", store_name=window.name) + store = state_manager.get_store(state_id="test", store_name=window.name) store.assign_partition(0) key = b"key" with store.start_partition_transaction(0) as tx: @@ -264,7 +264,7 @@ def test_hoppingwindow_count( assert window.name == "hopping_window_10_5_count" window.final(closing_strategy=expiration) - store = state_manager.get_store(stream_id="test", store_name=window.name) + store = state_manager.get_store(state_id="test", store_name=window.name) store.assign_partition(0) with store.start_partition_transaction(0) as tx: key = b"key" @@ -291,7 +291,7 @@ def test_hoppingwindow_sum( assert window.name == "hopping_window_10_5_sum" window.final(closing_strategy=expiration) - store = state_manager.get_store(stream_id="test", store_name=window.name) + store = state_manager.get_store(state_id="test", store_name=window.name) store.assign_partition(0) with store.start_partition_transaction(0) as tx: key = b"key" @@ -318,7 +318,7 @@ def test_hoppingwindow_mean( assert window.name == "hopping_window_10_5_mean" window.final(closing_strategy=expiration) - store = state_manager.get_store(stream_id="test", store_name=window.name) + store = state_manager.get_store(state_id="test", store_name=window.name) store.assign_partition(0) with store.start_partition_transaction(0) as tx: key = b"key" @@ -348,7 +348,7 @@ def test_hoppingwindow_reduce( assert window.name == "hopping_window_10_5_reduce" window.final(closing_strategy=expiration) - store = state_manager.get_store(stream_id="test", store_name=window.name) + store = state_manager.get_store(state_id="test", store_name=window.name) store.assign_partition(0) with store.start_partition_transaction(0) as tx: key = b"key" @@ -374,7 +374,7 @@ def test_hoppingwindow_max( assert window.name == "hopping_window_10_5_max" window.final(closing_strategy=expiration) - store = state_manager.get_store(stream_id="test", store_name=window.name) + store = state_manager.get_store(state_id="test", store_name=window.name) store.assign_partition(0) with store.start_partition_transaction(0) as tx: key = b"key" @@ -400,7 +400,7 @@ def test_hoppingwindow_min( assert window.name == "hopping_window_10_5_min" window.final(closing_strategy=expiration) - store = state_manager.get_store(stream_id="test", store_name=window.name) + store = state_manager.get_store(state_id="test", store_name=window.name) store.assign_partition(0) with store.start_partition_transaction(0) as tx: key = b"key" @@ -426,7 +426,7 @@ def test_hoppingwindow_collect( assert window.name == "hopping_window_10_5_collect" window.final(closing_strategy=expiration) - store = state_manager.get_store(stream_id="test", store_name=window.name) + store = state_manager.get_store(state_id="test", store_name=window.name) store.assign_partition(0) with store.start_partition_transaction(0) as tx: key = b"key" @@ -478,7 +478,7 @@ def test_hopping_window_process_window_expired( ) window = window_def.sum() window.final(closing_strategy=expiration) - store = state_manager.get_store(stream_id="test", store_name=window.name) + store = state_manager.get_store(state_id="test", store_name=window.name) store.assign_partition(0) key = b"key" with store.start_partition_transaction(0) as tx: @@ -519,7 +519,7 @@ def test_hopping_partition_expiration( ) window = window_def.sum() window.final(closing_strategy="partition") - store = state_manager.get_store(stream_id="test", store_name=window.name) + store = state_manager.get_store(state_id="test", store_name=window.name) store.assign_partition(0) with store.start_partition_transaction(0) as tx: key1 = b"key1" @@ -575,7 +575,7 @@ def test_hopping_key_expiration_to_partition( ) window = window_def.sum() window.final(closing_strategy="key") - store = state_manager.get_store(stream_id="test", store_name=window.name) + store = state_manager.get_store(state_id="test", store_name=window.name) store.assign_partition(0) with store.start_partition_transaction(0) as tx: key1 = b"key1" @@ -613,7 +613,7 @@ def test_hopping_partition_expiration_to_key( ) window = window_def.sum() window.final(closing_strategy="partition") - store = state_manager.get_store(stream_id="test", store_name=window.name) + store = state_manager.get_store(state_id="test", store_name=window.name) store.assign_partition(0) with store.start_partition_transaction(0) as tx: key1 = b"key1" @@ -701,7 +701,7 @@ def test_multiaggregation( window.final() assert window.name == "hopping_count_window" - store = state_manager.get_store(stream_id="test", store_name=window.name) + store = state_manager.get_store(state_id="test", store_name=window.name) store.assign_partition(0) key = b"key" with store.start_partition_transaction(0) as tx: @@ -893,7 +893,7 @@ def test_count(self, count_hopping_window_definition_factory, state_manager): assert window.name == "hopping_count_window_count" window.final() - store = state_manager.get_store(stream_id="test", store_name=window.name) + store = state_manager.get_store(state_id="test", store_name=window.name) store.assign_partition(0) with store.start_partition_transaction(0) as tx: updated, expired = process( @@ -950,7 +950,7 @@ def test_sum(self, count_hopping_window_definition_factory, state_manager): assert window.name == "hopping_count_window_sum" window.final() - store = state_manager.get_store(stream_id="test", store_name=window.name) + store = state_manager.get_store(state_id="test", store_name=window.name) store.assign_partition(0) with store.start_partition_transaction(0) as tx: updated, expired = process( @@ -1007,7 +1007,7 @@ def test_mean(self, count_hopping_window_definition_factory, state_manager): assert window.name == "hopping_count_window_mean" window.final() - store = state_manager.get_store(stream_id="test", store_name=window.name) + store = state_manager.get_store(state_id="test", store_name=window.name) store.assign_partition(0) with store.start_partition_transaction(0) as tx: updated, expired = process( @@ -1068,7 +1068,7 @@ def test_reduce(self, count_hopping_window_definition_factory, state_manager): assert window.name == "hopping_count_window_reduce" window.final() - store = state_manager.get_store(stream_id="test", store_name=window.name) + store = state_manager.get_store(state_id="test", store_name=window.name) store.assign_partition(0) with store.start_partition_transaction(0) as tx: updated, expired = process( @@ -1125,7 +1125,7 @@ def test_max(self, count_hopping_window_definition_factory, state_manager): assert window.name == "hopping_count_window_max" window.final() - store = state_manager.get_store(stream_id="test", store_name=window.name) + store = state_manager.get_store(state_id="test", store_name=window.name) store.assign_partition(0) with store.start_partition_transaction(0) as tx: updated, expired = process( @@ -1182,7 +1182,7 @@ def test_min(self, count_hopping_window_definition_factory, state_manager): assert window.name == "hopping_count_window_min" window.final() - store = state_manager.get_store(stream_id="test", store_name=window.name) + store = state_manager.get_store(state_id="test", store_name=window.name) store.assign_partition(0) with store.start_partition_transaction(0) as tx: updated, expired = process( @@ -1239,7 +1239,7 @@ def test_collect(self, count_hopping_window_definition_factory, state_manager): assert window.name == "hopping_count_window_collect" window.final() - store = state_manager.get_store(stream_id="test", store_name=window.name) + store = state_manager.get_store(state_id="test", store_name=window.name) store.assign_partition(0) with store.start_partition_transaction(0) as tx: updated, expired = process( @@ -1287,7 +1287,7 @@ def test_unaligned_steps( window_def = count_hopping_window_definition_factory(count=5, step=2) window = window_def.collect() window.register_store() - store = state_manager.get_store(stream_id="test", store_name=window.name) + store = state_manager.get_store(state_id="test", store_name=window.name) store.assign_partition(0) with store.start_partition_transaction(0) as tx: updated, expired = process( @@ -1364,7 +1364,7 @@ def test_multiple_keys_sum( window_def = count_hopping_window_definition_factory(count=3, step=1) window = window_def.sum() window.register_store() - store = state_manager.get_store(stream_id="test", store_name=window.name) + store = state_manager.get_store(state_id="test", store_name=window.name) store.assign_partition(0) with store.start_partition_transaction(0) as tx: @@ -1457,7 +1457,7 @@ def test_multiple_keys_collect( window_def = count_hopping_window_definition_factory(count=3, step=1) window = window_def.collect() window.register_store() - store = state_manager.get_store(stream_id="test", store_name=window.name) + store = state_manager.get_store(state_id="test", store_name=window.name) store.assign_partition(0) with store.start_partition_transaction(0) as tx: diff --git a/tests/test_quixstreams/test_dataframe/test_windows/test_sliding.py b/tests/test_quixstreams/test_dataframe/test_windows/test_sliding.py index fc5ab8eba..d955e7f2f 100644 --- a/tests/test_quixstreams/test_dataframe/test_windows/test_sliding.py +++ b/tests/test_quixstreams/test_dataframe/test_windows/test_sliding.py @@ -785,7 +785,7 @@ def transaction_factory(state_manager): def factory(window): nonlocal store if store is None: - store = state_manager.get_store(stream_id="topic", store_name=window.name) + store = state_manager.get_store(state_id="topic", store_name=window.name) store.assign_partition(0) with store.start_partition_transaction(0) as tx: yield tx diff --git a/tests/test_quixstreams/test_dataframe/test_windows/test_tumbling.py b/tests/test_quixstreams/test_dataframe/test_windows/test_tumbling.py index 98d9f56c1..c808daad7 100644 --- a/tests/test_quixstreams/test_dataframe/test_windows/test_tumbling.py +++ b/tests/test_quixstreams/test_dataframe/test_windows/test_tumbling.py @@ -73,7 +73,7 @@ def test_multiaggregation( window.final(closing_strategy="key") assert window.name == "tumbling_window_10" - store = state_manager.get_store(stream_id="test", store_name=window.name) + store = state_manager.get_store(state_id="test", store_name=window.name) store.assign_partition(0) key = b"key" with store.start_partition_transaction(0) as tx: @@ -241,7 +241,7 @@ def test_tumblingwindow_count( assert window.name == "tumbling_window_10_count" window.final(closing_strategy=expiration) - store = state_manager.get_store(stream_id="test", store_name=window.name) + store = state_manager.get_store(state_id="test", store_name=window.name) store.assign_partition(0) with store.start_partition_transaction(0) as tx: key = b"key" @@ -262,7 +262,7 @@ def test_tumblingwindow_sum( assert window.name == "tumbling_window_10_sum" window.final(closing_strategy=expiration) - store = state_manager.get_store(stream_id="test", store_name=window.name) + store = state_manager.get_store(state_id="test", store_name=window.name) store.assign_partition(0) with store.start_partition_transaction(0) as tx: key = b"key" @@ -283,7 +283,7 @@ def test_tumblingwindow_mean( assert window.name == "tumbling_window_10_mean" window.final(closing_strategy=expiration) - store = state_manager.get_store(stream_id="test", store_name=window.name) + store = state_manager.get_store(state_id="test", store_name=window.name) store.assign_partition(0) with store.start_partition_transaction(0) as tx: key = b"key" @@ -307,7 +307,7 @@ def test_tumblingwindow_reduce( assert window.name == "tumbling_window_10_reduce" window.final(closing_strategy=expiration) - store = state_manager.get_store(stream_id="test", store_name=window.name) + store = state_manager.get_store(state_id="test", store_name=window.name) store.assign_partition(0) with store.start_partition_transaction(0) as tx: key = b"key" @@ -328,7 +328,7 @@ def test_tumblingwindow_max( assert window.name == "tumbling_window_10_max" window.final(closing_strategy=expiration) - store = state_manager.get_store(stream_id="test", store_name=window.name) + store = state_manager.get_store(state_id="test", store_name=window.name) store.assign_partition(0) with store.start_partition_transaction(0) as tx: key = b"key" @@ -349,7 +349,7 @@ def test_tumblingwindow_min( assert window.name == "tumbling_window_10_min" window.final(closing_strategy=expiration) - store = state_manager.get_store(stream_id="test", store_name=window.name) + store = state_manager.get_store(state_id="test", store_name=window.name) store.assign_partition(0) with store.start_partition_transaction(0) as tx: key = b"key" @@ -370,7 +370,7 @@ def test_tumblingwindow_collect( assert window.name == "tumbling_window_10_collect" window.final(closing_strategy=expiration) - store = state_manager.get_store(stream_id="test", store_name=window.name) + store = state_manager.get_store(state_id="test", store_name=window.name) store.assign_partition(0) with store.start_partition_transaction(0) as tx: key = b"key" @@ -411,7 +411,7 @@ def test_tumbling_window_process_window_expired( window_def = tumbling_window_definition_factory(duration_ms=10, grace_ms=0) window = window_def.sum() window.final(closing_strategy=expiration) - store = state_manager.get_store(stream_id="test", store_name=window.name) + store = state_manager.get_store(state_id="test", store_name=window.name) store.assign_partition(0) with store.start_partition_transaction(0) as tx: key = b"key" @@ -446,7 +446,7 @@ def test_tumbling_partition_expiration( window_def = tumbling_window_definition_factory(duration_ms=10, grace_ms=2) window = window_def.sum() window.final(closing_strategy="partition") - store = state_manager.get_store(stream_id="test", store_name=window.name) + store = state_manager.get_store(state_id="test", store_name=window.name) store.assign_partition(0) with store.start_partition_transaction(0) as tx: key1 = b"key1" @@ -498,7 +498,7 @@ def test_tumbling_key_expiration_to_partition( window_def = tumbling_window_definition_factory(duration_ms=10, grace_ms=0) window = window_def.sum() window.final(closing_strategy="key") - store = state_manager.get_store(stream_id="test", store_name=window.name) + store = state_manager.get_store(state_id="test", store_name=window.name) store.assign_partition(0) with store.start_partition_transaction(0) as tx: key1 = b"key1" @@ -533,7 +533,7 @@ def test_tumbling_partition_expiration_to_key( window_def = tumbling_window_definition_factory(duration_ms=10, grace_ms=0) window = window_def.sum() window.final(closing_strategy="partition") - store = state_manager.get_store(stream_id="test", store_name=window.name) + store = state_manager.get_store(state_id="test", store_name=window.name) store.assign_partition(0) with store.start_partition_transaction(0) as tx: key1 = b"key1" @@ -609,7 +609,7 @@ def test_multiaggregation( window.final() assert window.name == "tumbling_count_window" - store = state_manager.get_store(stream_id="test", store_name=window.name) + store = state_manager.get_store(state_id="test", store_name=window.name) store.assign_partition(0) key = b"key" with store.start_partition_transaction(0) as tx: @@ -774,7 +774,7 @@ def test_count(self, count_tumbling_window_definition_factory, state_manager): assert window.name == "tumbling_count_window_count" window.final() - store = state_manager.get_store(stream_id="test", store_name=window.name) + store = state_manager.get_store(state_id="test", store_name=window.name) store.assign_partition(0) with store.start_partition_transaction(0) as tx: process(window, key="", value=0, transaction=tx, timestamp_ms=100) @@ -791,7 +791,7 @@ def test_sum(self, count_tumbling_window_definition_factory, state_manager): assert window.name == "tumbling_count_window_sum" window.final() - store = state_manager.get_store(stream_id="test", store_name=window.name) + store = state_manager.get_store(state_id="test", store_name=window.name) store.assign_partition(0) with store.start_partition_transaction(0) as tx: process(window, key="", value=2, transaction=tx, timestamp_ms=100) @@ -808,7 +808,7 @@ def test_mean(self, count_tumbling_window_definition_factory, state_manager): assert window.name == "tumbling_count_window_mean" window.final() - store = state_manager.get_store(stream_id="test", store_name=window.name) + store = state_manager.get_store(state_id="test", store_name=window.name) store.assign_partition(0) with store.start_partition_transaction(0) as tx: process(window, key="", value=2, transaction=tx, timestamp_ms=100) @@ -828,7 +828,7 @@ def test_reduce(self, count_tumbling_window_definition_factory, state_manager): assert window.name == "tumbling_count_window_reduce" window.final() - store = state_manager.get_store(stream_id="test", store_name=window.name) + store = state_manager.get_store(state_id="test", store_name=window.name) store.assign_partition(0) with store.start_partition_transaction(0) as tx: process(window, key="", value=2, transaction=tx, timestamp_ms=100) @@ -845,7 +845,7 @@ def test_max(self, count_tumbling_window_definition_factory, state_manager): assert window.name == "tumbling_count_window_max" window.final() - store = state_manager.get_store(stream_id="test", store_name=window.name) + store = state_manager.get_store(state_id="test", store_name=window.name) store.assign_partition(0) with store.start_partition_transaction(0) as tx: process(window, key="", value=2, transaction=tx, timestamp_ms=100) @@ -862,7 +862,7 @@ def test_min(self, count_tumbling_window_definition_factory, state_manager): assert window.name == "tumbling_count_window_min" window.final() - store = state_manager.get_store(stream_id="test", store_name=window.name) + store = state_manager.get_store(state_id="test", store_name=window.name) store.assign_partition(0) with store.start_partition_transaction(0) as tx: process(window, key="", value=2, transaction=tx, timestamp_ms=100) @@ -879,7 +879,7 @@ def test_collect(self, count_tumbling_window_definition_factory, state_manager): assert window.name == "tumbling_count_window_collect" window.final() - store = state_manager.get_store(stream_id="test", store_name=window.name) + store = state_manager.get_store(state_id="test", store_name=window.name) store.assign_partition(0) with store.start_partition_transaction(0) as tx: process(window, key="", value=1, transaction=tx, timestamp_ms=100) @@ -904,7 +904,7 @@ def test_window_expired( window_def = count_tumbling_window_definition_factory(count=2) window = window_def.sum() window.register_store() - store = state_manager.get_store(stream_id="test", store_name=window.name) + store = state_manager.get_store(state_id="test", store_name=window.name) store.assign_partition(0) with store.start_partition_transaction(0) as tx: # Add first item to the window @@ -938,7 +938,7 @@ def test_multiple_keys_sum( window_def = count_tumbling_window_definition_factory(count=3) window = window_def.sum() window.register_store() - store = state_manager.get_store(stream_id="test", store_name=window.name) + store = state_manager.get_store(state_id="test", store_name=window.name) store.assign_partition(0) with store.start_partition_transaction(0) as tx: @@ -1005,7 +1005,7 @@ def test_multiple_keys_collect( window_def = count_tumbling_window_definition_factory(count=3) window = window_def.collect() window.register_store() - store = state_manager.get_store(stream_id="test", store_name=window.name) + store = state_manager.get_store(state_id="test", store_name=window.name) store.assign_partition(0) with store.start_partition_transaction(0) as tx: diff --git a/tests/test_quixstreams/test_models/test_topics/test_manager.py b/tests/test_quixstreams/test_models/test_topics/test_manager.py index f6e435773..c1c8f3632 100644 --- a/tests/test_quixstreams/test_models/test_topics/test_manager.py +++ b/tests/test_quixstreams/test_models/test_topics/test_manager.py @@ -166,16 +166,16 @@ def test_changelog_topic(self, topic_manager_factory): group = "my_consumer_group" topic_manager = topic_manager_factory(consumer_group=group) store_name = "default" - stream_id = str(uuid.uuid4()) + state_id = str(uuid.uuid4()) changelog = topic_manager.changelog_topic( - stream_id=stream_id, + state_id=state_id, store_name=store_name, config=TopicConfig(num_partitions=1, replication_factor=1), ) - assert topic_manager.changelog_topics[stream_id][store_name] == changelog + assert topic_manager.changelog_topics[state_id][store_name] == changelog - assert changelog.name == f"changelog__{group}--{stream_id}--{store_name}" + assert changelog.name == f"changelog__{group}--{state_id}--{store_name}" for attr in [ "_key_serializer", @@ -198,10 +198,10 @@ def test_changelog_topic_partition_count_mismatch( """ topic_manager = topic_manager_factory() - stream_id = str(uuid.uuid4()) + state_id = str(uuid.uuid4()) # Create a new changelog topic with 1 partition topic_manager.changelog_topic( - stream_id=stream_id, + state_id=state_id, store_name="store", config=TopicConfig(num_partitions=1, replication_factor=1), ) @@ -211,7 +211,7 @@ def test_changelog_topic_partition_count_mismatch( # and has only 1 partition with pytest.raises(TopicConfigurationMismatch, match="partition count"): topic_manager.changelog_topic( - stream_id=stream_id, + state_id=state_id, store_name="store", config=TopicConfig(num_partitions=2, replication_factor=1), ) @@ -227,7 +227,7 @@ def test_changelog_name_len_exceeded(self, topic_manager_factory): topic_manager = topic_manager_factory() with pytest.raises(TopicNameLengthExceeded): topic_manager.changelog_topic( - stream_id=str(uuid.uuid4()), + state_id=str(uuid.uuid4()), store_name="store" * 100, config=TopicConfig(num_partitions=1, replication_factor=1), ) @@ -241,10 +241,10 @@ def test_repartition_topic(self, topic_manager_factory): topic_manager = topic_manager_factory(consumer_group=group) operation = "my_op" - stream_id = str(uuid.uuid4()) + state_id = str(uuid.uuid4()) repartition = topic_manager.repartition_topic( operation=operation, - stream_id=stream_id, + state_id=state_id, key_serializer="bytes", value_serializer="bytes", config=TopicConfig( @@ -255,7 +255,7 @@ def test_repartition_topic(self, topic_manager_factory): ) assert topic_manager.repartition_topics[repartition.name] == repartition - assert repartition.name == f"repartition__{group}--{stream_id}--{operation}" + assert repartition.name == f"repartition__{group}--{state_id}--{operation}" assert repartition.broker_config.num_partitions == 1 assert repartition.broker_config.replication_factor == 1 assert repartition.broker_config.extra_config["retention.ms"] == "1000" @@ -270,11 +270,11 @@ def test_changelog_nested_internal_topic_naming(self, topic_manager_factory): group = "my_consumer_group" topic_manager = topic_manager_factory(consumer_group=group) - stream_id = str(uuid.uuid4()) + state_id = str(uuid.uuid4()) operation = "my_op" repartition_topic = topic_manager.repartition_topic( operation=operation, - stream_id=stream_id, + state_id=state_id, key_serializer="bytes", value_serializer="bytes", config=TopicConfig( @@ -283,7 +283,7 @@ def test_changelog_nested_internal_topic_naming(self, topic_manager_factory): ), ) changelog = topic_manager.changelog_topic( - stream_id=repartition_topic.name, + state_id=repartition_topic.name, store_name=store_name, config=TopicConfig( num_partitions=1, @@ -293,7 +293,7 @@ def test_changelog_nested_internal_topic_naming(self, topic_manager_factory): assert ( changelog.name - == f"changelog__{group}--repartition.{stream_id}.{operation}--{store_name}" + == f"changelog__{group}--repartition.{state_id}.{operation}--{store_name}" ) def test_non_changelog_topics(self, topic_manager_factory): @@ -307,14 +307,14 @@ def test_non_changelog_topics(self, topic_manager_factory): operation = "my_op" repartition_topic = topic_manager.repartition_topic( operation=operation, - stream_id=data_topic.name, + state_id=data_topic.name, key_serializer="bytes", value_serializer="bytes", config=TopicConfig(num_partitions=1, replication_factor=1), ) changelog_topic = topic_manager.changelog_topic( - stream_id=data_topic.name, + state_id=data_topic.name, store_name="default", config=TopicConfig(num_partitions=1, replication_factor=1), ) @@ -323,24 +323,24 @@ def test_non_changelog_topics(self, topic_manager_factory): assert repartition_topic.name in topic_manager.non_changelog_topics assert changelog_topic.name not in topic_manager.non_changelog_topics - def test_stream_id_from_topics_success(self, topic_manager_factory): + def test_state_id_from_topics_success(self, topic_manager_factory): topic_manager = topic_manager_factory() topic1 = topic_manager.topic("test1") topic2 = topic_manager.topic("test2") - stream_id = topic_manager.stream_id_from_topics([topic1, topic2]) + state_id = topic_manager.state_id_from_topics([topic1, topic2]) - assert stream_id == "test1--test2" + assert state_id == "test1--test2" - def test_stream_id_from_topics_sorted(self, topic_manager_factory): + def test_state_id_from_topics_sorted(self, topic_manager_factory): topic_manager = topic_manager_factory() topic1 = topic_manager.topic("test1") topic2 = topic_manager.topic("test2") - assert topic_manager.stream_id_from_topics( + assert topic_manager.state_id_from_topics( [topic1, topic2] - ) == topic_manager.stream_id_from_topics([topic2, topic1]) + ) == topic_manager.state_id_from_topics([topic2, topic1]) - def test_stream_id_from_topics_no_topics_fails(self, topic_manager_factory): + def test_state_id_from_topics_no_topics_fails(self, topic_manager_factory): topic_manager = topic_manager_factory() with pytest.raises(ValueError): - topic_manager.stream_id_from_topics([]) + topic_manager.state_id_from_topics([]) diff --git a/tests/test_quixstreams/test_platforms/test_quix/test_topic_manager.py b/tests/test_quixstreams/test_platforms/test_quix/test_topic_manager.py index 8600e5a3c..0ff1fc62d 100644 --- a/tests/test_quixstreams/test_platforms/test_quix/test_topic_manager.py +++ b/tests/test_quixstreams/test_platforms/test_quix/test_topic_manager.py @@ -40,7 +40,7 @@ def test_quix_internal_topic(self, quix_topic_manager_factory): assert topic.name == expected_topic_id changelog = topic_manager.changelog_topic( - stream_id=topic.name, store_name=store_name, config=topic.broker_config + state_id=topic.name, store_name=store_name, config=topic.broker_config ) assert changelog.name == expected_changelog_id assert topic_manager.changelog_topics[topic.name][store_name] == changelog @@ -89,13 +89,13 @@ def test_quix_changelog_nested_internal_topic_naming( assert topic.name == topic_id repartition = topic_manager.repartition_topic( - operation=operation, stream_id=topic.name, config=topic.broker_config + operation=operation, state_id=topic.name, config=topic.broker_config ) assert repartition.name == repartition_id assert topic_manager.repartition_topics[repartition.name] == repartition changelog = topic_manager.changelog_topic( - stream_id=repartition.name, + state_id=repartition.name, store_name=store_name, config=repartition.broker_config, ) @@ -119,39 +119,39 @@ def test_quix_changelog_nested_internal_topic_naming( == repartition.broker_config.replication_factor ) - def test_stream_id_from_topics_multiple_topics_success( + def test_state_id_from_topics_multiple_topics_success( self, quix_topic_manager_factory ): topic_manager = quix_topic_manager_factory(workspace_id="workspace_id") topic1 = topic_manager.topic("test1") topic2 = topic_manager.topic("test2") - stream_id = topic_manager.stream_id_from_topics([topic1, topic2]) + state_id = topic_manager.state_id_from_topics([topic1, topic2]) - assert stream_id == "test1--test2" + assert state_id == "test1--test2" - def test_stream_id_from_topics_single_topic_prefixed_with_workspace( + def test_state_id_from_topics_single_topic_prefixed_with_workspace( self, quix_topic_manager_factory ): """ - Test that stream_id is prefixed with workspace_id if the single topic is passed + Test that state_id is prefixed with workspace_id if the single topic is passed for the backwards compatibility. """ topic_manager = quix_topic_manager_factory(workspace_id="workspace_id") topic1 = topic_manager.topic("test1") - stream_id = topic_manager.stream_id_from_topics([topic1]) + state_id = topic_manager.state_id_from_topics([topic1]) - assert stream_id == "workspace_id-test1" + assert state_id == "workspace_id-test1" - def test_stream_id_from_topics_no_topics_fails(self, quix_topic_manager_factory): + def test_state_id_from_topics_no_topics_fails(self, quix_topic_manager_factory): topic_manager = quix_topic_manager_factory() with pytest.raises(ValueError): - topic_manager.stream_id_from_topics([]) + topic_manager.state_id_from_topics([]) - def test_stream_id_from_topics_sorted(self, quix_topic_manager_factory): + def test_state_id_from_topics_sorted(self, quix_topic_manager_factory): topic_manager = quix_topic_manager_factory() topic1 = topic_manager.topic("test1") topic2 = topic_manager.topic("test2") - assert topic_manager.stream_id_from_topics( + assert topic_manager.state_id_from_topics( [topic1, topic2] - ) == topic_manager.stream_id_from_topics([topic2, topic1]) + ) == topic_manager.state_id_from_topics([topic2, topic1]) diff --git a/tests/test_quixstreams/test_processing/test_pausing.py b/tests/test_quixstreams/test_processing/test_pausing.py index 1f7dd3e26..56360a11b 100644 --- a/tests/test_quixstreams/test_processing/test_pausing.py +++ b/tests/test_quixstreams/test_processing/test_pausing.py @@ -57,7 +57,7 @@ def test_resume_if_ready(self, topic_manager_factory): ) # Create a changelog topic topic_manager.changelog_topic( - stream_id=topic.name, store_name="default", config=topic.broker_config + state_id=topic.name, store_name="default", config=topic.broker_config ) offset_to_seek = 999 diff --git a/tests/test_quixstreams/test_state/fixtures.py b/tests/test_quixstreams/test_state/fixtures.py index 9009ba42b..d2400fae0 100644 --- a/tests/test_quixstreams/test_state/fixtures.py +++ b/tests/test_quixstreams/test_state/fixtures.py @@ -78,7 +78,7 @@ def factory( changelog_producer_factory: Optional[ChangelogProducerFactory] = None, ): return MemoryStore( - stream_id=topic or str(uuid.uuid4()), + state_id=topic or str(uuid.uuid4()), name=name, changelog_producer_factory=changelog_producer_factory, ) @@ -94,7 +94,7 @@ def factory( ) -> RocksDBStore: topic = topic or str(uuid.uuid4()) return RocksDBStore( - stream_id=topic, + state_id=topic, name=name, base_dir=str(tmp_path), changelog_producer_factory=changelog_producer_factory, diff --git a/tests/test_quixstreams/test_state/test_manager.py b/tests/test_quixstreams/test_state/test_manager.py index c0b920378..53291365f 100644 --- a/tests/test_quixstreams/test_state/test_manager.py +++ b/tests/test_quixstreams/test_state/test_manager.py @@ -48,9 +48,9 @@ def test_init_state_dir_exists_not_a_dir_fails( def test_rebalance_partitions_stores_not_registered(self, state_manager): # It's ok to rebalance partitions when there are no stores registered state_manager.on_partition_assign( - stream_id="topic", partition=0, committed_offsets={"topic": -1001} + state_id="topic", partition=0, committed_offsets={"topic": -1001} ) - state_manager.on_partition_revoke(stream_id="topic", partition=0) + state_manager.on_partition_revoke(state_id="topic", partition=0) def test_register_store(self, state_manager): state_manager = state_manager @@ -75,7 +75,7 @@ def test_assign_revoke_partitions_stores_registered(self, state_manager): for tp in partitions: store_partitions.extend( state_manager.on_partition_assign( - stream_id=tp.topic, + state_id=tp.topic, partition=tp.partition, committed_offsets=committed_offsets, ) @@ -87,9 +87,7 @@ def test_assign_revoke_partitions_stores_registered(self, state_manager): assert len(state_manager.get_store("topic2", "store1").partitions) == 1 for tp in partitions: - state_manager.on_partition_revoke( - stream_id=tp.topic, partition=tp.partition - ) + state_manager.on_partition_revoke(state_id=tp.topic, partition=tp.partition) assert not state_manager.get_store("topic1", "store1").partitions assert not state_manager.get_store("topic1", "store2").partitions @@ -100,9 +98,9 @@ def test_register_store_twice(self, state_manager): state_manager.register_store("topic", "store") def test_register_windowed_store_twice(self, state_manager): - state_manager.register_windowed_store("stream_id", "store") + state_manager.register_windowed_store("state_id", "store") with pytest.raises(WindowedStoreAlreadyRegisteredError): - state_manager.register_windowed_store("stream_id", "store") + state_manager.register_windowed_store("state_id", "store") def test_get_store_not_registered(self, state_manager): with pytest.raises(StoreNotRegisteredError): @@ -134,7 +132,7 @@ def test_clear_stores(self, state_manager): # Assign partitions for tp in partitions: state_manager.on_partition_assign( - stream_id=tp.topic, + state_id=tp.topic, partition=tp.partition, committed_offsets={"topic1": -1001, "topic2": -1001}, ) @@ -149,9 +147,7 @@ def test_clear_stores(self, state_manager): # Revoke partitions for tp in partitions: - state_manager.on_partition_revoke( - stream_id=tp.topic, partition=tp.partition - ) + state_manager.on_partition_revoke(state_id=tp.topic, partition=tp.partition) # Act - Delete stores state_manager.clear_stores() @@ -166,7 +162,7 @@ def test_clear_stores_fails(self, state_manager): # Assign the partition state_manager.on_partition_assign( - stream_id="topic1", partition=0, committed_offsets={"topic1": -1001} + state_id="topic1", partition=0, committed_offsets={"topic1": -1001} ) # Act - Delete stores @@ -198,9 +194,9 @@ def test_rebalance_partitions_stores_not_registered( ) # It's ok to rebalance partitions when there are no stores registered state_manager.on_partition_assign( - stream_id="topic", partition=0, committed_offsets={"topic": -1001} + state_id="topic", partition=0, committed_offsets={"topic": -1001} ) - state_manager.on_partition_revoke(stream_id="topic", partition=0) + state_manager.on_partition_revoke(state_id="topic", partition=0) def test_register_store( self, @@ -266,7 +262,7 @@ def test_assign_revoke_partitions_stores_registered( # Assign a topic partition state_manager.on_partition_assign( - stream_id=topic_name, + state_id=topic_name, partition=partition, committed_offsets={"topic1": -1001}, ) @@ -275,7 +271,7 @@ def test_assign_revoke_partitions_stores_registered( assert recovery_manager.partitions # Revoke a topic partition - state_manager.on_partition_revoke(stream_id=topic_name, partition=partition) + state_manager.on_partition_revoke(state_id=topic_name, partition=partition) # Check that RecoveryManager has a partition revoked too assert not recovery_manager.partitions diff --git a/tests/test_quixstreams/test_state/test_recovery/test_recovery_manager.py b/tests/test_quixstreams/test_state/test_recovery/test_recovery_manager.py index 70a3ecead..0ee81a190 100644 --- a/tests/test_quixstreams/test_state/test_recovery/test_recovery_manager.py +++ b/tests/test_quixstreams/test_state/test_recovery/test_recovery_manager.py @@ -25,13 +25,13 @@ def test_register_changelog(self, recovery_manager_factory): config = TopicConfig(num_partitions=1, replication_factor=1) with patch.object(TopicManager, "changelog_topic") as make_changelog: recovery_manager.register_changelog( - stream_id=topic, + state_id=topic, store_name=store_name, topic_config=config, ) make_changelog.assert_called_with( - stream_id=topic, store_name=store_name, config=config + state_id=topic, store_name=store_name, config=config ) def test_assign_partition_invalid_offset( @@ -53,7 +53,7 @@ def test_assign_partition_invalid_offset( # Register a source topic and a changelog topic with one partition topic_manager = topic_manager_factory() topic_manager.changelog_topic( - stream_id=topic_name, + state_id=topic_name, store_name=store_name, config=topic_manager.topic_config(), ) @@ -108,7 +108,7 @@ def test_single_changelog_message_recovery( # make topics topic_manager = topic_manager_factory() changelog_topic = topic_manager.changelog_topic( - stream_id=topic_name, + state_id=topic_name, store_name=store_name, config=topic_manager.topic_config(), ) @@ -160,7 +160,7 @@ def test_assign_partitions_during_recovery( topic_manager = topic_manager_factory() changelog_topic = topic_manager.changelog_topic( - stream_id=topic_name, + state_id=topic_name, store_name=store_name, config=TopicConfig(num_partitions=2, replication_factor=1), ) @@ -237,7 +237,7 @@ def test_assign_partition_changelog_tp_is_missing( # Register a source topic and a changelog topic with 2 partitions topic_manager = topic_manager_factory() changelog_topic = topic_manager.changelog_topic( - stream_id=topic_name, + state_id=topic_name, store_name=store_name, config=TopicConfig(num_partitions=2, replication_factor=1), ) @@ -278,7 +278,7 @@ def test_revoke_partition(self, recovery_manager_factory, topic_manager_factory) # Register a source topic and a changelog topic with two partitions topic_manager = topic_manager_factory() changelog_topic = topic_manager.changelog_topic( - stream_id=topic_name, + state_id=topic_name, store_name=store_name, config=TopicConfig(num_partitions=2, replication_factor=1), ) @@ -389,11 +389,11 @@ def test_assign_partition( # Create Store and assign a StorePartition (which also sets up changelog topics) store_partitions = {} state_manager.register_store( - stream_id=topic_name, + state_id=topic_name, store_name=store_name, changelog_config=TopicConfig(num_partitions=1, replication_factor=1), ) - store = state_manager.get_store(stream_id=topic_name, store_name=store_name) + store = state_manager.get_store(state_id=topic_name, store_name=store_name) # Mock the Consumer assignment with changelog topic-partition changelog_topic = topic_manager.changelog_topics[topic_name][store_name] @@ -448,7 +448,7 @@ def test_do_recovery( topic_manager = topic_manager_factory() data_topic = topic_manager.topic(topic_name) changelog_topic = topic_manager.changelog_topic( - stream_id=topic_name, store_name=store_name, config=data_topic.broker_config + state_id=topic_name, store_name=store_name, config=data_topic.broker_config ) data_tp = TopicPartition(topic=data_topic.name, partition=0) diff --git a/tests/test_quixstreams/test_state/test_rocksdb/test_windowed/fixtures.py b/tests/test_quixstreams/test_state/test_rocksdb/test_windowed/fixtures.py index da03ced34..b48347136 100644 --- a/tests/test_quixstreams/test_state/test_rocksdb/test_windowed/fixtures.py +++ b/tests/test_quixstreams/test_state/test_rocksdb/test_windowed/fixtures.py @@ -19,7 +19,7 @@ def factory( ) -> WindowedRocksDBStore: topic = topic or str(uuid.uuid4()) return WindowedRocksDBStore( - stream_id=topic, + state_id=topic, name=name, base_dir=str(tmp_path), ) @@ -59,7 +59,7 @@ def factory( ) -> WindowedRocksDBStore: topic = topic or str(uuid.uuid4()) return WindowedRocksDBStore( - stream_id=topic, + state_id=topic, name=name, base_dir=str(tmp_path), changelog_producer_factory=ChangelogProducerFactory(