Skip to content

Commit b383f0b

Browse files
committed
wip
1 parent 55f0c18 commit b383f0b

File tree

16 files changed

+784
-275
lines changed

16 files changed

+784
-275
lines changed

nats-jetstream/examples/pull_consume_callback.py

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -43,7 +43,9 @@ async def main():
4343
s = await js.create_stream(name="TEST_STREAM", subjects=["FOO.*"])
4444

4545
cons = await s.upsert_consumer(
46-
name="TestConsumerConsume", durable_name="TestConsumerConsume", ack_policy="explicit"
46+
name="TestConsumerConsume",
47+
durable_name="TestConsumerConsume",
48+
ack_policy="explicit"
4749
)
4850

4951
publish_task = asyncio.create_task(endless_publish(nc, js))

nats-jetstream/examples/pull_consume_iterator.py

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -30,7 +30,9 @@ async def main():
3030
s = await js.create_stream(name="TEST_STREAM", subjects=["FOO.*"])
3131

3232
cons = await s.upsert_consumer(
33-
name="TestConsumerIterator", durable_name="TestConsumerIterator", ack_policy="explicit"
33+
name="TestConsumerIterator",
34+
durable_name="TestConsumerIterator",
35+
ack_policy="explicit"
3436
)
3537

3638
publish_task = asyncio.create_task(endless_publish(nc, js))

nats-jetstream/examples/pull_fetch.py

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -29,7 +29,9 @@ async def main():
2929
s = await js.create_stream(name="TEST_STREAM", subjects=["FOO.*"])
3030

3131
cons = await s.upsert_consumer(
32-
name="TestConsumerListener", durable_name="TestConsumerListener", ack_policy="explicit"
32+
name="TestConsumerListener",
33+
durable_name="TestConsumerListener",
34+
ack_policy="explicit"
3335
)
3436

3537
publish_task = asyncio.create_task(endless_publish(nc, js))

nats-jetstream/src/nats/jetstream/__init__.py

Lines changed: 62 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -107,7 +107,10 @@ def from_response(cls, data: api.AccountInfo) -> AccountInfo:
107107
limits=AccountLimits.from_response(data["limits"]),
108108
api=APIStats.from_response(data["api"]),
109109
domain=data.get("domain"),
110-
tiers={k: Tier.from_response(v) for k, v in data["tiers"].items()} if "tiers" in data else None,
110+
tiers={
111+
k: Tier.from_response(v)
112+
for k, v in data["tiers"].items()
113+
} if "tiers" in data else None,
111114
)
112115

113116

@@ -133,7 +136,12 @@ def from_response(cls, data: api.PublishAck) -> PublishAck:
133136
class JetStream:
134137
"""JetStream context."""
135138

136-
def __init__(self, client: NatsClient, prefix: str = "$JS.API", domain: str | None = None) -> None:
139+
def __init__(
140+
self,
141+
client: NatsClient,
142+
prefix: str = "$JS.API",
143+
domain: str | None = None
144+
) -> None:
137145
"""Initialize JetStream client.
138146
139147
Args:
@@ -185,7 +193,8 @@ async def publish(
185193

186194
return publish_ack
187195

188-
async def stream_names(self, subject: str | None = None) -> AsyncIterator[str]:
196+
async def stream_names(self,
197+
subject: str | None = None) -> AsyncIterator[str]:
189198
"""Get an async iterator over all stream names.
190199
191200
Args:
@@ -197,7 +206,9 @@ async def stream_names(self, subject: str | None = None) -> AsyncIterator[str]:
197206
offset = 0
198207
total = None
199208
while True:
200-
response = await self._api.stream_names(offset=offset, subject=subject)
209+
response = await self._api.stream_names(
210+
offset=offset, subject=subject
211+
)
201212
streams = response.get("streams", [])
202213
if streams is None:
203214
streams = []
@@ -216,7 +227,9 @@ async def stream_names(self, subject: str | None = None) -> AsyncIterator[str]:
216227
# Increment offset by the number of streams we received
217228
offset += len(streams)
218229

