Skip to content

Commit e1b0ed6

Browse files
committed
[FLINK-37724] Adds AsyncTableFunction as a fully supported UDF type
1 parent 9d0f915 commit e1b0ed6

File tree

53 files changed

+3285
-85
lines changed

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

53 files changed

+3285
-85
lines changed

docs/layouts/shortcodes/generated/execution_config_configuration.html

Lines changed: 31 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -30,7 +30,7 @@
3030
<td><h5>table.exec.async-scalar.buffer-capacity</h5><br> <span class="label label-primary">Streaming</span></td>
3131
<td style="word-wrap: break-word;">10</td>
3232
<td>Integer</td>
33-
<td>The max number of async i/o operation that the async lookup join can trigger.</td>
33+
<td>The max number of async i/o operation that the async scalar function can trigger.</td>
3434
</tr>
3535
<tr>
3636
<td><h5>table.exec.async-scalar.max-attempts</h5><br> <span class="label label-primary">Streaming</span></td>
@@ -62,6 +62,36 @@
6262
<td>Boolean</td>
6363
<td>Set whether to use the SQL/Table operators based on the asynchronous state api. Default value is false.</td>
6464
</tr>
65+
<tr>
66+
<td><h5>table.exec.async-table.buffer-capacity</h5><br> <span class="label label-primary">Streaming</span></td>
67+
<td style="word-wrap: break-word;">10</td>
68+
<td>Integer</td>
69+
<td>The max number of async i/o operations that the async table function can trigger.</td>
70+
</tr>
71+
<tr>
72+
<td><h5>table.exec.async-table.max-attempts</h5><br> <span class="label label-primary">Streaming</span></td>
73+
<td style="word-wrap: break-word;">3</td>
74+
<td>Integer</td>
75+
<td>The max number of async retry attempts to make before task execution is failed.</td>
76+
</tr>
77+
<tr>
78+
<td><h5>table.exec.async-table.retry-delay</h5><br> <span class="label label-primary">Streaming</span></td>
79+
<td style="word-wrap: break-word;">100 ms</td>
80+
<td>Duration</td>
81+
<td>The delay to wait before trying again.</td>
82+
</tr>
83+
<tr>
84+
<td><h5>table.exec.async-table.retry-strategy</h5><br> <span class="label label-primary">Streaming</span></td>
85+
<td style="word-wrap: break-word;">FIXED_DELAY</td>
86+
<td><p>Enum</p></td>
87+
<td>Restart strategy which will be used, FIXED_DELAY by default.<br /><br />Possible values:<ul><li>"NO_RETRY"</li><li>"FIXED_DELAY"</li></ul></td>
88+
</tr>
89+
<tr>
90+
<td><h5>table.exec.async-table.timeout</h5><br> <span class="label label-primary">Streaming</span></td>
91+
<td style="word-wrap: break-word;">3 min</td>
92+
<td>Duration</td>
93+
<td>The async timeout for the asynchronous operation to complete.</td>
94+
</tr>
6595
<tr>
6696
<td><h5>table.exec.deduplicate.insert-update-after-sensitive-enabled</h5><br> <span class="label label-primary">Streaming</span></td>
6797
<td style="word-wrap: break-word;">true</td>
Lines changed: 18 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,18 @@
1+
<table class="configuration table table-bordered">
2+
<thead>
3+
<tr>
4+
<th class="text-left" style="width: 20%">Key</th>
5+
<th class="text-left" style="width: 15%">Default</th>
6+
<th class="text-left" style="width: 10%">Type</th>
7+
<th class="text-left" style="width: 55%">Description</th>
8+
</tr>
9+
</thead>
10+
<tbody>
11+
<tr>
12+
<td><h5>sink.committer.retries</h5></td>
13+
<td style="word-wrap: break-word;">10</td>
14+
<td>Integer</td>
15+
<td>The number of retries a Flink application attempts for committable operations (such as transactions) on retriable errors, as specified by the sink connector, before Flink fails and potentially restarts.</td>
16+
</tr>
17+
</tbody>
18+
</table>

flink-core/src/main/java/org/apache/flink/api/java/typeutils/TypeExtractionUtils.java

Lines changed: 25 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -34,6 +34,7 @@
3434
import java.util.ArrayList;
3535
import java.util.Collections;
3636
import java.util.List;
37+
import java.util.Optional;
3738

