{WIP} Use SQLite as the file format for CDC streaming.

dimitri committed Jun 17, 2024
1 parent 5c50c3b commit eadf844

Showing 18 changed files with 865 additions and 938 deletions.
75 changes: 74 additions & 1 deletion src/bin/pgcopydb/catalog.c
@@ -183,7 +183,17 @@ static char *sourceDBcreateDDLs[] = {
"create table sentinel("
" id integer primary key check (id = 1), "
" startpos pg_lsn, endpos pg_lsn, apply bool, "
" write_lsn pg_lsn, flush_lsn pg_lsn, replay_lsn pg_lsn)"
" write_lsn pg_lsn, flush_lsn pg_lsn, "
" transform_lsn pg_lsn, "
" replay_lsn pg_lsn)",

"create table cdc_files("
" id integer primary key, filename text unique, timeline integer, "
" startpos pg_lsn, endpos pg_lsn, "
" start_time_epoch integer, done_time_epoch integer)",

"create table timeline_history("
" tli integer primary key, startpos pg_lsn, endpos pg_lsn)"
};
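
The new cdc_files and timeline_history tables take over the bookkeeping that the rest of this commit stops doing with on-disk context files. Below is a minimal sketch of how a streaming process might record a CDC file's lifecycle in cdc_files; only the DDL string is taken from the diff above, while the file name, LSNs, and timestamps are invented for illustration:

#include <sqlite3.h>

int main(void)
{
    sqlite3 *db;

    if (sqlite3_open(":memory:", &db) != SQLITE_OK)
    {
        return 1;
    }

    /* same DDL as in sourceDBcreateDDLs above */
    sqlite3_exec(db,
                 "create table cdc_files("
                 " id integer primary key, filename text unique, timeline integer, "
                 " startpos pg_lsn, endpos pg_lsn, "
                 " start_time_epoch integer, done_time_epoch integer)",
                 NULL, NULL, NULL);

    /* register the file when streaming into it begins */
    sqlite3_exec(db,
                 "insert into cdc_files(filename, timeline, startpos, start_time_epoch) "
                 "values ('000000010000000000000002.db', 1, '0/2000000', "
                 "        strftime('%s','now'))",
                 NULL, NULL, NULL);

    /* mark it done once the endpos is known and the file is closed */
    sqlite3_exec(db,
                 "update cdc_files "
                 "   set endpos = '0/3000000', done_time_epoch = strftime('%s','now') "
                 " where filename = '000000010000000000000002.db'",
                 NULL, NULL, NULL);

    sqlite3_close(db);
    return 0;
}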


@@ -407,6 +417,28 @@ static char *targetDBcreateDDLs[] = {
};


static char *replayDBcreateDDLs[] = {
"create table output("
" id integer primary key, "
" action text, xid integer, lsn pg_lsn, timestamp text, "
" message text)",

"create unique index o_a_lsn on output(action, lsn)",
"create unique index o_a_xid on output(action, xid)",

"create table stmt(hash text primary key, sql text)",
"create unique index stmt_hash on stmt(hash)",

"create table replay("
" id integer primary key, "
" action text, xid integer, lsn pg_lsn, timestamp text, "
" stmt_hash text references stmt(hash), stmt_args jsonb)",

"create index r_xid on replay(xid)",
"create index r_lsn on replay(lsn)",
};
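
This schema separates transformed SQL text from its arguments: output holds the raw logical-decoding messages, stmt stores each distinct statement once under a hash, and replay rows reference that hash together with per-row arguments. A minimal sketch of that flow against SQLite's C API follows; the hash, LSN, and argument values are invented for illustration:

#include <sqlite3.h>

int main(void)
{
    sqlite3 *db;

    if (sqlite3_open(":memory:", &db) != SQLITE_OK)
    {
        return 1;
    }

    /* trimmed copy of the replayDBcreateDDLs entries used below */
    sqlite3_exec(db, "create table stmt(hash text primary key, sql text)",
                 NULL, NULL, NULL);
    sqlite3_exec(db,
                 "create table replay("
                 " id integer primary key, "
                 " action text, xid integer, lsn pg_lsn, timestamp text, "
                 " stmt_hash text references stmt(hash), stmt_args jsonb)",
                 NULL, NULL, NULL);

    /* statement text is deduplicated: insert once, reuse by hash */
    sqlite3_exec(db,
                 "insert or ignore into stmt(hash, sql) "
                 "values ('d2ab41', 'INSERT INTO public.t(a) VALUES($1)')",
                 NULL, NULL, NULL);

    /* one replay row per DML, pointing at the shared statement */
    sqlite3_exec(db,
                 "insert into replay(action, xid, lsn, timestamp, stmt_hash, stmt_args) "
                 "values ('I', 42, '0/24E3B28', '2024-06-17 10:00:00', 'd2ab41', '[1]')",
                 NULL, NULL, NULL);

    sqlite3_close(db);
    return 0;
}

Normalizing the repeated statement text behind stmt(hash) keeps each replay row small, which matters when the same prepared statement is replayed for millions of rows.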


static char *sourceDBdropDDLs[] = {
"drop table if exists setup",
"drop table if exists section",
@@ -473,6 +505,12 @@ static char *targetDBdropDDLs[] = {
};


static char *replayDBdropDDLs[] = {
"drop table if exists output",
"drop table if exists replay"
};


/*
* catalog_init_from_specs initializes our internal catalog database file from
* a specification.
@@ -945,6 +983,13 @@ catalog_create_schema(DatabaseCatalog *catalog)
break;
}

case DATABASE_CATALOG_TYPE_REPLAY:
{
createDDLs = replayDBcreateDDLs;
count = sizeof(replayDBcreateDDLs) / sizeof(replayDBcreateDDLs[0]);
break;
}

default:
{
log_error("BUG: called catalog_init for unknown type %d",
@@ -1005,6 +1050,13 @@ catalog_drop_schema(DatabaseCatalog *catalog)
break;
}

case DATABASE_CATALOG_TYPE_REPLAY:
{
dropDDLs = replayDBdropDDLs;
count = sizeof(replayDBdropDDLs) / sizeof(replayDBdropDDLs[0]);
break;
}

default:
{
log_error("BUG: called catalog_drop_schema for unknown type %d",
@@ -7996,6 +8048,27 @@ catalog_bind_parameters(sqlite3 *db,

switch (p->type)
{
case BIND_PARAMETER_TYPE_NULL:
{
int rc = sqlite3_bind_null(ppStmt, n);

if (rc != SQLITE_OK)
{
log_error("[SQLite %d] Failed to bind \"%s\" to NULL: %s",
rc,
p->name,
sqlite3_errstr(rc));
return false;
}

if (logSQL)
{
appendPQExpBuffer(debugParameters, "%s", "null");
}

break;
}

case BIND_PARAMETER_TYPE_INT:
{
int rc = sqlite3_bind_int(ppStmt, n, p->intVal);
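
The new BIND_PARAMETER_TYPE_NULL case is a thin wrapper around sqlite3_bind_null(). Here is a standalone sketch of that call, with an illustrative table rather than pgcopydb's own:

#include <sqlite3.h>
#include <stdio.h>

int main(void)
{
    sqlite3 *db;
    sqlite3_stmt *ppStmt;

    if (sqlite3_open(":memory:", &db) != SQLITE_OK)
    {
        return 1;
    }

    sqlite3_exec(db, "create table t(done_time_epoch integer)",
                 NULL, NULL, NULL);

    sqlite3_prepare_v2(db, "insert into t(done_time_epoch) values($1)",
                       -1, &ppStmt, NULL);

    /* SQLite bind parameters are 1-based, like the n used above */
    int rc = sqlite3_bind_null(ppStmt, 1);

    if (rc != SQLITE_OK)
    {
        fprintf(stderr, "[SQLite %d] %s\n", rc, sqlite3_errstr(rc));
        return 1;
    }

    (void) sqlite3_step(ppStmt);
    sqlite3_finalize(ppStmt);
    sqlite3_close(db);

    return 0;
}

Binding NULL explicitly matters for reused prepared statements: sqlite3_reset() does not clear previous bindings, so skipping the parameter would silently keep the old value.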
1 change: 1 addition & 0 deletions src/bin/pgcopydb/catalog.h
@@ -628,6 +628,7 @@ bool catalog_count_summary_done_fetch(SQLiteQuery *query);
typedef enum
{
BIND_PARAMETER_TYPE_UNKNOWN = 0,
BIND_PARAMETER_TYPE_NULL,
BIND_PARAMETER_TYPE_INT,
BIND_PARAMETER_TYPE_INT64,
BIND_PARAMETER_TYPE_TEXT
2 changes: 2 additions & 0 deletions src/bin/pgcopydb/cli_clone_follow.c
@@ -208,6 +208,7 @@ clone_and_follow(CopyDataSpec *copySpecs)
copyDBoptions.endpos,
STREAM_MODE_CATCHUP,
&(copySpecs->catalogs.source),
&(copySpecs->catalogs.replay),
copyDBoptions.stdIn,
copyDBoptions.stdOut,
logSQL))
@@ -386,6 +387,7 @@ cli_follow(int argc, char **argv)
copyDBoptions.endpos,
STREAM_MODE_CATCHUP,
&(copySpecs.catalogs.source),
&(copySpecs.catalogs.replay),
copyDBoptions.stdIn,
copyDBoptions.stdOut,
logSQL))
1 change: 1 addition & 0 deletions src/bin/pgcopydb/cli_snapshot.c
@@ -323,6 +323,7 @@ cli_create_snapshot(int argc, char **argv)
createSNoptions.endpos,
STREAM_MODE_CATCHUP,
&(copySpecs.catalogs.source),
&(copySpecs.catalogs.replay),
createSNoptions.stdIn,
createSNoptions.stdOut,
logSQL))
30 changes: 7 additions & 23 deletions src/bin/pgcopydb/cli_stream.c
@@ -582,6 +582,7 @@ cli_stream_setup(int argc, char **argv)
streamDBoptions.endpos,
STREAM_MODE_CATCHUP,
&(copySpecs.catalogs.source),
&(copySpecs.catalogs.replay),
streamDBoptions.stdIn,
streamDBoptions.stdOut,
logSQL))
@@ -709,6 +710,7 @@ cli_stream_catchup(int argc, char **argv)
streamDBoptions.endpos,
STREAM_MODE_CATCHUP,
&(copySpecs.catalogs.source),
&(copySpecs.catalogs.replay),
streamDBoptions.stdIn,
streamDBoptions.stdOut,
logSQL))
@@ -792,6 +794,7 @@ cli_stream_replay(int argc, char **argv)
streamDBoptions.endpos,
STREAM_MODE_REPLAY,
&(copySpecs.catalogs.source),
&(copySpecs.catalogs.replay),
true, /* stdin */
true, /* stdout */
logSQL))
@@ -800,17 +803,6 @@
exit(EXIT_CODE_INTERNAL_ERROR);
}

/*
* Remove the possibly still existing stream context files from
* previous round of operations (--resume, etc). We want to make sure
* that the catchup process reads the files created on this connection.
*/
if (!stream_cleanup_context(&specs))
{
/* errors have already been logged */
exit(EXIT_CODE_INTERNAL_ERROR);
}

/*
* Before starting the receive, transform, and apply sub-processes, we need
* to set the sentinel endpos to the command line --endpos option, when
@@ -917,6 +909,7 @@ cli_stream_transform(int argc, char **argv)
streamDBoptions.endpos,
STREAM_MODE_CATCHUP,
&(copySpecs.catalogs.source),
&(copySpecs.catalogs.replay),
streamDBoptions.stdIn,
streamDBoptions.stdOut,
logSQL))
@@ -1079,6 +1072,7 @@ cli_stream_apply(int argc, char **argv)
streamDBoptions.endpos,
STREAM_MODE_CATCHUP,
&(copySpecs.catalogs.source),
&(copySpecs.catalogs.replay),
true, /* streamDBoptions.stdIn */
false, /* streamDBoptions.stdOut */
logSQL))
@@ -1102,6 +1096,7 @@

if (!stream_apply_init_context(&context,
&(copySpecs.catalogs.source),
&(copySpecs.catalogs.replay),
&(copySpecs.cfPaths.cdc),
&(streamDBoptions.connStrings),
streamDBoptions.origin,
@@ -1190,6 +1185,7 @@ stream_start_in_mode(LogicalStreamMode mode)
streamDBoptions.endpos,
mode,
&(copySpecs.catalogs.source),
&(copySpecs.catalogs.replay),
streamDBoptions.stdIn,
streamDBoptions.stdOut,
logSQL))
@@ -1214,18 +1210,6 @@

case STREAM_MODE_PREFETCH:
{
/*
* Remove the possibly still existing stream context files from
* previous round of operations (--resume, etc). We want to make
* sure that the catchup process reads the files created on this
* connection.
*/
if (!stream_cleanup_context(&specs))
{
/* errors have already been logged */
exit(EXIT_CODE_INTERNAL_ERROR);
}

if (!followDB(&copySpecs, &specs))
{
/* errors have already been logged */
4 changes: 4 additions & 0 deletions src/bin/pgcopydb/copydb.c
@@ -563,17 +563,21 @@ copydb_init_specs(CopyDataSpec *specs,
DatabaseCatalog *source = &(specs->catalogs.source);
DatabaseCatalog *filter = &(specs->catalogs.filter);
DatabaseCatalog *target = &(specs->catalogs.target);
DatabaseCatalog *replay = &(specs->catalogs.replay);

/* init the catalog type */
source->type = DATABASE_CATALOG_TYPE_SOURCE;
filter->type = DATABASE_CATALOG_TYPE_FILTER;
target->type = DATABASE_CATALOG_TYPE_TARGET;
replay->type = DATABASE_CATALOG_TYPE_REPLAY;

/* pick the dbfile from the specs */
strlcpy(source->dbfile, specs->cfPaths.sdbfile, sizeof(source->dbfile));
strlcpy(filter->dbfile, specs->cfPaths.fdbfile, sizeof(filter->dbfile));
strlcpy(target->dbfile, specs->cfPaths.tdbfile, sizeof(target->dbfile));

/* skip replay->dbfile which is rotated */

if (specs->section == DATA_SECTION_ALL ||
specs->section == DATA_SECTION_TABLE_DATA)
{
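
The replay->dbfile is skipped here because the replay database rotates rather than living at a single path. As an assumption on my part (this diff does not show the naming scheme), a natural choice is to name each rotated SQLite file after the WAL segment it covers, following PostgreSQL's XLogFileName() convention:

#include <stdint.h>
#include <stdio.h>

/* Compute a WAL-segment-style file name (hypothetical helper): the
 * timeline, then the segment number split into its high and low 32-bit
 * halves, as in PostgreSQL's XLogFileName() macro. */
static void
replay_db_filename(char *buf, size_t size,
                   uint32_t tli, uint64_t lsn, uint64_t walSegSz)
{
    uint64_t segno = lsn / walSegSz;
    uint64_t segsPerXLogId = UINT64_C(0x100000000) / walSegSz;

    snprintf(buf, size, "%08X%08X%08X.db",
             tli,
             (uint32_t) (segno / segsPerXLogId),
             (uint32_t) (segno % segsPerXLogId));
}

int main(void)
{
    char name[64];

    /* LSN 0/24E3B28 on timeline 1 with 16MB WAL segments */
    replay_db_filename(name, sizeof(name), 1, 0x24E3B28, 16 * 1024 * 1024);
    printf("%s\n", name);    /* prints 000000010000000000000002.db */

    return 0;
}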
10 changes: 8 additions & 2 deletions src/bin/pgcopydb/copydb.h
@@ -175,15 +175,16 @@ typedef struct ExtensionReqs

/*
* pgcopydb sentinel is a table that's created on the source catalog and allows
* communicating elements from the outside, and in between the receive and
* apply processes.
* communicating elements from the outside, and in between the receive,
* transform and apply processes.
*/
typedef struct CopyDBSentinel
{
bool apply;
uint64_t startpos;
uint64_t endpos;
uint64_t write_lsn;
uint64_t transform_lsn;
uint64_t flush_lsn;
uint64_t replay_lsn;
} CopyDBSentinel;
@@ -491,6 +492,7 @@ bool sentinel_update_write_flush_lsn(DatabaseCatalog *catalog,
uint64_t write_lsn,
uint64_t flush_lsn);

bool sentinel_update_transform_lsn(DatabaseCatalog *catalog, uint64_t transform_lsn);
bool sentinel_update_replay_lsn(DatabaseCatalog *catalog, uint64_t replay_lsn);

bool sentinel_get(DatabaseCatalog *catalog, CopyDBSentinel *sentinel);
@@ -501,6 +503,10 @@ bool sentinel_sync_recv(DatabaseCatalog *catalog,
uint64_t flush_lsn,
CopyDBSentinel *sentinel);

bool sentinel_sync_transform(DatabaseCatalog *catalog,
uint64_t transform_lsn,
CopyDBSentinel *sentinel);

bool sentinel_sync_apply(DatabaseCatalog *catalog,
uint64_t replay_lsn,
CopyDBSentinel *sentinel);
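
The bodies of sentinel_update_transform_lsn and sentinel_sync_transform are not part of the hunks shown here. As a hedged guess at their shape, based on the sentinel DDL earlier in this diff, updating the new transform_lsn column amounts to a one-row UPDATE against id = 1:

#include <sqlite3.h>
#include <stdint.h>
#include <stdio.h>

/* Sketch only, assuming LSNs are stored in the textual %X/%X form
 * PostgreSQL uses for pg_lsn values. */
static int
update_transform_lsn(sqlite3 *db, uint64_t transform_lsn)
{
    sqlite3_stmt *stmt;
    char lsn[32];

    snprintf(lsn, sizeof(lsn), "%X/%X",
             (uint32_t) (transform_lsn >> 32),
             (uint32_t) transform_lsn);

    if (sqlite3_prepare_v2(db,
                           "update sentinel set transform_lsn = $1 where id = 1",
                           -1, &stmt, NULL) != SQLITE_OK)
    {
        return 0;
    }

    sqlite3_bind_text(stmt, 1, lsn, -1, SQLITE_TRANSIENT);

    int rc = sqlite3_step(stmt);
    sqlite3_finalize(stmt);

    return rc == SQLITE_DONE;
}

int main(void)
{
    sqlite3 *db;

    sqlite3_open(":memory:", &db);
    sqlite3_exec(db,
                 "create table sentinel(id integer primary key check (id = 1), "
                 " transform_lsn pg_lsn); insert into sentinel(id) values (1)",
                 NULL, NULL, NULL);

    (void) update_transform_lsn(db, UINT64_C(0x24E3B28));
    sqlite3_close(db);
    return 0;
}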
37 changes: 0 additions & 37 deletions src/bin/pgcopydb/follow.c
@@ -251,18 +251,6 @@ follow_get_sentinel(StreamSpecs *specs, CopyDBSentinel *sentinel, bool verbose)
bool
follow_main_loop(CopyDataSpec *copySpecs, StreamSpecs *streamSpecs)
{
/*
* Remove the possibly still existing stream context files from
* previous round of operations (--resume, etc). We want to make
* sure that the catchup process reads the files created on this
* connection.
*/
if (!stream_cleanup_context(streamSpecs))
{
/* errors have already been logged */
return false;
}

DatabaseCatalog *sourceDB = &(copySpecs->catalogs.source);

if (!catalog_open(sourceDB))
@@ -417,18 +405,6 @@ follow_prepare_mode_switch(StreamSpecs *streamSpecs,
{
log_info("Catching-up from existing on-disk files");

if (streamSpecs->system.timeline == 0)
{
if (!stream_read_context(&(streamSpecs->paths),
&(streamSpecs->system),
&(streamSpecs->WalSegSz)))
{
log_error("Failed to read the streaming context information "
"from the source database, see above for details");
return false;
}
}

/*
* If the previous mode was catch-up, then before proceeding, we might need
* to empty the transform queue where the STOP message was sent.
@@ -518,19 +494,6 @@ followDB(CopyDataSpec *copySpecs, StreamSpecs *streamSpecs)
return false;
}

/*
* Before starting sub-processes, clean-up intermediate files from previous
* round. Here that's the stream context with WAL segment size and timeline
* history, which are fetched from the source server to compute WAL file
* names. The current timeline can only change at a server restart or a
 failover, both of which trigger a reconnect.
*/
if (!stream_cleanup_context(streamSpecs))
{
/* errors have already been logged */
return false;
}

/*
* Before starting sub-processes, make sure to close our SQLite catalogs.
* We open the SQLite catalogs again before returning from this function