Skip to content

Commit 0dda4ca

Browse files
committedMar 7, 2025·
source-oracle: fix integer key encoding
1 parent d2eb328 commit 0dda4ca

8 files changed

+236
-44
lines changed
 
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,9 @@
1+
# ================================
2+
# Collection "acmeCo/test/c__flow_test_logminer/t12319541": 1 Documents
3+
# ================================
4+
{"NUM18":999999999999999999,"_meta":{"op":"c","source":{"schema":"C##FLOW_TEST_LOGMINER","snapshot":true,"table":"T12319541","row_id":"AAAAAAAAAAAAAAAAAA","rs_id":"111111111111111111","ssn":111}}}
5+
# ================================
6+
# Final State Checkpoint
7+
# ================================
8+
{"bindingStateV1":{"C%23%23FLOW_TEST_LOGMINER%2FT12319541":{"backfilled":1,"key_columns":["NUM18"],"mode":"Active"}},"cursor":"11111111"}
9+
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,135 @@
1+
Binding 0:
2+
{
3+
"recommended_name": "c__flow_test_logminer/t12319541",
4+
"resource_config_json": {
5+
"namespace": "C##FLOW_TEST_LOGMINER",
6+
"stream": "T12319541"
7+
},
8+
"document_schema_json": {
9+
"$defs": {
10+
"C##FLOW_TEST_LOGMINERT12319541": {
11+
"type": "object",
12+
"required": [
13+
"NUM18"
14+
],
15+
"$anchor": "C##FLOW_TEST_LOGMINERT12319541",
16+
"properties": {
17+
"NUM18": {
18+
"type": "integer",
19+
"description": "(source type: non-nullable NUMBER)"
20+
}
21+
}
22+
}
23+
},
24+
"allOf": [
25+
{
26+
"if": {
27+
"properties": {
28+
"_meta": {
29+
"properties": {
30+
"op": {
31+
"const": "d"
32+
}
33+
}
34+
}
35+
}
36+
},
37+
"then": {
38+
"reduce": {
39+
"delete": true,
40+
"strategy": "merge"
41+
}
42+
},
43+
"else": {
44+
"reduce": {
45+
"strategy": "merge"
46+
}
47+
},
48+
"required": [
49+
"_meta"
50+
],
51+
"properties": {
52+
"_meta": {
53+
"type": "object",
54+
"required": [
55+
"op",
56+
"source"
57+
],
58+
"properties": {
59+
"before": {
60+
"$ref": "#C##FLOW_TEST_LOGMINERT12319541",
61+
"description": "Record state immediately before this change was applied.",
62+
"reduce": {
63+
"strategy": "firstWriteWins"
64+
}
65+
},
66+
"op": {
67+
"enum": [
68+
"c",
69+
"d",
70+
"u"
71+
],
72+
"description": "Change operation type: 'c' Create/Insert, 'u' Update, 'd' Delete."
73+
},
74+
"source": {
75+
"$id": "https://github.com/estuary/connectors/source-oracle/oracle-source",
76+
"properties": {
77+
"ts_ms": {
78+
"type": "integer",
79+
"description": "Unix timestamp (in millis) at which this event was recorded by the database."
80+
},
81+
"schema": {
82+
"type": "string",
83+
"description": "Database schema (namespace) of the event."
84+
},
85+
"snapshot": {
86+
"type": "boolean",
87+
"description": "Snapshot is true if the record was produced from an initial table backfill and unset if produced from the replication log."
88+
},
89+
"table": {
90+
"type": "string",
91+
"description": "Database table of the event."
92+
},
93+
"scn": {
94+
"type": "integer",
95+
"description": "SCN of this event"
96+
},
97+
"row_id": {
98+
"type": "string",
99+
"description": "ROWID of the document"
100+
},
101+
"rs_id": {
102+
"type": "string",
103+
"description": "Record Set ID of the logical change"
104+
},
105+
"ssn": {
106+
"type": "integer",
107+
"description": "SQL sequence number of the logical change"
108+
}
109+
},
110+
"type": "object",
111+
"required": [
112+
"schema",
113+
"table",
114+
"row_id",
115+
"rs_id",
116+
"ssn"
117+
]
118+
}
119+
},
120+
"reduce": {
121+
"strategy": "merge"
122+
}
123+
}
124+
}
125+
},
126+
{
127+
"$ref": "#C##FLOW_TEST_LOGMINERT12319541"
128+
}
129+
]
130+
},
131+
"key": [
132+
"/NUM18"
133+
]
134+
}
135+
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,9 @@
1+
# ================================
2+
# Collection "acmeCo/test/c__flow_test_logminer/t12319541": 1 Documents
3+
# ================================
4+
{"NUM18":999999999999999998,"_meta":{"op":"c","source":{"ts_ms":1111111111111,"schema":"C##FLOW_TEST_LOGMINER","table":"T12319541","scn":11111111,"row_id":"AAAAAAAAAAAAAAAAAA","rs_id":"111111111111111111","ssn":111}}}
5+
# ================================
6+
# Final State Checkpoint
7+
# ================================
8+
{"bindingStateV1":{"C%23%23FLOW_TEST_LOGMINER%2FT12319541":{"backfilled":1,"key_columns":["NUM18"],"mode":"Active"}},"cursor":"11111111"}
9+

