Skip to content

[FLINK-37789][3/N] introduce ml builtin sql functions and sql validator changes #26553

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 10 commits into
base: master
Choose a base branch
from

Conversation

lihaosky
Copy link
Contributor

@lihaosky lihaosky commented May 13, 2025

What is the purpose of the change

Add Builtin SqlTableFunction for ml_predict and make sql validator pass

Brief change log

  • Add SqlMlTableFunction and SqlMlPredictTableFunction
  • Make sql validator pass

Verifying this change

Skeleton. Will add test in following up PRs during integration

Does this pull request potentially affect one of the following parts:

  • Dependencies (does it add or upgrade a dependency): (no)
  • The public API, i.e., is any changed class annotated with @Public(Evolving): (no)
  • The serializers: (no)
  • The runtime per-record code paths (performance sensitive): (no)
  • Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Kubernetes/Yarn, ZooKeeper: (no)
  • The S3 file system connector: (no)

Documentation

  • Does this pull request introduce a new feature? (yes)
  • If yes, how is the feature documented? (JavaDocs)

@lihaosky lihaosky changed the title [FLINK-37780][2/N] ml builtin sql functions [FLINK-37780][3/N] ml builtin sql functions May 13, 2025
@airlock-confluentinc airlock-confluentinc bot force-pushed the model-predict-function branch from 42fcaec to 64363a9 Compare May 13, 2025 22:12
@flinkbot
Copy link
Collaborator

flinkbot commented May 13, 2025

CI report:

Bot commands The @flinkbot bot supports the following commands:
  • @flinkbot run azure re-run the last Azure build

@davidradl
Copy link
Contributor

@lihaosky could we add unit tests please for the new methods please

@airlock-confluentinc airlock-confluentinc bot force-pushed the model-predict-function branch 2 times, most recently from d48945e to 539ce03 Compare May 17, 2025 22:49
@lihaosky lihaosky changed the title [FLINK-37780][3/N] ml builtin sql functions [FLINK-37780][3/N] introduce ml builtin sql functions and sql validator changes May 17, 2025
@lihaosky lihaosky changed the title [FLINK-37780][3/N] introduce ml builtin sql functions and sql validator changes [FLINK-37789][3/N] introduce ml builtin sql functions and sql validator changes May 19, 2025
@airlock-confluentinc airlock-confluentinc bot force-pushed the model-predict-function branch from 18011a9 to 5aa3803 Compare May 19, 2025 03:41
Copy link
Member

@fsk119 fsk119 left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks for your contribution. I left some comments

Copy link
Member

@fsk119 fsk119 left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks for your update.

@@ -2573,7 +2575,7 @@ private SqlNode registerFrom(
if (operand instanceof SqlBasicCall) {
final SqlBasicCall call1 = (SqlBasicCall) operand;
final SqlOperator op = call1.getOperator();
if (op instanceof SqlWindowTableFunction
if ((op instanceof SqlWindowTableFunction || op instanceof SqlMLTableFunction)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I am confuesd about the change and I can not produce a case have the same error as SqlWindowFunction. I am not sure FLINK-35619 is a good solution here, which tries to validate a node when validating namespace. However, in most cases, validator should register the namespace for the node and then validate.

    private SqlNode validateScopedExpression(SqlNode topNode, SqlValidatorScope scope) {
        SqlNode outermostNode = performUnconditionalRewrites(topNode, false);
        cursorSet.add(outermostNode);
        top = outermostNode;
        TRACER.trace("After unconditional rewrite: {}", outermostNode);
        if (outermostNode.isA(SqlKind.TOP_LEVEL)) {
           // 1. REGISTER namespace
            registerQuery(scope, null, outermostNode, outermostNode, null, false);
        }
       // 2. VALIDATE node
        outermostNode.validate(this, scope);
        if (!outermostNode.isA(SqlKind.TOP_LEVEL)) {
            // force type derivation so that we can provide it to the
            // caller later without needing the scope
            deriveType(scope, outermostNode);
        }
        TRACER.trace("After validation: {}", outermostNode);
        return outermostNode;
    }

How about removing the change here?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yeah. This one doesn't seem to have effect. The first select scope should be used when validating descriptor's operands. I made changes there

if (node instanceof SqlExplicitModelCall) {
// Convert it so that model can be accessed in planner. SqlExplicitModelCall
// from parser can't access model.
SqlExplicitModelCall modelCall = (SqlExplicitModelCall) node;
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

How about we only rewrite the call if the model is not null here. For example, SqlValidatorImpl#performUnconditionalRewrites override the operator if the function is found.

if (call.getOperator() instanceof SqlUnresolvedFunction) {
            assert call instanceof SqlBasicCall;
            final SqlUnresolvedFunction function = (SqlUnresolvedFunction) call.getOperator();
            // This function hasn't been resolved yet.  Perform
            // a half-hearted resolution now in case it's a
            // builtin function requiring special casing.  If it's
            // not, we'll handle it later during overload resolution.
            final List<SqlOperator> overloads = new ArrayList<>();
            opTab.lookupOperatorOverloads(
                    function.getNameAsId(),
                    function.getFunctionType(),
                    SqlSyntax.FUNCTION,
                    overloads,
                    catalogReader.nameMatcher());
            if (overloads.size() == 1) {
                ((SqlBasicCall) call).setOperator(overloads.get(0));
            }
        }

I think we can have the same logic here


        if (node instanceof SqlExplicitModelCall) {
            // Convert it so that model can be accessed in planner. SqlExplicitModelCall
            // from parser can't access model.
            FlinkCalciteCatalogReader catalogReader =
                    (FlinkCalciteCatalogReader) getCatalogReader();
            CatalogSchemaModel model = catalogReader.getModel(((SqlExplicitModelCall) node).getIdentifier());
            if (model != null) {
                return new SqlModelCall(model);
            }
        }

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Getting model here should be fine. But SqlExplicitModelCall with model identifier expects the model to exist so we should enforce that here. If there are cases where model doesn't need to exist in other cases, we can revisit?

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Projects
None yet
Development

Successfully merging this pull request may close these issues.

4 participants