Skip to content

Commit 615451f

Browse files
authored
[yugabyte/yugabyte-db#26069] Custom metrics implementation for multi task model (#165)
## Problem Currently, the connector follows a single task single partition model and thus uses the `DefaultChangeEventSourceMetricsFactory` which doesn't let us override the metric object names. Since we cannot override names, this causes an issue when we go for the multi task model as there's a conflict in the metric object names if a task registers its metric objects and subsequent tasks try to register their metric objects too. ## Solution This is resolved by adding a custom metrics implementation which lets us customise things accordingly for a multi task model. This is done by following the example of metrics implemented by SqlServerConnector at https://github.com/yugabyte/debezium/tree/2.5.2.Final/debezium-connector-sqlserver/src/main/java/io/debezium/connector/sqlserver/metrics This closes yugabyte/yugabyte-db#26069
1 parent e68ae92 commit 615451f

21 files changed

+734
-26
lines changed

debezium-connector-postgres/src/main/java/io/debezium/connector/postgresql/PostgresConnectorConfig.java

-11
Original file line numberDiff line numberDiff line change
@@ -1086,13 +1086,6 @@ public static AutoCreateMode parse(String value, String defaultValue) {
10861086
public static final Field SOURCE_INFO_STRUCT_MAKER = CommonConnectorConfig.SOURCE_INFO_STRUCT_MAKER
10871087
.withDefault(PostgresSourceInfoStructMaker.class.getName());
10881088

1089-
public static final Field TASK_ID = Field.create("task.id")
1090-
.withDisplayName("ID of the connector task")
1091-
.withType(Type.INT)
1092-
.withDefault(0)
1093-
.withImportance(Importance.LOW)
1094-
.withDescription("Internal use only");
1095-
10961089
public static final Field PRIMARY_KEY_HASH_COLUMNS = Field.create("primary.key.hash.columns")
10971090
.withDisplayName("Comma separated primary key fields")
10981091
.withType(Type.STRING)
@@ -1245,10 +1238,6 @@ public boolean isFlushLsnOnSource() {
12451238
return flushLsnOnSource;
12461239
}
12471240

1248-
public int taskId() {
1249-
return getConfig().getInteger(TASK_ID);
1250-
}
1251-
12521241
public String primaryKeyHashColumns() {
12531242
return getConfig().getString(PRIMARY_KEY_HASH_COLUMNS);
12541243
}

debezium-connector-postgres/src/main/java/io/debezium/connector/postgresql/PostgresConnectorTask.java

+5-3
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,7 @@
1313
import java.util.Map;
1414
import java.util.stream.Collectors;
1515

16+
import io.debezium.connector.postgresql.metrics.YugabyteDBMetricsFactory;
1617
import org.apache.kafka.connect.errors.ConnectException;
1718
import org.apache.kafka.connect.errors.RetriableException;
1819
import org.apache.kafka.connect.source.SourceRecord;
@@ -123,9 +124,10 @@ public ChangeEventSourceCoordinator<PostgresPartition, PostgresOffsetContext> st
123124
final PostgresValueConverter valueConverter = valueConverterBuilder.build(typeRegistry);
124125

125126
schema = new PostgresSchema(connectorConfig, defaultValueConverter, topicNamingStrategy, valueConverter);
126-
this.taskContext = new PostgresTaskContext(connectorConfig, schema, topicNamingStrategy, connectorConfig.taskId());
127+
this.taskContext = new PostgresTaskContext(connectorConfig, schema, topicNamingStrategy, connectorConfig.getTaskId());
128+
final PostgresPartition.Provider partitionProvider = new PostgresPartition.Provider(connectorConfig, config);
127129
final Offsets<PostgresPartition, PostgresOffsetContext> previousOffsets = getPreviousOffsets(
128-
new PostgresPartition.Provider(connectorConfig, config), new PostgresOffsetContext.Loader(connectorConfig));
130+
partitionProvider, new PostgresOffsetContext.Loader(connectorConfig));
129131
final Clock clock = Clock.system();
130132
final PostgresOffsetContext previousOffset = previousOffsets.getTheOnlyOffset();
131133

@@ -255,7 +257,7 @@ public ChangeEventSourceCoordinator<PostgresPartition, PostgresOffsetContext> st
255257
replicationConnection,
256258
slotCreatedInfo,
257259
slotInfo),
258-
new DefaultChangeEventSourceMetricsFactory<>(),
260+
new YugabyteDBMetricsFactory(partitionProvider.getPartitions()),
259261
dispatcher,
260262
schema,
261263
snapshotter,

