From 6a4f33cb216f7764cffc5274de0d270a783c1be3 Mon Sep 17 00:00:00 2001 From: Vitalii Filipov Date: Fri, 20 Dec 2024 17:09:14 +0200 Subject: [PATCH] Added support for output_table parameter as a Resolvable object (#7) --- README.md | 22 +++++++++++----------- modules/object_table_ml.js | 29 ++++++++++++++++++----------- modules/structured_table_ml.js | 28 +++++++++++++++++----------- modules/utils.js | 19 ++++++++++++++++++- 4 files changed, 64 insertions(+), 34 deletions(-) diff --git a/README.md b/README.md index 825f45b..ae9cf88 100644 --- a/README.md +++ b/README.md @@ -93,7 +93,7 @@ Performs the ML.GENERATE_TEXT function on the given source table. | Param | Type | Description | | --- | --- | --- | -| output_table | String | the name of the table to store the final result | +| output_table | String \| Resolvable | either a name or Resolvable of the table to store the final result | | unique_keys | String \| Array | column name(s) for identifying an unique row in the source table | | ml_model | Resolvable | the remote model to use for the ML operation that uses one of the Vertex AI LLM endpoints | | source_query | String \| function | either a query string or a Contextable function to produce the query on the source data for the ML operation and it must have the unique key columns selected in addition to other fields | @@ -115,7 +115,7 @@ Performs the ML.GENERATE_TEXT function on visual content in the given source tab | Param | Type | Description | | --- | --- | --- | | source_table | Resolvable | represents the source object table | -| output_table | String | name of the output table | +| output_table | String \| Resolvable | either a name or Resolvable of the output table | | model | Resolvable | name the remote model with the `gemini-pro-vision` endpoint | | prompt | String | the prompt text for the LLM | | llm_config | Object | extra configurations to the LLM | @@ -136,7 +136,7 @@ Performs the ML.GENERATE_EMBEDDING function on the given source table. 
| Param | Type | Description | | --- | --- | --- | -| output_table | String | the name of the table to store the final result | +| output_table | String \| Resolvable | either a name or Resolvable of the table to store the final result | | unique_keys | String \| Array | column name(s) for identifying an unique row in the source table | | ml_model | Resolvable | the remote model to use for the ML operation that uses one of the `textembedding-gecko*` Vertex AI LLMs as endpoint | | source_query | String \| function | either a query string or a Contextable function to produce the query on the source data for the ML operation and it must have the unique key columns selected in addition to other fields | @@ -158,7 +158,7 @@ Performs the ML.UNDERSTAND_TEXT function on the given source table. | Param | Type | Description | | --- | --- | --- | -| output_table | String | the name of the table to store the final result | +| output_table | String \| Resolvable | either a name or Resolvable of the table to store the final result | | unique_keys | String \| Array | column name(s) for identifying an unique row in the source table | | ml_model | Resolvable | the remote model with a REMOTE_SERVICE_TYPE of CLOUD_AI_NATURAL_LANGUAGE_V1 | | source_query | String \| function | either a query string or a Contextable function to produce the query on the source data for the ML operation and it must have the unique key columns selected in addition to other fields | @@ -180,7 +180,7 @@ Performs the ML.TRANSLATE function on the given source table. 
| Param | Type | Description | | --- | --- | --- | -| output_table | String | the name of the table to store the final result | +| output_table | String \| Resolvable | either a name or Resolvable of the table to store the final result | | unique_keys | String \| Array | column name(s) for identifying an unique row in the source table | | ml_model | Resolvable | the remote model with a REMOTE_SERVICE_TYPE of CLOUD_AI_TRANSLATE_V3 | | source_query | String \| function | either a query string or a Contextable function to produce the query on the source data for the ML operation and it must have the unique key columns selected in addition to other fields | @@ -202,7 +202,7 @@ Performs the ML.ANNOTATE_IMAGE function on the given source table. | Param | Type | Description | | --- | --- | --- | | source_table | Resolvable | represents the source object table | -| output_table | String | name of the output table | +| output_table | String \| Resolvable | either a name or Resolvable of the output table | | model | Resolvable | the remote model with a REMOTE_SERVICE_TYPE of CLOUD_AI_VISION_V1 | | features | Array | specifies one or more feature names of supported Vision API features | | options | Object | the configuration object for the [obj_table_ml](#obj_table_ml) function | @@ -222,7 +222,7 @@ Performs the ML.TRANSCRIBE function on the given source table. 
| Param | Type | Description | | --- | --- | --- | | source_table | Resolvable | represents the source object table | -| output_table | String | name of the output table | +| output_table | String \| Resolvable | either a name or Resolvable of the output table | | model | Resolvable | the remote model with a REMOTE_SERVICE_TYPE of CLOUD_AI_SPEECH_TO_TEXT_V2 | | recognition_config | Object | the recognition configuration to override the default configuration of the specified recognizer | | options | Object | the configuration object for the [obj_table_ml](#obj_table_ml) function | @@ -241,8 +241,8 @@ Performs the ML.PROCESS_DOCUMENT function on the given source table. | Param | Type | Description | | --- | --- | --- | -| source_table | Resolvable | represents the source object table | -| output_table | String | name of the output table | +| source_table | Resolvable | represents the source object table | +| output_table | String \| Resolvable | either a name or Resolvable of the output table | | model | Resolvable | the remote model with a REMOTE_SERVICE_TYPE of CLOUD_AI_DOCUMENT_V1 | | options | Object | the configuration object for the [obj_table_ml](#obj_table_ml) function | @@ -266,7 +266,7 @@ than the specific duration. | Param | Type | Description | | --- | --- | --- | -| output_table | String | name of the output table | +| output_table | String \| Resolvable | either a name or Resolvable of the output table | | unique_keys | String \| Array | column name(s) for identifying an unique row in the source table | | ml_function | String | the name of the BQML function to call | | ml_model | Resolvable | the remote model to use for the ML operation | @@ -302,7 +302,7 @@ column is newer than the largest value in the output table. 
| --- | --- | --- | | source_table | Resolvable | represents the source object table | | source | String \| function | either a query string or a Contextable function to produce the query on the source data | -| output_table | String | the name of the table to store the final result | +| output_table | String \| Resolvable | either a name or Resolvable of the table to store the final result | | accept_filter | String | a SQL expression for finding rows that contains retryable error | | batch_size | Number | number of rows to process in each SQL job. Rows in the object table will be processed in batches according to the batch size. Default batch size is 500 | | unique_key | String | the primary key in the output table for incremental update. Default value is "uri". | diff --git a/modules/object_table_ml.js b/modules/object_table_ml.js index d289f31..a4b97fa 100644 --- a/modules/object_table_ml.js +++ b/modules/object_table_ml.js @@ -11,7 +11,7 @@ const common = require("./utils"); * * @param {Resolvable} source_table represents the source object table * @param {String | Function} source either a query string or a Contextable function to produce the query on the source data - * @param {String} output_table the name of the table to store the final result + * @param {String | Resolvable} output_table either a name or Resolvable of the table to store the final result * @param {String} accept_filter a SQL expression for finding rows that contains retryable error * @param {Number} batch_size number of rows to process in each SQL job. Rows in the object table will be * processed in batches according to the batch size. Default batch size is 500 @@ -30,15 +30,22 @@ function obj_table_ml(source_table, source, output_table, accept_filter, { let source_func = (source instanceof Function) ? 
source : () => source; let limit_clause = `LIMIT ${batch_size}`; + const output_table_resolvable = common.to_resolvable(output_table); + const init_table = common.resolvable(`init_${output_table_resolvable.name}`, output_table_resolvable); + // Initialize by creating the output table with a small limit to avoid timeout - operate(`init_${output_table}`) + operate(init_table.name) + .schema(init_table.schema) + .database(init_table.database) .queries((ctx) => - `CREATE TABLE IF NOT EXISTS ${ctx.resolve(output_table)} AS ${source_func(ctx)} WHERE ${accept_filter} LIMIT 10`); + `CREATE TABLE IF NOT EXISTS ${ctx.resolve(output_table_resolvable)} AS ${source_func(ctx)} WHERE ${accept_filter} LIMIT 10`); // Incrementally update the output table. - let table = publish(output_table, { + let table = publish(output_table_resolvable.name, { type: "incremental", - dependencies: [`init_${output_table}`], + database: output_table_resolvable.database, + schema: output_table_resolvable.schema, + dependencies: [init_table], uniqueKey: [unique_key] }); @@ -48,8 +55,8 @@ function obj_table_ml(source_table, source, output_table, accept_filter, { REPEAT SET candidates = ARRAY( SELECT ${unique_key} FROM ${ctx.ref(source_table)} AS S - WHERE NOT EXISTS (SELECT * FROM ${ctx.resolve(output_table)} AS T WHERE S.${unique_key} = T.${unique_key}) - OR ${updated_column} > (SELECT max(${updated_column}) FROM ${ctx.resolve(output_table)}) ${limit_clause})`, + WHERE NOT EXISTS (SELECT * FROM ${ctx.resolve(output_table_resolvable)} AS T WHERE S.${unique_key} = T.${unique_key}) + OR ${updated_column} > (SELECT max(${updated_column}) FROM ${ctx.resolve(output_table_resolvable)}) ${limit_clause})`, ``)}`); table.query((ctx) => ` ${source_func(ctx)} WHERE ${ctx.when(ctx.incremental(), @@ -66,7 +73,7 @@ function obj_table_ml(source_table, source, output_table, accept_filter, { * Performs the ML.ANNOTATE_IMAGE function on the given source table. 
* * @param {Resolvable} source_table represents the source object table - * @param {String} output_table name of the output table + * @param {String | Resolvable} output_table either a name or Resolvable of the output table * @param {Resolvable} model the remote model with a REMOTE_SERVICE_TYPE of CLOUD_AI_VISION_V1 * @param {Array} features specifies one or more feature names of supported Vision API features * @param {Object} options the configuration object for the {@link obj_table_ml} function @@ -87,7 +94,7 @@ function annotate_image(source_table, output_table, model, features, options) { * Performs the ML.TRANSCRIBE function on the given source table. * * @param {Resolvable} source_table represents the source object table - * @param {String} output_table name of the output table + * @param {String | Resolvable} output_table either a name or Resolvable of the output table * @param {Resolvable} model the remote model with a REMOTE_SERVICE_TYPE of CLOUD_AI_SPEECH_TO_TEXT_V2 * @param {Object} recognition_config the recognition configuration to override the default configuration * of the specified recognizer @@ -109,7 +116,7 @@ function transcribe(source_table, output_table, model, recognition_config, optio * Performs the ML.PROCESS_DOCUMENT function on the given source table. * * @param {Resolvable} source_table represents the source object table - * @param {String} output_table name of the output table + * @param {String | Resolvable} output_table either a name or Resolvable of the output table * @param {Resolvable} model the remote model with a REMOTE_SERVICE_TYPE of CLOUD_AI_DOCUMENT_V1 * @param {Object} options the configuration object for the {@link obj_table_ml} function * @@ -126,7 +133,7 @@ function process_document(source_table, output_table, model, options) { * Performs the ML.GENERATE_TEXT function on visual content in the given source table. 
* * @param {Resolvable} source_table represents the source object table - * @param {String} output_table name of the output table + * @param {String | Resolvable} output_table either a name or Resolvable of the output table * @param {Resolvable} model name the remote model with the `gemini-pro-vision` endpoint * @param {String} prompt the prompt text for the LLM * @param {Object} llm_config extra configurations to the LLM diff --git a/modules/structured_table_ml.js b/modules/structured_table_ml.js index 7aed4e6..c2ea18a 100644 --- a/modules/structured_table_ml.js +++ b/modules/structured_table_ml.js @@ -6,7 +6,7 @@ const common = require("./utils"); * and merges to the output table until all rows are processed or runs longer * than the specific duration. * - * @param {String} output_table name of the output table + * @param {String | Resolvable} output_table either a name or Resolvable of the output table * @param {String | Array} unique_keys column name(s) for identifying an unique row in the source table * @param {String} ml_function the name of the BQML function to call * @param {Resolvable} ml_model the remote model to use for the ML operation @@ -25,6 +25,7 @@ function table_ml(output_table, unique_keys, ml_function, ml_model, source_query batch_size = 10000, batch_duration_secs = 22 * 60 * 60, } = {}) { + const output_table_resolvable = common.to_resolvable(output_table); let source_func = (source_query instanceof Function) ? source_query : () => source_query; let limit_clause = `LIMIT ${batch_size}`; let ml_configs_string = Object.entries(ml_configs).map(([k, v]) => `${JSON.stringify(v)} AS ${k}`).join(','); @@ -32,8 +33,11 @@ function table_ml(output_table, unique_keys, ml_function, ml_model, source_query unique_keys = (unique_keys instanceof Array ? unique_keys : [unique_keys]); // Initialize by creating the output table. 
- operate(`init_${output_table}`) - .queries((ctx) => `CREATE TABLE IF NOT EXISTS ${ctx.resolve(output_table)} AS + const init_table = common.resolvable(`init_${output_table_resolvable.name}`, output_table_resolvable); + operate(init_table.name) + .schema(init_table.schema) + .database(init_table.database) + .queries((ctx) => `CREATE TABLE IF NOT EXISTS ${ctx.resolve(output_table_resolvable)} AS SELECT * FROM ${ml_function} ( MODEL ${ctx.ref(ml_model)}, (SELECT * FROM (${source_func(ctx)}) ${limit_clause}), @@ -41,9 +45,11 @@ function table_ml(output_table, unique_keys, ml_function, ml_model, source_query ) WHERE ${accept_filter}`); // Incrementally update the output table. - let table = publish(output_table, { + let table = publish(output_table_resolvable.name, { type: "incremental", - dependencies: [`init_${output_table}`], + database: output_table_resolvable.database, + schema: output_table_resolvable.schema, + dependencies: [init_table], uniqueKey: unique_keys, }); @@ -54,8 +60,8 @@ function table_ml(output_table, unique_keys, ml_function, ml_model, source_query SELECT * FROM ${ml_function} ( MODEL ${ctx.ref(ml_model)}, (SELECT S.* FROM (${source_func(ctx)}) AS S - ${ctx.when(ctx.incremental(), - `WHERE NOT EXISTS (SELECT * FROM ${ctx.resolve(output_table)} AS T WHERE ${unique_keys.map((k) => `S.${k} = T.${k}`).join(' AND ')})`)} ${limit_clause}), + ${ctx.when(ctx.incremental(), + `WHERE NOT EXISTS (SELECT * FROM ${ctx.resolve(output_table_resolvable)} AS T WHERE ${unique_keys.map((k) => `S.${k} = T.${k}`).join(' AND ')})`)} ${limit_clause}), STRUCT (${ml_configs_string}) ) WHERE ${accept_filter}`); table.postOps((ctx) => `${ctx.when(ctx.incremental(), ` @@ -66,7 +72,7 @@ function table_ml(output_table, unique_keys, ml_function, ml_model, source_query /** * Performs the ML.GENERATE_EMBEDDING function on the given source table. 
* - * @param {String} output_table the name of the table to store the final result + * @param {String | Resolvable} output_table either a name or Resolvable of the table to store the final result * @param {String | Array} unique_keys column name(s) for identifying an unique row in the source table * @param {Resolvable} ml_model the remote model to use for the ML operation that uses one of the * `textembedding-gecko*` Vertex AI LLMs as endpoint @@ -86,7 +92,7 @@ function generate_embedding(output_table, unique_keys, ml_model, source_query, m /** * Performs the ML.GENERATE_TEXT function on the given source table. * - * @param {String} output_table the name of the table to store the final result + * @param {String | Resolvable} output_table either a name or Resolvable of the table to store the final result * @param {String | Array} unique_keys column name(s) for identifying an unique row in the source table * @param {Resolvable} ml_model the remote model to use for the ML operation that uses one * of the Vertex AI LLM endpoints @@ -106,7 +112,7 @@ function generate_text(output_table, unique_keys, ml_model, source_query, ml_con /** * Performs the ML.UNDERSTAND_TEXT function on the given source table. * - * @param {String} output_table the name of the table to store the final result +* @param {String | Resolvable} output_table either a name or Resolvable of the table to store the final result * @param {String | Array} unique_keys column name(s) for identifying an unique row in the source table * @param {Resolvable} ml_model the remote model with a REMOTE_SERVICE_TYPE of CLOUD_AI_NATURAL_LANGUAGE_V1 * @param {String | Function} source_query either a query string or a Contextable function to produce the @@ -125,7 +131,7 @@ function understand_text(output_table, unique_keys, ml_model, source_query, ml_c /** * Performs the ML.TRANSLATE function on the given source table. 
* - * @param {String} output_table the name of the table to store the final result +* @param {String | Resolvable} output_table either a name or Resolvable of the table to store the final result * @param {String | Array} unique_keys column name(s) for identifying an unique row in the source table * @param {Resolvable} ml_model the remote model with a REMOTE_SERVICE_TYPE of CLOUD_AI_TRANSLATE_V3 * @param {String | Function} source_query either a query string or a Contextable function to produce the diff --git a/modules/utils.js b/modules/utils.js index 37a24e9..606a625 100644 --- a/modules/utils.js +++ b/modules/utils.js @@ -1,10 +1,25 @@ +/** + * Convert passed parameter to Resolvable + * @param {String | Object} resolvable a resolvable can be either the name of an entity as a string, or + * an object that describes the full path to the relation. + */ +function to_resolvable(resolvable) { + return resolvable.constructor === Object ? resolvable :{ name : resolvable}; +} + +function resolvable(name, default_resolvable) { + return { ...default_resolvable, name }; +} + /** * Declares the resolvable as a Dataform data source. */ function declare_resolvable(resolvable) { - return declare(resolvable.constructor === Object ? resolvable :{ name : resolvable}); + const convertedResolvable = to_resolvable(resolvable); + return declare(convertedResolvable); } + /** * Forms a SQL filter clause for filtering out retryable * error based on a given status column. @@ -16,4 +31,6 @@ function retryable_error_filter(status_col) { module.exports = { declare_resolvable, retryable_error_filter, + to_resolvable, + resolvable, };