From 97ba3304cfbbd17374109fc5f29bb8ac91fd049d Mon Sep 17 00:00:00 2001 From: Will Baker Date: Sun, 16 Mar 2025 15:54:12 -0400 Subject: [PATCH 1/4] materializer: support for column migrations Mainly this adds support for column migrations as a concept in the materializer `RunApply` function. The set of fields to be migrated is computed and included in the struct of common updates to be applied for a binding. A few other changes in this commit that are related to some extent, and/or opportunistic changes I wanted to make and couldn't easily split into separate commits: * For namespace creation, often there is a "default" namespace that is set in the endpoint configuration. Although this is usually used by one of the bindings and will be created if it doesn't exist because of that, sometimes no bindings will use it if they all have an alternative namespace set. It can be important for this namespace to exist if, for example, metadata tables must be created in it, so it needs some representation outside the InfoSchema and resource configurations. This changes the endpoint config itself to an interface that must be able to say what the default namespace is, if there is one. * The prior handling for connectors that want to do some extra stuff during `Apply` was a little ugly, and also insufficient for cases where they need to do the extra stuff _before_ the typical apply actions are run. I think it's common enough for these extra actions to be needed (creating a checkpoints table for some SQL materializations, or uploading PySpark scripts for Iceberg) that a specific method for doing them is warranted. So here it is, and it's called `Setup`. * Field compatibility & descriptions are moved to a `MappedTyper` interface to separate concerns a little more. * Fixed the generated "comment" for a field that is a projection of the root document since it looked weird having the pointer be empty. --- materialize-boilerplate/materializer.go | 188 ++++++++++++++++-------- materialize-boilerplate/type_mapping.go | 32 ++++ 2 files changed, 155 insertions(+), 65 deletions(-) diff --git a/materialize-boilerplate/materializer.go b/materialize-boilerplate/materializer.go index 93f68fdbf0..9e32578be8 100644 --- a/materialize-boilerplate/materializer.go +++ b/materialize-boilerplate/materializer.go @@ -78,8 +78,8 @@ type ElementConverter func(tuple.TupleElement) (any, error) // MappedProjection adds materialization-specific type mapping information to a // basic Flow projection, as well as other useful metadata. -type MappedProjection[MT any] struct { - pf.Projection +type MappedProjection[MT MappedTyper] struct { + Projection Comment string Mapped MT } @@ -87,7 +87,7 @@ type MappedProjection[MT any] struct { // MappedBinding is all of the projections for the selected fields of the // materialization, with materialization-specific type mapping and parsed // resource configuration. -type MappedBinding[EC pb.Validator, RC Resourcer[RC, EC], MT any] struct { +type MappedBinding[EC EndpointConfiger, RC Resourcer[RC, EC], MT MappedTyper] struct { pf.MaterializationSpec_Binding Config RC Keys, Values []MappedProjection[MT] @@ -147,12 +147,20 @@ func (mb *MappedBinding[EC, RC, MT]) convertTuple(in tuple.Tuple, offset int, ou // MaterializerBindingUpdate is a distilled representation of the typical kinds // of changes a destination system will care about in response to a new binding // or change to an existing binding. 
-type MaterializerBindingUpdate[MT any] struct { +type MaterializerBindingUpdate[MT MappedTyper] struct { NewProjections []MappedProjection[MT] NewlyNullableFields []ExistingField + FieldsToMigrate []MigrateField[MT] NewlyDeltaUpdates bool } +// MigrateField is an existing field that must be migrated to be compatible with +// an updated projection. +type MigrateField[MT MappedTyper] struct { + From ExistingField + To MappedProjection[MT] +} + // RuntimeCheckpoint is the raw bytes of a persisted Flow checkpoint. In the // `Opened` response, it will be marshalled into a protocol.Checkpoint. type RuntimeCheckpoint []byte @@ -169,8 +177,20 @@ type MaterializerTransactor interface { m.Transactor } +// EndpointConfiger represents a parsed endpoint config. +type EndpointConfiger interface { + pb.Validator + + // DefaultNamespace is the namespace used for bindings if no explicit + // namespace is configured for the binding. It may also contain metadata + // tables, and needs to exist even if no binding is actually created in it. + // This can return an empty string if namespaces do not apply to the + // materialization. + DefaultNamespace() string +} + // Resourcer represents a parsed resource config. -type Resourcer[T any, EC pb.Validator] interface { +type Resourcer[T any, EC EndpointConfiger] interface { pb.Validator // WithDefaults provides the parsed endpoint config so that any defaults @@ -186,6 +206,7 @@ type Resourcer[T any, EC pb.Validator] interface { // FieldConfiger represents a parsed field config. type FieldConfiger interface { pb.Validator + // CastToString is a common field configuration option that will cause a // field to be converted to a string when it is materialized. CastToString() bool @@ -197,10 +218,10 @@ type FieldConfiger interface { // existing resources to resource specs, and managing the transactions // lifecycle. type Materializer[ - EC pb.Validator, // endpoint config - FC FieldConfiger, // field config - RC Resourcer[RC, EC], // resource config - MT any, // mapped type + EC EndpointConfiger, + FC FieldConfiger, + RC Resourcer[RC, EC], + MT MappedTyper, ] interface { // Config should return a MaterializeCfg with non-defaults populated as // needed. @@ -224,12 +245,15 @@ type Materializer[ // no conversion is needed. MapType(p Projection, fieldCfg FC) (MT, ElementConverter) - // Compatible determines if an existing materialized field as reported in - // the InfoSchema is compatible with a proposed mapped projection. - Compatible(ExistingField, MT) bool - - // DescriptionForType produces a description for a mapped projection. - DescriptionForType(MT) string + // Setup performs extra materialization-specific actions when handling an + // Apply RPC prior to any of the standard actions for resource creation or + // alteration. For example, creating metadata tables that are not + // represented by a binding. It may return a string to describe the actions + // that were taken. + // + // Since Apply is always run before Open, this effectively runs the Setup + // actions before Open as well. + Setup(context.Context, *InfoSchema) (string, error) // CreateNamespace creates a namespace in the destination system, for // example a "schema" in a SQL database.
@@ -260,7 +284,7 @@ type Materializer[ NewMaterializerTransactor(context.Context, pm.Request_Open, InfoSchema, []MappedBinding[EC, RC, MT], *BindingEvents) (MaterializerTransactor, error) } -type NewMaterializerFn[EC pb.Validator, FC FieldConfiger, RC Resourcer[RC, EC], MT any] func(context.Context, EC) (Materializer[EC, FC, RC, MT], error) +type NewMaterializerFn[EC EndpointConfiger, FC FieldConfiger, RC Resourcer[RC, EC], MT MappedTyper] func(context.Context, string, EC) (Materializer[EC, FC, RC, MT], error) // RunSpec produces a spec response from the typical inputs of documentation // url, endpoint config schema, and resource config schema. @@ -277,7 +301,7 @@ func RunSpec(ctx context.Context, req *pm.Request_Spec, docUrl string, endpointS } // RunValidate produces a Validated response for a Validate request. -func RunValidate[EC pb.Validator, FC FieldConfiger, RC Resourcer[RC, EC], MT any]( +func RunValidate[EC EndpointConfiger, FC FieldConfiger, RC Resourcer[RC, EC], MT MappedTyper]( ctx context.Context, req *pm.Request_Validate, newMaterializer NewMaterializerFn[EC, FC, RC, MT], @@ -291,7 +315,7 @@ func RunValidate[EC pb.Validator, FC FieldConfiger, RC Resourcer[RC, EC], MT any return nil, err } - materializer, err := newMaterializer(ctx, cfg) + materializer, err := newMaterializer(ctx, req.Name.String(), cfg) if err != nil { return nil, err } @@ -344,29 +368,24 @@ func RunValidate[EC pb.Validator, FC FieldConfiger, RC Resourcer[RC, EC], MT any return &pm.Response_Validated{Bindings: out}, nil } -// RunApply produces an Applied response for an Apply request. In addition, the -// initialized Materializer and populated InfoSchema is returned so that -// materializations can do other things after the standard apply actions are -// completed, such as creating additional metadata tables for checkpoints. -func RunApply[T Materializer[EC, FC, RC, MT], EC pb.Validator, FC FieldConfiger, RC Resourcer[RC, EC], MT any]( +// RunApply produces an Applied response for an Apply request. +func RunApply[EC EndpointConfiger, FC FieldConfiger, RC Resourcer[RC, EC], MT MappedTyper]( ctx context.Context, req *pm.Request_Apply, newMaterializer NewMaterializerFn[EC, FC, RC, MT], -) (T, *InfoSchema, *pm.Response_Applied, error) { - var materializer T +) (*pm.Response_Applied, error) { if err := req.Validate(); err != nil { - return materializer, nil, nil, fmt.Errorf("validating request: %w", err) + return nil, fmt.Errorf("validating request: %w", err) } var endpointCfg EC if err := unmarshalStrict(req.Materialization.ConfigJson, &endpointCfg); err != nil { - return materializer, nil, nil, err + return nil, err } - var err error - nm, err := newMaterializer(ctx, endpointCfg) + materializer, err := newMaterializer(ctx, req.Materialization.Name.String(), endpointCfg) if err != nil { - return materializer, nil, nil, err + return nil, err } // TODO(whb): Some point soon we will have the last committed checkpoint @@ -374,9 +393,6 @@ func RunApply[T Materializer[EC, FC, RC, MT], EC pb.Validator, FC FieldConfiger, // State + Acknowledge somewhere around here to commit any previously staged // transaction before applying the next spec's updates. - // TODO(whb): This type assertion is not very satisfying but I haven't - // figured out a way to do it more cleanly. 
- materializer = nm.(T) mCfg := materializer.Config() paths := make([][]string, 0, len(req.Materialization.Bindings)) @@ -386,22 +402,32 @@ func RunApply[T Materializer[EC, FC, RC, MT], EC pb.Validator, FC FieldConfiger, is := initInfoSchema(mCfg) if err := materializer.PopulateInfoSchema(ctx, paths, is); err != nil { - return materializer, nil, nil, err - } - - computed, err := computeCommonUpdates(req.LastMaterialization, req.Materialization, is) - if err != nil { - return materializer, nil, nil, err + return nil, err } actionDescriptions := []string{} actions := []ActionApplyFn{} + if desc, err := materializer.Setup(ctx, is); err != nil { + return nil, fmt.Errorf("running Setup: %w", err) + } else if desc != "" { + actionDescriptions = append(actionDescriptions, desc) + } + + common, err := computeCommonUpdates(req.LastMaterialization, req.Materialization, is) + if err != nil { + return nil, err + } + if !mCfg.NoCreateNamespaces { // Create any required namespaces before other actions, which may // include resource creation. Otherwise resources creation may fail due // to namespaces not yet existing. requiredNamespaces := make(map[string]struct{}) + if ns := endpointCfg.DefaultNamespace(); ns != "" { + requiredNamespaces[ns] = struct{}{} + } + for _, b := range req.Materialization.Bindings { path := is.locatePath(b.ResourcePath) if len(path) < 2 { @@ -414,7 +440,7 @@ func RunApply[T Materializer[EC, FC, RC, MT], EC pb.Validator, FC FieldConfiger, if slices.Contains(is.namespaces, ns) { continue } else if desc, err := materializer.CreateNamespace(ctx, ns); err != nil { - return materializer, nil, nil, err + return nil, err } else { actionDescriptions = append(actionDescriptions, desc) } @@ -428,23 +454,23 @@ func RunApply[T Materializer[EC, FC, RC, MT], EC pb.Validator, FC FieldConfiger, } } - for _, bindingIdx := range computed.newBindings { + for _, bindingIdx := range common.newBindings { if mapped, err := buildMappedBinding(endpointCfg, materializer, *req.Materialization, bindingIdx); err != nil { - return materializer, nil, nil, err + return nil, err } else if desc, action, err := materializer.CreateResource(ctx, *mapped); err != nil { - return materializer, nil, nil, fmt.Errorf("getting CreateResource action: %w", err) + return nil, fmt.Errorf("getting CreateResource action: %w", err) } else { addAction(desc, action) } } - for _, bindingIdx := range computed.backfillBindings { + for _, bindingIdx := range common.backfillBindings { if deleteDesc, deleteAction, err := materializer.DeleteResource(ctx, req.Materialization.Bindings[bindingIdx].ResourcePath); err != nil { - return materializer, nil, nil, fmt.Errorf("getting DeleteResource action to replace resource: %w", err) + return nil, fmt.Errorf("getting DeleteResource action to replace resource: %w", err) } else if mapped, err := buildMappedBinding(endpointCfg, materializer, *req.Materialization, bindingIdx); err != nil { - return materializer, nil, nil, err + return nil, err } else if createDesc, createAction, err := materializer.CreateResource(ctx, *mapped); err != nil { - return materializer, nil, nil, fmt.Errorf("getting CreateResource action to replace resource: %w", err) + return nil, fmt.Errorf("getting CreateResource action to replace resource: %w", err) } else { addAction(deleteDesc+"\n"+createDesc, func(ctx context.Context) error { if err := deleteAction(ctx); err != nil { @@ -457,18 +483,18 @@ func RunApply[T Materializer[EC, FC, RC, MT], EC pb.Validator, FC FieldConfiger, } } - for bindingIdx, commonUpdates := range
computed.updatedBindings { + for bindingIdx, commonUpdates := range common.updatedBindings { update := MaterializerBindingUpdate[MT]{ NewlyNullableFields: commonUpdates.NewlyNullableFields, NewlyDeltaUpdates: commonUpdates.NewlyDeltaUpdates, } - mapped, err := buildMappedBinding(endpointCfg, materializer, *req.Materialization, bindingIdx) + mb, err := buildMappedBinding(endpointCfg, materializer, *req.Materialization, bindingIdx) if err != nil { - return materializer, nil, nil, err + return nil, err } - ps := mapped.SelectedProjections() + ps := mb.SelectedProjections() for _, p := range commonUpdates.NewProjections { i := slices.IndexFunc(ps, func(pp MappedProjection[MT]) bool { return pp.Field == p.Field @@ -476,23 +502,48 @@ func RunApply[T Materializer[EC, FC, RC, MT], EC pb.Validator, FC FieldConfiger, update.NewProjections = append(update.NewProjections, ps[i]) } - if desc, action, err := materializer.UpdateResource(ctx, mapped.ResourcePath, *is.GetResource(mapped.ResourcePath), update); err != nil { - return materializer, nil, nil, err + existingResource := is.GetResource(mb.ResourcePath) + for _, p := range mb.Values { + if existingField := existingResource.GetField(p.Field); existingField == nil { + continue + } else if !p.Mapped.Compatible(*existingField) && p.Mapped.CanMigrate(*existingField) { + update.FieldsToMigrate = append(update.FieldsToMigrate, MigrateField[MT]{ + From: *existingField, + To: p, + }) + } else if !p.Mapped.Compatible(*existingField) { + // This is mostly a sanity check that some other process (user + // modifications, perhaps) didn't change the type of a column in + // a way that we can't deal with. There is also a scarce chance + // that a specific sequence of events could occur during + // existing field migrations if they use column renaming, where + // the "old" column is dropped and before the "new" column can + // be renamed (if these operations are not atomic) the source + // field type is changed again in some way and a new column is + // created with that source field type. This scenario would + // involve one invoked Apply RPC racing with another, which is + // not likely, but perhaps not totally impossible. + return nil, fmt.Errorf("existing field %q has incompatible type %q that cannot be migrated", p.Field, p.Mapped.String()) + } + } + + if desc, action, err := materializer.UpdateResource(ctx, mb.ResourcePath, *is.GetResource(mb.ResourcePath), update); err != nil { + return nil, err } else { addAction(desc, action) } } if err := runActions(ctx, actions, actionDescriptions, mCfg.ConcurrentApply); err != nil { - return materializer, nil, nil, err + return nil, err } - return materializer, is, &pm.Response_Applied{ActionDescription: strings.Join(actionDescriptions, "\n")}, nil + return &pm.Response_Applied{ActionDescription: strings.Join(actionDescriptions, "\n")}, nil } // RunNewTransactor builds a transactor and Opened response from an Open // request. 
-func RunNewTransactor[EC pb.Validator, FC FieldConfiger, RC Resourcer[RC, EC], MT any]( +func RunNewTransactor[EC EndpointConfiger, FC FieldConfiger, RC Resourcer[RC, EC], MT MappedTyper]( ctx context.Context, req pm.Request_Open, be *BindingEvents, @@ -507,7 +558,7 @@ func RunNewTransactor[EC pb.Validator, FC FieldConfiger, RC Resourcer[RC, EC], M return nil, nil, nil, err } - materializer, err := newMaterializer(ctx, epCfg) + materializer, err := newMaterializer(ctx, req.Materialization.Name.String(), epCfg) if err != nil { return nil, nil, nil, err } @@ -566,7 +617,7 @@ func initInfoSchema(cfg MaterializeCfg) *InfoSchema { return NewInfoSchema(locatePath, translateField) } -func buildMappedBinding[EC pb.Validator, FC FieldConfiger, RC Resourcer[RC, EC], MT any]( +func buildMappedBinding[EC EndpointConfiger, FC FieldConfiger, RC Resourcer[RC, EC], MT MappedTyper]( endpointCfg EC, materializer Materializer[EC, FC, RC, MT], spec pf.MaterializationSpec, @@ -595,9 +646,10 @@ func buildMappedBinding[EC pb.Validator, FC FieldConfiger, RC Resourcer[RC, EC], } } - mt, converter := materializer.MapType(mapProjection(*p, fieldCfg), fieldCfg) + mp := mapProjection(*p, fieldCfg) + mt, converter := materializer.MapType(mp, fieldCfg) *dst = append(*dst, MappedProjection[MT]{ - Projection: *p, + Projection: mp, Comment: commentForProjection(*p), Mapped: mt, }) @@ -630,8 +682,14 @@ func commentForProjection(p pf.Projection) string { if p.Explicit { source = "user-provided" } - out = fmt.Sprintf("%s projection of JSON at: %s with inferred types: %s", - source, p.Ptr, p.Inference.Types) + + if p.Ptr == "" { + out = fmt.Sprintf("%s projection of the root document with inferred types: %s", + source, p.Inference.Types) + } else { + out = fmt.Sprintf("%s projection of JSON at: %s with inferred types: %s", + source, p.Ptr, p.Inference.Types) + } if p.Inference.Description != "" { out = p.Inference.Description + "\n" + out @@ -645,7 +703,7 @@ func commentForProjection(p pf.Projection) string { // constrainterAdapter is purely a shim between the new Materializer interface // and the old Constrainter interface. Eventually these should be consolidated. 
-type constrainterAdapter[EC pb.Validator, FC FieldConfiger, RC Resourcer[RC, EC], MT any] struct { +type constrainterAdapter[EC EndpointConfiger, FC FieldConfiger, RC Resourcer[RC, EC], MT MappedTyper] struct { m Materializer[EC, FC, RC, MT] } @@ -670,7 +728,7 @@ func (c *constrainterAdapter[EC, FC, RC, MT]) Compatible(existing ExistingField, } mt, _ := c.m.MapType(mapProjection(*p, fieldCfg), fieldCfg) - return c.m.Compatible(existing, mt), nil + return mt.Compatible(existing) || mt.CanMigrate(existing), nil } func (c *constrainterAdapter[EC, FC, RC, MT]) DescriptionForType(p *pf.Projection, rawFieldConfig json.RawMessage) (string, error) { @@ -682,7 +740,7 @@ func (c *constrainterAdapter[EC, FC, RC, MT]) DescriptionForType(p *pf.Projectio } mt, _ := c.m.MapType(mapProjection(*p, fieldCfg), fieldCfg) - return c.m.DescriptionForType(mt), nil + return mt.String(), nil } func unmarshalStrict[T pb.Validator](raw []byte, into *T) error { diff --git a/materialize-boilerplate/type_mapping.go b/materialize-boilerplate/type_mapping.go index 88f07982c9..a435dcfa2e 100644 --- a/materialize-boilerplate/type_mapping.go +++ b/materialize-boilerplate/type_mapping.go @@ -70,6 +70,20 @@ func (FlatTypeString) isFlatType() {} func (FlatTypeStringFormatInteger) isFlatType() {} func (FlatTypeStringFormatNumber) isFlatType() {} +type MappedTyper interface { + // String produces a human readable description of the mapped type, which + // will be included in validation constraint descriptions. + String() string + + // Compatible determines if an existing materialized field as reported in + // the InfoSchema is compatible with a proposed mapped type. + Compatible(ExistingField) bool + + // CanMigrate determines if an existing materialized field can be migrated + // to be compatible with the updated mapped type. + CanMigrate(ExistingField) bool +} + // Projection lifts a pf.Projection into a form that's more easily worked with // for materialization-specific type mapping. type Projection struct { @@ -78,6 +92,24 @@ type Projection struct { MustExist bool } +const AnyExistingType string = "*" + +// TypeMigrations is a utility type for defining which existing fields can be +// migrated to be compatible with updated projections. +type TypeMigrations[T comparable] map[string][]T + +func (m TypeMigrations[T]) CanMigrate(from string, to T) bool { + for f, ts := range m { + if f == from || f == AnyExistingType { + if slices.Contains(ts, to) { + return true + } + } + } + + return false +} + func mapProjection(p pf.Projection, fc FieldConfiger) Projection { mustExist := p.Inference.Exists == pf.Inference_MUST && !slices.Contains(p.Inference.Types, "null") typesWithoutNull := getTypesWithoutNull(p.Inference.Types) From 55f914423455b40aa277a299c6a7261a0d21d981 Mon Sep 17 00:00:00 2001 From: Will Baker Date: Sun, 16 Mar 2025 16:18:26 -0400 Subject: [PATCH 2/4] materialize-iceberg: make integration tests work with concurrent CI runs Adds a random suffix to integration test table names so that more than one CI job can run them at a time. Otherwise concurrent job runs will delete the testing tables out from under each other and cause the tests to fail. This happens for many of our materialization integration tests, but it's particularly bad for these Iceberg tests since they take so long to run. Sometimes it can be legitimately difficult to get a build to pass CI if many different branches are trying to run the tests.
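For reference, the suffix is generated once in setup.sh and appended to each table name in the resource configs, roughly like this (excerpted from the diff below):

    export TABLE_SUFFIX=$(head -c 12 /dev/urandom | base64 | tr -dc 'A-Za-z0-9' | head -c 8 | tr '[:upper:]' '[:lower:]')
    "table": "simple_${TABLE_SUFFIX}"

The cleanup and fetch scripts reference the same TABLE_SUFFIX, and checks.sh strips `_${TABLE_SUFFIX}` from the captured output so the snapshot stays stable across runs.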
--- tests/materialize/materialize-iceberg/checks.sh | 1 + tests/materialize/materialize-iceberg/cleanup.sh | 2 +- tests/materialize/materialize-iceberg/fetch.sh | 2 +- tests/materialize/materialize-iceberg/setup.sh | 16 +++++++++------- 4 files changed, 12 insertions(+), 9 deletions(-) diff --git a/tests/materialize/materialize-iceberg/checks.sh b/tests/materialize/materialize-iceberg/checks.sh index fcb0a9f630..064f5eaa0a 100755 --- a/tests/materialize/materialize-iceberg/checks.sh +++ b/tests/materialize/materialize-iceberg/checks.sh @@ -5,3 +5,4 @@ if [ "$(uname -s)" = "Darwin" ]; then SED_CMD="gsed" fi $SED_CMD -i'' -E 's|(/)[0-9a-fA-F-]{36}/[0-9a-fA-F-]{36}(\.)|\1/\2|g' ${SNAPSHOT} +$SED_CMD -i'' "s|_${TABLE_SUFFIX}||g" ${SNAPSHOT} diff --git a/tests/materialize/materialize-iceberg/cleanup.sh b/tests/materialize/materialize-iceberg/cleanup.sh index 6ab40d6ebf..1a60b8e245 100644 --- a/tests/materialize/materialize-iceberg/cleanup.sh +++ b/tests/materialize/materialize-iceberg/cleanup.sh @@ -1,7 +1,7 @@ #!/bin/bash function purge() { - ${ICEBERG_HELPER_CMD} --force purge ${NAMESPACE}."$1" + ${ICEBERG_HELPER_CMD} --force purge ${NAMESPACE}."$1"_${TABLE_SUFFIX} } purge "simple" diff --git a/tests/materialize/materialize-iceberg/fetch.sh b/tests/materialize/materialize-iceberg/fetch.sh index 31357a3750..bf2144aed8 100755 --- a/tests/materialize/materialize-iceberg/fetch.sh +++ b/tests/materialize/materialize-iceberg/fetch.sh @@ -5,7 +5,7 @@ set -o pipefail set -o nounset function exportToJsonl() { - ${ICEBERG_HELPER_CMD} read ${NAMESPACE}."$1" | jq -s "sort_by([.id, .flow_published_at]) | { _table: \"$1\", rows: . }" + ${ICEBERG_HELPER_CMD} read ${NAMESPACE}."$1"_${TABLE_SUFFIX} | jq -s "sort_by([.id, .flow_published_at]) | { _table: \"$1\", rows: . 
}" } exportToJsonl "simple" diff --git a/tests/materialize/materialize-iceberg/setup.sh b/tests/materialize/materialize-iceberg/setup.sh index 158ad9585f..3e4b353a65 100755 --- a/tests/materialize/materialize-iceberg/setup.sh +++ b/tests/materialize/materialize-iceberg/setup.sh @@ -4,22 +4,24 @@ set -o errexit set -o pipefail set -o nounset +export TABLE_SUFFIX=$(head -c 12 /dev/urandom | base64 | tr -dc 'A-Za-z0-9' | head -c 8 | tr '[:upper:]' '[:lower:]') + resources_json_template='[ { "resource": { - "table": "simple" + "table": "simple_${TABLE_SUFFIX}" }, "source": "${TEST_COLLECTION_SIMPLE}" }, { "resource": { - "table": "duplicate_keys_standard" + "table": "duplicate_keys_standard_${TABLE_SUFFIX}" }, "source": "${TEST_COLLECTION_DUPLICATED_KEYS}" }, { "resource": { - "table": "multiple_types" + "table": "multiple_types_${TABLE_SUFFIX}" }, "source": "${TEST_COLLECTION_MULTIPLE_DATATYPES}", "fields": { @@ -34,7 +36,7 @@ resources_json_template='[ }, { "resource": { - "table": "formatted_strings" + "table": "formatted_strings_${TABLE_SUFFIX}" }, "source": "${TEST_COLLECTION_FORMATTED_STRINGS}", "fields": { @@ -43,19 +45,19 @@ resources_json_template='[ }, { "resource": { - "table": "deletions" + "table": "deletions_${TABLE_SUFFIX}" }, "source": "${TEST_COLLECTION_DELETIONS}" }, { "resource": { - "table": "binary_key" + "table": "binary_key_${TABLE_SUFFIX}" }, "source": "${TEST_COLLECTION_BINARY_KEY}" }, { "resource": { - "table": "string_escaped_key" + "table": "string_escaped_key_${TABLE_SUFFIX}" }, "source": "${TEST_COLLECTION_STRING_ESCAPED_KEY}" } From c1ea82c2ad3209144c1499a639e777e0f4384da0 Mon Sep 17 00:00:00 2001 From: Will Baker Date: Sun, 16 Mar 2025 16:30:23 -0400 Subject: [PATCH 3/4] materialize-iceberg: get rid of `getHighestFieldID` I recently added this `getHighestFieldID` function since the `(*Schema).HighestFieldID` did not work with schemas containing logical types like `decimal`. It is true that the custom version works where the other one doesn't, but neither one is the correct approach for figuring out which field ID to start with when adding a field to an Iceberg table schema. Since there could have been fields that previously existed but were since deleted in the table, it's not strictly correct to assume that the next highest field ID that has never been used before can be determined from the current schema by just looking at the numbers. There's an Iceberg table property that tracks the highest field ID on the table itself though, and using that should always work. This commit removes the `getHighestFieldID` function - the next commit will update the related schema calculation functions, among other things, as part of supporting column migrations. --- materialize-iceberg/type_mapping.go | 22 ---------------------- 1 file changed, 22 deletions(-) diff --git a/materialize-iceberg/type_mapping.go b/materialize-iceberg/type_mapping.go index c753ff7668..83079eb9a4 100644 --- a/materialize-iceberg/type_mapping.go +++ b/materialize-iceberg/type_mapping.go @@ -164,25 +164,3 @@ func appendProjectionsAsFields(dst *[]iceberg.NestedField, ps []boilerplate.Mapp return ids, id } - -// TODO(whb): The version of go-iceberg we are using has a broken -// (*Schema).HighestFieldID method which does not work with most logical types. -// It looks like that is probably fixed in a more recent version, but we need to -// be on Go 1.23 to use that. 
-func getHighestFieldID(sch *iceberg.Schema) int { - var out int - - for _, f := range sch.Fields() { - out = max(out, f.ID) - if l, ok := f.Type.(*iceberg.ListType); ok { - out = max(out, l.ElementID) - } - // Ignoring the possibility of Map and Struct types, which also have - // elements with their own field IDs since we don't create columns with - // those types. We _also_ don't create columns with list types, but it - // is more conceivable that we could add that in the short term. This - // should all be removed when we upgrade Go & iceberg-go anyway. - } - - return out -} From ded4a690529e6ba0b15011ef6f0f13a99d0e948c Mon Sep 17 00:00:00 2001 From: Will Baker Date: Sun, 16 Mar 2025 16:36:34 -0400 Subject: [PATCH 4/4] materialize-iceberg: column type migrations Adds support for materialize-iceberg to do column type migrations for all known scenarios of schema "widening" without requiring a backfill to re-build the table. The Spark SQL queries for doing this are straightforward. A new PySpark script is added called `exec.py` that just runs a single query and that is used for running a migration query. This kind of script might be useful for other things in the future as well, so it's not called `migrate.py` or something like that. Most of the work for adding this capability is from needing to initialize all the EMR job running apparatus in `Apply` now to run migration Spark jobs, in addition to in `Open` for running transactions. So there's a lot of code that got moved around to support that, as well as the core changes for supporting column migrations. --- .../.snapshots/TestComputeSchemas | 42 +++ materialize-iceberg/.snapshots/TestTemplates | 16 +- .../.snapshots/TestValidateAndApply | 30 +- materialize-iceberg/config.go | 4 + materialize-iceberg/driver.go | 280 ++++++++++++------ materialize-iceberg/emr.go | 84 +++--- materialize-iceberg/python/exec.py | 22 ++ materialize-iceberg/python/python.go | 4 + materialize-iceberg/sqlgen.go | 72 ++++- materialize-iceberg/sqlgen_test.go | 49 ++- materialize-iceberg/transactor.go | 95 +++--- materialize-iceberg/type_mapping.go | 94 +++++- materialize-iceberg/type_mapping_test.go | 100 ++++++- .../materialize-iceberg/snapshot.json | 10 +- 14 files changed, 652 insertions(+), 250 deletions(-) create mode 100644 materialize-iceberg/.snapshots/TestComputeSchemas create mode 100644 materialize-iceberg/python/exec.py diff --git a/materialize-iceberg/.snapshots/TestComputeSchemas b/materialize-iceberg/.snapshots/TestComputeSchemas new file mode 100644 index 0000000000..c402ce8670 --- /dev/null +++ b/materialize-iceberg/.snapshots/TestComputeSchemas @@ -0,0 +1,42 @@ +--- Original Schema --- +table { + 1: firstKey: required long + 2: secondKey: required string + 3: val1: required string + 4: val2: optional boolean + 5: dateToStr: required date + 6: intToDecimal: optional long + 7: decimalToFloat: required decimal(38, 0) + 8: timestampToStr: required timestamptz + 10: dateToStr_flow_tmp: required double +} + +--- Next Schema --- +table { + 1: firstKey: required long + 2: secondKey: required string + 3: val1: required string + 4: val2: optional boolean + 5: dateToStr: optional date + 6: intToDecimal: optional long + 7: decimalToFloat: required decimal(38, 0) + 8: timestampToStr: required timestamptz + 13: new: optional string + 14: dateToStr_flow_tmp: optional string + 15: intToDecimal_flow_tmp: optional decimal(38, 0) + 16: decimalToFloat_flow_tmp: required double + 17: timestampToStr_flow_tmp: optional string +} + +--- After Migrate Schema --- +table { 
+ 1: firstKey: required long + 2: secondKey: required string + 3: val1: required string + 4: val2: optional boolean + 13: new: optional string + 14: dateToStr: optional string + 15: intToDecimal: optional decimal(38, 0) + 16: decimalToFloat: required double + 17: timestampToStr: optional string +} diff --git a/materialize-iceberg/.snapshots/TestTemplates b/materialize-iceberg/.snapshots/TestTemplates index 5b7bf59608..66e57b1548 100644 --- a/materialize-iceberg/.snapshots/TestTemplates +++ b/materialize-iceberg/.snapshots/TestTemplates @@ -3,7 +3,7 @@ SELECT 0, l.`flow_document` FROM `foo`.`bar` AS l JOIN load_view_0 AS r ON l.`first-key` = r.`first-key` AND l.`first-key` >= 1 AND l.`first-key` <= 10 - AND l.`second-key` = unbase64(r.`second-key`) + AND l.`second-key` = UNBASE64(r.`second-key`) AND l.`third-key` = r.`third-key` AND l.`third-key` >= 'aaaSomeString' AND l.`third-key` <= 'zzzSomeString' --- End load query --- @@ -12,9 +12,17 @@ MERGE INTO `foo`.`bar` AS l USING merge_view_0 AS r ON l.`first-key` = r.`first-key` AND l.`first-key` >= 1 AND l.`first-key` <= 10 - AND l.`second-key` = unbase64(r.`second-key`) + AND l.`second-key` = UNBASE64(r.`second-key`) AND l.`third-key` = r.`third-key` AND l.`third-key` >= 'aaaSomeString' AND l.`third-key` <= 'zzzSomeString' WHEN MATCHED AND r.`flow_document` = '"delete"' THEN DELETE -WHEN MATCHED THEN UPDATE SET l.`first-key` = r.`first-key`, l.`second-key` = unbase64(r.`second-key`), l.`third-key` = r.`third-key`, l.`first-val` = r.`first-val`, l.`second-val` = r.`second-val`, l.`third-val` = r.`third-val`, l.`fourth-val` = r.`fourth-val`, l.`flow_document` = r.`flow_document` -WHEN NOT MATCHED AND r.`flow_document` != '"delete"' THEN INSERT (`first-key`, `second-key`, `third-key`, `first-val`, `second-val`, `third-val`, `fourth-val`, `flow_document`) VALUES (r.`first-key`, unbase64(r.`second-key`), r.`third-key`, r.`first-val`, r.`second-val`, r.`third-val`, r.`fourth-val`, r.`flow_document`) +WHEN MATCHED THEN UPDATE SET l.`first-key` = r.`first-key`, l.`second-key` = UNBASE64(r.`second-key`), l.`third-key` = r.`third-key`, l.`first-val` = r.`first-val`, l.`second-val` = r.`second-val`, l.`third-val` = UNBASE64(r.`third-val`), l.`fourth-val` = r.`fourth-val`, l.`flow_document` = r.`flow_document` +WHEN NOT MATCHED AND r.`flow_document` != '"delete"' THEN INSERT (`first-key`, `second-key`, `third-key`, `first-val`, `second-val`, `third-val`, `fourth-val`, `flow_document`) VALUES (r.`first-key`, UNBASE64(r.`second-key`), r.`third-key`, r.`first-val`, r.`second-val`, UNBASE64(r.`third-val`), r.`fourth-val`, r.`flow_document`) --- End merge query --- + +--- Begin migrate query --- +UPDATE `some`.`table` +SET + `long_to_decimal_flow_tmp` = CAST(`long_to_decimal` AS decimal(38, 0)), + `datetime_to_string_flow_tmp` = DATE_FORMAT(`datetime_to_string`, 'yyyy-MM-dd\'T\'HH:mm:ss.SSSSSS\'Z\''), + `binary_to_string_flow_tmp` = BASE64(`binary_to_string`) +--- End migrate query --- diff --git a/materialize-iceberg/.snapshots/TestValidateAndApply b/materialize-iceberg/.snapshots/TestValidateAndApply index b36f5e8a58..e4a310ae30 100644 --- a/materialize-iceberg/.snapshots/TestValidateAndApply +++ b/materialize-iceberg/.snapshots/TestValidateAndApply @@ -82,21 +82,21 @@ Big Schema Changed Types Constraints: {"Field":"boolField","Type":6,"TypeString":"UNSATISFIABLE","Reason":"Field 'boolField' is already being materialized as endpoint type 'BOOLEAN' but endpoint type 'LONG' is required by its schema '{ type: [integer] }'"} 
{"Field":"flow_document","Type":1,"TypeString":"FIELD_REQUIRED","Reason":"This field is the document in the current materialization"} {"Field":"flow_published_at","Type":3,"TypeString":"LOCATION_RECOMMENDED","Reason":"This location is part of the current materialization"} -{"Field":"intField","Type":6,"TypeString":"UNSATISFIABLE","Reason":"Field 'intField' is already being materialized as endpoint type 'LONG' but endpoint type 'STRING' is required by its schema '{ type: [string] }'"} +{"Field":"intField","Type":3,"TypeString":"LOCATION_RECOMMENDED","Reason":"This location is part of the current materialization"} {"Field":"key","Type":1,"TypeString":"FIELD_REQUIRED","Reason":"This field is a key in the current materialization"} {"Field":"multipleField","Type":3,"TypeString":"LOCATION_RECOMMENDED","Reason":"This location is part of the current materialization"} {"Field":"nullField","Type":4,"TypeString":"FIELD_OPTIONAL","Reason":"Object fields may be materialized"} {"Field":"numField","Type":6,"TypeString":"UNSATISFIABLE","Reason":"Field 'numField' is already being materialized as endpoint type 'DOUBLE' but endpoint type 'BOOLEAN' is required by its schema '{ type: [boolean] }'"} {"Field":"objField","Type":3,"TypeString":"LOCATION_RECOMMENDED","Reason":"This location is part of the current materialization"} -{"Field":"stringDateField","Type":6,"TypeString":"UNSATISFIABLE","Reason":"Field 'stringDateField' is already being materialized as endpoint type 'DATE' but endpoint type 'STRING' is required by its schema '{ type: [string] }'"} -{"Field":"stringDateTimeField","Type":6,"TypeString":"UNSATISFIABLE","Reason":"Field 'stringDateTimeField' is already being materialized as endpoint type 'TIMESTAMPTZ' but endpoint type 'STRING' is required by its schema '{ type: [string] }'"} +{"Field":"stringDateField","Type":3,"TypeString":"LOCATION_RECOMMENDED","Reason":"This location is part of the current materialization"} +{"Field":"stringDateTimeField","Type":3,"TypeString":"LOCATION_RECOMMENDED","Reason":"This location is part of the current materialization"} {"Field":"stringDurationField","Type":3,"TypeString":"LOCATION_RECOMMENDED","Reason":"This location is part of the current materialization"} {"Field":"stringEmailField","Type":3,"TypeString":"LOCATION_RECOMMENDED","Reason":"This location is part of the current materialization"} {"Field":"stringField","Type":6,"TypeString":"UNSATISFIABLE","Reason":"Field 'stringField' is already being materialized as endpoint type 'STRING' but endpoint type 'LONG' is required by its schema '{ type: [integer] }'"} {"Field":"stringHostnameField","Type":3,"TypeString":"LOCATION_RECOMMENDED","Reason":"This location is part of the current materialization"} {"Field":"stringIdnEmailField","Type":3,"TypeString":"LOCATION_RECOMMENDED","Reason":"This location is part of the current materialization"} {"Field":"stringIdnHostnameField","Type":3,"TypeString":"LOCATION_RECOMMENDED","Reason":"This location is part of the current materialization"} -{"Field":"stringIntegerField","Type":6,"TypeString":"UNSATISFIABLE","Reason":"Field 'stringIntegerField' is already being materialized as endpoint type 'DECIMAL(38, 0)' but endpoint type 'STRING' is required by its schema '{ type: [string] }'"} +{"Field":"stringIntegerField","Type":3,"TypeString":"LOCATION_RECOMMENDED","Reason":"This location is part of the current materialization"} {"Field":"stringIpv4Field","Type":3,"TypeString":"LOCATION_RECOMMENDED","Reason":"This location is part of the current materialization"} 
{"Field":"stringIpv6Field","Type":3,"TypeString":"LOCATION_RECOMMENDED","Reason":"This location is part of the current materialization"} {"Field":"stringIriField","Type":3,"TypeString":"LOCATION_RECOMMENDED","Reason":"This location is part of the current materialization"} @@ -104,12 +104,12 @@ Big Schema Changed Types Constraints: {"Field":"stringJsonPointerField","Type":3,"TypeString":"LOCATION_RECOMMENDED","Reason":"This location is part of the current materialization"} {"Field":"stringMacAddr8Field","Type":3,"TypeString":"LOCATION_RECOMMENDED","Reason":"This location is part of the current materialization"} {"Field":"stringMacAddrField","Type":3,"TypeString":"LOCATION_RECOMMENDED","Reason":"This location is part of the current materialization"} -{"Field":"stringNumberField","Type":6,"TypeString":"UNSATISFIABLE","Reason":"Field 'stringNumberField' is already being materialized as endpoint type 'DOUBLE' but endpoint type 'STRING' is required by its schema '{ type: [string] }'"} +{"Field":"stringNumberField","Type":3,"TypeString":"LOCATION_RECOMMENDED","Reason":"This location is part of the current materialization"} {"Field":"stringRegexField","Type":3,"TypeString":"LOCATION_RECOMMENDED","Reason":"This location is part of the current materialization"} {"Field":"stringRelativeJsonPointerField","Type":3,"TypeString":"LOCATION_RECOMMENDED","Reason":"This location is part of the current materialization"} {"Field":"stringTimeField","Type":3,"TypeString":"LOCATION_RECOMMENDED","Reason":"This location is part of the current materialization"} -{"Field":"stringUint32Field","Type":6,"TypeString":"UNSATISFIABLE","Reason":"Field 'stringUint32Field' is already being materialized as endpoint type 'DECIMAL(38, 0)' but endpoint type 'STRING' is required by its schema '{ type: [string] }'"} -{"Field":"stringUint64Field","Type":6,"TypeString":"UNSATISFIABLE","Reason":"Field 'stringUint64Field' is already being materialized as endpoint type 'DECIMAL(38, 0)' but endpoint type 'STRING' is required by its schema '{ type: [string] }'"} +{"Field":"stringUint32Field","Type":3,"TypeString":"LOCATION_RECOMMENDED","Reason":"This location is part of the current materialization"} +{"Field":"stringUint64Field","Type":3,"TypeString":"LOCATION_RECOMMENDED","Reason":"This location is part of the current materialization"} {"Field":"stringUriField","Type":3,"TypeString":"LOCATION_RECOMMENDED","Reason":"This location is part of the current materialization"} {"Field":"stringUriReferenceField","Type":3,"TypeString":"LOCATION_RECOMMENDED","Reason":"This location is part of the current materialization"} {"Field":"stringUriTemplateField","Type":3,"TypeString":"LOCATION_RECOMMENDED","Reason":"This location is part of the current materialization"} @@ -152,7 +152,7 @@ table { 33: stringUriReferenceField: required string (auto-generated projection of JSON at: /stringUriReferenceField with inferred types: [string]) 34: stringUriTemplateField: required string (auto-generated projection of JSON at: /stringUriTemplateField with inferred types: [string]) 35: stringUuidField: required string (auto-generated projection of JSON at: /stringUuidField with inferred types: [string]) - 36: flow_document: required string (auto-generated projection of JSON at: with inferred types: [object]) + 36: flow_document: required string (auto-generated projection of the root document with inferred types: [object]) } Big Schema Materialized Resource Schema With No Fields Required: @@ -192,7 +192,7 @@ table { 33: stringUriReferenceField: optional string 
(auto-generated projection of JSON at: /stringUriReferenceField with inferred types: [string]) 34: stringUriTemplateField: optional string (auto-generated projection of JSON at: /stringUriTemplateField with inferred types: [string]) 35: stringUuidField: optional string (auto-generated projection of JSON at: /stringUuidField with inferred types: [string]) - 36: flow_document: required string (auto-generated projection of JSON at: with inferred types: [object]) + 36: flow_document: required string (auto-generated projection of the root document with inferred types: [object]) } Big Schema Changed Types With Table Replacement Constraints: @@ -272,7 +272,7 @@ table { 34: stringUriReferenceField: required string (auto-generated projection of JSON at: /stringUriReferenceField with inferred types: [string]) 35: stringUriTemplateField: required string (auto-generated projection of JSON at: /stringUriTemplateField with inferred types: [string]) 36: stringUuidField: required string (auto-generated projection of JSON at: /stringUuidField with inferred types: [string]) - 37: flow_document: required string (auto-generated projection of JSON at: with inferred types: [object]) + 37: flow_document: required string (auto-generated projection of the root document with inferred types: [object]) } add a single field: @@ -288,7 +288,7 @@ table { 9: requiredInteger: required long (auto-generated projection of JSON at: /requiredInteger with inferred types: [integer]) 10: requiredObject: required string (auto-generated projection of JSON at: /requiredObject with inferred types: [object]) 11: requiredString: required string (auto-generated projection of JSON at: /requiredString with inferred types: [string]) - 12: flow_document: required string (user-provided projection of JSON at: with inferred types: [object]) + 12: flow_document: required string (user-provided projection of the root document with inferred types: [object]) 13: addedOptionalString: optional string (auto-generated projection of JSON at: /addedOptionalString with inferred types: [string]) } @@ -305,7 +305,7 @@ table { 9: requiredInteger: required long (auto-generated projection of JSON at: /requiredInteger with inferred types: [integer]) 10: requiredObject: required string (auto-generated projection of JSON at: /requiredObject with inferred types: [object]) 11: requiredString: required string (auto-generated projection of JSON at: /requiredString with inferred types: [string]) - 12: flow_document: required string (user-provided projection of JSON at: with inferred types: [object]) + 12: flow_document: required string (user-provided projection of the root document with inferred types: [object]) } remove a single required field: @@ -321,7 +321,7 @@ table { 9: requiredInteger: required long (auto-generated projection of JSON at: /requiredInteger with inferred types: [integer]) 10: requiredObject: required string (auto-generated projection of JSON at: /requiredObject with inferred types: [object]) 11: requiredString: optional string (auto-generated projection of JSON at: /requiredString with inferred types: [string]) - 12: flow_document: required string (user-provided projection of JSON at: with inferred types: [object]) + 12: flow_document: required string (user-provided projection of the root document with inferred types: [object]) } add and remove many fields: @@ -337,7 +337,7 @@ table { 9: requiredInteger: required long (auto-generated projection of JSON at: /requiredInteger with inferred types: [integer]) 10: requiredObject: optional string 
(auto-generated projection of JSON at: /requiredObject with inferred types: [object]) 11: requiredString: optional string (auto-generated projection of JSON at: /requiredString with inferred types: [string]) - 12: flow_document: required string (user-provided projection of JSON at: with inferred types: [object]) + 12: flow_document: required string (user-provided projection of the root document with inferred types: [object]) 13: addedOptionalString: optional string (auto-generated projection of JSON at: /addedOptionalString with inferred types: [string]) 14: addedRequiredString: required string (auto-generated projection of JSON at: /addedRequiredString with inferred types: [string]) } @@ -357,6 +357,6 @@ table { 11: value.with-separated_words: optional string (auto-generated projection of JSON at: /value.with-separated_words with inferred types: [string]) 12: value.with.separated.words: optional string (auto-generated projection of JSON at: /value.with.separated.words with inferred types: [string]) 13: value_with_separated_words: optional string (auto-generated projection of JSON at: /value_with_separated_words with inferred types: [string]) - 14: flow_document: required string (auto-generated projection of JSON at: with inferred types: [object]) + 14: flow_document: required string (auto-generated projection of the root document with inferred types: [object]) } diff --git a/materialize-iceberg/config.go b/materialize-iceberg/config.go index daea2c77e8..144d42be99 100644 --- a/materialize-iceberg/config.go +++ b/materialize-iceberg/config.go @@ -95,6 +95,10 @@ func (c config) Validate() error { return nil } +func (c config) DefaultNamespace() string { + return sanitizePath(c.Namespace)[0] +} + type catalogAuthType string const ( diff --git a/materialize-iceberg/driver.go b/materialize-iceberg/driver.go index 462d31f92f..b1f9287997 100644 --- a/materialize-iceberg/driver.go +++ b/materialize-iceberg/driver.go @@ -12,10 +12,10 @@ import ( "slices" "strings" "sync" + "time" "github.com/apache/iceberg-go" "github.com/apache/iceberg-go/table" - emr "github.com/aws/aws-sdk-go-v2/service/emrserverless" "github.com/aws/aws-sdk-go-v2/service/s3" "github.com/aws/aws-sdk-go-v2/service/ssm" "github.com/aws/aws-sdk-go/aws" @@ -74,19 +74,7 @@ func (Driver) Validate(ctx context.Context, req *pm.Request_Validate) (*pm.Respo } func (Driver) Apply(ctx context.Context, req *pm.Request_Apply) (*pm.Response_Applied, error) { - mtz, _, res, err := boilerplate.RunApply[*materialization](ctx, req, newMaterialization) - if err != nil { - return nil, err - } - - if mtz.cfg.CatalogAuthentication.CatalogAuthType == catalogAuthTypeClientCredential { - secretName := clientCredSecretName(mtz.cfg.Compute.SystemsManagerPrefix, req.Materialization.Name.String()) - if err := ensureEmrSecret(ctx, mtz.ssmClient, secretName, mtz.cfg.CatalogAuthentication.Credential); err != nil { - return nil, fmt.Errorf("resolving client credential secret for EMR in Systems Manager: %w", err) - } - } - - return res, nil + return boilerplate.RunApply(ctx, req, newMaterialization) } func (Driver) NewTransactor(ctx context.Context, req pm.Request_Open, be *boilerplate.BindingEvents) (m.Transactor, *pm.Response_Opened, *boilerplate.MaterializeOptions, error) { @@ -97,13 +85,15 @@ type materialization struct { cfg config catalog *catalog.Catalog s3Client *s3.Client - emrClient *emr.Client ssmClient *ssm.Client + emrClient *emrClient + templates templates + pyFiles *pyFileURIs // populated in Setup and NewMaterializerTransactor } var _ 
boilerplate.Materializer[config, fieldConfig, resource, mapped] = &materialization{} -func newMaterialization(ctx context.Context, cfg config) (boilerplate.Materializer[config, fieldConfig, resource, mapped], error) { +func newMaterialization(ctx context.Context, materializationName string, cfg config) (boilerplate.Materializer[config, fieldConfig, resource, mapped], error) { catalog, err := cfg.toCatalog(ctx) if err != nil { return nil, fmt.Errorf("creating catalog: %w", err) @@ -114,22 +104,32 @@ func newMaterialization(ctx context.Context, cfg config) (boilerplate.Materializ return nil, fmt.Errorf("creating S3 client: %w", err) } - emr, err := cfg.toEmrClient(ctx) + ssmClient, err := cfg.toSsmClient(ctx) if err != nil { return nil, fmt.Errorf("creating EMR client: %w", err) } - ssmClient, err := cfg.toSsmClient(ctx) + emr, err := cfg.toEmrClient(ctx) if err != nil { return nil, fmt.Errorf("creating EMR client: %w", err) } return &materialization{ - cfg: cfg, - catalog: catalog, - s3Client: s3, - emrClient: emr, + cfg: cfg, + catalog: catalog, + s3Client: s3, + emrClient: &emrClient{ + cfg: cfg.Compute.emrConfig, + catalogAuth: cfg.CatalogAuthentication, + catalogURL: cfg.URL, + warehouse: cfg.Warehouse, + materializationName: materializationName, + c: emr, + s3Client: s3, + ssmClient: ssmClient, + }, ssmClient: ssmClient, + templates: parseTemplates(), }, nil } @@ -149,14 +149,6 @@ func (d *materialization) Config() boilerplate.MaterializeCfg { } } -func (d *materialization) SetResourceDefaults(c config, res resource) resource { - if res.Namespace == "" { - res.Namespace = c.Namespace - } - - return res -} - func (d *materialization) PopulateInfoSchema(ctx context.Context, resourcePaths [][]string, is *boilerplate.InfoSchema) error { relevantPaths := make(map[string][]string) for _, path := range resourcePaths { @@ -198,7 +190,7 @@ func (d *materialization) PopulateInfoSchema(ctx context.Context, resourcePaths res.PushField(boilerplate.ExistingField{ Name: f.Name, Nullable: !f.Required, - Type: f.Type.Type(), + Type: f.Type.String(), HasDefault: f.WriteDefault != nil, }) } @@ -234,7 +226,7 @@ func (d *materialization) PopulateInfoSchema(ctx context.Context, resourcePaths func (d *materialization) CheckPrerequisites(ctx context.Context) *cerrors.PrereqErr { errs := &cerrors.PrereqErr{} - checkEmrPrereqs(ctx, d, errs) + d.emrClient.checkPrereqs(ctx, errs) bucket := d.cfg.Compute.Bucket bucketWithPath := path.Join(bucket, d.cfg.Compute.BucketPath) @@ -334,12 +326,20 @@ func (d *materialization) MapType(p boilerplate.Projection, fc fieldConfig) (map return mapProjection(p) } -func (d *materialization) Compatible(existing boilerplate.ExistingField, proposed mapped) bool { - return strings.EqualFold(existing.Type, string(proposed.type_.String())) -} +func (d *materialization) Setup(ctx context.Context, is *boilerplate.InfoSchema) (string, error) { + if d.cfg.CatalogAuthentication.CatalogAuthType == catalogAuthTypeClientCredential { + if err := d.emrClient.ensureSecret(ctx, d.cfg.CatalogAuthentication.Credential); err != nil { + return "", err + } + } + + pyFiles, err := d.putPyFiles(ctx) + if err != nil { + return "", fmt.Errorf("putting PySpark scripts: %w", err) + } + d.pyFiles = pyFiles -func (d *materialization) DescriptionForType(prop mapped) string { - return string(prop.type_.String()) + return "", nil } func (d *materialization) CreateNamespace(ctx context.Context, ns string) (string, error) { @@ -385,25 +385,98 @@ func (d *materialization) UpdateResource( existing 
boilerplate.ExistingResource, update boilerplate.MaterializerBindingUpdate[mapped], ) (string, boilerplate.ActionApplyFn, error) { - if len(update.NewProjections) == 0 && len(update.NewlyNullableFields) == 0 { + if len(update.NewProjections) == 0 && len(update.NewlyNullableFields) == 0 && len(update.FieldsToMigrate) == 0 { return "", nil, nil } ns := resourcePath[0] name := resourcePath[1] - current := existing.Meta.(table.Metadata).CurrentSchema() - next := computeSchemaForUpdatedTable(current, update) + table := existing.Meta.(table.Metadata) + current := table.CurrentSchema() + next := computeSchemaForUpdatedTable(table.LastColumnID(), current, update) reqs := []catalog.TableRequirement{catalog.AssertCurrentSchemaID(current.ID)} upds := []catalog.TableUpdate{catalog.AddSchemaUpdate(next), catalog.SetCurrentSchemaUpdate(next.ID)} + action := fmt.Sprintf("updated table %q.%q schema from %s to %s", ns, name, current.String(), next.String()) + + var afterMigrateReqs []catalog.TableRequirement + var afterMigrateUpds []catalog.TableUpdate + if len(update.FieldsToMigrate) > 0 { + // If there are migrations, the table is initially updated to |next|, + // which will add temporary columns that existing data will be cast + // into. After the compute job is run to cast the existing column data + // into the temporary column, the table is updated to |afterMigrate|, + // which will delete the original columns and rename the temporary + // columns to be the name of the original columns in an atomic schema + // update. + // + // The AssertSchemaID requirements throughout these two table updates + // ensure that no other process comes in and modifies the table schema + // while this is happening. If the schema IDs aren't as expected, the + // connector will crash and start over, which should work fine. 
+ afterMigrate := computeSchemaForCompletedMigrations(next, update.FieldsToMigrate) + afterMigrateReqs = []catalog.TableRequirement{catalog.AssertCurrentSchemaID(next.ID)} + afterMigrateUpds = []catalog.TableUpdate{catalog.AddSchemaUpdate(afterMigrate), catalog.SetCurrentSchemaUpdate(afterMigrate.ID)} + action = fmt.Sprintf("updated table %q.%q schema from %s to %s", ns, name, current.String(), afterMigrate.String()) + } + + return action, func(ctx context.Context) error { + if err := d.catalog.UpdateTable(ctx, ns, name, reqs, upds); err != nil { + return err + } + + if len(update.FieldsToMigrate) > 0 { + input := migrateInput{ + ResourcePath: resourcePath, + Migrations: make([]migrateColumn, 0, len(update.FieldsToMigrate)), + } + for _, migration := range update.FieldsToMigrate { + input.Migrations = append(input.Migrations, migrateColumn{ + Field: migration.From.Name, + FromType: migration.From.Type, + TargetType: migration.To.Mapped.type_, + }) + } - return fmt.Sprintf("updated table %q.%q schema from %s to %s", ns, name, current.String(), next.String()), func(ctx context.Context) error { - return d.catalog.UpdateTable(ctx, ns, name, reqs, upds) + var q strings.Builder + if err := d.templates.migrateQuery.Execute(&q, input); err != nil { + return err + } + + outputPrefix := path.Join(d.emrClient.cfg.BucketPath, uuid.NewString()) + defer func() { + if err := cleanPrefixOnceFn(ctx, d.s3Client, d.emrClient.cfg.Bucket, outputPrefix)(); err != nil { + log.WithError(err).Warn("failed to clean up status file after running a column migration job") + } + }() + + ll := log.WithFields(log.Fields{ + "table": fmt.Sprintf("%s.%s", ns, name), + "numColumns": len(update.FieldsToMigrate), + }) + ll.Info("running column migration job") + ts := time.Now() + if err := d.emrClient.runJob( + ctx, + python.ExecInput{Query: q.String()}, + d.pyFiles.exec, + d.pyFiles.common, + fmt.Sprintf("column migration for: %s", d.emrClient.materializationName), + outputPrefix, + ); err != nil { + return fmt.Errorf("failed to run column migration job: %w", err) + } + ll.WithField("took", time.Since(ts).String()).Info("column migration job complete") + + if err := d.catalog.UpdateTable(ctx, ns, name, afterMigrateReqs, afterMigrateUpds); err != nil { + return fmt.Errorf("failed to update table %s.%s after migration job: %w", ns, name, err) + } + + } + + return nil }, nil } -//go:embed python -var pyFilesFS embed.FS - func (d *materialization) NewMaterializerTransactor( ctx context.Context, req pm.Request_Open, @@ -411,65 +484,24 @@ func (d *materialization) NewMaterializerTransactor( mappedBindings []boilerplate.MappedBinding[config, resource, mapped], be *boilerplate.BindingEvents, ) (boilerplate.MaterializerTransactor, error) { + pyFiles, err := d.putPyFiles(ctx) + if err != nil { + return nil, fmt.Errorf("putting PySpark scripts: %w", err) + } + storageClient := newFileClient(d.s3Client, d.cfg.Compute.emrConfig.Bucket) t := &transactor{ - materializationName: req.Materialization.Name.String(), + materializationName: d.emrClient.materializationName, be: be, cfg: d.cfg, s3Client: d.s3Client, emrClient: d.emrClient, - templates: parseTemplates(), + templates: d.templates, bindings: make([]binding, 0, len(mappedBindings)), loadFiles: boilerplate.NewStagedFiles(storageClient, fileSizeLimit, d.cfg.Compute.emrConfig.BucketPath, false, false), storeFiles: boilerplate.NewStagedFiles(storageClient, fileSizeLimit, d.cfg.Compute.emrConfig.BucketPath, true, true), - } - - if d.cfg.CatalogAuthentication.CatalogAuthType == 
catalogAuthTypeClientCredential { - t.emrAuth.credentialSecretName = clientCredSecretName(d.cfg.Compute.SystemsManagerPrefix, req.Materialization.Name.String()) - t.emrAuth.scope = d.cfg.CatalogAuthentication.Scope - } - - // PySpark scripts are uploaded to the staging bucket metadata location - // under a prefix that include a hash of their contents. If the scripts are - // changed because of connector updates a new set of files will be written, - // but this is not expected to happen very often so the number of excessive - // files should be minimal. - pyFiles := []struct { - name string - tProp *string - }{ - {name: "common", tProp: &t.pyFiles.commonURI}, - {name: "load", tProp: &t.pyFiles.loadURI}, - {name: "merge", tProp: &t.pyFiles.mergeURI}, - } - pyBytes := make([][]byte, 0, len(pyFiles)) - hasher := sha256.New() - - for _, pf := range pyFiles { - bs, err := pyFilesFS.ReadFile(fmt.Sprintf("python/%s.py", pf.name)) - if err != nil { - return nil, fmt.Errorf("reading embedded python script %s: %w", pf.name, err) - } - if _, err := hasher.Write(bs); err != nil { - return nil, fmt.Errorf("computing hash of embedded python script %s: %w", pf.name, err) - } - pyBytes = append(pyBytes, bs) - } - - fullMetaPrefix := path.Join(d.cfg.Compute.BucketPath, metadataPrefix, "pySparkFiles", fmt.Sprintf("%x", hasher.Sum(nil))) - for idx, pf := range pyFiles { - key := path.Join(fullMetaPrefix, fmt.Sprintf("%s.py", pf.name)) - if _, err := t.s3Client.PutObject(ctx, &s3.PutObjectInput{ - Bucket: aws.String(d.cfg.Compute.emrConfig.Bucket), - Key: aws.String(key), - Body: bytes.NewReader(pyBytes[idx]), - }); err != nil { - return nil, fmt.Errorf("uploading %s: %w", pf.name, err) - } - - *pf.tProp = "s3://" + path.Join(d.cfg.Compute.emrConfig.Bucket, key) - log.WithField("uri", *pf.tProp).Debug("uploaded PySpark script") + pyFiles: *pyFiles, } for idx, mapped := range mappedBindings { @@ -504,3 +536,63 @@ func (d *materialization) NewMaterializerTransactor( return t, nil } + +//go:embed python +var pyFilesFS embed.FS + +type pyFileURIs struct { + common string + exec string + load string + merge string +} + +// putPyFiles uploads PySpark scripts to the staging bucket metadata location +// under a prefix that includes a hash of their contents. If the scripts are +// changed because of connector updates a new set of files will be written, but +// this is not expected to happen very often so the number of excessive files +// should be minimal. 
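+//
+// As a rough illustration (bucket, path, and hash values are hypothetical),
+// each script ends up at a URI of the form:
+//
+//	s3://<bucket>/<bucket path>/<metadataPrefix>/pySparkFiles/<sha256 hex>/common.py
+//
+// with one object per embedded script (common, exec, load, merge).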
+func (d *materialization) putPyFiles(ctx context.Context) (*pyFileURIs, error) { + var out pyFileURIs + + pyFiles := []struct { + name string + prop *string + }{ + {name: "common", prop: &out.common}, + {name: "exec", prop: &out.exec}, + {name: "load", prop: &out.load}, + {name: "merge", prop: &out.merge}, + } + pyBytes := make([][]byte, 0, len(pyFiles)) + hasher := sha256.New() + + for _, pf := range pyFiles { + bs, err := pyFilesFS.ReadFile(fmt.Sprintf("python/%s.py", pf.name)) + if err != nil { + return nil, fmt.Errorf("reading embedded python script %s: %w", pf.name, err) + } + if _, err := hasher.Write(bs); err != nil { + return nil, fmt.Errorf("computing hash of embedded python script %s: %w", pf.name, err) + } + pyBytes = append(pyBytes, bs) + } + + fullMetaPrefix := path.Join(d.cfg.Compute.emrConfig.BucketPath, metadataPrefix, "pySparkFiles", fmt.Sprintf("%x", hasher.Sum(nil))) + for idx, pf := range pyFiles { + key := path.Join(fullMetaPrefix, fmt.Sprintf("%s.py", pf.name)) + if _, err := d.s3Client.PutObject(ctx, &s3.PutObjectInput{ + Bucket: aws.String(d.cfg.Compute.emrConfig.Bucket), + Key: aws.String(key), + Body: bytes.NewReader(pyBytes[idx]), + }); err != nil { + return nil, fmt.Errorf("uploading %s: %w", pf.name, err) + } + + *pf.prop = "s3://" + path.Join(d.cfg.Compute.emrConfig.Bucket, key) + log.WithField("uri", *pf.prop).Debug("uploaded PySpark script") + } + + d.pyFiles = &out + return d.pyFiles, nil +} diff --git a/materialize-iceberg/emr.go b/materialize-iceberg/emr.go index 876375e42c..19f14208c1 100644 --- a/materialize-iceberg/emr.go +++ b/materialize-iceberg/emr.go @@ -21,21 +21,28 @@ import ( log "github.com/sirupsen/logrus" ) -func clientCredSecretName(prefix string, materializationName string) string { - return prefix + sanitizeAndAppendHash(materializationName) +type emrClient struct { + cfg emrConfig + catalogAuth catalogAuthConfig + catalogURL string + warehouse string + materializationName string + c *emr.Client + s3Client *s3.Client + ssmClient *ssm.Client } -func checkEmrPrereqs(ctx context.Context, d *materialization, errs *cerrors.PrereqErr) { - if _, err := d.emrClient.ListJobRuns(ctx, &emr.ListJobRunsInput{ - ApplicationId: aws.String(d.cfg.Compute.ApplicationId), +func (e *emrClient) checkPrereqs(ctx context.Context, errs *cerrors.PrereqErr) { + if _, err := e.c.ListJobRuns(ctx, &emr.ListJobRunsInput{ + ApplicationId: aws.String(e.cfg.ApplicationId), MaxResults: aws.Int32(1), }); err != nil { - errs.Err(fmt.Errorf("failed to list job runs for application %q: %w", d.cfg.Compute.ApplicationId, err)) + errs.Err(fmt.Errorf("failed to list job runs for application %q: %w", e.cfg.ApplicationId, err)) } - if d.cfg.CatalogAuthentication.CatalogAuthType == catalogAuthTypeClientCredential { - testParameter := clientCredSecretName(d.cfg.Compute.SystemsManagerPrefix, "test") - if _, err := d.ssmClient.PutParameter(ctx, &ssm.PutParameterInput{ + if e.catalogAuth.CatalogAuthType == catalogAuthTypeClientCredential { + testParameter := e.cfg.SystemsManagerPrefix + "test" + if _, err := e.ssmClient.PutParameter(ctx, &ssm.PutParameterInput{ Name: aws.String(testParameter), Value: aws.String("test"), Type: ssmTypes.ParameterTypeSecureString, @@ -43,7 +50,7 @@ func checkEmrPrereqs(ctx context.Context, d *materialization, errs *cerrors.Prer }); err != nil { errs.Err(fmt.Errorf("failed to put secure string parameter to %s: %w", testParameter, err)) return - } else if _, err := d.ssmClient.GetParameter(ctx, &ssm.GetParameterInput{ + } else if _, err := 
e.ssmClient.GetParameter(ctx, &ssm.GetParameterInput{ Name: aws.String(testParameter), WithDecryption: aws.Bool(true), }); err != nil { @@ -52,12 +59,11 @@ func checkEmrPrereqs(ctx context.Context, d *materialization, errs *cerrors.Prer } } -func ensureEmrSecret(ctx context.Context, client *ssm.Client, parameterName, wantCred string) error { - res, err := client.GetParameter(ctx, &ssm.GetParameterInput{ - Name: aws.String(parameterName), +func (e *emrClient) ensureSecret(ctx context.Context, wantCred string) error { + res, err := e.ssmClient.GetParameter(ctx, &ssm.GetParameterInput{ + Name: aws.String(e.clientCredSecretName()), WithDecryption: aws.Bool(true), }) - if err != nil { var errNotFound *ssmTypes.ParameterNotFound if !errors.As(err, &errNotFound) { @@ -69,8 +75,8 @@ func ensureEmrSecret(ctx context.Context, client *ssm.Client, parameterName, wan return nil } - _, err = client.PutParameter(ctx, &ssm.PutParameterInput{ - Name: aws.String(parameterName), + _, err = e.ssmClient.PutParameter(ctx, &ssm.PutParameterInput{ + Name: aws.String(e.clientCredSecretName()), Value: aws.String(wantCred), Type: ssmTypes.ParameterTypeSecureString, Overwrite: aws.Bool(true), @@ -84,7 +90,7 @@ func ensureEmrSecret(ctx context.Context, client *ssm.Client, parameterName, wan return nil } -func (t *transactor) runEmrJob(ctx context.Context, jobName string, input any, workingPrefix, entryPointUri string) error { +func (e *emrClient) runJob(ctx context.Context, input any, entryPointUri, pyFilesCommonURI, jobName, workingPrefix string) error { /*** Available arguments to the pyspark script: | --input-uri | Input for the program, as an s3 URI, to be parsed by the script | Required | @@ -98,8 +104,8 @@ func (t *transactor) runEmrJob(ctx context.Context, jobName string, input any, w getStatus := func() (*python.StatusOutput, error) { var status python.StatusOutput statusKey := path.Join(workingPrefix, statusFile) - if statusObj, err := t.s3Client.GetObject(ctx, &s3.GetObjectInput{ - Bucket: aws.String(t.cfg.Compute.Bucket), + if statusObj, err := e.s3Client.GetObject(ctx, &s3.GetObjectInput{ + Bucket: aws.String(e.cfg.Bucket), Key: aws.String(statusKey), }); err != nil { return nil, fmt.Errorf("reading status object %q: %w", statusKey, err) @@ -112,8 +118,8 @@ func (t *transactor) runEmrJob(ctx context.Context, jobName string, input any, w inputKey := path.Join(workingPrefix, "input.json") if inputBytes, err := encodeInput(input); err != nil { return fmt.Errorf("encoding input: %w", err) - } else if _, err := t.s3Client.PutObject(ctx, &s3.PutObjectInput{ - Bucket: aws.String(t.cfg.Compute.Bucket), + } else if _, err := e.s3Client.PutObject(ctx, &s3.PutObjectInput{ + Bucket: aws.String(e.cfg.Bucket), Key: aws.String(inputKey), Body: bytes.NewReader(inputBytes), }); err != nil { @@ -121,27 +127,27 @@ func (t *transactor) runEmrJob(ctx context.Context, jobName string, input any, w } args := []string{ - "--input-uri", "s3://" + path.Join(t.cfg.Compute.Bucket, inputKey), - "--status-output", "s3://" + path.Join(t.cfg.Compute.Bucket, workingPrefix, statusFile), - "--catalog-url", t.cfg.URL, - "--warehouse", t.cfg.Warehouse, - "--region", t.cfg.Compute.Region, + "--input-uri", "s3://" + path.Join(e.cfg.Bucket, inputKey), + "--status-output", "s3://" + path.Join(e.cfg.Bucket, workingPrefix, statusFile), + "--catalog-url", e.catalogURL, + "--warehouse", e.warehouse, + "--region", e.cfg.Region, } - if n := t.emrAuth.credentialSecretName; n != "" { - args = append(args, "--credential-secret-name", n) - } - if s := 
t.emrAuth.scope; s != "" { - args = append(args, "--scope", s) + if e.catalogAuth.CatalogAuthType == catalogAuthTypeClientCredential { + args = append(args, "--credential-secret-name", e.clientCredSecretName()) + if s := e.catalogAuth.Scope; s != "" { + args = append(args, "--scope", s) + } } - start, err := t.emrClient.StartJobRun(ctx, &emr.StartJobRunInput{ - ApplicationId: aws.String(t.cfg.Compute.ApplicationId), + start, err := e.c.StartJobRun(ctx, &emr.StartJobRunInput{ + ApplicationId: aws.String(e.cfg.ApplicationId), ClientToken: aws.String(uuid.NewString()), - ExecutionRoleArn: aws.String(t.cfg.Compute.ExecutionRoleArn), + ExecutionRoleArn: aws.String(e.cfg.ExecutionRoleArn), JobDriver: &emrTypes.JobDriverMemberSparkSubmit{ Value: emrTypes.SparkSubmit{ - SparkSubmitParameters: aws.String(fmt.Sprintf("--py-files %s", t.pyFiles.commonURI)), + SparkSubmitParameters: aws.String(fmt.Sprintf("--py-files %s", pyFilesCommonURI)), EntryPoint: aws.String(entryPointUri), EntryPointArguments: args, }, @@ -154,8 +160,8 @@ func (t *transactor) runEmrJob(ctx context.Context, jobName string, input any, w var runDetails string for { - gotRun, err := t.emrClient.GetJobRun(ctx, &emr.GetJobRunInput{ - ApplicationId: aws.String(t.cfg.Compute.ApplicationId), + gotRun, err := e.c.GetJobRun(ctx, &emr.GetJobRunInput{ + ApplicationId: aws.String(e.cfg.ApplicationId), JobRunId: start.JobRunId, }) if err != nil { @@ -190,6 +196,10 @@ func (t *transactor) runEmrJob(ctx context.Context, jobName string, input any, w } } +func (e *emrClient) clientCredSecretName() string { + return e.cfg.SystemsManagerPrefix + sanitizeAndAppendHash(e.materializationName) +} + func encodeInput(in any) ([]byte, error) { var buf bytes.Buffer enc := json.NewEncoder(&buf) diff --git a/materialize-iceberg/python/exec.py b/materialize-iceberg/python/exec.py new file mode 100644 index 0000000000..834e1bb6e1 --- /dev/null +++ b/materialize-iceberg/python/exec.py @@ -0,0 +1,22 @@ +from common import ( + common_args, + get_spark_session, + run_with_status, +) + +args = common_args() +spark = get_spark_session(args) + + +def run(input): + query = input["query"] + + try: + spark.sql(query) + except Exception as e: + raise RuntimeError( + f"Running exec query failed:\n{query}\nOriginal Error:\n{str(e)}" + ) from e + + +run_with_status(args, run) diff --git a/materialize-iceberg/python/python.go b/materialize-iceberg/python/python.go index 09caf51bbc..8de281d030 100644 --- a/materialize-iceberg/python/python.go +++ b/materialize-iceberg/python/python.go @@ -6,6 +6,10 @@ type NestedField struct { Element string `json:"element,omitempty"` } +type ExecInput struct { + Query string `json:"query"` +} + type LoadBinding struct { Binding int `json:"binding"` Keys []NestedField `json:"keys"` diff --git a/materialize-iceberg/sqlgen.go b/materialize-iceberg/sqlgen.go index 84d76b31fe..30f4024254 100644 --- a/materialize-iceberg/sqlgen.go +++ b/materialize-iceberg/sqlgen.go @@ -1,13 +1,17 @@ package connector import ( + "fmt" "strings" "text/template" + + "github.com/apache/iceberg-go" ) type templates struct { - loadQuery *template.Template - mergeQuery *template.Template + loadQuery *template.Template + mergeQuery *template.Template + migrateQuery *template.Template } type templateInput struct { @@ -15,15 +19,51 @@ type templateInput struct { Bounds []mergeBound } +type migrateColumn struct { + Field string + FromType string + TargetType iceberg.Type +} + +type migrateInput struct { + ResourcePath []string + Migrations []migrateColumn +} + +func 
quoteIdentifier(in string) string { return "`" + in + "`" } + func parseTemplates() templates { tpl := template.New("root").Funcs(template.FuncMap{ "QuoteIdentifier": quoteIdentifier, - "TableFQN": tableFQN, + "TableFQN": func(in []string) string { + quotedParts := make([]string, len(in)) + for i, part := range in { + quotedParts[i] = quoteIdentifier(part) + } + return strings.Join(quotedParts, ".") + }, + "IsBinary": func(m mapped) bool { return m.type_.Equals(iceberg.BinaryType{}) }, + "MigrateColumnName": func(f string) string { return f + migrateFieldSuffix }, + "CastSQL": func(m migrateColumn) string { + ident := quoteIdentifier(m.Field) + switch m.FromType { + case "binary": + return fmt.Sprintf("BASE64(%s)", ident) + case "timestamptz": + // timestamptz columns have an internal storage representation + // in UTC so this will always result in a Z timestamp string, + // which is consistent with what you get if you read a + // timestamptz column. + return fmt.Sprintf(`DATE_FORMAT(%s, 'yyyy-MM-dd\'T\'HH:mm:ss.SSSSSS\'Z\'')`, ident) + default: + return fmt.Sprintf("CAST(%s AS %s)", ident, m.TargetType.Type()) + } + }, }) parsed := template.Must(tpl.Parse(` {{ define "maybe_unbase64_rhs" -}} -{{- if $.Mapped.IsBinary -}}unbase64(r.{{ QuoteIdentifier $.Field }}){{ else }}r.{{ QuoteIdentifier $.Field }}{{ end }} +{{- if IsBinary $.Mapped -}}UNBASE64(r.{{ QuoteIdentifier $.Field }}){{ else }}r.{{ QuoteIdentifier $.Field }}{{ end }} {{- end }} {{ define "loadQuery" -}} @@ -62,20 +102,20 @@ WHEN NOT MATCHED AND r.{{ QuoteIdentifier $.Mapped.Document.Field }} != '"delete {{- end -}} ) {{ end }} + +{{ define "migrateQuery" -}} +UPDATE {{ TableFQN $.ResourcePath }} +SET +{{- range $ind, $col := $.Migrations }} + {{- if $ind }}, {{ end }} + {{ QuoteIdentifier (MigrateColumnName $col.Field) }} = {{ CastSQL $col }} +{{- end }} +{{ end }} `)) return templates{ - loadQuery: parsed.Lookup("loadQuery"), - mergeQuery: parsed.Lookup("mergeQuery"), - } -} - -func quoteIdentifier(in string) string { return "`" + in + "`" } - -func tableFQN(in []string) string { - quotedParts := make([]string, len(in)) - for i, part := range in { - quotedParts[i] = quoteIdentifier(part) + loadQuery: parsed.Lookup("loadQuery"), + mergeQuery: parsed.Lookup("mergeQuery"), + migrateQuery: parsed.Lookup("migrateQuery"), } - return strings.Join(quotedParts, ".") } diff --git a/materialize-iceberg/sqlgen_test.go b/materialize-iceberg/sqlgen_test.go index 77bcc1d1bc..b86622f489 100644 --- a/materialize-iceberg/sqlgen_test.go +++ b/materialize-iceberg/sqlgen_test.go @@ -16,20 +16,26 @@ func TestTemplates(t *testing.T) { var snap strings.Builder + makeProjection := func(field string) boilerplate.Projection { + return boilerplate.Projection{ + Projection: pf.Projection{Field: field}, + } + } + keys := []boilerplate.MappedProjection[mapped]{ - {Projection: pf.Projection{Field: "first-key"}}, - {Projection: pf.Projection{Field: "second-key"}, Mapped: mapped{type_: iceberg.BinaryType{}}}, - {Projection: pf.Projection{Field: "third-key"}}, + {Projection: makeProjection("first-key"), Mapped: mapped{iceberg.StringType{}}}, + {Projection: makeProjection("second-key"), Mapped: mapped{iceberg.BinaryType{}}}, + {Projection: makeProjection("third-key"), Mapped: mapped{iceberg.Int64Type{}}}, } values := []boilerplate.MappedProjection[mapped]{ - {Projection: pf.Projection{Field: "first-val"}}, - {Projection: pf.Projection{Field: "second-val"}}, - {Projection: pf.Projection{Field: "third-val"}}, - {Projection: pf.Projection{Field: "fourth-val"}}, + 
{Projection: makeProjection("first-val"), Mapped: mapped{iceberg.StringType{}}}, + {Projection: makeProjection("second-val"), Mapped: mapped{iceberg.StringType{}}}, + {Projection: makeProjection("third-val"), Mapped: mapped{iceberg.BinaryType{}}}, + {Projection: makeProjection("fourth-val"), Mapped: mapped{iceberg.StringType{}}}, } - doc := &boilerplate.MappedProjection[mapped]{Projection: pf.Projection{Field: "flow_document"}} + doc := &boilerplate.MappedProjection[mapped]{Projection: makeProjection("flow_document"), Mapped: mapped{iceberg.StringType{}}} input := templateInput{ binding: binding{ @@ -60,6 +66,27 @@ func TestTemplates(t *testing.T) { }, } + mInput := migrateInput{ + ResourcePath: []string{"some", "table"}, + Migrations: []migrateColumn{ + { + Field: "long_to_decimal", + FromType: "long", + TargetType: iceberg.DecimalTypeOf(38, 0), + }, + { + Field: "datetime_to_string", + FromType: "timestamptz", + TargetType: iceberg.StringType{}, + }, + { + Field: "binary_to_string", + FromType: "binary", + TargetType: iceberg.StringType{}, + }, + }, + } + snap.WriteString("--- Begin load query ---\n") require.NoError(t, templates.loadQuery.Execute(&snap, input)) snap.WriteString("--- End load query ---") @@ -70,6 +97,12 @@ func TestTemplates(t *testing.T) { require.NoError(t, templates.mergeQuery.Execute(&snap, input)) snap.WriteString("--- End merge query ---") + snap.WriteString("\n\n") + + snap.WriteString("--- Begin migrate query ---\n") + require.NoError(t, templates.migrateQuery.Execute(&snap, mInput)) + snap.WriteString("--- End migrate query ---") + cupaloy.SnapshotT(t, snap.String()) } diff --git a/materialize-iceberg/transactor.go b/materialize-iceberg/transactor.go index 99cd876f79..09be577e86 100644 --- a/materialize-iceberg/transactor.go +++ b/materialize-iceberg/transactor.go @@ -11,7 +11,6 @@ import ( "strings" "sync" - emr "github.com/aws/aws-sdk-go-v2/service/emrserverless" "github.com/aws/aws-sdk-go-v2/service/s3" s3types "github.com/aws/aws-sdk-go-v2/service/s3/types" "github.com/aws/aws-sdk-go/aws" @@ -50,23 +49,14 @@ type transactor struct { be *boilerplate.BindingEvents cfg config s3Client *s3.Client - emrClient *emr.Client + emrClient *emrClient templates templates bindings []binding loadFiles *boilerplate.StagedFiles storeFiles *boilerplate.StagedFiles - emrAuth struct { - credentialSecretName string - scope string - } - - pyFiles struct { - commonURI string - loadURI string - mergeURI string - } + pyFiles pyFileURIs } func (t *transactor) RecoverCheckpoint(ctx context.Context, spec pf.MaterializationSpec, rangeSpec pf.RangeSpec) (boilerplate.RuntimeCheckpoint, error) { @@ -146,11 +136,11 @@ func (t *transactor) Load(it *m.LoadIterator, loaded func(binding int, doc json. // In addition to removing the staged load keys, the loaded document result // files that are written to the staging location must also be removed. 
- cleanupResults := t.cleanPrefixOnceFn(ctx, t.cfg.Compute.Bucket, outputPrefix) + cleanupResults := cleanPrefixOnceFn(ctx, t.s3Client, t.cfg.Compute.Bucket, outputPrefix) defer cleanupResults() t.be.StartedEvaluatingLoads() - if err := t.runEmrJob(ctx, fmt.Sprintf("load for: %s", t.materializationName), loadInput, outputPrefix, t.pyFiles.loadURI); err != nil { + if err := t.emrClient.runJob(ctx, loadInput, t.pyFiles.load, t.pyFiles.common, fmt.Sprintf("load for: %s", t.materializationName), outputPrefix); err != nil { return fmt.Errorf("load job failed: %w", err) } else if err := t.loadFiles.CleanupCurrentTransaction(ctx); err != nil { return fmt.Errorf("cleaning up load files: %w", err) @@ -278,10 +268,10 @@ func (t *transactor) Acknowledge(ctx context.Context) (*pf.ConnectorState, error var stateUpdate *pf.ConnectorState if len(mergeInput.Bindings) > 0 { // Make sure the job status output file gets cleaned up. - cleanupStatus := t.cleanPrefixOnceFn(ctx, t.cfg.Compute.Bucket, outputPrefix) + cleanupStatus := cleanPrefixOnceFn(ctx, t.s3Client, t.cfg.Compute.Bucket, outputPrefix) defer cleanupStatus() - if err := t.runEmrJob(ctx, fmt.Sprintf("store for: %s", t.materializationName), mergeInput, outputPrefix, t.pyFiles.mergeURI); err != nil { + if err := t.emrClient.runJob(ctx, mergeInput, t.pyFiles.merge, t.pyFiles.common, fmt.Sprintf("store for: %s", t.materializationName), outputPrefix); err != nil { return nil, fmt.Errorf("store merge job failed: %w", err) } else if err := cleanupStatus(); err != nil { return nil, fmt.Errorf("cleaning up generated job status file: %w", err) @@ -415,14 +405,47 @@ func (t *transactor) loadWorker(ctx context.Context, loaded func(binding int, do return nil } -func (t *transactor) cleanPrefixOnceFn(ctx context.Context, bucket string, prefix string) func() error { +func (t *transactor) extantFiles(ctx context.Context, originalFileUris []string) (map[string]struct{}, error) { + var prefix string + for _, uri := range originalFileUris { + _, key := s3UriToParts(uri) + this := path.Dir(key) + if prefix == "" { + prefix = this + } else if prefix != this { + return nil, fmt.Errorf("files have different prefixes: %q and %q", prefix, this) + } + } + + out := make(map[string]struct{}) + + paginator := s3.NewListObjectsV2Paginator(t.s3Client, &s3.ListObjectsV2Input{ + Bucket: aws.String(t.cfg.Compute.Bucket), + Prefix: aws.String(prefix), + }) + + for paginator.HasMorePages() { + page, err := paginator.NextPage(ctx) + if err != nil { + return nil, fmt.Errorf("failed to get next page: %w", err) + } + + for _, obj := range page.Contents { + out[fmt.Sprintf("s3://%s/%s", t.cfg.Compute.Bucket, *obj.Key)] = struct{}{} + } + } + + return out, nil +} + +func cleanPrefixOnceFn(ctx context.Context, s3Client *s3.Client, bucket string, prefix string) func() error { didClean := false return func() error { if didClean { return nil } - paginator := s3.NewListObjectsV2Paginator(t.s3Client, &s3.ListObjectsV2Input{ + paginator := s3.NewListObjectsV2Paginator(s3Client, &s3.ListObjectsV2Input{ Bucket: aws.String(bucket), Prefix: aws.String(prefix), }) @@ -444,7 +467,7 @@ func (t *transactor) cleanPrefixOnceFn(ctx context.Context, bucket string, prefi thisPage = append(thisPage, s3types.ObjectIdentifier{Key: obj.Key}) } - if _, err := t.s3Client.DeleteObjects(ctx, &s3.DeleteObjectsInput{ + if _, err := s3Client.DeleteObjects(ctx, &s3.DeleteObjectsInput{ Bucket: aws.String(bucket), Delete: &s3types.Delete{Objects: thisPage}, }); err != nil { @@ -456,37 +479,3 @@ func (t *transactor) 
cleanPrefixOnceFn(ctx context.Context, bucket string, prefi return nil } } - -func (t *transactor) extantFiles(ctx context.Context, originalFileUris []string) (map[string]struct{}, error) { - var prefix string - for _, uri := range originalFileUris { - _, key := s3UriToParts(uri) - this := path.Dir(key) - if prefix == "" { - prefix = this - } else if prefix != this { - return nil, fmt.Errorf("files have different prefixes: %q and %q", prefix, this) - } - } - - out := make(map[string]struct{}) - - paginator := s3.NewListObjectsV2Paginator(t.s3Client, &s3.ListObjectsV2Input{ - Bucket: aws.String(t.cfg.Compute.Bucket), - Prefix: aws.String(prefix), - }) - - for paginator.HasMorePages() { - page, err := paginator.NextPage(ctx) - if err != nil { - return nil, fmt.Errorf("failed to get next page: %w", err) - } - - for _, obj := range page.Contents { - out[fmt.Sprintf("s3://%s/%s", t.cfg.Compute.Bucket, *obj.Key)] = struct{}{} - } - } - - return out, nil - -} diff --git a/materialize-iceberg/type_mapping.go b/materialize-iceberg/type_mapping.go index 83079eb9a4..0bedae8a41 100644 --- a/materialize-iceberg/type_mapping.go +++ b/materialize-iceberg/type_mapping.go @@ -20,16 +20,31 @@ func (fc fieldConfig) Validate() error { return nil } func (fc fieldConfig) CastToString() bool { return fc.CastToString_ } type mapped struct { - type_ iceberg.Type - required bool + type_ iceberg.Type } -func (m mapped) IsBinary() bool { - return m.type_ == iceberg.BinaryType{} +func (m mapped) String() string { + return m.type_.String() } +func (m mapped) Compatible(existing boilerplate.ExistingField) bool { + return strings.EqualFold(existing.Type, m.type_.String()) +} + +func (m mapped) CanMigrate(existing boilerplate.ExistingField) bool { + return allowedMigrations.CanMigrate(existing.Type, m.type_) +} + +var allowedMigrations = boilerplate.TypeMigrations[iceberg.Type]{ + "long": {iceberg.DecimalTypeOf(38, 0), iceberg.Float64Type{}}, + "decimal(38, 0)": {iceberg.Float64Type{}}, + boilerplate.AnyExistingType: {iceberg.StringType{}}, +} + +var migrateFieldSuffix = "_flow_tmp" + func mapProjection(p boilerplate.Projection) (mapped, boilerplate.ElementConverter) { - m := mapped{required: p.MustExist} + var m mapped var converter boilerplate.ElementConverter switch ft := p.FlatType.(type) { @@ -125,18 +140,81 @@ func computeSchemaForNewTable(res boilerplate.MappedBinding[config, resource, ma return iceberg.NewSchemaWithIdentifiers(1, identifierFields, fields...) } -func computeSchemaForUpdatedTable(current *iceberg.Schema, update boilerplate.MaterializerBindingUpdate[mapped]) *iceberg.Schema { +func computeSchemaForUpdatedTable( + currentHighestID int, + current *iceberg.Schema, + update boilerplate.MaterializerBindingUpdate[mapped], +) *iceberg.Schema { var nextFields []iceberg.NestedField for _, f := range current.Fields() { + if slices.ContainsFunc(update.FieldsToMigrate, func(upd boilerplate.MigrateField[mapped]) bool { + return f.Name == upd.From.Name+migrateFieldSuffix + }) { + // Prune columns from a prior failed migrations of this spec update. + // This prevents rare cases where a prior migration column type is + // incompatible with a source field that has undergone further + // schema evolution before the migration could be applied. 
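+			//
+			// For example (hypothetical field name): a prior attempt at a
+			// long-to-double migration of `count` would have left behind a
+			// double `count_flow_tmp` column; if `count` must now instead be
+			// migrated to string, that stale double column is dropped here and
+			// a fresh string `count_flow_tmp` column is appended further below
+			// from the current FieldsToMigrate projections.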
+ continue + } + if slices.ContainsFunc(update.NewlyNullableFields, func(field boilerplate.ExistingField) bool { return field.Name == f.Name }) { f.Required = false } + + nextFields = append(nextFields, f) + } + + var tempMigrateProjections []boilerplate.MappedProjection[mapped] + for _, f := range update.FieldsToMigrate { + temp := f.To + temp.Field += migrateFieldSuffix + tempMigrateProjections = append(tempMigrateProjections, temp) + } + + _, lastId := appendProjectionsAsFields(&nextFields, update.NewProjections, currentHighestID) + appendProjectionsAsFields(&nextFields, tempMigrateProjections, lastId) + + return iceberg.NewSchemaWithIdentifiers(current.ID+1, current.IdentifierFieldIDs, nextFields...) +} + +func computeSchemaForCompletedMigrations(current *iceberg.Schema, fieldsToMigrate []boilerplate.MigrateField[mapped]) *iceberg.Schema { + fieldWasMigrated := func(f iceberg.NestedField) bool { + return slices.ContainsFunc(fieldsToMigrate, func(field boilerplate.MigrateField[mapped]) bool { + return f.Name == field.From.Name + }) + } + + fieldMigratedTo := func(f iceberg.NestedField) bool { + return slices.ContainsFunc(fieldsToMigrate, func(field boilerplate.MigrateField[mapped]) bool { + return f.Name == field.From.Name+migrateFieldSuffix + }) + } + + // Sanity checks that we don't remove a column without also renaming one to + // its original name. + var fieldsRemoved []string + var fieldsRenamedTo []string + + var nextFields []iceberg.NestedField + for _, f := range current.Fields() { + if fieldWasMigrated(f) { + fieldsRemoved = append(fieldsRemoved, f.Name) + continue + } else if fieldMigratedTo(f) { + f.Name = strings.TrimSuffix(f.Name, migrateFieldSuffix) + fieldsRenamedTo = append(fieldsRenamedTo, f.Name) + } + nextFields = append(nextFields, f) } - appendProjectionsAsFields(&nextFields, update.NewProjections, getHighestFieldID(current)) + slices.Sort(fieldsRemoved) + slices.Sort(fieldsRenamedTo) + if !slices.Equal(fieldsRemoved, fieldsRenamedTo) { + panic(fmt.Sprintf("application error: fields removed and renamed to are not equal: %s vs %s", fieldsRemoved, fieldsRenamedTo)) + } return iceberg.NewSchemaWithIdentifiers(current.ID+1, current.IdentifierFieldIDs, nextFields...) 
} @@ -156,7 +234,7 @@ func appendProjectionsAsFields(dst *[]iceberg.NestedField, ps []boilerplate.Mapp ID: id, Name: p.Field, Type: p.Mapped.type_, - Required: p.Mapped.required, + Required: p.MustExist, Doc: strings.ReplaceAll(p.Comment, "\n", " - "), // Glue catalogs don't support newlines in field comments }) ids = append(ids, id) diff --git a/materialize-iceberg/type_mapping_test.go b/materialize-iceberg/type_mapping_test.go index 68935deaa1..573ec7a82e 100644 --- a/materialize-iceberg/type_mapping_test.go +++ b/materialize-iceberg/type_mapping_test.go @@ -1,26 +1,106 @@ package connector import ( + "fmt" + "strings" "testing" "github.com/apache/iceberg-go" + "github.com/bradleyjkemp/cupaloy" + + boilerplate "github.com/estuary/connectors/materialize-boilerplate" + pf "github.com/estuary/flow/go/protocols/flow" "github.com/stretchr/testify/require" ) -func TestGetHighestFieldID(t *testing.T) { +func TestAllowedMigrations(t *testing.T) { + for _, test := range []struct { + from string + to iceberg.Type + }{ + {"long", iceberg.DecimalTypeOf(38, 0)}, + {"long", iceberg.Float64Type{}}, + {"long", iceberg.StringType{}}, + {"decimal(38, 0)", iceberg.Float64Type{}}, + {"decimal(38, 0)", iceberg.StringType{}}, + {"double", iceberg.StringType{}}, + {"boolean", iceberg.StringType{}}, + {"binary", iceberg.StringType{}}, + {"date", iceberg.StringType{}}, + {"timestamptz", iceberg.StringType{}}, + } { + t.Run(fmt.Sprintf("%s->%s", test.from, test.to.String()), func(t *testing.T) { + require.True(t, allowedMigrations.CanMigrate(test.from, test.to)) + }) + } +} + +func TestComputeSchemas(t *testing.T) { + mappedProjection := func(field string, mustExist bool, type_ iceberg.Type) boilerplate.MappedProjection[mapped] { + return boilerplate.MappedProjection[mapped]{ + Projection: boilerplate.Projection{ + Projection: pf.Projection{ + Field: field, + }, + MustExist: mustExist, + }, + Mapped: mapped{type_: type_}, + } + } + originalFields := []iceberg.NestedField{ {ID: 1, Name: "firstKey", Required: true, Type: iceberg.Int64Type{}}, {ID: 2, Name: "secondKey", Required: true, Type: iceberg.StringType{}}, - {ID: 3, Name: "reqVal", Required: true, Type: iceberg.StringType{}}, - {ID: 4, Name: "optVal", Required: false, Type: iceberg.BooleanType{}}, - {ID: 5, Name: "thirdVal", Required: true, Type: iceberg.DecimalTypeOf(38, 0)}, - {ID: 6, Name: "fourthVal", Required: true, Type: &iceberg.ListType{ - ElementID: 7, - Element: iceberg.StringType{}, - }}, + {ID: 3, Name: "val1", Required: true, Type: iceberg.StringType{}}, + {ID: 4, Name: "val2", Required: false, Type: iceberg.BooleanType{}}, + {ID: 5, Name: "dateToStr", Required: true, Type: iceberg.DateType{}}, + {ID: 6, Name: "intToDecimal", Required: false, Type: iceberg.Int64Type{}}, + {ID: 7, Name: "decimalToFloat", Required: true, Type: iceberg.DecimalTypeOf(38, 0)}, + {ID: 8, Name: "timestampToStr", Required: true, Type: iceberg.TimestampTzType{}}, + // A temporary migration column that still exists from a prior failed + // migration, with an incompatible type for the upcoming migration. 
+ {ID: 10, Name: "dateToStr" + migrateFieldSuffix, Required: true, Type: &iceberg.Float64Type{}}, + } + + update := boilerplate.MaterializerBindingUpdate[mapped]{ + NewProjections: []boilerplate.MappedProjection[mapped]{mappedProjection("new", false, iceberg.StringType{})}, + NewlyNullableFields: []boilerplate.ExistingField{{Name: "dateToStr"}, {Name: "dateToStr" + migrateFieldSuffix}}, + FieldsToMigrate: []boilerplate.MigrateField[mapped]{ + { + From: boilerplate.ExistingField{Name: "dateToStr"}, + To: mappedProjection("dateToStr", false, iceberg.StringType{}), + }, + { + From: boilerplate.ExistingField{Name: "intToDecimal"}, + To: mappedProjection("intToDecimal", false, iceberg.DecimalTypeOf(38, 0)), + }, + { + From: boilerplate.ExistingField{Name: "decimalToFloat"}, + To: mappedProjection("decimalToFloat", true, iceberg.Float64Type{}), + }, + { + From: boilerplate.ExistingField{Name: "timestampToStr"}, + To: mappedProjection("timestampToStr", false, iceberg.StringType{}), + }, + }, } originalSchema := iceberg.NewSchemaWithIdentifiers(1, []int{1, 2}, originalFields...) - require.NotEqual(t, originalSchema.HighestFieldID(), getHighestFieldID(originalSchema)) - require.Equal(t, 7, getHighestFieldID(originalSchema)) + nextSchema := computeSchemaForUpdatedTable(12, originalSchema, update) + afterMigrateSchema := computeSchemaForCompletedMigrations(nextSchema, update.FieldsToMigrate) + + var snap strings.Builder + + snap.WriteString("--- Original Schema ---\n") + snap.WriteString(originalSchema.String()) + snap.WriteString("\n\n") + + snap.WriteString("--- Next Schema ---\n") + snap.WriteString(nextSchema.String()) + snap.WriteString("\n\n") + + snap.WriteString("--- After Migrate Schema ---\n") + snap.WriteString(afterMigrateSchema.String()) + + cupaloy.SnapshotT(t, snap.String()) } diff --git a/tests/materialize/materialize-iceberg/snapshot.json b/tests/materialize/materialize-iceberg/snapshot.json index bd2a886aec..deba90dff2 100644 --- a/tests/materialize/materialize-iceberg/snapshot.json +++ b/tests/materialize/materialize-iceberg/snapshot.json @@ -1,6 +1,6 @@ [ "applied.actionDescription", - "created table \"ci_testing\".\"simple\" as table {\n\t1: id: required long (auto-generated projection of JSON at: /id with inferred types: [integer])\n\t2: canary: required string (auto-generated projection of JSON at: /canary with inferred types: [string])\n\t3: flow_published_at: required timestamptz (Flow Publication Time - Flow publication date-time of this document - auto-generated projection of JSON at: /_meta/uuid with inferred types: [string])\n\t4: flow_document: required string (auto-generated projection of JSON at: with inferred types: [object])\n}\ncreated table \"ci_testing\".\"duplicate_keys_standard\" as table {\n\t1: id: required long (auto-generated projection of JSON at: /id with inferred types: [integer])\n\t2: flow_published_at: required timestamptz (Flow Publication Time - Flow publication date-time of this document - auto-generated projection of JSON at: /_meta/uuid with inferred types: [string])\n\t3: int: optional long (auto-generated projection of JSON at: /int with inferred types: [integer])\n\t4: str: required string (auto-generated projection of JSON at: /str with inferred types: [string])\n\t5: flow_document: required string (auto-generated projection of JSON at: with inferred types: [object])\n}\ncreated table \"ci_testing\".\"multiple_types\" as table {\n\t1: id: required long (auto-generated projection of JSON at: /id with inferred types: [integer])\n\t2: array_int: 
optional string (auto-generated projection of JSON at: /array_int with inferred types: [array])\n\t3: binary_field: optional binary (auto-generated projection of JSON at: /binary_field with inferred types: [string])\n\t4: bool_field: optional boolean (auto-generated projection of JSON at: /bool_field with inferred types: [boolean])\n\t5: float_field: optional double (auto-generated projection of JSON at: /float_field with inferred types: [number])\n\t6: flow_published_at: required timestamptz (Flow Publication Time - Flow publication date-time of this document - auto-generated projection of JSON at: /_meta/uuid with inferred types: [string])\n\t7: multiple: optional string (auto-generated projection of JSON at: /multiple with inferred types: [array boolean null number object string])\n\t8: nested: optional string (auto-generated projection of JSON at: /nested with inferred types: [object])\n\t9: nullable_int: optional long (auto-generated projection of JSON at: /nullable_int with inferred types: [integer null])\n\t10: str_field: required string (auto-generated projection of JSON at: /str_field with inferred types: [string])\n\t11: flow_document: required string (auto-generated projection of JSON at: with inferred types: [object])\n}\ncreated table \"ci_testing\".\"formatted_strings\" as table {\n\t1: id: required long (auto-generated projection of JSON at: /id with inferred types: [integer])\n\t2: date: optional date (auto-generated projection of JSON at: /date with inferred types: [string])\n\t3: datetime: optional timestamptz (auto-generated projection of JSON at: /datetime with inferred types: [string])\n\t4: flow_published_at: required timestamptz (Flow Publication Time - Flow publication date-time of this document - auto-generated projection of JSON at: /_meta/uuid with inferred types: [string])\n\t5: int_and_str: optional decimal(38, 0) (auto-generated projection of JSON at: /int_and_str with inferred types: [integer string])\n\t6: int_str: optional decimal(38, 0) (auto-generated projection of JSON at: /int_str with inferred types: [string])\n\t7: num_and_str: optional double (auto-generated projection of JSON at: /num_and_str with inferred types: [number string])\n\t8: num_str: optional double (auto-generated projection of JSON at: /num_str with inferred types: [string])\n\t9: time: optional string (auto-generated projection of JSON at: /time with inferred types: [string])\n\t10: flow_document: required string (auto-generated projection of JSON at: with inferred types: [object])\n}\ncreated table \"ci_testing\".\"deletions\" as table {\n\t1: id: required long (auto-generated projection of JSON at: /id with inferred types: [integer])\n\t2: _meta/op: optional string (auto-generated projection of JSON at: /_meta/op with inferred types: [string])\n\t3: flow_published_at: required timestamptz (Flow Publication Time - Flow publication date-time of this document - auto-generated projection of JSON at: /_meta/uuid with inferred types: [string])\n\t4: flow_document: required string (auto-generated projection of JSON at: with inferred types: [object])\n}\ncreated table \"ci_testing\".\"binary_key\" as table {\n\t1: id: required binary (auto-generated projection of JSON at: /id with inferred types: [string])\n\t2: counter: optional long (auto-generated projection of JSON at: /counter with inferred types: [integer])\n\t3: flow_published_at: required timestamptz (Flow Publication Time - Flow publication date-time of this document - auto-generated projection of JSON at: /_meta/uuid with inferred 
types: [string])\n\t4: flow_document: required string (auto-generated projection of JSON at: with inferred types: [object])\n}\ncreated table \"ci_testing\".\"string_escaped_key\" as table {\n\t1: id: required string (auto-generated projection of JSON at: /id with inferred types: [string])\n\t2: counter: optional long (auto-generated projection of JSON at: /counter with inferred types: [integer])\n\t3: flow_published_at: required timestamptz (Flow Publication Time - Flow publication date-time of this document - auto-generated projection of JSON at: /_meta/uuid with inferred types: [string])\n\t4: flow_document: required string (auto-generated projection of JSON at: with inferred types: [object])\n}" + "created table \"ci_testing\".\"simple\" as table {\n\t1: id: required long (auto-generated projection of JSON at: /id with inferred types: [integer])\n\t2: canary: required string (auto-generated projection of JSON at: /canary with inferred types: [string])\n\t3: flow_published_at: required timestamptz (Flow Publication Time - Flow publication date-time of this document - auto-generated projection of JSON at: /_meta/uuid with inferred types: [string])\n\t4: flow_document: required string (auto-generated projection of the root document with inferred types: [object])\n}\ncreated table \"ci_testing\".\"duplicate_keys_standard\" as table {\n\t1: id: required long (auto-generated projection of JSON at: /id with inferred types: [integer])\n\t2: flow_published_at: required timestamptz (Flow Publication Time - Flow publication date-time of this document - auto-generated projection of JSON at: /_meta/uuid with inferred types: [string])\n\t3: int: optional long (auto-generated projection of JSON at: /int with inferred types: [integer])\n\t4: str: required string (auto-generated projection of JSON at: /str with inferred types: [string])\n\t5: flow_document: required string (auto-generated projection of the root document with inferred types: [object])\n}\ncreated table \"ci_testing\".\"multiple_types\" as table {\n\t1: id: required long (auto-generated projection of JSON at: /id with inferred types: [integer])\n\t2: array_int: optional string (auto-generated projection of JSON at: /array_int with inferred types: [array])\n\t3: binary_field: optional binary (auto-generated projection of JSON at: /binary_field with inferred types: [string])\n\t4: bool_field: optional boolean (auto-generated projection of JSON at: /bool_field with inferred types: [boolean])\n\t5: float_field: optional double (auto-generated projection of JSON at: /float_field with inferred types: [number])\n\t6: flow_published_at: required timestamptz (Flow Publication Time - Flow publication date-time of this document - auto-generated projection of JSON at: /_meta/uuid with inferred types: [string])\n\t7: multiple: optional string (auto-generated projection of JSON at: /multiple with inferred types: [array boolean null number object string])\n\t8: nested: optional string (auto-generated projection of JSON at: /nested with inferred types: [object])\n\t9: nullable_int: optional long (auto-generated projection of JSON at: /nullable_int with inferred types: [integer null])\n\t10: str_field: required string (auto-generated projection of JSON at: /str_field with inferred types: [string])\n\t11: flow_document: required string (auto-generated projection of the root document with inferred types: [object])\n}\ncreated table \"ci_testing\".\"formatted_strings\" as table {\n\t1: id: required long (auto-generated projection of JSON at: /id with 
inferred types: [integer])\n\t2: date: optional date (auto-generated projection of JSON at: /date with inferred types: [string])\n\t3: datetime: optional timestamptz (auto-generated projection of JSON at: /datetime with inferred types: [string])\n\t4: flow_published_at: required timestamptz (Flow Publication Time - Flow publication date-time of this document - auto-generated projection of JSON at: /_meta/uuid with inferred types: [string])\n\t5: int_and_str: optional decimal(38, 0) (auto-generated projection of JSON at: /int_and_str with inferred types: [integer string])\n\t6: int_str: optional decimal(38, 0) (auto-generated projection of JSON at: /int_str with inferred types: [string])\n\t7: num_and_str: optional double (auto-generated projection of JSON at: /num_and_str with inferred types: [number string])\n\t8: num_str: optional double (auto-generated projection of JSON at: /num_str with inferred types: [string])\n\t9: time: optional string (auto-generated projection of JSON at: /time with inferred types: [string])\n\t10: flow_document: required string (auto-generated projection of the root document with inferred types: [object])\n}\ncreated table \"ci_testing\".\"deletions\" as table {\n\t1: id: required long (auto-generated projection of JSON at: /id with inferred types: [integer])\n\t2: _meta/op: optional string (auto-generated projection of JSON at: /_meta/op with inferred types: [string])\n\t3: flow_published_at: required timestamptz (Flow Publication Time - Flow publication date-time of this document - auto-generated projection of JSON at: /_meta/uuid with inferred types: [string])\n\t4: flow_document: required string (auto-generated projection of the root document with inferred types: [object])\n}\ncreated table \"ci_testing\".\"binary_key\" as table {\n\t1: id: required binary (auto-generated projection of JSON at: /id with inferred types: [string])\n\t2: counter: optional long (auto-generated projection of JSON at: /counter with inferred types: [integer])\n\t3: flow_published_at: required timestamptz (Flow Publication Time - Flow publication date-time of this document - auto-generated projection of JSON at: /_meta/uuid with inferred types: [string])\n\t4: flow_document: required string (auto-generated projection of the root document with inferred types: [object])\n}\ncreated table \"ci_testing\".\"string_escaped_key\" as table {\n\t1: id: required string (auto-generated projection of JSON at: /id with inferred types: [string])\n\t2: counter: optional long (auto-generated projection of JSON at: /counter with inferred types: [integer])\n\t3: flow_published_at: required timestamptz (Flow Publication Time - Flow publication date-time of this document - auto-generated projection of JSON at: /_meta/uuid with inferred types: [string])\n\t4: flow_document: required string (auto-generated projection of the root document with inferred types: [object])\n}" ] [ "connectorState", @@ -15,7 +15,7 @@ "updated": { "ci_testing%2Fbinary_key": { "binding": 5, - "query": "MERGE INTO `ci_testing`.`binary_key` AS l\nUSING merge_view_5 AS r\nON \n\tl.`id` = unbase64(r.`id`)\nWHEN MATCHED AND r.`flow_document` = '\"delete\"' THEN DELETE\nWHEN MATCHED THEN UPDATE SET l.`id` = unbase64(r.`id`), l.`counter` = r.`counter`, l.`flow_published_at` = r.`flow_published_at`, l.`flow_document` = r.`flow_document`\nWHEN NOT MATCHED AND r.`flow_document` != '\"delete\"' THEN INSERT (`id`, `counter`, `flow_published_at`, `flow_document`) VALUES (unbase64(r.`id`), r.`counter`, r.`flow_published_at`, 
r.`flow_document`)\n", + "query": "MERGE INTO `ci_testing`.`binary_key` AS l\nUSING merge_view_5 AS r\nON \n\tl.`id` = UNBASE64(r.`id`)\nWHEN MATCHED AND r.`flow_document` = '\"delete\"' THEN DELETE\nWHEN MATCHED THEN UPDATE SET l.`id` = UNBASE64(r.`id`), l.`counter` = r.`counter`, l.`flow_published_at` = r.`flow_published_at`, l.`flow_document` = r.`flow_document`\nWHEN NOT MATCHED AND r.`flow_document` != '\"delete\"' THEN INSERT (`id`, `counter`, `flow_published_at`, `flow_document`) VALUES (UNBASE64(r.`id`), r.`counter`, r.`flow_published_at`, r.`flow_document`)\n", "columns": [ { "name": "id", @@ -143,7 +143,7 @@ }, "ci_testing%2Fmultiple_types": { "binding": 2, - "query": "MERGE INTO `ci_testing`.`multiple_types` AS l\nUSING merge_view_2 AS r\nON \n\tl.`id` = r.`id` AND l.`id` >= 1 AND l.`id` <= 10\nWHEN MATCHED AND r.`flow_document` = '\"delete\"' THEN DELETE\nWHEN MATCHED THEN UPDATE SET l.`id` = r.`id`, l.`array_int` = r.`array_int`, l.`binary_field` = unbase64(r.`binary_field`), l.`bool_field` = r.`bool_field`, l.`float_field` = r.`float_field`, l.`flow_published_at` = r.`flow_published_at`, l.`multiple` = r.`multiple`, l.`nested` = r.`nested`, l.`nullable_int` = r.`nullable_int`, l.`str_field` = r.`str_field`, l.`flow_document` = r.`flow_document`\nWHEN NOT MATCHED AND r.`flow_document` != '\"delete\"' THEN INSERT (`id`, `array_int`, `binary_field`, `bool_field`, `float_field`, `flow_published_at`, `multiple`, `nested`, `nullable_int`, `str_field`, `flow_document`) VALUES (r.`id`, r.`array_int`, unbase64(r.`binary_field`), r.`bool_field`, r.`float_field`, r.`flow_published_at`, r.`multiple`, r.`nested`, r.`nullable_int`, r.`str_field`, r.`flow_document`)\n", + "query": "MERGE INTO `ci_testing`.`multiple_types` AS l\nUSING merge_view_2 AS r\nON \n\tl.`id` = r.`id` AND l.`id` >= 1 AND l.`id` <= 10\nWHEN MATCHED AND r.`flow_document` = '\"delete\"' THEN DELETE\nWHEN MATCHED THEN UPDATE SET l.`id` = r.`id`, l.`array_int` = r.`array_int`, l.`binary_field` = UNBASE64(r.`binary_field`), l.`bool_field` = r.`bool_field`, l.`float_field` = r.`float_field`, l.`flow_published_at` = r.`flow_published_at`, l.`multiple` = r.`multiple`, l.`nested` = r.`nested`, l.`nullable_int` = r.`nullable_int`, l.`str_field` = r.`str_field`, l.`flow_document` = r.`flow_document`\nWHEN NOT MATCHED AND r.`flow_document` != '\"delete\"' THEN INSERT (`id`, `array_int`, `binary_field`, `bool_field`, `float_field`, `flow_published_at`, `multiple`, `nested`, `nullable_int`, `str_field`, `flow_document`) VALUES (r.`id`, r.`array_int`, UNBASE64(r.`binary_field`), r.`bool_field`, r.`float_field`, r.`flow_published_at`, r.`multiple`, r.`nested`, r.`nullable_int`, r.`str_field`, r.`flow_document`)\n", "columns": [ { "name": "id", @@ -254,7 +254,7 @@ "updated": { "ci_testing%2Fbinary_key": { "binding": 5, - "query": "MERGE INTO `ci_testing`.`binary_key` AS l\nUSING merge_view_5 AS r\nON \n\tl.`id` = unbase64(r.`id`)\nWHEN MATCHED AND r.`flow_document` = '\"delete\"' THEN DELETE\nWHEN MATCHED THEN UPDATE SET l.`id` = unbase64(r.`id`), l.`counter` = r.`counter`, l.`flow_published_at` = r.`flow_published_at`, l.`flow_document` = r.`flow_document`\nWHEN NOT MATCHED AND r.`flow_document` != '\"delete\"' THEN INSERT (`id`, `counter`, `flow_published_at`, `flow_document`) VALUES (unbase64(r.`id`), r.`counter`, r.`flow_published_at`, r.`flow_document`)\n", + "query": "MERGE INTO `ci_testing`.`binary_key` AS l\nUSING merge_view_5 AS r\nON \n\tl.`id` = UNBASE64(r.`id`)\nWHEN MATCHED AND r.`flow_document` = '\"delete\"' THEN 
DELETE\nWHEN MATCHED THEN UPDATE SET l.`id` = UNBASE64(r.`id`), l.`counter` = r.`counter`, l.`flow_published_at` = r.`flow_published_at`, l.`flow_document` = r.`flow_document`\nWHEN NOT MATCHED AND r.`flow_document` != '\"delete\"' THEN INSERT (`id`, `counter`, `flow_published_at`, `flow_document`) VALUES (UNBASE64(r.`id`), r.`counter`, r.`flow_published_at`, r.`flow_document`)\n", "columns": [ { "name": "id", @@ -382,7 +382,7 @@ }, "ci_testing%2Fmultiple_types": { "binding": 2, - "query": "MERGE INTO `ci_testing`.`multiple_types` AS l\nUSING merge_view_2 AS r\nON \n\tl.`id` = r.`id` AND l.`id` >= 6 AND l.`id` <= 10\nWHEN MATCHED AND r.`flow_document` = '\"delete\"' THEN DELETE\nWHEN MATCHED THEN UPDATE SET l.`id` = r.`id`, l.`array_int` = r.`array_int`, l.`binary_field` = unbase64(r.`binary_field`), l.`bool_field` = r.`bool_field`, l.`float_field` = r.`float_field`, l.`flow_published_at` = r.`flow_published_at`, l.`multiple` = r.`multiple`, l.`nested` = r.`nested`, l.`nullable_int` = r.`nullable_int`, l.`str_field` = r.`str_field`, l.`flow_document` = r.`flow_document`\nWHEN NOT MATCHED AND r.`flow_document` != '\"delete\"' THEN INSERT (`id`, `array_int`, `binary_field`, `bool_field`, `float_field`, `flow_published_at`, `multiple`, `nested`, `nullable_int`, `str_field`, `flow_document`) VALUES (r.`id`, r.`array_int`, unbase64(r.`binary_field`), r.`bool_field`, r.`float_field`, r.`flow_published_at`, r.`multiple`, r.`nested`, r.`nullable_int`, r.`str_field`, r.`flow_document`)\n", + "query": "MERGE INTO `ci_testing`.`multiple_types` AS l\nUSING merge_view_2 AS r\nON \n\tl.`id` = r.`id` AND l.`id` >= 6 AND l.`id` <= 10\nWHEN MATCHED AND r.`flow_document` = '\"delete\"' THEN DELETE\nWHEN MATCHED THEN UPDATE SET l.`id` = r.`id`, l.`array_int` = r.`array_int`, l.`binary_field` = UNBASE64(r.`binary_field`), l.`bool_field` = r.`bool_field`, l.`float_field` = r.`float_field`, l.`flow_published_at` = r.`flow_published_at`, l.`multiple` = r.`multiple`, l.`nested` = r.`nested`, l.`nullable_int` = r.`nullable_int`, l.`str_field` = r.`str_field`, l.`flow_document` = r.`flow_document`\nWHEN NOT MATCHED AND r.`flow_document` != '\"delete\"' THEN INSERT (`id`, `array_int`, `binary_field`, `bool_field`, `float_field`, `flow_published_at`, `multiple`, `nested`, `nullable_int`, `str_field`, `flow_document`) VALUES (r.`id`, r.`array_int`, UNBASE64(r.`binary_field`), r.`bool_field`, r.`float_field`, r.`flow_published_at`, r.`multiple`, r.`nested`, r.`nullable_int`, r.`str_field`, r.`flow_document`)\n", "columns": [ { "name": "id",