Skip to content

Commit

Permalink
source-postgres: Add use_schema_inference feature flag
Browse files Browse the repository at this point in the history
The implementation of this flag is a little bit tricky for SQL
CDC connectors because the JSON schema generation is ultimately
handed off to the generic `sqlcapture` package, so I've opted to
add a property to the discovered tables indicating whether the
generated JSON schema ought to request schema inference.
  • Loading branch information
willdonnelly committed Feb 4, 2025
1 parent f81c5c3 commit 386562d
Show file tree
Hide file tree
Showing 4 changed files with 13 additions and 1 deletion.
3 changes: 3 additions & 0 deletions source-postgres/discovery.go
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,9 @@ func (db *postgresDatabase) DiscoverTables(ctx context.Context) (map[sqlcapture.
// We want to exclude the watermarks table from the output bindings, but we still discover it
table.OmitBinding = true
}
if db.featureFlags["use_schema_inference"] {
table.UseSchemaInference = true
}
tableMap[streamID] = table
}
for _, column := range columns {
Expand Down
6 changes: 5 additions & 1 deletion source-postgres/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -118,7 +118,11 @@ type advancedConfig struct {
FeatureFlags string `json:"feature_flags,omitempty" jsonschema:"title=Feature Flags,description=This property is intended for Estuary internal use. You should only modify this field as directed by Estuary support."`
}

var featureFlagDefaults = map[string]bool{}
var featureFlagDefaults = map[string]bool{
// When set, discovered collection schemas will request that schema inference be
// used _in addition to_ the full column/types discovery we already do.
"use_schema_inference": false,
}

// Validate checks that the configuration possesses all required properties.
func (c *Config) Validate() error {
Expand Down
3 changes: 3 additions & 0 deletions sqlcapture/discovery.go
Original file line number Diff line number Diff line change
Expand Up @@ -192,6 +192,9 @@ func DiscoverCatalog(ctx context.Context, db Database) ([]*pc.Response_Discovere
{Ref: "#" + anchor},
},
}
if table.UseSchemaInference {
schema.Extras = map[string]any{"x-infer-schema": true}
}

var rawSchema, err = schema.MarshalJSON()
if err != nil {
Expand Down
2 changes: 2 additions & 0 deletions sqlcapture/interface.go
Original file line number Diff line number Diff line change
Expand Up @@ -214,6 +214,8 @@ type DiscoveryInfo struct {
BaseTable bool // True if the table type is 'BASE TABLE' and false for views or other not-physical-table entities.
OmitBinding bool // True if the table should be omitted from discovery catalog generation.

UseSchemaInference bool // True if generated JSON schemas for this table should request schema inference.

// UnpredictableKeyOrdering will be true when the connector is unable to guarantee
// (for a particular table) that serialized RowKey values will accurately reproduce
// (when compared bytewise lexicographically) the database sort ordering of the same
Expand Down

0 comments on commit 386562d

Please sign in to comment.