Skip to content

Commit 1ac9152

Browse files
authored
CDC: Fix validation logic for configs related to parallel streaming (#173)
This PR fixes the bug while validating the parallel streaming configuration properties and instead of using a common method to validate, we now specify the lambda method directly at the time of `Field` declaration.
1 parent 13c3b13 commit 1ac9152

File tree

3 files changed

+43
-18
lines changed

3 files changed

+43
-18
lines changed

debezium-connector-postgres/src/main/java/io/debezium/connector/postgresql/PostgresConnectorConfig.java

+21-15
Original file line numberDiff line numberDiff line change
@@ -740,19 +740,37 @@ public static SchemaRefreshMode parse(String value) {
740740
.withDisplayName("Slot names for parallel consumption")
741741
.withImportance(Importance.LOW)
742742
.withDescription("Comma separated values for multiple slot names")
743-
.withValidation(PostgresConnectorConfig::validateUsageWithParallelStreamingMode);
743+
.withValidation((config, field, output) -> {
744+
if (!config.getString(field, "").isEmpty() && !config.getString(STREAMING_MODE).equalsIgnoreCase("parallel")) {
745+
output.accept(field, "", "slot.names is only valid with streaming.mode 'parallel'");
746+
return 1;
747+
}
748+
return 0;
749+
});
744750

745751
public static final Field PUBLICATION_NAMES = Field.create("publication.names")
746752
.withDisplayName("Publication names for parallel consumption")
747753
.withImportance(Importance.LOW)
748754
.withDescription("Comma separated values for multiple publication names")
749-
.withValidation(PostgresConnectorConfig::validateUsageWithParallelStreamingMode);
755+
.withValidation((config, field, output) -> {
756+
if (!config.getString(field, "").isEmpty() && !config.getString(STREAMING_MODE).equalsIgnoreCase("parallel")) {
757+
output.accept(field, "", "publication.names is only valid with streaming.mode 'parallel'");
758+
return 1;
759+
}
760+
return 0;
761+
});
750762

751763
public static final Field SLOT_RANGES = Field.create("slot.ranges")
752764
.withDisplayName("Ranges on which a slot is supposed to operate")
753765
.withImportance(Importance.LOW)
754766
.withDescription("Semi-colon separated values for hash ranges to be polled by tasks.")
755-
.withValidation(PostgresConnectorConfig::validateUsageWithParallelStreamingMode);
767+
.withValidation((config, field, output) -> {
768+
if (!config.getString(field, "").isEmpty() && !config.getString(STREAMING_MODE).equalsIgnoreCase("parallel")) {
769+
output.accept(field, "", "slot.ranges is only valid with streaming.mode 'parallel'");
770+
return 1;
771+
}
772+
return 0;
773+
});
756774

757775
public static final Field YB_LOAD_BALANCE_CONNECTIONS = Field.create("yb.load.balance.connections")
758776
.withDisplayName("YB load balance connections")
@@ -1580,16 +1598,4 @@ protected static int validateYBHostname(Configuration config, Field field, Field
15801598

15811599
return problemCount;
15821600
}
1583-
1584-
protected static int validateUsageWithParallelStreamingMode(Configuration config, Field field, Field.ValidationOutput problems) {
1585-
String mode = config.getString(STREAMING_MODE);
1586-
int problemCount = 0;
1587-
1588-
if (!StreamingMode.parse(mode).isParallel()) {
1589-
problems.accept(field, config.getString(field), "Configuration is only valid with parallel streaming mode");
1590-
++problemCount;
1591-
}
1592-
1593-
return problemCount;
1594-
}
15951601
}

debezium-connector-postgres/src/main/java/io/debezium/connector/postgresql/YugabyteDBConnector.java

+7
Original file line numberDiff line numberDiff line change
@@ -144,6 +144,13 @@ public List<Map<String, String>> taskConfigs(int maxTasks) {
144144

145145
// YB Note: Only applicable when snapshot mode is not parallel.
146146
// this will always have just one task with the given list of properties
147+
148+
// This is to ensure that whenever the connector runs with a single task model,
149+
// the default task ID is populated as this is not done by debezium-core.
150+
if (props != null) {
151+
props.put(PostgresConnectorConfig.TASK_ID, "0");
152+
}
153+
147154
return props == null ? Collections.emptyList() : Collections.singletonList(new HashMap<>(props));
148155
}
149156

debezium-connector-postgres/src/test/java/io/debezium/connector/postgresql/PostgresConnectorConfigDefTest.java

+15-3
Original file line numberDiff line numberDiff line change
@@ -103,10 +103,22 @@ public void shouldFailIfSlotRangesSpecifiedWithoutParallelStreamingMode() {
103103
.with(PostgresConnectorConfig.STREAMING_MODE, PostgresConnectorConfig.StreamingMode.DEFAULT)
104104
.with(PostgresConnectorConfig.SLOT_RANGES, "0,10;10,65536");
105105

106-
int problemCount = PostgresConnectorConfig.validateUsageWithParallelStreamingMode(
107-
configBuilder.build(), PostgresConnectorConfig.SLOT_RANGES, (field, value, problemMessage) -> System.out.println(problemMessage));
106+
boolean valid = PostgresConnectorConfig.SLOT_RANGES.validate(
107+
configBuilder.build(), (field, value, problemMessage) -> System.out.println(problemMessage));
108108

109-
assertThat(problemCount == 1).isTrue();
109+
assertThat(valid).isFalse();
110+
}
111+
112+
@Test
113+
public void ensureNoErrorWhenProperParallelStreamingConfigSpecified() {
114+
Configuration.Builder configBuilder = TestHelper.defaultConfig()
115+
.with(PostgresConnectorConfig.STREAMING_MODE, PostgresConnectorConfig.StreamingMode.PARALLEL)
116+
.with(PostgresConnectorConfig.SLOT_RANGES, "0,10;10,65536");
117+
118+
boolean valid = PostgresConnectorConfig.SLOT_RANGES.validate(
119+
configBuilder.build(), (field, value, problemMessage) -> System.out.println(problemMessage));
120+
121+
assertThat(valid).isTrue();
110122
}
111123

112124
public void validateCorrectHostname(boolean multiNode) {

0 commit comments

Comments
 (0)