diff --git a/materialize-boilerplate/materializer.go b/materialize-boilerplate/materializer.go index 93f68fdbf0..9e32578be8 100644 --- a/materialize-boilerplate/materializer.go +++ b/materialize-boilerplate/materializer.go @@ -78,8 +78,8 @@ type ElementConverter func(tuple.TupleElement) (any, error) // MappedProjection adds materialization-specific type mapping information to a // basic Flow projection, as well as other useful metadata. -type MappedProjection[MT any] struct { - pf.Projection +type MappedProjection[MT MappedTyper] struct { + Projection Comment string Mapped MT } @@ -87,7 +87,7 @@ type MappedProjection[MT any] struct { // MappedBinding is all of the projections for the selected fields of the // materialization, with materialization-specific type mapping and parsed // resource configuration. -type MappedBinding[EC pb.Validator, RC Resourcer[RC, EC], MT any] struct { +type MappedBinding[EC EndpointConfiger, RC Resourcer[RC, EC], MT MappedTyper] struct { pf.MaterializationSpec_Binding Config RC Keys, Values []MappedProjection[MT] @@ -147,12 +147,20 @@ func (mb *MappedBinding[EC, RC, MT]) convertTuple(in tuple.Tuple, offset int, ou // MaterializerBindingUpdate is a distilled representation of the typical kinds // of changes a destination system will care about in response to a new binding // or change to an existing binding. -type MaterializerBindingUpdate[MT any] struct { +type MaterializerBindingUpdate[MT MappedTyper] struct { NewProjections []MappedProjection[MT] NewlyNullableFields []ExistingField + FieldsToMigrate []MigrateField[MT] NewlyDeltaUpdates bool } +// MigrateField is an existing field that must be migrated to be compatible with +// an updated projection. +type MigrateField[MT MappedTyper] struct { + From ExistingField + To MappedProjection[MT] +} + // RuntimeCheckpoint is the raw bytes of a persisted Flow checkpoint. In the // `Opened` response, it will be marshalled into a protocol.Checkpoint. type RuntimeCheckpoint []byte @@ -169,8 +177,20 @@ type MaterializerTransactor interface { m.Transactor } +// EndpointConfiger represents a parsed endpoint config. +type EndpointConfiger interface { + pb.Validator + + // Default namespace is the namespace used for bindings if no explicit + // namespace is configured for the binding. It may also contain metadata + // tables, and needs to exist even if no binding is actually created in it. + // This can return an empty string if namespaces do not apply to the + // materialization. + DefaultNamespace() string +} + // Resourcer represents a parsed resource config. -type Resourcer[T any, EC pb.Validator] interface { +type Resourcer[T any, EC EndpointConfiger] interface { pb.Validator // WithDefaults provides the parsed endpoint config so that any defaults @@ -186,6 +206,7 @@ type Resourcer[T any, EC pb.Validator] interface { // FieldConfiger represents a parsed field config. type FieldConfiger interface { pb.Validator + // CastToString is a common field configuration option that will cause a // field to be converted to a string when it is materialized. CastToString() bool @@ -197,10 +218,10 @@ type FieldConfiger interface { // existing resources to resource specs, and managing the transactions // lifecycle. 
type Materializer[ - EC pb.Validator, // endpoint config - FC FieldConfiger, // field config - RC Resourcer[RC, EC], // resource config - MT any, // mapped type + EC EndpointConfiger, + FC FieldConfiger, + RC Resourcer[RC, EC], + MT MappedTyper, ] interface { // Config should return a MaterializeCfg with non-defaults populated as // needed. @@ -224,12 +245,15 @@ type Materializer[ // no conversion is needed. MapType(p Projection, fieldCfg FC) (MT, ElementConverter) - // Compatible determines if an existing materialized field as reported in - // the InfoSchema is compatible with a proposed mapped projection. - Compatible(ExistingField, MT) bool - - // DescriptionForType produces a description for a mapped projection. - DescriptionForType(MT) string + // Setup performs extra materialization-specific actions when handling an + // Apply RPC prior to any of the standard actions for resource creation or + // alteration. For example, creating metadata tables that are not + // represented by a binding. It may return a string to describe the actions + // that were taken. + // + // Since Apply is always run before Open, this effectively runs the Setup + // actions before Open as well. + Setup(context.Context, *InfoSchema) (string, error) // CreateNamespace creates a namespace in the destination system, for // example a "schema" in a SQL database. @@ -260,7 +284,7 @@ type Materializer[ NewMaterializerTransactor(context.Context, pm.Request_Open, InfoSchema, []MappedBinding[EC, RC, MT], *BindingEvents) (MaterializerTransactor, error) } -type NewMaterializerFn[EC pb.Validator, FC FieldConfiger, RC Resourcer[RC, EC], MT any] func(context.Context, EC) (Materializer[EC, FC, RC, MT], error) +type NewMaterializerFn[EC EndpointConfiger, FC FieldConfiger, RC Resourcer[RC, EC], MT MappedTyper] func(context.Context, string, EC) (Materializer[EC, FC, RC, MT], error) // RunSpec produces a spec response from the typical inputs of documentation // url, endpoint config schema, and resource config schema. @@ -277,7 +301,7 @@ func RunSpec(ctx context.Context, req *pm.Request_Spec, docUrl string, endpointS } // RunValidate produces a Validated response for a Validate request. -func RunValidate[EC pb.Validator, FC FieldConfiger, RC Resourcer[RC, EC], MT any]( +func RunValidate[EC EndpointConfiger, FC FieldConfiger, RC Resourcer[RC, EC], MT MappedTyper]( ctx context.Context, req *pm.Request_Validate, newMaterializer NewMaterializerFn[EC, FC, RC, MT], @@ -291,7 +315,7 @@ func RunValidate[EC pb.Validator, FC FieldConfiger, RC Resourcer[RC, EC], MT any return nil, err } - materializer, err := newMaterializer(ctx, cfg) + materializer, err := newMaterializer(ctx, req.Name.String(), cfg) if err != nil { return nil, err } @@ -344,29 +368,24 @@ func RunValidate[EC pb.Validator, FC FieldConfiger, RC Resourcer[RC, EC], MT any return &pm.Response_Validated{Bindings: out}, nil } -// RunApply produces an Applied response for an Apply request. In addition, the -// initialized Materializer and populated InfoSchema is returned so that -// materializations can do other things after the standard apply actions are -// completed, such as creating additional metadata tables for checkpoints. -func RunApply[T Materializer[EC, FC, RC, MT], EC pb.Validator, FC FieldConfiger, RC Resourcer[RC, EC], MT any]( +// RunApply produces an Applied response for an Apply request. 
+func RunApply[EC EndpointConfiger, FC FieldConfiger, RC Resourcer[RC, EC], MT MappedTyper]( ctx context.Context, req *pm.Request_Apply, newMaterializer NewMaterializerFn[EC, FC, RC, MT], -) (T, *InfoSchema, *pm.Response_Applied, error) { - var materializer T +) (*pm.Response_Applied, error) { if err := req.Validate(); err != nil { - return materializer, nil, nil, fmt.Errorf("validating request: %w", err) + return nil, fmt.Errorf("validating request: %w", err) } var endpointCfg EC if err := unmarshalStrict(req.Materialization.ConfigJson, &endpointCfg); err != nil { - return materializer, nil, nil, err + return nil, err } - var err error - nm, err := newMaterializer(ctx, endpointCfg) + materializer, err := newMaterializer(ctx, req.Materialization.Name.String(), endpointCfg) if err != nil { - return materializer, nil, nil, err + return nil, err } // TODO(whb): Some point soon we will have the last committed checkpoint @@ -374,9 +393,6 @@ func RunApply[T Materializer[EC, FC, RC, MT], EC pb.Validator, FC FieldConfiger, // State + Acknowledge somewhere around here to commit any previously staged // transaction before applying the next spec's updates. - // TODO(whb): This type assertion is not very satisfying but I haven't - // figured out a way to do it more cleanly. - materializer = nm.(T) mCfg := materializer.Config() paths := make([][]string, 0, len(req.Materialization.Bindings)) @@ -386,22 +402,32 @@ func RunApply[T Materializer[EC, FC, RC, MT], EC pb.Validator, FC FieldConfiger, is := initInfoSchema(mCfg) if err := materializer.PopulateInfoSchema(ctx, paths, is); err != nil { - return materializer, nil, nil, err - } - - computed, err := computeCommonUpdates(req.LastMaterialization, req.Materialization, is) - if err != nil { - return materializer, nil, nil, err + return nil, err } actionDescriptions := []string{} actions := []ActionApplyFn{} + if desc, err := materializer.Setup(ctx, is); err != nil { + return nil, fmt.Errorf("running Setup: %w", err) + } else if desc != "" { + actionDescriptions = append(actionDescriptions, desc) + } + + common, err := computeCommonUpdates(req.LastMaterialization, req.Materialization, is) + if err != nil { + return nil, err + } + if !mCfg.NoCreateNamespaces { // Create any required namespaces before other actions, which may // include resource creation. Otherwise resource creation may fail due // to namespaces not yet existing. 
requiredNamespaces := make(map[string]struct{}) + if ns := endpointCfg.DefaultNamespace(); ns != "" { + requiredNamespaces[ns] = struct{}{} + } + for _, b := range req.Materialization.Bindings { path := is.locatePath(b.ResourcePath) if len(path) < 2 { @@ -414,7 +440,7 @@ func RunApply[T Materializer[EC, FC, RC, MT], EC pb.Validator, FC FieldConfiger, if slices.Contains(is.namespaces, ns) { continue } else if desc, err := materializer.CreateNamespace(ctx, ns); err != nil { - return materializer, nil, nil, err + return nil, err } else { actionDescriptions = append(actionDescriptions, desc) } @@ -428,23 +454,23 @@ func RunApply[T Materializer[EC, FC, RC, MT], EC pb.Validator, FC FieldConfiger, } } - for _, bindingIdx := range computed.newBindings { + for _, bindingIdx := range common.newBindings { if mapped, err := buildMappedBinding(endpointCfg, materializer, *req.Materialization, bindingIdx); err != nil { - return materializer, nil, nil, err + return nil, err } else if desc, action, err := materializer.CreateResource(ctx, *mapped); err != nil { - return materializer, nil, nil, fmt.Errorf("getting CreateResource action: %w", err) + return nil, fmt.Errorf("getting CreateResource action: %w", err) } else { addAction(desc, action) } } - for _, bindingIdx := range computed.backfillBindings { + for _, bindingIdx := range common.backfillBindings { if deleteDesc, deleteAction, err := materializer.DeleteResource(ctx, req.Materialization.Bindings[bindingIdx].ResourcePath); err != nil { - return materializer, nil, nil, fmt.Errorf("getting DeleteResource action to replace resource: %w", err) + return nil, fmt.Errorf("getting DeleteResource action to replace resource: %w", err) } else if mapped, err := buildMappedBinding(endpointCfg, materializer, *req.Materialization, bindingIdx); err != nil { - return materializer, nil, nil, err + return nil, err } else if createDesc, createAction, err := materializer.CreateResource(ctx, *mapped); err != nil { - return materializer, nil, nil, fmt.Errorf("getting CreateResource action to replace resource: %w", err) + return nil, fmt.Errorf("getting CreateResource action to replace resource: %w", err) } else { addAction(deleteDesc+"\n"+createDesc, func(ctx context.Context) error { if err := deleteAction(ctx); err != nil { @@ -457,18 +483,18 @@ func RunApply[T Materializer[EC, FC, RC, MT], EC pb.Validator, FC FieldConfiger, } } - for bindingIdx, commonUpdates := range computed.updatedBindings { + for bindingIdx, commonUpdates := range common.updatedBindings { update := MaterializerBindingUpdate[MT]{ NewlyNullableFields: commonUpdates.NewlyNullableFields, NewlyDeltaUpdates: commonUpdates.NewlyDeltaUpdates, } - mapped, err := buildMappedBinding(endpointCfg, materializer, *req.Materialization, bindingIdx) + mb, err := buildMappedBinding(endpointCfg, materializer, *req.Materialization, bindingIdx) if err != nil { - return materializer, nil, nil, err + return nil, err } - ps := mapped.SelectedProjections() + ps := mb.SelectedProjections() for _, p := range commonUpdates.NewProjections { i := slices.IndexFunc(ps, func(pp MappedProjection[MT]) bool { return pp.Field == p.Field @@ -476,23 +502,48 @@ func RunApply[T Materializer[EC, FC, RC, MT], EC pb.Validator, FC FieldConfiger, update.NewProjections = append(update.NewProjections, ps[i]) } - if desc, action, err := materializer.UpdateResource(ctx, mapped.ResourcePath, *is.GetResource(mapped.ResourcePath), update); err != nil { - return materializer, nil, nil, err + existingResource := is.GetResource(mb.ResourcePath) + for _, p 
:= range mb.Values { + if existingField := existingResource.GetField(p.Field); existingField == nil { + continue + } else if !p.Mapped.Compatible(*existingField) && p.Mapped.CanMigrate(*existingField) { + update.FieldsToMigrate = append(update.FieldsToMigrate, MigrateField[MT]{ + From: *existingField, + To: p, + }) + } else if !p.Mapped.Compatible(*existingField) { + // This is mostly a sanity check that some other process (user + // modifications, perhaps) didn't change the type of a column in + // a way that we can't deal with. There is also a slim chance of + // a problematic sequence of events during existing field + // migrations that use column renaming: the "old" column is + // dropped, and before the "new" column can be renamed (if these + // operations are not atomic) the source field type is changed + // again and a new column is created with that new type. This + // scenario would involve one Apply RPC racing with another, + // which is unlikely but perhaps not totally impossible. + return nil, fmt.Errorf("existing field %q is incompatible with proposed type %q and cannot be migrated", p.Field, p.Mapped.String()) + } + } + + if desc, action, err := materializer.UpdateResource(ctx, mb.ResourcePath, *is.GetResource(mb.ResourcePath), update); err != nil { + return nil, err } else { addAction(desc, action) } } if err := runActions(ctx, actions, actionDescriptions, mCfg.ConcurrentApply); err != nil { - return materializer, nil, nil, err + return nil, err } - return materializer, is, &pm.Response_Applied{ActionDescription: strings.Join(actionDescriptions, "\n")}, nil + return &pm.Response_Applied{ActionDescription: strings.Join(actionDescriptions, "\n")}, nil } // RunNewTransactor builds a transactor and Opened response from an Open // request. 
-func RunNewTransactor[EC pb.Validator, FC FieldConfiger, RC Resourcer[RC, EC], MT any]( +func RunNewTransactor[EC EndpointConfiger, FC FieldConfiger, RC Resourcer[RC, EC], MT MappedTyper]( ctx context.Context, req pm.Request_Open, be *BindingEvents, @@ -507,7 +558,7 @@ func RunNewTransactor[EC pb.Validator, FC FieldConfiger, RC Resourcer[RC, EC], M return nil, nil, nil, err } - materializer, err := newMaterializer(ctx, epCfg) + materializer, err := newMaterializer(ctx, req.Materialization.Name.String(), epCfg) if err != nil { return nil, nil, nil, err } @@ -566,7 +617,7 @@ func initInfoSchema(cfg MaterializeCfg) *InfoSchema { return NewInfoSchema(locatePath, translateField) } -func buildMappedBinding[EC pb.Validator, FC FieldConfiger, RC Resourcer[RC, EC], MT any]( +func buildMappedBinding[EC EndpointConfiger, FC FieldConfiger, RC Resourcer[RC, EC], MT MappedTyper]( endpointCfg EC, materializer Materializer[EC, FC, RC, MT], spec pf.MaterializationSpec, @@ -595,9 +646,10 @@ func buildMappedBinding[EC pb.Validator, FC FieldConfiger, RC Resourcer[RC, EC], } } - mt, converter := materializer.MapType(mapProjection(*p, fieldCfg), fieldCfg) + mp := mapProjection(*p, fieldCfg) + mt, converter := materializer.MapType(mp, fieldCfg) *dst = append(*dst, MappedProjection[MT]{ - Projection: *p, + Projection: mp, Comment: commentForProjection(*p), Mapped: mt, }) @@ -630,8 +682,14 @@ func commentForProjection(p pf.Projection) string { if p.Explicit { source = "user-provided" } - out = fmt.Sprintf("%s projection of JSON at: %s with inferred types: %s", - source, p.Ptr, p.Inference.Types) + + if p.Ptr == "" { + out = fmt.Sprintf("%s projection of the root document with inferred types: %s", + source, p.Inference.Types) + } else { + out = fmt.Sprintf("%s projection of JSON at: %s with inferred types: %s", + source, p.Ptr, p.Inference.Types) + } if p.Inference.Description != "" { out = p.Inference.Description + "\n" + out @@ -645,7 +703,7 @@ func commentForProjection(p pf.Projection) string { // constrainterAdapter is purely a shim between the new Materializer interface // and the old Constrainter interface. Eventually these should be consolidated. 
-type constrainterAdapter[EC pb.Validator, FC FieldConfiger, RC Resourcer[RC, EC], MT any] struct { +type constrainterAdapter[EC EndpointConfiger, FC FieldConfiger, RC Resourcer[RC, EC], MT MappedTyper] struct { m Materializer[EC, FC, RC, MT] } @@ -670,7 +728,7 @@ func (c *constrainterAdapter[EC, FC, RC, MT]) Compatible(existing ExistingField, } mt, _ := c.m.MapType(mapProjection(*p, fieldCfg), fieldCfg) - return c.m.Compatible(existing, mt), nil + return mt.Compatible(existing) || mt.CanMigrate(existing), nil } func (c *constrainterAdapter[EC, FC, RC, MT]) DescriptionForType(p *pf.Projection, rawFieldConfig json.RawMessage) (string, error) { @@ -682,7 +740,7 @@ func (c *constrainterAdapter[EC, FC, RC, MT]) DescriptionForType(p *pf.Projectio } mt, _ := c.m.MapType(mapProjection(*p, fieldCfg), fieldCfg) - return c.m.DescriptionForType(mt), nil + return mt.String(), nil } func unmarshalStrict[T pb.Validator](raw []byte, into *T) error { diff --git a/materialize-boilerplate/type_mapping.go b/materialize-boilerplate/type_mapping.go index 88f07982c9..16349cf2d1 100644 --- a/materialize-boilerplate/type_mapping.go +++ b/materialize-boilerplate/type_mapping.go @@ -70,6 +70,20 @@ func (FlatTypeString) isFlatType() {} func (FlatTypeStringFormatInteger) isFlatType() {} func (FlatTypeStringFormatNumber) isFlatType() {} +type MappedTyper interface { + // String produces a human readable description of the mapped type, which + // will be included in validation constraint descriptions. + String() string + + // Compatible determines if an existing materialized field as reported in + // the InfoSchema is compatible with a proposed mapped type. + Compatible(ExistingField) bool + + // CanMigrate determines if an existing materialized field can be migrated + // to be compatible with the updated mapped type. + CanMigrate(ExistingField) bool +} + // Projection lifts a pf.Projection into a form that's more easily worked with // for materialization-specific type mapping. type Projection struct { @@ -78,6 +92,24 @@ type Projection struct { MustExist bool } +const AnyExistingType string = "*" + +// TypeMigrations is a utility struct for defining which existing fields can be +// migrated to be compatible with updated projections. 
+type TypeMigrations[T comparable] map[string][]T + +func (m TypeMigrations[T]) CanMigrate(from string, to T) bool { + for f, ts := range m { + if f == from || f == AnyExistingType { + if slices.Contains(ts, to) { + return true + } + } + } + + return false +} + func mapProjection(p pf.Projection, fc FieldConfiger) Projection { mustExist := p.Inference.Exists == pf.Inference_MUST && !slices.Contains(p.Inference.Types, "null") typesWithoutNull := getTypesWithoutNull(p.Inference.Types) diff --git a/materialize-iceberg/.snapshots/TestComputeSchemas b/materialize-iceberg/.snapshots/TestComputeSchemas new file mode 100644 index 0000000000..c402ce8670 --- /dev/null +++ b/materialize-iceberg/.snapshots/TestComputeSchemas @@ -0,0 +1,42 @@ +--- Original Schema --- +table { + 1: firstKey: required long + 2: secondKey: required string + 3: val1: required string + 4: val2: optional boolean + 5: dateToStr: required date + 6: intToDecimal: optional long + 7: decimalToFloat: required decimal(38, 0) + 8: timestampToStr: required timestamptz + 10: dateToStr_flow_tmp: required double +} + +--- Next Schema --- +table { + 1: firstKey: required long + 2: secondKey: required string + 3: val1: required string + 4: val2: optional boolean + 5: dateToStr: optional date + 6: intToDecimal: optional long + 7: decimalToFloat: required decimal(38, 0) + 8: timestampToStr: required timestamptz + 13: new: optional string + 14: dateToStr_flow_tmp: optional string + 15: intToDecimal_flow_tmp: optional decimal(38, 0) + 16: decimalToFloat_flow_tmp: required double + 17: timestampToStr_flow_tmp: optional string +} + +--- After Migrate Schema --- +table { + 1: firstKey: required long + 2: secondKey: required string + 3: val1: required string + 4: val2: optional boolean + 13: new: optional string + 14: dateToStr: optional string + 15: intToDecimal: optional decimal(38, 0) + 16: decimalToFloat: required double + 17: timestampToStr: optional string +} diff --git a/materialize-iceberg/.snapshots/TestTemplates b/materialize-iceberg/.snapshots/TestTemplates index 5b7bf59608..66e57b1548 100644 --- a/materialize-iceberg/.snapshots/TestTemplates +++ b/materialize-iceberg/.snapshots/TestTemplates @@ -3,7 +3,7 @@ SELECT 0, l.`flow_document` FROM `foo`.`bar` AS l JOIN load_view_0 AS r ON l.`first-key` = r.`first-key` AND l.`first-key` >= 1 AND l.`first-key` <= 10 - AND l.`second-key` = unbase64(r.`second-key`) + AND l.`second-key` = UNBASE64(r.`second-key`) AND l.`third-key` = r.`third-key` AND l.`third-key` >= 'aaaSomeString' AND l.`third-key` <= 'zzzSomeString' --- End load query --- @@ -12,9 +12,17 @@ MERGE INTO `foo`.`bar` AS l USING merge_view_0 AS r ON l.`first-key` = r.`first-key` AND l.`first-key` >= 1 AND l.`first-key` <= 10 - AND l.`second-key` = unbase64(r.`second-key`) + AND l.`second-key` = UNBASE64(r.`second-key`) AND l.`third-key` = r.`third-key` AND l.`third-key` >= 'aaaSomeString' AND l.`third-key` <= 'zzzSomeString' WHEN MATCHED AND r.`flow_document` = '"delete"' THEN DELETE -WHEN MATCHED THEN UPDATE SET l.`first-key` = r.`first-key`, l.`second-key` = unbase64(r.`second-key`), l.`third-key` = r.`third-key`, l.`first-val` = r.`first-val`, l.`second-val` = r.`second-val`, l.`third-val` = r.`third-val`, l.`fourth-val` = r.`fourth-val`, l.`flow_document` = r.`flow_document` -WHEN NOT MATCHED AND r.`flow_document` != '"delete"' THEN INSERT (`first-key`, `second-key`, `third-key`, `first-val`, `second-val`, `third-val`, `fourth-val`, `flow_document`) VALUES (r.`first-key`, unbase64(r.`second-key`), r.`third-key`, r.`first-val`, 
r.`second-val`, r.`third-val`, r.`fourth-val`, r.`flow_document`) +WHEN MATCHED THEN UPDATE SET l.`first-key` = r.`first-key`, l.`second-key` = UNBASE64(r.`second-key`), l.`third-key` = r.`third-key`, l.`first-val` = r.`first-val`, l.`second-val` = r.`second-val`, l.`third-val` = UNBASE64(r.`third-val`), l.`fourth-val` = r.`fourth-val`, l.`flow_document` = r.`flow_document` +WHEN NOT MATCHED AND r.`flow_document` != '"delete"' THEN INSERT (`first-key`, `second-key`, `third-key`, `first-val`, `second-val`, `third-val`, `fourth-val`, `flow_document`) VALUES (r.`first-key`, UNBASE64(r.`second-key`), r.`third-key`, r.`first-val`, r.`second-val`, UNBASE64(r.`third-val`), r.`fourth-val`, r.`flow_document`) --- End merge query --- + +--- Begin migrate query --- +UPDATE `some`.`table` +SET + `long_to_decimal_flow_tmp` = CAST(`long_to_decimal` AS decimal(38, 0)), + `datetime_to_string_flow_tmp` = DATE_FORMAT(`datetime_to_string`, 'yyyy-MM-dd\'T\'HH:mm:ss.SSSSSS\'Z\''), + `binary_to_string_flow_tmp` = BASE64(`binary_to_string`) +--- End migrate query --- diff --git a/materialize-iceberg/.snapshots/TestValidateAndApply b/materialize-iceberg/.snapshots/TestValidateAndApply index b36f5e8a58..e4a310ae30 100644 --- a/materialize-iceberg/.snapshots/TestValidateAndApply +++ b/materialize-iceberg/.snapshots/TestValidateAndApply @@ -82,21 +82,21 @@ Big Schema Changed Types Constraints: {"Field":"boolField","Type":6,"TypeString":"UNSATISFIABLE","Reason":"Field 'boolField' is already being materialized as endpoint type 'BOOLEAN' but endpoint type 'LONG' is required by its schema '{ type: [integer] }'"} {"Field":"flow_document","Type":1,"TypeString":"FIELD_REQUIRED","Reason":"This field is the document in the current materialization"} {"Field":"flow_published_at","Type":3,"TypeString":"LOCATION_RECOMMENDED","Reason":"This location is part of the current materialization"} -{"Field":"intField","Type":6,"TypeString":"UNSATISFIABLE","Reason":"Field 'intField' is already being materialized as endpoint type 'LONG' but endpoint type 'STRING' is required by its schema '{ type: [string] }'"} +{"Field":"intField","Type":3,"TypeString":"LOCATION_RECOMMENDED","Reason":"This location is part of the current materialization"} {"Field":"key","Type":1,"TypeString":"FIELD_REQUIRED","Reason":"This field is a key in the current materialization"} {"Field":"multipleField","Type":3,"TypeString":"LOCATION_RECOMMENDED","Reason":"This location is part of the current materialization"} {"Field":"nullField","Type":4,"TypeString":"FIELD_OPTIONAL","Reason":"Object fields may be materialized"} {"Field":"numField","Type":6,"TypeString":"UNSATISFIABLE","Reason":"Field 'numField' is already being materialized as endpoint type 'DOUBLE' but endpoint type 'BOOLEAN' is required by its schema '{ type: [boolean] }'"} {"Field":"objField","Type":3,"TypeString":"LOCATION_RECOMMENDED","Reason":"This location is part of the current materialization"} -{"Field":"stringDateField","Type":6,"TypeString":"UNSATISFIABLE","Reason":"Field 'stringDateField' is already being materialized as endpoint type 'DATE' but endpoint type 'STRING' is required by its schema '{ type: [string] }'"} -{"Field":"stringDateTimeField","Type":6,"TypeString":"UNSATISFIABLE","Reason":"Field 'stringDateTimeField' is already being materialized as endpoint type 'TIMESTAMPTZ' but endpoint type 'STRING' is required by its schema '{ type: [string] }'"} +{"Field":"stringDateField","Type":3,"TypeString":"LOCATION_RECOMMENDED","Reason":"This location is part of the current materialization"} 
+{"Field":"stringDateTimeField","Type":3,"TypeString":"LOCATION_RECOMMENDED","Reason":"This location is part of the current materialization"} {"Field":"stringDurationField","Type":3,"TypeString":"LOCATION_RECOMMENDED","Reason":"This location is part of the current materialization"} {"Field":"stringEmailField","Type":3,"TypeString":"LOCATION_RECOMMENDED","Reason":"This location is part of the current materialization"} {"Field":"stringField","Type":6,"TypeString":"UNSATISFIABLE","Reason":"Field 'stringField' is already being materialized as endpoint type 'STRING' but endpoint type 'LONG' is required by its schema '{ type: [integer] }'"} {"Field":"stringHostnameField","Type":3,"TypeString":"LOCATION_RECOMMENDED","Reason":"This location is part of the current materialization"} {"Field":"stringIdnEmailField","Type":3,"TypeString":"LOCATION_RECOMMENDED","Reason":"This location is part of the current materialization"} {"Field":"stringIdnHostnameField","Type":3,"TypeString":"LOCATION_RECOMMENDED","Reason":"This location is part of the current materialization"} -{"Field":"stringIntegerField","Type":6,"TypeString":"UNSATISFIABLE","Reason":"Field 'stringIntegerField' is already being materialized as endpoint type 'DECIMAL(38, 0)' but endpoint type 'STRING' is required by its schema '{ type: [string] }'"} +{"Field":"stringIntegerField","Type":3,"TypeString":"LOCATION_RECOMMENDED","Reason":"This location is part of the current materialization"} {"Field":"stringIpv4Field","Type":3,"TypeString":"LOCATION_RECOMMENDED","Reason":"This location is part of the current materialization"} {"Field":"stringIpv6Field","Type":3,"TypeString":"LOCATION_RECOMMENDED","Reason":"This location is part of the current materialization"} {"Field":"stringIriField","Type":3,"TypeString":"LOCATION_RECOMMENDED","Reason":"This location is part of the current materialization"} @@ -104,12 +104,12 @@ Big Schema Changed Types Constraints: {"Field":"stringJsonPointerField","Type":3,"TypeString":"LOCATION_RECOMMENDED","Reason":"This location is part of the current materialization"} {"Field":"stringMacAddr8Field","Type":3,"TypeString":"LOCATION_RECOMMENDED","Reason":"This location is part of the current materialization"} {"Field":"stringMacAddrField","Type":3,"TypeString":"LOCATION_RECOMMENDED","Reason":"This location is part of the current materialization"} -{"Field":"stringNumberField","Type":6,"TypeString":"UNSATISFIABLE","Reason":"Field 'stringNumberField' is already being materialized as endpoint type 'DOUBLE' but endpoint type 'STRING' is required by its schema '{ type: [string] }'"} +{"Field":"stringNumberField","Type":3,"TypeString":"LOCATION_RECOMMENDED","Reason":"This location is part of the current materialization"} {"Field":"stringRegexField","Type":3,"TypeString":"LOCATION_RECOMMENDED","Reason":"This location is part of the current materialization"} {"Field":"stringRelativeJsonPointerField","Type":3,"TypeString":"LOCATION_RECOMMENDED","Reason":"This location is part of the current materialization"} {"Field":"stringTimeField","Type":3,"TypeString":"LOCATION_RECOMMENDED","Reason":"This location is part of the current materialization"} -{"Field":"stringUint32Field","Type":6,"TypeString":"UNSATISFIABLE","Reason":"Field 'stringUint32Field' is already being materialized as endpoint type 'DECIMAL(38, 0)' but endpoint type 'STRING' is required by its schema '{ type: [string] }'"} -{"Field":"stringUint64Field","Type":6,"TypeString":"UNSATISFIABLE","Reason":"Field 'stringUint64Field' is already being materialized as endpoint type 
'DECIMAL(38, 0)' but endpoint type 'STRING' is required by its schema '{ type: [string] }'"} +{"Field":"stringUint32Field","Type":3,"TypeString":"LOCATION_RECOMMENDED","Reason":"This location is part of the current materialization"} +{"Field":"stringUint64Field","Type":3,"TypeString":"LOCATION_RECOMMENDED","Reason":"This location is part of the current materialization"} {"Field":"stringUriField","Type":3,"TypeString":"LOCATION_RECOMMENDED","Reason":"This location is part of the current materialization"} {"Field":"stringUriReferenceField","Type":3,"TypeString":"LOCATION_RECOMMENDED","Reason":"This location is part of the current materialization"} {"Field":"stringUriTemplateField","Type":3,"TypeString":"LOCATION_RECOMMENDED","Reason":"This location is part of the current materialization"} @@ -152,7 +152,7 @@ table { 33: stringUriReferenceField: required string (auto-generated projection of JSON at: /stringUriReferenceField with inferred types: [string]) 34: stringUriTemplateField: required string (auto-generated projection of JSON at: /stringUriTemplateField with inferred types: [string]) 35: stringUuidField: required string (auto-generated projection of JSON at: /stringUuidField with inferred types: [string]) - 36: flow_document: required string (auto-generated projection of JSON at: with inferred types: [object]) + 36: flow_document: required string (auto-generated projection of the root document with inferred types: [object]) } Big Schema Materialized Resource Schema With No Fields Required: @@ -192,7 +192,7 @@ table { 33: stringUriReferenceField: optional string (auto-generated projection of JSON at: /stringUriReferenceField with inferred types: [string]) 34: stringUriTemplateField: optional string (auto-generated projection of JSON at: /stringUriTemplateField with inferred types: [string]) 35: stringUuidField: optional string (auto-generated projection of JSON at: /stringUuidField with inferred types: [string]) - 36: flow_document: required string (auto-generated projection of JSON at: with inferred types: [object]) + 36: flow_document: required string (auto-generated projection of the root document with inferred types: [object]) } Big Schema Changed Types With Table Replacement Constraints: @@ -272,7 +272,7 @@ table { 34: stringUriReferenceField: required string (auto-generated projection of JSON at: /stringUriReferenceField with inferred types: [string]) 35: stringUriTemplateField: required string (auto-generated projection of JSON at: /stringUriTemplateField with inferred types: [string]) 36: stringUuidField: required string (auto-generated projection of JSON at: /stringUuidField with inferred types: [string]) - 37: flow_document: required string (auto-generated projection of JSON at: with inferred types: [object]) + 37: flow_document: required string (auto-generated projection of the root document with inferred types: [object]) } add a single field: @@ -288,7 +288,7 @@ table { 9: requiredInteger: required long (auto-generated projection of JSON at: /requiredInteger with inferred types: [integer]) 10: requiredObject: required string (auto-generated projection of JSON at: /requiredObject with inferred types: [object]) 11: requiredString: required string (auto-generated projection of JSON at: /requiredString with inferred types: [string]) - 12: flow_document: required string (user-provided projection of JSON at: with inferred types: [object]) + 12: flow_document: required string (user-provided projection of the root document with inferred types: [object]) 13: addedOptionalString: 
optional string (auto-generated projection of JSON at: /addedOptionalString with inferred types: [string]) } @@ -305,7 +305,7 @@ table { 9: requiredInteger: required long (auto-generated projection of JSON at: /requiredInteger with inferred types: [integer]) 10: requiredObject: required string (auto-generated projection of JSON at: /requiredObject with inferred types: [object]) 11: requiredString: required string (auto-generated projection of JSON at: /requiredString with inferred types: [string]) - 12: flow_document: required string (user-provided projection of JSON at: with inferred types: [object]) + 12: flow_document: required string (user-provided projection of the root document with inferred types: [object]) } remove a single required field: @@ -321,7 +321,7 @@ table { 9: requiredInteger: required long (auto-generated projection of JSON at: /requiredInteger with inferred types: [integer]) 10: requiredObject: required string (auto-generated projection of JSON at: /requiredObject with inferred types: [object]) 11: requiredString: optional string (auto-generated projection of JSON at: /requiredString with inferred types: [string]) - 12: flow_document: required string (user-provided projection of JSON at: with inferred types: [object]) + 12: flow_document: required string (user-provided projection of the root document with inferred types: [object]) } add and remove many fields: @@ -337,7 +337,7 @@ table { 9: requiredInteger: required long (auto-generated projection of JSON at: /requiredInteger with inferred types: [integer]) 10: requiredObject: optional string (auto-generated projection of JSON at: /requiredObject with inferred types: [object]) 11: requiredString: optional string (auto-generated projection of JSON at: /requiredString with inferred types: [string]) - 12: flow_document: required string (user-provided projection of JSON at: with inferred types: [object]) + 12: flow_document: required string (user-provided projection of the root document with inferred types: [object]) 13: addedOptionalString: optional string (auto-generated projection of JSON at: /addedOptionalString with inferred types: [string]) 14: addedRequiredString: required string (auto-generated projection of JSON at: /addedRequiredString with inferred types: [string]) } @@ -357,6 +357,6 @@ table { 11: value.with-separated_words: optional string (auto-generated projection of JSON at: /value.with-separated_words with inferred types: [string]) 12: value.with.separated.words: optional string (auto-generated projection of JSON at: /value.with.separated.words with inferred types: [string]) 13: value_with_separated_words: optional string (auto-generated projection of JSON at: /value_with_separated_words with inferred types: [string]) - 14: flow_document: required string (auto-generated projection of JSON at: with inferred types: [object]) + 14: flow_document: required string (auto-generated projection of the root document with inferred types: [object]) } diff --git a/materialize-iceberg/config.go b/materialize-iceberg/config.go index daea2c77e8..144d42be99 100644 --- a/materialize-iceberg/config.go +++ b/materialize-iceberg/config.go @@ -95,6 +95,10 @@ func (c config) Validate() error { return nil } +func (c config) DefaultNamespace() string { + return sanitizePath(c.Namespace)[0] +} + type catalogAuthType string const ( diff --git a/materialize-iceberg/driver.go b/materialize-iceberg/driver.go index 462d31f92f..b1f9287997 100644 --- a/materialize-iceberg/driver.go +++ b/materialize-iceberg/driver.go @@ -12,10 +12,10 @@ 
import ( "slices" "strings" "sync" + "time" "github.com/apache/iceberg-go" "github.com/apache/iceberg-go/table" - emr "github.com/aws/aws-sdk-go-v2/service/emrserverless" "github.com/aws/aws-sdk-go-v2/service/s3" "github.com/aws/aws-sdk-go-v2/service/ssm" "github.com/aws/aws-sdk-go/aws" @@ -74,19 +74,7 @@ func (Driver) Validate(ctx context.Context, req *pm.Request_Validate) (*pm.Respo } func (Driver) Apply(ctx context.Context, req *pm.Request_Apply) (*pm.Response_Applied, error) { - mtz, _, res, err := boilerplate.RunApply[*materialization](ctx, req, newMaterialization) - if err != nil { - return nil, err - } - - if mtz.cfg.CatalogAuthentication.CatalogAuthType == catalogAuthTypeClientCredential { - secretName := clientCredSecretName(mtz.cfg.Compute.SystemsManagerPrefix, req.Materialization.Name.String()) - if err := ensureEmrSecret(ctx, mtz.ssmClient, secretName, mtz.cfg.CatalogAuthentication.Credential); err != nil { - return nil, fmt.Errorf("resolving client credential secret for EMR in Systems Manager: %w", err) - } - } - - return res, nil + return boilerplate.RunApply(ctx, req, newMaterialization) } func (Driver) NewTransactor(ctx context.Context, req pm.Request_Open, be *boilerplate.BindingEvents) (m.Transactor, *pm.Response_Opened, *boilerplate.MaterializeOptions, error) { @@ -97,13 +85,15 @@ type materialization struct { cfg config catalog *catalog.Catalog s3Client *s3.Client - emrClient *emr.Client ssmClient *ssm.Client + emrClient *emrClient + templates templates + pyFiles *pyFileURIs // populated in Setup and NewMaterializerTransactor } var _ boilerplate.Materializer[config, fieldConfig, resource, mapped] = &materialization{} -func newMaterialization(ctx context.Context, cfg config) (boilerplate.Materializer[config, fieldConfig, resource, mapped], error) { +func newMaterialization(ctx context.Context, materializationName string, cfg config) (boilerplate.Materializer[config, fieldConfig, resource, mapped], error) { catalog, err := cfg.toCatalog(ctx) if err != nil { return nil, fmt.Errorf("creating catalog: %w", err) @@ -114,22 +104,32 @@ func newMaterialization(ctx context.Context, cfg config) (boilerplate.Materializ return nil, fmt.Errorf("creating S3 client: %w", err) } - emr, err := cfg.toEmrClient(ctx) + ssmClient, err := cfg.toSsmClient(ctx) if err != nil { return nil, fmt.Errorf("creating EMR client: %w", err) } - ssmClient, err := cfg.toSsmClient(ctx) + emr, err := cfg.toEmrClient(ctx) if err != nil { return nil, fmt.Errorf("creating EMR client: %w", err) } return &materialization{ - cfg: cfg, - catalog: catalog, - s3Client: s3, - emrClient: emr, + cfg: cfg, + catalog: catalog, + s3Client: s3, + emrClient: &emrClient{ + cfg: cfg.Compute.emrConfig, + catalogAuth: cfg.CatalogAuthentication, + catalogURL: cfg.URL, + warehouse: cfg.Warehouse, + materializationName: materializationName, + c: emr, + s3Client: s3, + ssmClient: ssmClient, + }, ssmClient: ssmClient, + templates: parseTemplates(), }, nil } @@ -149,14 +149,6 @@ func (d *materialization) Config() boilerplate.MaterializeCfg { } } -func (d *materialization) SetResourceDefaults(c config, res resource) resource { - if res.Namespace == "" { - res.Namespace = c.Namespace - } - - return res -} - func (d *materialization) PopulateInfoSchema(ctx context.Context, resourcePaths [][]string, is *boilerplate.InfoSchema) error { relevantPaths := make(map[string][]string) for _, path := range resourcePaths { @@ -198,7 +190,7 @@ func (d *materialization) PopulateInfoSchema(ctx context.Context, resourcePaths 
res.PushField(boilerplate.ExistingField{ Name: f.Name, Nullable: !f.Required, - Type: f.Type.Type(), + Type: f.Type.String(), HasDefault: f.WriteDefault != nil, }) } @@ -234,7 +226,7 @@ func (d *materialization) PopulateInfoSchema(ctx context.Context, resourcePaths func (d *materialization) CheckPrerequisites(ctx context.Context) *cerrors.PrereqErr { errs := &cerrors.PrereqErr{} - checkEmrPrereqs(ctx, d, errs) + d.emrClient.checkPrereqs(ctx, errs) bucket := d.cfg.Compute.Bucket bucketWithPath := path.Join(bucket, d.cfg.Compute.BucketPath) @@ -334,12 +326,20 @@ func (d *materialization) MapType(p boilerplate.Projection, fc fieldConfig) (map return mapProjection(p) } -func (d *materialization) Compatible(existing boilerplate.ExistingField, proposed mapped) bool { - return strings.EqualFold(existing.Type, string(proposed.type_.String())) -} +func (d *materialization) Setup(ctx context.Context, is *boilerplate.InfoSchema) (string, error) { + if d.cfg.CatalogAuthentication.CatalogAuthType == catalogAuthTypeClientCredential { + if err := d.emrClient.ensureSecret(ctx, d.cfg.CatalogAuthentication.Credential); err != nil { + return "", err + } + } + + pyFiles, err := d.putPyFiles(ctx) + if err != nil { + return "", fmt.Errorf("putting PySpark scripts: %w", err) + } + d.pyFiles = pyFiles -func (d *materialization) DescriptionForType(prop mapped) string { - return string(prop.type_.String()) + return "", nil } func (d *materialization) CreateNamespace(ctx context.Context, ns string) (string, error) { @@ -385,25 +385,98 @@ func (d *materialization) UpdateResource( existing boilerplate.ExistingResource, update boilerplate.MaterializerBindingUpdate[mapped], ) (string, boilerplate.ActionApplyFn, error) { - if len(update.NewProjections) == 0 && len(update.NewlyNullableFields) == 0 { + if len(update.NewProjections) == 0 && len(update.NewlyNullableFields) == 0 && len(update.FieldsToMigrate) == 0 { return "", nil, nil } ns := resourcePath[0] name := resourcePath[1] - current := existing.Meta.(table.Metadata).CurrentSchema() - next := computeSchemaForUpdatedTable(current, update) + table := existing.Meta.(table.Metadata) + current := table.CurrentSchema() + next := computeSchemaForUpdatedTable(table.LastColumnID(), current, update) reqs := []catalog.TableRequirement{catalog.AssertCurrentSchemaID(current.ID)} upds := []catalog.TableUpdate{catalog.AddSchemaUpdate(next), catalog.SetCurrentSchemaUpdate(next.ID)} + action := fmt.Sprintf("updated table %q.%q schema from %s to %s", ns, name, current.String(), next.String()) + + var afterMigrateReqs []catalog.TableRequirement + var afterMigrateUpds []catalog.TableUpdate + if len(update.FieldsToMigrate) > 0 { + // If there are migrations, the table is initially updated to |next|, + // which will add temporary columns that existing data will be cast + // into. After the compute job is run to cast the existing column data + // into the temporary column, the table is updated to |afterMigrate|, + // which will delete the original columns and rename the temporary + // columns to be the name of the original columns in an atomic schema + // update. + // + // The AssertSchemaID requirements throughout these two table updates + // ensure that no other process comes in and modifies the table schema + // while this is happening. If the schema IDs aren't as expected, the + // connector will crash and start over, which should work fine. 
+ afterMigrate := computeSchemaForCompletedMigrations(next, update.FieldsToMigrate) + afterMigrateReqs = []catalog.TableRequirement{catalog.AssertCurrentSchemaID(next.ID)} + afterMigrateUpds = []catalog.TableUpdate{catalog.AddSchemaUpdate(afterMigrate), catalog.SetCurrentSchemaUpdate(afterMigrate.ID)} + action = fmt.Sprintf("updated table %q.%q schema from %s to %s", ns, name, current.String(), afterMigrate.String()) + } + + return action, func(ctx context.Context) error { + if err := d.catalog.UpdateTable(ctx, ns, name, reqs, upds); err != nil { + return err + } + + if len(update.FieldsToMigrate) > 0 { + input := migrateInput{ + ResourcePath: resourcePath, + Migrations: make([]migrateColumn, 0, len(update.FieldsToMigrate)), + } + for _, migration := range update.FieldsToMigrate { + input.Migrations = append(input.Migrations, migrateColumn{ + Field: migration.From.Name, + FromType: migration.From.Type, + TargetType: migration.To.Mapped.type_, + }) + } - return fmt.Sprintf("updated table %q.%q schema from %s to %s", ns, name, current.String(), next.String()), func(ctx context.Context) error { - return d.catalog.UpdateTable(ctx, ns, name, reqs, upds) + var q strings.Builder + if err := d.templates.migrateQuery.Execute(&q, input); err != nil { + return err + } + + outputPrefix := path.Join(d.emrClient.cfg.BucketPath, uuid.NewString()) + defer func() { + if err := cleanPrefixOnceFn(ctx, d.s3Client, d.emrClient.cfg.Bucket, outputPrefix)(); err != nil { + log.WithError(err).Warn("failed to clean up status file after running a column migration job") + } + }() + + ll := log.WithFields(log.Fields{ + "table": fmt.Sprintf("%s.%s", ns, name), + "numColumns": len(update.FieldsToMigrate), + }) + ll.Info("running column migration job") + ts := time.Now() + if err := d.emrClient.runJob( + ctx, + python.ExecInput{Query: q.String()}, + d.pyFiles.exec, + d.pyFiles.common, + fmt.Sprintf("column migration for: %s", d.emrClient.materializationName), + outputPrefix, + ); err != nil { + return fmt.Errorf("failed to run column migration job: %w", err) + } + ll.WithField("took", time.Since(ts).String()).Info("column migration job complete") + + if err := d.catalog.UpdateTable(ctx, ns, name, afterMigrateReqs, afterMigrateUpds); err != nil { + return fmt.Errorf("failed to update table %s.%s after migration job: %w", ns, name, err) + } + + } + + return nil }, nil } -//go:embed python -var pyFilesFS embed.FS - func (d *materialization) NewMaterializerTransactor( ctx context.Context, req pm.Request_Open, @@ -411,65 +484,24 @@ func (d *materialization) NewMaterializerTransactor( mappedBindings []boilerplate.MappedBinding[config, resource, mapped], be *boilerplate.BindingEvents, ) (boilerplate.MaterializerTransactor, error) { + pyFiles, err := d.putPyFiles(ctx) + if err != nil { + return nil, fmt.Errorf("putting PySpark scripts: %w", err) + } + storageClient := newFileClient(d.s3Client, d.cfg.Compute.emrConfig.Bucket) t := &transactor{ - materializationName: req.Materialization.Name.String(), + materializationName: d.emrClient.materializationName, be: be, cfg: d.cfg, s3Client: d.s3Client, emrClient: d.emrClient, - templates: parseTemplates(), + templates: d.templates, bindings: make([]binding, 0, len(mappedBindings)), loadFiles: boilerplate.NewStagedFiles(storageClient, fileSizeLimit, d.cfg.Compute.emrConfig.BucketPath, false, false), storeFiles: boilerplate.NewStagedFiles(storageClient, fileSizeLimit, d.cfg.Compute.emrConfig.BucketPath, true, true), - } - - if d.cfg.CatalogAuthentication.CatalogAuthType == 
catalogAuthTypeClientCredential { - t.emrAuth.credentialSecretName = clientCredSecretName(d.cfg.Compute.SystemsManagerPrefix, req.Materialization.Name.String()) - t.emrAuth.scope = d.cfg.CatalogAuthentication.Scope - } - - // PySpark scripts are uploaded to the staging bucket metadata location - // under a prefix that include a hash of their contents. If the scripts are - // changed because of connector updates a new set of files will be written, - // but this is not expected to happen very often so the number of excessive - // files should be minimal. - pyFiles := []struct { - name string - tProp *string - }{ - {name: "common", tProp: &t.pyFiles.commonURI}, - {name: "load", tProp: &t.pyFiles.loadURI}, - {name: "merge", tProp: &t.pyFiles.mergeURI}, - } - pyBytes := make([][]byte, 0, len(pyFiles)) - hasher := sha256.New() - - for _, pf := range pyFiles { - bs, err := pyFilesFS.ReadFile(fmt.Sprintf("python/%s.py", pf.name)) - if err != nil { - return nil, fmt.Errorf("reading embedded python script %s: %w", pf.name, err) - } - if _, err := hasher.Write(bs); err != nil { - return nil, fmt.Errorf("computing hash of embedded python script %s: %w", pf.name, err) - } - pyBytes = append(pyBytes, bs) - } - - fullMetaPrefix := path.Join(d.cfg.Compute.BucketPath, metadataPrefix, "pySparkFiles", fmt.Sprintf("%x", hasher.Sum(nil))) - for idx, pf := range pyFiles { - key := path.Join(fullMetaPrefix, fmt.Sprintf("%s.py", pf.name)) - if _, err := t.s3Client.PutObject(ctx, &s3.PutObjectInput{ - Bucket: aws.String(d.cfg.Compute.emrConfig.Bucket), - Key: aws.String(key), - Body: bytes.NewReader(pyBytes[idx]), - }); err != nil { - return nil, fmt.Errorf("uploading %s: %w", pf.name, err) - } - - *pf.tProp = "s3://" + path.Join(d.cfg.Compute.emrConfig.Bucket, key) - log.WithField("uri", *pf.tProp).Debug("uploaded PySpark script") + pyFiles: *pyFiles, } for idx, mapped := range mappedBindings { @@ -504,3 +536,63 @@ func (d *materialization) NewMaterializerTransactor( return t, nil } + +//go:embed python +var pyFilesFS embed.FS + +type pyFileURIs struct { + common string + exec string + load string + merge string +} + +// putPyFiles uploads PySpark scripts to the staging bucket metadata location +// under a prefix that includes a hash of their contents. If the scripts are +// changed because of connector updates a new set of files will be written, but +// this is not expected to happen very often so the number of excessive files +// should be minimal. 
+func (d *materialization) putPyFiles(ctx context.Context) (*pyFileURIs, error) { + var out pyFileURIs + + pyFiles := []struct { + name string + prop *string + }{ + {name: "common", prop: &out.common}, + {name: "exec", prop: &out.exec}, + {name: "load", prop: &out.load}, + {name: "merge", prop: &out.merge}, + } + pyBytes := make([][]byte, 0, len(pyFiles)) + hasher := sha256.New() + + for _, pf := range pyFiles { + bs, err := pyFilesFS.ReadFile(fmt.Sprintf("python/%s.py", pf.name)) + if err != nil { + return nil, fmt.Errorf("reading embedded python script %s: %w", pf.name, err) + } + if _, err := hasher.Write(bs); err != nil { + return nil, fmt.Errorf("computing hash of embedded python script %s: %w", pf.name, err) + } + pyBytes = append(pyBytes, bs) + } + + fullMetaPrefix := path.Join(d.cfg.Compute.emrConfig.BucketPath, metadataPrefix, "pySparkFiles", fmt.Sprintf("%x", hasher.Sum(nil))) + for idx, pf := range pyFiles { + key := path.Join(fullMetaPrefix, fmt.Sprintf("%s.py", pf.name)) + if _, err := d.s3Client.PutObject(ctx, &s3.PutObjectInput{ + Bucket: aws.String(d.cfg.Compute.emrConfig.Bucket), + Key: aws.String(key), + Body: bytes.NewReader(pyBytes[idx]), + }); err != nil { + return nil, fmt.Errorf("uploading %s: %w", pf.name, err) + } + + *pf.prop = "s3://" + path.Join(d.cfg.Compute.emrConfig.Bucket, key) + log.WithField("uri", *pf.prop).Debug("uploaded PySpark script") + } + + d.pyFiles = &out + return d.pyFiles, nil +} diff --git a/materialize-iceberg/emr.go b/materialize-iceberg/emr.go index 876375e42c..19f14208c1 100644 --- a/materialize-iceberg/emr.go +++ b/materialize-iceberg/emr.go @@ -21,21 +21,28 @@ import ( log "github.com/sirupsen/logrus" ) -func clientCredSecretName(prefix string, materializationName string) string { - return prefix + sanitizeAndAppendHash(materializationName) +type emrClient struct { + cfg emrConfig + catalogAuth catalogAuthConfig + catalogURL string + warehouse string + materializationName string + c *emr.Client + s3Client *s3.Client + ssmClient *ssm.Client } -func checkEmrPrereqs(ctx context.Context, d *materialization, errs *cerrors.PrereqErr) { - if _, err := d.emrClient.ListJobRuns(ctx, &emr.ListJobRunsInput{ - ApplicationId: aws.String(d.cfg.Compute.ApplicationId), +func (e *emrClient) checkPrereqs(ctx context.Context, errs *cerrors.PrereqErr) { + if _, err := e.c.ListJobRuns(ctx, &emr.ListJobRunsInput{ + ApplicationId: aws.String(e.cfg.ApplicationId), MaxResults: aws.Int32(1), }); err != nil { - errs.Err(fmt.Errorf("failed to list job runs for application %q: %w", d.cfg.Compute.ApplicationId, err)) + errs.Err(fmt.Errorf("failed to list job runs for application %q: %w", e.cfg.ApplicationId, err)) } - if d.cfg.CatalogAuthentication.CatalogAuthType == catalogAuthTypeClientCredential { - testParameter := clientCredSecretName(d.cfg.Compute.SystemsManagerPrefix, "test") - if _, err := d.ssmClient.PutParameter(ctx, &ssm.PutParameterInput{ + if e.catalogAuth.CatalogAuthType == catalogAuthTypeClientCredential { + testParameter := e.cfg.SystemsManagerPrefix + "test" + if _, err := e.ssmClient.PutParameter(ctx, &ssm.PutParameterInput{ Name: aws.String(testParameter), Value: aws.String("test"), Type: ssmTypes.ParameterTypeSecureString, @@ -43,7 +50,7 @@ func checkEmrPrereqs(ctx context.Context, d *materialization, errs *cerrors.Prer }); err != nil { errs.Err(fmt.Errorf("failed to put secure string parameter to %s: %w", testParameter, err)) return - } else if _, err := d.ssmClient.GetParameter(ctx, &ssm.GetParameterInput{ + } else if _, err := 
e.ssmClient.GetParameter(ctx, &ssm.GetParameterInput{ Name: aws.String(testParameter), WithDecryption: aws.Bool(true), }); err != nil { @@ -52,12 +59,11 @@ func checkEmrPrereqs(ctx context.Context, d *materialization, errs *cerrors.Prer } } -func ensureEmrSecret(ctx context.Context, client *ssm.Client, parameterName, wantCred string) error { - res, err := client.GetParameter(ctx, &ssm.GetParameterInput{ - Name: aws.String(parameterName), +func (e *emrClient) ensureSecret(ctx context.Context, wantCred string) error { + res, err := e.ssmClient.GetParameter(ctx, &ssm.GetParameterInput{ + Name: aws.String(e.clientCredSecretName()), WithDecryption: aws.Bool(true), }) - if err != nil { var errNotFound *ssmTypes.ParameterNotFound if !errors.As(err, &errNotFound) { @@ -69,8 +75,8 @@ func ensureEmrSecret(ctx context.Context, client *ssm.Client, parameterName, wan return nil } - _, err = client.PutParameter(ctx, &ssm.PutParameterInput{ - Name: aws.String(parameterName), + _, err = e.ssmClient.PutParameter(ctx, &ssm.PutParameterInput{ + Name: aws.String(e.clientCredSecretName()), Value: aws.String(wantCred), Type: ssmTypes.ParameterTypeSecureString, Overwrite: aws.Bool(true), @@ -84,7 +90,7 @@ func ensureEmrSecret(ctx context.Context, client *ssm.Client, parameterName, wan return nil } -func (t *transactor) runEmrJob(ctx context.Context, jobName string, input any, workingPrefix, entryPointUri string) error { +func (e *emrClient) runJob(ctx context.Context, input any, entryPointUri, pyFilesCommonURI, jobName, workingPrefix string) error { /*** Available arguments to the pyspark script: | --input-uri | Input for the program, as an s3 URI, to be parsed by the script | Required | @@ -98,8 +104,8 @@ func (t *transactor) runEmrJob(ctx context.Context, jobName string, input any, w getStatus := func() (*python.StatusOutput, error) { var status python.StatusOutput statusKey := path.Join(workingPrefix, statusFile) - if statusObj, err := t.s3Client.GetObject(ctx, &s3.GetObjectInput{ - Bucket: aws.String(t.cfg.Compute.Bucket), + if statusObj, err := e.s3Client.GetObject(ctx, &s3.GetObjectInput{ + Bucket: aws.String(e.cfg.Bucket), Key: aws.String(statusKey), }); err != nil { return nil, fmt.Errorf("reading status object %q: %w", statusKey, err) @@ -112,8 +118,8 @@ func (t *transactor) runEmrJob(ctx context.Context, jobName string, input any, w inputKey := path.Join(workingPrefix, "input.json") if inputBytes, err := encodeInput(input); err != nil { return fmt.Errorf("encoding input: %w", err) - } else if _, err := t.s3Client.PutObject(ctx, &s3.PutObjectInput{ - Bucket: aws.String(t.cfg.Compute.Bucket), + } else if _, err := e.s3Client.PutObject(ctx, &s3.PutObjectInput{ + Bucket: aws.String(e.cfg.Bucket), Key: aws.String(inputKey), Body: bytes.NewReader(inputBytes), }); err != nil { @@ -121,27 +127,27 @@ func (t *transactor) runEmrJob(ctx context.Context, jobName string, input any, w } args := []string{ - "--input-uri", "s3://" + path.Join(t.cfg.Compute.Bucket, inputKey), - "--status-output", "s3://" + path.Join(t.cfg.Compute.Bucket, workingPrefix, statusFile), - "--catalog-url", t.cfg.URL, - "--warehouse", t.cfg.Warehouse, - "--region", t.cfg.Compute.Region, + "--input-uri", "s3://" + path.Join(e.cfg.Bucket, inputKey), + "--status-output", "s3://" + path.Join(e.cfg.Bucket, workingPrefix, statusFile), + "--catalog-url", e.catalogURL, + "--warehouse", e.warehouse, + "--region", e.cfg.Region, } - if n := t.emrAuth.credentialSecretName; n != "" { - args = append(args, "--credential-secret-name", n) - } - if s := 
t.emrAuth.scope; s != "" { - args = append(args, "--scope", s) + if e.catalogAuth.CatalogAuthType == catalogAuthTypeClientCredential { + args = append(args, "--credential-secret-name", e.clientCredSecretName()) + if s := e.catalogAuth.Scope; s != "" { + args = append(args, "--scope", s) + } } - start, err := t.emrClient.StartJobRun(ctx, &emr.StartJobRunInput{ - ApplicationId: aws.String(t.cfg.Compute.ApplicationId), + start, err := e.c.StartJobRun(ctx, &emr.StartJobRunInput{ + ApplicationId: aws.String(e.cfg.ApplicationId), ClientToken: aws.String(uuid.NewString()), - ExecutionRoleArn: aws.String(t.cfg.Compute.ExecutionRoleArn), + ExecutionRoleArn: aws.String(e.cfg.ExecutionRoleArn), JobDriver: &emrTypes.JobDriverMemberSparkSubmit{ Value: emrTypes.SparkSubmit{ - SparkSubmitParameters: aws.String(fmt.Sprintf("--py-files %s", t.pyFiles.commonURI)), + SparkSubmitParameters: aws.String(fmt.Sprintf("--py-files %s", pyFilesCommonURI)), EntryPoint: aws.String(entryPointUri), EntryPointArguments: args, }, @@ -154,8 +160,8 @@ func (t *transactor) runEmrJob(ctx context.Context, jobName string, input any, w var runDetails string for { - gotRun, err := t.emrClient.GetJobRun(ctx, &emr.GetJobRunInput{ - ApplicationId: aws.String(t.cfg.Compute.ApplicationId), + gotRun, err := e.c.GetJobRun(ctx, &emr.GetJobRunInput{ + ApplicationId: aws.String(e.cfg.ApplicationId), JobRunId: start.JobRunId, }) if err != nil { @@ -190,6 +196,10 @@ func (t *transactor) runEmrJob(ctx context.Context, jobName string, input any, w } } +func (e *emrClient) clientCredSecretName() string { + return e.cfg.SystemsManagerPrefix + sanitizeAndAppendHash(e.materializationName) +} + func encodeInput(in any) ([]byte, error) { var buf bytes.Buffer enc := json.NewEncoder(&buf) diff --git a/materialize-iceberg/python/exec.py b/materialize-iceberg/python/exec.py new file mode 100644 index 0000000000..834e1bb6e1 --- /dev/null +++ b/materialize-iceberg/python/exec.py @@ -0,0 +1,22 @@ +from common import ( + common_args, + get_spark_session, + run_with_status, +) + +args = common_args() +spark = get_spark_session(args) + + +def run(input): + query = input["query"] + + try: + spark.sql(query) + except Exception as e: + raise RuntimeError( + f"Running exec query failed:\n{query}\nOriginal Error:\n{str(e)}" + ) from e + + +run_with_status(args, run) diff --git a/materialize-iceberg/python/python.go b/materialize-iceberg/python/python.go index 09caf51bbc..8de281d030 100644 --- a/materialize-iceberg/python/python.go +++ b/materialize-iceberg/python/python.go @@ -6,6 +6,10 @@ type NestedField struct { Element string `json:"element,omitempty"` } +type ExecInput struct { + Query string `json:"query"` +} + type LoadBinding struct { Binding int `json:"binding"` Keys []NestedField `json:"keys"` diff --git a/materialize-iceberg/sqlgen.go b/materialize-iceberg/sqlgen.go index 84d76b31fe..30f4024254 100644 --- a/materialize-iceberg/sqlgen.go +++ b/materialize-iceberg/sqlgen.go @@ -1,13 +1,17 @@ package connector import ( + "fmt" "strings" "text/template" + + "github.com/apache/iceberg-go" ) type templates struct { - loadQuery *template.Template - mergeQuery *template.Template + loadQuery *template.Template + mergeQuery *template.Template + migrateQuery *template.Template } type templateInput struct { @@ -15,15 +19,51 @@ type templateInput struct { Bounds []mergeBound } +type migrateColumn struct { + Field string + FromType string + TargetType iceberg.Type +} + +type migrateInput struct { + ResourcePath []string + Migrations []migrateColumn +} + +func 
quoteIdentifier(in string) string { return "`" + in + "`" } + func parseTemplates() templates { tpl := template.New("root").Funcs(template.FuncMap{ "QuoteIdentifier": quoteIdentifier, - "TableFQN": tableFQN, + "TableFQN": func(in []string) string { + quotedParts := make([]string, len(in)) + for i, part := range in { + quotedParts[i] = quoteIdentifier(part) + } + return strings.Join(quotedParts, ".") + }, + "IsBinary": func(m mapped) bool { return m.type_.Equals(iceberg.BinaryType{}) }, + "MigrateColumnName": func(f string) string { return f + migrateFieldSuffix }, + "CastSQL": func(m migrateColumn) string { + ident := quoteIdentifier(m.Field) + switch m.FromType { + case "binary": + return fmt.Sprintf("BASE64(%s)", ident) + case "timestamptz": + // timestamptz columns have an internal storage representation + // in UTC so this will always result in a Z timestamp string, + // which is consistent with what you get if you read a + // timestamptz column. + return fmt.Sprintf(`DATE_FORMAT(%s, 'yyyy-MM-dd\'T\'HH:mm:ss.SSSSSS\'Z\'')`, ident) + default: + return fmt.Sprintf("CAST(%s AS %s)", ident, m.TargetType.Type()) + } + }, }) parsed := template.Must(tpl.Parse(` {{ define "maybe_unbase64_rhs" -}} -{{- if $.Mapped.IsBinary -}}unbase64(r.{{ QuoteIdentifier $.Field }}){{ else }}r.{{ QuoteIdentifier $.Field }}{{ end }} +{{- if IsBinary $.Mapped -}}UNBASE64(r.{{ QuoteIdentifier $.Field }}){{ else }}r.{{ QuoteIdentifier $.Field }}{{ end }} {{- end }} {{ define "loadQuery" -}} @@ -62,20 +102,20 @@ WHEN NOT MATCHED AND r.{{ QuoteIdentifier $.Mapped.Document.Field }} != '"delete {{- end -}} ) {{ end }} + +{{ define "migrateQuery" -}} +UPDATE {{ TableFQN $.ResourcePath }} +SET +{{- range $ind, $col := $.Migrations }} + {{- if $ind }}, {{ end }} + {{ QuoteIdentifier (MigrateColumnName $col.Field) }} = {{ CastSQL $col }} +{{- end }} +{{ end }} `)) return templates{ - loadQuery: parsed.Lookup("loadQuery"), - mergeQuery: parsed.Lookup("mergeQuery"), - } -} - -func quoteIdentifier(in string) string { return "`" + in + "`" } - -func tableFQN(in []string) string { - quotedParts := make([]string, len(in)) - for i, part := range in { - quotedParts[i] = quoteIdentifier(part) + loadQuery: parsed.Lookup("loadQuery"), + mergeQuery: parsed.Lookup("mergeQuery"), + migrateQuery: parsed.Lookup("migrateQuery"), } - return strings.Join(quotedParts, ".") } diff --git a/materialize-iceberg/sqlgen_test.go b/materialize-iceberg/sqlgen_test.go index 77bcc1d1bc..b86622f489 100644 --- a/materialize-iceberg/sqlgen_test.go +++ b/materialize-iceberg/sqlgen_test.go @@ -16,20 +16,26 @@ func TestTemplates(t *testing.T) { var snap strings.Builder + makeProjection := func(field string) boilerplate.Projection { + return boilerplate.Projection{ + Projection: pf.Projection{Field: field}, + } + } + keys := []boilerplate.MappedProjection[mapped]{ - {Projection: pf.Projection{Field: "first-key"}}, - {Projection: pf.Projection{Field: "second-key"}, Mapped: mapped{type_: iceberg.BinaryType{}}}, - {Projection: pf.Projection{Field: "third-key"}}, + {Projection: makeProjection("first-key"), Mapped: mapped{iceberg.StringType{}}}, + {Projection: makeProjection("second-key"), Mapped: mapped{iceberg.BinaryType{}}}, + {Projection: makeProjection("third-key"), Mapped: mapped{iceberg.Int64Type{}}}, } values := []boilerplate.MappedProjection[mapped]{ - {Projection: pf.Projection{Field: "first-val"}}, - {Projection: pf.Projection{Field: "second-val"}}, - {Projection: pf.Projection{Field: "third-val"}}, - {Projection: pf.Projection{Field: "fourth-val"}}, + 
{Projection: makeProjection("first-val"), Mapped: mapped{iceberg.StringType{}}}, + {Projection: makeProjection("second-val"), Mapped: mapped{iceberg.StringType{}}}, + {Projection: makeProjection("third-val"), Mapped: mapped{iceberg.BinaryType{}}}, + {Projection: makeProjection("fourth-val"), Mapped: mapped{iceberg.StringType{}}}, } - doc := &boilerplate.MappedProjection[mapped]{Projection: pf.Projection{Field: "flow_document"}} + doc := &boilerplate.MappedProjection[mapped]{Projection: makeProjection("flow_document"), Mapped: mapped{iceberg.StringType{}}} input := templateInput{ binding: binding{ @@ -60,6 +66,27 @@ func TestTemplates(t *testing.T) { }, } + mInput := migrateInput{ + ResourcePath: []string{"some", "table"}, + Migrations: []migrateColumn{ + { + Field: "long_to_decimal", + FromType: "long", + TargetType: iceberg.DecimalTypeOf(38, 0), + }, + { + Field: "datetime_to_string", + FromType: "timestamptz", + TargetType: iceberg.StringType{}, + }, + { + Field: "binary_to_string", + FromType: "binary", + TargetType: iceberg.StringType{}, + }, + }, + } + snap.WriteString("--- Begin load query ---\n") require.NoError(t, templates.loadQuery.Execute(&snap, input)) snap.WriteString("--- End load query ---") @@ -70,6 +97,12 @@ func TestTemplates(t *testing.T) { require.NoError(t, templates.mergeQuery.Execute(&snap, input)) snap.WriteString("--- End merge query ---") + snap.WriteString("\n\n") + + snap.WriteString("--- Begin migrate query ---\n") + require.NoError(t, templates.migrateQuery.Execute(&snap, mInput)) + snap.WriteString("--- End migrate query ---") + cupaloy.SnapshotT(t, snap.String()) } diff --git a/materialize-iceberg/transactor.go b/materialize-iceberg/transactor.go index 99cd876f79..09be577e86 100644 --- a/materialize-iceberg/transactor.go +++ b/materialize-iceberg/transactor.go @@ -11,7 +11,6 @@ import ( "strings" "sync" - emr "github.com/aws/aws-sdk-go-v2/service/emrserverless" "github.com/aws/aws-sdk-go-v2/service/s3" s3types "github.com/aws/aws-sdk-go-v2/service/s3/types" "github.com/aws/aws-sdk-go/aws" @@ -50,23 +49,14 @@ type transactor struct { be *boilerplate.BindingEvents cfg config s3Client *s3.Client - emrClient *emr.Client + emrClient *emrClient templates templates bindings []binding loadFiles *boilerplate.StagedFiles storeFiles *boilerplate.StagedFiles - emrAuth struct { - credentialSecretName string - scope string - } - - pyFiles struct { - commonURI string - loadURI string - mergeURI string - } + pyFiles pyFileURIs } func (t *transactor) RecoverCheckpoint(ctx context.Context, spec pf.MaterializationSpec, rangeSpec pf.RangeSpec) (boilerplate.RuntimeCheckpoint, error) { @@ -146,11 +136,11 @@ func (t *transactor) Load(it *m.LoadIterator, loaded func(binding int, doc json. // In addition to removing the staged load keys, the loaded document result // files that are written to the staging location must also be removed. 
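// [Editor's sketch, not part of this change] The cleanup helpers used below follow a run-once
// closure pattern: the returned func performs the deletion under an S3 prefix the first time it
// is invoked and is a no-op on later calls, so callers can both defer it (to cover error paths)
// and invoke it explicitly on the success path, as Load and Acknowledge do. A minimal,
// self-contained illustration of that pattern, with a hypothetical deleteAll helper:
//
//	func cleanOnce(deleteAll func() error) func() error {
//		done := false
//		return func() error {
//			if done {
//				return nil
//			}
//			if err := deleteAll(); err != nil {
//				return err // a failed cleanup may be retried on the next call
//			}
//			done = true
//			return nil
//		}
//	}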
- cleanupResults := t.cleanPrefixOnceFn(ctx, t.cfg.Compute.Bucket, outputPrefix) + cleanupResults := cleanPrefixOnceFn(ctx, t.s3Client, t.cfg.Compute.Bucket, outputPrefix) defer cleanupResults() t.be.StartedEvaluatingLoads() - if err := t.runEmrJob(ctx, fmt.Sprintf("load for: %s", t.materializationName), loadInput, outputPrefix, t.pyFiles.loadURI); err != nil { + if err := t.emrClient.runJob(ctx, loadInput, t.pyFiles.load, t.pyFiles.common, fmt.Sprintf("load for: %s", t.materializationName), outputPrefix); err != nil { return fmt.Errorf("load job failed: %w", err) } else if err := t.loadFiles.CleanupCurrentTransaction(ctx); err != nil { return fmt.Errorf("cleaning up load files: %w", err) @@ -278,10 +268,10 @@ func (t *transactor) Acknowledge(ctx context.Context) (*pf.ConnectorState, error var stateUpdate *pf.ConnectorState if len(mergeInput.Bindings) > 0 { // Make sure the job status output file gets cleaned up. - cleanupStatus := t.cleanPrefixOnceFn(ctx, t.cfg.Compute.Bucket, outputPrefix) + cleanupStatus := cleanPrefixOnceFn(ctx, t.s3Client, t.cfg.Compute.Bucket, outputPrefix) defer cleanupStatus() - if err := t.runEmrJob(ctx, fmt.Sprintf("store for: %s", t.materializationName), mergeInput, outputPrefix, t.pyFiles.mergeURI); err != nil { + if err := t.emrClient.runJob(ctx, mergeInput, t.pyFiles.merge, t.pyFiles.common, fmt.Sprintf("store for: %s", t.materializationName), outputPrefix); err != nil { return nil, fmt.Errorf("store merge job failed: %w", err) } else if err := cleanupStatus(); err != nil { return nil, fmt.Errorf("cleaning up generated job status file: %w", err) @@ -415,14 +405,47 @@ func (t *transactor) loadWorker(ctx context.Context, loaded func(binding int, do return nil } -func (t *transactor) cleanPrefixOnceFn(ctx context.Context, bucket string, prefix string) func() error { +func (t *transactor) extantFiles(ctx context.Context, originalFileUris []string) (map[string]struct{}, error) { + var prefix string + for _, uri := range originalFileUris { + _, key := s3UriToParts(uri) + this := path.Dir(key) + if prefix == "" { + prefix = this + } else if prefix != this { + return nil, fmt.Errorf("files have different prefixes: %q and %q", prefix, this) + } + } + + out := make(map[string]struct{}) + + paginator := s3.NewListObjectsV2Paginator(t.s3Client, &s3.ListObjectsV2Input{ + Bucket: aws.String(t.cfg.Compute.Bucket), + Prefix: aws.String(prefix), + }) + + for paginator.HasMorePages() { + page, err := paginator.NextPage(ctx) + if err != nil { + return nil, fmt.Errorf("failed to get next page: %w", err) + } + + for _, obj := range page.Contents { + out[fmt.Sprintf("s3://%s/%s", t.cfg.Compute.Bucket, *obj.Key)] = struct{}{} + } + } + + return out, nil +} + +func cleanPrefixOnceFn(ctx context.Context, s3Client *s3.Client, bucket string, prefix string) func() error { didClean := false return func() error { if didClean { return nil } - paginator := s3.NewListObjectsV2Paginator(t.s3Client, &s3.ListObjectsV2Input{ + paginator := s3.NewListObjectsV2Paginator(s3Client, &s3.ListObjectsV2Input{ Bucket: aws.String(bucket), Prefix: aws.String(prefix), }) @@ -444,7 +467,7 @@ func (t *transactor) cleanPrefixOnceFn(ctx context.Context, bucket string, prefi thisPage = append(thisPage, s3types.ObjectIdentifier{Key: obj.Key}) } - if _, err := t.s3Client.DeleteObjects(ctx, &s3.DeleteObjectsInput{ + if _, err := s3Client.DeleteObjects(ctx, &s3.DeleteObjectsInput{ Bucket: aws.String(bucket), Delete: &s3types.Delete{Objects: thisPage}, }); err != nil { @@ -456,37 +479,3 @@ func (t *transactor) 
cleanPrefixOnceFn(ctx context.Context, bucket string, prefi return nil } } - -func (t *transactor) extantFiles(ctx context.Context, originalFileUris []string) (map[string]struct{}, error) { - var prefix string - for _, uri := range originalFileUris { - _, key := s3UriToParts(uri) - this := path.Dir(key) - if prefix == "" { - prefix = this - } else if prefix != this { - return nil, fmt.Errorf("files have different prefixes: %q and %q", prefix, this) - } - } - - out := make(map[string]struct{}) - - paginator := s3.NewListObjectsV2Paginator(t.s3Client, &s3.ListObjectsV2Input{ - Bucket: aws.String(t.cfg.Compute.Bucket), - Prefix: aws.String(prefix), - }) - - for paginator.HasMorePages() { - page, err := paginator.NextPage(ctx) - if err != nil { - return nil, fmt.Errorf("failed to get next page: %w", err) - } - - for _, obj := range page.Contents { - out[fmt.Sprintf("s3://%s/%s", t.cfg.Compute.Bucket, *obj.Key)] = struct{}{} - } - } - - return out, nil - -} diff --git a/materialize-iceberg/type_mapping.go b/materialize-iceberg/type_mapping.go index c753ff7668..0bedae8a41 100644 --- a/materialize-iceberg/type_mapping.go +++ b/materialize-iceberg/type_mapping.go @@ -20,16 +20,31 @@ func (fc fieldConfig) Validate() error { return nil } func (fc fieldConfig) CastToString() bool { return fc.CastToString_ } type mapped struct { - type_ iceberg.Type - required bool + type_ iceberg.Type } -func (m mapped) IsBinary() bool { - return m.type_ == iceberg.BinaryType{} +func (m mapped) String() string { + return m.type_.String() } +func (m mapped) Compatible(existing boilerplate.ExistingField) bool { + return strings.EqualFold(existing.Type, m.type_.String()) +} + +func (m mapped) CanMigrate(existing boilerplate.ExistingField) bool { + return allowedMigrations.CanMigrate(existing.Type, m.type_) +} + +var allowedMigrations = boilerplate.TypeMigrations[iceberg.Type]{ + "long": {iceberg.DecimalTypeOf(38, 0), iceberg.Float64Type{}}, + "decimal(38, 0)": {iceberg.Float64Type{}}, + boilerplate.AnyExistingType: {iceberg.StringType{}}, +} + +var migrateFieldSuffix = "_flow_tmp" + func mapProjection(p boilerplate.Projection) (mapped, boilerplate.ElementConverter) { - m := mapped{required: p.MustExist} + var m mapped var converter boilerplate.ElementConverter switch ft := p.FlatType.(type) { @@ -125,18 +140,81 @@ func computeSchemaForNewTable(res boilerplate.MappedBinding[config, resource, ma return iceberg.NewSchemaWithIdentifiers(1, identifierFields, fields...) } -func computeSchemaForUpdatedTable(current *iceberg.Schema, update boilerplate.MaterializerBindingUpdate[mapped]) *iceberg.Schema { +func computeSchemaForUpdatedTable( + currentHighestID int, + current *iceberg.Schema, + update boilerplate.MaterializerBindingUpdate[mapped], +) *iceberg.Schema { var nextFields []iceberg.NestedField for _, f := range current.Fields() { + if slices.ContainsFunc(update.FieldsToMigrate, func(upd boilerplate.MigrateField[mapped]) bool { + return f.Name == upd.From.Name+migrateFieldSuffix + }) { + // Prune columns from prior failed migrations of this spec update. + // This prevents rare cases where a prior migration column type is + // incompatible with a source field that has undergone further + // schema evolution before the migration could be applied. 
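// [Editor's note] Taken together with the migrateQuery template in sqlgen.go and
// computeSchemaForCompletedMigrations below, the migration flow added here appears to be:
//  1. computeSchemaForUpdatedTable adds a temporary "<field>_flow_tmp" column typed as the
//     migration target, first pruning any leftover temp column from a prior failed attempt
//     (this branch).
//  2. The migrateQuery UPDATE fills the temp column from the existing column via CastSQL:
//     BASE64 for binary, DATE_FORMAT for timestamptz, and a plain CAST otherwise.
//  3. After the copy, computeSchemaForCompletedMigrations drops the original column and
//     renames "<field>_flow_tmp" back to the original name, panicking if the removed and
//     renamed sets ever differ.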
+ continue + } + if slices.ContainsFunc(update.NewlyNullableFields, func(field boilerplate.ExistingField) bool { return field.Name == f.Name }) { f.Required = false } + nextFields = append(nextFields, f) } - appendProjectionsAsFields(&nextFields, update.NewProjections, getHighestFieldID(current)) + var tempMigrateProjections []boilerplate.MappedProjection[mapped] + for _, f := range update.FieldsToMigrate { + temp := f.To + temp.Field += migrateFieldSuffix + tempMigrateProjections = append(tempMigrateProjections, temp) + } + + _, lastId := appendProjectionsAsFields(&nextFields, update.NewProjections, currentHighestID) + appendProjectionsAsFields(&nextFields, tempMigrateProjections, lastId) + + return iceberg.NewSchemaWithIdentifiers(current.ID+1, current.IdentifierFieldIDs, nextFields...) +} + +func computeSchemaForCompletedMigrations(current *iceberg.Schema, fieldsToMigrate []boilerplate.MigrateField[mapped]) *iceberg.Schema { + fieldWasMigrated := func(f iceberg.NestedField) bool { + return slices.ContainsFunc(fieldsToMigrate, func(field boilerplate.MigrateField[mapped]) bool { + return f.Name == field.From.Name + }) + } + + fieldMigratedTo := func(f iceberg.NestedField) bool { + return slices.ContainsFunc(fieldsToMigrate, func(field boilerplate.MigrateField[mapped]) bool { + return f.Name == field.From.Name+migrateFieldSuffix + }) + } + + // Sanity checks that we don't remove a column without also renaming one to + // its original name. + var fieldsRemoved []string + var fieldsRenamedTo []string + + var nextFields []iceberg.NestedField + for _, f := range current.Fields() { + if fieldWasMigrated(f) { + fieldsRemoved = append(fieldsRemoved, f.Name) + continue + } else if fieldMigratedTo(f) { + f.Name = strings.TrimSuffix(f.Name, migrateFieldSuffix) + fieldsRenamedTo = append(fieldsRenamedTo, f.Name) + } + + nextFields = append(nextFields, f) + } + + slices.Sort(fieldsRemoved) + slices.Sort(fieldsRenamedTo) + if !slices.Equal(fieldsRemoved, fieldsRenamedTo) { + panic(fmt.Sprintf("application error: fields removed and renamed to are not equal: %s vs %s", fieldsRemoved, fieldsRenamedTo)) + } return iceberg.NewSchemaWithIdentifiers(current.ID+1, current.IdentifierFieldIDs, nextFields...) } @@ -156,7 +234,7 @@ func appendProjectionsAsFields(dst *[]iceberg.NestedField, ps []boilerplate.Mapp ID: id, Name: p.Field, Type: p.Mapped.type_, - Required: p.Mapped.required, + Required: p.MustExist, Doc: strings.ReplaceAll(p.Comment, "\n", " - "), // Glue catalogs don't support newlines in field comments }) ids = append(ids, id) @@ -164,25 +242,3 @@ func appendProjectionsAsFields(dst *[]iceberg.NestedField, ps []boilerplate.Mapp return ids, id } - -// TODO(whb): The version of go-iceberg we are using has a broken -// (*Schema).HighestFieldID method which does not work with most logical types. -// It looks like that is probably fixed in a more recent version, but we need to -// be on Go 1.23 to use that. -func getHighestFieldID(sch *iceberg.Schema) int { - var out int - - for _, f := range sch.Fields() { - out = max(out, f.ID) - if l, ok := f.Type.(*iceberg.ListType); ok { - out = max(out, l.ElementID) - } - // Ignoring the possibility of Map and Struct types, which also have - // elements with their own field IDs since we don't create columns with - // those types. We _also_ don't create columns with list types, but it - // is more conceivable that we could add that in the short term. This - // should all be removed when we upgrade Go & iceberg-go anyway. 
- } - - return out -} diff --git a/materialize-iceberg/type_mapping_test.go b/materialize-iceberg/type_mapping_test.go index 68935deaa1..573ec7a82e 100644 --- a/materialize-iceberg/type_mapping_test.go +++ b/materialize-iceberg/type_mapping_test.go @@ -1,26 +1,106 @@ package connector import ( + "fmt" + "strings" "testing" "github.com/apache/iceberg-go" + "github.com/bradleyjkemp/cupaloy" + + boilerplate "github.com/estuary/connectors/materialize-boilerplate" + pf "github.com/estuary/flow/go/protocols/flow" "github.com/stretchr/testify/require" ) -func TestGetHighestFieldID(t *testing.T) { +func TestAllowedMigrations(t *testing.T) { + for _, test := range []struct { + from string + to iceberg.Type + }{ + {"long", iceberg.DecimalTypeOf(38, 0)}, + {"long", iceberg.Float64Type{}}, + {"long", iceberg.StringType{}}, + {"decimal(38, 0)", iceberg.Float64Type{}}, + {"decimal(38, 0)", iceberg.StringType{}}, + {"double", iceberg.StringType{}}, + {"boolean", iceberg.StringType{}}, + {"binary", iceberg.StringType{}}, + {"date", iceberg.StringType{}}, + {"timestamptz", iceberg.StringType{}}, + } { + t.Run(fmt.Sprintf("%s->%s", test.from, test.to.String()), func(t *testing.T) { + require.True(t, allowedMigrations.CanMigrate(test.from, test.to)) + }) + } +} + +func TestComputeSchemas(t *testing.T) { + mappedProjection := func(field string, mustExist bool, type_ iceberg.Type) boilerplate.MappedProjection[mapped] { + return boilerplate.MappedProjection[mapped]{ + Projection: boilerplate.Projection{ + Projection: pf.Projection{ + Field: field, + }, + MustExist: mustExist, + }, + Mapped: mapped{type_: type_}, + } + } + originalFields := []iceberg.NestedField{ {ID: 1, Name: "firstKey", Required: true, Type: iceberg.Int64Type{}}, {ID: 2, Name: "secondKey", Required: true, Type: iceberg.StringType{}}, - {ID: 3, Name: "reqVal", Required: true, Type: iceberg.StringType{}}, - {ID: 4, Name: "optVal", Required: false, Type: iceberg.BooleanType{}}, - {ID: 5, Name: "thirdVal", Required: true, Type: iceberg.DecimalTypeOf(38, 0)}, - {ID: 6, Name: "fourthVal", Required: true, Type: &iceberg.ListType{ - ElementID: 7, - Element: iceberg.StringType{}, - }}, + {ID: 3, Name: "val1", Required: true, Type: iceberg.StringType{}}, + {ID: 4, Name: "val2", Required: false, Type: iceberg.BooleanType{}}, + {ID: 5, Name: "dateToStr", Required: true, Type: iceberg.DateType{}}, + {ID: 6, Name: "intToDecimal", Required: false, Type: iceberg.Int64Type{}}, + {ID: 7, Name: "decimalToFloat", Required: true, Type: iceberg.DecimalTypeOf(38, 0)}, + {ID: 8, Name: "timestampToStr", Required: true, Type: iceberg.TimestampTzType{}}, + // A temporary migration column that still exists from a prior failed + // migration, with an incompatible type for the upcoming migration. 
+ {ID: 10, Name: "dateToStr" + migrateFieldSuffix, Required: true, Type: &iceberg.Float64Type{}}, + } + + update := boilerplate.MaterializerBindingUpdate[mapped]{ + NewProjections: []boilerplate.MappedProjection[mapped]{mappedProjection("new", false, iceberg.StringType{})}, + NewlyNullableFields: []boilerplate.ExistingField{{Name: "dateToStr"}, {Name: "dateToStr" + migrateFieldSuffix}}, + FieldsToMigrate: []boilerplate.MigrateField[mapped]{ + { + From: boilerplate.ExistingField{Name: "dateToStr"}, + To: mappedProjection("dateToStr", false, iceberg.StringType{}), + }, + { + From: boilerplate.ExistingField{Name: "intToDecimal"}, + To: mappedProjection("intToDecimal", false, iceberg.DecimalTypeOf(38, 0)), + }, + { + From: boilerplate.ExistingField{Name: "decimalToFloat"}, + To: mappedProjection("decimalToFloat", true, iceberg.Float64Type{}), + }, + { + From: boilerplate.ExistingField{Name: "timestampToStr"}, + To: mappedProjection("timestampToStr", false, iceberg.StringType{}), + }, + }, } originalSchema := iceberg.NewSchemaWithIdentifiers(1, []int{1, 2}, originalFields...) - require.NotEqual(t, originalSchema.HighestFieldID(), getHighestFieldID(originalSchema)) - require.Equal(t, 7, getHighestFieldID(originalSchema)) + nextSchema := computeSchemaForUpdatedTable(12, originalSchema, update) + afterMigrateSchema := computeSchemaForCompletedMigrations(nextSchema, update.FieldsToMigrate) + + var snap strings.Builder + + snap.WriteString("--- Original Schema ---\n") + snap.WriteString(originalSchema.String()) + snap.WriteString("\n\n") + + snap.WriteString("--- Next Schema ---\n") + snap.WriteString(nextSchema.String()) + snap.WriteString("\n\n") + + snap.WriteString("--- After Migrate Schema ---\n") + snap.WriteString(afterMigrateSchema.String()) + + cupaloy.SnapshotT(t, snap.String()) } diff --git a/tests/materialize/materialize-iceberg/checks.sh b/tests/materialize/materialize-iceberg/checks.sh index fcb0a9f630..064f5eaa0a 100755 --- a/tests/materialize/materialize-iceberg/checks.sh +++ b/tests/materialize/materialize-iceberg/checks.sh @@ -5,3 +5,4 @@ if [ "$(uname -s)" = "Darwin" ]; then SED_CMD="gsed" fi $SED_CMD -i'' -E 's|(/)[0-9a-fA-F-]{36}/[0-9a-fA-F-]{36}(\.)|\1/\2|g' ${SNAPSHOT} +$SED_CMD -i'' "s|_${TABLE_SUFFIX}||g" ${SNAPSHOT} diff --git a/tests/materialize/materialize-iceberg/cleanup.sh b/tests/materialize/materialize-iceberg/cleanup.sh index 6ab40d6ebf..1a60b8e245 100644 --- a/tests/materialize/materialize-iceberg/cleanup.sh +++ b/tests/materialize/materialize-iceberg/cleanup.sh @@ -1,7 +1,7 @@ #!/bin/bash function purge() { - ${ICEBERG_HELPER_CMD} --force purge ${NAMESPACE}."$1" + ${ICEBERG_HELPER_CMD} --force purge ${NAMESPACE}."$1"_${TABLE_SUFFIX} } purge "simple" diff --git a/tests/materialize/materialize-iceberg/fetch.sh b/tests/materialize/materialize-iceberg/fetch.sh index 31357a3750..bf2144aed8 100755 --- a/tests/materialize/materialize-iceberg/fetch.sh +++ b/tests/materialize/materialize-iceberg/fetch.sh @@ -5,7 +5,7 @@ set -o pipefail set -o nounset function exportToJsonl() { - ${ICEBERG_HELPER_CMD} read ${NAMESPACE}."$1" | jq -s "sort_by([.id, .flow_published_at]) | { _table: \"$1\", rows: . }" + ${ICEBERG_HELPER_CMD} read ${NAMESPACE}."$1"_${TABLE_SUFFIX} | jq -s "sort_by([.id, .flow_published_at]) | { _table: \"$1\", rows: . 
}" } exportToJsonl "simple" diff --git a/tests/materialize/materialize-iceberg/setup.sh b/tests/materialize/materialize-iceberg/setup.sh index 158ad9585f..3e4b353a65 100755 --- a/tests/materialize/materialize-iceberg/setup.sh +++ b/tests/materialize/materialize-iceberg/setup.sh @@ -4,22 +4,24 @@ set -o errexit set -o pipefail set -o nounset +export TABLE_SUFFIX=$(head -c 12 /dev/urandom | base64 | tr -dc 'A-Za-z0-9' | head -c 8 | tr '[:upper:]' '[:lower:]') + resources_json_template='[ { "resource": { - "table": "simple" + "table": "simple_${TABLE_SUFFIX}" }, "source": "${TEST_COLLECTION_SIMPLE}" }, { "resource": { - "table": "duplicate_keys_standard" + "table": "duplicate_keys_standard_${TABLE_SUFFIX}" }, "source": "${TEST_COLLECTION_DUPLICATED_KEYS}" }, { "resource": { - "table": "multiple_types" + "table": "multiple_types_${TABLE_SUFFIX}" }, "source": "${TEST_COLLECTION_MULTIPLE_DATATYPES}", "fields": { @@ -34,7 +36,7 @@ resources_json_template='[ }, { "resource": { - "table": "formatted_strings" + "table": "formatted_strings_${TABLE_SUFFIX}" }, "source": "${TEST_COLLECTION_FORMATTED_STRINGS}", "fields": { @@ -43,19 +45,19 @@ resources_json_template='[ }, { "resource": { - "table": "deletions" + "table": "deletions_${TABLE_SUFFIX}" }, "source": "${TEST_COLLECTION_DELETIONS}" }, { "resource": { - "table": "binary_key" + "table": "binary_key_${TABLE_SUFFIX}" }, "source": "${TEST_COLLECTION_BINARY_KEY}" }, { "resource": { - "table": "string_escaped_key" + "table": "string_escaped_key_${TABLE_SUFFIX}" }, "source": "${TEST_COLLECTION_STRING_ESCAPED_KEY}" } diff --git a/tests/materialize/materialize-iceberg/snapshot.json b/tests/materialize/materialize-iceberg/snapshot.json index bd2a886aec..deba90dff2 100644 --- a/tests/materialize/materialize-iceberg/snapshot.json +++ b/tests/materialize/materialize-iceberg/snapshot.json @@ -1,6 +1,6 @@ [ "applied.actionDescription", - "created table \"ci_testing\".\"simple\" as table {\n\t1: id: required long (auto-generated projection of JSON at: /id with inferred types: [integer])\n\t2: canary: required string (auto-generated projection of JSON at: /canary with inferred types: [string])\n\t3: flow_published_at: required timestamptz (Flow Publication Time - Flow publication date-time of this document - auto-generated projection of JSON at: /_meta/uuid with inferred types: [string])\n\t4: flow_document: required string (auto-generated projection of JSON at: with inferred types: [object])\n}\ncreated table \"ci_testing\".\"duplicate_keys_standard\" as table {\n\t1: id: required long (auto-generated projection of JSON at: /id with inferred types: [integer])\n\t2: flow_published_at: required timestamptz (Flow Publication Time - Flow publication date-time of this document - auto-generated projection of JSON at: /_meta/uuid with inferred types: [string])\n\t3: int: optional long (auto-generated projection of JSON at: /int with inferred types: [integer])\n\t4: str: required string (auto-generated projection of JSON at: /str with inferred types: [string])\n\t5: flow_document: required string (auto-generated projection of JSON at: with inferred types: [object])\n}\ncreated table \"ci_testing\".\"multiple_types\" as table {\n\t1: id: required long (auto-generated projection of JSON at: /id with inferred types: [integer])\n\t2: array_int: optional string (auto-generated projection of JSON at: /array_int with inferred types: [array])\n\t3: binary_field: optional binary (auto-generated projection of JSON at: /binary_field with inferred types: [string])\n\t4: 
bool_field: optional boolean (auto-generated projection of JSON at: /bool_field with inferred types: [boolean])\n\t5: float_field: optional double (auto-generated projection of JSON at: /float_field with inferred types: [number])\n\t6: flow_published_at: required timestamptz (Flow Publication Time - Flow publication date-time of this document - auto-generated projection of JSON at: /_meta/uuid with inferred types: [string])\n\t7: multiple: optional string (auto-generated projection of JSON at: /multiple with inferred types: [array boolean null number object string])\n\t8: nested: optional string (auto-generated projection of JSON at: /nested with inferred types: [object])\n\t9: nullable_int: optional long (auto-generated projection of JSON at: /nullable_int with inferred types: [integer null])\n\t10: str_field: required string (auto-generated projection of JSON at: /str_field with inferred types: [string])\n\t11: flow_document: required string (auto-generated projection of JSON at: with inferred types: [object])\n}\ncreated table \"ci_testing\".\"formatted_strings\" as table {\n\t1: id: required long (auto-generated projection of JSON at: /id with inferred types: [integer])\n\t2: date: optional date (auto-generated projection of JSON at: /date with inferred types: [string])\n\t3: datetime: optional timestamptz (auto-generated projection of JSON at: /datetime with inferred types: [string])\n\t4: flow_published_at: required timestamptz (Flow Publication Time - Flow publication date-time of this document - auto-generated projection of JSON at: /_meta/uuid with inferred types: [string])\n\t5: int_and_str: optional decimal(38, 0) (auto-generated projection of JSON at: /int_and_str with inferred types: [integer string])\n\t6: int_str: optional decimal(38, 0) (auto-generated projection of JSON at: /int_str with inferred types: [string])\n\t7: num_and_str: optional double (auto-generated projection of JSON at: /num_and_str with inferred types: [number string])\n\t8: num_str: optional double (auto-generated projection of JSON at: /num_str with inferred types: [string])\n\t9: time: optional string (auto-generated projection of JSON at: /time with inferred types: [string])\n\t10: flow_document: required string (auto-generated projection of JSON at: with inferred types: [object])\n}\ncreated table \"ci_testing\".\"deletions\" as table {\n\t1: id: required long (auto-generated projection of JSON at: /id with inferred types: [integer])\n\t2: _meta/op: optional string (auto-generated projection of JSON at: /_meta/op with inferred types: [string])\n\t3: flow_published_at: required timestamptz (Flow Publication Time - Flow publication date-time of this document - auto-generated projection of JSON at: /_meta/uuid with inferred types: [string])\n\t4: flow_document: required string (auto-generated projection of JSON at: with inferred types: [object])\n}\ncreated table \"ci_testing\".\"binary_key\" as table {\n\t1: id: required binary (auto-generated projection of JSON at: /id with inferred types: [string])\n\t2: counter: optional long (auto-generated projection of JSON at: /counter with inferred types: [integer])\n\t3: flow_published_at: required timestamptz (Flow Publication Time - Flow publication date-time of this document - auto-generated projection of JSON at: /_meta/uuid with inferred types: [string])\n\t4: flow_document: required string (auto-generated projection of JSON at: with inferred types: [object])\n}\ncreated table \"ci_testing\".\"string_escaped_key\" as table {\n\t1: id: required string 
(auto-generated projection of JSON at: /id with inferred types: [string])\n\t2: counter: optional long (auto-generated projection of JSON at: /counter with inferred types: [integer])\n\t3: flow_published_at: required timestamptz (Flow Publication Time - Flow publication date-time of this document - auto-generated projection of JSON at: /_meta/uuid with inferred types: [string])\n\t4: flow_document: required string (auto-generated projection of JSON at: with inferred types: [object])\n}" + "created table \"ci_testing\".\"simple\" as table {\n\t1: id: required long (auto-generated projection of JSON at: /id with inferred types: [integer])\n\t2: canary: required string (auto-generated projection of JSON at: /canary with inferred types: [string])\n\t3: flow_published_at: required timestamptz (Flow Publication Time - Flow publication date-time of this document - auto-generated projection of JSON at: /_meta/uuid with inferred types: [string])\n\t4: flow_document: required string (auto-generated projection of the root document with inferred types: [object])\n}\ncreated table \"ci_testing\".\"duplicate_keys_standard\" as table {\n\t1: id: required long (auto-generated projection of JSON at: /id with inferred types: [integer])\n\t2: flow_published_at: required timestamptz (Flow Publication Time - Flow publication date-time of this document - auto-generated projection of JSON at: /_meta/uuid with inferred types: [string])\n\t3: int: optional long (auto-generated projection of JSON at: /int with inferred types: [integer])\n\t4: str: required string (auto-generated projection of JSON at: /str with inferred types: [string])\n\t5: flow_document: required string (auto-generated projection of the root document with inferred types: [object])\n}\ncreated table \"ci_testing\".\"multiple_types\" as table {\n\t1: id: required long (auto-generated projection of JSON at: /id with inferred types: [integer])\n\t2: array_int: optional string (auto-generated projection of JSON at: /array_int with inferred types: [array])\n\t3: binary_field: optional binary (auto-generated projection of JSON at: /binary_field with inferred types: [string])\n\t4: bool_field: optional boolean (auto-generated projection of JSON at: /bool_field with inferred types: [boolean])\n\t5: float_field: optional double (auto-generated projection of JSON at: /float_field with inferred types: [number])\n\t6: flow_published_at: required timestamptz (Flow Publication Time - Flow publication date-time of this document - auto-generated projection of JSON at: /_meta/uuid with inferred types: [string])\n\t7: multiple: optional string (auto-generated projection of JSON at: /multiple with inferred types: [array boolean null number object string])\n\t8: nested: optional string (auto-generated projection of JSON at: /nested with inferred types: [object])\n\t9: nullable_int: optional long (auto-generated projection of JSON at: /nullable_int with inferred types: [integer null])\n\t10: str_field: required string (auto-generated projection of JSON at: /str_field with inferred types: [string])\n\t11: flow_document: required string (auto-generated projection of the root document with inferred types: [object])\n}\ncreated table \"ci_testing\".\"formatted_strings\" as table {\n\t1: id: required long (auto-generated projection of JSON at: /id with inferred types: [integer])\n\t2: date: optional date (auto-generated projection of JSON at: /date with inferred types: [string])\n\t3: datetime: optional timestamptz (auto-generated projection of JSON at: /datetime with 
inferred types: [string])\n\t4: flow_published_at: required timestamptz (Flow Publication Time - Flow publication date-time of this document - auto-generated projection of JSON at: /_meta/uuid with inferred types: [string])\n\t5: int_and_str: optional decimal(38, 0) (auto-generated projection of JSON at: /int_and_str with inferred types: [integer string])\n\t6: int_str: optional decimal(38, 0) (auto-generated projection of JSON at: /int_str with inferred types: [string])\n\t7: num_and_str: optional double (auto-generated projection of JSON at: /num_and_str with inferred types: [number string])\n\t8: num_str: optional double (auto-generated projection of JSON at: /num_str with inferred types: [string])\n\t9: time: optional string (auto-generated projection of JSON at: /time with inferred types: [string])\n\t10: flow_document: required string (auto-generated projection of the root document with inferred types: [object])\n}\ncreated table \"ci_testing\".\"deletions\" as table {\n\t1: id: required long (auto-generated projection of JSON at: /id with inferred types: [integer])\n\t2: _meta/op: optional string (auto-generated projection of JSON at: /_meta/op with inferred types: [string])\n\t3: flow_published_at: required timestamptz (Flow Publication Time - Flow publication date-time of this document - auto-generated projection of JSON at: /_meta/uuid with inferred types: [string])\n\t4: flow_document: required string (auto-generated projection of the root document with inferred types: [object])\n}\ncreated table \"ci_testing\".\"binary_key\" as table {\n\t1: id: required binary (auto-generated projection of JSON at: /id with inferred types: [string])\n\t2: counter: optional long (auto-generated projection of JSON at: /counter with inferred types: [integer])\n\t3: flow_published_at: required timestamptz (Flow Publication Time - Flow publication date-time of this document - auto-generated projection of JSON at: /_meta/uuid with inferred types: [string])\n\t4: flow_document: required string (auto-generated projection of the root document with inferred types: [object])\n}\ncreated table \"ci_testing\".\"string_escaped_key\" as table {\n\t1: id: required string (auto-generated projection of JSON at: /id with inferred types: [string])\n\t2: counter: optional long (auto-generated projection of JSON at: /counter with inferred types: [integer])\n\t3: flow_published_at: required timestamptz (Flow Publication Time - Flow publication date-time of this document - auto-generated projection of JSON at: /_meta/uuid with inferred types: [string])\n\t4: flow_document: required string (auto-generated projection of the root document with inferred types: [object])\n}" ] [ "connectorState", @@ -15,7 +15,7 @@ "updated": { "ci_testing%2Fbinary_key": { "binding": 5, - "query": "MERGE INTO `ci_testing`.`binary_key` AS l\nUSING merge_view_5 AS r\nON \n\tl.`id` = unbase64(r.`id`)\nWHEN MATCHED AND r.`flow_document` = '\"delete\"' THEN DELETE\nWHEN MATCHED THEN UPDATE SET l.`id` = unbase64(r.`id`), l.`counter` = r.`counter`, l.`flow_published_at` = r.`flow_published_at`, l.`flow_document` = r.`flow_document`\nWHEN NOT MATCHED AND r.`flow_document` != '\"delete\"' THEN INSERT (`id`, `counter`, `flow_published_at`, `flow_document`) VALUES (unbase64(r.`id`), r.`counter`, r.`flow_published_at`, r.`flow_document`)\n", + "query": "MERGE INTO `ci_testing`.`binary_key` AS l\nUSING merge_view_5 AS r\nON \n\tl.`id` = UNBASE64(r.`id`)\nWHEN MATCHED AND r.`flow_document` = '\"delete\"' THEN DELETE\nWHEN MATCHED THEN UPDATE SET l.`id` 
= UNBASE64(r.`id`), l.`counter` = r.`counter`, l.`flow_published_at` = r.`flow_published_at`, l.`flow_document` = r.`flow_document`\nWHEN NOT MATCHED AND r.`flow_document` != '\"delete\"' THEN INSERT (`id`, `counter`, `flow_published_at`, `flow_document`) VALUES (UNBASE64(r.`id`), r.`counter`, r.`flow_published_at`, r.`flow_document`)\n", "columns": [ { "name": "id", @@ -143,7 +143,7 @@ }, "ci_testing%2Fmultiple_types": { "binding": 2, - "query": "MERGE INTO `ci_testing`.`multiple_types` AS l\nUSING merge_view_2 AS r\nON \n\tl.`id` = r.`id` AND l.`id` >= 1 AND l.`id` <= 10\nWHEN MATCHED AND r.`flow_document` = '\"delete\"' THEN DELETE\nWHEN MATCHED THEN UPDATE SET l.`id` = r.`id`, l.`array_int` = r.`array_int`, l.`binary_field` = unbase64(r.`binary_field`), l.`bool_field` = r.`bool_field`, l.`float_field` = r.`float_field`, l.`flow_published_at` = r.`flow_published_at`, l.`multiple` = r.`multiple`, l.`nested` = r.`nested`, l.`nullable_int` = r.`nullable_int`, l.`str_field` = r.`str_field`, l.`flow_document` = r.`flow_document`\nWHEN NOT MATCHED AND r.`flow_document` != '\"delete\"' THEN INSERT (`id`, `array_int`, `binary_field`, `bool_field`, `float_field`, `flow_published_at`, `multiple`, `nested`, `nullable_int`, `str_field`, `flow_document`) VALUES (r.`id`, r.`array_int`, unbase64(r.`binary_field`), r.`bool_field`, r.`float_field`, r.`flow_published_at`, r.`multiple`, r.`nested`, r.`nullable_int`, r.`str_field`, r.`flow_document`)\n", + "query": "MERGE INTO `ci_testing`.`multiple_types` AS l\nUSING merge_view_2 AS r\nON \n\tl.`id` = r.`id` AND l.`id` >= 1 AND l.`id` <= 10\nWHEN MATCHED AND r.`flow_document` = '\"delete\"' THEN DELETE\nWHEN MATCHED THEN UPDATE SET l.`id` = r.`id`, l.`array_int` = r.`array_int`, l.`binary_field` = UNBASE64(r.`binary_field`), l.`bool_field` = r.`bool_field`, l.`float_field` = r.`float_field`, l.`flow_published_at` = r.`flow_published_at`, l.`multiple` = r.`multiple`, l.`nested` = r.`nested`, l.`nullable_int` = r.`nullable_int`, l.`str_field` = r.`str_field`, l.`flow_document` = r.`flow_document`\nWHEN NOT MATCHED AND r.`flow_document` != '\"delete\"' THEN INSERT (`id`, `array_int`, `binary_field`, `bool_field`, `float_field`, `flow_published_at`, `multiple`, `nested`, `nullable_int`, `str_field`, `flow_document`) VALUES (r.`id`, r.`array_int`, UNBASE64(r.`binary_field`), r.`bool_field`, r.`float_field`, r.`flow_published_at`, r.`multiple`, r.`nested`, r.`nullable_int`, r.`str_field`, r.`flow_document`)\n", "columns": [ { "name": "id", @@ -254,7 +254,7 @@ "updated": { "ci_testing%2Fbinary_key": { "binding": 5, - "query": "MERGE INTO `ci_testing`.`binary_key` AS l\nUSING merge_view_5 AS r\nON \n\tl.`id` = unbase64(r.`id`)\nWHEN MATCHED AND r.`flow_document` = '\"delete\"' THEN DELETE\nWHEN MATCHED THEN UPDATE SET l.`id` = unbase64(r.`id`), l.`counter` = r.`counter`, l.`flow_published_at` = r.`flow_published_at`, l.`flow_document` = r.`flow_document`\nWHEN NOT MATCHED AND r.`flow_document` != '\"delete\"' THEN INSERT (`id`, `counter`, `flow_published_at`, `flow_document`) VALUES (unbase64(r.`id`), r.`counter`, r.`flow_published_at`, r.`flow_document`)\n", + "query": "MERGE INTO `ci_testing`.`binary_key` AS l\nUSING merge_view_5 AS r\nON \n\tl.`id` = UNBASE64(r.`id`)\nWHEN MATCHED AND r.`flow_document` = '\"delete\"' THEN DELETE\nWHEN MATCHED THEN UPDATE SET l.`id` = UNBASE64(r.`id`), l.`counter` = r.`counter`, l.`flow_published_at` = r.`flow_published_at`, l.`flow_document` = r.`flow_document`\nWHEN NOT MATCHED AND r.`flow_document` != '\"delete\"' THEN 
INSERT (`id`, `counter`, `flow_published_at`, `flow_document`) VALUES (UNBASE64(r.`id`), r.`counter`, r.`flow_published_at`, r.`flow_document`)\n", "columns": [ { "name": "id", @@ -382,7 +382,7 @@ }, "ci_testing%2Fmultiple_types": { "binding": 2, - "query": "MERGE INTO `ci_testing`.`multiple_types` AS l\nUSING merge_view_2 AS r\nON \n\tl.`id` = r.`id` AND l.`id` >= 6 AND l.`id` <= 10\nWHEN MATCHED AND r.`flow_document` = '\"delete\"' THEN DELETE\nWHEN MATCHED THEN UPDATE SET l.`id` = r.`id`, l.`array_int` = r.`array_int`, l.`binary_field` = unbase64(r.`binary_field`), l.`bool_field` = r.`bool_field`, l.`float_field` = r.`float_field`, l.`flow_published_at` = r.`flow_published_at`, l.`multiple` = r.`multiple`, l.`nested` = r.`nested`, l.`nullable_int` = r.`nullable_int`, l.`str_field` = r.`str_field`, l.`flow_document` = r.`flow_document`\nWHEN NOT MATCHED AND r.`flow_document` != '\"delete\"' THEN INSERT (`id`, `array_int`, `binary_field`, `bool_field`, `float_field`, `flow_published_at`, `multiple`, `nested`, `nullable_int`, `str_field`, `flow_document`) VALUES (r.`id`, r.`array_int`, unbase64(r.`binary_field`), r.`bool_field`, r.`float_field`, r.`flow_published_at`, r.`multiple`, r.`nested`, r.`nullable_int`, r.`str_field`, r.`flow_document`)\n", + "query": "MERGE INTO `ci_testing`.`multiple_types` AS l\nUSING merge_view_2 AS r\nON \n\tl.`id` = r.`id` AND l.`id` >= 6 AND l.`id` <= 10\nWHEN MATCHED AND r.`flow_document` = '\"delete\"' THEN DELETE\nWHEN MATCHED THEN UPDATE SET l.`id` = r.`id`, l.`array_int` = r.`array_int`, l.`binary_field` = UNBASE64(r.`binary_field`), l.`bool_field` = r.`bool_field`, l.`float_field` = r.`float_field`, l.`flow_published_at` = r.`flow_published_at`, l.`multiple` = r.`multiple`, l.`nested` = r.`nested`, l.`nullable_int` = r.`nullable_int`, l.`str_field` = r.`str_field`, l.`flow_document` = r.`flow_document`\nWHEN NOT MATCHED AND r.`flow_document` != '\"delete\"' THEN INSERT (`id`, `array_int`, `binary_field`, `bool_field`, `float_field`, `flow_published_at`, `multiple`, `nested`, `nullable_int`, `str_field`, `flow_document`) VALUES (r.`id`, r.`array_int`, UNBASE64(r.`binary_field`), r.`bool_field`, r.`float_field`, r.`flow_published_at`, r.`multiple`, r.`nested`, r.`nullable_int`, r.`str_field`, r.`flow_document`)\n", "columns": [ { "name": "id",