Skip to content

Commit 60e2482

Browse files
committed
[JOIN] PoC
1 parent adbb4aa commit 60e2482

File tree

1 file changed

+22
-0
lines changed

1 file changed

+22
-0
lines changed

quixstreams/dataframe/dataframe.py

+22
Original file line number | Diff line number | Diff line change
@@ -49,6 +49,7 @@
4949
from quixstreams.sinks import BaseSink
5050
from quixstreams.state.base import State
5151
from quixstreams.state.base.transaction import PartitionTransaction
52+
from quixstreams.state.rocksdb.timestamped import TimestampedStore
5253
from quixstreams.utils.printing import (
5354
DEFAULT_COLUMN_NAME,
5455
DEFAULT_LIVE,
@@ -1604,6 +1605,27 @@ def concat(self, other: "StreamingDataFrame") -> "StreamingDataFrame":
16041605
*self.topics, *other.topics, stream=merged_stream
16051606
)
16061607

1608+
def join(self, right: "StreamingDataFrame") -> "StreamingDataFrame":
    """
    Join this dataframe (the left side) with ``right`` by key and timestamp.

    Proof of concept. A ``TimestampedStore`` is registered under
    ``right.stream_id``. Each message flowing through ``right`` is written
    to that store with its timestamp and with its key as the prefix. Each
    message flowing through this dataframe queries the same store with its
    own timestamp and key, and is merged with whatever the store returns;
    on overlapping fields the right-side value wins. When the store returns
    nothing (or any falsy value), a shallow copy of the left value is emitted.

    Right-side messages only feed the store: they are filtered out and are
    never emitted by the resulting dataframe.

    Values on both sides must be mappings, since they are merged with
    ``**`` unpacking.

    NOTE(review): the exact lookup semantics of ``get(timestamp=..., prefix=...)``
    are defined by ``TimestampedStore`` and are not visible here - confirm
    against the store implementation.

    :param right: the dataframe whose messages are stored and looked up.
    :return: a new dataframe emitting the merged left-side messages.
    """
    # TODO: ensure copartitioning of left and right?
    right.processing_context.state_manager.register_store(
        stream_id=right.stream_id,
        store_type=TimestampedStore,
        changelog_config=self._topic_manager.derive_topic_config(right.topics),
    )

    # NOTE: the callbacks below close over ``right``. Closures bind late, so
    # ``right`` must NOT be rebound further down; otherwise the callbacks
    # would resolve the transaction from the derived dataframe instead of the
    # one the store was registered for.

    def left_func(value, key, timestamp, headers):
        # Look up the right-side value for this key/timestamp and merge it
        # on top of the left value.
        right_tx = _get_transaction(right)
        right_value = right_tx.get(timestamp=timestamp, prefix=key)
        return {**value, **(right_value or {})}

    def right_func(value, key, timestamp, headers):
        # Persist the right-side value so left-side messages can find it.
        right_tx = _get_transaction(right)
        right_tx.set(timestamp=timestamp, value=value, prefix=key)

    left = self.apply(left_func, metadata=True)
    # The right branch exists only for its side effect on the store; the
    # always-false filter drops every right-side message from the output.
    right_store_only = right.update(right_func, metadata=True).filter(
        lambda value: False
    )
    return left.concat(right_store_only)
1628+
16071629
def ensure_topics_copartitioned(self):
16081630
partitions_counts = set(t.broker_config.num_partitions for t in self._topics)
16091631
if len(partitions_counts) > 1:

0 commit comments

Comments
 (0)