Skip to content

[FLINK-37724] Adds AsyncTableFunction as a fully supported UDF type #26567

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 10 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,7 @@
<td><h5>table.exec.async-scalar.buffer-capacity</h5><br> <span class="label label-primary">Streaming</span></td>
<td style="word-wrap: break-word;">10</td>
<td>Integer</td>
<td>The max number of async i/o operation that the async lookup join can trigger.</td>
<td>The max number of async i/o operation that the async scalar function can trigger.</td>
</tr>
<tr>
<td><h5>table.exec.async-scalar.max-attempts</h5><br> <span class="label label-primary">Streaming</span></td>
Expand Down Expand Up @@ -87,6 +87,36 @@
<td>Boolean</td>
<td>Set whether to use the SQL/Table operators based on the asynchronous state api. Default value is false.</td>
</tr>
<tr>
<td><h5>table.exec.async-table.max-concurrent-operations</h5><br> <span class="label label-primary">Streaming</span></td>
<td style="word-wrap: break-word;">10</td>
<td>Integer</td>
<td>The max number of concurrent async i/o operations that the async table function can trigger.</td>
</tr>
<tr>
<td><h5>table.exec.async-table.max-retries</h5><br> <span class="label label-primary">Streaming</span></td>
<td style="word-wrap: break-word;">3</td>
<td>Integer</td>
<td>The max number of async retry attempts to make before task execution is failed.</td>
</tr>
<tr>
<td><h5>table.exec.async-table.retry-delay</h5><br> <span class="label label-primary">Streaming</span></td>
<td style="word-wrap: break-word;">100 ms</td>
<td>Duration</td>
<td>The delay to wait before trying again.</td>
</tr>
<tr>
<td><h5>table.exec.async-table.retry-strategy</h5><br> <span class="label label-primary">Streaming</span></td>
<td style="word-wrap: break-word;">FIXED_DELAY</td>
<td><p>Enum</p></td>
<td>Restart strategy which will be used, FIXED_DELAY by default.<br /><br />Possible values:<ul><li>"NO_RETRY"</li><li>"FIXED_DELAY"</li></ul></td>
</tr>
<tr>
<td><h5>table.exec.async-table.timeout</h5><br> <span class="label label-primary">Streaming</span></td>
<td style="word-wrap: break-word;">3 min</td>
<td>Duration</td>
<td>The async timeout for the asynchronous operation to complete, including any retries which may occur.</td>
</tr>
<tr>
<td><h5>table.exec.deduplicate.insert-update-after-sensitive-enabled</h5><br> <span class="label label-primary">Streaming</span></td>
<td style="word-wrap: break-word;">true</td>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Optional;

import static org.apache.flink.shaded.asm9.org.objectweb.asm.Type.getConstructorDescriptor;
import static org.apache.flink.shaded.asm9.org.objectweb.asm.Type.getMethodDescriptor;
Expand Down Expand Up @@ -373,4 +374,28 @@ public static void validateLambdaType(Class<?> baseClass, Type t) {
+ "Otherwise the type has to be specified explicitly using type information.");
}
}

/**
 * Checks whether {@code type} is either {@code clazz} itself or a parameterization of it.
 *
 * <p>For example, both the raw {@code List} and {@code List<Integer>} match {@code List.class}.
 * The comparison uses equality, so a parameterized subtype of {@code clazz} does not match.
 *
 * @param clazz The class to compare against
 * @param type The type to be checked
 * @return true if {@code type} equals {@code clazz}, or is a {@link ParameterizedType} whose
 *     raw type equals {@code clazz}
 */
public static boolean isGenericOfClass(Class<?> clazz, Type type) {
    if (clazz.equals(type)) {
        return true;
    }
    return getParameterizedType(type)
            .map(ParameterizedType::getRawType)
            .filter(clazz::equals)
            .isPresent();
}

/**
 * Views the given type as a {@link ParameterizedType}, if that is what it is.
 *
 * <p>Any other kind of type (for example a raw {@code Class}) and {@code null} yield an empty
 * result.
 *
 * @param type The type to check, may be {@code null}
 * @return optional which is present if the type is a ParameterizedType
 */
