Skip to content

Commit 8dbfab1

Browse files
committed
pubsub: Implement optional durable event delivery
Major feature: removing the "fire-and-forget" event delivery mode with Redis to allow durable event delivery over MongoDB.
- Unified storage: events are stored once in eventhistory (previously duplicated).
- Auto-migration: detects the old 24h TTL format, drops and recreates.
- Backwards compatible: without subscriber_id = fire-and-forget (old behavior).
- Durable mode: with subscriber_id = catch-up on missed events.
- Implicit ACK: polling for the next event acknowledges the previous one.
Signed-off-by: Denys Fedoryshchenko <[email protected]>
1 parent 0f6f84e commit 8dbfab1

File tree

5 files changed

+1150
-14
lines changed

5 files changed

+1150
-14
lines changed

api/config.py

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,10 @@ class PubSubSettings(BaseSettings):
2525
redis_host: str = "redis"
2626
redis_db_number: int = 1
2727
keep_alive_period: int = 45
28+
# MongoDB durable pub/sub settings
29+
event_ttl_days: int = 7 # Auto-delete events after N days
30+
max_catchup_events: int = 1000 # Max events to deliver on reconnect
31+
subscriber_state_ttl_days: int = 30 # Cleanup unused subscriber states
2832

2933

3034
# pylint: disable=too-few-public-methods

api/main.py

Lines changed: 27 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -51,7 +51,7 @@
5151
)
5252
from .auth import Authentication
5353
from .db import Database
54-
from .pubsub import PubSub
54+
from .pubsub_mongo import PubSub
5555
from .user_manager import get_user_manager, create_user_manager
5656
from .models import (
5757
PageModel,
@@ -418,11 +418,8 @@ async def update_password(request: Request,
418418
)
419419

420420

421-
def _get_eventhistory(evdict):
422-
"""Get EventHistory object from dictionary"""
423-
evhist = EventHistory()
424-
evhist.data = evdict
425-
return evhist
421+
# EventHistory is now stored by pubsub.publish_cloudevent()
422+
# No need for separate _get_eventhistory function
426423

427424

428425
# TBD: Restrict response by Pydantic model
@@ -681,9 +678,8 @@ async def post_node(node: Node,
681678
attributes = {}
682679
if data.get('owner', None):
683680
attributes['owner'] = data['owner']
681+
# publish_cloudevent now stores to eventhistory collection
684682
await pubsub.publish_cloudevent('node', data, attributes)
685-
evhist = _get_eventhistory(data)
686-
await db.create(evhist)
687683
return obj
688684

689685

@@ -751,9 +747,8 @@ async def put_node(node_id: str, node: Node,
751747
if data.get('owner', None):
752748
attributes['owner'] = data['owner']
753749
if not noevent:
750+
# publish_cloudevent now stores to eventhistory collection
754751
await pubsub.publish_cloudevent('node', data, attributes)
755-
evhist = _get_eventhistory(data)
756-
await db.create(evhist)
757752
return obj
758753

759754

@@ -842,9 +837,8 @@ async def put_nodes(
842837
attributes = {}
843838
if data.get('owner', None):
844839
attributes['owner'] = data['owner']
840+
# publish_cloudevent now stores to eventhistory collection
845841
await pubsub.publish_cloudevent('node', data, attributes)
846-
evhist = _get_eventhistory(data)
847-
await db.create(evhist)
848842
return obj_list
849843

850844

@@ -894,12 +888,31 @@ async def delete_kv(namespace: str, key: str,
894888

895889
@app.post('/subscribe/{channel}', response_model=Subscription)
896890
async def subscribe(channel: str, user: User = Depends(get_current_user),
897-
promisc: Optional[bool] = Query(None)):
898-
"""Subscribe handler for Pub/Sub channel"""
891+
promisc: Optional[bool] = Query(None),
892+
subscriber_id: Optional[str] = Query(
893+
None,
894+
description="Unique subscriber ID for durable delivery. "
895+
"If provided, missed events will be delivered "
896+
"on reconnection. Without this, events are "
897+
"fire-and-forget."
898+
)):
899+
"""Subscribe handler for Pub/Sub channel
900+
901+
Args:
902+
channel: Channel name to subscribe to
903+
promisc: If true, receive all messages regardless of owner
904+
subscriber_id: Optional unique ID for durable event delivery.
905+
When provided, the subscriber's position is tracked and
906+
missed events are delivered on reconnection. Use a stable
907+
identifier like "scheduler-prod-1" or "dashboard-main".
908+
Without subscriber_id, standard fire-and-forget pub/sub.
909+
"""
899910
metrics.add('http_requests_total', 1)
900911
options = {}
901912
if promisc:
902913
options['promiscuous'] = promisc
914+
if subscriber_id:
915+
options['subscriber_id'] = subscriber_id
903916
return await pubsub.subscribe(channel, user.username, options)
904917

905918

api/models.py

Lines changed: 37 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -61,6 +61,43 @@ class SubscriptionStats(Subscription):
6161
)
6262

6363

64+
# MongoDB-based durable Pub/Sub models
65+
# Note: Event storage uses EventHistory model from kernelci-core
66+
# (stored in 'eventhistory' collection with sequence_id, channel, owner fields)
67+
68+
class SubscriberState(BaseModel):
69+
"""Tracks subscriber position for durable event delivery
70+
71+
Only created when subscriber_id is provided during subscription.
72+
Enables catch-up on missed events after reconnection.
73+
"""
74+
subscriber_id: str = Field(
75+
description='Unique subscriber identifier (client-provided)'
76+
)
77+
channel: str = Field(
78+
description='Subscribed channel name'
79+
)
80+
user: str = Field(
81+
description='Username of subscriber (for ownership validation)'
82+
)
83+
promiscuous: bool = Field(
84+
default=False,
85+
description='If true, receive all messages regardless of owner'
86+
)
87+
last_event_id: int = Field(
88+
default=0,
89+
description='Last acknowledged event ID (implicit ACK on next poll)'
90+
)
91+
created_at: datetime = Field(
92+
default_factory=datetime.utcnow,
93+
description='Subscription creation timestamp'
94+
)
95+
last_poll: Optional[datetime] = Field(
96+
default=None,
97+
description='Last poll timestamp (used for stale cleanup)'
98+
)
99+
100+
64101
# User model definitions
65102

66103
class UserGroup(DatabaseModel):

0 commit comments

Comments
 (0)