3839
import static org.apache.flink.shaded.asm9.org.objectweb.asm.Type.getConstructorDescriptor;
3940
import static org.apache.flink.shaded.asm9.org.objectweb.asm.Type.getMethodDescriptor;
@@ -373,4 +374,28 @@ public static void validateLambdaType(Class<?> baseClass, Type t) {
373374
+ "Otherwise the type has to be specified explicitly using type information.");
374375
}
375376
}
377+
378+
/**
379+
* Will return true if the type of the given generic class type.
380+
*
381+
* @param clazz The generic class to check against
382+
* @param type The type to be checked
383+
*/
384+
public static boolean isGenericOfClass(Class<?> clazz, Type type) {
385+
Optional<ParameterizedType> parameterized = getParameterizedType(type);
386+
return clazz.equals(type)
387+
|| parameterized.isPresent() && clazz.equals(parameterized.get().getRawType());
388+
}
389+
390+
/**
391+
* Returns an optional of a ParameterizedType, if that's what the type is.
392+
*
393+
* @param type The type to check
394+
* @return optional which is present if the type is a ParameterizedType
395+
*/
396+
public static Optional<ParameterizedType> getParameterizedType(Type type) {
397+
return Optional.of(type)
398+
.filter(p -> p instanceof ParameterizedType)
399+
.map(ParameterizedType.class::cast);
400+
}
376401
}

flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/config/ExecutionConfigOptions.java

Lines changed: 44 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -409,7 +409,7 @@ public class ExecutionConfigOptions {
409409
.intType()
410410
.defaultValue(10)
411411
.withDescription(
412-
"The max number of async i/o operation that the async lookup join can trigger.");
412+
"The max number of async i/o operation that the async scalar function can trigger.");
413413

414414
@Documentation.TableOption(execMode = Documentation.ExecMode.STREAMING)
415415
public static final ConfigOption<Duration> TABLE_EXEC_ASYNC_SCALAR_TIMEOUT =
@@ -443,6 +443,49 @@ public class ExecutionConfigOptions {
443443
"The max number of async retry attempts to make before task "
444444
+ "execution is failed.");
445445

446+
// ------------------------------------------------------------------------
447+
// Async Table Function
448+
// ------------------------------------------------------------------------
449+
@Documentation.TableOption(execMode = Documentation.ExecMode.STREAMING)
450+
public static final ConfigOption<Integer> TABLE_EXEC_ASYNC_TABLE_BUFFER_CAPACITY =
451+
key("table.exec.async-table.buffer-capacity")
452+
.intType()
453+
.defaultValue(10)
454+
.withDescription(
455+
"The max number of async i/o operations that the async table function can trigger.");
456+
457+
@Documentation.TableOption(execMode = Documentation.ExecMode.STREAMING)
458+
public static final ConfigOption<Duration> TABLE_EXEC_ASYNC_TABLE_TIMEOUT =
459+
key("table.exec.async-table.timeout")
460+
.durationType()
461+
.defaultValue(Duration.ofMinutes(3))
462+
.withDescription(
463+
"The async timeout for the asynchronous operation to complete.");
464+
465+
@Documentation.TableOption(execMode = Documentation.ExecMode.STREAMING)
466+
public static final ConfigOption<RetryStrategy> TABLE_EXEC_ASYNC_TABLE_RETRY_STRATEGY =
467+
key("table.exec.async-table.retry-strategy")
468+
.enumType(RetryStrategy.class)
469+
.defaultValue(RetryStrategy.FIXED_DELAY)
470+
.withDescription(
471+
"Restart strategy which will be used, FIXED_DELAY by default.");
472+
473+
@Documentation.TableOption(execMode = Documentation.ExecMode.STREAMING)
474+
public static final ConfigOption<Duration> TABLE_EXEC_ASYNC_TABLE_RETRY_DELAY =
475+
key("table.exec.async-table.retry-delay")
476+
.durationType()
477+
.defaultValue(Duration.ofMillis(100))
478+
.withDescription("The delay to wait before trying again.");
479+
480+
@Documentation.TableOption(execMode = Documentation.ExecMode.STREAMING)
481+
public static final ConfigOption<Integer> TABLE_EXEC_ASYNC_TABLE_MAX_ATTEMPTS =
482+
key("table.exec.async-table.max-attempts")
483+
.intType()
484+
.defaultValue(3)
485+
.withDescription(
486+
"The max number of async retry attempts to make before task "
487+
+ "execution is failed.");
488+
446489
// ------------------------------------------------------------------------
447490
// MiniBatch Options
448491
// ------------------------------------------------------------------------

flink-table/flink-table-common/src/main/java/org/apache/flink/table/functions/UserDefinedFunctionHelper.java