public static Optional<ParameterizedType> getParameterizedType(Type type) {
    // instanceof is false for null, so a null type maps to an empty Optional instead of the
    // NullPointerException that Optional.of(null) would raise.
    return type instanceof ParameterizedType
            ? Optional.of((ParameterizedType) type)
            : Optional.empty();
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,74 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.flink.api.java.typeutils;

import org.junit.jupiter.api.Test;

import java.lang.reflect.Method;
import java.lang.reflect.ParameterizedType;
import java.lang.reflect.Type;
import java.util.List;
import java.util.Optional;

import static org.apache.flink.api.java.typeutils.TypeExtractionUtils.getAllDeclaredMethods;
import static org.assertj.core.api.Assertions.assertThat;

/** Tests for {@link TypeExtractionUtils}. */
public class TypeExtractionUtilsTest {

    @Test
    void testIsGeneric() throws Exception {
        // A parameterized type matches the class it parameterizes ...
        final Type parameterizedList = firstParameterType("m1");
        assertThat(TypeExtractionUtils.isGenericOfClass(List.class, parameterizedList)).isTrue();
        // ... and no unrelated class. Without a negative case this test would still pass if
        // the method under test always returned true.
        assertThat(TypeExtractionUtils.isGenericOfClass(Optional.class, parameterizedList))
                .isFalse();

        // A raw type matches its own class as well.
        final Type rawList = firstParameterType("m2");
        assertThat(TypeExtractionUtils.isGenericOfClass(List.class, rawList)).isTrue();
        assertThat(TypeExtractionUtils.isGenericOfClass(Optional.class, rawList)).isFalse();
    }

    @Test
    void testGetParameterizedType() throws Exception {
        final Optional<ParameterizedType> parameterizedType =
                TypeExtractionUtils.getParameterizedType(firstParameterType("m1"));
        assertThat(parameterizedType).isPresent();
        assertThat(parameterizedType.get().getRawType()).isEqualTo(List.class);
        assertThat(parameterizedType.get().getActualTypeArguments()[0]).isEqualTo(Integer.class);

        // The raw List parameter of m2 is not a ParameterizedType.
        assertThat(TypeExtractionUtils.getParameterizedType(firstParameterType("m2"))).isEmpty();
    }

    /** Returns the generic type of the first parameter of the named {@link IsGeneric} method. */
    private static Type firstParameterType(String methodName) throws Exception {
        return getMethod(IsGeneric.class, methodName).getGenericParameterTypes()[0];
    }

    /** Looks up a declared method by name, failing with the missing name if there is none. */
    private static Method getMethod(Class<?> clazz, String name) throws Exception {
        return getAllDeclaredMethods(clazz).stream()
                .filter(m -> m.getName().equals(name))
                .findFirst()
                .orElseThrow(() -> new NoSuchMethodException(clazz.getName() + "#" + name));
    }

    /** Fixture whose method parameters provide a parameterized and a raw {@code List}. */
    public static class IsGeneric {
        public void m1(List<Integer> list) {}

        // The raw type is the point of this fixture, so the warning is suppressed only here.
        @SuppressWarnings("rawtypes")
        public void m2(List list) {}
    }
}
Original file line number Diff line number Diff line change
Expand Up @@ -422,7 +422,7 @@ public class ExecutionConfigOptions {
.intType()
.defaultValue(10)
.withDescription(
"The max number of async i/o operation that the async lookup join can trigger.");
"The max number of async i/o operation that the async scalar function can trigger.");

@Documentation.TableOption(execMode = Documentation.ExecMode.STREAMING)
public static final ConfigOption<Duration> TABLE_EXEC_ASYNC_SCALAR_TIMEOUT =
Expand Down Expand Up @@ -456,6 +456,49 @@ public class ExecutionConfigOptions {
"The max number of async retry attempts to make before task "
+ "execution is failed.");

// ------------------------------------------------------------------------
// Async Table Function
// ------------------------------------------------------------------------
@Documentation.TableOption(execMode = Documentation.ExecMode.STREAMING)
public static final ConfigOption<Integer> TABLE_EXEC_ASYNC_TABLE_MAX_CONCURRENT_OPERATIONS =
key("table.exec.async-table.max-concurrent-operations")
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Looks like we changed the naming here. @becketqin , should we change in https://github.com/apache/flink/blob/master/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/config/ExecutionConfigOptions.java#L465 as well to make it consistent? ml_predict is not released yet.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, I think that makes sense. BTW, if we have a FLIP, we should update the FLIP to make it aligned with the code.

Admittedly, changing the config name here causes the inconsistent config name in AsyncScalarFunction. We can add the new config key and change the old config keys as deprecated config key.

Another thing is that at this point, our design principle for configs in SQL seems not consistent. Some of the configs are set using hints, while others are in the execution config. It would be good to have a common design principle, so users don't need to guess how to config the physical behaviors.

Copy link
Contributor Author

@AlanConfluent AlanConfluent Jun 16, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Admittedly, changing the config name here causes the inconsistent config name in AsyncScalarFunction. We can add the new config key and change the old config keys as deprecated config key.

Should I do that as part of this PR or have a followup for AsyncScalarFunction?

Another thing is that at this point, our design principle for configs in SQL seems not consistent. Some of the configs are set using hints, while others are in the execution config. It would be good to have a common design principle, so users don't need to guess how to config the physical behaviors.

I agree. It's a bit inconsistent. I think all async operations (scalar, table, lookup joins, ml_predict) support the execution config, though only lookup joins at the moment also support hints. Hints support could certainly be added in the future to others. I can file jiras for each feature to add hint support, if it can be retroactively added to old FLIPs.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks Alan.

Should I do that as part of this PR or have a followup for AsyncScalarFunction?
Let's make it a separate PR. For the config name change in AsyncScalarFunction, I am thinking that maybe we can do the following:

  1. Reply to the vote thread of the AsyncScalarFunction FLIP to let people know that we want to use the new config name, which is more intuitive. And for backwards compatibility, we are going to make the old key as deprecated key.
  2. If there is no objection, we can update the AsyncScalarFunction FLIP to add the description of this change.

I agree. It's a bit inconsistent. I think all async operations (scalar, table, lookup joins, ml_predict) support the execution config, though only lookup joins at the moment also support hints. Hints support could certainly be added in the future to others. I can file jiras for each feature to add hint support, if it can be retroactively added to old FLIPs.
Given that these are addition of new things to multiple components. It might make sense to have a new separate FLIP, which aims to align all the SQL physical behavior to support hints.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I asked in the FLIP-525 voting thread to rename the config table.exec.async-ml-predict.buffer-capacity to table.exec.async-ml-predict.max-concurrent-operations.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Reply to the vote thread of the AsyncScalarFunction FLIP to let people know that we want to use the new config name, which is more intuitive. And for backwards compatibility, we are going to make the old key as deprecated key.
If there is no objection, we can update the AsyncScalarFunction FLIP to add the description of this change.

Ok, that all sounds good. I'll do that.

Given that these are addition of new things to multiple components. It might make sense to have a new separate FLIP, which aims to align all the SQL physical behavior to support hints.

I agree, that seems good to have a single FLIP if the aim is to make them all consistent.

.intType()
.defaultValue(10)
.withDescription(
"The max number of concurrent async i/o operations that the async table function can trigger.");

/**
 * Timeout option for async table functions, keyed {@code table.exec.async-table.timeout}.
 *
 * <p>Defaults to 3 minutes. Per the option description, the timeout covers the whole
 * asynchronous operation, including any retries.
 */
@Documentation.TableOption(execMode = Documentation.ExecMode.STREAMING)
public static final ConfigOption<Duration> TABLE_EXEC_ASYNC_TABLE_TIMEOUT =
key("table.exec.async-table.timeout")
.durationType()
.defaultValue(Duration.ofMinutes(3))
.withDescription(
"The async timeout for the asynchronous operation to complete, including any retries which may occur.");

/**
 * Retry strategy option for async table functions, keyed {@code
 * table.exec.async-table.retry-strategy}.
 *
 * <p>Takes a {@code RetryStrategy} enum value and defaults to {@code FIXED_DELAY}.
 */
// NOTE(review): the description below says "Restart strategy" although the key and type are
// about retries. Consider rewording it together with the generated configuration docs.
@Documentation.TableOption(execMode = Documentation.ExecMode.STREAMING)
public static final ConfigOption<RetryStrategy> TABLE_EXEC_ASYNC_TABLE_RETRY_STRATEGY =
key("table.exec.async-table.retry-strategy")
.enumType(RetryStrategy.class)
.defaultValue(RetryStrategy.FIXED_DELAY)
.withDescription(
"Restart strategy which will be used, FIXED_DELAY by default.");

/**
 * Retry delay option for async table functions, keyed {@code
 * table.exec.async-table.retry-delay}.
 *
 * <p>Defaults to 100 milliseconds. Per the option description, this is the wait before an
 * operation is tried again.
 */
@Documentation.TableOption(execMode = Documentation.ExecMode.STREAMING)
public static final ConfigOption<Duration> TABLE_EXEC_ASYNC_TABLE_RETRY_DELAY =
key("table.exec.async-table.retry-delay")
.durationType()
.defaultValue(Duration.ofMillis(100))
.withDescription("The delay to wait before trying again.");

/**
 * Maximum retry count option for async table functions, keyed {@code
 * table.exec.async-table.max-retries}.
 *
 * <p>Defaults to 3. Per the option description, task execution is failed once this many
 * async retry attempts have been made.
 */
@Documentation.TableOption(execMode = Documentation.ExecMode.STREAMING)
public static final ConfigOption<Integer> TABLE_EXEC_ASYNC_TABLE_MAX_RETRIES =
key("table.exec.async-table.max-retries")
.intType()
.defaultValue(3)
.withDescription(
"The max number of async retry attempts to make before task "
+ "execution is failed.");

// ------------------------------------------------------------------------
// Async ML_PREDICT Options
// ------------------------------------------------------------------------
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -48,13 +48,17 @@
import java.lang.reflect.ParameterizedType;
import java.lang.reflect.Type;
import java.util.Arrays;
import java.util.Collection;
import java.util.HashSet;
import java.util.List;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.stream.Collectors;

import static org.apache.flink.api.java.typeutils.TypeExtractionUtils.getAllDeclaredMethods;
import static org.apache.flink.api.java.typeutils.TypeExtractionUtils.getParameterizedType;
import static org.apache.flink.api.java.typeutils.TypeExtractionUtils.isGenericOfClass;
import static org.apache.flink.util.Preconditions.checkState;

/**
Expand Down Expand Up @@ -477,11 +481,12 @@ private static void validateImplementationMethods(
validateImplementationMethod(functionClass, false, false, SCALAR_EVAL);
} else if (AsyncScalarFunction.class.isAssignableFrom(functionClass)) {
validateImplementationMethod(functionClass, false, false, ASYNC_SCALAR_EVAL);
validateAsyncImplementationMethod(functionClass, ASYNC_SCALAR_EVAL);
validateAsyncImplementationMethod(functionClass, false, ASYNC_SCALAR_EVAL);
} else if (TableFunction.class.isAssignableFrom(functionClass)) {
validateImplementationMethod(functionClass, true, false, TABLE_EVAL);
} else if (AsyncTableFunction.class.isAssignableFrom(functionClass)) {
validateImplementationMethod(functionClass, true, false, ASYNC_TABLE_EVAL);
validateAsyncImplementationMethod(functionClass, true, ASYNC_TABLE_EVAL);
} else if (AggregateFunction.class.isAssignableFrom(functionClass)) {
validateImplementationMethod(functionClass, true, false, AGGREGATE_ACCUMULATE);
validateImplementationMethod(functionClass, true, true, AGGREGATE_RETRACT);
Expand Down Expand Up @@ -541,7 +546,9 @@ private static void validateImplementationMethod(
}

private static void validateAsyncImplementationMethod(
Class<? extends UserDefinedFunction> clazz, String... methodNameOptions) {
Class<? extends UserDefinedFunction> clazz,
boolean verifyFutureContainsCollection,
String... methodNameOptions) {
final Set<String> nameSet = new HashSet<>(Arrays.asList(methodNameOptions));
final List<Method> methods = getAllDeclaredMethods(clazz);
for (Method method : methods) {
Expand All @@ -558,18 +565,31 @@ private static void validateAsyncImplementationMethod(
if (method.getParameterCount() >= 1) {
Type firstParam = method.getGenericParameterTypes()[0];
firstParam = ExtractionUtils.resolveVariableWithClassContext(clazz, firstParam);
if (CompletableFuture.class.equals(firstParam)
|| firstParam instanceof ParameterizedType
&& CompletableFuture.class.equals(
((ParameterizedType) firstParam).getRawType())) {
foundParam = true;
if (isGenericOfClass(CompletableFuture.class, firstParam)) {
Optional<ParameterizedType> parameterized = getParameterizedType(firstParam);
if (!verifyFutureContainsCollection) {
foundParam = true;
} else if (parameterized.isPresent()
&& parameterized.get().getActualTypeArguments().length > 0) {
Type firstTypeArgument = parameterized.get().getActualTypeArguments()[0];
if (isGenericOfClass(Collection.class, firstTypeArgument)) {
foundParam = true;
}
}
}
}
if (!foundParam) {
throw new ValidationException(
String.format(
"Method '%s' of function class '%s' must have a first argument of type java.util.concurrent.CompletableFuture.",
method.getName(), clazz.getName()));
if (!verifyFutureContainsCollection) {
throw new ValidationException(
String.format(
"Method '%s' of function class '%s' must have a first argument of type java.util.concurrent.CompletableFuture.",
method.getName(), clazz.getName()));
} else {
throw new ValidationException(
String.format(
"Method '%s' of function class '%s' must have a first argument of type java.util.concurrent.CompletableFuture<java.util.Collection>.",
method.getName(), clazz.getName()));
}
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@
import java.lang.reflect.ParameterizedType;
import java.lang.reflect.Type;
import java.util.Arrays;
import java.util.Collection;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
Expand All @@ -43,6 +44,7 @@
import java.util.concurrent.CompletableFuture;
import java.util.stream.Stream;

import static org.apache.flink.api.java.typeutils.TypeExtractionUtils.getParameterizedType;
import static org.apache.flink.table.types.extraction.ExtractionUtils.collectAnnotationsOfClass;
import static org.apache.flink.table.types.extraction.ExtractionUtils.collectAnnotationsOfMethod;
import static org.apache.flink.table.types.extraction.ExtractionUtils.extractionError;
Expand Down Expand Up @@ -208,7 +210,8 @@ static MethodVerification createParameterVerification(boolean requireAccumulator
* Verification that checks a method by parameters (arguments only) with mandatory {@link
* CompletableFuture}.
*/
static MethodVerification createParameterAndCompletableFutureVerification(Class<?> baseClass) {
static MethodVerification createParameterAndCompletableFutureVerification(
Class<?> baseClass, boolean verifyFutureContainsCollection) {
return (method, state, arguments, result) -> {
checkNoState(state);
checkScalarArgumentsOnly(arguments);
Expand All @@ -220,12 +223,29 @@ static MethodVerification createParameterAndCompletableFutureVerification(Class<
final Class<?> resultClass = result.toClass();
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: Existing code. I am curious why we put an assertion here as this is an argument passed by our code. If we put assertion here, then the question is why not put it in other places as well.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I agree with that. It seems a bit overly defensive. I think it's there to verify that it's validating an output template rather than a state template. Obviously, in the place where this is being called, it should never have state. Maybe a better MethodVerification interface could have methods to validate them separately.

Type genericType = method.getGenericParameterTypes()[0];
genericType = resolveVariableWithClassContext(baseClass, genericType);
if (!(genericType instanceof ParameterizedType)) {
Optional<ParameterizedType> parameterized = getParameterizedType(genericType);
if (!parameterized.isPresent()) {
throw extractionError(
"The method '%s' needs generic parameters for the CompletableFuture at position %d.",
method.getName(), 0);
}
final Type returnType = ((ParameterizedType) genericType).getActualTypeArguments()[0];
// If verifyFutureContainsCollection is set, the future's first type argument must itself be
// a parameterized Collection, and the result type is that collection's first type argument.
final Type returnType;
if (verifyFutureContainsCollection) {
Type nestedGenericType = parameterized.get().getActualTypeArguments()[0];
Optional<ParameterizedType> nestedParameterized =
getParameterizedType(nestedGenericType);
if (!nestedParameterized.isPresent()
|| !nestedParameterized.get().getRawType().equals(Collection.class)) {
throw extractionError(
"The method '%s' expects nested generic type CompletableFuture<Collection> for the %d arg.",
method.getName(), 0);
}
returnType = nestedParameterized.get().getActualTypeArguments()[0];
} else {
returnType = parameterized.get().getActualTypeArguments()[0];
}
Class<?> returnTypeClass = getClassFromType(returnType);
// Parameters should be validated using strict autoboxing.
// For return types, we can be more flexible as the UDF should know what it declared.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -106,7 +106,7 @@ public static TypeInference forAsyncScalarFunction(
null,
null,
createOutputFromGenericInMethod(0, 0, true),
createParameterAndCompletableFutureVerification(function));
createParameterAndCompletableFutureVerification(function, false));
return extractTypeInference(mappingExtractor, false);
}

Expand Down Expand Up @@ -172,7 +172,7 @@ public static TypeInference forAsyncTableFunction(
null,
null,
createOutputFromGenericInClass(AsyncTableFunction.class, 0, true),
createParameterAndCompletableFutureVerification(function));
createParameterAndCompletableFutureVerification(function, true));
return extractTypeInference(mappingExtractor, false);
}

Expand Down
Loading