Skip to content

Commit e283a39

Browse files
committed
feat: add sqla broker
1 parent fb3a9b9 commit e283a39

File tree

2 files changed

+10
-10
lines changed

2 files changed

+10
-10
lines changed

faststream/sqla/message.py

Lines changed: 9 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -46,21 +46,21 @@ def __init__(
4646

4747
super().__init__(raw_message=self, body=payload)
4848

49-
def mark_completed(self) -> None:
49+
def _mark_completed(self) -> None:
5050
self.state = SqlaMessageState.COMPLETED
5151
self.next_attempt_at = None
5252
self.to_archive = True
5353

54-
def mark_retryable(self, *, next_attempt_at: datetime) -> None:
54+
def _mark_retryable(self, *, next_attempt_at: datetime) -> None:
5555
self.state = SqlaMessageState.RETRYABLE
5656
self.next_attempt_at = next_attempt_at
5757

58-
def mark_failed(self) -> None:
58+
def _mark_failed(self) -> None:
5959
self.state = SqlaMessageState.FAILED
6060
self.next_attempt_at = None
6161
self.to_archive = True
6262

63-
def mark_pending(self) -> None:
63+
def _mark_pending(self) -> None:
6464
self.state = SqlaMessageState.PENDING
6565
self.acquired_at = None
6666

@@ -72,15 +72,15 @@ def allow_attempt(self) -> bool:
7272
first_attempt_at=self.first_attempt_at,
7373
attempts_count=self.attempts_count,
7474
):
75-
self.mark_failed()
75+
self._mark_failed()
7676
return False
7777
return True
7878

7979
async def ack(self) -> None:
8080
if self.decision_recorded:
8181
return
8282

83-
self.mark_completed()
83+
self._mark_completed()
8484
self.decision_recorded = True
8585

8686
async def nack(self) -> None:
@@ -94,14 +94,14 @@ async def nack(self) -> None:
9494
attempts_count=self.attempts_count,
9595
)
9696
):
97-
self.mark_failed()
97+
self._mark_failed()
9898
else:
99-
self.mark_retryable(next_attempt_at=next_attempt_at)
99+
self._mark_retryable(next_attempt_at=next_attempt_at)
100100
self.decision_recorded = True
101101

102102
async def reject(self) -> None:
103103
if self.decision_recorded:
104104
return
105105

106-
self.mark_failed()
106+
self._mark_failed()
107107
self.decision_recorded = True

faststream/sqla/subscriber/usecase.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -67,7 +67,7 @@ async def stop(self) -> None:
6767
drained = self._drain_acquired()
6868
if drained:
6969
for message in drained:
70-
message.mark_pending()
70+
message._mark_pending()
7171
self._buffer_results(drained)
7272
self.add_task(self._flush_results)
7373
with suppress(asyncio.TimeoutError):

0 commit comments

Comments
 (0)