Commit 7aecd20

pgcdc: fix snapshot consistency
The previous approach here was busted. The *ONLY* safe way to get a snapshot and a consistent start LSN for that snapshot is by creating a replication slot; any other technique is wrong. We also don't want to create the permanent replication slot until we've finished the snapshot phase, so that if something fails partway through we don't skip the snapshot phase on restart as if it had completed fine.

The workaround is the following:

1. Create a TEMPORARY replication slot with an exported snapshot.
2. Stream that snapshot.
3. Copy the replication slot to preserve its LSN, but make the copy non-temporary (this requires PG >= 12; I wish there were a command to make an existing replication slot permanent).
4. Start streaming from the initial LSN.

This was the only approach I could think of that avoids skipping the snapshot phase on restart without external state storage. As a result we drop support for two already-unsupported Postgres versions (10 and 11), which I think is fine.
1 parent dc9ac0a commit 7aecd20
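As a rough illustration of the four-step handshake described in the commit message, here is a minimal, hypothetical Go sketch built on github.com/jackc/pglogrepl and pgx. It is not the connector's actual code: the slot names, the 'my_pub' publication, and the connection setup are placeholder assumptions.

// Hypothetical sketch of the temporary-slot snapshot handshake; not the connector's code.
package sketch

import (
    "context"
    "fmt"

    "github.com/jackc/pglogrepl"
    "github.com/jackc/pgx/v5"
    "github.com/jackc/pgx/v5/pgconn"
)

// snapshotThenStream assumes replConn was opened with replication=database in its DSN
// and db is an ordinary connection to the same database.
func snapshotThenStream(ctx context.Context, replConn *pgconn.PgConn, db *pgx.Conn) error {
    // 1. Create a TEMPORARY slot and export its snapshot. The result carries the
    // consistent point (start LSN) and the exported snapshot name.
    tmp, err := pglogrepl.CreateReplicationSlot(ctx, replConn, "tmp_slot", "pgoutput",
        pglogrepl.CreateReplicationSlotOptions{Temporary: true, SnapshotAction: "EXPORT_SNAPSHOT"})
    if err != nil {
        return err
    }

    // 2. Stream the snapshot on the ordinary connection, pinned to the exported snapshot.
    // The replication connection must stay open and idle while this runs, otherwise the
    // exported snapshot is no longer usable.
    tx, err := db.BeginTx(ctx, pgx.TxOptions{IsoLevel: pgx.RepeatableRead})
    if err != nil {
        return err
    }
    defer func() { _ = tx.Rollback(ctx) }()
    if _, err := tx.Exec(ctx, fmt.Sprintf("SET TRANSACTION SNAPSHOT '%s'", tmp.SnapshotName)); err != nil {
        return err
    }
    // ... SELECT/COPY each table's snapshot data here ...
    if err := tx.Commit(ctx); err != nil {
        return err
    }

    // 3. Persist the position: copy the temporary slot into a non-temporary one at the
    // same LSN. pg_copy_logical_replication_slot requires PostgreSQL 12+.
    if _, err := db.Exec(ctx, "SELECT pg_copy_logical_replication_slot('tmp_slot', 'real_slot', false)"); err != nil {
        return err
    }

    // 4. Start streaming the permanent slot from the consistent point captured in step 1.
    lsn, err := pglogrepl.ParseLSN(tmp.ConsistentPoint)
    if err != nil {
        return err
    }
    return pglogrepl.StartReplication(ctx, replConn, "real_slot", lsn,
        pglogrepl.StartReplicationOptions{PluginArgs: []string{"proto_version '1'", "publication_names 'my_pub'"}})
}

If the process crashes before step 3, the temporary slot is dropped automatically and no durable slot exists yet, so a restart repeats the snapshot phase instead of silently skipping it; once step 3 has run, the copied slot pins the LSN durably.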

File tree

6 files changed: +329 -363 lines changed


CHANGELOG.md

+5
@@ -14,11 +14,16 @@ All notable changes to this project will be documented in this file.
 - Processor schema_registry_decode now adds metadata schema_id for the schema's ID in the schema registry. (@rockwotj)
 - Field schema_evolution.processors added to snowpipe_streaming to support side effects or enrichment during schema evolution. (@rockwotj)

+### Fixed
+
+- Fix a snapshot stream consistency issue with postgres_cdc where data could be missed if writes were happening during the snapshot phase. (@rockwotj)
+
 ### Changed

 - Field avro_raw_json was deprecated in favor of avro.raw_unions for processor schema_registry_decode. (@rockwotj)
 - The snowpipe_streaming output now has better error handling for authentication failures when uploading to cloud storage. (@rockwotj)
 - Field schema_evolution.new_column_type_mapping for snowpipe_streaming is deprecated and can be replaced with schema_evolution.processors. (@rockwotj)
+- Dropped support for postgres 10 and 11 in postgres_cdc. (@rockwotj)

 ## 4.45.1 - 2025-01-17

internal/impl/postgresql/integration_test.go

+111 -52

@@ -11,9 +11,11 @@ package pgstream
 import (
     "context"
     "database/sql"
+    "encoding/json"
     "fmt"
     "strings"
     "sync"
+    "sync/atomic"
     "testing"
     "time"

@@ -26,6 +28,7 @@ import (
     "github.com/stretchr/testify/assert"
     "github.com/stretchr/testify/require"

+    "github.com/redpanda-data/connect/v4/internal/asyncroutine"
     "github.com/redpanda-data/connect/v4/internal/license"

     "github.com/ory/dockertest/v3"
@@ -142,6 +145,11 @@ func ResourceWithPostgreSQLVersion(t *testing.T, pool *dockertest.Pool, version
         return err
     }

+    _, err = db.Exec("CREATE TABLE IF NOT EXISTS seq (id serial PRIMARY KEY);")
+    if err != nil {
+        return err
+    }
+
     // flights_non_streamed is a control table with data that should not be streamed or queried by snapshot streaming
     _, err = db.Exec("CREATE TABLE IF NOT EXISTS flights_non_streamed (id serial PRIMARY KEY, name VARCHAR(50), created_at TIMESTAMP);")

@@ -156,7 +164,6 @@ func ResourceWithPostgreSQLVersion(t *testing.T, pool *dockertest.Pool, version
 func TestIntegrationPostgresNoTxnMarkers(t *testing.T) {
     t.Parallel()
     integration.CheckSkip(t)
-    tmpDir := t.TempDir()
     pool, err := dockertest.NewPool("")
     require.NoError(t, err)

@@ -193,15 +200,8 @@ pg_stream:
       - '"FlightsCompositePK"'
 `, databaseURL)

-    cacheConf := fmt.Sprintf(`
-label: pg_stream_cache
-file:
-  directory: %v
-`, tmpDir)
-
     streamOutBuilder := service.NewStreamBuilder()
     require.NoError(t, streamOutBuilder.SetLoggerYAML(`level: TRACE`))
-    require.NoError(t, streamOutBuilder.AddCacheYAML(cacheConf))
     require.NoError(t, streamOutBuilder.AddInputYAML(template))

     var outBatches []string
@@ -251,7 +251,6 @@ file:

     streamOutBuilder = service.NewStreamBuilder()
     require.NoError(t, streamOutBuilder.SetLoggerYAML(`level: OFF`))
-    require.NoError(t, streamOutBuilder.AddCacheYAML(cacheConf))
     require.NoError(t, streamOutBuilder.AddInputYAML(template))

     outBatches = []string{}
@@ -291,7 +290,6 @@ file:

 func TestIntegrationPgStreamingFromRemoteDB(t *testing.T) {
     t.Skip("This test requires a remote database to run. Aimed to test remote databases")
-    tmpDir := t.TempDir()

     // tables: users, products, orders, order_items

@@ -311,15 +309,8 @@ pg_stream:
       - order_items
 `

-    cacheConf := fmt.Sprintf(`
-label: pg_stream_cache
-file:
-  directory: %v
-`, tmpDir)
-
     streamOutBuilder := service.NewStreamBuilder()
     require.NoError(t, streamOutBuilder.SetLoggerYAML(`level: INFO`))
-    require.NoError(t, streamOutBuilder.AddCacheYAML(cacheConf))
     require.NoError(t, streamOutBuilder.AddInputYAML(template))

     var outMessages int64
@@ -368,7 +359,6 @@ file:
 func TestIntegrationPostgresIncludeTxnMarkers(t *testing.T) {
     t.Parallel()
     integration.CheckSkip(t)
-    tmpDir := t.TempDir()
     pool, err := dockertest.NewPool("")
     require.NoError(t, err)

@@ -404,15 +394,8 @@ pg_stream:
       - flights
 `, databaseURL)

-    cacheConf := fmt.Sprintf(`
-label: pg_stream_cache
-file:
-  directory: %v
-`, tmpDir)
-
     streamOutBuilder := service.NewStreamBuilder()
     require.NoError(t, streamOutBuilder.SetLoggerYAML(`level: INFO`))
-    require.NoError(t, streamOutBuilder.AddCacheYAML(cacheConf))
     require.NoError(t, streamOutBuilder.AddInputYAML(template))

     var outBatches []string
@@ -463,7 +446,6 @@ file:

     streamOutBuilder = service.NewStreamBuilder()
     require.NoError(t, streamOutBuilder.SetLoggerYAML(`level: OFF`))
-    require.NoError(t, streamOutBuilder.AddCacheYAML(cacheConf))
     require.NoError(t, streamOutBuilder.AddInputYAML(template))

     outBatches = []string{}
@@ -503,7 +485,6 @@ file:

 func TestIntegrationPgCDCForPgOutputStreamComplexTypesPlugin(t *testing.T) {
     integration.CheckSkip(t)
-    tmpDir := t.TempDir()
     pool, err := dockertest.NewPool("")
     require.NoError(t, err)

@@ -558,15 +539,8 @@ pg_stream:
       - complex_types_example
 `, databaseURL)

-    cacheConf := fmt.Sprintf(`
-label: pg_stream_cache
-file:
-  directory: %v
-`, tmpDir)
-
     streamOutBuilder := service.NewStreamBuilder()
     require.NoError(t, streamOutBuilder.SetLoggerYAML(`level: TRACE`))
-    require.NoError(t, streamOutBuilder.AddCacheYAML(cacheConf))
     require.NoError(t, streamOutBuilder.AddInputYAML(template))

     var outBatches []string
@@ -622,11 +596,10 @@ file:
 func TestIntegrationMultiplePostgresVersions(t *testing.T) {
     integration.CheckSkip(t)
     // running tests in the look to test different PostgreSQL versions
-    for _, version := range []string{"17", "16", "15", "14", "13", "12", "11", "10"} {
+    for _, version := range []string{"17", "16", "15", "14", "13", "12"} {
         v := version
         t.Run(version, func(t *testing.T) {
             t.Parallel()
-            tmpDir := t.TempDir()
             pool, err := dockertest.NewPool("")
             require.NoError(t, err)

@@ -665,15 +638,8 @@ pg_stream:
       - FLIGHTS
 `, databaseURL)

-            cacheConf := fmt.Sprintf(`
-label: pg_stream_cache
-file:
-  directory: %v
-`, tmpDir)
-
             streamOutBuilder := service.NewStreamBuilder()
             require.NoError(t, streamOutBuilder.SetLoggerYAML(`level: INFO`))
-            require.NoError(t, streamOutBuilder.AddCacheYAML(cacheConf))
             require.NoError(t, streamOutBuilder.AddInputYAML(template))

             var outBatches []string
@@ -723,7 +689,6 @@ file:

             streamOutBuilder = service.NewStreamBuilder()
             require.NoError(t, streamOutBuilder.SetLoggerYAML(`level: INFO`))
-            require.NoError(t, streamOutBuilder.AddCacheYAML(cacheConf))
             require.NoError(t, streamOutBuilder.AddInputYAML(template))

             outBatches = []string{}
@@ -766,7 +731,6 @@ file:
 func TestIntegrationTOASTValues(t *testing.T) {
     t.Parallel()
     integration.CheckSkip(t)
-    tmpDir := t.TempDir()
     pool, err := dockertest.NewPool("")
     require.NoError(t, err)

@@ -806,15 +770,8 @@ pg_stream:
       - large_values
 `, databaseURL)

-    cacheConf := fmt.Sprintf(`
-label: pg_stream_cache
-file:
-  directory: %v
-`, tmpDir)
-
     streamOutBuilder := service.NewStreamBuilder()
     require.NoError(t, streamOutBuilder.SetLoggerYAML(`level: TRACE`))
-    require.NoError(t, streamOutBuilder.AddCacheYAML(cacheConf))
     require.NoError(t, streamOutBuilder.AddInputYAML(template))

     var outBatches []string
@@ -865,3 +822,105 @@ file:

     require.NoError(t, streamOut.StopWithin(time.Second*10))
 }
+
+func TestIntegrationSnapshotConsistency(t *testing.T) {
+    t.Parallel()
+    integration.CheckSkip(t)
+    pool, err := dockertest.NewPool("")
+    require.NoError(t, err)
+
+    var (
+        resource *dockertest.Resource
+        db       *sql.DB
+    )
+
+    resource, db, err = ResourceWithPostgreSQLVersion(t, pool, "16")
+    require.NoError(t, err)
+    require.NoError(t, resource.Expire(120))
+
+    hostAndPort := resource.GetHostPort("5432/tcp")
+    hostAndPortSplited := strings.Split(hostAndPort, ":")
+    password := "l]YLSc|4[i56%{gY"
+
+    require.NoError(t, err)
+
+    databaseURL := fmt.Sprintf("user=user_name password=%s dbname=dbname sslmode=disable host=%s port=%s", password, hostAndPortSplited[0], hostAndPortSplited[1])
+    template := fmt.Sprintf(`
+read_until:
+  # Stop when we're idle for 3 seconds, which means our writer stopped
+  idle_timeout: 3s
+  input:
+    pg_stream:
+      dsn: %s
+      slot_name: test_slot
+      stream_snapshot: true
+      snapshot_batch_size: 1
+      schema: public
+      tables:
+        - seq
+`, databaseURL)
+
+    streamOutBuilder := service.NewStreamBuilder()
+    require.NoError(t, streamOutBuilder.SetLoggerYAML(`level: DEBUG`))
+    require.NoError(t, streamOutBuilder.AddInputYAML(template))
+
+    var sequenceNumbers []int64
+    var batchMu sync.Mutex
+    require.NoError(t, streamOutBuilder.AddBatchConsumerFunc(func(c context.Context, batch service.MessageBatch) error {
+        batchMu.Lock()
+        defer batchMu.Unlock()
+        for _, msg := range batch {
+            msg, err := msg.AsStructured()
+            if err != nil {
+                return err
+            }
+            seq, err := msg.(map[string]any)["id"].(json.Number).Int64()
+            if err != nil {
+                return err
+            }
+            sequenceNumbers = append(sequenceNumbers, seq)
+        }
+        return nil
+    }))
+
+    // Continuously write so there is a chance we skip data between snapshot and stream hand off.
+    var count atomic.Int64
+    writer := asyncroutine.NewPeriodic(time.Microsecond, func() {
+        _, err := db.Exec("INSERT INTO seq DEFAULT VALUES")
+        require.NoError(t, err)
+        count.Add(1)
+    })
+    writer.Start()
+    t.Cleanup(writer.Stop)
+
+    // Wait to write some values so there are some values in the snapshot
+    time.Sleep(10 * time.Millisecond)
+
+    // Now start our stream
+    streamOut, err := streamOutBuilder.Build()
+    require.NoError(t, err)
+    license.InjectTestService(streamOut.Resources())
+    streamStopped := make(chan any, 1)
+    go func() {
+        err = streamOut.Run(context.Background())
+        require.NoError(t, err)
+        streamStopped <- nil
+    }()
+    // Let the writer write a little more
+    time.Sleep(5 * time.Second)
+    writer.Stop()
+    // Okay now wait for the stream to finish (the stream auto closes after it gets nothing for 3 seconds)
+    select {
+    case <-streamStopped:
+    case <-time.After(30 * time.Second):
+        require.Fail(t, "stream did not complete in time")
+    }
+    require.NoError(t, streamOut.StopWithin(10*time.Second))
+    expected := []int64{}
+    for i := range count.Load() {
+        expected = append(expected, i+1)
+    }
+    batchMu.Lock()
+    require.Equal(t, expected, sequenceNumbers)
+    batchMu.Unlock()
+}
