9
9
import java .util .Map ;
10
10
import java .util .Objects ;
11
11
import java .util .OptionalLong ;
12
+ import java .util .concurrent .ConcurrentLinkedQueue ;
12
13
import java .util .concurrent .atomic .AtomicReference ;
13
14
14
15
import org .apache .kafka .connect .errors .ConnectException ;
@@ -80,12 +81,16 @@ public class PostgresStreamingChangeEventSource implements StreamingChangeEventS
80
81
*/
81
82
private long numberOfEventsSinceLastEventSentOrWalGrowingWarning = 0 ;
82
83
private Lsn lastCompletelyProcessedLsn ;
84
+ private Lsn lastSentFeedback = Lsn .valueOf (2L );
83
85
private PostgresOffsetContext effectiveOffset ;
84
86
87
+ protected ConcurrentLinkedQueue <Lsn > commitTimes ;
88
+
85
89
/**
86
90
* For DEBUGGING
87
91
*/
88
92
private OptionalLong lastTxnidForWhichCommitSeen = OptionalLong .empty ();
93
+ private long recordCount = 0 ;
89
94
90
95
public PostgresStreamingChangeEventSource (PostgresConnectorConfig connectorConfig , Snapshotter snapshotter ,
91
96
PostgresConnection connection , PostgresEventDispatcher <TableId > dispatcher , ErrorHandler errorHandler , Clock clock ,
@@ -101,7 +106,7 @@ public PostgresStreamingChangeEventSource(PostgresConnectorConfig connectorConfi
101
106
this .snapshotter = snapshotter ;
102
107
this .replicationConnection = (PostgresReplicationConnection ) replicationConnection ;
103
108
this .connectionProbeTimer = ElapsedTimeStrategy .constant (Clock .system (), connectorConfig .statusUpdateInterval ());
104
-
109
+ this . commitTimes = new ConcurrentLinkedQueue <>();
105
110
}
106
111
107
112
@ Override
@@ -121,6 +126,20 @@ private void initSchema() {
121
126
}
122
127
}
123
128
129
+ public Lsn getLsn (PostgresOffsetContext offsetContext , PostgresConnectorConfig .LsnType lsnType ) {
130
+ if (lsnType .isSequence ()) {
131
+ return this .effectiveOffset .lastCompletelyProcessedLsn () != null ? this .effectiveOffset .lastCompletelyProcessedLsn ()
132
+ : this .effectiveOffset .lsn ();
133
+ } else {
134
+ // We are in the block for HYBRID_TIME lsn type and last commit can be null for cases
135
+ // where we have just started/restarted the connector, in that case, we simply sent the
136
+ // initial value of lastSentFeedback and let the server handle the time we
137
+ // should get the changes from.
138
+ return this .effectiveOffset .lastCommitLsn () == null ?
139
+ lastSentFeedback : this .effectiveOffset .lastCommitLsn ();
140
+ }
141
+ }
142
+
124
143
@ Override
125
144
public void execute (ChangeEventSourceContext context , PostgresPartition partition , PostgresOffsetContext offsetContext )
126
145
throws InterruptedException {
@@ -148,17 +167,24 @@ public void execute(ChangeEventSourceContext context, PostgresPartition partitio
148
167
}
149
168
150
169
if (hasStartLsnStoredInContext ) {
151
- // start streaming from the last recorded position in the offset
152
- final Lsn lsn = this .effectiveOffset .lastCompletelyProcessedLsn () != null ? this .effectiveOffset .lastCompletelyProcessedLsn ()
153
- : this .effectiveOffset .lsn ();
170
+ final Lsn lsn = getLsn (this .effectiveOffset , connectorConfig .slotLsnType ());
154
171
final Operation lastProcessedMessageType = this .effectiveOffset .lastProcessedMessageType ();
155
- LOGGER .info ("Retrieved latest position from stored offset '{}'" , lsn );
156
- walPosition = new WalPositionLocator (this .effectiveOffset .lastCommitLsn (), lsn , lastProcessedMessageType );
172
+
173
+ if (this .effectiveOffset .lastCommitLsn () == null ) {
174
+ LOGGER .info ("Last commit stored in offset is null" );
175
+ }
176
+
177
+ LOGGER .info ("Retrieved last committed LSN from stored offset '{}'" , lsn );
178
+
179
+ walPosition = new WalPositionLocator (this .effectiveOffset .lastCommitLsn (), lsn ,
180
+ lastProcessedMessageType , connectorConfig .slotLsnType ().isHybridTime () /* isLsnTypeHybridTime */ );
181
+
157
182
replicationStream .compareAndSet (null , replicationConnection .startStreaming (lsn , walPosition ));
183
+ lastSentFeedback = lsn ;
158
184
}
159
185
else {
160
186
LOGGER .info ("No previous LSN found in Kafka, streaming from the latest xlogpos or flushed LSN..." );
161
- walPosition = new WalPositionLocator ();
187
+ walPosition = new WalPositionLocator (this . connectorConfig . slotLsnType (). isHybridTime () );
162
188
replicationStream .compareAndSet (null , replicationConnection .startStreaming (walPosition ));
163
189
}
164
190
// for large dbs, the refresh of schema can take too much time
@@ -188,7 +214,13 @@ public void execute(ChangeEventSourceContext context, PostgresPartition partitio
188
214
} catch (Exception e ) {
189
215
LOGGER .info ("Commit failed while preparing for reconnect" , e );
190
216
}
191
- walPosition .enableFiltering ();
217
+
218
+ // Do not filter anything when lsn type is hybrid time. This is to avoid the WalPositionLocator complaining
219
+ // about the LSN not being present in the lsnSeen set.
220
+ if (connectorConfig .slotLsnType ().isSequence ()) {
221
+ walPosition .enableFiltering ();
222
+ }
223
+
192
224
stream .stopKeepAlive ();
193
225
replicationConnection .reconnect ();
194
226
@@ -198,7 +230,11 @@ public void execute(ChangeEventSourceContext context, PostgresPartition partitio
198
230
replicationConnection .getConnectedNodeIp ());
199
231
}
200
232
201
- replicationStream .set (replicationConnection .startStreaming (walPosition .getLastEventStoredLsn (), walPosition ));
233
+ // For the HybridTime mode, we always want to resume from the position of last commit so that we
234
+ // send complete transactions and do not resume from the last event stored LSN.
235
+ Lsn lastStoredLsn = connectorConfig .slotLsnType ().isHybridTime () ? walPosition .getLastCommitStoredLsn () : walPosition .getLastEventStoredLsn ();
236
+ replicationStream .set (replicationConnection .startStreaming (lastStoredLsn , walPosition ));
237
+
202
238
stream = this .replicationStream .get ();
203
239
stream .startKeepAlive (Threads .newSingleThreadExecutor (YugabyteDBConnector .class , connectorConfig .getLogicalName (), KEEP_ALIVE_THREAD_NAME ));
204
240
}
@@ -292,6 +328,8 @@ private void processReplicationMessages(PostgresPartition partition, PostgresOff
292
328
LOGGER .debug ("Processing BEGIN with end LSN {} and txnid {}" , lsn , message .getTransactionId ());
293
329
} else {
294
330
LOGGER .debug ("Processing COMMIT with end LSN {} and txnid {}" , lsn , message .getTransactionId ());
331
+ LOGGER .debug ("Record count in the txn {} is {} with commit time {}" , message .getTransactionId (), recordCount , lsn .asLong () - 1 );
332
+ recordCount = 0 ;
295
333
}
296
334
297
335
OptionalLong currentTxnid = message .getTransactionId ();
@@ -308,7 +346,7 @@ private void processReplicationMessages(PostgresPartition partition, PostgresOff
308
346
// Don't skip on BEGIN message as it would flush LSN for the whole transaction
309
347
// too early
310
348
if (message .getOperation () == Operation .COMMIT ) {
311
- commitMessage (partition , offsetContext , lsn );
349
+ commitMessage (partition , offsetContext , lsn , message );
312
350
}
313
351
return ;
314
352
}
@@ -321,7 +359,7 @@ private void processReplicationMessages(PostgresPartition partition, PostgresOff
321
359
dispatcher .dispatchTransactionStartedEvent (partition , toString (message .getTransactionId ()), offsetContext , message .getCommitTime ());
322
360
}
323
361
else if (message .getOperation () == Operation .COMMIT ) {
324
- commitMessage (partition , offsetContext , lsn );
362
+ commitMessage (partition , offsetContext , lsn , message );
325
363
dispatcher .dispatchTransactionCommittedEvent (partition , offsetContext , message .getCommitTime ());
326
364
}
327
365
maybeWarnAboutGrowingWalBacklog (true );
@@ -333,7 +371,7 @@ else if (message.getOperation() == Operation.MESSAGE) {
333
371
334
372
// non-transactional message that will not be followed by a COMMIT message
335
373
if (message .isLastEventForLsn ()) {
336
- commitMessage (partition , offsetContext , lsn );
374
+ commitMessage (partition , offsetContext , lsn , message );
337
375
}
338
376
339
377
dispatcher .dispatchLogicalDecodingMessage (
@@ -346,6 +384,9 @@ else if (message.getOperation() == Operation.MESSAGE) {
346
384
}
347
385
// DML event
348
386
else {
387
+ LOGGER .trace ("Processing DML event with lsn {} and lastCompletelyProcessedLsn {}" , lsn , lastCompletelyProcessedLsn );
388
+ ++recordCount ;
389
+
349
390
TableId tableId = null ;
350
391
if (message .getOperation () != Operation .NOOP ) {
351
392
tableId = PostgresSchema .parse (message .getTable ());
@@ -384,7 +425,17 @@ private void searchWalPosition(ChangeEventSourceContext context, PostgresPartiti
384
425
while (context .isRunning () && resumeLsn .get () == null ) {
385
426
386
427
boolean receivedMessage = stream .readPending (message -> {
387
- final Lsn lsn = stream .lastReceivedLsn ();
428
+ final Lsn lsn ;
429
+ if (connectorConfig .slotLsnType ().isHybridTime ()) {
430
+ // Last commit can be null for cases where
431
+ // we have just started/restarted the connector, in that case, we simply sent the
432
+ // initial value of lastSentFeedback and let the server handle the time we
433
+ // should get the changes from.
434
+
435
+ lsn = walPosition .getLastCommitStoredLsn () != null ? walPosition .getLastCommitStoredLsn () : lastSentFeedback ;
436
+ } else {
437
+ lsn = stream .lastReceivedLsn ();
438
+ }
388
439
resumeLsn .set (walPosition .resumeFromLsn (lsn , message ).orElse (null ));
389
440
});
390
441
@@ -412,9 +463,17 @@ private void probeConnectionIfNeeded() throws SQLException {
412
463
}
413
464
}
414
465
415
- private void commitMessage (PostgresPartition partition , PostgresOffsetContext offsetContext , final Lsn lsn ) throws SQLException , InterruptedException {
466
+ private void commitMessage (PostgresPartition partition , PostgresOffsetContext offsetContext , final Lsn lsn , ReplicationMessage message ) throws SQLException , InterruptedException {
416
467
lastCompletelyProcessedLsn = lsn ;
417
468
offsetContext .updateCommitPosition (lsn , lastCompletelyProcessedLsn );
469
+
470
+ if (this .connectorConfig .slotLsnType ().isHybridTime ()) {
471
+ if (message .getOperation () == Operation .COMMIT ) {
472
+ LOGGER .debug ("Adding '{}' as lsn to the commit times queue" , Lsn .valueOf (lsn .asLong () - 1 ));
473
+ commitTimes .add (Lsn .valueOf (lsn .asLong () - 1 ));
474
+ }
475
+ }
476
+
418
477
maybeWarnAboutGrowingWalBacklog (false );
419
478
dispatcher .dispatchHeartbeatEvent (partition , offsetContext );
420
479
}
@@ -470,11 +529,23 @@ public void commitOffset(Map<String, ?> partition, Map<String, ?> offset) {
470
529
return ;
471
530
}
472
531
532
+ Lsn finalLsn ;
533
+ if (this .connectorConfig .slotLsnType ().isHybridTime ()) {
534
+ finalLsn = getLsnToBeFlushed (lsn );
535
+ } else {
536
+ finalLsn = lsn ;
537
+ }
538
+
473
539
if (LOGGER .isDebugEnabled ()) {
474
- LOGGER .debug ("Flushing LSN to server: {}" , lsn );
540
+ LOGGER .debug ("Flushing LSN to server: {}" , finalLsn );
475
541
}
476
542
// tell the server the point up to which we've processed data, so it can be free to recycle WAL segments
477
- replicationStream .flushLsn (lsn );
543
+ replicationStream .flushLsn (finalLsn );
544
+
545
+ if (this .connectorConfig .slotLsnType ().isHybridTime ()) {
546
+ lastSentFeedback = finalLsn ;
547
+ cleanCommitTimeQueue (finalLsn );
548
+ }
478
549
}
479
550
else {
480
551
LOGGER .debug ("Streaming has already stopped, ignoring commit callback..." );
@@ -485,6 +556,45 @@ public void commitOffset(Map<String, ?> partition, Map<String, ?> offset) {
485
556
}
486
557
}
487
558
559
+ /**
560
+ * Returns the LSN that should be flushed to the service. The {@code commitTimes} list will have
561
+ * a list of all the commit times for which we have received a commit record. All we want now
562
+ * is that whenever we get a commit callback, we should be flushing a time just smaller than
563
+ * the one we have gotten the callback on.
564
+ * @param lsn the {@link Lsn} received in callback
565
+ * @return the {@link Lsn} to be flushed
566
+ */
567
+ protected Lsn getLsnToBeFlushed (Lsn lsn ) {
568
+ if (commitTimes == null || commitTimes .isEmpty ()) {
569
+ // This means that the queue has not been initialised and the task is still starting.
570
+ return lastSentFeedback ;
571
+ }
572
+
573
+ Lsn result = lastSentFeedback ;
574
+
575
+ if (LOGGER .isDebugEnabled ()) {
576
+ LOGGER .debug ("Queue at this time: {}" , commitTimes );
577
+ }
578
+
579
+ for (Lsn commitLsn : commitTimes ) {
580
+ if (commitLsn .compareTo (lsn ) < 0 ) {
581
+ LOGGER .debug ("Assigning result as {}" , commitLsn );
582
+ result = commitLsn ;
583
+ } else {
584
+ // This will be the loop exit when we encounter any bigger element.
585
+ break ;
586
+ }
587
+ }
588
+
589
+ return result ;
590
+ }
591
+
592
+ protected void cleanCommitTimeQueue (Lsn lsn ) {
593
+ if (commitTimes != null ) {
594
+ commitTimes .removeIf (ele -> ele .compareTo (lsn ) < 1 );
595
+ }
596
+ }
597
+
488
598
@ Override
489
599
public PostgresOffsetContext getOffsetContext () {
490
600
return effectiveOffset ;
0 commit comments