From c8afd5f8955ba64c8e7f39d810c223968f827f3f Mon Sep 17 00:00:00 2001 From: Meshan Khosla Date: Thu, 11 Jan 2024 22:20:26 -0800 Subject: [PATCH 1/4] String cursor --- upstash_qstash/asyncio/client.py | 16 +++++++++------- upstash_qstash/asyncio/dlq.py | 14 +++++++++++++- upstash_qstash/asyncio/events.py | 6 +++--- upstash_qstash/client.py | 17 ++++++++++------- upstash_qstash/dlq.py | 14 +++++++++++++- upstash_qstash/events.py | 8 ++++---- 6 files changed, 52 insertions(+), 23 deletions(-) diff --git a/upstash_qstash/asyncio/client.py b/upstash_qstash/asyncio/client.py index 05b927b..98b86c5 100644 --- a/upstash_qstash/asyncio/client.py +++ b/upstash_qstash/asyncio/client.py @@ -89,19 +89,21 @@ async def events(self, req: Optional[EventsRequest] = None) -> GetEventsResponse The logs endpoint is paginated and returns only 100 logs at a time. If you want to receive more logs, you can use the cursor to paginate. - The cursor is a unix timestamp with millisecond precision + The cursor is a stringified unix timestamp with millisecond precision :param req: An instance of EventsRequest containing the cursor :return: The events response object. Example: -------- - Initialize the cursor to the current timestamp in milliseconds: - >>> cursor = int(time.time() * 1000) - >>> logs = [] - >>> while cursor > 0: + >>> all_events = [] + >>> cursor = None + >>> while True: >>> res = await client.events({"cursor": cursor}) - >>> logs.extend(res['events']) - >>> cursor = res.get('cursor', 0) + >>> print(len(res["events"])) + >>> all_events.extend(res["events"]) + >>> cursor = res.get("cursor") + >>> if cursor is None: + >>> break """ return await Events.get(self.http, req) diff --git a/upstash_qstash/asyncio/dlq.py b/upstash_qstash/asyncio/dlq.py index 13e682c..f0cd635 100644 --- a/upstash_qstash/asyncio/dlq.py +++ b/upstash_qstash/asyncio/dlq.py @@ -12,7 +12,19 @@ async def list_messages( self, opts: Optional[ListMessagesOpts] = None ) -> ListMessageResponse: """ - Asynchronously list messages in the dlq + Asynchronously list messages in the dlq. + + Example: + -------- + >>> dlq = client.dlq() + >>> all_events = [] + >>> cursor = None + >>> while True: + >>> res = await dlq.list_messages({"cursor": cursor}) + >>> all_events.extend(res["events"]) + >>> cursor = res.get("cursor") + >>> if cursor is None: + >>> break """ req: UpstashRequest = { "path": ["v2", "dlq"], diff --git a/upstash_qstash/asyncio/events.py b/upstash_qstash/asyncio/events.py index f75957f..9c67bac 100644 --- a/upstash_qstash/asyncio/events.py +++ b/upstash_qstash/asyncio/events.py @@ -1,4 +1,4 @@ -from typing import Dict, Optional +from typing import Optional from upstash_qstash.upstash_http import HttpClient from upstash_qstash.events import EventsRequest, GetEventsResponse @@ -12,8 +12,8 @@ async def get( Asynchronously retrieve logs. """ query = {} - if req is not None and req.get("cursor") is not None and req["cursor"] > 0: - query["cursor"] = str(req["cursor"]) + if req is not None and req.get("cursor"): + query["cursor"] = req["cursor"] return await http.request_async( { diff --git a/upstash_qstash/client.py b/upstash_qstash/client.py index 0416299..0369f5f 100644 --- a/upstash_qstash/client.py +++ b/upstash_qstash/client.py @@ -73,7 +73,7 @@ def dlq(self): """ Access the dlq API. - Read or remove messages from the DLQ. + Read or remove messages from the DLQ.s """ return DLQ(self.http) @@ -91,7 +91,7 @@ def events(self, req: Optional[EventsRequest] = None) -> GetEventsResponse: The logs endpoint is paginated and returns only 100 logs at a time. If you want to receive more logs, you can use the cursor to paginate. - The cursor is a unix timestamp with millisecond precision + The cursor is a stringified unix timestamp with millisecond precision :param req: An instance of EventsRequest containing the cursor :return: The events response object. @@ -99,11 +99,14 @@ def events(self, req: Optional[EventsRequest] = None) -> GetEventsResponse: Example: -------- Initialize the cursor to the current timestamp in milliseconds: - >>> cursor = int(time.time() * 1000) - >>> logs = [] - >>> while cursor > 0: + >>> all_events = [] + >>> cursor = None + >>> while True: >>> res = client.events({"cursor": cursor}) - >>> logs.extend(res['events']) - >>> cursor = res.get('cursor', 0) + >>> print(len(res["events"])) + >>> all_events.extend(res["events"]) + >>> cursor = res.get("cursor") + >>> if cursor is None: + >>> break """ return Events.get(self.http, req) diff --git a/upstash_qstash/dlq.py b/upstash_qstash/dlq.py index c8ce002..d1b6700 100644 --- a/upstash_qstash/dlq.py +++ b/upstash_qstash/dlq.py @@ -44,7 +44,19 @@ def list_messages( self, opts: Optional[ListMessagesOpts] = None ) -> ListMessageResponse: """ - List messages in the dlq + List messages in the dlq. + + Example: + -------- + >>> dlq = client.dlq() + >>> all_events = [] + >>> cursor = None + >>> while True: + >>> res = dlq.list_messages({"cursor": cursor}) + >>> all_events.extend(res["events"]) + >>> cursor = res.get("cursor") + >>> if cursor is None: + >>> break """ req: UpstashRequest = { "path": ["v2", "dlq"], diff --git a/upstash_qstash/events.py b/upstash_qstash/events.py index 4ec5aff..1b88ef1 100644 --- a/upstash_qstash/events.py +++ b/upstash_qstash/events.py @@ -1,4 +1,4 @@ -from typing import TypedDict, Optional, List, Dict +from typing import TypedDict, Optional, List from enum import Enum from upstash_qstash.upstash_http import HttpClient @@ -29,7 +29,7 @@ class State(Enum): EventsRequest = TypedDict( "EventsRequest", { - "cursor": int, + "cursor": str, }, ) @@ -49,8 +49,8 @@ def get(http: HttpClient, req: Optional[EventsRequest] = None) -> GetEventsRespo Retrieve logs. """ query = {} - if req is not None and req.get("cursor") is not None and req["cursor"] > 0: - query["cursor"] = str(req["cursor"]) + if req is not None and req.get("cursor") is not None: + query["cursor"] = req["cursor"] return http.request( { From 0fd0e623e775d3dd9c1bf394e7b3a2e12ba4cac8 Mon Sep 17 00:00:00 2001 From: Meshan Khosla Date: Thu, 11 Jan 2024 22:22:20 -0800 Subject: [PATCH 2/4] Formatting --- upstash_qstash/asyncio/dlq.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/upstash_qstash/asyncio/dlq.py b/upstash_qstash/asyncio/dlq.py index f0cd635..4c606fd 100644 --- a/upstash_qstash/asyncio/dlq.py +++ b/upstash_qstash/asyncio/dlq.py @@ -16,7 +16,7 @@ async def list_messages( Example: -------- - >>> dlq = client.dlq() + >>> dlq = client.dlq() >>> all_events = [] >>> cursor = None >>> while True: From 3f0c39518e5de06562bf1841f26d1b7cfc2f182f Mon Sep 17 00:00:00 2001 From: Meshan Khosla Date: Thu, 11 Jan 2024 22:23:28 -0800 Subject: [PATCH 3/4] Fix typo --- upstash_qstash/client.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/upstash_qstash/client.py b/upstash_qstash/client.py index 0369f5f..4208d61 100644 --- a/upstash_qstash/client.py +++ b/upstash_qstash/client.py @@ -73,7 +73,7 @@ def dlq(self): """ Access the dlq API. - Read or remove messages from the DLQ.s + Read or remove messages from the DLQ. """ return DLQ(self.http) From f40066e2b5fdb57d194c4d05c563b3ced039af37 Mon Sep 17 00:00:00 2001 From: Meshan Khosla Date: Thu, 11 Jan 2024 22:28:20 -0800 Subject: [PATCH 4/4] events->messages --- upstash_qstash/asyncio/dlq.py | 4 ++-- upstash_qstash/dlq.py | 6 +++--- 2 files changed, 5 insertions(+), 5 deletions(-) diff --git a/upstash_qstash/asyncio/dlq.py b/upstash_qstash/asyncio/dlq.py index 4c606fd..6591a9e 100644 --- a/upstash_qstash/asyncio/dlq.py +++ b/upstash_qstash/asyncio/dlq.py @@ -17,11 +17,11 @@ async def list_messages( Example: -------- >>> dlq = client.dlq() - >>> all_events = [] + >>> all_messages = [] >>> cursor = None >>> while True: >>> res = await dlq.list_messages({"cursor": cursor}) - >>> all_events.extend(res["events"]) + >>> all_messages.extend(res["messages"]) >>> cursor = res.get("cursor") >>> if cursor is None: >>> break diff --git a/upstash_qstash/dlq.py b/upstash_qstash/dlq.py index d1b6700..b7c40f5 100644 --- a/upstash_qstash/dlq.py +++ b/upstash_qstash/dlq.py @@ -46,14 +46,14 @@ def list_messages( """ List messages in the dlq. - Example: + Example: -------- >>> dlq = client.dlq() - >>> all_events = [] + >>> all_messages = [] >>> cursor = None >>> while True: >>> res = dlq.list_messages({"cursor": cursor}) - >>> all_events.extend(res["events"]) + >>> all_messages.extend(res["messages"]) >>> cursor = res.get("cursor") >>> if cursor is None: >>> break