Skip to content

Commit

Permalink
added custom metrics implementation
Browse files Browse the repository at this point in the history
  • Loading branch information
vaibhav-yb committed Oct 28, 2024
1 parent 4e55ebd commit 825a5b6
Show file tree
Hide file tree
Showing 20 changed files with 739 additions and 22 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -989,12 +989,12 @@ public static AutoCreateMode parse(String value, String defaultValue) {
public static final Field SOURCE_INFO_STRUCT_MAKER = CommonConnectorConfig.SOURCE_INFO_STRUCT_MAKER
.withDefault(PostgresSourceInfoStructMaker.class.getName());

public static final Field TASK_ID = Field.create("task.id")
.withDisplayName("ID of the connector task")
.withType(Type.INT)
.withDefault(0)
.withImportance(Importance.LOW)
.withDescription("Internal use only");
// public static final Field TASK_ID = Field.create("task.id")
// .withDisplayName("ID of the connector task")
// .withType(Type.INT)
// .withDefault(0)
// .withImportance(Importance.LOW)
// .withDescription("Internal use only");

public static final Field PRIMARY_KEY_HASH_COLUMNS = Field.create("primary.key.hash.columns")
.withDisplayName("Comma separated primary key fields")
Expand Down Expand Up @@ -1135,9 +1135,9 @@ public boolean isFlushLsnOnSource() {
return flushLsnOnSource;
}

public int taskId() {
return getConfig().getInteger(TASK_ID);
}
// public int taskId() {
// return getConfig().getInteger(TASK_ID);
// }

public String primaryKeyHashColumns() {
return getConfig().getString(PRIMARY_KEY_HASH_COLUMNS);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
import java.util.Map;
import java.util.stream.Collectors;

import io.debezium.connector.postgresql.metrics.YugabyteDBMetricsFactory;
import org.apache.kafka.connect.errors.ConnectException;
import org.apache.kafka.connect.errors.RetriableException;
import org.apache.kafka.connect.source.SourceRecord;
Expand Down Expand Up @@ -109,9 +110,10 @@ public ChangeEventSourceCoordinator<PostgresPartition, PostgresOffsetContext> st
final PostgresValueConverter valueConverter = valueConverterBuilder.build(typeRegistry);

schema = new PostgresSchema(connectorConfig, defaultValueConverter, topicNamingStrategy, valueConverter);
this.taskContext = new PostgresTaskContext(connectorConfig, schema, topicNamingStrategy, connectorConfig.taskId());
this.taskContext = new PostgresTaskContext(connectorConfig, schema, topicNamingStrategy, connectorConfig.getTaskId());
final PostgresPartition.Provider partitionProvider = new PostgresPartition.Provider(connectorConfig, config);
final Offsets<PostgresPartition, PostgresOffsetContext> previousOffsets = getPreviousOffsets(
new PostgresPartition.Provider(connectorConfig, config), new PostgresOffsetContext.Loader(connectorConfig));
partitionProvider, new PostgresOffsetContext.Loader(connectorConfig));
final Clock clock = Clock.system();
final PostgresOffsetContext previousOffset = previousOffsets.getTheOnlyOffset();

Expand Down Expand Up @@ -249,7 +251,7 @@ public ChangeEventSourceCoordinator<PostgresPartition, PostgresOffsetContext> st
replicationConnection,
slotCreatedInfo,
slotInfo),
new DefaultChangeEventSourceMetricsFactory<>(),
new YugabyteDBMetricsFactory(partitionProvider.getPartitions()),
dispatcher,
schema,
snapshotter,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,9 +21,9 @@ public class PostgresPartition extends AbstractPartition implements Partition {
private static final String SERVER_PARTITION_KEY = "server";

private final String serverName;
private final int taskId;
private final String taskId;

public PostgresPartition(String serverName, String databaseName, int taskId) {
public PostgresPartition(String serverName, String databaseName, String taskId) {
super(databaseName);
this.serverName = serverName;
this.taskId = taskId;
Expand Down Expand Up @@ -57,7 +57,7 @@ public String toString() {
}

public String getPartitionIdentificationKey() {
return String.format("%s_%d", serverName, taskId);
return String.format("%s_%s", serverName, taskId);
}

static class Provider implements Partition.Provider<PostgresPartition> {
Expand All @@ -73,7 +73,7 @@ static class Provider implements Partition.Provider<PostgresPartition> {
public Set<PostgresPartition> getPartitions() {
return Collections.singleton(new PostgresPartition(
connectorConfig.getLogicalName(), taskConfig.getString(DATABASE_NAME.name()),
connectorConfig.taskId()));
connectorConfig.getTaskId()));
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -52,8 +52,8 @@ protected PostgresTaskContext(PostgresConnectorConfig config, PostgresSchema sch
this.schema = schema;
}

protected PostgresTaskContext(PostgresConnectorConfig config, PostgresSchema schema, TopicNamingStrategy<TableId> topicNamingStrategy, int taskId) {
super(config.getContextName(), config.getLogicalName(), String.valueOf(taskId), config.getCustomMetricTags(), Collections::emptySet);
protected PostgresTaskContext(PostgresConnectorConfig config, PostgresSchema schema, TopicNamingStrategy<TableId> topicNamingStrategy, String taskId) {
super(config.getContextName(), config.getLogicalName(), taskId, config.getCustomMetricTags(), Collections::emptySet);

this.config = config;
if (config.xminFetchInterval().toMillis() > 0) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -112,7 +112,7 @@ protected List<Map<String, String>> getConfigForParallelSnapshotConsumption(int
for (int i = 0; i < maxTasks; ++i) {
Map<String, String> taskProps = new HashMap<>(this.props);

taskProps.put(PostgresConnectorConfig.TASK_ID.name(), String.valueOf(i));
taskProps.put(PostgresConnectorConfig.TASK_ID, String.valueOf(i));

long lowerBound = i * rangeSize;
long upperBound = (i == maxTasks - 1) ? upperBoundExclusive - 1 : (lowerBound + rangeSize - 1);
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,110 @@
package io.debezium.connector.postgresql.metrics;

import io.debezium.connector.common.CdcSourceTaskContext;
import io.debezium.connector.postgresql.PostgresTaskContext;
import io.debezium.data.Envelope;
import io.debezium.metrics.Metrics;
import io.debezium.pipeline.ConnectorEvent;
import io.debezium.pipeline.meters.CommonEventMeter;
import io.debezium.pipeline.source.spi.EventMetadataProvider;
import io.debezium.pipeline.spi.OffsetContext;
import io.debezium.spi.schema.DataCollectionId;
import org.apache.kafka.connect.data.Struct;

import java.util.Map;

abstract class AbstractYugabyteDBPartitionMetrics extends YugabyteDBMetrics implements YugabyteDBPartitionMetricsMXBean {
private final CommonEventMeter commonEventMeter;

AbstractYugabyteDBPartitionMetrics(CdcSourceTaskContext taskContext, Map<String, String> tags,
EventMetadataProvider metadataProvider) {
super(taskContext, tags);
this.commonEventMeter = new CommonEventMeter(taskContext.getClock(), metadataProvider);
}

@Override
public String getLastEvent() {
return commonEventMeter.getLastEvent();
}

@Override
public long getMilliSecondsSinceLastEvent() {
return commonEventMeter.getMilliSecondsSinceLastEvent();
}

@Override
public long getTotalNumberOfEventsSeen() {
return commonEventMeter.getTotalNumberOfEventsSeen();
}

@Override
public long getTotalNumberOfCreateEventsSeen() {
return commonEventMeter.getTotalNumberOfCreateEventsSeen();
}

@Override
public long getTotalNumberOfUpdateEventsSeen() {
return commonEventMeter.getTotalNumberOfUpdateEventsSeen();
}

@Override
public long getTotalNumberOfDeleteEventsSeen() {
return commonEventMeter.getTotalNumberOfDeleteEventsSeen();
}

@Override
public long getNumberOfEventsFiltered() {
return commonEventMeter.getNumberOfEventsFiltered();
}

@Override
public long getNumberOfErroneousEvents() {
return commonEventMeter.getNumberOfErroneousEvents();
}

/**
* Invoked if an event is processed for a captured table.
*/
void onEvent(DataCollectionId source, OffsetContext offset, Object key, Struct value, Envelope.Operation operation) {
commonEventMeter.onEvent(source, offset, key, value, operation);
}

/**
* Invoked for events pertaining to non-captured tables.
*/
void onFilteredEvent(String event) {
commonEventMeter.onFilteredEvent();
}

/**
* Invoked for events pertaining to non-captured tables.
*/
void onFilteredEvent(String event, Envelope.Operation operation) {
commonEventMeter.onFilteredEvent(operation);
}

/**
* Invoked for events that cannot be processed.
*/
void onErroneousEvent(String event) {
commonEventMeter.onErroneousEvent();
}

/**
* Invoked for events that cannot be processed.
*/
void onErroneousEvent(String event, Envelope.Operation operation) {
commonEventMeter.onErroneousEvent(operation);
}

/**
* Invoked for events that represent a connector event.
*/
void onConnectorEvent(ConnectorEvent event) {
}

@Override
public void reset() {
commonEventMeter.reset();
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,118 @@
package io.debezium.connector.postgresql.metrics;

import io.debezium.connector.base.ChangeEventQueueMetrics;
import io.debezium.connector.common.CdcSourceTaskContext;
import io.debezium.connector.postgresql.PostgresPartition;
import io.debezium.data.Envelope;
import io.debezium.metrics.Metrics;
import io.debezium.pipeline.ConnectorEvent;
import io.debezium.pipeline.metrics.ChangeEventSourceMetrics;
import io.debezium.pipeline.spi.OffsetContext;
import io.debezium.spi.schema.DataCollectionId;
import io.debezium.util.Collect;
import org.apache.kafka.connect.data.Struct;

import java.util.Collection;
import java.util.HashMap;
import java.util.Map;
import java.util.function.Consumer;
import java.util.function.Function;

abstract class AbstractYugabyteDBTaskMetrics<B extends AbstractYugabyteDBPartitionMetrics> extends YugabyteDBMetrics
implements ChangeEventSourceMetrics<PostgresPartition>, YugabyteDBTaskMetricsMXBean {

private final ChangeEventQueueMetrics changeEventQueueMetrics;
private final Map<PostgresPartition, B> beans = new HashMap<>();

AbstractYugabyteDBTaskMetrics(CdcSourceTaskContext taskContext,
String contextName,
ChangeEventQueueMetrics changeEventQueueMetrics,
Collection<PostgresPartition> partitions,
Function<PostgresPartition, B> beanFactory) {
super(taskContext, Collect.linkMapOf(
"server", taskContext.getConnectorName(),
"task", taskContext.getTaskId(),
"context", contextName));
this.changeEventQueueMetrics = changeEventQueueMetrics;

for (PostgresPartition partition : partitions) {
beans.put(partition, beanFactory.apply(partition));
}
}

@Override
public synchronized void register() {
super.register();
beans.values().forEach(YugabyteDBMetrics::register);
}

@Override
public synchronized void unregister() {
beans.values().forEach(YugabyteDBMetrics::unregister);
super.unregister();
}

@Override
public void reset() {
beans.values().forEach(B::reset);
}

@Override
public void onEvent(PostgresPartition partition, DataCollectionId source, OffsetContext offset, Object key,
Struct value, Envelope.Operation operation) {
onPartitionEvent(partition, bean -> bean.onEvent(source, offset, key, value, operation));
}

@Override
public void onFilteredEvent(PostgresPartition partition, String event) {
onPartitionEvent(partition, bean -> bean.onFilteredEvent(event));
}

@Override
public void onFilteredEvent(PostgresPartition partition, String event, Envelope.Operation operation) {
onPartitionEvent(partition, bean -> bean.onFilteredEvent(event, operation));
}

@Override
public void onErroneousEvent(PostgresPartition partition, String event) {
onPartitionEvent(partition, bean -> bean.onErroneousEvent(event));
}

@Override
public void onErroneousEvent(PostgresPartition partition, String event, Envelope.Operation operation) {
onPartitionEvent(partition, bean -> bean.onErroneousEvent(event, operation));
}

@Override
public void onConnectorEvent(PostgresPartition partition, ConnectorEvent event) {
onPartitionEvent(partition, bean -> bean.onConnectorEvent(event));
}

@Override
public int getQueueTotalCapacity() {
return changeEventQueueMetrics.totalCapacity();
}

@Override
public int getQueueRemainingCapacity() {
return changeEventQueueMetrics.remainingCapacity();
}

@Override
public long getMaxQueueSizeInBytes() {
return changeEventQueueMetrics.maxQueueSizeInBytes();
}

@Override
public long getCurrentQueueSizeInBytes() {
return changeEventQueueMetrics.currentQueueSizeInBytes();
}

protected void onPartitionEvent(PostgresPartition partition, Consumer<B> handler) {
B bean = beans.get(partition);
if (bean == null) {
throw new IllegalArgumentException("MBean for partition " + partition + " are not registered");
}
handler.accept(bean);
}
}
Loading

0 comments on commit 825a5b6

Please sign in to comment.