From 6ec6e33c6c9f6e6b0550d0ca1171894d83a8c11b Mon Sep 17 00:00:00 2001 From: "ocean.wy" Date: Fri, 31 Oct 2025 10:11:18 +0800 Subject: [PATCH 1/4] fix thread-safety problem of PrimaryKeyLoookuper and PrefixKeyLookuper --- .../java/org/apache/fluss/client/lookup/PrefixKeyLookuper.java | 2 +- .../java/org/apache/fluss/client/lookup/PrimaryKeyLookuper.java | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/fluss-client/src/main/java/org/apache/fluss/client/lookup/PrefixKeyLookuper.java b/fluss-client/src/main/java/org/apache/fluss/client/lookup/PrefixKeyLookuper.java index 258d9b1c21..4d519d4d82 100644 --- a/fluss-client/src/main/java/org/apache/fluss/client/lookup/PrefixKeyLookuper.java +++ b/fluss-client/src/main/java/org/apache/fluss/client/lookup/PrefixKeyLookuper.java @@ -146,7 +146,7 @@ private void validatePrefixLookup(TableInfo tableInfo, List lookupColumn } @Override - public CompletableFuture lookup(InternalRow prefixKey) { + public synchronized CompletableFuture lookup(InternalRow prefixKey) { byte[] bucketKeyBytes = bucketKeyEncoder.encodeKey(prefixKey); int bucketId = bucketingFunction.bucketing(bucketKeyBytes, numBuckets); diff --git a/fluss-client/src/main/java/org/apache/fluss/client/lookup/PrimaryKeyLookuper.java b/fluss-client/src/main/java/org/apache/fluss/client/lookup/PrimaryKeyLookuper.java index 2a6233e46b..c0014417ae 100644 --- a/fluss-client/src/main/java/org/apache/fluss/client/lookup/PrimaryKeyLookuper.java +++ b/fluss-client/src/main/java/org/apache/fluss/client/lookup/PrimaryKeyLookuper.java @@ -101,7 +101,7 @@ public PrimaryKeyLookuper( } @Override - public CompletableFuture lookup(InternalRow lookupKey) { + public synchronized CompletableFuture lookup(InternalRow lookupKey) { // encoding the key row using a compacted way consisted with how the key is encoded when put // a row byte[] pkBytes = primaryKeyEncoder.encodeKey(lookupKey); From 94ad89dae360b62b86f9ae0d24b4a7b9973d8fe3 Mon Sep 17 00:00:00 2001 From: "ocean.wy" 
Date: Wed, 5 Nov 2025 10:47:59 +0800 Subject: [PATCH 2/4] make lookup retries thread safety --- .../client/lookup/AbstractLookupQuery.java | 10 + .../fluss/client/lookup/LookupClient.java | 3 +- .../fluss/client/lookup/LookupSender.java | 80 +++- .../client/lookup/PrefixKeyLookuper.java | 4 +- .../client/lookup/PrimaryKeyLookuper.java | 4 +- .../fluss/client/lookup/LookupSenderTest.java | 429 ++++++++++++++++++ .../metadata/TestingMetadataUpdater.java | 60 ++- .../apache/fluss/config/ConfigOptions.java | 8 + .../flink/catalog/FlinkTableFactory.java | 7 + .../lookup/FlinkAsyncLookupFunction.java | 116 ++--- .../source/lookup/FlinkLookupFunction.java | 51 +-- 11 files changed, 630 insertions(+), 142 deletions(-) create mode 100644 fluss-client/src/test/java/org/apache/fluss/client/lookup/LookupSenderTest.java diff --git a/fluss-client/src/main/java/org/apache/fluss/client/lookup/AbstractLookupQuery.java b/fluss-client/src/main/java/org/apache/fluss/client/lookup/AbstractLookupQuery.java index 1737a7696b..efaf0100a0 100644 --- a/fluss-client/src/main/java/org/apache/fluss/client/lookup/AbstractLookupQuery.java +++ b/fluss-client/src/main/java/org/apache/fluss/client/lookup/AbstractLookupQuery.java @@ -28,10 +28,12 @@ public abstract class AbstractLookupQuery { private final TableBucket tableBucket; private final byte[] key; + private int retries; public AbstractLookupQuery(TableBucket tableBucket, byte[] key) { this.tableBucket = tableBucket; this.key = key; + this.retries = 0; } public byte[] key() { @@ -42,6 +44,14 @@ public TableBucket tableBucket() { return tableBucket; } + public int retries() { + return retries; + } + + public void incrementRetries() { + retries++; + } + public abstract LookupType lookupType(); public abstract CompletableFuture future(); diff --git a/fluss-client/src/main/java/org/apache/fluss/client/lookup/LookupClient.java b/fluss-client/src/main/java/org/apache/fluss/client/lookup/LookupClient.java index 3c201541a1..a974ba972f 100644 --- 
a/fluss-client/src/main/java/org/apache/fluss/client/lookup/LookupClient.java +++ b/fluss-client/src/main/java/org/apache/fluss/client/lookup/LookupClient.java @@ -67,7 +67,8 @@ public LookupClient(Configuration conf, MetadataUpdater metadataUpdater) { new LookupSender( metadataUpdater, lookupQueue, - conf.getInt(ConfigOptions.CLIENT_LOOKUP_MAX_INFLIGHT_SIZE)); + conf.getInt(ConfigOptions.CLIENT_LOOKUP_MAX_INFLIGHT_SIZE), + conf.getInt(ConfigOptions.CLIENT_LOOKUP_MAX_RETRIES)); lookupSenderThreadPool.submit(lookupSender); } diff --git a/fluss-client/src/main/java/org/apache/fluss/client/lookup/LookupSender.java b/fluss-client/src/main/java/org/apache/fluss/client/lookup/LookupSender.java index 87fa0dff1c..22bdfce4ba 100644 --- a/fluss-client/src/main/java/org/apache/fluss/client/lookup/LookupSender.java +++ b/fluss-client/src/main/java/org/apache/fluss/client/lookup/LookupSender.java @@ -21,6 +21,7 @@ import org.apache.fluss.client.metadata.MetadataUpdater; import org.apache.fluss.exception.FlussRuntimeException; import org.apache.fluss.exception.LeaderNotAvailableException; +import org.apache.fluss.exception.RetriableException; import org.apache.fluss.metadata.TableBucket; import org.apache.fluss.rpc.gateway.TabletServerGateway; import org.apache.fluss.rpc.messages.LookupRequest; @@ -65,10 +66,17 @@ class LookupSender implements Runnable { private final Semaphore maxInFlightReuqestsSemaphore; - LookupSender(MetadataUpdater metadataUpdater, LookupQueue lookupQueue, int maxFlightRequests) { + private final int maxRetries; + + LookupSender( + MetadataUpdater metadataUpdater, + LookupQueue lookupQueue, + int maxFlightRequests, + int maxRetries) { this.metadataUpdater = metadataUpdater; this.lookupQueue = lookupQueue; this.maxInFlightReuqestsSemaphore = new Semaphore(maxFlightRequests); + this.maxRetries = maxRetries; this.running = true; } @@ -286,13 +294,8 @@ private void handleLookupResponse( pbLookupRespForBucket.getBucketId()); LookupBatch lookupBatch = 
lookupsByBucket.get(tableBucket); if (pbLookupRespForBucket.hasErrorCode()) { - // TODO for re-triable error, we should retry here instead of throwing exception. ApiError error = ApiError.fromErrorMessage(pbLookupRespForBucket); - LOG.warn( - "Get error lookup response on table bucket {}, fail. Error: {}", - tableBucket, - error.formatErrMsg()); - lookupBatch.completeExceptionally(error.exception()); + handleLookupError(tableBucket, error, lookupBatch.lookups(), ""); } else { List byteValues = pbLookupRespForBucket.getValuesList().stream() @@ -326,13 +329,8 @@ private void handlePrefixLookupResponse( PrefixLookupBatch prefixLookupBatch = prefixLookupsByBucket.get(tableBucket); if (pbRespForBucket.hasErrorCode()) { - // TODO for re-triable error, we should retry here instead of throwing exception. ApiError error = ApiError.fromErrorMessage(pbRespForBucket); - LOG.warn( - "Get error prefix lookup response on table bucket {}, fail. Error: {}", - tableBucket, - error.formatErrMsg()); - prefixLookupBatch.completeExceptionally(error.exception()); + handleLookupError(tableBucket, error, prefixLookupBatch.lookups(), "prefix "); } else { List> result = new ArrayList<>(pbRespForBucket.getValueListsCount()); for (int i = 0; i < pbRespForBucket.getValueListsCount(); i++) { @@ -352,22 +350,60 @@ private void handleLookupRequestException( Throwable t, Map lookupsByBucket) { ApiError error = ApiError.fromThrowable(t); for (LookupBatch lookupBatch : lookupsByBucket.values()) { - // TODO for re-triable error, we should retry here instead of throwing exception. - LOG.warn( - "Get error lookup response on table bucket {}, fail. 
Error: {}", - lookupBatch.tableBucket(), - error.formatErrMsg()); - lookupBatch.completeExceptionally(error.exception()); + handleLookupError(lookupBatch.tableBucket(), error, lookupBatch.lookups(), ""); } } private void handlePrefixLookupException( Throwable t, Map lookupsByBucket) { ApiError error = ApiError.fromThrowable(t); - // TODO If error, we need to retry send the request instead of throw exception. - LOG.warn("Get error prefix lookup response. Error: {}", error.formatErrMsg()); for (PrefixLookupBatch lookupBatch : lookupsByBucket.values()) { - lookupBatch.completeExceptionally(error.exception()); + handleLookupError(lookupBatch.tableBucket(), error, lookupBatch.lookups(), "prefix "); + } + } + + private void reEnqueueLookup(AbstractLookupQuery lookup) { + lookup.incrementRetries(); + lookupQueue.appendLookup(lookup); + } + + private boolean canRetry(AbstractLookupQuery lookup, Exception exception) { + return lookup.retries() < maxRetries + && !lookup.future().isDone() + && exception instanceof RetriableException; + } + + /** + * Handle lookup error with retry logic. For each lookup in the list, check if it can be + * retried. If yes, re-enqueue it; otherwise, complete it exceptionally. + * + * @param tableBucket the table bucket + * @param error the error from server response + * @param lookups the list of lookups to handle + * @param lookupType the type of lookup ("" for regular lookup, "prefix " for prefix lookup) + */ + private void handleLookupError( + TableBucket tableBucket, + ApiError error, + List> lookups, + String lookupType) { + for (AbstractLookupQuery lookup : lookups) { + if (canRetry(lookup, error.exception())) { + LOG.warn( + "Get error {}lookup response on table bucket {}, retrying ({} attempts left). Error: {}", + lookupType, + tableBucket, + maxRetries - lookup.retries(), + error.formatErrMsg()); + reEnqueueLookup(lookup); + } else { + LOG.warn( + "Get error {}lookup response on table bucket {}, fail. 
Error: {}", + lookupType, + tableBucket, + error.formatErrMsg()); + lookup.future().completeExceptionally(error.exception()); + } } } diff --git a/fluss-client/src/main/java/org/apache/fluss/client/lookup/PrefixKeyLookuper.java b/fluss-client/src/main/java/org/apache/fluss/client/lookup/PrefixKeyLookuper.java index 4d519d4d82..61645374fd 100644 --- a/fluss-client/src/main/java/org/apache/fluss/client/lookup/PrefixKeyLookuper.java +++ b/fluss-client/src/main/java/org/apache/fluss/client/lookup/PrefixKeyLookuper.java @@ -32,6 +32,7 @@ import org.apache.fluss.types.RowType; import javax.annotation.Nullable; +import javax.annotation.concurrent.NotThreadSafe; import java.util.ArrayList; import java.util.Collections; @@ -46,6 +47,7 @@ * An implementation of {@link Lookuper} that lookups by prefix key. A prefix key is a prefix subset * of the primary key. */ +@NotThreadSafe class PrefixKeyLookuper implements Lookuper { private final TableInfo tableInfo; @@ -146,7 +148,7 @@ private void validatePrefixLookup(TableInfo tableInfo, List lookupColumn } @Override - public synchronized CompletableFuture lookup(InternalRow prefixKey) { + public CompletableFuture lookup(InternalRow prefixKey) { byte[] bucketKeyBytes = bucketKeyEncoder.encodeKey(prefixKey); int bucketId = bucketingFunction.bucketing(bucketKeyBytes, numBuckets); diff --git a/fluss-client/src/main/java/org/apache/fluss/client/lookup/PrimaryKeyLookuper.java b/fluss-client/src/main/java/org/apache/fluss/client/lookup/PrimaryKeyLookuper.java index c0014417ae..2945f1ab40 100644 --- a/fluss-client/src/main/java/org/apache/fluss/client/lookup/PrimaryKeyLookuper.java +++ b/fluss-client/src/main/java/org/apache/fluss/client/lookup/PrimaryKeyLookuper.java @@ -32,6 +32,7 @@ import org.apache.fluss.types.RowType; import javax.annotation.Nullable; +import javax.annotation.concurrent.NotThreadSafe; import java.util.Collections; import java.util.concurrent.CompletableFuture; @@ -40,6 +41,7 @@ import static 
org.apache.fluss.utils.Preconditions.checkArgument; /** An implementation of {@link Lookuper} that lookups by primary key. */ +@NotThreadSafe class PrimaryKeyLookuper implements Lookuper { private final TableInfo tableInfo; @@ -101,7 +103,7 @@ public PrimaryKeyLookuper( } @Override - public synchronized CompletableFuture lookup(InternalRow lookupKey) { + public CompletableFuture lookup(InternalRow lookupKey) { // encoding the key row using a compacted way consisted with how the key is encoded when put // a row byte[] pkBytes = primaryKeyEncoder.encodeKey(lookupKey); diff --git a/fluss-client/src/test/java/org/apache/fluss/client/lookup/LookupSenderTest.java b/fluss-client/src/test/java/org/apache/fluss/client/lookup/LookupSenderTest.java new file mode 100644 index 0000000000..5db92be330 --- /dev/null +++ b/fluss-client/src/test/java/org/apache/fluss/client/lookup/LookupSenderTest.java @@ -0,0 +1,429 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.fluss.client.lookup; + +import org.apache.fluss.client.metadata.TestingMetadataUpdater; +import org.apache.fluss.config.Configuration; +import org.apache.fluss.exception.InvalidTableException; +import org.apache.fluss.exception.TableNotExistException; +import org.apache.fluss.exception.TimeoutException; +import org.apache.fluss.metadata.TableBucket; +import org.apache.fluss.metadata.TableInfo; +import org.apache.fluss.metadata.TablePath; +import org.apache.fluss.rpc.gateway.TabletServerGateway; +import org.apache.fluss.rpc.messages.LookupRequest; +import org.apache.fluss.rpc.messages.LookupResponse; +import org.apache.fluss.rpc.messages.PbLookupRespForBucket; +import org.apache.fluss.rpc.messages.PrefixLookupRequest; +import org.apache.fluss.rpc.messages.PrefixLookupResponse; +import org.apache.fluss.rpc.protocol.ApiError; +import org.apache.fluss.server.tablet.TestTabletServerGateway; + +import org.junit.jupiter.api.AfterEach; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; + +import java.util.HashMap; +import java.util.Map; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicInteger; + +import static org.apache.fluss.record.TestData.DATA1_TABLE_ID_PK; +import static org.apache.fluss.record.TestData.DATA1_TABLE_INFO_PK; +import static org.apache.fluss.record.TestData.DATA1_TABLE_PATH_PK; +import static org.assertj.core.api.Assertions.assertThat; +import static org.assertj.core.api.Assertions.assertThatThrownBy; + +/** + * Comprehensive unit tests for {@link LookupSender} retry mechanism. + * + *

+ * <p>These tests verify:
+ *
+ * <ul>
+ *   <li>Retry behavior for RetriableException vs non-retriable exceptions
+ *   <li>Max retries enforcement
+ *   <li>Re-enqueue and eventual success scenarios
+ *   <li>Complete lookup flow with controlled failures and retries
+ * </ul>
+ */ +class LookupSenderTest { + + private static final int MAX_RETRIES = 3; + private static final int MAX_INFLIGHT_REQUESTS = 10; + private static final TableBucket TABLE_BUCKET = new TableBucket(DATA1_TABLE_ID_PK, 0); + + private TestingMetadataUpdater metadataUpdater; + private LookupQueue lookupQueue; + private LookupSender lookupSender; + private Thread senderThread; + private ConfigurableTestTabletServerGateway gateway; + + @BeforeEach + void setup() { + // create a configurable gateway for testing + gateway = new ConfigurableTestTabletServerGateway(); + + // build metadata updater with custom gateway using builder pattern + Map tableInfos = new HashMap<>(); + tableInfos.put(DATA1_TABLE_PATH_PK, DATA1_TABLE_INFO_PK); + metadataUpdater = + TestingMetadataUpdater.builder(tableInfos) + .withTabletServerGateway(1, gateway) + .build(); + + Configuration conf = new Configuration(); + lookupQueue = new LookupQueue(conf); + + lookupSender = + new LookupSender(metadataUpdater, lookupQueue, MAX_INFLIGHT_REQUESTS, MAX_RETRIES); + + senderThread = new Thread(lookupSender); + senderThread.start(); + } + + @AfterEach + void teardown() throws InterruptedException { + if (lookupSender != null) { + lookupSender.forceClose(); + } + if (senderThread != null) { + senderThread.join(5000); + } + } + + @Test + void testRetriableExceptionTriggersRetry() throws Exception { + // setup: fail twice with retriable exception, then succeed + AtomicInteger attemptCount = new AtomicInteger(0); + gateway.setLookupHandler( + request -> { + int attempt = attemptCount.incrementAndGet(); + if (attempt <= 2) { + // first two attempts fail with retriable exception + return createFailedResponse( + request, new TimeoutException("simulated timeout")); + } else { + // third attempt succeeds + return createSuccessResponse(request, "value".getBytes()); + } + }); + + // execute: submit lookup + byte[] key = "key".getBytes(); + LookupQuery query = new LookupQuery(TABLE_BUCKET, key); + 
lookupQueue.appendLookup(query); + + // verify: eventually succeeds after retries + byte[] result = query.future().get(5, TimeUnit.SECONDS); + assertThat(result).isEqualTo("value".getBytes()); + assertThat(attemptCount.get()).isEqualTo(3); + assertThat(query.retries()).isEqualTo(2); // retried 2 times + } + + @Test + void testNonRetriableExceptionDoesNotRetry() throws Exception { + // setup: fail with non-retriable exception + gateway.setLookupHandler( + request -> + createFailedResponse( + request, new TableNotExistException("table not found"))); + + // execute: submit lookup + byte[] key = "key".getBytes(); + LookupQuery query = new LookupQuery(TABLE_BUCKET, key); + lookupQueue.appendLookup(query); + + // verify: fails immediately without retry + assertThatThrownBy(() -> query.future().get(5, TimeUnit.SECONDS)) + .isInstanceOf(ExecutionException.class) + .hasRootCauseInstanceOf(TableNotExistException.class); + assertThat(query.retries()).isEqualTo(0); // no retries + } + + @Test + void testMaxRetriesEnforced() throws Exception { + // setup: always fail with retriable exception + AtomicInteger attemptCount = new AtomicInteger(0); + gateway.setLookupHandler( + request -> { + attemptCount.incrementAndGet(); + return createFailedResponse(request, new TimeoutException("timeout")); + }); + + // execute: submit lookup + byte[] key = "key".getBytes(); + LookupQuery query = new LookupQuery(TABLE_BUCKET, key); + lookupQueue.appendLookup(query); + + // verify: eventually fails after max retries + assertThatThrownBy(() -> query.future().get(5, TimeUnit.SECONDS)) + .isInstanceOf(ExecutionException.class) + .hasRootCauseInstanceOf(TimeoutException.class); + + // should attempt: 1 initial + MAX_RETRIES retries + assertThat(attemptCount.get()).isEqualTo(1 + MAX_RETRIES); + assertThat(query.retries()).isEqualTo(MAX_RETRIES); + } + + @Test + void testRetryStopsIfFutureCompleted() throws Exception { + // setup: always fail with retriable exception + AtomicInteger attemptCount = new 
AtomicInteger(0); + gateway.setLookupHandler( + request -> { + int attempt = attemptCount.incrementAndGet(); + if (attempt == 1) { + // first attempt fails + return createFailedResponse(request, new TimeoutException("timeout")); + } else { + // subsequent attempts should not happen if we complete the future + throw new AssertionError( + "Should not retry after future is completed externally"); + } + }); + + // execute: submit lookup + byte[] key = "key".getBytes(); + LookupQuery query = new LookupQuery(TABLE_BUCKET, key); + lookupQueue.appendLookup(query); + + // complete the future externally before retry happens + Thread.sleep(100); // wait for first attempt to fail + query.future().complete("external".getBytes()); + + // verify: completed externally + byte[] result = query.future().get(1, TimeUnit.SECONDS); + assertThat(result).isEqualTo("external".getBytes()); + // retries is 1 because we incremented it when re-enqueuing, but didn't send again + assertThat(query.retries()).isLessThanOrEqualTo(1); + assertThat(attemptCount.get()).isEqualTo(1); // only first attempt + } + + @Test + void testDifferentExceptionTypesHandledCorrectly() throws Exception { + // test multiple exception types + testException(new TimeoutException("timeout"), true, 3); // retriable, should retry + testException(new InvalidTableException("invalid"), false, 0); // non-retriable, no retry + testException(new TableNotExistException("not exist"), false, 0); // non-retriable, no retry + } + + @Test + void testPrefixLookupRetry() throws Exception { + // setup: fail twice with retriable exception, then succeed + AtomicInteger attemptCount = new AtomicInteger(0); + gateway.setPrefixLookupHandler( + request -> { + int attempt = attemptCount.incrementAndGet(); + if (attempt <= 2) { + // first two attempts fail + return createFailedPrefixLookupResponse( + request, new TimeoutException("timeout")); + } else { + // third attempt succeeds + return createSuccessPrefixLookupResponse(request); + } + }); + + 
// execute: submit prefix lookup + byte[] prefixKey = "prefix".getBytes(); + PrefixLookupQuery query = new PrefixLookupQuery(TABLE_BUCKET, prefixKey); + lookupQueue.appendLookup(query); + + // verify: eventually succeeds after retries + query.future().get(5, TimeUnit.SECONDS); + assertThat(attemptCount.get()).isEqualTo(3); + assertThat(query.retries()).isEqualTo(2); + } + + @Test + void testMultipleConcurrentLookupsWithRetries() throws Exception { + // setup: first attempt fails, second succeeds + AtomicInteger attemptCount = new AtomicInteger(0); + gateway.setLookupHandler( + request -> { + int attempt = attemptCount.incrementAndGet(); + if (attempt % 2 == 1) { + // odd attempts fail + return createFailedResponse(request, new TimeoutException("timeout")); + } else { + // even attempts succeed + return createSuccessResponse(request, ("value" + attempt).getBytes()); + } + }); + + // execute: submit multiple lookups + LookupQuery query1 = new LookupQuery(TABLE_BUCKET, "key1".getBytes()); + LookupQuery query2 = new LookupQuery(TABLE_BUCKET, "key2".getBytes()); + LookupQuery query3 = new LookupQuery(TABLE_BUCKET, "key3".getBytes()); + + lookupQueue.appendLookup(query1); + lookupQueue.appendLookup(query2); + lookupQueue.appendLookup(query3); + + // verify: all succeed after retries + assertThat(query1.future().get(5, TimeUnit.SECONDS)).isNotNull(); + assertThat(query2.future().get(5, TimeUnit.SECONDS)).isNotNull(); + assertThat(query3.future().get(5, TimeUnit.SECONDS)).isNotNull(); + // Note: lookups are batched together, so attemptCount reflects batch attempts, not + // individual lookups + assertThat(attemptCount.get()) + .isGreaterThanOrEqualTo(2); // at least 1 failure + 1 success for the batch + } + + // Helper methods + + private void testException(Exception exception, boolean shouldRetry, int expectedRetries) + throws Exception { + // reset gateway + AtomicInteger attemptCount = new AtomicInteger(0); + gateway.setLookupHandler( + request -> { + 
attemptCount.incrementAndGet(); + return createFailedResponse(request, exception); + }); + + // execute + byte[] key = ("key-" + exception.getClass().getSimpleName()).getBytes(); + LookupQuery query = new LookupQuery(TABLE_BUCKET, key); + lookupQueue.appendLookup(query); + + // verify + assertThatThrownBy(() -> query.future().get(5, TimeUnit.SECONDS)) + .isInstanceOf(ExecutionException.class); + + if (shouldRetry) { + assertThat(attemptCount.get()).isEqualTo(1 + MAX_RETRIES); + assertThat(query.retries()).isEqualTo(expectedRetries); + } else { + assertThat(attemptCount.get()).isEqualTo(1); // only initial attempt + assertThat(query.retries()).isEqualTo(expectedRetries); + } + + // wait a bit to ensure no more attempts + Thread.sleep(200); + } + + private CompletableFuture createSuccessResponse( + LookupRequest request, byte[] value) { + LookupResponse response = new LookupResponse(); + PbLookupRespForBucket bucketResp = response.addBucketsResp(); + bucketResp.setBucketId(TABLE_BUCKET.getBucket()); + if (TABLE_BUCKET.getPartitionId() != null) { + bucketResp.setPartitionId(TABLE_BUCKET.getPartitionId()); + } + // Add value for each key in the request + int keyCount = request.getBucketsReqAt(0).getKeysCount(); + for (int i = 0; i < keyCount; i++) { + bucketResp.addValue().setValues(value); + } + return CompletableFuture.completedFuture(response); + } + + private CompletableFuture createFailedResponse( + LookupRequest request, Exception exception) { + LookupResponse response = new LookupResponse(); + PbLookupRespForBucket bucketResp = response.addBucketsResp(); + bucketResp.setBucketId(TABLE_BUCKET.getBucket()); + if (TABLE_BUCKET.getPartitionId() != null) { + bucketResp.setPartitionId(TABLE_BUCKET.getPartitionId()); + } + ApiError error = ApiError.fromThrowable(exception); + bucketResp.setErrorCode(error.error().code()); + bucketResp.setErrorMessage(error.formatErrMsg()); + return CompletableFuture.completedFuture(response); + } + + private CompletableFuture 
createSuccessPrefixLookupResponse( + PrefixLookupRequest request) { + PrefixLookupResponse response = new PrefixLookupResponse(); + // Create response for each prefix key in request + var bucketResp = response.addBucketsResp(); + bucketResp.setBucketId(TABLE_BUCKET.getBucket()); + if (TABLE_BUCKET.getPartitionId() != null) { + bucketResp.setPartitionId(TABLE_BUCKET.getPartitionId()); + } + // Add empty value list for each prefix key + int keyCount = request.getBucketsReqAt(0).getKeysCount(); + for (int i = 0; i < keyCount; i++) { + bucketResp.addValueList(); // empty list is valid for prefix lookup + } + return CompletableFuture.completedFuture(response); + } + + private CompletableFuture createFailedPrefixLookupResponse( + PrefixLookupRequest request, Exception exception) { + PrefixLookupResponse response = new PrefixLookupResponse(); + var bucketResp = response.addBucketsResp(); + bucketResp.setBucketId(TABLE_BUCKET.getBucket()); + if (TABLE_BUCKET.getPartitionId() != null) { + bucketResp.setPartitionId(TABLE_BUCKET.getPartitionId()); + } + ApiError error = ApiError.fromThrowable(exception); + bucketResp.setErrorCode(error.error().code()); + bucketResp.setErrorMessage(error.formatErrMsg()); + return CompletableFuture.completedFuture(response); + } + + /** + * A configurable {@link TabletServerGateway} for testing that allows setting custom handlers + * for lookup operations. 
+ */ + private static class ConfigurableTestTabletServerGateway extends TestTabletServerGateway { + + private java.util.function.Function> + lookupHandler; + private java.util.function.Function< + PrefixLookupRequest, CompletableFuture> + prefixLookupHandler; + + public ConfigurableTestTabletServerGateway() { + super(false); + } + + public void setLookupHandler( + java.util.function.Function> + handler) { + this.lookupHandler = handler; + } + + public void setPrefixLookupHandler( + java.util.function.Function< + PrefixLookupRequest, CompletableFuture> + handler) { + this.prefixLookupHandler = handler; + } + + @Override + public CompletableFuture lookup(LookupRequest request) { + if (lookupHandler != null) { + return lookupHandler.apply(request); + } + return CompletableFuture.completedFuture(new LookupResponse()); + } + + @Override + public CompletableFuture prefixLookup(PrefixLookupRequest request) { + if (prefixLookupHandler != null) { + return prefixLookupHandler.apply(request); + } + return CompletableFuture.completedFuture(new PrefixLookupResponse()); + } + } +} diff --git a/fluss-client/src/test/java/org/apache/fluss/client/metadata/TestingMetadataUpdater.java b/fluss-client/src/test/java/org/apache/fluss/client/metadata/TestingMetadataUpdater.java index 4420a2ebbe..f24ec66a2a 100644 --- a/fluss-client/src/test/java/org/apache/fluss/client/metadata/TestingMetadataUpdater.java +++ b/fluss-client/src/test/java/org/apache/fluss/client/metadata/TestingMetadataUpdater.java @@ -55,22 +55,72 @@ public class TestingMetadataUpdater extends MetadataUpdater { private final Map tabletServerGatewayMap; public TestingMetadataUpdater(Map tableInfos) { - this(COORDINATOR, Arrays.asList(NODE1, NODE2, NODE3), tableInfos); + this(COORDINATOR, Arrays.asList(NODE1, NODE2, NODE3), tableInfos, null); } private TestingMetadataUpdater( ServerNode coordinatorServer, List tabletServers, - Map tableInfos) { + Map tableInfos, + Map customGateways) { super( RpcClient.create( new 
Configuration(), TestingClientMetricGroup.newInstance(), false), Cluster.empty()); initializeCluster(coordinatorServer, tabletServers, tableInfos); coordinatorGateway = new TestCoordinatorGateway(); - tabletServerGatewayMap = new HashMap<>(); - for (ServerNode tabletServer : tabletServers) { - tabletServerGatewayMap.put(tabletServer.id(), new TestTabletServerGateway(false)); + if (customGateways != null) { + tabletServerGatewayMap = customGateways; + } else { + tabletServerGatewayMap = new HashMap<>(); + for (ServerNode tabletServer : tabletServers) { + tabletServerGatewayMap.put(tabletServer.id(), new TestTabletServerGateway(false)); + } + } + } + + /** + * Create a builder for constructing TestingMetadataUpdater with custom gateways. + * + * @param tableInfos the table information map + * @return a builder instance + */ + public static Builder builder(Map tableInfos) { + return new Builder(tableInfos); + } + + /** Builder for TestingMetadataUpdater to support custom gateway configuration. */ + public static class Builder { + private final Map tableInfos; + private final Map customGateways = new HashMap<>(); + + private Builder(Map tableInfos) { + this.tableInfos = tableInfos; + } + + /** + * Set a custom gateway for a specific tablet server node. + * + * @param serverId the server id (1, 2, or 3 for default nodes) + * @param gateway the custom gateway + * @return this builder + */ + public Builder withTabletServerGateway(int serverId, TestTabletServerGateway gateway) { + customGateways.put(serverId, gateway); + return this; + } + + /** + * Build the TestingMetadataUpdater instance. + * + * @return the configured TestingMetadataUpdater + */ + public TestingMetadataUpdater build() { + return new TestingMetadataUpdater( + COORDINATOR, + Arrays.asList(NODE1, NODE2, NODE3), + tableInfos, + customGateways.isEmpty() ? 
null : customGateways); } } diff --git a/fluss-common/src/main/java/org/apache/fluss/config/ConfigOptions.java b/fluss-common/src/main/java/org/apache/fluss/config/ConfigOptions.java index 6b045bb389..875e875d5b 100644 --- a/fluss-common/src/main/java/org/apache/fluss/config/ConfigOptions.java +++ b/fluss-common/src/main/java/org/apache/fluss/config/ConfigOptions.java @@ -1112,6 +1112,14 @@ public class ConfigOptions { "The maximum time to wait for the lookup batch to full, if this timeout is reached, " + "the lookup batch will be closed to send."); + public static final ConfigOption CLIENT_LOOKUP_MAX_RETRIES = + key("client.lookup.max-retries") + .intType() + .defaultValue(3) + .withDescription( + "Setting a value greater than zero will cause the client to resend any lookup request " + + "that fails with a potentially transient error."); + public static final ConfigOption CLIENT_SCANNER_REMOTE_LOG_PREFETCH_NUM = key("client.scanner.remote-log.prefetch-num") .intType() diff --git a/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/catalog/FlinkTableFactory.java b/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/catalog/FlinkTableFactory.java index 92579e3ac0..0b9737302a 100644 --- a/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/catalog/FlinkTableFactory.java +++ b/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/catalog/FlinkTableFactory.java @@ -244,6 +244,13 @@ private static Configuration toFlussClientConfig( } }); + // map flink lookup.max-retries to client.lookup.max-retries + if (tableOptions.containsKey(LookupOptions.MAX_RETRIES.key())) { + flussConfig.setString( + ConfigOptions.CLIENT_LOOKUP_MAX_RETRIES.key(), + tableOptions.get(LookupOptions.MAX_RETRIES.key())); + } + // pass flink io tmp dir to fluss client. 
flussConfig.setString( ConfigOptions.CLIENT_SCANNER_IO_TMP_DIR, diff --git a/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/source/lookup/FlinkAsyncLookupFunction.java b/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/source/lookup/FlinkAsyncLookupFunction.java index 716a74199f..92ac0aa492 100644 --- a/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/source/lookup/FlinkAsyncLookupFunction.java +++ b/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/source/lookup/FlinkAsyncLookupFunction.java @@ -58,7 +58,6 @@ public class FlinkAsyncLookupFunction extends AsyncLookupFunction { private final Configuration flussConfig; private final TablePath tablePath; - private final int maxRetryTimes; private final RowType flinkRowType; private final LookupNormalizer lookupNormalizer; @Nullable private final int[] projection; @@ -78,7 +77,6 @@ public FlinkAsyncLookupFunction( @Nullable int[] projection) { this.flussConfig = flussConfig; this.tablePath = tablePath; - this.maxRetryTimes = maxRetryTimes; this.flinkRowType = flinkRowType; this.lookupNormalizer = lookupNormalizer; this.projection = projection; @@ -121,90 +119,50 @@ public CompletableFuture> asyncLookup(RowData keyRow) { RowData normalizedKeyRow = lookupNormalizer.normalizeLookupKey(keyRow); RemainingFilter remainingFilter = lookupNormalizer.createRemainingFilter(keyRow); InternalRow flussKeyRow = lookupRow.replace(normalizedKeyRow); - CompletableFuture> future = new CompletableFuture<>(); - // fetch result - fetchResult(future, 0, flussKeyRow, remainingFilter); - return future; - } - /** - * Execute async fetch result . - * - * @param resultFuture The result or exception is returned. - * @param currentRetry Current number of retries. - * @param keyRow the key row to get. - * @param remainingFilter the nullable remaining filter to filter the result. 
- */ - private void fetchResult( - CompletableFuture> resultFuture, - int currentRetry, - InternalRow keyRow, - @Nullable RemainingFilter remainingFilter) { - lookuper.lookup(keyRow) + // the retry mechanism is now handled by the underlying LookupClient layer + CompletableFuture> future = new CompletableFuture<>(); + lookuper.lookup(flussKeyRow) .whenComplete( (result, throwable) -> { if (throwable != null) { - handleLookupFailed( - resultFuture, - throwable, - currentRetry, - keyRow, - remainingFilter); + if (throwable instanceof TableNotExistException) { + LOG.error("Table '{}' not found ", tablePath, throwable); + future.completeExceptionally( + new RuntimeException( + "Fluss table '" + tablePath + "' not found.", + throwable)); + } else { + LOG.error("Fluss asyncLookup error", throwable); + future.completeExceptionally( + new RuntimeException( + "Execution of Fluss asyncLookup failed: " + + throwable.getMessage(), + throwable)); + } } else { - handleLookupSuccess( - resultFuture, result.getRowList(), remainingFilter); + List lookupResult = result.getRowList(); + if (lookupResult.isEmpty()) { + future.complete(Collections.emptyList()); + return; + } + + List projectedRow = new ArrayList<>(); + for (InternalRow row : lookupResult) { + if (row != null) { + RowData flinkRow = + flussRowToFlinkRowConverter.toFlinkRowData( + maybeProject(row)); + if (remainingFilter == null + || remainingFilter.isMatch(flinkRow)) { + projectedRow.add(flinkRow); + } + } + } + future.complete(projectedRow); } }); - } - - private void handleLookupFailed( - CompletableFuture> resultFuture, - Throwable throwable, - int currentRetry, - InternalRow keyRow, - @Nullable RemainingFilter remainingFilter) { - if (throwable instanceof TableNotExistException) { - LOG.error("Table '{}' not found ", tablePath, throwable); - resultFuture.completeExceptionally( - new RuntimeException("Fluss table '" + tablePath + "' not found.", throwable)); - } else { - LOG.error("Fluss asyncLookup error, retry times 
= {}", currentRetry, throwable); - if (currentRetry >= maxRetryTimes) { - String exceptionMsg = - String.format( - "Execution of Fluss asyncLookup failed: %s, retry times = %d.", - throwable.getMessage(), currentRetry); - resultFuture.completeExceptionally(new RuntimeException(exceptionMsg, throwable)); - } else { - try { - Thread.sleep(1000L * currentRetry); - } catch (InterruptedException e1) { - resultFuture.completeExceptionally(e1); - } - fetchResult(resultFuture, currentRetry + 1, keyRow, remainingFilter); - } - } - } - - private void handleLookupSuccess( - CompletableFuture> resultFuture, - List lookupResult, - @Nullable RemainingFilter remainingFilter) { - if (lookupResult.isEmpty()) { - resultFuture.complete(Collections.emptyList()); - return; - } - - List projectedRow = new ArrayList<>(); - for (InternalRow row : lookupResult) { - if (row != null) { - RowData flinkRow = flussRowToFlinkRowConverter.toFlinkRowData(maybeProject(row)); - if (remainingFilter == null || remainingFilter.isMatch(flinkRow)) { - projectedRow.add(flinkRow); - } - } - } - resultFuture.complete(projectedRow); + return future; } private InternalRow maybeProject(InternalRow row) { diff --git a/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/source/lookup/FlinkLookupFunction.java b/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/source/lookup/FlinkLookupFunction.java index 480247f0f9..6637ffb42e 100644 --- a/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/source/lookup/FlinkLookupFunction.java +++ b/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/source/lookup/FlinkLookupFunction.java @@ -54,7 +54,6 @@ public class FlinkLookupFunction extends LookupFunction { private final Configuration flussConfig; private final TablePath tablePath; - private final int maxRetryTimes; private final RowType flinkRowType; private final LookupNormalizer lookupNormalizer; @Nullable private final int[] projection; @@ -75,7 +74,6 
@@ public FlinkLookupFunction( @Nullable int[] projection) { this.flussConfig = flussConfig; this.tablePath = tablePath; - this.maxRetryTimes = maxRetryTimes; this.flinkRowType = flinkRowType; this.lookupNormalizer = lookupNormalizer; this.projection = projection; @@ -124,41 +122,28 @@ public Collection lookup(RowData keyRow) { lookupNormalizer.createRemainingFilter(keyRow); // wrap flink row as fluss row to lookup, the flink row has already been in expected order. InternalRow flussKeyRow = lookupRow.replace(normalizedKeyRow); - for (int retry = 0; retry <= maxRetryTimes; retry++) { - try { - List lookupRows = lookuper.lookup(flussKeyRow).get().getRowList(); - if (lookupRows.isEmpty()) { - return Collections.emptyList(); - } - List projectedRows = new ArrayList<>(); - for (InternalRow row : lookupRows) { - if (row != null) { - RowData flinkRow = - flussRowToFlinkRowConverter.toFlinkRowData(maybeProject(row)); - if (remainingFilter == null || remainingFilter.isMatch(flinkRow)) { - projectedRows.add(flinkRow); - } - } - } - return projectedRows; - } catch (Exception e) { - LOG.error(String.format("Fluss lookup error, retry times = %d", retry), e); - if (retry >= maxRetryTimes) { - String exceptionMsg = - String.format( - "Execution of Fluss lookup failed, retry times = %d.", retry); - throw new RuntimeException(exceptionMsg, e); - } - try { - Thread.sleep(1000L * retry); - } catch (InterruptedException interruptedException) { - Thread.currentThread().interrupt(); - throw new RuntimeException(interruptedException); + // the retry mechanism is now handled by the underlying LookupClient layer + try { + List lookupRows = lookuper.lookup(flussKeyRow).get().getRowList(); + if (lookupRows.isEmpty()) { + return Collections.emptyList(); + } + List projectedRows = new ArrayList<>(); + for (InternalRow row : lookupRows) { + if (row != null) { + RowData flinkRow = + flussRowToFlinkRowConverter.toFlinkRowData(maybeProject(row)); + if (remainingFilter == null || 
remainingFilter.isMatch(flinkRow)) { + projectedRows.add(flinkRow); + } } } + return projectedRows; + } catch (Exception e) { + LOG.error("Fluss lookup error", e); + throw new RuntimeException("Execution of Fluss lookup failed: " + e.getMessage(), e); } - return Collections.emptyList(); } private InternalRow maybeProject(InternalRow row) { From 49e56c03a0752f302f828528b58b87037b51da6e Mon Sep 17 00:00:00 2001 From: "ocean.wy" Date: Wed, 5 Nov 2025 11:03:08 +0800 Subject: [PATCH 3/4] refact --- .../fluss/flink/source/FlinkTableSource.java | 2 - .../lookup/FlinkAsyncLookupFunction.java | 47 ++++++++++--------- .../source/lookup/FlinkLookupFunction.java | 3 +- .../fluss/flink/utils/PushdownUtils.java | 1 - .../lookup/FlinkLookupFunctionTest.java | 3 -- 5 files changed, 27 insertions(+), 29 deletions(-) diff --git a/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/source/FlinkTableSource.java b/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/source/FlinkTableSource.java index e7c7357e3b..84e2af93c8 100644 --- a/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/source/FlinkTableSource.java +++ b/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/source/FlinkTableSource.java @@ -412,7 +412,6 @@ public LookupRuntimeProvider getLookupRuntimeProvider(LookupContext context) { flussConfig, tablePath, tableOutputType, - lookupMaxRetryTimes, lookupNormalizer, projectedFields); if (cache != null) { @@ -426,7 +425,6 @@ public LookupRuntimeProvider getLookupRuntimeProvider(LookupContext context) { flussConfig, tablePath, tableOutputType, - lookupMaxRetryTimes, lookupNormalizer, projectedFields); if (cache != null) { diff --git a/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/source/lookup/FlinkAsyncLookupFunction.java b/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/source/lookup/FlinkAsyncLookupFunction.java index 92ac0aa492..3d23d3346f 100644 --- 
a/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/source/lookup/FlinkAsyncLookupFunction.java +++ b/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/source/lookup/FlinkAsyncLookupFunction.java @@ -33,6 +33,7 @@ import org.apache.fluss.metadata.TablePath; import org.apache.fluss.row.InternalRow; import org.apache.fluss.row.ProjectedRow; +import org.apache.fluss.utils.ExceptionUtils; import org.apache.flink.table.data.RowData; import org.apache.flink.table.functions.AsyncLookupFunction; @@ -72,7 +73,6 @@ public FlinkAsyncLookupFunction( Configuration flussConfig, TablePath tablePath, RowType flinkRowType, - int maxRetryTimes, LookupNormalizer lookupNormalizer, @Nullable int[] projection) { this.flussConfig = flussConfig; @@ -126,7 +126,9 @@ public CompletableFuture> asyncLookup(RowData keyRow) { .whenComplete( (result, throwable) -> { if (throwable != null) { - if (throwable instanceof TableNotExistException) { + if (ExceptionUtils.findThrowable( + throwable, TableNotExistException.class) + .isPresent()) { LOG.error("Table '{}' not found ", tablePath, throwable); future.completeExceptionally( new RuntimeException( @@ -141,30 +143,33 @@ public CompletableFuture> asyncLookup(RowData keyRow) { throwable)); } } else { - List lookupResult = result.getRowList(); - if (lookupResult.isEmpty()) { - future.complete(Collections.emptyList()); - return; - } - - List projectedRow = new ArrayList<>(); - for (InternalRow row : lookupResult) { - if (row != null) { - RowData flinkRow = - flussRowToFlinkRowConverter.toFlinkRowData( - maybeProject(row)); - if (remainingFilter == null - || remainingFilter.isMatch(flinkRow)) { - projectedRow.add(flinkRow); - } - } - } - future.complete(projectedRow); + handleLookupSuccess(future, result.getRowList(), remainingFilter); } }); return future; } + private void handleLookupSuccess( + CompletableFuture> resultFuture, + List lookupResult, + @Nullable RemainingFilter remainingFilter) { + if 
(lookupResult.isEmpty()) { + resultFuture.complete(Collections.emptyList()); + return; + } + + List projectedRow = new ArrayList<>(); + for (InternalRow row : lookupResult) { + if (row != null) { + RowData flinkRow = flussRowToFlinkRowConverter.toFlinkRowData(maybeProject(row)); + if (remainingFilter == null || remainingFilter.isMatch(flinkRow)) { + projectedRow.add(flinkRow); + } + } + } + resultFuture.complete(projectedRow); + } + private InternalRow maybeProject(InternalRow row) { if (projection == null) { return row; diff --git a/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/source/lookup/FlinkLookupFunction.java b/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/source/lookup/FlinkLookupFunction.java index 6637ffb42e..79418f9b20 100644 --- a/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/source/lookup/FlinkLookupFunction.java +++ b/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/source/lookup/FlinkLookupFunction.java @@ -69,7 +69,6 @@ public FlinkLookupFunction( Configuration flussConfig, TablePath tablePath, RowType flinkRowType, - int maxRetryTimes, LookupNormalizer lookupNormalizer, @Nullable int[] projection) { this.flussConfig = flussConfig; @@ -123,7 +122,7 @@ public Collection lookup(RowData keyRow) { // wrap flink row as fluss row to lookup, the flink row has already been in expected order. 
InternalRow flussKeyRow = lookupRow.replace(normalizedKeyRow); - // the retry mechanism is now handled by the underlying LookupClient layer + // the retry mechanism will be handled by the underlying LookupClient layer try { List lookupRows = lookuper.lookup(flussKeyRow).get().getRowList(); if (lookupRows.isEmpty()) { diff --git a/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/utils/PushdownUtils.java b/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/utils/PushdownUtils.java index e32e0752c7..8ecd133ede 100644 --- a/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/utils/PushdownUtils.java +++ b/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/utils/PushdownUtils.java @@ -271,7 +271,6 @@ public static Collection querySingleRow( flussConfig, tablePath, sourceOutputType, - lookupMaxRetryTimes, lookupNormalizer, projectedFields); try { diff --git a/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/source/lookup/FlinkLookupFunctionTest.java b/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/source/lookup/FlinkLookupFunctionTest.java index a68350a0c9..297c6a16d2 100644 --- a/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/source/lookup/FlinkLookupFunctionTest.java +++ b/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/source/lookup/FlinkLookupFunctionTest.java @@ -23,7 +23,6 @@ import org.apache.fluss.flink.utils.FlinkTestBase; import org.apache.fluss.metadata.TablePath; -import org.apache.flink.table.connector.source.lookup.LookupOptions; import org.apache.flink.table.data.RowData; import org.apache.flink.table.functions.AsyncLookupFunction; import org.apache.flink.table.types.logical.RowType; @@ -57,7 +56,6 @@ void testSyncLookupEval() throws Exception { clientConf, tablePath, flinkRowType, - LookupOptions.MAX_RETRIES.defaultValue(), createPrimaryKeyLookupNormalizer(new int[] {0}, flinkRowType), null); @@ -95,7 +93,6 
@@ void testAsyncLookupEval() throws Exception { clientConf, tablePath, flinkRowType, - LookupOptions.MAX_RETRIES.defaultValue(), createPrimaryKeyLookupNormalizer(new int[] {0}, flinkRowType), null); asyncLookupFunction.open(null); From 443c9a2b4090874ee0b48cc444c3ca6fd1f699d6 Mon Sep 17 00:00:00 2001 From: "ocean.wy" Date: Wed, 5 Nov 2025 11:23:35 +0800 Subject: [PATCH 4/4] fix compile issue --- .../org/apache/fluss/client/lookup/LookupSenderTest.java | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/fluss-client/src/test/java/org/apache/fluss/client/lookup/LookupSenderTest.java b/fluss-client/src/test/java/org/apache/fluss/client/lookup/LookupSenderTest.java index 5db92be330..6d6b95ee0e 100644 --- a/fluss-client/src/test/java/org/apache/fluss/client/lookup/LookupSenderTest.java +++ b/fluss-client/src/test/java/org/apache/fluss/client/lookup/LookupSenderTest.java @@ -29,6 +29,7 @@ import org.apache.fluss.rpc.messages.LookupRequest; import org.apache.fluss.rpc.messages.LookupResponse; import org.apache.fluss.rpc.messages.PbLookupRespForBucket; +import org.apache.fluss.rpc.messages.PbPrefixLookupRespForBucket; import org.apache.fluss.rpc.messages.PrefixLookupRequest; import org.apache.fluss.rpc.messages.PrefixLookupResponse; import org.apache.fluss.rpc.protocol.ApiError; @@ -354,7 +355,7 @@ private CompletableFuture createSuccessPrefixLookupRespons PrefixLookupRequest request) { PrefixLookupResponse response = new PrefixLookupResponse(); // Create response for each prefix key in request - var bucketResp = response.addBucketsResp(); + PbPrefixLookupRespForBucket bucketResp = response.addBucketsResp(); bucketResp.setBucketId(TABLE_BUCKET.getBucket()); if (TABLE_BUCKET.getPartitionId() != null) { bucketResp.setPartitionId(TABLE_BUCKET.getPartitionId()); @@ -370,7 +371,7 @@ private CompletableFuture createSuccessPrefixLookupRespons private CompletableFuture createFailedPrefixLookupResponse( PrefixLookupRequest request, Exception exception) { 
PrefixLookupResponse response = new PrefixLookupResponse(); - var bucketResp = response.addBucketsResp(); + PbPrefixLookupRespForBucket bucketResp = response.addBucketsResp(); bucketResp.setBucketId(TABLE_BUCKET.getBucket()); if (TABLE_BUCKET.getPartitionId() != null) { bucketResp.setPartitionId(TABLE_BUCKET.getPartitionId());