Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

source-postgres: Feature flag use_schema_inference #2337

Merged
merged 2 commits into from
Feb 4, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions source-postgres/.snapshots/TestGeneric-SpecResponse
Original file line number Diff line number Diff line change
Expand Up @@ -104,6 +104,11 @@
"type": "boolean",
"title": "Read-Only Capture",
"description": "When set the capture will operate in read-only mode and avoid operations such as watermark writes. This comes with some tradeoffs; consult the connector documentation for more information."
},
"feature_flags": {
"type": "string",
"title": "Feature Flags",
"description": "This property is intended for Estuary internal use. You should only modify this field as directed by Estuary support."
}
},
"additionalProperties": false,
Expand Down
3 changes: 3 additions & 0 deletions source-postgres/discovery.go
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,9 @@ func (db *postgresDatabase) DiscoverTables(ctx context.Context) (map[sqlcapture.
// We want to exclude the watermarks table from the output bindings, but we still discover it
table.OmitBinding = true
}
if db.featureFlags["use_schema_inference"] {
table.UseSchemaInference = true
}
tableMap[streamID] = table
}
for _, column := range columns {
Expand Down
19 changes: 18 additions & 1 deletion source-postgres/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -76,7 +76,15 @@ func connectPostgres(ctx context.Context, name string, cfg json.RawMessage) (sql
}
}

var db = &postgresDatabase{config: &config}
var featureFlags = boilerplate.ParseFeatureFlags(config.Advanced.FeatureFlags, featureFlagDefaults)
if config.Advanced.FeatureFlags != "" {
logrus.WithField("flags", featureFlags).Info("parsed feature flags")
}

var db = &postgresDatabase{
config: &config,
featureFlags: featureFlags,
}
if err := db.connect(ctx); err != nil {
return nil, err
}
Expand Down Expand Up @@ -107,6 +115,13 @@ type advancedConfig struct {
MinimumBackfillXID string `json:"min_backfill_xid,omitempty" jsonschema:"title=Minimum Backfill XID,description=Only backfill rows with XMIN values greater (in a 32-bit modular comparison) than the specified XID. Helpful for reducing re-backfill data volume in certain edge cases." jsonschema_extras:"pattern=^[0-9]+$"`
MaximumBackfillXID string `json:"max_backfill_xid,omitempty" jsonschema:"title=Maximum Backfill XID,description=Only backfill rows with XMIN values smaller (in a 32-bit modular comparison) than the specified XID. Helpful for reducing re-backfill data volume in certain edge cases." jsonschema_extras:"pattern=^[0-9]+$"`
ReadOnlyCapture bool `json:"read_only_capture,omitempty" jsonschema:"title=Read-Only Capture,description=When set the capture will operate in read-only mode and avoid operations such as watermark writes. This comes with some tradeoffs; consult the connector documentation for more information."`
FeatureFlags string `json:"feature_flags,omitempty" jsonschema:"title=Feature Flags,description=This property is intended for Estuary internal use. You should only modify this field as directed by Estuary support."`
}

var featureFlagDefaults = map[string]bool{
// When set, discovered collection schemas will request that schema inference be
// used _in addition to_ the full column/types discovery we already do.
"use_schema_inference": false,
}

// Validate checks that the configuration possesses all required properties.
Expand Down Expand Up @@ -226,6 +241,8 @@ type postgresDatabase struct {
explained map[sqlcapture.StreamID]struct{} // Tracks tables which have had an `EXPLAIN` run on them during this connector invocation
includeTxIDs map[sqlcapture.StreamID]bool // Tracks which tables should have XID properties in their replication metadata
tablesPublished map[sqlcapture.StreamID]bool // Tracks which tables are part of the configured publication

featureFlags map[string]bool // Parsed feature flag settings with defaults applied
}

func (db *postgresDatabase) HistoryMode() bool {
Expand Down
3 changes: 3 additions & 0 deletions source-postgres/main_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,8 @@ var (
dbCapturePass = flag.String("db_capture_pass", "secret1234", "The password for the capture user")

readOnlyCapture = flag.Bool("read_only_capture", false, "When true, run test captures in read-only mode")

testFeatureFlags = flag.String("feature_flags", "", "Feature flags to apply to all test captures.")
)

const testSchemaName = "test"
Expand Down Expand Up @@ -75,6 +77,7 @@ func postgresTestBackend(t testing.TB) *testBackend {
Password: *dbCapturePass,
Database: *dbName,
}
captureConfig.Advanced.FeatureFlags = *testFeatureFlags
captureConfig.Advanced.BackfillChunkSize = 16
if *readOnlyCapture {
captureConfig.Advanced.ReadOnlyCapture = true
Expand Down
3 changes: 3 additions & 0 deletions sqlcapture/discovery.go
Original file line number Diff line number Diff line change
Expand Up @@ -192,6 +192,9 @@ func DiscoverCatalog(ctx context.Context, db Database) ([]*pc.Response_Discovere
{Ref: "#" + anchor},
},
}
if table.UseSchemaInference {
schema.Extras = map[string]any{"x-infer-schema": true}
}

var rawSchema, err = schema.MarshalJSON()
if err != nil {
Expand Down
2 changes: 2 additions & 0 deletions sqlcapture/interface.go
Original file line number Diff line number Diff line change
Expand Up @@ -214,6 +214,8 @@ type DiscoveryInfo struct {
BaseTable bool // True if the table type is 'BASE TABLE' and false for views or other not-physical-table entities.
OmitBinding bool // True if the table should be omitted from discovery catalog generation.

UseSchemaInference bool // True if generated JSON schemas for this table should request schema inference.

// UnpredictableKeyOrdering will be true when the connector is unable to guarantee
// (for a particular table) that serialized RowKey values will accurately reproduce
// (when compared bytewise lexicographically) the database sort ordering of the same
Expand Down
Loading