Skip to content

Commit 83ea67b

Browse files
knizhnikkelvich
authored and committed
Add mmts control file
1 parent 9cbd0fe commit 83ea67b

File tree

3 files changed

+74
-8
lines changed

3 files changed

+74
-8
lines changed

multimaster.c

+67-4
Original file line numberDiff line numberDiff line change
@@ -213,6 +213,7 @@ bool MtmVolksWagenMode;
213213
TransactionId MtmUtilityProcessedInXid;
214214

215215
static char* MtmConnStrs;
216+
static char* MtmClusterName;
216217
static int MtmQueueSize;
217218
static int MtmWorkers;
218219
static int MtmVacuumDelay;
@@ -1866,6 +1867,39 @@ static void MtmRaftableInitialize()
18661867
raftable_start(MtmNodeId - 1);
18671868
}
18681869

1870+
static void MtmCheckControlFile(void)
1871+
{
1872+
char controlFilePath[MAXPGPATH];
1873+
char buf[MULTIMASTER_MAX_CTL_STR_SIZE];
1874+
FILE* f;
1875+
snprintf(controlFilePath, MAXPGPATH, "%s/global/mmts_control", DataDir);
1876+
f = fopen(controlFilePath, "r");
1877+
if (f != NULL && fgets(buf, sizeof buf, f)) {
1878+
char* sep = strchr(buf, ':');
1879+
if (sep == NULL) {
1880+
elog(FATAL, "File mmts_control doesn't contain cluster name");
1881+
}
1882+
*sep = '\0';
1883+
if (strcmp(buf, MtmClusterName) != 0) {
1884+
elog(FATAL, "Database belongs to some other cluster %s rather than %s", buf, MtmClusterName);
1885+
}
1886+
if (sscanf(sep+1, "%d", &Mtm->donorNodeId) != 1) {
1887+
elog(FATAL, "File mmts_control doesn't contain node id");
1888+
}
1889+
fclose(f);
1890+
} else {
1891+
if (f != NULL) {
1892+
fclose(f);
1893+
}
1894+
f = fopen(controlFilePath, "w");
1895+
if (f == NULL) {
1896+
elog(FATAL, "Failed to create mmts_control file: %m");
1897+
}
1898+
Mtm->donorNodeId = -1;
1899+
fprintf(f, "%s:%d\n", MtmClusterName, Mtm->donorNodeId);
1900+
fclose(f);
1901+
}
1902+
}
18691903

18701904
static void MtmInitialize()
18711905
{
@@ -1930,6 +1964,8 @@ static void MtmInitialize()
19301964
MtmDoReplication = true;
19311965
TM = &MtmTM;
19321966
LWLockRelease(AddinShmemInitLock);
1967+
1968+
MtmCheckControlFile();
19331969
}
19341970

19351971
static void
@@ -2471,6 +2507,19 @@ _PG_init(void)
24712507
NULL /* GucShowHook show_hook */
24722508
);
24732509

