Skip to content

Commit 7a812ee

Browse files
committed
Start streaming concurrently - lets see what breaks.
1 parent fda46b6 commit 7a812ee

File tree

1 file changed

+36
-25
lines changed

1 file changed

+36
-25
lines changed

modules/module-postgres/src/replication/WalStream.ts

Lines changed: 36 additions & 25 deletions
Original file line numberDiff line numberDiff line change
@@ -282,7 +282,7 @@ export class WalStream {
282282
return result;
283283
}
284284

285-
async initSlot(): Promise<InitResult> {
285+
async checkSlot(): Promise<InitResult> {
286286
await checkSourceConfiguration(this.connections.pool, PUBLICATION_NAME);
287287
await this.ensureStorageCompatibility();
288288

@@ -368,17 +368,9 @@ WHERE oid = $1::regclass`,
368368
}
369369
}
370370

371-
/**
372-
* Start initial replication.
373-
*
374-
* If (partial) replication was done before on this slot, this clears the state
375-
* and starts again from scratch.
376-
*/
377-
async startInitialReplication(status: InitResult) {
371+
private async setupSlot(db: pgwire.PgConnection, status: InitResult) {
378372
// If anything here errors, the entire replication process is aborted,
379373
// and all connections are closed, including this one.
380-
const db = await this.connections.snapshotConnection();
381-
382374
const slotName = this.slot_name;
383375

384376
if (status.needsNewSlot) {
@@ -405,10 +397,14 @@ WHERE oid = $1::regclass`,
405397

406398
this.logger.info(`Created replication slot ${slotName}`);
407399
}
408-
409-
await this.initialReplication(db);
410400
}
411401

402+
/**
403+
* Start initial replication.
404+
*
405+
* If (partial) replication was done before on this slot, this clears the state
406+
* and starts again from scratch.
407+
*/
412408
async initialReplication(db: pgwire.PgConnection) {
413409
const sourceTables = this.sync_rules.getSourceTables();
414410
const flushResults = await this.storage.startBatch(
@@ -818,6 +814,9 @@ WHERE oid = $1::regclass`,
818814
return null;
819815
}
820816

817+
private snapshotPromise: Promise<void> | null = null;
818+
private streamPromise: Promise<void> | null = null;
819+
821820
async replicate() {
822821
try {
823822
await this.initReplication();
@@ -831,24 +830,36 @@ WHERE oid = $1::regclass`,
831830
}
832831

833832
async initReplication() {
834-
const result = await this.initSlot();
835-
if (result.needsInitialSync) {
836-
await this.startInitialReplication(result);
833+
const result = await this.checkSlot();
834+
const db = await this.connections.snapshotConnection();
835+
try {
836+
await this.setupSlot(db, result);
837+
// Trigger here, but we await elsewhere
838+
// TODO: fail on the first error
839+
this.streamChanges().catch((_) => {});
840+
if (result.needsInitialSync) {
841+
await this.initialReplication(db);
842+
}
843+
} finally {
844+
await db.end();
837845
}
838846
}
839847

840848
async streamChanges() {
841-
const streamReplicationConnection = await this.connections.replicationConnection();
842-
try {
843-
await this.streamChangesInternal(streamReplicationConnection);
844-
} catch (e) {
845-
if (isReplicationSlotInvalidError(e)) {
846-
throw new MissingReplicationSlotError(e.message, e);
849+
this.streamPromise ??= (async () => {
850+
const streamReplicationConnection = await this.connections.replicationConnection();
851+
try {
852+
await this.streamChangesInternal(streamReplicationConnection);
853+
} catch (e) {
854+
if (isReplicationSlotInvalidError(e)) {
855+
throw new MissingReplicationSlotError(e.message, e);
856+
}
857+
throw e;
858+
} finally {
859+
await streamReplicationConnection.end();
847860
}
848-
throw e;
849-
} finally {
850-
await streamReplicationConnection.end();
851-
}
861+
})();
862+
await this.streamPromise;
852863
}
853864

854865
private async streamChangesInternal(replicationConnection: pgwire.PgConnection) {

0 commit comments

Comments
 (0)