219-
async def list_streams(self, subject: str | None = None) -> AsyncIterator[StreamInfo]:
230+
async def list_streams(self,
231+
subject: str | None = None
232+
) -> AsyncIterator[StreamInfo]:
220233
"""Get an async iterator over all streams.
221234
222235
Args:
@@ -228,7 +241,9 @@ async def list_streams(self, subject: str | None = None) -> AsyncIterator[Stream
228241
offset = 0
229242
total = None
230243
while True:
231-
response = await self._api.stream_list(offset=offset, subject=subject)
244+
response = await self._api.stream_list(
245+
offset=offset, subject=subject
246+
)
232247
streams = response.get("streams", [])
233248
if streams is None:
234249
streams = []
@@ -285,7 +300,13 @@ async def get_stream(self, name: str) -> Stream:
285300
info = await self.get_stream_info(name)
286301
return Stream(self, name, info)
287302

288-
async def create_consumer(self, stream_name: str, name: str, durable_name: str | None = None, **config) -> Consumer:
303+
async def create_consumer(
304+
self,
305+
stream_name: str,
306+
name: str,
307+
durable_name: str | None = None,
308+
**config
309+
) -> Consumer:
289310
"""Create a consumer for a stream.
290311
291312
Args:
@@ -309,7 +330,9 @@ async def create_consumer(self, stream_name: str, name: str, durable_name: str |
309330
# Create the consumer via the stream
310331
return await stream.create_consumer(name, **consumer_config)
311332

312-
async def get_consumer(self, stream_name: str, consumer_name: str) -> Consumer:
333+
async def get_consumer(
334+
self, stream_name: str, consumer_name: str
335+
) -> Consumer:
313336
"""Get a consumer by name.
314337
315338
Args:
@@ -322,7 +345,9 @@ async def get_consumer(self, stream_name: str, consumer_name: str) -> Consumer:
322345
stream = await self.get_stream(stream_name)
323346
return await stream.get_consumer(consumer_name)
324347

325-
async def delete_consumer(self, stream_name: str, consumer_name: str) -> bool:
348+
async def delete_consumer(
349+
self, stream_name: str, consumer_name: str
350+
) -> bool:
326351
"""Delete a consumer.
327352
328353
Args:
@@ -335,7 +360,9 @@ async def delete_consumer(self, stream_name: str, consumer_name: str) -> bool:
335360
stream = await self.get_stream(stream_name)
336361
return await stream.delete_consumer(consumer_name)
337362

338-
async def update_consumer(self, stream_name: str, consumer_name: str, **config) -> Consumer:
363+
async def update_consumer(
364+
self, stream_name: str, consumer_name: str, **config
365+
) -> Consumer:
339366
"""Update a consumer.
340367
341368
Args:
@@ -362,7 +389,9 @@ async def consumer_names(self, stream_name: str) -> AsyncIterator[str]:
362389
total = None
363390

364391
while True:
365-
response = await self._api.consumer_list(stream_name, offset=offset)
392+
response = await self._api.consumer_list(
393+
stream_name, offset=offset
394+
)
366395
consumers = response.get("consumers", [])
367396

368397
if consumers is None:
@@ -382,7 +411,8 @@ async def consumer_names(self, stream_name: str) -> AsyncIterator[str]:
382411
# Increment offset
383412
offset += len(consumers)
384413

385-
async def consumers_info(self, stream_name: str) -> AsyncIterator[ConsumerInfo]:
414+
async def consumers_info(self,
415+
stream_name: str) -> AsyncIterator[ConsumerInfo]:
386416
"""Get an async iterator over all consumer info objects for a stream.
387417
388418
Args:
@@ -395,7 +425,9 @@ async def consumers_info(self, stream_name: str) -> AsyncIterator[ConsumerInfo]:
395425
total = None
396426

397427
while True:
398-
response = await self._api.consumer_names(stream_name, offset=offset)
428+
response = await self._api.consumer_names(
429+
stream_name, offset=offset
430+
)
399431
consumers = response.get("consumers", [])
400432

401433
if consumers is None:
@@ -416,7 +448,9 @@ async def consumers_info(self, stream_name: str) -> AsyncIterator[ConsumerInfo]:
416448
# Increment offset
417449
offset += len(consumers)
418450

419-
async def get_consumer_info(self, stream_name: str, consumer_name: str) -> ConsumerInfo:
451+
async def get_consumer_info(
452+
self, stream_name: str, consumer_name: str
453+
) -> ConsumerInfo:
420454
"""Get consumer info.
421455
422456
Args:
@@ -467,11 +501,15 @@ async def get_message(self, stream: str, sequence: int) -> StreamMessage:
467501
subject=message["subject"],
468502
sequence=message["seq"],
469503
data=data or b"",
470-
time=datetime.fromisoformat(message["time"].replace("Z", "+00:00")),
504+
time=datetime.fromisoformat(
505+
message["time"].replace("Z", "+00:00")
506+
),
471507
headers=headers,
472508
)
473509

474-
async def get_last_message_for_subject(self, stream: str, subject: str) -> StreamMessage:
510+
async def get_last_message_for_subject(
511+
self, stream: str, subject: str
512+
) -> StreamMessage:
475513
"""Get the last message for a subject directly from a stream.
476514
477515
This is a direct message get that requires the stream to have allow_direct=true.
@@ -504,12 +542,18 @@ async def get_last_message_for_subject(self, stream: str, subject: str) -> Strea
504542
subject=message["subject"],
505543
sequence=message["seq"],
506544
data=data or b"",
507-
time=datetime.fromisoformat(message["time"].replace("Z", "+00:00")),
545+
time=datetime.fromisoformat(
546+
message["time"].replace("Z", "+00:00")
547+
),
508548
headers=headers,
509549
)
510550

511551

512-
def new(client: NatsClient, prefix: str = "$JS.API", domain: str | None = None) -> JetStream:
552+
def new(
553+
client: NatsClient,
554+
prefix: str = "$JS.API",
555+
domain: str | None = None
556+
) -> JetStream:
513557
"""Create a new JetStream instance.
514558
515559
Args:

0 commit comments

Comments
 (0)