debezium-connector-postgres/src/main/java/io/debezium/connector/postgresql/PostgresPartition.java

+4-4
Original file line numberDiff line numberDiff line change
@@ -21,9 +21,9 @@ public class PostgresPartition extends AbstractPartition implements Partition {
2121
private static final String SERVER_PARTITION_KEY = "server";
2222

2323
private final String serverName;
24-
private final int taskId;
24+
private final String taskId;
2525

26-
public PostgresPartition(String serverName, String databaseName, int taskId) {
26+
public PostgresPartition(String serverName, String databaseName, String taskId) {
2727
super(databaseName);
2828
this.serverName = serverName;
2929
this.taskId = taskId;
@@ -57,7 +57,7 @@ public String toString() {
5757
}
5858

5959
public String getPartitionIdentificationKey() {
60-
return String.format("%s_%d", serverName, taskId);
60+
return String.format("%s_%s", serverName, taskId);
6161
}
6262

6363
static class Provider implements Partition.Provider<PostgresPartition> {
@@ -73,7 +73,7 @@ static class Provider implements Partition.Provider<PostgresPartition> {
7373
public Set<PostgresPartition> getPartitions() {
7474
return Collections.singleton(new PostgresPartition(
7575
connectorConfig.getLogicalName(), taskConfig.getString(DATABASE_NAME.name()),
76-
connectorConfig.taskId()));
76+
connectorConfig.getTaskId()));
7777
}
7878
}
7979
}

debezium-connector-postgres/src/main/java/io/debezium/connector/postgresql/PostgresTaskContext.java

+2-2
Original file line numberDiff line numberDiff line change
@@ -52,8 +52,8 @@ protected PostgresTaskContext(PostgresConnectorConfig config, PostgresSchema sch
5252
this.schema = schema;
5353
}
5454

55-
protected PostgresTaskContext(PostgresConnectorConfig config, PostgresSchema schema, TopicNamingStrategy<TableId> topicNamingStrategy, int taskId) {
56-
super(config.getContextName(), config.getLogicalName(), String.valueOf(taskId), config.getCustomMetricTags(), Collections::emptySet);
55+
protected PostgresTaskContext(PostgresConnectorConfig config, PostgresSchema schema, TopicNamingStrategy<TableId> topicNamingStrategy, String taskId) {
56+
super(config.getContextName(), config.getLogicalName(), taskId, config.getCustomMetricTags(), Collections::emptySet);
5757

5858
this.config = config;
5959
if (config.xminFetchInterval().toMillis() > 0) {

debezium-connector-postgres/src/main/java/io/debezium/connector/postgresql/YugabyteDBConnector.java

+1-1
Original file line numberDiff line numberDiff line change
@@ -112,7 +112,7 @@ protected List<Map<String, String>> getConfigForParallelSnapshotConsumption(int
112112
for (int i = 0; i < maxTasks; ++i) {
113113
Map<String, String> taskProps = new HashMap<>(this.props);
114114

115-
taskProps.put(PostgresConnectorConfig.TASK_ID.name(), String.valueOf(i));
115+
taskProps.put(PostgresConnectorConfig.TASK_ID, String.valueOf(i));
116116

117117
long lowerBound = i * rangeSize;
118118
long upperBound = (i == maxTasks - 1) ? upperBoundExclusive - 1 : (lowerBound + rangeSize - 1);
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,110 @@
1+
package io.debezium.connector.postgresql.metrics;
2+
3+
import io.debezium.connector.common.CdcSourceTaskContext;
4+
import io.debezium.connector.postgresql.PostgresTaskContext;
5+
import io.debezium.data.Envelope;
6+
import io.debezium.metrics.Metrics;
7+
import io.debezium.pipeline.ConnectorEvent;
8+
import io.debezium.pipeline.meters.CommonEventMeter;
9+
import io.debezium.pipeline.source.spi.EventMetadataProvider;
10+
import io.debezium.pipeline.spi.OffsetContext;
11+
import io.debezium.spi.schema.DataCollectionId;
12+
import org.apache.kafka.connect.data.Struct;
13+
14+
import java.util.Map;
15+
16+
abstract class AbstractYugabyteDBPartitionMetrics extends YugabyteDBMetrics implements YugabyteDBPartitionMetricsMXBean {
17+
private final CommonEventMeter commonEventMeter;
18+
19+
AbstractYugabyteDBPartitionMetrics(CdcSourceTaskContext taskContext, Map<String, String> tags,
20+
EventMetadataProvider metadataProvider) {
21+
super(taskContext, tags);
22+
this.commonEventMeter = new CommonEventMeter(taskContext.getClock(), metadataProvider);
23+
}
24+
25+
@Override
26+
public String getLastEvent() {
27+
return commonEventMeter.getLastEvent();
28+
}
29+
30+
@Override
31+
public long getMilliSecondsSinceLastEvent() {
32+
return commonEventMeter.getMilliSecondsSinceLastEvent();
33+
}
34+
35+
@Override
36+
public long getTotalNumberOfEventsSeen() {
37+
return commonEventMeter.getTotalNumberOfEventsSeen();
38+
}
39+
40+
@Override
41+
public long getTotalNumberOfCreateEventsSeen() {
42+
return commonEventMeter.getTotalNumberOfCreateEventsSeen();
43+
}
44+
45+
@Override
46+
public long getTotalNumberOfUpdateEventsSeen() {
47+
return commonEventMeter.getTotalNumberOfUpdateEventsSeen();
48+
}
49+
50+
@Override
51+
public long getTotalNumberOfDeleteEventsSeen() {
52+
return commonEventMeter.getTotalNumberOfDeleteEventsSeen();
53+
}
54+
55+
@Override
56+
public long getNumberOfEventsFiltered() {
57+
return commonEventMeter.getNumberOfEventsFiltered();
58+
}
59+
60+
@Override
61+
public long getNumberOfErroneousEvents() {
62+
return commonEventMeter.getNumberOfErroneousEvents();
63+
}
64+
65+
/**
66+
* Invoked if an event is processed for a captured table.
67+
*/
68+
void onEvent(DataCollectionId source, OffsetContext offset, Object key, Struct value, Envelope.Operation operation) {
69+
commonEventMeter.onEvent(source, offset, key, value, operation);
70+
}
71+
72+
/**
73+
* Invoked for events pertaining to non-captured tables.
74+
*/
75+
void onFilteredEvent(String event) {
76+
commonEventMeter.onFilteredEvent();
77+
}
78+
79+
/**
80+
* Invoked for events pertaining to non-captured tables.
81+
*/
82+
void onFilteredEvent(String event, Envelope.Operation operation) {
83+
commonEventMeter.onFilteredEvent(operation);
84+
}
85+
86+
/**
87+
* Invoked for events that cannot be processed.
88+
*/
89+
void onErroneousEvent(String event) {
90+
commonEventMeter.onErroneousEvent();
91+
}
92+
93+
/**
94+
* Invoked for events that cannot be processed.
95+
*/
96+
void onErroneousEvent(String event, Envelope.Operation operation) {
97+
commonEventMeter.onErroneousEvent(operation);
98+
}
99+
100+
/**
101+
* Invoked for events that represent a connector event.
102+
*/
103+
void onConnectorEvent(ConnectorEvent event) {
104+
}
105+
106+
@Override
107+
public void reset() {
108+
commonEventMeter.reset();
109+
}
110+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,118 @@
1+
package io.debezium.connector.postgresql.metrics;
2+
3+
import io.debezium.connector.base.ChangeEventQueueMetrics;
4+
import io.debezium.connector.common.CdcSourceTaskContext;
5+
import io.debezium.connector.postgresql.PostgresPartition;
6+
import io.debezium.data.Envelope;
7+
import io.debezium.metrics.Metrics;
8+
import io.debezium.pipeline.ConnectorEvent;
9+
import io.debezium.pipeline.metrics.ChangeEventSourceMetrics;
10+
import io.debezium.pipeline.spi.OffsetContext;
11+
import io.debezium.spi.schema.DataCollectionId;
12+
import io.debezium.util.Collect;
13+
import org.apache.kafka.connect.data.Struct;
14+
15+
import java.util.Collection;
16+
import java.util.HashMap;
17+
import java.util.Map;
18+
import java.util.function.Consumer;
19+
import java.util.function.Function;
20+
21+
abstract class AbstractYugabyteDBTaskMetrics<B extends AbstractYugabyteDBPartitionMetrics> extends YugabyteDBMetrics
22+
implements ChangeEventSourceMetrics<PostgresPartition>, YugabyteDBTaskMetricsMXBean {
23+
24+
private final ChangeEventQueueMetrics changeEventQueueMetrics;
25+
private final Map<PostgresPartition, B> beans = new HashMap<>();
26+
27+
AbstractYugabyteDBTaskMetrics(CdcSourceTaskContext taskContext,
28+
String contextName,
29+
ChangeEventQueueMetrics changeEventQueueMetrics,
30+
Collection<PostgresPartition> partitions,
31+
Function<PostgresPartition, B> beanFactory) {
32+
super(taskContext, Collect.linkMapOf(
33+
"server", taskContext.getConnectorName(),
34+
"task", taskContext.getTaskId(),
35+
"context", contextName));
36+
this.changeEventQueueMetrics = changeEventQueueMetrics;
37+
38+
for (PostgresPartition partition : partitions) {
39+
beans.put(partition, beanFactory.apply(partition));
40+
}
41+
}
42+
43+
@Override
44+
public synchronized void register() {
45+
super.register();
46+
beans.values().forEach(YugabyteDBMetrics::register);
47+
}
48+
49+
@Override
50+
public synchronized void unregister() {
51+
beans.values().forEach(YugabyteDBMetrics::unregister);
52+
super.unregister();
53+
}
54+
55+
@Override
56+
public void reset() {
57+
beans.values().forEach(B::reset);
58+
}
59+
60+
@Override
61+
public void onEvent(PostgresPartition partition, DataCollectionId source, OffsetContext offset, Object key,
62+
Struct value, Envelope.Operation operation) {
63+
onPartitionEvent(partition, bean -> bean.onEvent(source, offset, key, value, operation));
64+
}
65+
66+
@Override
67+
public void onFilteredEvent(PostgresPartition partition, String event) {
68+
onPartitionEvent(partition, bean -> bean.onFilteredEvent(event));
69+
}
70+
71+
@Override
72+
public void onFilteredEvent(PostgresPartition partition, String event, Envelope.Operation operation) {
73+
onPartitionEvent(partition, bean -> bean.onFilteredEvent(event, operation));
74+
}
75+
76+
@Override
77+
public void onErroneousEvent(PostgresPartition partition, String event) {
78+
onPartitionEvent(partition, bean -> bean.onErroneousEvent(event));
79+
}
80+
81+
@Override
82+
public void onErroneousEvent(PostgresPartition partition, String event, Envelope.Operation operation) {
83+
onPartitionEvent(partition, bean -> bean.onErroneousEvent(event, operation));
84+
}
85+
86+
@Override
87+
public void onConnectorEvent(PostgresPartition partition, ConnectorEvent event) {
88+
onPartitionEvent(partition, bean -> bean.onConnectorEvent(event));
89+
}
90+
91+
@Override
92+
public int getQueueTotalCapacity() {
93+
return changeEventQueueMetrics.totalCapacity();
94+
}
95+
96+
@Override
97+
public int getQueueRemainingCapacity() {
98+
return changeEventQueueMetrics.remainingCapacity();
99+
}
100+
101+
@Override
102+
public long getMaxQueueSizeInBytes() {
103+
return changeEventQueueMetrics.maxQueueSizeInBytes();
104+
}
105+
106+
@Override
107+
public long getCurrentQueueSizeInBytes() {
108+
return changeEventQueueMetrics.currentQueueSizeInBytes();
109+
}
110+
111+
protected void onPartitionEvent(PostgresPartition partition, Consumer<B> handler) {
112+
B bean = beans.get(partition);
113+
if (bean == null) {
114+
throw new IllegalArgumentException("MBean for partition " + partition + " are not registered");
115+
}
116+
handler.accept(bean);
117+
}
118+
}

0 commit comments

Comments
 (0)