Skip to content

Commit 8d2be28

Browse files
committed
materialize-s3-iceberg: default to delta updates if config is absent
Unless the delta updates configuration is explicitly false, we will consider delta updates to be enabled. Practically speaking the connector only works in delta updates mode, and will error out if this is set to false. This will allow the materialization to work with a `sourceCapture` set, with new bindings added an absent configuration value for `delta_updates`.
1 parent c9e98ee commit 8d2be28

File tree

2 files changed

+9
-5
lines changed

2 files changed

+9
-5
lines changed

materialize-s3-iceberg/driver.go

+9-4
Original file line numberDiff line numberDiff line change
@@ -124,7 +124,7 @@ func parse8601(in string) (time.Duration, error) {
124124
type resource struct {
125125
Table string `json:"table" jsonschema:"title=Table,description=Name of the database table." jsonschema_extras:"x-collection-name=true"`
126126
Namespace string `json:"namespace,omitempty" jsonschema:"title=Alternative Namespace,description=Alternative namespace for this table (optional)."`
127-
Delta bool `json:"delta_updates,omitempty" jsonschema:"default=true,title=Delta Update,description=Should updates to this table be done via delta updates. Currently this connector only supports delta updates."`
127+
Delta *bool `json:"delta_updates,omitempty" jsonschema:"default=true,title=Delta Update,description=Should updates to this table be done via delta updates. Currently this connector only supports delta updates."`
128128
}
129129

130130
func newResource(cfg config) resource {
@@ -158,7 +158,7 @@ func (r resource) Validate() error {
158158
return fmt.Errorf("table %q must not contain dots", r.Table)
159159
} else if strings.Contains(r.Namespace, ".") {
160160
return fmt.Errorf("namespace %q must not contain dots", r.Namespace)
161-
} else if !r.Delta {
161+
} else if r.Delta != nil && !*r.Delta {
162162
return fmt.Errorf("connector only supports delta update mode: delta update must be enabled")
163163
}
164164

@@ -271,9 +271,14 @@ func (driver) Validate(ctx context.Context, req *pm.Request_Validate) (*pm.Respo
271271
for idx, binding := range req.Bindings {
272272
res := resources[idx]
273273

274+
deltaUpdates := true
275+
if res.Delta != nil {
276+
deltaUpdates = *res.Delta
277+
}
278+
274279
constraints, err := validator.ValidateBinding(
275280
res.path(),
276-
res.Delta,
281+
deltaUpdates,
277282
binding.Backfill,
278283
binding.Collection,
279284
binding.FieldConfigJsonMap,
@@ -285,7 +290,7 @@ func (driver) Validate(ctx context.Context, req *pm.Request_Validate) (*pm.Respo
285290

286291
out = append(out, &pm.Response_Validated_Binding{
287292
Constraints: constraints,
288-
DeltaUpdates: res.Delta,
293+
DeltaUpdates: deltaUpdates,
289294
ResourcePath: res.path(),
290295
})
291296
}

materialize-s3-iceberg/driver_test.go

-1
Original file line numberDiff line numberDiff line change
@@ -61,7 +61,6 @@ func TestValidateAndApply(t *testing.T) {
6161
resourceConfig := resource{
6262
Namespace: "test_namespace",
6363
Table: "test_table",
64-
Delta: true,
6564
}
6665

6766
catalog := catalog{

0 commit comments

Comments
 (0)