Lines changed: 31 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -48,13 +48,17 @@
4848
import java.lang.reflect.ParameterizedType;
4949
import java.lang.reflect.Type;
5050
import java.util.Arrays;
51+
import java.util.Collection;
5152
import java.util.HashSet;
5253
import java.util.List;
54+
import java.util.Optional;
5355
import java.util.Set;
5456
import java.util.concurrent.CompletableFuture;
5557
import java.util.stream.Collectors;
5658

5759
import static org.apache.flink.api.java.typeutils.TypeExtractionUtils.getAllDeclaredMethods;
60+
import static org.apache.flink.api.java.typeutils.TypeExtractionUtils.getParameterizedType;
61+
import static org.apache.flink.api.java.typeutils.TypeExtractionUtils.isGenericOfClass;
5862
import static org.apache.flink.util.Preconditions.checkState;
5963

6064
/**
@@ -477,11 +481,12 @@ private static void validateImplementationMethods(
477481
validateImplementationMethod(functionClass, false, false, SCALAR_EVAL);
478482
} else if (AsyncScalarFunction.class.isAssignableFrom(functionClass)) {
479483
validateImplementationMethod(functionClass, false, false, ASYNC_SCALAR_EVAL);
480-
validateAsyncImplementationMethod(functionClass, ASYNC_SCALAR_EVAL);
484+
validateAsyncImplementationMethod(functionClass, false, ASYNC_SCALAR_EVAL);
481485
} else if (TableFunction.class.isAssignableFrom(functionClass)) {
482486
validateImplementationMethod(functionClass, true, false, TABLE_EVAL);
483487
} else if (AsyncTableFunction.class.isAssignableFrom(functionClass)) {
484488
validateImplementationMethod(functionClass, true, false, ASYNC_TABLE_EVAL);
489+
validateAsyncImplementationMethod(functionClass, true, ASYNC_TABLE_EVAL);
485490
} else if (AggregateFunction.class.isAssignableFrom(functionClass)) {
486491
validateImplementationMethod(functionClass, true, false, AGGREGATE_ACCUMULATE);
487492
validateImplementationMethod(functionClass, true, true, AGGREGATE_RETRACT);
@@ -541,7 +546,9 @@ private static void validateImplementationMethod(
541546
}
542547

543548
private static void validateAsyncImplementationMethod(
544-
Class<? extends UserDefinedFunction> clazz, String... methodNameOptions) {
549+
Class<? extends UserDefinedFunction> clazz,
550+
boolean verifyFutureContainsCollection,
551+
String... methodNameOptions) {
545552
final Set<String> nameSet = new HashSet<>(Arrays.asList(methodNameOptions));
546553
final List<Method> methods = getAllDeclaredMethods(clazz);
547554
for (Method method : methods) {
@@ -558,18 +565,31 @@ private static void validateAsyncImplementationMethod(
558565
if (method.getParameterCount() >= 1) {
559566
Type firstParam = method.getGenericParameterTypes()[0];
560567
firstParam = ExtractionUtils.resolveVariableWithClassContext(clazz, firstParam);
561-
if (CompletableFuture.class.equals(firstParam)
562-
|| firstParam instanceof ParameterizedType
563-
&& CompletableFuture.class.equals(
564-
((ParameterizedType) firstParam).getRawType())) {
565-
foundParam = true;
568+
if (isGenericOfClass(CompletableFuture.class, firstParam)) {
569+
Optional<ParameterizedType> parameterized = getParameterizedType(firstParam);
570+
if (!verifyFutureContainsCollection) {
571+
foundParam = true;
572+
} else if (parameterized.isPresent()
573+
&& parameterized.get().getActualTypeArguments().length > 0) {
574+
firstParam = parameterized.get().getActualTypeArguments()[0];
575+
if (isGenericOfClass(Collection.class, firstParam)) {
576+
foundParam = true;
577+
}
578+
}
566579
}
567580
}
568581
if (!foundParam) {
569-
throw new ValidationException(
570-
String.format(
571-
"Method '%s' of function class '%s' must have a first argument of type java.util.concurrent.CompletableFuture.",
572-
method.getName(), clazz.getName()));
582+
if (!verifyFutureContainsCollection) {
583+
throw new ValidationException(
584+
String.format(
585+
"Method '%s' of function class '%s' must have a first argument of type java.util.concurrent.CompletableFuture.",
586+
method.getName(), clazz.getName()));
587+
} else {
588+
throw new ValidationException(
589+
String.format(
590+
"Method '%s' of function class '%s' must have a first argument of type java.util.concurrent.CompletableFuture<java.util.Collection>.",
591+
method.getName(), clazz.getName()));
592+
}
573593
}
574594
}
575595
}

flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/extraction/FunctionMappingExtractor.java

Lines changed: 17 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -43,6 +43,7 @@
4343
import java.util.concurrent.CompletableFuture;
4444
import java.util.stream.Stream;
4545

46+
import static org.apache.flink.api.java.typeutils.TypeExtractionUtils.getParameterizedType;
4647
import static org.apache.flink.table.types.extraction.ExtractionUtils.collectAnnotationsOfClass;
4748
import static org.apache.flink.table.types.extraction.ExtractionUtils.collectAnnotationsOfMethod;
4849
import static org.apache.flink.table.types.extraction.ExtractionUtils.extractionError;
@@ -208,7 +209,8 @@ static MethodVerification createParameterVerification(boolean requireAccumulator
208209
* Verification that checks a method by parameters (arguments only) with mandatory {@link
209210
* CompletableFuture}.
210211
*/
211-
static MethodVerification createParameterAndCompletableFutureVerification(Class<?> baseClass) {
212+
static MethodVerification createParameterAndCompletableFutureVerification(
213+
Class<?> baseClass, @Nullable Class<?> nestedArgumentClass) {
212214
return (method, state, arguments, result) -> {
213215
checkNoState(state);
214216
checkScalarArgumentsOnly(arguments);
@@ -220,11 +222,24 @@ static MethodVerification createParameterAndCompletableFutureVerification(Class<
220222
final Class<?> resultClass = result.toClass();
221223
Type genericType = method.getGenericParameterTypes()[0];
222224
genericType = resolveVariableWithClassContext(baseClass, genericType);
223-
if (!(genericType instanceof ParameterizedType)) {
225+
Optional<ParameterizedType> parameterized = getParameterizedType(genericType);
226+
if (!parameterized.isPresent()) {
224227
throw extractionError(
225228
"The method '%s' needs generic parameters for the CompletableFuture at position %d.",
226229
method.getName(), 0);
227230
}
231+
// If nestedArgumentClass is given, it is assumed to be a generic parameters of
232+
// argumentClass, also at the position genericPos
233+
if (nestedArgumentClass != null) {
234+
genericType = parameterized.get().getActualTypeArguments()[0];
235+
parameterized = getParameterizedType(genericType);
236+
if (!parameterized.isPresent()
237+
|| !parameterized.get().getRawType().equals(nestedArgumentClass)) {
238+
throw extractionError(
239+
"The method '%s' expects nested generic type CompletableFuture<%s> for the %d arg.",
240+
method.getName(), nestedArgumentClass.getName(), 0);
241+
}
242+
}
228243
final Type returnType = ((ParameterizedType) genericType).getActualTypeArguments()[0];
229244
Class<?> returnTypeClass = getClassFromType(returnType);
230245
// Parameters should be validated using strict autoboxing.

flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/extraction/TypeInferenceExtractor.java

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -45,6 +45,7 @@
4545

4646
import javax.annotation.Nullable;
4747

48+
import java.util.Collection;
4849
import java.util.LinkedHashMap;
4950
import java.util.List;
5051
import java.util.Map;
@@ -106,7 +107,7 @@ public static TypeInference forAsyncScalarFunction(
106107
null,
107108
null,
108109
createOutputFromGenericInMethod(0, 0, true),
109-
createParameterAndCompletableFutureVerification(function));
110+
createParameterAndCompletableFutureVerification(function, null));
110111
return extractTypeInference(mappingExtractor, false);
111112
}
112113

@@ -172,7 +173,8 @@ public static TypeInference forAsyncTableFunction(
172173
null,
173174
null,
174175
createOutputFromGenericInClass(AsyncTableFunction.class, 0, true),
175-
createParameterAndCompletableFutureVerification(function));
176+
createParameterAndCompletableFutureVerification(
177+
function, Collection.class));
176178
return extractTypeInference(mappingExtractor, false);
177179
}
178180

flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/inference/SystemTypeInference.java

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -198,7 +198,9 @@ private static TypeStrategy deriveSystemOutputStrategy(
198198
FunctionKind functionKind,
199199
@Nullable List<StaticArgument> staticArgs,
200200
TypeStrategy outputStrategy) {
201-
if (functionKind != FunctionKind.TABLE && functionKind != FunctionKind.PROCESS_TABLE) {
201+
if (functionKind != FunctionKind.TABLE
202+
&& functionKind != FunctionKind.PROCESS_TABLE
203+
&& functionKind != FunctionKind.ASYNC_TABLE) {
202204
return outputStrategy;
203205
}
204206
return new SystemOutputStrategy(functionKind, staticArgs, outputStrategy);

0 commit comments

Comments
 (0)