File tree 3 files changed +27
-1
lines changed
3 files changed +27
-1
lines changed Original file line number Diff line number Diff line change @@ -394,6 +394,9 @@ class ConsumerConfig(Base):
394
394
deliver_subject : Optional [str ] = None
395
395
deliver_group : Optional [str ] = None
396
396
397
+ # Ephemeral inactivity threshold
398
+ inactive_threshold : Optional [float ] = None # in seconds
399
+
397
400
# Generally inherited by parent stream and other markers, now can
398
401
# be configured directly.
399
402
num_replicas : Optional [int ] = None
@@ -405,12 +408,16 @@ class ConsumerConfig(Base):
405
408
def from_response (cls , resp : Dict [str , Any ]):
406
409
cls ._convert_nanoseconds (resp , 'ack_wait' )
407
410
cls ._convert_nanoseconds (resp , 'idle_heartbeat' )
411
+ cls ._convert_nanoseconds (resp , 'inactive_threshold' )
408
412
return super ().from_response (resp )
409
413
410
414
def as_dict (self ) -> Dict [str , object ]:
411
415
result = super ().as_dict ()
412
416
result ['ack_wait' ] = self ._to_nanoseconds (self .ack_wait )
413
417
result ['idle_heartbeat' ] = self ._to_nanoseconds (self .idle_heartbeat )
418
+ result ['inactive_threshold' ] = self ._to_nanoseconds (
419
+ self .inactive_threshold
420
+ )
414
421
return result
415
422
416
423
Original file line number Diff line number Diff line change @@ -1840,7 +1840,6 @@ async def reconnected_cb():
1840
1840
reconnected .set_result (True )
1841
1841
1842
1842
async def err_cb (e ):
1843
- print ("ERROR: " , e )
1844
1843
nonlocal errors
1845
1844
errors .append (e )
1846
1845
Original file line number Diff line number Diff line change @@ -1091,6 +1091,26 @@ async def test_consumer_with_name(self):
1091
1091
assert err .value .err_code == 10017
1092
1092
assert err .value .description == 'consumer name in subject does not match durable name in request'
1093
1093
1094
+ # Create ephemeral pull consumer with a name and inactive threshold.
1095
+ stream_name = "ctests"
1096
+ consumer_name = "inactive"
1097
+ cinfo = await jsm .add_consumer (
1098
+ stream_name ,
1099
+ name = consumer_name ,
1100
+ ack_policy = "explicit" ,
1101
+ inactive_threshold = 2 , # seconds
1102
+ )
1103
+ assert cinfo .config .name == consumer_name
1104
+
1105
+ sub = await js .pull_subscribe_bind (consumer_name , stream_name )
1106
+ msgs = await sub .fetch (1 )
1107
+ assert msgs [0 ].data == b'hello world!'
1108
+ ok = await msgs [0 ].ack_sync ()
1109
+ assert ok
1110
+
1111
+ cinfo = await sub .consumer_info ()
1112
+ assert cinfo .config .inactive_threshold == 2.0
1113
+
1094
1114
await nc .close ()
1095
1115
1096
1116
You can’t perform that action at this time.
0 commit comments