From ab5e6f8758e10c65b4adae202b1bfee9cfa94bfc Mon Sep 17 00:00:00 2001
From: Mahdi Dibaiee
Date: Mon, 10 Mar 2025 13:05:33 +0000
Subject: [PATCH] materialize-databricks: use mahdi's fork of databricks driver

I have a temporary fix for the databricks driver bug in my fork:
https://github.com/mdibaiee/databricks-sql-go/commit/c54f93acc0f1450585fbf34b2bea5dce1fb3ff3e

I have not run this version against the databricks driver's full test
suite, but I have run our integration tests, and I have also run it
against code that reproduces the bug: the bug no longer occurs and the
error is correctly surfaced.

After approval, I'm going to deploy this for one customer who is hitting
this issue, without merging it, to test it out. The customer's pipeline
has been struggling because MERGE INTO queries are too slow for large
backfills.
---
 go.mod                                   |  7 +++++-
 go.sum                                   |  8 +++----
 materialize-databricks/driver.go         |  7 +-----
 .../materialize-databricks/snapshot.json | 22 +++++++++----------
 4 files changed, 22 insertions(+), 22 deletions(-)

diff --git a/go.mod b/go.mod
index ad20fa3a93..72ec870514 100644
--- a/go.mod
+++ b/go.mod
@@ -123,8 +123,8 @@ require (
 	github.com/JohnCGriffin/overflow v0.0.0-20211019200055-46fa312c352c // indirect
 	github.com/Microsoft/go-winio v0.6.2 // indirect
 	github.com/apache/arrow/go/arrow v0.0.0-20211112161151-bc219186db40 // indirect
+	github.com/apache/arrow/go/v12 v12.0.1 // indirect
 	github.com/apache/arrow/go/v15 v15.0.2 // indirect
-	github.com/apache/arrow/go/v16 v16.0.0 // indirect
 	github.com/apache/thrift v0.20.0 // indirect
 	github.com/apapsch/go-jsonmerge/v2 v2.0.0 // indirect
 	github.com/astaxie/beego v1.12.3 // indirect
@@ -266,3 +266,8 @@ require (
 	k8s.io/apimachinery v0.23.17 // indirect
 	nhooyr.io/websocket v1.8.7 // indirect
 )
+
+// FIXME: due to a bug in the databricks go sql driver we have this hack below.
+// This should be reverted once https://github.com/databricks/databricks-sql-go/issues/254
+// is fixed and we have updated to the latest driver.
+replace github.com/databricks/databricks-sql-go v1.5.5 => github.com/mdibaiee/databricks-sql-go v0.0.0-20250307113106-c54f93acc0f1
diff --git a/go.sum b/go.sum
index 43e656b602..f6b585c00a 100644
--- a/go.sum
+++ b/go.sum
@@ -156,10 +156,10 @@ github.com/antihax/optional v1.0.0/go.mod h1:uupD/76wgC+ih3iEmQUL+0Ugr19nfwCT1kd
 github.com/apache/arrow/go/arrow v0.0.0-20200730104253-651201b0f516/go.mod h1:QNYViu/X0HXDHw7m3KXzWSVXIbfUvJqBFe6Gj8/pYA0=
 github.com/apache/arrow/go/arrow v0.0.0-20211112161151-bc219186db40 h1:q4dksr6ICHXqG5hm0ZW5IHyeEJXoIJSOZeBLmWPNeIQ=
 github.com/apache/arrow/go/arrow v0.0.0-20211112161151-bc219186db40/go.mod h1:Q7yQnSMnLvcXlZ8RV+jwz/6y1rQTqbX6C82SndT52Zs=
+github.com/apache/arrow/go/v12 v12.0.1 h1:JsR2+hzYYjgSUkBSaahpqCetqZMr76djX80fF/DiJbg=
+github.com/apache/arrow/go/v12 v12.0.1/go.mod h1:weuTY7JvTG/HDPtMQxEUp7pU73vkLWMLpY67QwZ/WWw=
 github.com/apache/arrow/go/v15 v15.0.2 h1:60IliRbiyTWCWjERBCkO1W4Qun9svcYoZrSLcyOsMLE=
 github.com/apache/arrow/go/v15 v15.0.2/go.mod h1:DGXsR3ajT524njufqf95822i+KTh+yea1jass9YXgjA=
-github.com/apache/arrow/go/v16 v16.0.0 h1:qRLbJRPj4zaseZrjbDHa7mUoZDDIU+4pu+mE2Lucs5g=
-github.com/apache/arrow/go/v16 v16.0.0/go.mod h1:9wnc9mn6vEDTRIm4+27pEjQpRKuTvBaessPoEXQzxWA=
 github.com/apache/arrow/go/v17 v17.0.0 h1:RRR2bdqKcdbss9Gxy2NS/hK8i4LDMh23L6BbkN5+F54=
 github.com/apache/arrow/go/v17 v17.0.0/go.mod h1:jR7QHkODl15PfYyjM2nU+yTLScZ/qfj7OSUZmJ8putc=
 github.com/apache/iceberg-go v0.0.0-20231019054250-52f8abd172c6 h1:G2K7F0Qk21Tf1ndIAzgYB83ymmOc0NzAx081+pqFvRg=
@@ -326,8 +326,6 @@ github.com/danieljoos/wincred v1.2.1 h1:dl9cBrupW8+r5250DYkYxocLeZ1Y4vB1kxgtjxw8
 github.com/danieljoos/wincred v1.2.1/go.mod h1:uGaFL9fDn3OLTvzCGulzE+SzjEe5NGlh5FdCcyfPwps=
 github.com/databricks/databricks-sdk-go v0.41.0 h1:OyhYY+Q6+gqkWeXmpGEiacoU2RStTeWPF0x4vmqbQdc=
 github.com/databricks/databricks-sdk-go v0.41.0/go.mod h1:rLIhh7DvifVLmf2QxMr/vMRGqdrTZazn8VYo4LilfCo=
-github.com/databricks/databricks-sql-go v1.5.5 h1:5R1e8sqoSBxN7AxGBq5SPZA33yFYtVEZsXSQMucotK8=
-github.com/databricks/databricks-sql-go v1.5.5/go.mod h1:7y/00AwHRdsN7ise/Qr1aYOrv24jqP+6cGd4+KCfU+c=
 github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
 github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
 github.com/davecgh/go-spew v1.1.2-0.20180830191138-d8f796af33cc h1:U9qPSI2PIWSS1VwoXQT9A3Wy9MM3WgvqSxFWenqJduM=
@@ -715,6 +713,8 @@ github.com/mattn/go-sqlite3 v1.14.22/go.mod h1:Uh1q+B4BYcTPb+yiD3kU8Ct7aC0hY9fxU
 github.com/mattn/go-sqlite3 v2.0.3+incompatible h1:gXHsfypPkaMZrKbD5209QV9jbUTJKjyR5WD3HYQSd+U=
 github.com/mattn/go-sqlite3 v2.0.3+incompatible/go.mod h1:FPy6KqzDD04eiIsT53CuJW3U88zkxoIYsOqkbpncsNc=
 github.com/matttproud/golang_protobuf_extensions v1.0.1/go.mod h1:D8He9yQNgCq6Z5Ld7szi9bcBfOoFv/3dc6xSMkL2PC0=
+github.com/mdibaiee/databricks-sql-go v0.0.0-20250307113106-c54f93acc0f1 h1:udswkCRKTu1DtYLXyH0ITH/onYUWkt1hbML13iNHx7o=
+github.com/mdibaiee/databricks-sql-go v0.0.0-20250307113106-c54f93acc0f1/go.mod h1:mhlVfeKir0vTE7JVJv1qvSoiU0PryRKOU49WbMdggrs=
 github.com/microsoft/go-mssqldb v1.8.0 h1:7cyZ/AT7ycDsEoWPIXibd+aVKFtteUNhDGf3aobP+tw=
 github.com/microsoft/go-mssqldb v1.8.0/go.mod h1:6znkekS3T2vp0waiMhen4GPU1BiAsrP+iXHcE7a7rFo=
 github.com/minio/asm2plan9s v0.0.0-20200509001527-cdd76441f9d8 h1:AMFGa4R4MiIpspGNG7Z948v4n35fFGB3RR3G/ry4FWs=
diff --git a/materialize-databricks/driver.go b/materialize-databricks/driver.go
index 2075423bd5..e1a9721c7f 100644
--- a/materialize-databricks/driver.go
+++ b/materialize-databricks/driver.go
@@ -389,12 +389,7 @@ func (d *transactor) Store(it *m.StoreIterator) (_ m.StartCommitFunc, err error)
 		// given that COPY INTO is idempotent by default: files that have already been loaded into a table will
 		// not be loaded again
 		// see https://docs.databricks.com/en/sql/language-manual/delta-copy-into.html
-
-		// FIXME: due to a bug in databricks go sql driver we are temporarily always running MERGE INTO for
-		// all non-delta-updates bindings. This should be reverted once https://github.com/databricks/databricks-sql-go/issues/254
-		// is fixed and we have updated to the latest driver
-		// if b.target.DeltaUpdates || !b.needsMerge {
-		if b.target.DeltaUpdates {
+		if b.target.DeltaUpdates || !b.needsMerge {
 			// TODO: switch to slices.Chunk once we switch to go1.23
 			for i := 0; i < len(toCopy); i += queryBatchSize {
 				end := i + queryBatchSize
diff --git a/tests/materialize/materialize-databricks/snapshot.json b/tests/materialize/materialize-databricks/snapshot.json
index 4ad83e8aaa..5b4e2166bd 100644
--- a/tests/materialize/materialize-databricks/snapshot.json
+++ b/tests/materialize/materialize-databricks/snapshot.json
@@ -14,7 +14,7 @@
     "updated": {
       "some-schema%2Fbinary_key": {
         "Queries": [
-          "\n\tMERGE INTO `some-schema`.binary_key AS l\n\tUSING (\n\t\t(\n\t\t\tSELECT\n\t\t\tunbase64(id)::BINARY as id, counter::LONG, flow_published_at::TIMESTAMP, flow_document::STRING\n\t\t\tFROM json.`/Volumes/main/some-schema/flow_staging/flow_temp_tables/`\n\t\t)\n\t) AS r\n\tON l.id = r.id\n\tWHEN MATCHED AND r.flow_document='\"delete\"' THEN\n\t\tDELETE\n\tWHEN MATCHED THEN\n\t\tUPDATE SET l.counter = r.counter, l.flow_published_at = r.flow_published_at, l.flow_document = r.flow_document\n\tWHEN NOT MATCHED AND r.flow_document!='\"delete\"' THEN\n\t\tINSERT (id, counter, flow_published_at, flow_document)\n\t\tVALUES (r.id, r.counter, r.flow_published_at, r.flow_document);\n"
+          "\n\tCOPY INTO `some-schema`.binary_key FROM (\n SELECT\n\t\tunbase64(id)::BINARY as id, counter::LONG, flow_published_at::TIMESTAMP, flow_document::STRING\n FROM '/Volumes/main/some-schema/flow_staging/flow_temp_tables'\n\t)\n FILEFORMAT = JSON\n FILES = ('')\n FORMAT_OPTIONS ( 'mode' = 'FAILFAST', 'ignoreMissingFiles' = 'false' )\n\tCOPY_OPTIONS ( 'mergeSchema' = 'true' )\n ;\n"
         ],
         "ToDelete": [
           "/Volumes/main/some-schema/flow_staging/flow_temp_tables/"
@@ -22,7 +22,7 @@
       },
       "some-schema%2Fdeletions": {
         "Queries": [
-          "\n\tMERGE INTO `some-schema`.deletions AS l\n\tUSING (\n\t\t(\n\t\t\tSELECT\n\t\t\tid::LONG, `_meta/op`::STRING, flow_published_at::TIMESTAMP, flow_document::STRING\n\t\t\tFROM json.`/Volumes/main/some-schema/flow_staging/flow_temp_tables/`\n\t\t)\n\t) AS r\n\tON l.id = r.id\n\tWHEN MATCHED AND r.flow_document='\"delete\"' THEN\n\t\tDELETE\n\tWHEN MATCHED THEN\n\t\tUPDATE SET l.`_meta/op` = r.`_meta/op`, l.flow_published_at = r.flow_published_at, l.flow_document = r.flow_document\n\tWHEN NOT MATCHED AND r.flow_document!='\"delete\"' THEN\n\t\tINSERT (id, `_meta/op`, flow_published_at, flow_document)\n\t\tVALUES (r.id, r.`_meta/op`, r.flow_published_at, r.flow_document);\n"
+          "\n\tCOPY INTO `some-schema`.deletions FROM (\n SELECT\n\t\tid::LONG, `_meta/op`::STRING, flow_published_at::TIMESTAMP, flow_document::STRING\n FROM '/Volumes/main/some-schema/flow_staging/flow_temp_tables'\n\t)\n FILEFORMAT = JSON\n FILES = ('')\n FORMAT_OPTIONS ( 'mode' = 'FAILFAST', 'ignoreMissingFiles' = 'false' )\n\tCOPY_OPTIONS ( 'mergeSchema' = 'true' )\n ;\n"
         ],
         "ToDelete": [
           "/Volumes/main/some-schema/flow_staging/flow_temp_tables/"
@@ -46,7 +46,7 @@
       },
      "some-schema%2Fduplicate_keys_standard": {
         "Queries": [
-          "\n\tMERGE INTO `some-schema`.duplicate_keys_standard AS l\n\tUSING (\n\t\t(\n\t\t\tSELECT\n\t\t\tid::LONG, flow_published_at::TIMESTAMP, int::LONG, str::STRING, flow_document::STRING\n\t\t\tFROM json.`/Volumes/main/some-schema/flow_staging/flow_temp_tables/`\n\t\t)\n\t) AS r\n\tON l.id = r.id\n\tWHEN MATCHED AND r.flow_document='\"delete\"' THEN\n\t\tDELETE\n\tWHEN MATCHED THEN\n\t\tUPDATE SET l.flow_published_at = r.flow_published_at, l.int = r.int, l.str = r.str, l.flow_document = r.flow_document\n\tWHEN NOT MATCHED AND r.flow_document!='\"delete\"' THEN\n\t\tINSERT (id, flow_published_at, int, str, flow_document)\n\t\tVALUES (r.id, r.flow_published_at, r.int, r.str, r.flow_document);\n"
+          "\n\tCOPY INTO `some-schema`.duplicate_keys_standard FROM (\n SELECT\n\t\tid::LONG, flow_published_at::TIMESTAMP, int::LONG, str::STRING, flow_document::STRING\n FROM '/Volumes/main/some-schema/flow_staging/flow_temp_tables'\n\t)\n FILEFORMAT = JSON\n FILES = ('')\n FORMAT_OPTIONS ( 'mode' = 'FAILFAST', 'ignoreMissingFiles' = 'false' )\n\tCOPY_OPTIONS ( 'mergeSchema' = 'true' )\n ;\n"
         ],
         "ToDelete": [
           "/Volumes/main/some-schema/flow_staging/flow_temp_tables/"
@@ -54,7 +54,7 @@
       },
       "some-schema%2Fformatted_strings": {
         "Queries": [
-          "\n\tMERGE INTO `some-schema`.formatted_strings AS l\n\tUSING (\n\t\t(\n\t\t\tSELECT\n\t\t\tid::LONG, date::DATE, datetime::TIMESTAMP, flow_published_at::TIMESTAMP, int_and_str::NUMERIC(38,0), int_str::NUMERIC(38,0), num_and_str::DOUBLE, num_str::DOUBLE, time::STRING, flow_document::STRING\n\t\t\tFROM json.`/Volumes/main/some-schema/flow_staging/flow_temp_tables/`\n\t\t)\n\t) AS r\n\tON l.id = r.id\n\tWHEN MATCHED AND r.flow_document='\"delete\"' THEN\n\t\tDELETE\n\tWHEN MATCHED THEN\n\t\tUPDATE SET l.date = r.date, l.datetime = r.datetime, l.flow_published_at = r.flow_published_at, l.int_and_str = r.int_and_str, l.int_str = r.int_str, l.num_and_str = r.num_and_str, l.num_str = r.num_str, l.time = r.time, l.flow_document = r.flow_document\n\tWHEN NOT MATCHED AND r.flow_document!='\"delete\"' THEN\n\t\tINSERT (id, date, datetime, flow_published_at, int_and_str, int_str, num_and_str, num_str, time, flow_document)\n\t\tVALUES (r.id, r.date, r.datetime, r.flow_published_at, r.int_and_str, r.int_str, r.num_and_str, r.num_str, r.time, r.flow_document);\n"
+          "\n\tCOPY INTO `some-schema`.formatted_strings FROM (\n SELECT\n\t\tid::LONG, date::DATE, datetime::TIMESTAMP, flow_published_at::TIMESTAMP, int_and_str::NUMERIC(38,0), int_str::NUMERIC(38,0), num_and_str::DOUBLE, num_str::DOUBLE, time::STRING, flow_document::STRING\n FROM '/Volumes/main/some-schema/flow_staging/flow_temp_tables'\n\t)\n FILEFORMAT = JSON\n FILES = ('')\n FORMAT_OPTIONS ( 'mode' = 'FAILFAST', 'ignoreMissingFiles' = 'false' )\n\tCOPY_OPTIONS ( 'mergeSchema' = 'true' )\n ;\n"
         ],
         "ToDelete": [
           "/Volumes/main/some-schema/flow_staging/flow_temp_tables/"
@@ -62,7 +62,7 @@
       },
       "some-schema%2Fmultiple_types": {
         "Queries": [
-          "\n\tMERGE INTO `some-schema`.multiple_types AS l\n\tUSING (\n\t\t(\n\t\t\tSELECT\n\t\t\tid::LONG, array_int::STRING, unbase64(binary_field)::BINARY as binary_field, bool_field::BOOLEAN, float_field::DOUBLE, flow_published_at::TIMESTAMP, multiple::STRING, nested::STRING, nullable_int::LONG, str_field::STRING, flow_document::STRING\n\t\t\tFROM json.`/Volumes/main/some-schema/flow_staging/flow_temp_tables/`\n\t\t)\n\t) AS r\n\tON l.id = r.id\n\tWHEN MATCHED AND r.flow_document='\"delete\"' THEN\n\t\tDELETE\n\tWHEN MATCHED THEN\n\t\tUPDATE SET l.array_int = r.array_int, l.binary_field = r.binary_field, l.bool_field = r.bool_field, l.float_field = r.float_field, l.flow_published_at = r.flow_published_at, l.multiple = r.multiple, l.nested = r.nested, l.nullable_int = r.nullable_int, l.str_field = r.str_field, l.flow_document = r.flow_document\n\tWHEN NOT MATCHED AND r.flow_document!='\"delete\"' THEN\n\t\tINSERT (id, array_int, binary_field, bool_field, float_field, flow_published_at, multiple, nested, nullable_int, str_field, flow_document)\n\t\tVALUES (r.id, r.array_int, r.binary_field, r.bool_field, r.float_field, r.flow_published_at, r.multiple, r.nested, r.nullable_int, r.str_field, r.flow_document);\n"
+          "\n\tCOPY INTO `some-schema`.multiple_types FROM (\n SELECT\n\t\tid::LONG, array_int::STRING, unbase64(binary_field)::BINARY as binary_field, bool_field::BOOLEAN, float_field::DOUBLE, flow_published_at::TIMESTAMP, multiple::STRING, nested::STRING, nullable_int::LONG, str_field::STRING, flow_document::STRING\n FROM '/Volumes/main/some-schema/flow_staging/flow_temp_tables'\n\t)\n FILEFORMAT = JSON\n FILES = ('')\n FORMAT_OPTIONS ( 'mode' = 'FAILFAST', 'ignoreMissingFiles' = 'false' )\n\tCOPY_OPTIONS ( 'mergeSchema' = 'true' )\n ;\n"
         ],
         "ToDelete": [
           "/Volumes/main/some-schema/flow_staging/flow_temp_tables/"
@@ -70,7 +70,7 @@
       },
       "some-schema%2Fsimple": {
         "Queries": [
-          "\n\tMERGE INTO `some-schema`.simple AS l\n\tUSING (\n\t\t(\n\t\t\tSELECT\n\t\t\tid::LONG, canary::STRING, flow_published_at::TIMESTAMP, flow_document::STRING\n\t\t\tFROM json.`/Volumes/main/some-schema/flow_staging/flow_temp_tables/`\n\t\t)\n\t) AS r\n\tON l.id = r.id\n\tWHEN MATCHED AND r.flow_document='\"delete\"' THEN\n\t\tDELETE\n\tWHEN MATCHED THEN\n\t\tUPDATE SET l.canary = r.canary, l.flow_published_at = r.flow_published_at, l.flow_document = r.flow_document\n\tWHEN NOT MATCHED AND r.flow_document!='\"delete\"' THEN\n\t\tINSERT (id, canary, flow_published_at, flow_document)\n\t\tVALUES (r.id, r.canary, r.flow_published_at, r.flow_document);\n"
+          "\n\tCOPY INTO `some-schema`.simple FROM (\n SELECT\n\t\tid::LONG, canary::STRING, flow_published_at::TIMESTAMP, flow_document::STRING\n FROM '/Volumes/main/some-schema/flow_staging/flow_temp_tables'\n\t)\n FILEFORMAT = JSON\n FILES = ('')\n FORMAT_OPTIONS ( 'mode' = 'FAILFAST', 'ignoreMissingFiles' = 'false' )\n\tCOPY_OPTIONS ( 'mergeSchema' = 'true' )\n ;\n"
         ],
         "ToDelete": [
           "/Volumes/main/some-schema/flow_staging/flow_temp_tables/"
@@ -78,7 +78,7 @@
       },
       "some-schema%2Funsigned_bigint": {
         "Queries": [
-          "\n\tMERGE INTO `some-schema`.unsigned_bigint AS l\n\tUSING (\n\t\t(\n\t\t\tSELECT\n\t\t\tid::LONG, flow_published_at::TIMESTAMP, unsigned_bigint::NUMERIC(38,0), flow_document::STRING\n\t\t\tFROM json.`/Volumes/main/some-schema/flow_staging/flow_temp_tables/`\n\t\t)\n\t) AS r\n\tON l.id = r.id\n\tWHEN MATCHED AND r.flow_document='\"delete\"' THEN\n\t\tDELETE\n\tWHEN MATCHED THEN\n\t\tUPDATE SET l.flow_published_at = r.flow_published_at, l.unsigned_bigint = r.unsigned_bigint, l.flow_document = r.flow_document\n\tWHEN NOT MATCHED AND r.flow_document!='\"delete\"' THEN\n\t\tINSERT (id, flow_published_at, unsigned_bigint, flow_document)\n\t\tVALUES (r.id, r.flow_published_at, r.unsigned_bigint, r.flow_document);\n"
+          "\n\tCOPY INTO `some-schema`.unsigned_bigint FROM (\n SELECT\n\t\tid::LONG, flow_published_at::TIMESTAMP, unsigned_bigint::NUMERIC(38,0), flow_document::STRING\n FROM '/Volumes/main/some-schema/flow_staging/flow_temp_tables'\n\t)\n FILEFORMAT = JSON\n FILES = ('')\n FORMAT_OPTIONS ( 'mode' = 'FAILFAST', 'ignoreMissingFiles' = 'false' )\n\tCOPY_OPTIONS ( 'mergeSchema' = 'true' )\n ;\n"
         ],
         "ToDelete": [
           "/Volumes/main/some-schema/flow_staging/flow_temp_tables/"
@@ -133,7 +133,7 @@
       },
       "some-schema%2Fformatted_strings": {
         "Queries": [
-          "\n\tMERGE INTO `some-schema`.formatted_strings AS l\n\tUSING (\n\t\t(\n\t\t\tSELECT\n\t\t\tid::LONG, date::DATE, datetime::TIMESTAMP, flow_published_at::TIMESTAMP, int_and_str::NUMERIC(38,0), int_str::NUMERIC(38,0), num_and_str::DOUBLE, num_str::DOUBLE, time::STRING, flow_document::STRING\n\t\t\tFROM json.`/Volumes/main/some-schema/flow_staging/flow_temp_tables/`\n\t\t)\n\t) AS r\n\tON l.id = r.id\n\tWHEN MATCHED AND r.flow_document='\"delete\"' THEN\n\t\tDELETE\n\tWHEN MATCHED THEN\n\t\tUPDATE SET l.date = r.date, l.datetime = r.datetime, l.flow_published_at = r.flow_published_at, l.int_and_str = r.int_and_str, l.int_str = r.int_str, l.num_and_str = r.num_and_str, l.num_str = r.num_str, l.time = r.time, l.flow_document = r.flow_document\n\tWHEN NOT MATCHED AND r.flow_document!='\"delete\"' THEN\n\t\tINSERT (id, date, datetime, flow_published_at, int_and_str, int_str, num_and_str, num_str, time, flow_document)\n\t\tVALUES (r.id, r.date, r.datetime, r.flow_published_at, r.int_and_str, r.int_str, r.num_and_str, r.num_str, r.time, r.flow_document);\n"
+          "\n\tCOPY INTO `some-schema`.formatted_strings FROM (\n SELECT\n\t\tid::LONG, date::DATE, datetime::TIMESTAMP, flow_published_at::TIMESTAMP, int_and_str::NUMERIC(38,0), int_str::NUMERIC(38,0), num_and_str::DOUBLE, num_str::DOUBLE, time::STRING, flow_document::STRING\n FROM '/Volumes/main/some-schema/flow_staging/flow_temp_tables'\n\t)\n FILEFORMAT = JSON\n FILES = ('')\n FORMAT_OPTIONS ( 'mode' = 'FAILFAST', 'ignoreMissingFiles' = 'false' )\n\tCOPY_OPTIONS ( 'mergeSchema' = 'true' )\n ;\n"
         ],
         "ToDelete": [
           "/Volumes/main/some-schema/flow_staging/flow_temp_tables/"
@@ -149,7 +149,7 @@
       },
       "some-schema%2Fsimple": {
         "Queries": [
-          "\n\tMERGE INTO `some-schema`.simple AS l\n\tUSING (\n\t\t(\n\t\t\tSELECT\n\t\t\tid::LONG, canary::STRING, flow_published_at::TIMESTAMP, flow_document::STRING\n\t\t\tFROM json.`/Volumes/main/some-schema/flow_staging/flow_temp_tables/`\n\t\t)\n\t) AS r\n\tON l.id = r.id\n\tWHEN MATCHED AND r.flow_document='\"delete\"' THEN\n\t\tDELETE\n\tWHEN MATCHED THEN\n\t\tUPDATE SET l.canary = r.canary, l.flow_published_at = r.flow_published_at, l.flow_document = r.flow_document\n\tWHEN NOT MATCHED AND r.flow_document!='\"delete\"' THEN\n\t\tINSERT (id, canary, flow_published_at, flow_document)\n\t\tVALUES (r.id, r.canary, r.flow_published_at, r.flow_document);\n"
+          "\n\tCOPY INTO `some-schema`.simple FROM (\n SELECT\n\t\tid::LONG, canary::STRING, flow_published_at::TIMESTAMP, flow_document::STRING\n FROM '/Volumes/main/some-schema/flow_staging/flow_temp_tables'\n\t)\n FILEFORMAT = JSON\n FILES = ('')\n FORMAT_OPTIONS ( 'mode' = 'FAILFAST', 'ignoreMissingFiles' = 'false' )\n\tCOPY_OPTIONS ( 'mergeSchema' = 'true' )\n ;\n"
         ],
         "ToDelete": [
           "/Volumes/main/some-schema/flow_staging/flow_temp_tables/"
@@ -180,7 +180,7 @@
       },
       "some-schema%2Fduplicate_keys_standard": {
         "Queries": [
-          "\n\tMERGE INTO `some-schema`.duplicate_keys_standard AS l\n\tUSING (\n\t\t(\n\t\t\tSELECT\n\t\t\tid::LONG, flow_published_at::TIMESTAMP, int::LONG, str::STRING, flow_document::STRING\n\t\t\tFROM json.`/Volumes/main/some-schema/flow_staging/flow_temp_tables/`\n\t\t)\n\t) AS r\n\tON l.id = r.id\n\tWHEN MATCHED AND r.flow_document='\"delete\"' THEN\n\t\tDELETE\n\tWHEN MATCHED THEN\n\t\tUPDATE SET l.flow_published_at = r.flow_published_at, l.int = r.int, l.str = r.str, l.flow_document = r.flow_document\n\tWHEN NOT MATCHED AND r.flow_document!='\"delete\"' THEN\n\t\tINSERT (id, flow_published_at, int, str, flow_document)\n\t\tVALUES (r.id, r.flow_published_at, r.int, r.str, r.flow_document);\n"
+          "\n\tCOPY INTO `some-schema`.duplicate_keys_standard FROM (\n SELECT\n\t\tid::LONG, flow_published_at::TIMESTAMP, int::LONG, str::STRING, flow_document::STRING\n FROM '/Volumes/main/some-schema/flow_staging/flow_temp_tables'\n\t)\n FILEFORMAT = JSON\n FILES = ('')\n FORMAT_OPTIONS ( 'mode' = 'FAILFAST', 'ignoreMissingFiles' = 'false' )\n\tCOPY_OPTIONS ( 'mergeSchema' = 'true' )\n ;\n"
         ],
         "ToDelete": [
           "/Volumes/main/some-schema/flow_staging/flow_temp_tables/"
@@ -211,7 +211,7 @@
       },
       "some-schema%2Fduplicate_keys_standard": {
         "Queries": [
-          "\n\tMERGE INTO `some-schema`.duplicate_keys_standard AS l\n\tUSING (\n\t\t(\n\t\t\tSELECT\n\t\t\tid::LONG, flow_published_at::TIMESTAMP, int::LONG, str::STRING, flow_document::STRING\n\t\t\tFROM json.`/Volumes/main/some-schema/flow_staging/flow_temp_tables/`\n\t\t)\n\t) AS r\n\tON l.id = r.id\n\tWHEN MATCHED AND r.flow_document='\"delete\"' THEN\n\t\tDELETE\n\tWHEN MATCHED THEN\n\t\tUPDATE SET l.flow_published_at = r.flow_published_at, l.int = r.int, l.str = r.str, l.flow_document = r.flow_document\n\tWHEN NOT MATCHED AND r.flow_document!='\"delete\"' THEN\n\t\tINSERT (id, flow_published_at, int, str, flow_document)\n\t\tVALUES (r.id, r.flow_published_at, r.int, r.str, r.flow_document);\n"
+          "\n\tCOPY INTO `some-schema`.duplicate_keys_standard FROM (\n SELECT\n\t\tid::LONG, flow_published_at::TIMESTAMP, int::LONG, str::STRING, flow_document::STRING\n FROM '/Volumes/main/some-schema/flow_staging/flow_temp_tables'\n\t)\n FILEFORMAT = JSON\n FILES = ('')\n FORMAT_OPTIONS ( 'mode' = 'FAILFAST', 'ignoreMissingFiles' = 'false' )\n\tCOPY_OPTIONS ( 'mergeSchema' = 'true' )\n ;\n"
         ],
         "ToDelete": [
           "/Volumes/main/some-schema/flow_staging/flow_temp_tables/"
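
For reference, a minimal sketch (not part of the patch) of the kind of check used to confirm
that a failing statement's error is surfaced through database/sql, which is the behaviour this
fork is meant to guarantee. The DATABRICKS_DSN environment variable and the table name below
are illustrative assumptions, not taken from the repo. Note the blank import keeps the
canonical module path; the go.mod replace directive above reroutes it to the fork.

package main

import (
	"context"
	"database/sql"
	"log"
	"os"

	// Blank import registers the "databricks" driver with database/sql.
	_ "github.com/databricks/databricks-sql-go"
)

func main() {
	// DATABRICKS_DSN is an assumed env var holding a driver DSN.
	db, err := sql.Open("databricks", os.Getenv("DATABRICKS_DSN"))
	if err != nil {
		log.Fatalf("open: %v", err)
	}
	defer db.Close()

	// Run a statement that must fail server-side. Under the bug worked
	// around here (databricks-sql-go#254), such failures could go
	// unreported; with the fork, a non-nil error must come back.
	_, err = db.ExecContext(context.Background(),
		"SELECT * FROM table_that_should_not_exist")
	if err == nil {
		log.Fatal("expected an error to be surfaced, got nil")
	}
	log.Printf("error correctly surfaced: %v", err)
}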