Skip to content

Commit 0c3044e

Browse files
authored
[Hotfix][CDC] Fix split schema change stream (apache#7003)
1 parent 904e9cf commit 0c3044e

File tree

2 files changed

+115
-65
lines changed

2 files changed

+115
-65
lines changed

seatunnel-connectors-v2/connector-cdc/connector-cdc-base/src/main/java/org/apache/seatunnel/connectors/cdc/base/source/reader/external/IncrementalSourceStreamFetcher.java

+94-63
Original file line numberDiff line numberDiff line change
@@ -171,69 +171,7 @@ private Iterator<SourceRecords> splitNormalStream(List<DataChangeEvent> batchEve
171171
* checkpoint-after] [a, b, c, d, e]
172172
*/
173173
Iterator<SourceRecords> splitSchemaChangeStream(List<DataChangeEvent> batchEvents) {
174-
List<SourceRecords> sourceRecordsSet = new ArrayList<>();
175-
176-
List<SourceRecord> sourceRecordList = new ArrayList<>();
177-
SourceRecord previousRecord = null;
178-
for (int i = 0; i < batchEvents.size(); i++) {
179-
DataChangeEvent event = batchEvents.get(i);
180-
SourceRecord currentRecord = event.getRecord();
181-
if (!shouldEmit(currentRecord)) {
182-
continue;
183-
}
184-
185-
if (SourceRecordUtils.isSchemaChangeEvent(currentRecord)) {
186-
if (!schemaChangeResolver.support(currentRecord)) {
187-
continue;
188-
}
189-
190-
if (previousRecord == null) {
191-
// add schema-change-before to first
192-
sourceRecordList.add(
193-
WatermarkEvent.createSchemaChangeBeforeWatermark(currentRecord));
194-
sourceRecordsSet.add(new SourceRecords(sourceRecordList));
195-
sourceRecordList = new ArrayList<>();
196-
sourceRecordList.add(currentRecord);
197-
} else if (SourceRecordUtils.isSchemaChangeEvent(previousRecord)) {
198-
sourceRecordList.add(currentRecord);
199-
} else {
200-
sourceRecordList.add(
201-
WatermarkEvent.createSchemaChangeBeforeWatermark(currentRecord));
202-
sourceRecordsSet.add(new SourceRecords(sourceRecordList));
203-
sourceRecordList = new ArrayList<>();
204-
sourceRecordList.add(currentRecord);
205-
}
206-
} else if (SourceRecordUtils.isDataChangeRecord(currentRecord)
207-
|| SourceRecordUtils.isHeartbeatRecord(currentRecord)) {
208-
if (previousRecord == null
209-
|| SourceRecordUtils.isDataChangeRecord(previousRecord)
210-
|| SourceRecordUtils.isHeartbeatRecord(previousRecord)) {
211-
sourceRecordList.add(currentRecord);
212-
} else {
213-
sourceRecordList.add(
214-
WatermarkEvent.createSchemaChangeAfterWatermark(currentRecord));
215-
sourceRecordsSet.add(new SourceRecords(sourceRecordList));
216-
sourceRecordList = new ArrayList<>();
217-
sourceRecordList.add(currentRecord);
218-
}
219-
}
220-
previousRecord = currentRecord;
221-
if (i == batchEvents.size() - 1) {
222-
if (SourceRecordUtils.isSchemaChangeEvent(currentRecord)) {
223-
sourceRecordList.add(
224-
WatermarkEvent.createSchemaChangeAfterWatermark(currentRecord));
225-
}
226-
sourceRecordsSet.add(new SourceRecords(sourceRecordList));
227-
}
228-
}
229-
230-
if (sourceRecordsSet.size() > 1) {
231-
log.debug(
232-
"Split events stream into {} batches and mark schema checkpoint before/after",
233-
sourceRecordsSet.size());
234-
}
235-
236-
return sourceRecordsSet.iterator();
174+
return new SchemaChangeStreamSplitter().split(batchEvents);
237175
}
238176

239177
private void checkReadException() {
@@ -349,4 +287,97 @@ private void configureFilter() {
349287
this.maxSplitHighWatermarkMap = tableIdBinlogPositionMap;
350288
this.pureBinlogPhaseTables.clear();
351289
}
290+
291+
class SchemaChangeStreamSplitter {
292+
private List<SourceRecords> blockSet;
293+
private List<SourceRecord> currentBlock;
294+
private SourceRecord previousRecord;
295+
296+
public SchemaChangeStreamSplitter() {
297+
blockSet = new ArrayList<>();
298+
currentBlock = new ArrayList<>();
299+
previousRecord = null;
300+
}
301+
302+
public Iterator<SourceRecords> split(List<DataChangeEvent> batchEvents) {
303+
for (int i = 0; i < batchEvents.size(); i++) {
304+
DataChangeEvent event = batchEvents.get(i);
305+
SourceRecord currentRecord = event.getRecord();
306+
if (!shouldEmit(currentRecord)) {
307+
continue;
308+
}
309+
310+
if (SourceRecordUtils.isSchemaChangeEvent(currentRecord)) {
311+
if (!schemaChangeResolver.support(currentRecord)) {
312+
continue;
313+
}
314+
315+
if (previousRecord == null) {
316+
// add schema-change-before to first
317+
currentBlock.add(
318+
WatermarkEvent.createSchemaChangeBeforeWatermark(currentRecord));
319+
flipBlock();
320+
321+
currentBlock.add(currentRecord);
322+
} else if (SourceRecordUtils.isSchemaChangeEvent(previousRecord)) {
323+
currentBlock.add(currentRecord);
324+
} else {
325+
currentBlock.add(
326+
WatermarkEvent.createSchemaChangeBeforeWatermark(currentRecord));
327+
flipBlock();
328+
329+
currentBlock.add(currentRecord);
330+
}
331+
} else if (SourceRecordUtils.isDataChangeRecord(currentRecord)
332+
|| SourceRecordUtils.isHeartbeatRecord(currentRecord)) {
333+
if (previousRecord == null
334+
|| SourceRecordUtils.isDataChangeRecord(previousRecord)
335+
|| SourceRecordUtils.isHeartbeatRecord(previousRecord)) {
336+
currentBlock.add(currentRecord);
337+
} else {
338+
endBlock(previousRecord);
339+
flipBlock();
340+
341+
currentBlock.add(currentRecord);
342+
}
343+
}
344+
345+
previousRecord = currentRecord;
346+
if (i == batchEvents.size() - 1) {
347+
endBlock(currentRecord);
348+
flipBlock();
349+
}
350+
}
351+
352+
endLastBlock(previousRecord);
353+
354+
if (blockSet.size() > 1) {
355+
log.debug(
356+
"Split events stream into {} batches and mark schema change checkpoint",
357+
blockSet.size());
358+
}
359+
360+
return blockSet.iterator();
361+
}
362+
363+
void flipBlock() {
364+
if (!currentBlock.isEmpty()) {
365+
blockSet.add(new SourceRecords(currentBlock));
366+
currentBlock = new ArrayList<>();
367+
}
368+
}
369+
370+
void endBlock(SourceRecord lastRecord) {
371+
if (!currentBlock.isEmpty()) {
372+
if (SourceRecordUtils.isSchemaChangeEvent(lastRecord)) {
373+
currentBlock.add(WatermarkEvent.createSchemaChangeAfterWatermark(lastRecord));
374+
}
375+
}
376+
}
377+
378+
void endLastBlock(SourceRecord lastRecord) {
379+
endBlock(lastRecord);
380+
flipBlock();
381+
}
382+
}
352383
}

seatunnel-connectors-v2/connector-cdc/connector-cdc-base/src/test/java/org/apache/seatunnel/connectors/cdc/base/source/reader/external/IncrementalSourceStreamFetcherTest.java

+21-2
Original file line numberDiff line numberDiff line change
@@ -29,6 +29,7 @@
2929

3030
import org.junit.jupiter.api.Assertions;
3131
import org.junit.jupiter.api.Test;
32+
import org.mockito.stubbing.Answer;
3233

3334
import io.debezium.config.CommonConnectorConfig;
3435
import io.debezium.config.Configuration;
@@ -61,6 +62,7 @@ public class IncrementalSourceStreamFetcherTest {
6162
.with(Heartbeat.HEARTBEAT_INTERVAL, 1)
6263
.with(TRANSACTION_TOPIC, "test")
6364
.build();
65+
private static final String UNKNOWN_SCHEMA_KEY = "UNKNOWN";
6466

6567
@Test
6668
public void testSplitSchemaChangeStream() throws Exception {
@@ -107,6 +109,7 @@ public void testSplitSchemaChangeStream() throws Exception {
107109
inputEvents.add(new DataChangeEvent(createDataEvent()));
108110
inputEvents.add(new DataChangeEvent(createSchemaChangeEvent()));
109111
inputEvents.add(new DataChangeEvent(createSchemaChangeEvent()));
112+
inputEvents.add(new DataChangeEvent(createSchemaChangeUnknownEvent()));
110113
outputEvents = fetcher.splitSchemaChangeStream(inputEvents);
111114
outputEvents.forEachRemaining(records::add);
112115

@@ -134,6 +137,7 @@ public void testSplitSchemaChangeStream() throws Exception {
134137
inputEvents.add(new DataChangeEvent(createSchemaChangeEvent()));
135138
inputEvents.add(new DataChangeEvent(createDataEvent()));
136139
inputEvents.add(new DataChangeEvent(createDataEvent()));
140+
inputEvents.add(new DataChangeEvent(createSchemaChangeUnknownEvent()));
137141
outputEvents = fetcher.splitSchemaChangeStream(inputEvents);
138142
outputEvents.forEachRemaining(records::add);
139143

@@ -323,13 +327,21 @@ public void testSplitSchemaChangeStream() throws Exception {
323327
}
324328

325329
static SourceRecord createSchemaChangeEvent() {
330+
return createSchemaChangeEvent("SCHEMA_CHANGE_TOPIC");
331+
}
332+
333+
static SourceRecord createSchemaChangeUnknownEvent() {
334+
return createSchemaChangeEvent(UNKNOWN_SCHEMA_KEY);
335+
}
336+
337+
static SourceRecord createSchemaChangeEvent(String topic) {
326338
Schema keySchema =
327339
SchemaBuilder.struct().name(SourceRecordUtils.SCHEMA_CHANGE_EVENT_KEY_NAME).build();
328340
SourceRecord record =
329341
new SourceRecord(
330342
Collections.emptyMap(),
331343
Collections.emptyMap(),
332-
null,
344+
topic,
333345
keySchema,
334346
null,
335347
null,
@@ -377,7 +389,14 @@ static SourceRecord createHeartbeatEvent() throws InterruptedException {
377389

378390
static IncrementalSourceStreamFetcher createFetcher() {
379391
SchemaChangeResolver schemaChangeResolver = mock(SchemaChangeResolver.class);
380-
when(schemaChangeResolver.support(any())).thenReturn(true);
392+
when(schemaChangeResolver.support(any()))
393+
.thenAnswer(
394+
(Answer<Boolean>)
395+
invocationOnMock -> {
396+
SourceRecord record = invocationOnMock.getArgument(0);
397+
return record.topic() == null
398+
|| !record.topic().equalsIgnoreCase(UNKNOWN_SCHEMA_KEY);
399+
});
381400
IncrementalSourceStreamFetcher fetcher =
382401
new IncrementalSourceStreamFetcher(null, 0, schemaChangeResolver);
383402
IncrementalSourceStreamFetcher spy = spy(fetcher);

0 commit comments

Comments
 (0)