Skip to content

Feature: TimestampedStore #841

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 39 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
39 commits
Select commit Hold shift + click to select a range
afd4abd
Move iter_items to RocksDBStorePartition
gwaramadze Apr 8, 2025
09e3552
Add backwards param to iter_items
gwaramadze Apr 8, 2025
9600bbf
Move setting lower and upper bounds to iter_items method
gwaramadze Apr 9, 2025
040ca81
Define partition_transaction_class
gwaramadze Apr 9, 2025
d9b7c65
Define store_partition_class
gwaramadze Apr 9, 2025
1ebb67b
Rename store variable
gwaramadze Apr 8, 2025
0aa6f63
Implement TimestampedStore
gwaramadze Apr 9, 2025
ed324f6
Fix and rename get to get_last
gwaramadze Apr 10, 2025
76da934
Add docstrings and more tests
gwaramadze Apr 10, 2025
aaa2778
Prefix in TimestampedPartitionTransaction may be of any type
gwaramadze Apr 10, 2025
5f23fd1
Add _ensure_bytes helper method
gwaramadze Apr 11, 2025
1644003
Test PartitionTransactionCache.iter_items
gwaramadze Apr 11, 2025
8b1e664
Move cache fixture to fixtures
gwaramadze Apr 11, 2025
a4bb3f3
Test, correct and document RocksDBStorePartition.iter_items
gwaramadze Apr 15, 2025
424e73e
Ensure timestamped key is properly serialized
gwaramadze Apr 15, 2025
1b5dd53
Add expire method
gwaramadze Apr 16, 2025
5668cd5
Ignore deleted items
gwaramadze Apr 16, 2025
da6fcb4
Replace cache-level iter-items with get_updates_for_prefix
gwaramadze Apr 16, 2025
8883d65
Add a comment on a fixture
gwaramadze Apr 16, 2025
74b4507
Correct test name
gwaramadze Apr 16, 2025
9c21122
Improve test_get_last_prefix_not_bytes
gwaramadze Apr 16, 2025
36d0bfc
Correct iter_items
gwaramadze Apr 17, 2025
487550e
Correct argument order
gwaramadze Apr 17, 2025
0c94073
Tiny docstring correction
gwaramadze Apr 17, 2025
3bcf6ce
Add retention
gwaramadze Apr 17, 2025
d594e67
Add expiration mechanism
gwaramadze Apr 17, 2025
b6a8020
Remove redundant parentheses
gwaramadze Apr 17, 2025
978110e
Rename retention to retention_ms
gwaramadze Apr 23, 2025
211cdf0
wip
gwaramadze Apr 25, 2025
e70c98a
Move cache helpers to separate module
gwaramadze Apr 25, 2025
5fbeda8
refactor
gwaramadze Apr 25, 2025
71c7c74
correct
gwaramadze Apr 25, 2025
39eea9f
Ditch the match/case
gwaramadze Apr 25, 2025
d6d1d41
Revert get_updates_for_prefix - reduce the amount of new code
gwaramadze Apr 28, 2025
7bcfad1
Ensure additional column families in RocksDBStorePartition base class
gwaramadze Apr 28, 2025
e23a801
Set min eligible timestamp instead of the latest timestamp
gwaramadze Apr 28, 2025
ca83001
Refactor
gwaramadze Apr 28, 2025
4526bd9
Do not pass around default cf name
gwaramadze Apr 28, 2025
3b47088
Expire once per transaction
gwaramadze Apr 28, 2025
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 3 additions & 1 deletion quixstreams/state/base/partition.py
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,8 @@ class StorePartition(ABC):
the persistent storage).
"""

partition_transaction_class = PartitionTransaction

def __init__(
self,
dumps: DumpsFunc,
Expand Down Expand Up @@ -112,7 +114,7 @@ def begin(self) -> PartitionTransaction:

Using `PartitionTransaction` is a recommended way for accessing the data.
"""
return PartitionTransaction(
return self.partition_transaction_class(
partition=self,
dumps=self._dumps,
loads=self._loads,
Expand Down
17 changes: 13 additions & 4 deletions quixstreams/state/manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
from .memory import MemoryStore
from .recovery import ChangelogProducerFactory, RecoveryManager
from .rocksdb import RocksDBOptionsType, RocksDBStore
from .rocksdb.timestamped import TimestampedStore
from .rocksdb.windowed.store import WindowedRocksDBStore

__all__ = ("StateStoreManager", "DEFAULT_STATE_STORE_NAME", "StoreTypes")
Expand All @@ -24,7 +25,7 @@

DEFAULT_STATE_STORE_NAME = "default"

StoreTypes = Union[Type[RocksDBStore], Type[MemoryStore]]
StoreTypes = Union[Type[RocksDBStore], Type[MemoryStore], Type[TimestampedStore]]
SUPPORTED_STORES = [RocksDBStore, MemoryStore]


Expand Down Expand Up @@ -189,23 +190,31 @@ def register_store(

store_type = store_type or self.default_store_type
if store_type == RocksDBStore:
factory: Store = RocksDBStore(
store: Store = RocksDBStore(
name=store_name,
stream_id=stream_id,
base_dir=str(self._state_dir),
changelog_producer_factory=changelog_producer_factory,
options=self._rocksdb_options,
)
elif store_type == TimestampedStore:
store = TimestampedStore(
name=store_name,
stream_id=stream_id,
base_dir=str(self._state_dir),
changelog_producer_factory=changelog_producer_factory,
options=self._rocksdb_options,
)
elif store_type == MemoryStore:
factory = MemoryStore(
store = MemoryStore(
name=store_name,
stream_id=stream_id,
changelog_producer_factory=changelog_producer_factory,
)
else:
raise ValueError(f"invalid store type: {store_type}")

self._stores.setdefault(stream_id, {})[store_name] = factory
self._stores.setdefault(stream_id, {})[store_name] = store

def register_windowed_store(
self,
Expand Down
16 changes: 16 additions & 0 deletions quixstreams/state/rocksdb/cache.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
from dataclasses import dataclass, field
from typing import Optional


@dataclass
class TimestampsCache:
    """
    Container pairing a storage location with timestamps cached in memory.

    The location is described by `key` and `cf_name`; the cached values live
    in `timestamps`, which starts out as a fresh empty dict for every
    instance.
    """

    # NOTE(review): presumably the key under which the timestamps are
    # persisted in the column family below - confirm against the callers.
    key: bytes
    # Name of the column family this cache is associated with.
    cf_name: str
    # Mapping of a bytes key to a timestamp, or to `None`.
    # NOTE(review): the dict keys look like message-key prefixes and `None`
    # like a "not loaded yet" marker - confirm against the callers.
    timestamps: dict[bytes, Optional[int]] = field(default_factory=dict)


@dataclass
class CounterCache:
    """
    Container pairing a storage location with a counter cached in memory.

    The location is described by `key` and `cf_name`; `counter` defaults to
    `None` when no value is supplied.
    """

    # NOTE(review): presumably the key under which the counter is persisted
    # in the column family below - confirm against the callers.
    key: bytes
    # Name of the column family this cache is associated with.
    cf_name: str
    # Cached counter value; `None` by default.
    # NOTE(review): `None` looks like a "not loaded yet" marker - confirm.
    counter: Optional[int] = None
61 changes: 59 additions & 2 deletions quixstreams/state/rocksdb/partition.py
Original file line number Diff line number Diff line change
@@ -1,8 +1,8 @@
import logging
import time
from typing import Dict, List, Literal, Optional, Union, cast
from typing import Dict, Iterator, List, Literal, Optional, Union, cast

from rocksdict import AccessType, ColumnFamily, Rdict, WriteBatch
from rocksdict import AccessType, ColumnFamily, Rdict, ReadOptions, WriteBatch

from quixstreams.state.base import PartitionTransactionCache, StorePartition
from quixstreams.state.exceptions import ColumnFamilyDoesNotExist
Expand Down Expand Up @@ -42,6 +42,8 @@ class RocksDBStorePartition(StorePartition):
:param options: RocksDB options. If `None`, the default options will be used.
"""

additional_column_families: tuple[str, ...] = ()

def __init__(
self,
path: str,
Expand All @@ -60,6 +62,8 @@ def __init__(
self._db = self._init_rocksdb()
self._cf_cache: Dict[str, Rdict] = {}
self._cf_handle_cache: Dict[str, ColumnFamily] = {}
for cf_name in self.additional_column_families:
self._ensure_column_family(cf_name)

def recover_from_changelog_message(
self, key: bytes, value: Optional[bytes], cf_name: str, offset: int
Expand Down Expand Up @@ -139,6 +143,53 @@ def get(
# RDict accept Any type as value but we only write bytes so we should only get bytes back.
return cast(Union[bytes, Literal[Marker.UNDEFINED]], result)

def iter_items(
    self,
    lower_bound: bytes,  # inclusive
    upper_bound: bytes,  # exclusive
    backwards: bool = False,
    cf_name: str = "default",
) -> Iterator[tuple[bytes, bytes]]:
    """
    Yield the key-value pairs of a column family whose keys lie in the
    range `[lower_bound, upper_bound)`.

    :param lower_bound: The smallest key to include (inclusive).
    :param upper_bound: The key at which the range ends (exclusive).
    :param backwards: If `True`, start from the upper end of the range and
        iterate in reverse (descending) order.
        Default is `False` (ascending).
    :param cf_name: The name of the column family to read from.
        Default is "default".
    :return: An iterator yielding (key, value) tuples.
    """
    column_family = self.get_column_family(cf_name=cf_name)

    # Put both bounds on the read options so the iterator fetches only the
    # requested key range, which reduces IO.
    read_options = ReadOptions()
    read_options.set_iterate_lower_bound(lower_bound)
    read_options.set_iterate_upper_bound(upper_bound)

    # Rdict types its values as Any, but only bytes are ever written,
    # so only bytes are expected back.
    pairs = cast(
        Iterator[tuple[bytes, bytes]],
        column_family.items(
            # A reverse scan starts from the upper end of the range.
            from_key=upper_bound if backwards else lower_bound,
            read_opt=read_options,
            backwards=backwards,
        ),
    )

    if backwards:
        # NOTE: Rdict does not respect the `read_opt` lower bound when
        # iterating backwards for some reason, so keys below it
        # are filtered out manually here.
        # NOTE(review): if the keys are guaranteed to arrive in descending
        # order, this loop could stop at the first key below the bound
        # instead of scanning on - confirm before changing.
        for key, value in pairs:
            if key >= lower_bound:
                yield key, value
        return

    # NOTE: Forward iteration respects the bounds correctly.
    # `yield from` is still required: it replaces RdictItems with
    # a Python-native generator, otherwise garbage collection
    # will make the result unpredictable.
    yield from pairs

def exists(self, key: bytes, cf_name: str = "default") -> bool:
"""
Check if a key is present in the DB.
Expand Down Expand Up @@ -328,3 +379,9 @@ def _update_changelog_offset(self, batch: WriteBatch, offset: int):
int_to_int64_bytes(offset),
self.get_column_family_handle(METADATA_CF_NAME),
)

def _ensure_column_family(self, cf_name: str) -> None:
    """
    Make sure the column family `cf_name` can be used.

    The column family is looked up first; if the lookup raises
    `ColumnFamilyDoesNotExist`, the column family is created.

    :param cf_name: The name of the column family to look up or create.
    """
    try:
        self.get_column_family(cf_name)
    except ColumnFamilyDoesNotExist:
        # The lookup failed, so create the column family.
        self.create_column_family(cf_name)
4 changes: 3 additions & 1 deletion quixstreams/state/rocksdb/store.py
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,8 @@ class RocksDBStore(Store):
partitions' transactions.
"""

store_partition_class = RocksDBStorePartition

def __init__(
self,
name: str,
Expand Down Expand Up @@ -61,6 +63,6 @@ def create_new_partition(
self._changelog_producer_factory.get_partition_producer(partition)
)

return RocksDBStorePartition(
return self.store_partition_class(
path=path, options=self._options, changelog_producer=changelog_producer
)
Loading