2424import org .apache .fluss .exception .FlussRuntimeException ;
2525import org .apache .fluss .exception .InvalidMetadataException ;
2626import org .apache .fluss .exception .LeaderNotAvailableException ;
27+ import org .apache .fluss .exception .RetriableException ;
2728import org .apache .fluss .metadata .PhysicalTablePath ;
2829import org .apache .fluss .metadata .TableBucket ;
2930import org .apache .fluss .metadata .TablePartition ;
@@ -74,10 +75,17 @@ class LookupSender implements Runnable {
7475
7576 private final Semaphore maxInFlightReuqestsSemaphore ;
7677
77- LookupSender (MetadataUpdater metadataUpdater , LookupQueue lookupQueue , int maxFlightRequests ) {
78+ private final int maxRetries ;
79+
/**
 * Creates a lookup sender and marks it as running.
 *
 * @param metadataUpdater the metadata updater kept by this sender
 * @param lookupQueue the lookup queue kept by this sender
 * @param maxFlightRequests the number of permits given to the in-flight request semaphore
 * @param maxRetries the bound a lookup's retry count is compared against when deciding whether
 *     the lookup may be retried
 */
LookupSender(
        MetadataUpdater metadataUpdater,
        LookupQueue lookupQueue,
        int maxFlightRequests,
        int maxRetries) {
    // Plain field initialisation; the assignments are independent of each other.
    this.maxRetries = maxRetries;
    this.maxInFlightReuqestsSemaphore = new Semaphore(maxFlightRequests);
    this.lookupQueue = lookupQueue;
    this.metadataUpdater = metadataUpdater;
    this.running = true;
}
8391
@@ -307,10 +315,8 @@ private void handleLookupResponse(
307315 pbLookupRespForBucket .getBucketId ());
308316 LookupBatch lookupBatch = lookupsByBucket .get (tableBucket );
309317 if (pbLookupRespForBucket .hasErrorCode ()) {
310- // TODO for re-triable error, we should retry here instead of throwing exception.
311318 ApiError error = ApiError .fromErrorMessage (pbLookupRespForBucket );
312- handleLookupExceptionForBucket (tableBucket , destination , error , "lookup" );
313- lookupBatch .completeExceptionally (error .exception ());
319+ handleLookupError (tableBucket , destination , error , lookupBatch .lookups (), "" );
314320 } else {
315321 List <byte []> byteValues =
316322 pbLookupRespForBucket .getValuesList ().stream ()
@@ -345,10 +351,9 @@ private void handlePrefixLookupResponse(
345351
346352 PrefixLookupBatch prefixLookupBatch = prefixLookupsByBucket .get (tableBucket );
347353 if (pbRespForBucket .hasErrorCode ()) {
348- // TODO for re-triable error, we should retry here instead of throwing exception.
349354 ApiError error = ApiError .fromErrorMessage (pbRespForBucket );
350- handleLookupExceptionForBucket ( tableBucket , destination , error , "prefixLookup" );
351- prefixLookupBatch .completeExceptionally ( error . exception () );
355+ handleLookupError (
356+ tableBucket , destination , error , prefixLookupBatch .lookups (), "prefix " );
352357 } else {
353358 List <List <byte []>> result = new ArrayList <>(pbRespForBucket .getValueListsCount ());
354359 for (int i = 0 ; i < pbRespForBucket .getValueListsCount (); i ++) {
@@ -368,58 +373,106 @@ private void handleLookupRequestException(
368373 Throwable t , int destination , Map <TableBucket , LookupBatch > lookupsByBucket ) {
369374 ApiError error = ApiError .fromThrowable (t );
370375 for (LookupBatch lookupBatch : lookupsByBucket .values ()) {
371- // TODO for re-triable error, we should retry here instead of throwing exception.
372- handleLookupExceptionForBucket (lookupBatch .tableBucket (), destination , error , "lookup" );
373- lookupBatch .completeExceptionally (error .exception ());
376+ handleLookupError (
377+ lookupBatch .tableBucket (), destination , error , lookupBatch .lookups (), "" );
374378 }
375379 }
376380
377381 private void handlePrefixLookupException (
378382 Throwable t , int destination , Map <TableBucket , PrefixLookupBatch > lookupsByBucket ) {
379383 ApiError error = ApiError .fromThrowable (t );
380- // TODO If error, we need to retry send the request instead of throw exception.
381384 for (PrefixLookupBatch lookupBatch : lookupsByBucket .values ()) {
382- handleLookupExceptionForBucket (
383- lookupBatch .tableBucket (), destination , error , "prefixLookup" );
384- lookupBatch .completeExceptionally (error .exception ());
385+ handleLookupError (
386+ lookupBatch .tableBucket (),
387+ destination ,
388+ error ,
389+ lookupBatch .lookups (),
390+ "prefix " );
385391 }
386392 }
387393
388- void forceClose ( ) {
389- forceClose = true ;
390- initiateClose ( );
394+ private void reEnqueueLookup ( AbstractLookupQuery <?> lookup ) {
395+ lookup . incrementRetries () ;
396+ lookupQueue . appendLookup ( lookup );
391397 }
392398
393- void initiateClose () {
394- // Ensure accumulator is closed first to guarantee that no more appends are accepted after
395- // breaking from the sender loop. Otherwise, we may miss some callbacks when shutting down.
396- lookupQueue .close ();
397- running = false ;
399+ private boolean canRetry (AbstractLookupQuery <?> lookup , Exception exception ) {
400+ return lookup .retries () < maxRetries
401+ && !lookup .future ().isDone ()
402+ && exception instanceof RetriableException ;
398403 }
399404
400- private void handleLookupExceptionForBucket (
401- TableBucket tb , int destination , ApiError error , String lookupType ) {
405+ /**
406+ * Handle lookup error with retry logic. For each lookup in the list, check if it can be
407+ * retried. If yes, re-enqueue it; otherwise, complete it exceptionally.
408+ *
409+ * @param tableBucket the table bucket
410+ * @param error the error from server response
411+ * @param lookups the list of lookups to handle
412+ * @param lookupType the type of lookup ("" for regular lookup, "prefix " for prefix lookup)
413+ */
414+ private void handleLookupError (
415+ TableBucket tableBucket ,
416+ int destination ,
417+ ApiError error ,
418+ List <? extends AbstractLookupQuery <?>> lookups ,
419+ String lookupType ) {
402420 ApiException exception = error .error ().exception ();
403421 LOG .error (
404- "Failed to {} from node {} for bucket {}" , lookupType , destination , tb , exception );
422+ "Failed to {} from node {} for bucket {}" ,
423+ lookupType ,
424+ destination ,
425+ tableBucket ,
426+ exception );
405427 if (exception instanceof InvalidMetadataException ) {
406428 LOG .warn (
407429 "Invalid metadata error in {} request. Going to request metadata update." ,
408430 lookupType ,
409431 exception );
410- long tableId = tb .getTableId ();
432+ long tableId = tableBucket .getTableId ();
411433 TableOrPartitions tableOrPartitions ;
412- if (tb .getPartitionId () == null ) {
434+ if (tableBucket .getPartitionId () == null ) {
413435 tableOrPartitions = new TableOrPartitions (Collections .singleton (tableId ), null );
414436 } else {
415437 tableOrPartitions =
416438 new TableOrPartitions (
417439 null ,
418440 Collections .singleton (
419- new TablePartition (tableId , tb .getPartitionId ())));
441+ new TablePartition (tableId , tableBucket .getPartitionId ())));
420442 }
421443 invalidTableOrPartitions (tableOrPartitions );
422444 }
445+
446+ for (AbstractLookupQuery <?> lookup : lookups ) {
447+ if (canRetry (lookup , error .exception ())) {
448+ LOG .warn (
449+ "Get error {}lookup response on table bucket {}, retrying ({} attempts left). Error: {}" ,
450+ lookupType ,
451+ tableBucket ,
452+ maxRetries - lookup .retries (),
453+ error .formatErrMsg ());
454+ reEnqueueLookup (lookup );
455+ } else {
456+ LOG .warn (
457+ "Get error {}lookup response on table bucket {}, fail. Error: {}" ,
458+ lookupType ,
459+ tableBucket ,
460+ error .formatErrMsg ());
461+ lookup .future ().completeExceptionally (error .exception ());
462+ }
463+ }
464+ }
465+
466+ void forceClose () {
467+ forceClose = true ;
468+ initiateClose ();
469+ }
470+
471+ void initiateClose () {
472+ // Ensure accumulator is closed first to guarantee that no more appends are accepted after
473+ // breaking from the sender loop. Otherwise, we may miss some callbacks when shutting down.
474+ lookupQueue .close ();
475+ running = false ;
423476 }
424477
425478 /** A helper class to hold table ids or table partitions. */
0 commit comments