Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix endpos detection when source is idle. #673

Open
wants to merge 4 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
11 changes: 10 additions & 1 deletion Dockerfile
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,16 @@ RUN dpkg --add-architecture ${TARGETARCH:-arm64} && apt update \

WORKDIR /usr/src/pgcopydb

COPY . .
#
# Avoid depending on the tests/ and docs/ subdirectories here. We don't
# need to rebuild this Docker image when something changes outside of the
# src/ directory and the few top-level files copied below.
#
COPY Makefile Makefile
COPY src/ src/
COPY GIT-VERSION-GEN GIT-VERSION-GEN
COPY GIT-VERSION-FILE GIT-VERSION-FILE
COPY version version

RUN make -s clean && make -s -j$(nproc) install

Expand Down
12 changes: 2 additions & 10 deletions src/bin/pgcopydb/cli_stream.c
Original file line number Diff line number Diff line change
Expand Up @@ -1215,17 +1215,9 @@ stream_start_in_mode(LogicalStreamMode mode)
case STREAM_MODE_PREFETCH:
{
/*
* Remove the possibly still existing stream context files from
* previous round of operations (--resume, etc). We want to make
* sure that the catchup process reads the files created on this
* connection.
* Just hand over processing to followDB(), which already takes care of
* cleaning up after previous rounds of operations (--resume, etc).
*/
if (!stream_cleanup_context(&specs))
{
/* errors have already been logged */
exit(EXIT_CODE_INTERNAL_ERROR);
}

if (!followDB(&copySpecs, &specs))
{
/* errors have already been logged */
Expand Down
2 changes: 1 addition & 1 deletion src/bin/pgcopydb/ld_apply.c
Original file line number Diff line number Diff line change
Expand Up @@ -240,7 +240,7 @@ stream_apply_setup(StreamSpecs *specs, StreamApplyContext *context)
}

char *process =
specs->mode == STREAM_MODE_CATCHUP ? "Catchup-up with" : "Replaying";
specs->mode == STREAM_MODE_CATCHUP ? "Catching-up with" : "Replaying";

if (context->endpos != InvalidXLogRecPtr)
{
Expand Down
51 changes: 43 additions & 8 deletions src/bin/pgcopydb/ld_stream.c
Original file line number Diff line number Diff line change
Expand Up @@ -452,6 +452,35 @@ startLogicalStreaming(StreamSpecs *specs)
StreamContext *privateContext = &(specs->private);
context.private = (void *) privateContext;

/*
* We could be resuming operations after having already reached the endpos;
* in that case, just skip streaming entirely. We still need to create the
* context files with the timeline and wal_segment_size, which is why this
* check happens so late.
*/
if (specs->endpos <= specs->startpos)
{
log_info("Skipping streaming: endpos %X/%X had already been reached",
LSN_FORMAT_ARGS(specs->endpos));

/*
* When skipping streaming entirely, and in file-based mode where the
* transform process reads from a queue, make sure the transform
* process reaches the end of the queue.
*/
if (privateContext->mode == STREAM_MODE_PREFETCH ||
privateContext->mode == STREAM_MODE_CATCHUP)
{
if (!stream_transform_send_stop(privateContext->transformQueue))
{
log_error("Failed to send STOP to the transform queue");
return false;
}
}

return true;
}

log_notice("Connecting to logical decoding replication stream");

/*
Expand Down Expand Up @@ -633,10 +662,13 @@ streamCheckResumePosition(StreamSpecs *specs)
{
specs->startpos = sentinel.startpos;

log_info("Resuming streaming at LSN %X/%X "
"from replication slot \"%s\"",
LSN_FORMAT_ARGS(specs->startpos),
specs->slot.slotName);
if (specs->startpos < specs->endpos)
{
log_info("Resuming streaming at LSN %X/%X "
"from replication slot \"%s\"",
LSN_FORMAT_ARGS(specs->startpos),
specs->slot.slotName);
}
}
}
else
Expand All @@ -649,10 +681,13 @@ streamCheckResumePosition(StreamSpecs *specs)

specs->startpos = latest->lsn;

log_info("Resuming streaming at LSN %X/%X "
"from JSON file \"%s\" ",
LSN_FORMAT_ARGS(specs->startpos),
latestStreamedContent.filename);
if (specs->startpos < specs->endpos)
{
log_info("Resuming streaming at LSN %X/%X "
"from JSON file \"%s\" ",
LSN_FORMAT_ARGS(specs->startpos),
latestStreamedContent.filename);
}

char *latestMessage = latestStreamedContent.lbuf.lines[lastLineNb];
log_notice("Resume replication from latest message: %s", latestMessage);
Expand Down
1 change: 0 additions & 1 deletion src/bin/pgcopydb/ld_stream.h
Original file line number Diff line number Diff line change
Expand Up @@ -568,7 +568,6 @@ char * StreamActionToString(StreamAction action);

/* ld_transform.c */
bool stream_transform_worker(StreamSpecs *specs);
bool stream_transform_from_queue(StreamSpecs *specs);
bool stream_transform_add_file(Queue *queue, uint64_t firstLSN);
bool stream_transform_send_stop(Queue *queue);

Expand Down
55 changes: 24 additions & 31 deletions src/bin/pgcopydb/ld_transform.c
Original file line number Diff line number Diff line change
Expand Up @@ -622,37 +622,6 @@ stream_transform_rotate(StreamContext *privateContext)
*/
bool
stream_transform_worker(StreamSpecs *specs)
{
/*
* The timeline and wal segment size are determined when connecting to the
* source database, and stored to local files at that time. When the Stream
* Transform Worker process is created, that information is read from our
* local files.
*/
if (!stream_read_context(&(specs->paths), &(specs->system), &(specs->WalSegSz)))
{
if (asked_to_stop || asked_to_stop_fast || asked_to_quit)
{
log_debug("Stream Transform Worker startup was interrupted");
return true;
}

log_error("Failed to read the streaming context information "
"from the source database, see above for details");
return false;
}

return stream_transform_from_queue(specs);
}


/*
* stream_transform_from_queue loops over messages from a System V queue, each
* message contains the WAL.json and the WAL.sql file names. When receiving
* such a message, the WAL.json file is transformed into the WAL.sql file.
*/
bool
stream_transform_from_queue(StreamSpecs *specs)
{
DatabaseCatalog *sourceDB = specs->sourceDB;

Expand Down Expand Up @@ -775,6 +744,30 @@ stream_transform_file_at_lsn(StreamSpecs *specs, uint64_t lsn)
char walFileName[MAXPGPATH] = { 0 };
char sqlFileName[MAXPGPATH] = { 0 };

/*
* The timeline and wal segment size are determined when connecting to the
* source database, and stored to local files at that time. When the Stream
* Transform Worker process is created, that information is read from our
* local files.
*/
if (specs->system.timeline == 0 || specs->WalSegSz == 0)
{
if (!stream_read_context(&(specs->paths),
&(specs->system),
&(specs->WalSegSz)))
{
if (asked_to_stop || asked_to_stop_fast || asked_to_quit)
{
log_debug("Stream Transform Worker startup was interrupted");
return true;
}

log_error("Failed to read the streaming context information "
"from the source database, see above for details");
return false;
}
}

if (!stream_compute_pathnames(specs->WalSegSz,
specs->system.timeline,
lsn,
Expand Down
25 changes: 17 additions & 8 deletions src/bin/pgcopydb/pgsql.c
Original file line number Diff line number Diff line change
Expand Up @@ -4038,14 +4038,22 @@ pgsql_stream_logical(LogicalStreamClient *client, LogicalStreamContext *context)

client->last_status = client->now;

/* the endpos target might have been updated in the past */
if (context->endpos != InvalidXLogRecPtr &&
context->endpos <= cur_record_lsn)
if (client->endpos != InvalidXLogRecPtr &&
client->endpos <= cur_record_lsn)
{
log_warn("New endpos %X/%X is in the past, current "
"record LSN is %X/%X",
LSN_FORMAT_ARGS(context->endpos),
LSN_FORMAT_ARGS(cur_record_lsn));
log_debug("pgsql_stream_logical: reached endpos at %X/%X ",
LSN_FORMAT_ARGS(cur_record_lsn));

context->cur_record_lsn = cur_record_lsn;

if (!flushAndSendFeedback(client, context))
{
goto error;
}

prepareToTerminate(client, false, InvalidXLogRecPtr);
time_to_abort = true;
break;
}
}

Expand Down Expand Up @@ -4184,7 +4192,8 @@ pgsql_stream_logical(LogicalStreamClient *client, LogicalStreamContext *context)
}
replyRequested = copybuf[pos];

if (client->endpos != InvalidXLogRecPtr && cur_record_lsn >= client->endpos)
if (client->endpos != InvalidXLogRecPtr &&
client->endpos < cur_record_lsn)
{
/*
* If there's nothing to read on the socket until a keepalive
Expand Down
30 changes: 15 additions & 15 deletions tests/cdc-endpos-between-transaction/000000010000000000000002.json
Original file line number Diff line number Diff line change
@@ -1,15 +1,15 @@
{"action":"K","lsn":"0/244BA20","timestamp":"2023-12-27 10:51:22.877492+0000"}
{"action":"B","xid":"490","lsn":"0/244BA20","timestamp":"2023-12-27 10:51:22.877539+0000","message":{"action":"B","xid":490}}
{"action":"I","xid":"490","lsn":"0/244BA20","timestamp":"2023-12-27 10:51:22.877762+0000","message":{"action":"I","xid":490,"schema":"public","table":"category","columns":[{"name":"category_id","type":"integer","value":1000},{"name":"name","type":"text","value":"Fantasy"},{"name":"last_update","type":"timestamp with time zone","value":"2022-12-08 00:00:01+00"}]}}
{"action":"I","xid":"490","lsn":"0/244BAB0","timestamp":"2023-12-27 10:51:22.877783+0000","message":{"action":"I","xid":490,"schema":"public","table":"category","columns":[{"name":"category_id","type":"integer","value":1001},{"name":"name","type":"text","value":"History"},{"name":"last_update","type":"timestamp with time zone","value":"2022-12-09 00:00:01+00"}]}}
{"action":"I","xid":"490","lsn":"0/244BB40","timestamp":"2023-12-27 10:51:22.877795+0000","message":{"action":"I","xid":490,"schema":"public","table":"category","columns":[{"name":"category_id","type":"integer","value":1002},{"name":"name","type":"text","value":"Adventure"},{"name":"last_update","type":"timestamp with time zone","value":"2022-12-10 00:00:01+00"}]}}
{"action":"I","xid":"490","lsn":"0/244BBD0","timestamp":"2023-12-27 10:51:22.877807+0000","message":{"action":"I","xid":490,"schema":"public","table":"category","columns":[{"name":"category_id","type":"integer","value":1003},{"name":"name","type":"text","value":"Musical"},{"name":"last_update","type":"timestamp with time zone","value":"2022-12-11 00:00:01+00"}]}}
{"action":"I","xid":"490","lsn":"0/244BC60","timestamp":"2023-12-27 10:51:22.877818+0000","message":{"action":"I","xid":490,"schema":"public","table":"category","columns":[{"name":"category_id","type":"integer","value":1004},{"name":"name","type":"text","value":"Western"},{"name":"last_update","type":"timestamp with time zone","value":"2022-12-12 00:00:01+00"}]}}
{"action":"C","xid":"490","lsn":"0/244BD20","timestamp":"2023-12-27 10:51:22.877826+0000","message":{"action":"C","xid":490}}
{"action":"B","xid":"491","lsn":"0/244BD20","timestamp":"2023-12-27 10:51:22.877870+0000","message":{"action":"B","xid":491}}
{"action":"I","xid":"491","lsn":"0/244BD20","timestamp":"2023-12-27 10:51:22.877881+0000","message":{"action":"I","xid":491,"schema":"public","table":"category","columns":[{"name":"category_id","type":"integer","value":1005},{"name":"name","type":"text","value":"Mystery"},{"name":"last_update","type":"timestamp with time zone","value":"2022-12-13 00:00:01+00"}]}}
{"action":"I","xid":"491","lsn":"0/244BDB0","timestamp":"2023-12-27 10:51:22.877887+0000","message":{"action":"I","xid":491,"schema":"public","table":"category","columns":[{"name":"category_id","type":"integer","value":1006},{"name":"name","type":"text","value":"Historical drama"},{"name":"last_update","type":"timestamp with time zone","value":"2022-12-14 00:00:01+00"}]}}
{"action":"I","xid":"491","lsn":"0/244BE48","timestamp":"2023-12-27 10:51:22.877893+0000","message":{"action":"I","xid":491,"schema":"public","table":"category","columns":[{"name":"category_id","type":"integer","value":1008},{"name":"name","type":"text","value":"Thriller"},{"name":"last_update","type":"timestamp with time zone","value":"2022-12-15 00:00:01+00"}]}}
{"action":"K","lsn":"0/244BE48","timestamp":"2023-12-27 10:51:22.877898+0000"}
{"action":"E","lsn":"0/244BE48"}
{"action":"R","lsn":"0/244BE48"}
{"action":"K","lsn":"0/24D4CD8","timestamp":"2024-02-07 13:38:21.286175+0000"}
{"action":"B","xid":"490","lsn":"0/24D4CD8","timestamp":"2024-02-07 13:38:21.286249+0000","message":{"action":"B","xid":490}}
{"action":"I","xid":"490","lsn":"0/24D4CD8","timestamp":"2024-02-07 13:38:21.286607+0000","message":{"action":"I","xid":490,"schema":"public","table":"category","columns":[{"name":"category_id","type":"integer","value":1000},{"name":"name","type":"text","value":"Fantasy"},{"name":"last_update","type":"timestamp with time zone","value":"2022-12-08 00:00:01+00"}]}}
{"action":"I","xid":"490","lsn":"0/24D4D68","timestamp":"2024-02-07 13:38:21.286646+0000","message":{"action":"I","xid":490,"schema":"public","table":"category","columns":[{"name":"category_id","type":"integer","value":1001},{"name":"name","type":"text","value":"History"},{"name":"last_update","type":"timestamp with time zone","value":"2022-12-09 00:00:01+00"}]}}
{"action":"I","xid":"490","lsn":"0/24D4DF8","timestamp":"2024-02-07 13:38:21.286662+0000","message":{"action":"I","xid":490,"schema":"public","table":"category","columns":[{"name":"category_id","type":"integer","value":1002},{"name":"name","type":"text","value":"Adventure"},{"name":"last_update","type":"timestamp with time zone","value":"2022-12-10 00:00:01+00"}]}}
{"action":"I","xid":"490","lsn":"0/24D4E88","timestamp":"2024-02-07 13:38:21.286676+0000","message":{"action":"I","xid":490,"schema":"public","table":"category","columns":[{"name":"category_id","type":"integer","value":1003},{"name":"name","type":"text","value":"Musical"},{"name":"last_update","type":"timestamp with time zone","value":"2022-12-11 00:00:01+00"}]}}
{"action":"I","xid":"490","lsn":"0/24D4F18","timestamp":"2024-02-07 13:38:21.286702+0000","message":{"action":"I","xid":490,"schema":"public","table":"category","columns":[{"name":"category_id","type":"integer","value":1004},{"name":"name","type":"text","value":"Western"},{"name":"last_update","type":"timestamp with time zone","value":"2022-12-12 00:00:01+00"}]}}
{"action":"C","xid":"490","lsn":"0/24D4FD8","timestamp":"2024-02-07 13:38:21.286715+0000","message":{"action":"C","xid":490}}
{"action":"B","xid":"491","lsn":"0/24D4FD8","timestamp":"2024-02-07 13:38:21.286909+0000","message":{"action":"B","xid":491}}
{"action":"I","xid":"491","lsn":"0/24D4FD8","timestamp":"2024-02-07 13:38:21.286943+0000","message":{"action":"I","xid":491,"schema":"public","table":"category","columns":[{"name":"category_id","type":"integer","value":1005},{"name":"name","type":"text","value":"Mystery"},{"name":"last_update","type":"timestamp with time zone","value":"2022-12-13 00:00:01+00"}]}}
{"action":"I","xid":"491","lsn":"0/24D5068","timestamp":"2024-02-07 13:38:21.286960+0000","message":{"action":"I","xid":491,"schema":"public","table":"category","columns":[{"name":"category_id","type":"integer","value":1006},{"name":"name","type":"text","value":"Historical drama"},{"name":"last_update","type":"timestamp with time zone","value":"2022-12-14 00:00:01+00"}]}}
{"action":"I","xid":"491","lsn":"0/24D5100","timestamp":"2024-02-07 13:38:21.286974+0000","message":{"action":"I","xid":491,"schema":"public","table":"category","columns":[{"name":"category_id","type":"integer","value":1008},{"name":"name","type":"text","value":"Thriller"},{"name":"last_update","type":"timestamp with time zone","value":"2022-12-15 00:00:01+00"}]}}
{"action":"K","lsn":"0/24D5100","timestamp":"2024-02-07 13:38:21.286996+0000"}
{"action":"E","lsn":"0/24D5100"}
{"action":"R","lsn":"0/24D5100"}
Original file line number Diff line number Diff line change
@@ -1,11 +1,11 @@
-- KEEPALIVE {"lsn":"0/2448730","timestamp":"2023-12-21 17:07:18.986320+0000"}
BEGIN; -- {"xid":491,"lsn":"0/244B450","timestamp":"2023-12-21 17:07:19.3436+0000","commit_lsn":"0/244B750"}
-- KEEPALIVE {"lsn":"0/24D5700","timestamp":"2024-02-07 13:30:03.406814+0000"}
BEGIN; -- {"xid":490,"lsn":"0/24D5700","timestamp":"2024-02-07 13:30:03.407465+0000","commit_lsn":"0/24D5A00"}
PREPARE 86e87d54 AS INSERT INTO "public"."category" ("category_id", "name", "last_update") overriding system value VALUES ($1, $2, $3), ($4, $5, $6), ($7, $8, $9), ($10, $11, $12), ($13, $14, $15);
EXECUTE 86e87d54["1000","Fantasy","2022-12-08 00:00:01+00","1001","History","2022-12-09 00:00:01+00","1002","Adventure","2022-12-10 00:00:01+00","1003","Musical","2022-12-11 00:00:01+00","1004","Western","2022-12-12 00:00:01+00"];
COMMIT; -- {"xid":491,"lsn":"0/244B750","timestamp":"2023-12-21 17:07:19.3436+0000"}
BEGIN; -- {"xid":492,"lsn":"0/244B750","timestamp":"2023-12-21 17:07:19.3543+0000"}
COMMIT; -- {"xid":490,"lsn":"0/24D5A00","timestamp":"2024-02-07 13:30:03.407465+0000"}
BEGIN; -- {"xid":491,"lsn":"0/24D5A00","timestamp":"2024-02-07 13:30:03.407565+0000"}
PREPARE 918852ce AS INSERT INTO "public"."category" ("category_id", "name", "last_update") overriding system value VALUES ($1, $2, $3), ($4, $5, $6), ($7, $8, $9);
EXECUTE 918852ce["1005","Mystery","2022-12-13 00:00:01+00","1006","Historical drama","2022-12-14 00:00:01+00","1008","Thriller","2022-12-15 00:00:01+00"];
-- KEEPALIVE {"lsn":"0/244B878","timestamp":"2023-12-21 17:07:19.3615+0000"}
-- ENDPOS {"lsn":"0/244B878"}
ROLLBACK; -- {"xid":492,"lsn":"0/244B878","timestamp":"2023-12-21 17:07:19.3543+0000"}
-- KEEPALIVE {"lsn":"0/24D5B28","timestamp":"2024-02-07 13:30:03.407667+0000"}
-- ENDPOS {"lsn":"0/24D5B28"}
ROLLBACK; -- {"xid":491,"lsn":"0/24D5B28","timestamp":"2024-02-07 13:30:03.407565+0000"}
7 changes: 7 additions & 0 deletions tests/cdc-endpos-between-transaction/copydb.sh
Original file line number Diff line number Diff line change
Expand Up @@ -73,6 +73,13 @@ diff ${expected} ${result}
# now prefetch the changes again, which should be a noop
pgcopydb stream prefetch --resume --endpos "${lsn}" --trace

# recheck output (should have been a noop)
jq "${JQSCRIPT}" /usr/src/pgcopydb/${WALFILE} > ${expected}
jq "${JQSCRIPT}" ${SHAREDIR}/${WALFILE} > ${result}

diff ${expected} ${result} || cat ${SHAREDIR}/${WALFILE}
diff ${expected} ${result}

# now transform the JSON file into SQL
SQLFILENAME=`basename ${WALFILE} .json`.sql

Expand Down
Loading
Loading