diff --git a/go.mod b/go.mod
index ad20fa3a93..72ec870514 100644
--- a/go.mod
+++ b/go.mod
@@ -123,8 +123,8 @@ require (
 	github.com/JohnCGriffin/overflow v0.0.0-20211019200055-46fa312c352c // indirect
 	github.com/Microsoft/go-winio v0.6.2 // indirect
 	github.com/apache/arrow/go/arrow v0.0.0-20211112161151-bc219186db40 // indirect
+	github.com/apache/arrow/go/v12 v12.0.1 // indirect
 	github.com/apache/arrow/go/v15 v15.0.2 // indirect
-	github.com/apache/arrow/go/v16 v16.0.0 // indirect
 	github.com/apache/thrift v0.20.0 // indirect
 	github.com/apapsch/go-jsonmerge/v2 v2.0.0 // indirect
 	github.com/astaxie/beego v1.12.3 // indirect
@@ -266,3 +266,8 @@ require (
 	k8s.io/apimachinery v0.23.17 // indirect
 	nhooyr.io/websocket v1.8.7 // indirect
 )
+
+// FIXME: due to a bug in databricks go sql driver we have this hack below
+// This should be reverted once https://github.com/databricks/databricks-sql-go/issues/254
+// is fixed and we have updated to the latest driver
+replace github.com/databricks/databricks-sql-go v1.5.5 => github.com/mdibaiee/databricks-sql-go v0.0.0-20250307113106-c54f93acc0f1
diff --git a/go.sum b/go.sum
index 43e656b602..f6b585c00a 100644
--- a/go.sum
+++ b/go.sum
@@ -156,10 +156,10 @@ github.com/antihax/optional v1.0.0/go.mod h1:uupD/76wgC+ih3iEmQUL+0Ugr19nfwCT1kd
 github.com/apache/arrow/go/arrow v0.0.0-20200730104253-651201b0f516/go.mod h1:QNYViu/X0HXDHw7m3KXzWSVXIbfUvJqBFe6Gj8/pYA0=
 github.com/apache/arrow/go/arrow v0.0.0-20211112161151-bc219186db40 h1:q4dksr6ICHXqG5hm0ZW5IHyeEJXoIJSOZeBLmWPNeIQ=
 github.com/apache/arrow/go/arrow v0.0.0-20211112161151-bc219186db40/go.mod h1:Q7yQnSMnLvcXlZ8RV+jwz/6y1rQTqbX6C82SndT52Zs=
+github.com/apache/arrow/go/v12 v12.0.1 h1:JsR2+hzYYjgSUkBSaahpqCetqZMr76djX80fF/DiJbg=
+github.com/apache/arrow/go/v12 v12.0.1/go.mod h1:weuTY7JvTG/HDPtMQxEUp7pU73vkLWMLpY67QwZ/WWw=
 github.com/apache/arrow/go/v15 v15.0.2 h1:60IliRbiyTWCWjERBCkO1W4Qun9svcYoZrSLcyOsMLE=
 github.com/apache/arrow/go/v15 v15.0.2/go.mod h1:DGXsR3ajT524njufqf95822i+KTh+yea1jass9YXgjA=
-github.com/apache/arrow/go/v16 v16.0.0 h1:qRLbJRPj4zaseZrjbDHa7mUoZDDIU+4pu+mE2Lucs5g=
-github.com/apache/arrow/go/v16 v16.0.0/go.mod h1:9wnc9mn6vEDTRIm4+27pEjQpRKuTvBaessPoEXQzxWA=
 github.com/apache/arrow/go/v17 v17.0.0 h1:RRR2bdqKcdbss9Gxy2NS/hK8i4LDMh23L6BbkN5+F54=
 github.com/apache/arrow/go/v17 v17.0.0/go.mod h1:jR7QHkODl15PfYyjM2nU+yTLScZ/qfj7OSUZmJ8putc=
 github.com/apache/iceberg-go v0.0.0-20231019054250-52f8abd172c6 h1:G2K7F0Qk21Tf1ndIAzgYB83ymmOc0NzAx081+pqFvRg=
@@ -326,8 +326,6 @@ github.com/danieljoos/wincred v1.2.1 h1:dl9cBrupW8+r5250DYkYxocLeZ1Y4vB1kxgtjxw8
 github.com/danieljoos/wincred v1.2.1/go.mod h1:uGaFL9fDn3OLTvzCGulzE+SzjEe5NGlh5FdCcyfPwps=
 github.com/databricks/databricks-sdk-go v0.41.0 h1:OyhYY+Q6+gqkWeXmpGEiacoU2RStTeWPF0x4vmqbQdc=
 github.com/databricks/databricks-sdk-go v0.41.0/go.mod h1:rLIhh7DvifVLmf2QxMr/vMRGqdrTZazn8VYo4LilfCo=
-github.com/databricks/databricks-sql-go v1.5.5 h1:5R1e8sqoSBxN7AxGBq5SPZA33yFYtVEZsXSQMucotK8=
-github.com/databricks/databricks-sql-go v1.5.5/go.mod h1:7y/00AwHRdsN7ise/Qr1aYOrv24jqP+6cGd4+KCfU+c=
 github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
 github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
 github.com/davecgh/go-spew v1.1.2-0.20180830191138-d8f796af33cc h1:U9qPSI2PIWSS1VwoXQT9A3Wy9MM3WgvqSxFWenqJduM=
@@ -715,6 +713,8 @@ github.com/mattn/go-sqlite3 v1.14.22/go.mod h1:Uh1q+B4BYcTPb+yiD3kU8Ct7aC0hY9fxU
 github.com/mattn/go-sqlite3 v2.0.3+incompatible h1:gXHsfypPkaMZrKbD5209QV9jbUTJKjyR5WD3HYQSd+U=
 github.com/mattn/go-sqlite3 v2.0.3+incompatible/go.mod h1:FPy6KqzDD04eiIsT53CuJW3U88zkxoIYsOqkbpncsNc=
 github.com/matttproud/golang_protobuf_extensions v1.0.1/go.mod h1:D8He9yQNgCq6Z5Ld7szi9bcBfOoFv/3dc6xSMkL2PC0=
+github.com/mdibaiee/databricks-sql-go v0.0.0-20250307113106-c54f93acc0f1 h1:udswkCRKTu1DtYLXyH0ITH/onYUWkt1hbML13iNHx7o=
+github.com/mdibaiee/databricks-sql-go v0.0.0-20250307113106-c54f93acc0f1/go.mod h1:mhlVfeKir0vTE7JVJv1qvSoiU0PryRKOU49WbMdggrs=
 github.com/microsoft/go-mssqldb v1.8.0 h1:7cyZ/AT7ycDsEoWPIXibd+aVKFtteUNhDGf3aobP+tw=
 github.com/microsoft/go-mssqldb v1.8.0/go.mod h1:6znkekS3T2vp0waiMhen4GPU1BiAsrP+iXHcE7a7rFo=
 github.com/minio/asm2plan9s v0.0.0-20200509001527-cdd76441f9d8 h1:AMFGa4R4MiIpspGNG7Z948v4n35fFGB3RR3G/ry4FWs=
diff --git a/materialize-databricks/driver.go b/materialize-databricks/driver.go
index 2075423bd5..e1a9721c7f 100644
--- a/materialize-databricks/driver.go
+++ b/materialize-databricks/driver.go
@@ -389,12 +389,7 @@ func (d *transactor) Store(it *m.StoreIterator) (_ m.StartCommitFunc, err error)
 		// given that COPY INTO is idempotent by default: files that have already been loaded into a table will
 		// not be loaded again
 		// see https://docs.databricks.com/en/sql/language-manual/delta-copy-into.html
-
-		// FIXME: due to a bug in databricks go sql driver we are temporarily always running MERGE INTO for
-		// all non-delta-updates bindings. This should be reverted once https://github.com/databricks/databricks-sql-go/issues/254
-		// is fixed and we have updated to the latest driver
-		// if b.target.DeltaUpdates || !b.needsMerge {
-		if b.target.DeltaUpdates {
+		if b.target.DeltaUpdates || !b.needsMerge {
 			// TODO: switch to slices.Chunk once we switch to go1.23
 			for i := 0; i < len(toCopy); i += queryBatchSize {
 				end := i + queryBatchSize
diff --git a/tests/materialize/materialize-databricks/snapshot.json b/tests/materialize/materialize-databricks/snapshot.json
index 4ad83e8aaa..5b4e2166bd 100644
--- a/tests/materialize/materialize-databricks/snapshot.json
+++ b/tests/materialize/materialize-databricks/snapshot.json
@@ -14,7 +14,7 @@
   "updated": {
     "some-schema%2Fbinary_key": {
       "Queries": [
-        "\n\tMERGE INTO `some-schema`.binary_key AS l\n\tUSING (\n\t\t(\n\t\t\tSELECT\n\t\t\tunbase64(id)::BINARY as id, counter::LONG, flow_published_at::TIMESTAMP, flow_document::STRING\n\t\t\tFROM json.`/Volumes/main/some-schema/flow_staging/flow_temp_tables/`\n\t\t)\n\t) AS r\n\tON l.id = r.id\n\tWHEN MATCHED AND r.flow_document='\"delete\"' THEN\n\t\tDELETE\n\tWHEN MATCHED THEN\n\t\tUPDATE SET l.counter = r.counter, l.flow_published_at = r.flow_published_at, l.flow_document = r.flow_document\n\tWHEN NOT MATCHED AND r.flow_document!='\"delete\"' THEN\n\t\tINSERT (id, counter, flow_published_at, flow_document)\n\t\tVALUES (r.id, r.counter, r.flow_published_at, r.flow_document);\n"
+        "\n\tCOPY INTO `some-schema`.binary_key FROM (\n SELECT\n\t\tunbase64(id)::BINARY as id, counter::LONG, flow_published_at::TIMESTAMP, flow_document::STRING\n FROM '/Volumes/main/some-schema/flow_staging/flow_temp_tables'\n\t)\n FILEFORMAT = JSON\n FILES = ('')\n FORMAT_OPTIONS ( 'mode' = 'FAILFAST', 'ignoreMissingFiles' = 'false' )\n\tCOPY_OPTIONS ( 'mergeSchema' = 'true' )\n ;\n"
       ],
       "ToDelete": [
         "/Volumes/main/some-schema/flow_staging/flow_temp_tables/"
@@ -22,7 +22,7 @@
     },
     "some-schema%2Fdeletions": {
       "Queries": [
-        "\n\tMERGE INTO `some-schema`.deletions AS l\n\tUSING (\n\t\t(\n\t\t\tSELECT\n\t\t\tid::LONG, `_meta/op`::STRING, flow_published_at::TIMESTAMP, flow_document::STRING\n\t\t\tFROM json.`/Volumes/main/some-schema/flow_staging/flow_temp_tables/`\n\t\t)\n\t) AS r\n\tON l.id = r.id\n\tWHEN MATCHED AND r.flow_document='\"delete\"' THEN\n\t\tDELETE\n\tWHEN MATCHED THEN\n\t\tUPDATE SET l.`_meta/op` = r.`_meta/op`, l.flow_published_at = r.flow_published_at, l.flow_document = r.flow_document\n\tWHEN NOT MATCHED AND r.flow_document!='\"delete\"' THEN\n\t\tINSERT (id, `_meta/op`, flow_published_at, flow_document)\n\t\tVALUES (r.id, r.`_meta/op`, r.flow_published_at, r.flow_document);\n"
+        "\n\tCOPY INTO `some-schema`.deletions FROM (\n SELECT\n\t\tid::LONG, `_meta/op`::STRING, flow_published_at::TIMESTAMP, flow_document::STRING\n FROM '/Volumes/main/some-schema/flow_staging/flow_temp_tables'\n\t)\n FILEFORMAT = JSON\n FILES = ('')\n FORMAT_OPTIONS ( 'mode' = 'FAILFAST', 'ignoreMissingFiles' = 'false' )\n\tCOPY_OPTIONS ( 'mergeSchema' = 'true' )\n ;\n"
       ],
       "ToDelete": [
         "/Volumes/main/some-schema/flow_staging/flow_temp_tables/"
@@ -46,7 +46,7 @@
     },
     "some-schema%2Fduplicate_keys_standard": {
      "Queries": [
-        "\n\tMERGE INTO `some-schema`.duplicate_keys_standard AS l\n\tUSING (\n\t\t(\n\t\t\tSELECT\n\t\t\tid::LONG, flow_published_at::TIMESTAMP, int::LONG, str::STRING, flow_document::STRING\n\t\t\tFROM json.`/Volumes/main/some-schema/flow_staging/flow_temp_tables/`\n\t\t)\n\t) AS r\n\tON l.id = r.id\n\tWHEN MATCHED AND r.flow_document='\"delete\"' THEN\n\t\tDELETE\n\tWHEN MATCHED THEN\n\t\tUPDATE SET l.flow_published_at = r.flow_published_at, l.int = r.int, l.str = r.str, l.flow_document = r.flow_document\n\tWHEN NOT MATCHED AND r.flow_document!='\"delete\"' THEN\n\t\tINSERT (id, flow_published_at, int, str, flow_document)\n\t\tVALUES (r.id, r.flow_published_at, r.int, r.str, r.flow_document);\n"
+        "\n\tCOPY INTO `some-schema`.duplicate_keys_standard FROM (\n SELECT\n\t\tid::LONG, flow_published_at::TIMESTAMP, int::LONG, str::STRING, flow_document::STRING\n FROM '/Volumes/main/some-schema/flow_staging/flow_temp_tables'\n\t)\n FILEFORMAT = JSON\n FILES = ('')\n FORMAT_OPTIONS ( 'mode' = 'FAILFAST', 'ignoreMissingFiles' = 'false' )\n\tCOPY_OPTIONS ( 'mergeSchema' = 'true' )\n ;\n"
       ],
       "ToDelete": [
         "/Volumes/main/some-schema/flow_staging/flow_temp_tables/"
@@ -54,7 +54,7 @@
     },
     "some-schema%2Fformatted_strings": {
       "Queries": [
-        "\n\tMERGE INTO `some-schema`.formatted_strings AS l\n\tUSING (\n\t\t(\n\t\t\tSELECT\n\t\t\tid::LONG, date::DATE, datetime::TIMESTAMP, flow_published_at::TIMESTAMP, int_and_str::NUMERIC(38,0), int_str::NUMERIC(38,0), num_and_str::DOUBLE, num_str::DOUBLE, time::STRING, flow_document::STRING\n\t\t\tFROM json.`/Volumes/main/some-schema/flow_staging/flow_temp_tables/`\n\t\t)\n\t) AS r\n\tON l.id = r.id\n\tWHEN MATCHED AND r.flow_document='\"delete\"' THEN\n\t\tDELETE\n\tWHEN MATCHED THEN\n\t\tUPDATE SET l.date = r.date, l.datetime = r.datetime, l.flow_published_at = r.flow_published_at, l.int_and_str = r.int_and_str, l.int_str = r.int_str, l.num_and_str = r.num_and_str, l.num_str = r.num_str, l.time = r.time, l.flow_document = r.flow_document\n\tWHEN NOT MATCHED AND r.flow_document!='\"delete\"' THEN\n\t\tINSERT (id, date, datetime, flow_published_at, int_and_str, int_str, num_and_str, num_str, time, flow_document)\n\t\tVALUES (r.id, r.date, r.datetime, r.flow_published_at, r.int_and_str, r.int_str, r.num_and_str, r.num_str, r.time, r.flow_document);\n"
+        "\n\tCOPY INTO `some-schema`.formatted_strings FROM (\n SELECT\n\t\tid::LONG, date::DATE, datetime::TIMESTAMP, flow_published_at::TIMESTAMP, int_and_str::NUMERIC(38,0), int_str::NUMERIC(38,0), num_and_str::DOUBLE, num_str::DOUBLE, time::STRING, flow_document::STRING\n FROM '/Volumes/main/some-schema/flow_staging/flow_temp_tables'\n\t)\n FILEFORMAT = JSON\n FILES = ('')\n FORMAT_OPTIONS ( 'mode' = 'FAILFAST', 'ignoreMissingFiles' = 'false' )\n\tCOPY_OPTIONS ( 'mergeSchema' = 'true' )\n ;\n"
       ],
       "ToDelete": [
         "/Volumes/main/some-schema/flow_staging/flow_temp_tables/"
@@ -62,7 +62,7 @@
     },
     "some-schema%2Fmultiple_types": {
       "Queries": [
-        "\n\tMERGE INTO `some-schema`.multiple_types AS l\n\tUSING (\n\t\t(\n\t\t\tSELECT\n\t\t\tid::LONG, array_int::STRING, unbase64(binary_field)::BINARY as binary_field, bool_field::BOOLEAN, float_field::DOUBLE, flow_published_at::TIMESTAMP, multiple::STRING, nested::STRING, nullable_int::LONG, str_field::STRING, flow_document::STRING\n\t\t\tFROM json.`/Volumes/main/some-schema/flow_staging/flow_temp_tables/`\n\t\t)\n\t) AS r\n\tON l.id = r.id\n\tWHEN MATCHED AND r.flow_document='\"delete\"' THEN\n\t\tDELETE\n\tWHEN MATCHED THEN\n\t\tUPDATE SET l.array_int = r.array_int, l.binary_field = r.binary_field, l.bool_field = r.bool_field, l.float_field = r.float_field, l.flow_published_at = r.flow_published_at, l.multiple = r.multiple, l.nested = r.nested, l.nullable_int = r.nullable_int, l.str_field = r.str_field, l.flow_document = r.flow_document\n\tWHEN NOT MATCHED AND r.flow_document!='\"delete\"' THEN\n\t\tINSERT (id, array_int, binary_field, bool_field, float_field, flow_published_at, multiple, nested, nullable_int, str_field, flow_document)\n\t\tVALUES (r.id, r.array_int, r.binary_field, r.bool_field, r.float_field, r.flow_published_at, r.multiple, r.nested, r.nullable_int, r.str_field, r.flow_document);\n"
+        "\n\tCOPY INTO `some-schema`.multiple_types FROM (\n SELECT\n\t\tid::LONG, array_int::STRING, unbase64(binary_field)::BINARY as binary_field, bool_field::BOOLEAN, float_field::DOUBLE, flow_published_at::TIMESTAMP, multiple::STRING, nested::STRING, nullable_int::LONG, str_field::STRING, flow_document::STRING\n FROM '/Volumes/main/some-schema/flow_staging/flow_temp_tables'\n\t)\n FILEFORMAT = JSON\n FILES = ('')\n FORMAT_OPTIONS ( 'mode' = 'FAILFAST', 'ignoreMissingFiles' = 'false' )\n\tCOPY_OPTIONS ( 'mergeSchema' = 'true' )\n ;\n"
       ],
       "ToDelete": [
         "/Volumes/main/some-schema/flow_staging/flow_temp_tables/"
@@ -70,7 +70,7 @@
     },
     "some-schema%2Fsimple": {
       "Queries": [
-        "\n\tMERGE INTO `some-schema`.simple AS l\n\tUSING (\n\t\t(\n\t\t\tSELECT\n\t\t\tid::LONG, canary::STRING, flow_published_at::TIMESTAMP, flow_document::STRING\n\t\t\tFROM json.`/Volumes/main/some-schema/flow_staging/flow_temp_tables/`\n\t\t)\n\t) AS r\n\tON l.id = r.id\n\tWHEN MATCHED AND r.flow_document='\"delete\"' THEN\n\t\tDELETE\n\tWHEN MATCHED THEN\n\t\tUPDATE SET l.canary = r.canary, l.flow_published_at = r.flow_published_at, l.flow_document = r.flow_document\n\tWHEN NOT MATCHED AND r.flow_document!='\"delete\"' THEN\n\t\tINSERT (id, canary, flow_published_at, flow_document)\n\t\tVALUES (r.id, r.canary, r.flow_published_at, r.flow_document);\n"
+        "\n\tCOPY INTO `some-schema`.simple FROM (\n SELECT\n\t\tid::LONG, canary::STRING, flow_published_at::TIMESTAMP, flow_document::STRING\n FROM '/Volumes/main/some-schema/flow_staging/flow_temp_tables'\n\t)\n FILEFORMAT = JSON\n FILES = ('')\n FORMAT_OPTIONS ( 'mode' = 'FAILFAST', 'ignoreMissingFiles' = 'false' )\n\tCOPY_OPTIONS ( 'mergeSchema' = 'true' )\n ;\n"
       ],
       "ToDelete": [
         "/Volumes/main/some-schema/flow_staging/flow_temp_tables/"
@@ -78,7 +78,7 @@
     },
     "some-schema%2Funsigned_bigint": {
      "Queries": [
-        "\n\tMERGE INTO `some-schema`.unsigned_bigint AS l\n\tUSING (\n\t\t(\n\t\t\tSELECT\n\t\t\tid::LONG, flow_published_at::TIMESTAMP, unsigned_bigint::NUMERIC(38,0), flow_document::STRING\n\t\t\tFROM json.`/Volumes/main/some-schema/flow_staging/flow_temp_tables/`\n\t\t)\n\t) AS r\n\tON l.id = r.id\n\tWHEN MATCHED AND r.flow_document='\"delete\"' THEN\n\t\tDELETE\n\tWHEN MATCHED THEN\n\t\tUPDATE SET l.flow_published_at = r.flow_published_at, l.unsigned_bigint = r.unsigned_bigint, l.flow_document = r.flow_document\n\tWHEN NOT MATCHED AND r.flow_document!='\"delete\"' THEN\n\t\tINSERT (id, flow_published_at, unsigned_bigint, flow_document)\n\t\tVALUES (r.id, r.flow_published_at, r.unsigned_bigint, r.flow_document);\n"
+        "\n\tCOPY INTO `some-schema`.unsigned_bigint FROM (\n SELECT\n\t\tid::LONG, flow_published_at::TIMESTAMP, unsigned_bigint::NUMERIC(38,0), flow_document::STRING\n FROM '/Volumes/main/some-schema/flow_staging/flow_temp_tables'\n\t)\n FILEFORMAT = JSON\n FILES = ('')\n FORMAT_OPTIONS ( 'mode' = 'FAILFAST', 'ignoreMissingFiles' = 'false' )\n\tCOPY_OPTIONS ( 'mergeSchema' = 'true' )\n ;\n"
       ],
       "ToDelete": [
         "/Volumes/main/some-schema/flow_staging/flow_temp_tables/"
@@ -133,7 +133,7 @@
     },
     "some-schema%2Fformatted_strings": {
       "Queries": [
-        "\n\tMERGE INTO `some-schema`.formatted_strings AS l\n\tUSING (\n\t\t(\n\t\t\tSELECT\n\t\t\tid::LONG, date::DATE, datetime::TIMESTAMP, flow_published_at::TIMESTAMP, int_and_str::NUMERIC(38,0), int_str::NUMERIC(38,0), num_and_str::DOUBLE, num_str::DOUBLE, time::STRING, flow_document::STRING\n\t\t\tFROM json.`/Volumes/main/some-schema/flow_staging/flow_temp_tables/`\n\t\t)\n\t) AS r\n\tON l.id = r.id\n\tWHEN MATCHED AND r.flow_document='\"delete\"' THEN\n\t\tDELETE\n\tWHEN MATCHED THEN\n\t\tUPDATE SET l.date = r.date, l.datetime = r.datetime, l.flow_published_at = r.flow_published_at, l.int_and_str = r.int_and_str, l.int_str = r.int_str, l.num_and_str = r.num_and_str, l.num_str = r.num_str, l.time = r.time, l.flow_document = r.flow_document\n\tWHEN NOT MATCHED AND r.flow_document!='\"delete\"' THEN\n\t\tINSERT (id, date, datetime, flow_published_at, int_and_str, int_str, num_and_str, num_str, time, flow_document)\n\t\tVALUES (r.id, r.date, r.datetime, r.flow_published_at, r.int_and_str, r.int_str, r.num_and_str, r.num_str, r.time, r.flow_document);\n"
+        "\n\tCOPY INTO `some-schema`.formatted_strings FROM (\n SELECT\n\t\tid::LONG, date::DATE, datetime::TIMESTAMP, flow_published_at::TIMESTAMP, int_and_str::NUMERIC(38,0), int_str::NUMERIC(38,0), num_and_str::DOUBLE, num_str::DOUBLE, time::STRING, flow_document::STRING\n FROM '/Volumes/main/some-schema/flow_staging/flow_temp_tables'\n\t)\n FILEFORMAT = JSON\n FILES = ('')\n FORMAT_OPTIONS ( 'mode' = 'FAILFAST', 'ignoreMissingFiles' = 'false' )\n\tCOPY_OPTIONS ( 'mergeSchema' = 'true' )\n ;\n"
       ],
       "ToDelete": [
         "/Volumes/main/some-schema/flow_staging/flow_temp_tables/"
@@ -149,7 +149,7 @@
     },
     "some-schema%2Fsimple": {
       "Queries": [
-        "\n\tMERGE INTO `some-schema`.simple AS l\n\tUSING (\n\t\t(\n\t\t\tSELECT\n\t\t\tid::LONG, canary::STRING, flow_published_at::TIMESTAMP, flow_document::STRING\n\t\t\tFROM json.`/Volumes/main/some-schema/flow_staging/flow_temp_tables/`\n\t\t)\n\t) AS r\n\tON l.id = r.id\n\tWHEN MATCHED AND r.flow_document='\"delete\"' THEN\n\t\tDELETE\n\tWHEN MATCHED THEN\n\t\tUPDATE SET l.canary = r.canary, l.flow_published_at = r.flow_published_at, l.flow_document = r.flow_document\n\tWHEN NOT MATCHED AND r.flow_document!='\"delete\"' THEN\n\t\tINSERT (id, canary, flow_published_at, flow_document)\n\t\tVALUES (r.id, r.canary, r.flow_published_at, r.flow_document);\n"
+        "\n\tCOPY INTO `some-schema`.simple FROM (\n SELECT\n\t\tid::LONG, canary::STRING, flow_published_at::TIMESTAMP, flow_document::STRING\n FROM '/Volumes/main/some-schema/flow_staging/flow_temp_tables'\n\t)\n FILEFORMAT = JSON\n FILES = ('')\n FORMAT_OPTIONS ( 'mode' = 'FAILFAST', 'ignoreMissingFiles' = 'false' )\n\tCOPY_OPTIONS ( 'mergeSchema' = 'true' )\n ;\n"
       ],
       "ToDelete": [
         "/Volumes/main/some-schema/flow_staging/flow_temp_tables/"
@@ -180,7 +180,7 @@
     },
     "some-schema%2Fduplicate_keys_standard": {
       "Queries": [
-        "\n\tMERGE INTO `some-schema`.duplicate_keys_standard AS l\n\tUSING (\n\t\t(\n\t\t\tSELECT\n\t\t\tid::LONG, flow_published_at::TIMESTAMP, int::LONG, str::STRING, flow_document::STRING\n\t\t\tFROM json.`/Volumes/main/some-schema/flow_staging/flow_temp_tables/`\n\t\t)\n\t) AS r\n\tON l.id = r.id\n\tWHEN MATCHED AND r.flow_document='\"delete\"' THEN\n\t\tDELETE\n\tWHEN MATCHED THEN\n\t\tUPDATE SET l.flow_published_at = r.flow_published_at, l.int = r.int, l.str = r.str, l.flow_document = r.flow_document\n\tWHEN NOT MATCHED AND r.flow_document!='\"delete\"' THEN\n\t\tINSERT (id, flow_published_at, int, str, flow_document)\n\t\tVALUES (r.id, r.flow_published_at, r.int, r.str, r.flow_document);\n"
+        "\n\tCOPY INTO `some-schema`.duplicate_keys_standard FROM (\n SELECT\n\t\tid::LONG, flow_published_at::TIMESTAMP, int::LONG, str::STRING, flow_document::STRING\n FROM '/Volumes/main/some-schema/flow_staging/flow_temp_tables'\n\t)\n FILEFORMAT = JSON\n FILES = ('')\n FORMAT_OPTIONS ( 'mode' = 'FAILFAST', 'ignoreMissingFiles' = 'false' )\n\tCOPY_OPTIONS ( 'mergeSchema' = 'true' )\n ;\n"
       ],
       "ToDelete": [
         "/Volumes/main/some-schema/flow_staging/flow_temp_tables/"
@@ -211,7 +211,7 @@
     },
     "some-schema%2Fduplicate_keys_standard": {
       "Queries": [
-        "\n\tMERGE INTO `some-schema`.duplicate_keys_standard AS l\n\tUSING (\n\t\t(\n\t\t\tSELECT\n\t\t\tid::LONG, flow_published_at::TIMESTAMP, int::LONG, str::STRING, flow_document::STRING\n\t\t\tFROM json.`/Volumes/main/some-schema/flow_staging/flow_temp_tables/`\n\t\t)\n\t) AS r\n\tON l.id = r.id\n\tWHEN MATCHED AND r.flow_document='\"delete\"' THEN\n\t\tDELETE\n\tWHEN MATCHED THEN\n\t\tUPDATE SET l.flow_published_at = r.flow_published_at, l.int = r.int, l.str = r.str, l.flow_document = r.flow_document\n\tWHEN NOT MATCHED AND r.flow_document!='\"delete\"' THEN\n\t\tINSERT (id, flow_published_at, int, str, flow_document)\n\t\tVALUES (r.id, r.flow_published_at, r.int, r.str, r.flow_document);\n"
+        "\n\tCOPY INTO `some-schema`.duplicate_keys_standard FROM (\n SELECT\n\t\tid::LONG, flow_published_at::TIMESTAMP, int::LONG, str::STRING, flow_document::STRING\n FROM '/Volumes/main/some-schema/flow_staging/flow_temp_tables'\n\t)\n FILEFORMAT = JSON\n FILES = ('')\n FORMAT_OPTIONS ( 'mode' = 'FAILFAST', 'ignoreMissingFiles' = 'false' )\n\tCOPY_OPTIONS ( 'mergeSchema' = 'true' )\n ;\n"
       ],
       "ToDelete": [
         "/Volumes/main/some-schema/flow_staging/flow_temp_tables/"