Commit

Some cleanup.
dimitri committed Jul 15, 2024
1 parent fcef354 commit 9520000
Showing 5 changed files with 77 additions and 46 deletions.

src/bin/pgcopydb/catalog.c: 26 additions & 2 deletions

@@ -7884,7 +7884,19 @@ catalog_sql_execute(SQLiteQuery *query)
 	if (rc != SQLITE_DONE)
 	{
 		log_error("Failed to execute statement: %s", query->sql);
-		log_error("[SQLite %d] %s", rc, sqlite3_errstr(rc));
+
+		int offset = sqlite3_error_offset(query->db);
+
+		if (offset != -1)
+		{
+			/* "Failed to step through statement: %s" is 34 chars of prefix */
+			log_error("%34s%*s^", " ", offset, " ");
+		}
+
+		log_error("[SQLite %d: %s]: %s",
+				  rc,
+				  sqlite3_errstr(rc),
+				  sqlite3_errmsg(query->db));
 
 		(void) sqlite3_clear_bindings(query->ppStmt);
 		(void) sqlite3_finalize(query->ppStmt);
@@ -7943,7 +7955,19 @@ catalog_sql_execute(SQLiteQuery *query)
 	if (catalog_sql_step(query) != SQLITE_DONE)
 	{
 		log_error("Failed to execute statement: %s", query->sql);
-		log_error("[SQLite %d] %s", rc, sqlite3_errstr(rc));
+
+		int offset = sqlite3_error_offset(query->db);
+
+		if (offset != -1)
+		{
+			/* "Failed to step through statement: %s" is 34 chars of prefix */
+			log_error("%34s%*s^", " ", offset, " ");
+		}
+
+		log_error("[SQLite %d: %s]: %s",
+				  rc,
+				  sqlite3_errstr(rc),
+				  sqlite3_errmsg(query->db));
 
 		(void) sqlite3_clear_bindings(query->ppStmt);
 		(void) sqlite3_finalize(query->ppStmt);
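
Note on the technique above: sqlite3_error_offset(), added in SQLite 3.38.0, returns the byte offset into the SQL text of the token associated with the most recent error, or -1 when no specific token applies. Here is a minimal, self-contained sketch of the same caret trick outside of pgcopydb; the 7-character "error: " prefix and the failing query are illustrative only, and building it requires SQLite 3.38.0 or newer (cc caret.c -lsqlite3):

	#include <stdio.h>
	#include <sqlite3.h>

	int main(void)
	{
		sqlite3 *db = NULL;
		sqlite3_stmt *stmt = NULL;

		if (sqlite3_open(":memory:", &db) != SQLITE_OK)
		{
			return 1;
		}

		/* there is no column "bar", so preparing this statement fails */
		const char *sql = "select bar from sqlite_master";
		int rc = sqlite3_prepare_v2(db, sql, -1, &stmt, NULL);

		if (rc != SQLITE_OK)
		{
			/* "error: " is 7 chars of prefix, pad the caret line to match */
			fprintf(stderr, "error: %s\n", sql);

			int offset = sqlite3_error_offset(db);

			if (offset != -1)
			{
				fprintf(stderr, "%7s%*s^\n", " ", offset, " ");
			}

			fprintf(stderr, "[SQLite %d: %s]: %s\n",
					rc, sqlite3_errstr(rc), sqlite3_errmsg(db));
		}

		sqlite3_finalize(stmt);
		sqlite3_close(db);

		return rc == SQLITE_OK ? 0 : 1;
	}

One caveat with this padding idiom: %*s prints at least one character even when the width is 0, so an error at offset 0 lands the caret one column too far to the right; offsets of 1 or more line up exactly.
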

src/bin/pgcopydb/dump_restore.c: 23 additions & 23 deletions

@@ -695,7 +695,7 @@ copydb_write_restore_list_hook(void *ctx, ArchiveContentItem *item)
 	if (item->desc == ARCHIVE_TAG_DATABASE)
 	{
 		skip = true;
-		log_notice("Skipping DATABASE \"%s\"", name);
+		log_debug("Skipping DATABASE \"%s\"", name);
 	}
 
 	/*
@@ -709,7 +709,7 @@ copydb_write_restore_list_hook(void *ctx, ArchiveContentItem *item)
 		item->tagType == ARCHIVE_TAG_TYPE_EXTENSION)
 	{
 		skip = true;
-		log_notice("Skipping COMMENT ON EXTENSION \"%s\"", name);
+		log_debug("Skipping COMMENT ON EXTENSION \"%s\"", name);
 	}
 
 	if (!skip && catOid == PG_NAMESPACE_OID)
@@ -728,23 +728,23 @@ copydb_write_restore_list_hook(void *ctx, ArchiveContentItem *item)
 		{
 			skip = true;
 
-			log_notice("Skipping already existing dumpId %d: %s %u %s",
-					   item->dumpId,
-					   item->description,
-					   item->objectOid,
-					   item->restoreListName);
+			log_debug("Skipping already existing dumpId %d: %s %u %s",
+					  item->dumpId,
+					  item->description,
+					  item->objectOid,
+					  item->restoreListName);
 		}
 	}
 
 	if (!skip && copydb_objectid_has_been_processed_already(specs, item))
 	{
 		skip = true;
 
-		log_notice("Skipping already processed dumpId %d: %s %u %s",
-				   item->dumpId,
-				   item->description,
-				   item->objectOid,
-				   item->restoreListName);
+		log_debug("Skipping already processed dumpId %d: %s %u %s",
+				  item->dumpId,
+				  item->description,
+				  item->objectOid,
+				  item->restoreListName);
 	}
 
 	/*
@@ -773,23 +773,23 @@ copydb_write_restore_list_hook(void *ctx, ArchiveContentItem *item)
 	{
 		skip = true;
 
-		log_notice("Skipping materialized view refresh dumpId %d: %s %u %s",
-				   item->dumpId,
-				   item->description,
-				   item->objectOid,
-				   item->restoreListName);
+		log_debug("Skipping materialized view refresh dumpId %d: %s %u %s",
+				  item->dumpId,
+				  item->description,
+				  item->objectOid,
+				  item->restoreListName);
 	}
 
 	if (!skip && copydb_objectid_is_filtered_out(specs, oid, name))
 	{
 		skip = true;
 
-		log_notice("Skipping filtered-out dumpId %d: %s %u %u %s",
-				   item->dumpId,
-				   item->description,
-				   item->catalogOid,
-				   item->objectOid,
-				   item->restoreListName);
+		log_debug("Skipping filtered-out dumpId %d: %s %u %u %s",
+				  item->dumpId,
+				  item->description,
+				  item->catalogOid,
+				  item->objectOid,
+				  item->restoreListName);
 	}
 
 	PQExpBuffer buf = createPQExpBuffer();

src/bin/pgcopydb/ld_stream.c: 25 additions & 19 deletions

@@ -1068,7 +1068,8 @@ streamCloseFile(LogicalStreamContext *context, bool time_to_abort)
 	 * partial transaction.
 	 */
 	if (time_to_abort &&
-		privateContext->jsonFile != NULL &&
+
+		/* privateContext->jsonFile != NULL && */
 		privateContext->endpos != InvalidXLogRecPtr &&
 		privateContext->endpos <= context->cur_record_lsn)
 	{
@@ -1228,9 +1229,9 @@ streamFlush(LogicalStreamContext *context)
 {
 	StreamContext *privateContext = (StreamContext *) context->private;
 
-	log_debug("streamFlush: %X/%X %X/%X",
-			  LSN_FORMAT_ARGS(context->tracking->written_lsn),
-			  LSN_FORMAT_ARGS(context->cur_record_lsn));
+	log_warn("streamFlush: %X/%X %X/%X",
+			 LSN_FORMAT_ARGS(context->tracking->written_lsn),
+			 LSN_FORMAT_ARGS(context->cur_record_lsn));
 
 	/* if needed, flush our current file now (fsync) */
 	if (context->tracking->flushed_lsn < context->tracking->written_lsn)
@@ -1250,14 +1251,19 @@ streamFlush(LogicalStreamContext *context)
 		/*
 		 * streamKeepalive ensures we have a valid jsonFile by calling
 		 * streamRotateFile, so we can safely call fsync here.
+		 *
+		 * TODO: remove that code.
 		 */
-		int fd = fileno(privateContext->jsonFile);
-
-		if (fsync(fd) != 0)
+		if (privateContext->jsonFile != NULL)
 		{
-			log_error("Failed to fsync file \"%s\": %m",
-					  privateContext->partialFileName);
-			return false;
+			int fd = fileno(privateContext->jsonFile);
+
+			if (fsync(fd) != 0)
+			{
+				log_error("Failed to fsync file \"%s\": %m",
+						  privateContext->partialFileName);
+				return false;
+			}
 		}
 
 		context->tracking->flushed_lsn = context->tracking->written_lsn;
@@ -1431,15 +1437,15 @@ stream_sync_sentinel(LogicalStreamContext *context)
 	context->endpos = sentinel.endpos;
 	context->tracking->applied_lsn = sentinel.replay_lsn;
 
-	log_debug("stream_sync_sentinel: "
-			  "write_lsn %X/%X flush_lsn %X/%X apply_lsn %X/%X "
-			  "startpos %X/%X endpos %X/%X apply %s",
-			  LSN_FORMAT_ARGS(context->tracking->written_lsn),
-			  LSN_FORMAT_ARGS(context->tracking->flushed_lsn),
-			  LSN_FORMAT_ARGS(context->tracking->applied_lsn),
-			  LSN_FORMAT_ARGS(privateContext->startpos),
-			  LSN_FORMAT_ARGS(privateContext->endpos),
-			  privateContext->apply ? "enabled" : "disabled");
+	log_warn("stream_sync_sentinel: "
+			 "write_lsn %X/%X flush_lsn %X/%X apply_lsn %X/%X "
+			 "startpos %X/%X endpos %X/%X apply %s",
+			 LSN_FORMAT_ARGS(context->tracking->written_lsn),
+			 LSN_FORMAT_ARGS(context->tracking->flushed_lsn),
+			 LSN_FORMAT_ARGS(context->tracking->applied_lsn),
+			 LSN_FORMAT_ARGS(privateContext->startpos),
+			 LSN_FORMAT_ARGS(privateContext->endpos),
+			 privateContext->apply ? "enabled" : "disabled");
 
 	return true;
 }
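
The streamFlush() hunk above wraps the fsync in a NULL guard, so that flushing when no JSON file is open becomes a no-op instead of passing a NULL stream to fileno(), which crashes. A standalone sketch of that guard pattern, assuming a POSIX environment; flush_json_file() and the file name are hypothetical, not pgcopydb code:

	#include <errno.h>
	#include <stdbool.h>
	#include <stdio.h>
	#include <string.h>
	#include <unistd.h>

	/*
	 * Flush a stdio stream all the way to disk, treating a missing stream
	 * as a no-op. Hypothetical helper, not pgcopydb's actual code.
	 */
	static bool
	flush_json_file(FILE *file, const char *filename)
	{
		if (file == NULL)
		{
			/* no file open yet: nothing to fsync */
			return true;
		}

		/* push stdio buffers to the kernel before asking for an fsync */
		if (fflush(file) != 0 || fsync(fileno(file)) != 0)
		{
			fprintf(stderr, "Failed to fsync file \"%s\": %s\n",
					filename, strerror(errno));
			return false;
		}

		return true;
	}

	int main(void)
	{
		/* the call is safe whether or not the file could be opened */
		FILE *file = fopen("/tmp/sketch.json", "w");

		bool ok = flush_json_file(file, "/tmp/sketch.json");

		if (file != NULL)
		{
			(void) fclose(file);
		}

		return ok ? 0 : 1;
	}

The extra fflush() step matters in the general case: fsync() only forces kernel buffers to disk, so stdio-buffered data has to be flushed to the kernel first.
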

tests/follow-wal2json/copydb.sh: 1 addition & 1 deletion

@@ -20,7 +20,7 @@ psql -o /tmp/d.out -d ${PGCOPYDB_SOURCE_PGURI} -1 -f /usr/src/pagila/pagila-data
 psql -d ${PGCOPYDB_SOURCE_PGURI} -f /usr/src/pgcopydb/ddl.sql
 
 # pgcopydb clone uses the environment variables
-pgcopydb clone --follow --plugin wal2json --debug
+pgcopydb clone --follow --plugin wal2json --notice
 
 db="/var/lib/postgres/.local/share/pgcopydb/00000001-*.db"

tests/follow-wal2json/inject.sh: 2 additions & 1 deletion

@@ -54,9 +54,10 @@ pgcopydb stream sentinel get
 # that the other process in the pgcopydb service is done before exiting
 # here.
 #
+opts="--noheader --column"
 sql="select '${lsn}' <= flush_lsn from sentinel"
 
-while [ `sqlite3 ${db} "${sql}"` != '1' ]
+while [ `sqlite3 ${opts} ${db} "${sql}" 2>/dev/null` != '1' ]
 do
     sleep 1
 done
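
For reference, this shell loop polls the sentinel table until flush_lsn has reached the captured LSN. The C sketch below (C to match the other examples here) does the same thing with the SQLite C API; the database path and LSN literal are placeholders, and only the table and column names come from the script:

	#include <stdio.h>
	#include <unistd.h>
	#include <sqlite3.h>

	/*
	 * Poll the pgcopydb sentinel table until flush_lsn reaches a target
	 * LSN. Sketch only: path and LSN are placeholders, not real values.
	 */
	int main(void)
	{
		const char *dbpath = "/var/lib/postgres/.local/share/pgcopydb/00000001.db";
		const char *sql = "select '0/1000000' <= flush_lsn from sentinel";

		for (;;)
		{
			sqlite3 *db = NULL;
			int done = 0;

			/* re-open every round: the file may not exist yet */
			if (sqlite3_open_v2(dbpath, &db, SQLITE_OPEN_READONLY, NULL) == SQLITE_OK)
			{
				sqlite3_stmt *stmt = NULL;

				if (sqlite3_prepare_v2(db, sql, -1, &stmt, NULL) == SQLITE_OK &&
					sqlite3_step(stmt) == SQLITE_ROW)
				{
					done = sqlite3_column_int(stmt, 0);
				}

				sqlite3_finalize(stmt);
			}

			sqlite3_close(db);

			if (done)
			{
				break;
			}

			sleep(1);
		}

		return 0;
	}
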