2510+
DefineCustomStringVariable(
2511+
"multimaster.cluster_name",
2512+
"Name of the cluster",
2513+
NULL,
2514+
&MtmClusterName,
2515+
"mmts",
2516+
PGC_BACKEND, /* context */
2517+
0, /* flags */
2518+
NULL, /* GucStringCheckHook check_hook */
2519+
NULL, /* GucStringAssignHook assign_hook */
2520+
NULL /* GucShowHook show_hook */
2521+
);
2522+
24742523
DefineCustomIntVariable(
24752524
"multimaster.node_id",
24762525
"Multimaster node ID",
@@ -2608,8 +2657,10 @@ MtmReplicationMode MtmGetReplicationMode(int nodeId, sig_atomic_t volatile* shut
26082657
MtmLock(LW_EXCLUSIVE);
26092658
if (Mtm->status == MTM_RECOVERY) {
26102659
recovery = true;
2611-
if (Mtm->recoverySlot == 0 || Mtm->recoverySlot == nodeId) {
2612-
/* Choose for recovery first available slot */
2660+
if ((Mtm->recoverySlot == 0 && (Mtm->donorNodeId < 0 || Mtm->donorNodeId == nodeId))
2661+
|| Mtm->recoverySlot == nodeId)
2662+
{
2663+
/* Choose for recovery first available slot or slot of donor node (if any) */
26132664
elog(WARNING, "Process %d starts recovery from node %d", MyProcPid, nodeId);
26142665
Mtm->recoverySlot = nodeId;
26152666
Mtm->nReceivers = 0;
@@ -2697,6 +2748,8 @@ MtmReplicationStartupHook(struct PGLogicalStartupHookArgs* args)
26972748
{
26982749
ListCell *param;
26992750
bool recoveryCompleted = false;
2751+
XLogRecPtr recoveryStartPos = InvalidXLogRecPtr;
2752+
27002753
MtmIsRecoverySession = false;
27012754
Mtm->nodes[MtmReplicationNodeId-1].senderPid = MyProcPid;
27022755
Mtm->nodes[MtmReplicationNodeId-1].senderStartTime = MtmGetSystemTime();
@@ -2716,11 +2769,21 @@ MtmReplicationStartupHook(struct PGLogicalStartupHookArgs* args)
27162769
elog(ERROR, "Replication mode is not specified");
27172770
}
27182771
break;
2772+
} else if (strcmp("mtm_restart_pos", elem->defname) == 0) {
2773+
if (elem->arg != NULL && strVal(elem->arg) != NULL) {
2774+
recoveryStartPos = intVal(elem->arg);
2775+
} else {
2776+
elog(ERROR, "Restart position is not specified");
2777+
}
27192778
}
27202779
}
27212780
MtmLock(LW_EXCLUSIVE);
2722-
if (MtmIsRecoverySession) {
2723-
MTM_LOG1("%d: Node %d start recovery of node %d", MyProcPid, MtmNodeId, MtmReplicationNodeId);
2781+
if (MtmIsRecoverySession) {
2782+
MTM_LOG1("%d: Node %d start recovery of node %d at position %lx", MyProcPid, MtmNodeId, MtmReplicationNodeId, recoveryStartPos);
2783+
Assert(MyReplicationSlot != NULL);
2784+
if (recoveryStartPos < MyReplicationSlot->data.restart_lsn) {
2785+
elog(ERROR, "Specified recovery start position %lx is beyond restart lsn %lx", recoveryStartPos, MyReplicationSlot->data.restart_lsn);
2786+
}
27242787
if (!BIT_CHECK(Mtm->disabledNodeMask, MtmReplicationNodeId-1)) {
27252788
MtmDisableNode(MtmReplicationNodeId);
27262789
MtmCheckQuorum();

multimaster.h

+2
Original file line numberDiff line numberDiff line change
@@ -52,6 +52,7 @@
5252
#define MULTIMASTER_MAX_CONN_STR_SIZE 128
5353
#define MULTIMASTER_MAX_HOST_NAME_SIZE 64
5454
#define MULTIMASTER_MAX_LOCAL_TABLES 256
55+
#define MULTIMASTER_MAX_CTL_STR_SIZE 256
5556
#define MULTIMASTER_BROADCAST_SERVICE "mtm_broadcast"
5657
#define MULTIMASTER_ADMIN "mtm_admin"
5758

@@ -241,6 +242,7 @@ typedef struct
241242
int nActiveTransactions; /* Nunmber of active 2PC transactions */
242243
int nConfigChanges; /* Number of cluster configuration changes */
243244
int recoveryCount; /* Number of completed recoveries */
245+
int donorNodeId; /* Cluster node from which this node was populated */
244246
int64 timeShift; /* Local time correction */
245247
csn_t csn; /* Last obtained timestamp: used to provide unique acending CSNs based on system time */
246248
csn_t lastCsn; /* CSN of last committed transaction */

pglogical_receiver.c

+5-4
Original file line numberDiff line numberDiff line change
@@ -285,7 +285,7 @@ pglogical_receiver_main(Datum main_arg)
285285
timeline = Mtm->nodes[nodeId-1].timeline;
286286
newTimeline = true;
287287
}
288-
/* My original assumption was that we can perfrom recovery only fromm existed slot,
288+
/* My original assumption was that we can perfrom recovery only from existed slot,
289289
* but unfortunately looks like slots can "disapear" together with WAL-sender.
290290
* So let's try to recreate slot always. */
291291
/* if (mode != REPLMODE_REPLICATION) */
@@ -325,7 +325,7 @@ pglogical_receiver_main(Datum main_arg)
325325
* Them are either empty, either new node is synchronized using base_backup.
326326
* So we assume that LSNs are the same for local and remote node
327327
*/
328-
originStartPos = Mtm->status == MTM_RECOVERY ? GetXLogInsertRecPtr() : InvalidXLogRecPtr;
328+
originStartPos = Mtm->status == MTM_RECOVERY && Mtm->donorNodeId == nodeId ? GetXLogInsertRecPtr() : InvalidXLogRecPtr;
329329
MTM_LOG1("Start logical receiver at position %lx from node %d", originStartPos, nodeId);
330330
} else {
331331
originStartPos = replorigin_get_progress(originId, false);
@@ -335,13 +335,14 @@ pglogical_receiver_main(Datum main_arg)
335335
CommitTransactionCommand();
336336
}
337337

338-
appendPQExpBuffer(query, "START_REPLICATION SLOT \"%s\" LOGICAL %x/%x (\"startup_params_format\" '1', \"max_proto_version\" '%d', \"min_proto_version\" '%d', \"forward_changesets\" '1', \"mtm_replication_mode\" '%s')",
338+
appendPQExpBuffer(query, "START_REPLICATION SLOT \"%s\" LOGICAL %x/%x (\"startup_params_format\" '1', \"max_proto_version\" '%d', \"min_proto_version\" '%d', \"forward_changesets\" '1', \"mtm_replication_mode\" '%s', \"mtm_restart_pos\" '%lx')",
339339
slotName,
340340
(uint32) (originStartPos >> 32),
341341
(uint32) originStartPos,
342342
MULTIMASTER_MAX_PROTO_VERSION,
343343
MULTIMASTER_MIN_PROTO_VERSION,
344-
MtmReplicationModeName[mode]
344+
MtmReplicationModeName[mode],
345+
originStartPos
345346
);
346347
res = PQexec(conn, query->data);
347348
if (PQresultStatus(res) != PGRES_COPY_BOTH)

0 commit comments

Comments
 (0)