Added support for output_table parameter as a Resolvable object (#7) #8

Open
wants to merge 1 commit into
base: main
22 changes: 11 additions & 11 deletions README.md
@@ -93,7 +93,7 @@ Performs the ML.GENERATE_TEXT function on the given source table.

| Param | Type | Description |
| --- | --- | --- |
-| output_table | <code>String</code> | the name of the table to store the final result |
+| output_table | <code>String</code> \| <code>Resolvable</code> | either a name or Resolvable of the table to store the final result |
| unique_keys | <code>String</code> \| <code>Array</code> | column name(s) for identifying an unique row in the source table |
| ml_model | <code>Resolvable</code> | the remote model to use for the ML operation that uses one of the Vertex AI LLM endpoints |
| source_query | <code>String</code> \| <code>function</code> | either a query string or a Contextable function to produce the query on the source data for the ML operation and it must have the unique key columns selected in addition to other fields |
@@ -115,7 +115,7 @@ Performs the ML.GENERATE_TEXT function on visual content in the given source tab
| Param | Type | Description |
| --- | --- | --- |
| source_table | <code>Resolvable</code> | represents the source object table |
-| output_table | <code>String</code> | name of the output table |
+| output_table | <code>String</code> \| <code>Resolvable</code> | either a name or Resolvable of the output table |
| model | <code>Resolvable</code> | name the remote model with the `gemini-pro-vision` endpoint |
| prompt | <code>String</code> | the prompt text for the LLM |
| llm_config | <code>Object</code> | extra configurations to the LLM |
@@ -136,7 +136,7 @@ Performs the ML.GENERATE_EMBEDDING function on the given source table.

| Param | Type | Description |
| --- | --- | --- |
-| output_table | <code>String</code> | the name of the table to store the final result |
+| output_table | <code>String</code> \| <code>Resolvable</code> | either a name or Resolvable of the table to store the final result |
| unique_keys | <code>String</code> \| <code>Array</code> | column name(s) for identifying an unique row in the source table |
| ml_model | <code>Resolvable</code> | the remote model to use for the ML operation that uses one of the `textembedding-gecko*` Vertex AI LLMs as endpoint |
| source_query | <code>String</code> \| <code>function</code> | either a query string or a Contextable function to produce the query on the source data for the ML operation and it must have the unique key columns selected in addition to other fields |
@@ -158,7 +158,7 @@ Performs the ML.UNDERSTAND_TEXT function on the given source table.

| Param | Type | Description |
| --- | --- | --- |
-| output_table | <code>String</code> | the name of the table to store the final result |
+| output_table | <code>String</code> \| <code>Resolvable</code> | either a name or Resolvable of the table to store the final result |
| unique_keys | <code>String</code> \| <code>Array</code> | column name(s) for identifying an unique row in the source table |
| ml_model | <code>Resolvable</code> | the remote model with a REMOTE_SERVICE_TYPE of CLOUD_AI_NATURAL_LANGUAGE_V1 |
| source_query | <code>String</code> \| <code>function</code> | either a query string or a Contextable function to produce the query on the source data for the ML operation and it must have the unique key columns selected in addition to other fields |
@@ -180,7 +180,7 @@ Performs the ML.TRANSLATE function on the given source table.

| Param | Type | Description |
| --- | --- | --- |
-| output_table | <code>String</code> | the name of the table to store the final result |
+| output_table | <code>String</code> \| <code>Resolvable</code> | either a name or Resolvable of the table to store the final result |
| unique_keys | <code>String</code> \| <code>Array</code> | column name(s) for identifying an unique row in the source table |
| ml_model | <code>Resolvable</code> | the remote model with a REMOTE_SERVICE_TYPE of CLOUD_AI_TRANSLATE_V3 |
| source_query | <code>String</code> \| <code>function</code> | either a query string or a Contextable function to produce the query on the source data for the ML operation and it must have the unique key columns selected in addition to other fields |
@@ -202,7 +202,7 @@ Performs the ML.ANNOTATE_IMAGE function on the given source table.
| Param | Type | Description |
| --- | --- | --- |
| source_table | <code>Resolvable</code> | represents the source object table |
-| output_table | <code>String</code> | name of the output table |
+| output_table | <code>String</code> \| <code>Resolvable</code> | either a name or Resolvable of the output table |
| model | <code>Resolvable</code> | the remote model with a REMOTE_SERVICE_TYPE of CLOUD_AI_VISION_V1 |
| features | <code>Array</code> | specifies one or more feature names of supported Vision API features |
| options | <code>Object</code> | the configuration object for the [obj_table_ml](#obj_table_ml) function |
@@ -222,7 +222,7 @@ Performs the ML.TRANSCRIBE function on the given source table.
| Param | Type | Description |
| --- | --- | --- |
| source_table | <code>Resolvable</code> | represents the source object table |
-| output_table | <code>String</code> | name of the output table |
+| output_table | <code>String</code> \| <code>Resolvable</code> | either a name or Resolvable of the output table |
| model | <code>Resolvable</code> | the remote model with a REMOTE_SERVICE_TYPE of CLOUD_AI_SPEECH_TO_TEXT_V2 |
| recognition_config | <code>Object</code> | the recognition configuration to override the default configuration of the specified recognizer |
| options | <code>Object</code> | the configuration object for the [obj_table_ml](#obj_table_ml) function |
@@ -241,8 +241,8 @@ Performs the ML.PROCESS_DOCUMENT function on the given source table.

| Param | Type | Description |
| --- | --- | --- |
-| source_table | <code>Resolvable</code> | represents the source object table |
-| output_table | <code>String</code> | name of the output table |
+| source_table | <code>Resolvable</code> | either a name or Resolvable of the output table |
+| output_table | <code>String</code> \| <code>Resolvable</code> | the output table to store final result |
| model | <code>Resolvable</code> | the remote model with a REMOTE_SERVICE_TYPE of CLOUD_AI_DOCUMENT_V1 |
| options | <code>Object</code> | the configuration object for the [obj_table_ml](#obj_table_ml) function |

@@ -266,7 +266,7 @@ than the specific duration.

| Param | Type | Description |
| --- | --- | --- |
-| output_table | <code>String</code> | name of the output table |
+| output_table | <code>String</code> \| <code>Resolvable</code> | either a name or Resolvable of the output table |
| unique_keys | <code>String</code> \| <code>Array</code> | column name(s) for identifying an unique row in the source table |
| ml_function | <code>String</code> | the name of the BQML function to call |
| ml_model | <code>Resolvable</code> | the remote model to use for the ML operation |
@@ -302,7 +302,7 @@ column is newer than the largest value in the output table.
| --- | --- | --- |
| source_table | <code>Resolvable</code> | represents the source object table |
| source | <code>String</code> \| <code>function</code> | either a query string or a Contextable function to produce the query on the source data |
-| output_table | <code>String</code> | the name of the table to store the final result |
+| output_table | <code>String</code> \| <code>Resolvable</code> | either a name or Resolvable of the table to store the final result |
| accept_filter | <code>String</code> | a SQL expression for finding rows that contains retryable error |
| batch_size | <code>Number</code> | number of rows to process in each SQL job. Rows in the object table will be processed in batches according to the batch size. Default batch size is 500 |
| unique_key | <code>String</code> | the primary key in the output table for incremental update. Default value is "uri". |
29 changes: 18 additions & 11 deletions modules/object_table_ml.js
@@ -11,7 +11,7 @@ const common = require("./utils");
*
* @param {Resolvable} source_table represents the source object table
* @param {String | Function} source either a query string or a Contextable function to produce the query on the source data
-* @param {String} output_table the name of the table to store the final result
+* @param {String | Resolvable} output_table either a name or Resolvable of the table to store the final result
* @param {String} accept_filter a SQL expression for finding rows that contains retryable error
* @param {Number} batch_size number of rows to process in each SQL job. Rows in the object table will be
* processed in batches according to the batch size. Default batch size is 500
@@ -30,15 +30,22 @@ function obj_table_ml(source_table, source, output_table, accept_filter, {
let source_func = (source instanceof Function) ? source : () => source;
let limit_clause = `LIMIT ${batch_size}`;

+const output_table_resolvable = common.to_resolvable(output_table);
+const init_table = common.resolvable(`init_${output_table_resolvable.name}`, output_table_resolvable);
+
// Initialize by creating the output table with a small limit to avoid timeout
-operate(`init_${output_table}`)
+operate(init_table.name)
+.schema(init_table.schema)
+.database(init_table.database)
.queries((ctx) =>
-`CREATE TABLE IF NOT EXISTS ${ctx.resolve(output_table)} AS ${source_func(ctx)} WHERE ${accept_filter} LIMIT 10`);
+`CREATE TABLE IF NOT EXISTS ${ctx.resolve(output_table_resolvable)} AS ${source_func(ctx)} WHERE ${accept_filter} LIMIT 10`);

// Incrementally update the output table.
-let table = publish(output_table, {
+let table = publish(output_table_resolvable.name, {
type: "incremental",
-dependencies: [`init_${output_table}`],
+database: output_table_resolvable.database,
+schema: output_table_resolvable.schema,
+dependencies: [init_table],
uniqueKey: [unique_key]
});

@@ -48,8 +55,8 @@ function obj_table_ml(source_table, source, output_table, accept_filter, {
REPEAT
SET candidates = ARRAY(
SELECT ${unique_key} FROM ${ctx.ref(source_table)} AS S
-WHERE NOT EXISTS (SELECT * FROM ${ctx.resolve(output_table)} AS T WHERE S.${unique_key} = T.${unique_key})
-OR ${updated_column} > (SELECT max(${updated_column}) FROM ${ctx.resolve(output_table)}) ${limit_clause})`,
+WHERE NOT EXISTS (SELECT * FROM ${ctx.resolve(output_table_resolvable)} AS T WHERE S.${unique_key} = T.${unique_key})
+OR ${updated_column} > (SELECT max(${updated_column}) FROM ${ctx.resolve(output_table_resolvable)}) ${limit_clause})`,
``)}`);
table.query((ctx) => `
${source_func(ctx)} WHERE ${ctx.when(ctx.incremental(),
@@ -66,7 +73,7 @@ function obj_table_ml(source_table, source, output_table, accept_filter, {
* Performs the ML.ANNOTATE_IMAGE function on the given source table.
*
* @param {Resolvable} source_table represents the source object table
-* @param {String} output_table name of the output table
+* @param {String | Resolvable} output_table either a name or Resolvable of the output table
* @param {Resolvable} model the remote model with a REMOTE_SERVICE_TYPE of CLOUD_AI_VISION_V1
* @param {Array} features specifies one or more feature names of supported Vision API features
* @param {Object} options the configuration object for the {@link obj_table_ml} function
@@ -87,7 +94,7 @@ function annotate_image(source_table, output_table, model, features, options) {
* Performs the ML.TRANSCRIBE function on the given source table.
*
* @param {Resolvable} source_table represents the source object table
-* @param {String} output_table name of the output table
+* @param {String | Resolvable} output_table either a name or Resolvable of the output table
* @param {Resolvable} model the remote model with a REMOTE_SERVICE_TYPE of CLOUD_AI_SPEECH_TO_TEXT_V2
* @param {Object} recognition_config the recognition configuration to override the default configuration
* of the specified recognizer
@@ -109,7 +116,7 @@ function transcribe(source_table, output_table, model, recognition_config, optio
* Performs the ML.PROCESS_DOCUMENT function on the given source table.
*
* @param {Resolvable} source_table represents the source object table
-* @param {String} output_table name of the output table
+* @param {String | Resolvable} output_table either a name or Resolvable of the output table
* @param {Resolvable} model the remote model with a REMOTE_SERVICE_TYPE of CLOUD_AI_DOCUMENT_V1
* @param {Object} options the configuration object for the {@link obj_table_ml} function
*
@@ -126,7 +133,7 @@ function process_document(source_table, output_table, model, options) {
* Performs the ML.GENERATE_TEXT function on visual content in the given source table.
*
* @param {Resolvable} source_table represents the source object table
-* @param {String} output_table name of the output table
+* @param {String | Resolvable} output_table either a name or Resolvable of the output table
* @param {Resolvable} model name the remote model with the `gemini-pro-vision` endpoint
* @param {String} prompt the prompt text for the LLM
* @param {Object} llm_config extra configurations to the LLM
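The registration steps changed in `obj_table_ml` can be sketched outside Dataform as follows. This is only an illustration under stated assumptions: `operate` and `publish` here are recording stand-ins written for this sketch (not the Dataform globals), `register` is a hypothetical condensed wrapper with the SQL omitted, and the project, dataset, and table names are made up. The two helpers are copied from the `modules/utils.js` change in this PR.

```javascript
// Stand-ins for the Dataform globals, for illustration only: they just record their arguments.
const recorded = [];
function operate(name) {
  const action = { kind: "operate", name };
  recorded.push(action);
  const builder = {
    schema(s) { action.schema = s; return builder; },
    database(d) { action.database = d; return builder; },
    queries(q) { action.queries = q; return builder; },
  };
  return builder;
}
function publish(name, config) {
  recorded.push({ kind: "publish", name, ...config });
}

// Helpers as added in modules/utils.js by this PR.
const to_resolvable = (r) => (r.constructor === Object ? r : { name: r });
const resolvable = (name, default_resolvable) => ({ ...default_resolvable, name });

// Hypothetical condensed version of the registration steps in obj_table_ml (queries omitted).
function register(output_table) {
  const output_table_resolvable = to_resolvable(output_table);
  const init_table = resolvable(`init_${output_table_resolvable.name}`, output_table_resolvable);
  operate(init_table.name).schema(init_table.schema).database(init_table.database);
  publish(output_table_resolvable.name, {
    type: "incremental",
    database: output_table_resolvable.database,
    schema: output_table_resolvable.schema,
    dependencies: [init_table],
  });
}

// Made-up target; with a Resolvable, the init action follows the output table's dataset.
register({ database: "my-project", schema: "ml_results", name: "image_labels" });
console.log(JSON.stringify(recorded, null, 2));
```

The point of the sketch is that the init operation and the incremental table end up with the same database and schema, and that the dependency is passed as a Resolvable object rather than a bare `init_...` string.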
28 changes: 17 additions & 11 deletions modules/structured_table_ml.js
@@ -6,7 +6,7 @@ const common = require("./utils");
* and merges to the output table until all rows are processed or runs longer
* than the specific duration.
*
-* @param {String} output_table name of the output table
+* @param {String | Resolvable} output_table either a name or Resolvable of the output table
* @param {String | Array} unique_keys column name(s) for identifying an unique row in the source table
* @param {String} ml_function the name of the BQML function to call
* @param {Resolvable} ml_model the remote model to use for the ML operation
@@ -25,25 +25,31 @@ function table_ml(output_table, unique_keys, ml_function, ml_model, source_query
batch_size = 10000,
batch_duration_secs = 22 * 60 * 60,
} = {}) {
+const output_table_resolvable = common.to_resolvable(output_table);
let source_func = (source_query instanceof Function) ? source_query : () => source_query;
let limit_clause = `LIMIT ${batch_size}`;
let ml_configs_string = Object.entries(ml_configs).map(([k, v]) => `${JSON.stringify(v)} AS ${k}`).join(',');

unique_keys = (unique_keys instanceof Array ? unique_keys : [unique_keys]);

// Initialize by creating the output table.
-operate(`init_${output_table}`)
-.queries((ctx) => `CREATE TABLE IF NOT EXISTS ${ctx.resolve(output_table)} AS
+const init_table = common.resolvable(`init_${output_table_resolvable.name}`, output_table_resolvable);
+operate(init_table.name)
+.schema(init_table.schema)
+.database(init_table.database)
+.queries((ctx) => `CREATE TABLE IF NOT EXISTS ${ctx.resolve(output_table_resolvable)} AS
SELECT * FROM ${ml_function} (
MODEL ${ctx.ref(ml_model)},
(SELECT * FROM (${source_func(ctx)}) ${limit_clause}),
STRUCT (${ml_configs_string})
) WHERE ${accept_filter}`);

// Incrementally update the output table.
-let table = publish(output_table, {
+let table = publish(output_table_resolvable.name, {
type: "incremental",
-dependencies: [`init_${output_table}`],
+database: output_table_resolvable.database,
+schema: output_table_resolvable.schema,
+dependencies: [init_table],
uniqueKey: unique_keys,
});

@@ -54,8 +60,8 @@ function table_ml(output_table, unique_keys, ml_function, ml_model, source_query
SELECT * FROM ${ml_function} (
MODEL ${ctx.ref(ml_model)},
(SELECT S.* FROM (${source_func(ctx)}) AS S
-${ctx.when(ctx.incremental(),
-`WHERE NOT EXISTS (SELECT * FROM ${ctx.resolve(output_table)} AS T WHERE ${unique_keys.map((k) => `S.${k} = T.${k}`).join(' AND ')})`)} ${limit_clause}),
+${ctx.when(ctx.incremental(),
+`WHERE NOT EXISTS (SELECT * FROM ${ctx.resolve(output_table_resolvable)} AS T WHERE ${unique_keys.map((k) => `S.${k} = T.${k}`).join(' AND ')})`)} ${limit_clause}),
STRUCT (${ml_configs_string})
) WHERE ${accept_filter}`);
table.postOps((ctx) => `${ctx.when(ctx.incremental(), `
@@ -66,7 +72,7 @@ function table_ml(output_table, unique_keys, ml_function, ml_model, source_query
/**
* Performs the ML.GENERATE_EMBEDDING function on the given source table.
*
-* @param {String} output_table the name of the table to store the final result
+* @param {String | Resolvable} output_table either a name or Resolvable of the table to store the final result
* @param {String | Array} unique_keys column name(s) for identifying an unique row in the source table
* @param {Resolvable} ml_model the remote model to use for the ML operation that uses one of the
* `textembedding-gecko*` Vertex AI LLMs as endpoint
@@ -86,7 +92,7 @@ function generate_embedding(output_table, unique_keys, ml_model, source_query, m
/**
* Performs the ML.GENERATE_TEXT function on the given source table.
*
-* @param {String} output_table the name of the table to store the final result
+* @param {String | Resolvable} output_table either a name or Resolvable of the table to store the final result
* @param {String | Array} unique_keys column name(s) for identifying an unique row in the source table
* @param {Resolvable} ml_model the remote model to use for the ML operation that uses one
* of the Vertex AI LLM endpoints
@@ -106,7 +112,7 @@ function generate_text(output_table, unique_keys, ml_model, source_query, ml_con
/**
* Performs the ML.UNDERSTAND_TEXT function on the given source table.
*
-* @param {String} output_table the name of the table to store the final result
+* @param {String | Resolvable} output_table either a name or Resolvable of the table to store the final result
* @param {String | Array} unique_keys column name(s) for identifying an unique row in the source table
* @param {Resolvable} ml_model the remote model with a REMOTE_SERVICE_TYPE of CLOUD_AI_NATURAL_LANGUAGE_V1
* @param {String | Function} source_query either a query string or a Contextable function to produce the
@@ -125,7 +131,7 @@ function understand_text(output_table, unique_keys, ml_model, source_query, ml_c
/**
* Performs the ML.TRANSLATE function on the given source table.
*
-* @param {String} output_table the name of the table to store the final result
+* @param {String | Resolvable} output_table either a name or Resolvable of the table to store the final result
* @param {String | Array} unique_keys column name(s) for identifying an unique row in the source table
* @param {Resolvable} ml_model the remote model with a REMOTE_SERVICE_TYPE of CLOUD_AI_TRANSLATE_V3
* @param {String | Function} source_query either a query string or a Contextable function to produce the
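For readers following the SQL that `table_ml` assembles, here is a small standalone sketch of two string-building expressions taken from the hunk above: the `STRUCT` argument list built from `ml_configs`, and the anti-join predicate built from `unique_keys`. The input values are hypothetical and chosen only for illustration; the expressions themselves are copied from the diff.

```javascript
// Hypothetical inputs, chosen only for illustration.
const ml_configs = { temperature: 0.2, flatten_json_output: true };
let unique_keys = "id";

// Same expression as in table_ml: each config value is JSON-encoded and aliased to its key.
const ml_configs_string = Object.entries(ml_configs)
  .map(([k, v]) => `${JSON.stringify(v)} AS ${k}`)
  .join(',');

// A single key is normalized to an array before the predicate is built.
unique_keys = (unique_keys instanceof Array ? unique_keys : [unique_keys]);
const join_condition = unique_keys.map((k) => `S.${k} = T.${k}`).join(' AND ');

console.log(ml_configs_string); // 0.2 AS temperature,true AS flatten_json_output
console.log(join_condition);    // S.id = T.id
```

With an array such as `["id", "lang"]`, the same expression yields one equality per key joined by `AND`, which is what lets the incremental query skip rows already present in the output table.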
19 changes: 18 additions & 1 deletion modules/utils.js
@@ -1,10 +1,25 @@
+/**
+* Convert passed parameter to Resolvable
+* @param {String | Object} resolvable a resolvable can be either the name of an entity as a string, or
+* an object that describes the full path to the relation.
+*/
+function to_resolvable(resolvable) {
+return resolvable.constructor === Object ? resolvable :{ name : resolvable};
+}
+
+function resolvable(name, default_resolvable) {
+return { ...default_resolvable, name };
+}
+
/**
* Declares the resolvable as a Dataform data source.
*/
function declare_resolvable(resolvable) {
-return declare(resolvable.constructor === Object ? resolvable :{ name : resolvable});
+const convertedResolvable = to_resolvable(resolvable);
+return declare(convertedResolvable);
}

+
/**
* Forms a SQL filter clause for filtering out retryable
* error based on a given status column.
@@ -16,4 +31,6 @@ function retryable_error_filter(status_col) {
module.exports = {
declare_resolvable,
retryable_error_filter,
+to_resolvable,
+resolvable,
};
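A quick way to see what the two new helpers do is to run them on both accepted input forms. The sketch below copies `to_resolvable` and `resolvable` from this diff (whitespace tidied, behavior unchanged); the database, schema, and table names are hypothetical.

```javascript
// Copies of the two helpers added in modules/utils.js (reformatted only).
function to_resolvable(resolvable) {
  return resolvable.constructor === Object ? resolvable : { name: resolvable };
}

function resolvable(name, default_resolvable) {
  return { ...default_resolvable, name };
}

// A bare string becomes a name-only Resolvable.
const fromString = to_resolvable("my_output");

// A plain object is returned as-is; these database/schema values are made up.
const full = { database: "my-project", schema: "ml_results", name: "my_output" };
const fromObject = to_resolvable(full);

// resolvable() copies database/schema from the default and overrides only the name.
const initTable = resolvable(`init_${fromObject.name}`, fromObject);

console.log(fromString);
console.log(fromObject === full);
console.log(initTable);
```

Because `resolvable()` spreads into a new object, the caller's Resolvable is not mutated when the `init_` variant is derived from it.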