Skip to content

[FLINK-37724] Adds AsyncTableFunction as a fully supported UDF type #26567

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 1 commit into
base: master
Choose a base branch
from

Conversation

AlanConfluent
Copy link
Contributor

What is the purpose of the change

This aims to implement FLIP-498 https://cwiki.apache.org/confluence/display/FLINK/FLIP-498%3A+AsyncTableFunction+for+async+table+function+support.

In particular AsyncTableFunctions can now be defined as:

public class TestTableFunction extends AsyncTableFunction<String> {
    public void eval(CompletableFuture<Collection<String>> result, Integer i) {
        result.complete(Arrays.asList("Row1 " + i, "Row2 " + i));
    }
}

They can be registered in the catalog as with other UDFs, such as with:

 tEnv.createTemporarySystemFunction("func", new TestTableFunction());

There are a few new configs:

Name (Prefix table.exec.async-table) Meaning
buffer-capacity The number of outstanding requests the operator allows at once
timeout The total time which can pass before the invocation (including retries) is considered timed out and task execution is failed
retry-strategy FIXED_DELAY is for a retry after a fixed amount of time
retry-delay The time to wait between retries for the FIXED_DELAY strategy.  Could be the base delay time for a (not yet proposed) exponential backoff.
max-attempts The maximum number of attempts while retrying.

Brief change log

  • Adds AsyncTableFunction support to type system, codegen, and runtime

Verifying this change

This change added tests and can be verified as follows:

  • Tested Udf validation (UserDefinedFunctionHelperTest)
  • Tested type inference (TypeInferenceExtractorTest.java)
  • Tested code generation for AsyncTableFunction
  • Tested new split rules for new async correlates
  • ITCases new async correlate use

Does this pull request potentially affect one of the following parts:

  • Dependencies (does it add or upgrade a dependency): (yes / no)
  • The public API, i.e., is any changed class annotated with @Public(Evolving): (yes / no)
  • The serializers: (yes / no / don't know)
  • The runtime per-record code paths (performance sensitive): (yes / no / don't know)
    • It should only affect the new code and shouldn't have detrimental effects to existing AsyncTableFunction use with lookup joins
  • Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Kubernetes/Yarn, ZooKeeper: (yes / no / don't know)
  • The S3 file system connector: (yes / no / don't know)

Documentation

  • Does this pull request introduce a new feature? (yes / no)
  • If yes, how is the feature documented? (not applicable / docs / JavaDocs / not documented)
    • More documentation will follow as well beyond the configs

@flinkbot
Copy link
Collaborator

flinkbot commented May 15, 2025

CI report:

Bot commands The @flinkbot bot supports the following commands:
  • @flinkbot run azure re-run the last Azure build

Copy link
Contributor

@lihaosky lihaosky left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks Alan for the contribution! I made a brief pass and left some initial comments

@@ -0,0 +1,18 @@
<table class="configuration table table-bordered">
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is unrelated the async table function?

Comment on lines +384 to +396
public static boolean isGenericOfClass(Class<?> clazz, Type type) {
Optional<ParameterizedType> parameterized = getParameterizedType(type);
return clazz.equals(type)
|| parameterized.isPresent() && clazz.equals(parameterized.get().getRawType());
}

/**
* Returns an optional of a ParameterizedType, if that's what the type is.
*
* @param type The type to check
* @return optional which is present if the type is a ParameterizedType
*/
public static Optional<ParameterizedType> getParameterizedType(Type type) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Add unit test for these two function?

@@ -373,4 +374,28 @@ public static void validateLambdaType(Class<?> baseClass, Type t) {
+ "Otherwise the type has to be specified explicitly using type information.");
}
}

/**
* Will return true if the type of the given generic class type.
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
* Will return true if the type of the given generic class type.
* Will return true if the type of the given generic class type matches clazz.

.durationType()
.defaultValue(Duration.ofMinutes(3))
.withDescription(
"The async timeout for the asynchronous operation to complete.");
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is this the timeout for each retry or total timeout for all retries? I suppose it's for each retry

/** Implementation method isn't void. */
public static class NoFutureAsyncScalarFunction extends AsyncScalarFunction {
public void eval(int i) {}
}

/** Implementation method isn't void. */
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

first param isn't CompletableFuture<Collection>?

public void eval(int i) {}
}

/** Implementation method isn't void. */
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

ditto

Comment on lines +50 to +51
minPlanVersion = FlinkVersion.v1_19,
minStateVersion = FlinkVersion.v1_19)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Not exist yet in 1.19?

@@ -71,24 +72,35 @@ public class AsyncCalcSplitRule {
* org.apache.flink.table.functions.AsyncScalarFunction}.
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

as well as AsyncTableFunction?

Comment on lines +78 to +80
public AsyncRemoteCalcCallFinder() {
this(FunctionKind.ASYNC_SCALAR);
}
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: Not sure if we want to default to scalar function given both are supported now. Requiring functionKind is probably fine

String sqlQuery = "select * FROM MyTable, LATERAL TABLE(asyncTableFunc(scalar(a)))";
util.verifyRelPlan(sqlQuery);
}

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can you add

    @Test
    public void testCorrelateWithCast() {
        String sqlQuery =
                "select * FROM MyTable, LATERAL TABLE(asyncTableFunc(cast(cast(a as int) as int)))";
        util.verifyRelPlan(sqlQuery);
    }

There was a fix I made internally around cast

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Projects
None yet
Development

Successfully merging this pull request may close these issues.

3 participants