@@ -24,7 +24,6 @@ const {
 const { Buffer } = require('buffer');
 const MessageCache = require('./_consumer_cache');
 const { hrtime } = require('process');
-const { LinkedList } = require('./_linked-list');
 
 const ConsumerState = Object.freeze({
   INIT: 0,
@@ -203,11 +202,10 @@ class Consumer {
    * It's set to null when no fetch is in progress.
    */
   #fetchInProgress;
-
   /**
-   * List of DeferredPromises waiting on consumer queue to be non-empty.
+   * Are we waiting for the queue to be non-empty?
    */
-  #queueWaiters = new LinkedList();
+  #nonEmpty = null;
 
   /**
    * Whether any rebalance callback is in progress.
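
Note: `DeferredPromise` itself is not defined in this diff; the surrounding code only relies on it being awaitable, externally resolvable, and able to report whether it has settled. A minimal sketch of that assumed shape (not the library's real implementation), followed by the await/resolve usage the new `#nonEmpty` field mirrors:

/* Sketch only: the real DeferredPromise lives elsewhere in the library. */
class DeferredPromise {
  #promise;
  #resolveFn;
  resolved = false;

  constructor() {
    this.#promise = new Promise((resolve) => { this.#resolveFn = resolve; });
  }

  resolve(value) {
    this.resolved = true;
    this.#resolveFn(value);
  }

  /* Thenable, so callers can simply `await deferred;`. */
  then(onFulfilled, onRejected) {
    return this.#promise.then(onFulfilled, onRejected);
  }
}

/* Usage mirroring the diff: one side awaits, another side wakes it up. */
async function demo() {
  const nonEmpty = new DeferredPromise();
  setTimeout(() => nonEmpty.resolve(), 10);
  await nonEmpty;
  console.log(nonEmpty.resolved); // true
}
demo();
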
@@ -363,7 +361,6 @@ class Consumer {
    */
   async #rebalanceCallback(err, assignment) {
     const isLost = this.#internalClient.assignmentLost();
-    this.#rebalanceCbInProgress = new DeferredPromise();
     let assignmentFnCalled = false;
     this.#logger.info(
       `Received rebalance event with message: '${err.message}' and ${assignment.length} partition(s), isLost: ${isLost}`,
@@ -468,7 +465,7 @@ class Consumer {
      */
     const workersToSpawn = Math.max(1, Math.min(this.#concurrency, this.#partitionCount));
     if (workersToSpawn !== this.#workers.length) {
-      this.#workerTerminationScheduled.resolve();
+      this.#resolveWorkerTerminationScheduled();
       /* We don't need to await the workers here. We are OK if the termination and respawning
        * occurs later, since even if we have a few more or few less workers for a while, it's
        * not a big deal. */
@@ -639,11 +636,14 @@ class Consumer {
     /* Certain properties that the user has set are overridden. We use trampolines to accommodate the user's callbacks.
      * TODO: add trampoline method for offset commit callback. */
     rdKafkaConfig['offset_commit_cb'] = true;
-    rdKafkaConfig['rebalance_cb'] = (err, assignment) => this.#rebalanceCallback(err, assignment).catch(e =>
+    rdKafkaConfig['rebalance_cb'] = (err, assignment) => {
+      this.#rebalanceCbInProgress = new DeferredPromise();
+      this.#rebalanceCallback(err, assignment).catch(e =>
       {
         if (this.#logger)
           this.#logger.error(`Error from rebalance callback: ${e.stack}`);
       });
+    };
 
     /* We handle offset storage within the promisified API by ourselves. Thus we don't allow the user to change this
      * setting and set it to false. */
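
Note: the in-progress marker is now created in the `rebalance_cb` trampoline itself, before `#rebalanceCallback` is invoked, so the marker exists for the entire lifetime of the callback rather than being set partway into its body. A hedged illustration of the consumer-side check that depends on this marker (the names `rebalanceTrampoline`, `afterConsume`, and `handler` are stand-ins, not the PR's code; `DeferredPromise` is the sketch above):

/* Illustrative only; the real callback and the code that resolves the
 * promise live elsewhere in _consumer.js. */
let rebalanceCbInProgress = null;

function rebalanceTrampoline(err, assignment, handler) {
  rebalanceCbInProgress = new DeferredPromise(); // assigned before the handler runs
  handler(err, assignment).catch((e) =>
    console.error(`Error from rebalance callback: ${e.stack}`));
}

async function afterConsume() {
  if (rebalanceCbInProgress) {   // observes any in-flight rebalance
    await rebalanceCbInProgress;
    rebalanceCbInProgress = null;
  }
}
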
@@ -904,6 +904,7 @@ class Consumer {
     const returnPayload = {
       batch,
       _stale: false,
+      _seeked: false,
       _lastResolvedOffset: { offset: -1, leaderEpoch: -1 },
       heartbeat: async () => { /* no op */ },
       pause: this.pause.bind(this, [{ topic, partitions: [partition] }]),
@@ -922,9 +923,25 @@ class Consumer {
 
   async #fetchAndResolveWith(takeFromCache, size) {
     if (this.#fetchInProgress) {
+      await this.#fetchInProgress;
+      /* Restart with the checks as we might have
+       * a new fetch in progress already. */
+      return null;
+    }
+
+    if (this.#nonEmpty) {
+      await this.#nonEmpty;
+      /* Restart with the checks as we might have
+       * a new fetch in progress already. */
       return null;
     }
 
+    if (this.#workerTerminationScheduled.resolved) {
+      /* Return without fetching. */
+      return null;
+    }
+
+    let err, messages, processedRebalance = false;
     try {
       this.#fetchInProgress = new DeferredPromise();
       const fetchResult = new DeferredPromise();
@@ -933,8 +950,9 @@ class Consumer {
       this.#internalClient.consume(size, (err, messages) =>
         fetchResult.resolve([err, messages]));
 
-      let [err, messages] = await fetchResult;
+      [err, messages] = await fetchResult;
       if (this.#rebalanceCbInProgress) {
+        processedRebalance = true;
         await this.#rebalanceCbInProgress;
         this.#rebalanceCbInProgress = null;
       }
@@ -956,6 +974,8 @@ class Consumer {
     } finally {
       this.#fetchInProgress.resolve();
       this.#fetchInProgress = null;
+      if (!err && !processedRebalance && this.#messageCache.assignedSize === 0)
+        this.#nonEmpty = new DeferredPromise();
     }
   }
 
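
Note: taken together, these hunks make `#fetchAndResolveWith` a single-flight operation with back-pressure: a `null` return means "nothing was fetched, re-run the guards", not "end of stream". A rough sketch of that contract, with the hypothetical names `state` and `doFetch` standing in for the real cache and librdkafka plumbing, and `DeferredPromise` as sketched earlier:

/* Sketch only: null signals "re-evaluate the guards and try again". */
async function fetchOnce(state, doFetch) {
  if (state.fetchInProgress) {
    await state.fetchInProgress;          // piggyback on the ongoing fetch
    return null;
  }
  if (state.nonEmpty) {
    await state.nonEmpty;                 // sleep until the queue has data
    return null;
  }
  if (state.terminationScheduled.resolved)
    return null;                          // shutting down: no new fetches

  state.fetchInProgress = new DeferredPromise();
  try {
    return await doFetch();
  } finally {
    state.fetchInProgress.resolve();      // wake anyone who piggybacked
    state.fetchInProgress = null;
  }
}
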
@@ -973,10 +993,13 @@ class Consumer {
     }
 
     /* It's possible that we get msg = null, but that's because partitionConcurrency
-     * exceeds the number of partitions containing messages. So in this case,
-     * we should not call for new fetches, rather, try to focus on what we have left.
+     * exceeds the number of partitions containing messages. So
+     * we should wait for a new partition to be available.
      */
     if (!msg && this.#messageCache.assignedSize !== 0) {
+      await this.#messageCache.availablePartitions();
+      /* Restart with the checks as we might have
+       * the cache full. */
       return null;
     }
 
@@ -1000,10 +1023,13 @@ class Consumer {
     }
 
     /* It's possible that we get msgs = null, but that's because partitionConcurrency
-     * exceeds the number of partitions containing messages. So in this case,
-     * we should not call for new fetches, rather, try to focus on what we have left.
+     * exceeds the number of partitions containing messages. So
+     * we should wait for a new partition to be available.
      */
     if (!msgs && this.#messageCache.assignedSize !== 0) {
+      await this.#messageCache.availablePartitions();
+      /* Restart with the checks as we might have
+       * the cache full. */
       return null;
     }
 
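
Note: `availablePartitions()` and `notifyAvailablePartitions()` (used in `#notifyNonEmpty` later in this diff) belong to the message cache, whose implementation is not part of this change. A hedged sketch of the pairing these two call sites appear to assume: a promise that workers await when every partition with data is already claimed, resolved when a partition is released or new data arrives. The class name here is illustrative, not the real `MessageCache`:

/* Assumed shape only; the real cache lives in _consumer_cache.js. */
class PartitionAvailabilityGate {
  #waiter = null;

  /* Awaited by workers when the fetch returned null but the cache is not empty. */
  availablePartitions() {
    if (!this.#waiter)
      this.#waiter = new DeferredPromise();
    return this.#waiter;
  }

  /* Called when a partition becomes consumable again. */
  notifyAvailablePartitions() {
    if (this.#waiter) {
      this.#waiter.resolve();
      this.#waiter = null;
    }
  }
}
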
@@ -1316,7 +1342,7 @@ class Consumer {
 
     /* If any message is unprocessed, either due to an error or due to the user not marking it processed, we must seek
      * back to get it so it can be reprocessed. */
-    if (lastOffsetProcessed.offset !== lastOffset) {
+    if (!payload._seeked && lastOffsetProcessed.offset !== lastOffset) {
       const offsetToSeekTo = lastOffsetProcessed.offset === -1 ? firstMessage.offset : (lastOffsetProcessed.offset + 1);
       const leaderEpoch = lastOffsetProcessed.offset === -1 ? firstMessage.leaderEpoch : lastOffsetProcessed.leaderEpoch;
       this.seek({
@@ -1348,36 +1374,27 @@ class Consumer {
     return ppc;
   }
 
-  #queueNonEmptyCb() {
-    for (const waiter of this.#queueWaiters) {
-      waiter.resolve();
+  #notifyNonEmpty() {
+    if (this.#nonEmpty) {
+      this.#nonEmpty.resolve();
+      this.#nonEmpty = null;
     }
+    if (this.#messageCache)
+      this.#messageCache.notifyAvailablePartitions();
   }
 
-  async #nextFetchRetry() {
-    if (this.#fetchInProgress) {
-      await this.#fetchInProgress;
-    } else {
-      /* Backoff a little. If m is null, we might be without messages
-       * or in available partition starvation, and calling consumeSingleCached
-       * in a tight loop will help no one.
-       * In case there is any message in the queue, we'll be woken up before the
-       * timer expires.
-       * We have a per-worker promise, otherwise we end up awakening
-       * other workers when they've already looped and just restarted awaiting.
-       * The `Promise` passed to `Timer.withTimeout` cannot be reused
-       * in next call to this method, to avoid memory leaks caused
-       * by `Promise.race`. */
-      const waiter = new DeferredPromise();
-      const waiterNode = this.#queueWaiters.addLast(waiter);
-      await Timer.withTimeout(1000, waiter);
-
-      /* Resolves the "extra" promise that has been spawned when creating the timer. */
-      waiter.resolve();
-      this.#queueWaiters.remove(waiterNode);
-    }
-  }
+  #queueNonEmptyCb() {
+    const nonEmptyAction = async () => {
+      if (this.#fetchInProgress)
+        await this.#fetchInProgress;
 
+      this.#notifyNonEmpty();
+    };
+    nonEmptyAction().catch((e) => {
+      this.#logger.error(`Error in queueNonEmptyCb: ${e}`,
+        this.#createConsumerBindingMessageMetadata());
+    });
+  }
   /**
    * Starts a worker to fetch messages/batches from the internal consumer and process them.
    *
@@ -1393,27 +1410,24 @@ class Consumer {
    */
   async #worker(config, perMessageProcessor, fetcher) {
     let ppc = null;
-
     while (!this.#workerTerminationScheduled.resolved) {
+      try {
+        const ms = await fetcher(ppc);
+        if (!ms)
+          continue;
 
-      const ms = await fetcher(ppc).catch(e => {
+        if (this.#pendingOperations.length) {
+          ppc = this.#discardMessages(ms, ppc);
+          break;
+        }
+
+        ppc = await perMessageProcessor(ms, config);
+      } catch (e) {
         /* Since this error cannot be exposed to the user in the current situation, just log and retry.
          * This is due to restartOnFailure being set to always true. */
         if (this.#logger)
           this.#logger.error(`Consumer encountered error while consuming. Retrying. Error details: ${e}: ${e.stack}`, this.#createConsumerBindingMessageMetadata());
-      });
-
-      if (this.#pendingOperations.length) {
-        ppc = this.#discardMessages(ms, ppc);
-        break;
-      }
-
-      if (!ms) {
-        await this.#nextFetchRetry();
-        continue;
       }
-
-      ppc = await perMessageProcessor(ms, config);
     }
 
     if (ppc)
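
Note: the worker now folds fetching, the pending-operation check, and processing into a single `try`/`catch`, so an error thrown by either the fetcher or the per-message processor is logged and the loop retries instead of the worker dying; the old timed `#nextFetchRetry` backoff is gone because a `null` fetch result now blocks inside the fetch path itself. Stripped of the consumer specifics, the retry shape is roughly (illustrative only; `fetcher`, `process`, and `terminated` are stand-ins):

/* Illustrative retry loop, not the PR's exact code. */
async function worker(fetcher, process, terminated) {
  while (!terminated()) {
    try {
      const ms = await fetcher();
      if (!ms)
        continue;            // guards asked us to re-check, not to stop
      await process(ms);
    } catch (e) {
      console.error(`worker error, retrying: ${e.stack}`);
    }
  }
}
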
@@ -1447,19 +1461,32 @@ class Consumer {
    * @private
    */
   async #cacheExpirationLoop() {
+    const cacheExpirationInterval = BigInt(this.#cacheExpirationTimeoutMs * 1e6);
+    const maxFetchInterval = BigInt(1000 * 1e6);
     while (!this.#workerTerminationScheduled.resolved) {
       let now = hrtime.bigint();
-      const cacheExpiration = this.#lastFetchClockNs +
-        BigInt(this.#cacheExpirationTimeoutMs * 1e6);
+      const cacheExpirationTimeout = this.#lastFetchClockNs +
+        cacheExpirationInterval;
+      const maxFetchTimeout = this.#lastFetchClockNs +
+        maxFetchInterval;
 
-      if (now > cacheExpiration) {
+      if (now > cacheExpirationTimeout) {
         this.#addPendingOperation(() =>
           this.#clearCacheAndResetPositions());
         await this.#checkMaxPollIntervalNotExceeded(now);
         break;
       }
+      if (now > maxFetchTimeout) {
+        /* We need to continue fetching even when we're
+         * not getting any messages, for example when all partitions are
+         * paused. */
+        this.#notifyNonEmpty();
+      }
 
-      let interval = Number(cacheExpiration - now) / 1e6;
+      const awakeTime = maxFetchTimeout < cacheExpirationTimeout ?
+        maxFetchTimeout : cacheExpirationTimeout;
+
+      let interval = Number(awakeTime - now) / 1e6;
       if (interval < 100)
         interval = 100;
       await Timer.withTimeout(interval, this.#maxPollIntervalRestart);
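
Note: all arithmetic in this loop is done in nanoseconds as `BigInt`s, since `hrtime.bigint()` returns nanoseconds; the millisecond configuration values are converted once with `ms * 1e6`, and the sleep interval is converted back to milliseconds at the end. A small self-contained example of the same deadline computation (the concrete values are made up):

const { hrtime } = require('process');

const cacheExpirationTimeoutMs = 3000;                                   // example value
const cacheExpirationInterval = BigInt(cacheExpirationTimeoutMs * 1e6);  // 3e9 ns
const maxFetchInterval = BigInt(1000 * 1e6);                             // 1e9 ns = 1 s

const lastFetchClockNs = hrtime.bigint();
const cacheExpirationTimeout = lastFetchClockNs + cacheExpirationInterval;
const maxFetchTimeout = lastFetchClockNs + maxFetchInterval;

/* Sleep until whichever deadline comes first, converted back to ms,
 * with the same 100 ms floor used in the loop above. */
const awakeTime = maxFetchTimeout < cacheExpirationTimeout ? maxFetchTimeout : cacheExpirationTimeout;
let interval = Number(awakeTime - hrtime.bigint()) / 1e6;
if (interval < 100)
  interval = 100;
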
@@ -1481,6 +1508,13 @@ class Consumer {
     this.#pendingOperations = [];
   }
 
+  #resolveWorkerTerminationScheduled() {
+    if (this.#workerTerminationScheduled) {
+      this.#workerTerminationScheduled.resolve();
+      this.#queueNonEmptyCb();
+    }
+  }
+
   /**
    * Internal polling loop.
    * Spawns and awaits workers until disconnect is initiated.
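
Note: resolving the termination promise alone appears not to be enough anymore, because a worker may be parked on `#nonEmpty` or on the availability gate rather than on the termination promise itself; calling `#queueNonEmptyCb()` here wakes those waiters so they loop around, observe `#workerTerminationScheduled.resolved`, and exit. A tiny illustration of the problem being solved (names are stand-ins):

/* Illustrative: a parked worker only re-checks termination after it wakes up. */
async function parkedWorker(state) {
  while (!state.terminationScheduled.resolved) {
    if (state.nonEmpty)
      await state.nonEmpty;   // without a wake-up, this await never returns
  }
}

function shutdown(state) {
  state.terminationScheduled.resolve();
  state.nonEmpty?.resolve();  // same effect as #queueNonEmptyCb(): unpark the worker
}
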
@@ -1662,7 +1696,7 @@ class Consumer {
 
   #addPendingOperation(fun) {
     if (this.#pendingOperations.length === 0) {
-      this.#workerTerminationScheduled.resolve();
+      this.#resolveWorkerTerminationScheduled();
     }
     this.#pendingOperations.push(fun);
   }
@@ -1727,11 +1761,15 @@ class Consumer {
     }
   }
 
-  #markBatchPayloadsStale(topicPartitions) {
+  #markBatchPayloadsStale(topicPartitions, isSeek) {
     for (const topicPartition of topicPartitions) {
       const key = partitionKey(topicPartition);
-      if (this.#topicPartitionToBatchPayload.has(key))
-        this.#topicPartitionToBatchPayload.get(key)._stale = true;
+      if (this.#topicPartitionToBatchPayload.has(key)) {
+        const payload = this.#topicPartitionToBatchPayload.get(key);
+        payload._stale = true;
+        if (isSeek)
+          payload._seeked = true;
+      }
     }
   }
 
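
Note: `_seeked` complements `_stale`: both are set when the staleness is caused by an explicit `seek()`, and the batch-resolution path earlier in this diff checks `!payload._seeked` before seeking back to an unprocessed offset, so a user-requested seek is not immediately overridden by the automatic "seek back to reprocess" logic. A condensed, illustrative view of how the two flags interact (not the PR's exact code; the payload/batch field names are assumptions):

/* Illustrative only: how a batch payload's flags drive the post-processing seek. */
function afterBatch(payload, lastOffset, lastOffsetProcessed, seek) {
  /* An explicit seek already repositioned the partition; do not override it. */
  if (payload._seeked)
    return;

  /* Otherwise, anything unprocessed must be refetched. */
  if (lastOffsetProcessed.offset !== lastOffset) {
    const offsetToSeekTo = lastOffsetProcessed.offset === -1
      ? payload.batch.messages[0].offset
      : lastOffsetProcessed.offset + 1;
    seek({ topic: payload.batch.topic, partition: payload.batch.partition, offset: offsetToSeekTo });
  }
}
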
@@ -1757,7 +1795,7 @@ class Consumer {
       }
     }
     if (seekOffsets.length) {
-      await this.#seekInternal(seekOffsets, false);
+      await this.#seekInternal(seekOffsets);
     }
   }
 
@@ -1801,7 +1839,7 @@ class Consumer {
     }
 
     /* If anyone's using eachBatch, mark the batch as stale. */
-    this.#markBatchPayloadsStale([rdKafkaTopicPartitionOffset]);
+    this.#markBatchPayloadsStale([rdKafkaTopicPartitionOffset], true);
 
     this.#addPendingOperation(() =>
       this.#seekInternal([rdKafkaTopicPartitionOffset]));
@@ -2010,7 +2048,7 @@ class Consumer {
     }
 
     this.#disconnectStarted = true;
-    this.#workerTerminationScheduled.resolve();
+    this.#resolveWorkerTerminationScheduled();
     this.#logger.debug("Signalling disconnection attempt to workers", this.#createConsumerBindingMessageMetadata());
     await this.#lock.write(async () => {
 