‎source-oracle/backfill.go

+3-3
Original file line numberDiff line numberDiff line change
@@ -40,7 +40,7 @@ func (db *oracleDatabase) ScanTableChunk(ctx context.Context, info *sqlcapture.D
4040
query = db.keylessScanQuery(info, schema, table)
4141
args = []any{afterRowID}
4242

43-
case sqlcapture.TableStatePreciseBackfill:
43+
case sqlcapture.TableStatePreciseBackfill, sqlcapture.TableStateUnfilteredBackfill:
4444
if resumeAfter != nil {
4545
var resumeKey, err = sqlcapture.UnpackTuple(resumeAfter, decodeKeyFDB)
4646
if err != nil {
@@ -195,7 +195,7 @@ var columnBinaryKeyComparison = map[string]bool{
195195
// render a "cast" expression for a column so that we can cast it to the
196196
// type and format we expect for that column (e.g. for timestamps we expect a certain format with UTC timezone, etc.)
197197
func castColumn(col sqlcapture.ColumnInfo) string {
198-
var dataType = col.DataType.(oracleColumnType).original
198+
var dataType = col.DataType.(oracleColumnType).Original
199199
var isDateTime = dataType == "DATE" || strings.HasPrefix(dataType, "TIMESTAMP")
200200
var isInterval = dataType == "INTERVAL"
201201

@@ -259,7 +259,7 @@ func (db *oracleDatabase) buildScanQuery(start bool, info *sqlcapture.DiscoveryI
259259
var quotedName = quoteColumnName(colName)
260260
// If a precise backfill is requested *and* the column type requires binary ordering for precise
261261
// backfill comparisons to work, add the 'COLLATE BINARY' qualifier to the column name.
262-
if columnTypes[colName].jsonType == "string" && columnBinaryKeyComparison[colName] {
262+
if columnTypes[colName].JsonType == "string" && columnBinaryKeyComparison[colName] {
263263
pkey = append(pkey, quotedName+" COLLATE BINARY")
264264
} else {
265265
pkey = append(pkey, quotedName)

‎source-oracle/capture_test.go

+35
Original file line numberDiff line numberDiff line change
@@ -95,6 +95,41 @@ func TestAllTypes(t *testing.T) {
9595
t.Run("replication", func(t *testing.T) { tests.VerifiedCapture(ctx, t, cs) })
9696
}
9797

98+
func TestIntegerKey(t *testing.T) {
99+
var unique = "12319541"
100+
var tb, ctx = oracleTestBackend(t, "config.pdb.yaml"), context.Background()
101+
var typesAndValues = [][]any{
102+
{"num18", "NUMBER(18, 0) PRIMARY KEY", 999999999999999999},
103+
}
104+
105+
var columnDefs = "("
106+
var vals []any
107+
for idx, tv := range typesAndValues {
108+
if idx > 0 {
109+
columnDefs += ", "
110+
}
111+
columnDefs += fmt.Sprintf("%s %s", tv[0].(string), tv[1].(string))
112+
vals = append(vals, tv[2])
113+
}
114+
columnDefs += ")"
115+
var tableName = tb.CreateTable(ctx, t, unique, columnDefs)
116+
117+
tb.Insert(ctx, t, tableName, [][]any{vals})
118+
119+
// Discover the catalog and verify that the table schemas looks correct
120+
t.Run("discover", func(t *testing.T) {
121+
tb.CaptureSpec(ctx, t).VerifyDiscover(ctx, t, regexp.MustCompile(unique))
122+
})
123+
124+
// Perform an initial backfill
125+
var cs = tb.CaptureSpec(ctx, t, regexp.MustCompile(unique))
126+
t.Run("backfill", func(t *testing.T) { tests.VerifiedCapture(ctx, t, cs) })
127+
128+
// Add more data and read it via replication
129+
tb.Insert(ctx, t, tableName, [][]any{{vals[0].(int) - 1}})
130+
t.Run("replication", func(t *testing.T) { tests.VerifiedCapture(ctx, t, cs) })
131+
}
132+
98133
func TestNullValues(t *testing.T) {
99134
var unique = "18110541"
100135
var tb, ctx = oracleTestBackend(t, "config.pdb.yaml"), context.Background()

‎source-oracle/discovery.go

+30-31
Original file line numberDiff line numberDiff line change
@@ -3,10 +3,10 @@ package main
33
import (
44
"context"
55
"database/sql"
6-
"encoding/json"
76
"fmt"
87
"reflect"
98
"slices"
9+
"strconv"
1010
"strings"
1111

1212
"github.com/estuary/connectors/sqlcapture"
@@ -109,37 +109,36 @@ func translateRecordField(column *sqlcapture.ColumnInfo, val interface{}) (inter
109109
case nil:
110110
return nil, nil
111111
case string:
112-
if dataType.jsonType != "string" {
113-
var out = reflect.New(dataType.t).Interface()
114-
if err := json.Unmarshal([]byte(v), out); err != nil {
115-
return nil, err
116-
}
117-
return out, nil
112+
if dataType.JsonType == "integer" {
113+
return strconv.Atoi(v)
114+
} else if dataType.JsonType == "number" {
115+
return strconv.ParseFloat(v, 64)
118116
} else {
119117
return val, nil
120118
}
121119
default:
122120
}
123121

124122
var rv = reflect.ValueOf(val)
125-
if rv.CanConvert(dataType.t) {
126-
return rv.Convert(dataType.t).Interface(), nil
123+
if rv.CanConvert(dataType.T) {
124+
return rv.Convert(dataType.T).Interface(), nil
127125
}
126+
128127
return val, nil
129128
}
130129

131130
func (ct *oracleColumnType) toJSONSchemaType() *jsonschema.Schema {
132131
var out = &jsonschema.Schema{
133-
Format: ct.format,
132+
Format: ct.Format,
134133
Extras: make(map[string]interface{}),
135134
}
136135

137-
if ct.jsonType == "" {
136+
if ct.JsonType == "" {
138137
// No type constraint.
139-
} else if ct.nullable {
140-
out.Extras["type"] = []string{ct.jsonType, "null"} // Use variadic form.
138+
} else if ct.Nullable {
139+
out.Extras["type"] = []string{ct.JsonType, "null"} // Use variadic form.
141140
} else {
142-
out.Type = ct.jsonType
141+
out.Type = ct.JsonType
143142
}
144143
return out
145144
}
@@ -268,18 +267,18 @@ SELECT t.owner, t.table_name, t.column_id, t.column_name, t.nullable, t.data_typ
268267
ON (t.owner = c.owner AND t.table_name = c.table_name AND t.column_name = c.column_name)`
269268

270269
type oracleColumnType struct {
271-
original string
272-
length int
273-
scale int16
274-
precision int16
275-
t reflect.Type
276-
format string
277-
jsonType string
278-
nullable bool
270+
Original string
271+
Length int
272+
Scale int16
273+
Precision int16
274+
T reflect.Type
275+
Format string
276+
JsonType string
277+
Nullable bool
279278
}
280279

281280
func (ct oracleColumnType) String() string {
282-
return ct.original
281+
return ct.Original
283282
}
284283

285284
// SMALLINT, INT and INTEGER have a default precision 38 which is not included in the column information
@@ -383,14 +382,14 @@ func getColumns(ctx context.Context, conn *sql.DB, tables []*sqlcapture.Discover
383382
}
384383

385384
sc.DataType = oracleColumnType{
386-
original: dataType,
387-
scale: dataScale.Int16,
388-
precision: precision,
389-
length: dataLength,
390-
t: t,
391-
format: format,
392-
jsonType: jsonType,
393-
nullable: sc.IsNullable,
385+
Original: dataType,
386+
Scale: dataScale.Int16,
387+
Precision: precision,
388+
Length: dataLength,
389+
T: t,
390+
Format: format,
391+
JsonType: jsonType,
392+
Nullable: sc.IsNullable,
394393
}
395394

396395
if isPrimaryKey {

‎source-oracle/main.go

+9-9
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,7 @@ import (
1010
"net"
1111
"net/url"
1212
"slices"
13+
"strconv"
1314
"strings"
1415
"time"
1516

@@ -299,13 +300,6 @@ func (db *oracleDatabase) FallbackCollectionKey() []string {
299300
}
300301

301302
func encodeKeyFDB(key any, colType oracleColumnType) (tuple.TupleElement, error) {
302-
if colType.jsonType == "integer" {
303-
return key.(int64), nil
304-
} else if colType.jsonType == "number" {
305-
// Sanity check, should not happen
306-
return nil, fmt.Errorf("unsupported %q primary key with scale %d", colType.original, colType.scale)
307-
}
308-
309303
switch key := key.(type) {
310304
case [16]uint8:
311305
var id, err = uuid.FromBytes(key[:])
@@ -316,9 +310,15 @@ func encodeKeyFDB(key any, colType oracleColumnType) (tuple.TupleElement, error)
316310
case time.Time:
317311
return key.Format(sortableRFC3339Nano), nil
318312
case string:
319-
if colType.format == "integer" {
313+
if colType.JsonType == "integer" {
314+
return strconv.Atoi(key)
315+
} else if colType.JsonType == "number" {
316+
return strconv.ParseFloat(key, 64)
317+
}
318+
319+
if colType.Format == "integer" {
320320
// prepend zeros so that string represented numbers are lexicographically consistent
321-
var leadingZeros = strings.Repeat("0", int(colType.precision)-len(key))
321+
var leadingZeros = strings.Repeat("0", int(colType.Precision)-len(key))
322322
if key[0] == '-' {
323323
key = "-" + leadingZeros + key[1:]
324324
} else {

‎source-oracle/replication.go

+6-1
Original file line numberDiff line numberDiff line change
@@ -551,15 +551,20 @@ func (s *replicationStream) receiveMessages(ctx context.Context, startSCN, endSC
551551

552552
var msg logminerMessage
553553
var ts time.Time
554+
var redoSql sql.NullString
554555
var undoSql sql.NullString
555556
var info sql.NullString
556-
if err := rows.Scan(&msg.SCN, &ts, &msg.Op, &msg.SQL, &undoSql, &msg.TableName, &msg.Owner, &msg.Status, &info, &msg.RSID, &msg.SSN, &msg.CSF, &msg.ObjectID, &msg.DataObjectID); err != nil {
557+
if err := rows.Scan(&msg.SCN, &ts, &msg.Op, &redoSql, &undoSql, &msg.TableName, &msg.Owner, &msg.Status, &info, &msg.RSID, &msg.SSN, &msg.CSF, &msg.ObjectID, &msg.DataObjectID); err != nil {
557558
return err
558559
}
559560

560561
// For some reason RSID comes with a space before and after it
561562
msg.RSID = strings.TrimSpace(msg.RSID)
562563

564+
if redoSql.Valid {
565+
msg.SQL = redoSql.String
566+
}
567+
563568
if undoSql.Valid {
564569
msg.UndoSQL = undoSql.String
565570
}

0 commit comments

Comments
 (0)
Please sign in to comment.