Skip to content

Commit fda46b6

Browse files
committed
Refactor replication connections.
1 parent c656695 commit fda46b6

File tree

3 files changed

+23
-28
lines changed

3 files changed

+23
-28
lines changed

modules/module-postgres/src/replication/WalStream.ts

Lines changed: 17 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -374,7 +374,7 @@ WHERE oid = $1::regclass`,
374374
* If (partial) replication was done before on this slot, this clears the state
375375
* and starts again from scratch.
376376
*/
377-
async startInitialReplication(replicationConnection: pgwire.PgConnection, status: InitResult) {
377+
async startInitialReplication(status: InitResult) {
378378
// If anything here errors, the entire replication process is aborted,
379379
// and all connections are closed, including this one.
380380
const db = await this.connections.snapshotConnection();
@@ -396,7 +396,12 @@ WHERE oid = $1::regclass`,
396396

397397
// We use the replication connection here, not a pool.
398398
// The replication slot must be created before we start snapshotting tables.
399-
await replicationConnection.query(`CREATE_REPLICATION_SLOT ${slotName} LOGICAL pgoutput`);
399+
const initReplicationConnection = await this.connections.replicationConnection();
400+
try {
401+
await initReplicationConnection.query(`CREATE_REPLICATION_SLOT ${slotName} LOGICAL pgoutput`);
402+
} finally {
403+
await initReplicationConnection.end();
404+
}
400405

401406
this.logger.info(`Created replication slot ${slotName}`);
402407
}
@@ -815,37 +820,34 @@ WHERE oid = $1::regclass`,
815820

816821
async replicate() {
817822
try {
818-
// If anything errors here, the entire replication process is halted, and
819-
// all connections automatically closed, including this one.
820-
const initReplicationConnection = await this.connections.replicationConnection();
821-
await this.initReplication(initReplicationConnection);
822-
await initReplicationConnection.end();
823+
await this.initReplication();
823824

824-
// At this point, the above connection has often timed out, so we start a new one
825-
const streamReplicationConnection = await this.connections.replicationConnection();
826-
await this.streamChanges(streamReplicationConnection);
827-
await streamReplicationConnection.end();
825+
// At this point, the above connection has often timed out, so we start a new one in streamChanges().
826+
await this.streamChanges();
828827
} catch (e) {
829828
await this.storage.reportError(e);
830829
throw e;
831830
}
832831
}
833832

834-
async initReplication(replicationConnection: pgwire.PgConnection) {
833+
async initReplication() {
835834
const result = await this.initSlot();
836835
if (result.needsInitialSync) {
837-
await this.startInitialReplication(replicationConnection, result);
836+
await this.startInitialReplication(result);
838837
}
839838
}
840839

841-
async streamChanges(replicationConnection: pgwire.PgConnection) {
840+
async streamChanges() {
841+
const streamReplicationConnection = await this.connections.replicationConnection();
842842
try {
843-
await this.streamChangesInternal(replicationConnection);
843+
await this.streamChangesInternal(streamReplicationConnection);
844844
} catch (e) {
845845
if (isReplicationSlotInvalidError(e)) {
846846
throw new MissingReplicationSlotError(e.message, e);
847847
}
848848
throw e;
849+
} finally {
850+
await streamReplicationConnection.end();
849851
}
850852
}
851853

modules/module-postgres/test/src/slow_tests.test.ts

Lines changed: 4 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -70,7 +70,6 @@ function defineSlowTests(factory: storage.TestStorageFactory) {
7070

7171
async function testRepeatedReplication(testOptions: { compact: boolean; maxBatchSize: number; numBatches: number }) {
7272
const connections = new PgManager(TEST_CONNECTION_OPTIONS, {});
73-
const replicationConnection = await connections.replicationConnection();
7473
const pool = connections.pool;
7574
await clearTestDb(pool);
7675
await using f = await factory();
@@ -97,9 +96,9 @@ bucket_definitions:
9796
);
9897
await pool.query(`ALTER TABLE test_data REPLICA IDENTITY FULL`);
9998

100-
await walStream.initReplication(replicationConnection);
99+
await walStream.initReplication();
101100
let abort = false;
102-
streamPromise = walStream.streamChanges(replicationConnection).finally(() => {
101+
streamPromise = walStream.streamChanges().finally(() => {
103102
abort = true;
104103
});
105104
const start = Date.now();
@@ -331,7 +330,6 @@ bucket_definitions:
331330
i += 1;
332331

333332
const connections = new PgManager(TEST_CONNECTION_OPTIONS, {});
334-
const replicationConnection = await connections.replicationConnection();
335333

336334
abortController = new AbortController();
337335
const options: WalStreamOptions = {
@@ -347,9 +345,9 @@ bucket_definitions:
347345
// 3. Start initial replication, then streaming, but don't wait for any of this
348346
let initialReplicationDone = false;
349347
streamPromise = (async () => {
350-
await walStream.initReplication(replicationConnection);
348+
await walStream.initReplication();
351349
initialReplicationDone = true;
352-
await walStream.streamChanges(replicationConnection);
350+
await walStream.streamChanges();
353351
})()
354352
.catch((e) => {
355353
initialReplicationDone = true;

modules/module-postgres/test/src/wal_stream_utils.ts

Lines changed: 2 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -19,7 +19,6 @@ export class WalStreamTestContext implements AsyncDisposable {
1919
private abortController = new AbortController();
2020
private streamPromise?: Promise<void>;
2121
public storage?: SyncRulesBucketStorage;
22-
private replicationConnection?: pgwire.PgConnection;
2322
private snapshotPromise?: Promise<void>;
2423

2524
/**
@@ -150,18 +149,14 @@ export class WalStreamTestContext implements AsyncDisposable {
150149

151150
async replicateSnapshot() {
152151
const promise = (async () => {
153-
this.replicationConnection = await this.connectionManager.replicationConnection();
154-
await this.walStream.initReplication(this.replicationConnection);
152+
await this.walStream.initReplication();
155153
})();
156154
this.snapshotPromise = promise.catch((e) => e);
157155
await promise;
158156
}
159157

160158
startStreaming() {
161-
if (this.replicationConnection == null) {
162-
throw new Error('Call replicateSnapshot() before startStreaming()');
163-
}
164-
this.streamPromise = this.walStream.streamChanges(this.replicationConnection!);
159+
this.streamPromise = this.walStream.streamChanges();
165160
}
166161

167162
async getCheckpoint(options?: { timeout?: number }) {

0 commit comments

Comments
 (0)