Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 7 additions & 0 deletions .gitignore
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
.idea
target
.project
.settings
.classpath
.java-version
*.iml
1 change: 1 addition & 0 deletions README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
# Apache Kafka Client Metrics Reporter Plugin
103 changes: 103 additions & 0 deletions pom.xml
Original file line number Diff line number Diff line change
@@ -0,0 +1,103 @@
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>

<groupId>com.kafka.instaclustr</groupId>
<artifactId>apache-kafka-client-metrics-reporter-plugin</artifactId>
<version>1.0</version>
<packaging>jar</packaging>

<properties>
<maven.compiler.source>11</maven.compiler.source>
<maven.compiler.target>11</maven.compiler.target>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
</properties>

<dependencies>

<dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-api</artifactId>
<version>2.0.13</version>
</dependency>

<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-clients</artifactId>
<version>3.8.0</version>
</dependency>

<dependency>
<groupId>org.yaml</groupId>
<artifactId>snakeyaml</artifactId>
<version>2.5</version>
</dependency>

<dependency>
<groupId>io.opentelemetry.proto</groupId>
<artifactId>opentelemetry-proto</artifactId>
<version>1.8.0-alpha</version>
</dependency>

<dependency>
<groupId>io.grpc</groupId>
<artifactId>grpc-netty-shaded</artifactId>
<version>1.75.0</version>
</dependency>
<dependency>
<groupId>io.grpc</groupId>
<artifactId>grpc-protobuf</artifactId>
<version>1.75.0</version>
</dependency>
<dependency>
<groupId>io.grpc</groupId>
<artifactId>grpc-stub</artifactId>
<version>1.75.0</version>
</dependency>

<dependency>
<groupId>com.google.protobuf</groupId>
<artifactId>protobuf-java</artifactId>
<version>4.32.0</version>
</dependency>

</dependencies>

<build>
<resources>
<resource>
<directory>src/main/resources</directory>
</resource>
</resources>
<plugins>
<plugin>
<artifactId>maven-assembly-plugin</artifactId>
<version>3.7.1</version>
<configuration>
<descriptorRefs>
<descriptorRef>jar-with-dependencies</descriptorRef>
</descriptorRefs>
<archive>
<manifest>
<mainClass>com.instaclustr.kafka.KafkaClientMetricsReporter</mainClass>
<addDefaultImplementationEntries>true</addDefaultImplementationEntries>
<addDefaultSpecificationEntries>true</addDefaultSpecificationEntries>
</manifest>
</archive>
</configuration>
<executions>
<execution>
<id>make-assembly</id>
<phase>package</phase>
<goals>
<goal>single</goal>
</goals>
</execution>
</executions>
</plugin>
</plugins>
</build>

</project>
Original file line number Diff line number Diff line change
@@ -0,0 +1,46 @@
package com.instaclustr.kafka;

import java.util.List;
import java.util.Map;
import org.apache.kafka.common.metrics.KafkaMetric;
import org.apache.kafka.common.metrics.MetricsReporter;
import org.apache.kafka.server.telemetry.ClientTelemetry;
import org.apache.kafka.server.telemetry.ClientTelemetryReceiver;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class KafkaClientMetricsReporter implements MetricsReporter, ClientTelemetry {

private static final Logger logger = LoggerFactory.getLogger(KafkaClientMetricsReporter.class);


@Override
public void init(List<KafkaMetric> metrics) {
logger.debug("Initializing the client metric reporter: {}", metrics);
}

@Override
public void configure(final Map<String, ?> configs) {
logger.debug("Configuration of the reporter");
}

@Override
public void metricChange(final KafkaMetric metric) {
logger.debug("Changing the metric {}", metric.metricName());
}

@Override
public void metricRemoval(final KafkaMetric metric) {
logger.debug("Removing the metric {}", metric.metricName());
}

@Override
public void close() {
logger.debug("Closing the reporter");
}

@Override
public ClientTelemetryReceiver clientReceiver() {
return new KafkaClientMetricsReporterReceiver();
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,28 @@
package com.instaclustr.kafka;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.yaml.snakeyaml.Yaml;

import java.io.FileInputStream;
import java.util.Map;

public class KafkaClientMetricsReporterConfig {

public Map<String, Object> configurations;
private static final Logger logger = LoggerFactory.getLogger(KafkaClientMetricsReporterConfig.class);

public KafkaClientMetricsReporterConfig() {
try {
Yaml yaml = new Yaml();
final String kafkaClientMetricsConfigFilePath = System.getenv("KAFKA_CLIENT_METRICS_CONFIG_PATH");
logger.debug("Loading telemetry config from: {}", kafkaClientMetricsConfigFilePath);
final FileInputStream fis = new FileInputStream(kafkaClientMetricsConfigFilePath);
this.configurations = yaml.load(fis);

} catch (final Exception ex) {
logger.debug("Failed to load telemetry config", ex);
throw new RuntimeException("Failed to load telemetry config", ex);
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,28 @@
package com.instaclustr.kafka;

import com.instaclustr.kafka.exporters.MetricsExporter;
import com.instaclustr.kafka.exporters.MetricsExporterFactory;
import org.apache.kafka.server.authorizer.AuthorizableRequestContext;
import org.apache.kafka.server.telemetry.ClientTelemetryPayload;
import org.apache.kafka.server.telemetry.ClientTelemetryReceiver;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class KafkaClientMetricsReporterReceiver implements ClientTelemetryReceiver {

private final MetricsExporter metricsExporter;
private static final Logger logger = LoggerFactory.getLogger(KafkaClientMetricsReporterReceiver.class);

public KafkaClientMetricsReporterReceiver() {
logger.info("Initializing the Kafka Client Metrics Reporter Receiver");
final KafkaClientMetricsReporterConfig kafkaClientMetricsReporterConfig = new KafkaClientMetricsReporterConfig();
this.metricsExporter = MetricsExporterFactory.create(kafkaClientMetricsReporterConfig.configurations);
logger.info("Initialized the Kafka Client Metrics Reporter Receiver");
}

@Override
public void exportMetrics(AuthorizableRequestContext requestContext, ClientTelemetryPayload telemetryPayload) {
metricsExporter.export(telemetryPayload);
}
}

Original file line number Diff line number Diff line change
@@ -0,0 +1,62 @@
package com.instaclustr.kafka.exporters;

import io.grpc.ManagedChannel;
import io.grpc.ManagedChannelBuilder;
import io.opentelemetry.proto.collector.metrics.v1.ExportMetricsServiceRequest;
import io.opentelemetry.proto.collector.metrics.v1.MetricsServiceGrpc;
import org.apache.kafka.server.telemetry.ClientTelemetryPayload;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.nio.ByteBuffer;
import java.util.concurrent.TimeUnit;

public class GrpcMetricsExporter implements MetricsExporter {
private final String endpoint;
private final int timeout;
private final ManagedChannel channel;
private final MetricsServiceGrpc.MetricsServiceBlockingStub stub;
private final Logger logger = LoggerFactory.getLogger(GrpcMetricsExporter.class);

public GrpcMetricsExporter(String endpoint, int timeout) {
this.endpoint = endpoint;
this.timeout = timeout;
this.channel = ManagedChannelBuilder
.forTarget(endpoint)
.usePlaintext()
.build();
this.stub = MetricsServiceGrpc
.newBlockingStub(channel)
.withDeadlineAfter(timeout, TimeUnit.MILLISECONDS);
}

@Override
public void export(ClientTelemetryPayload payload) {
try {
ByteBuffer byteBuffer = payload.data();
byte[] bytes;

if (byteBuffer.hasArray()) {
bytes = byteBuffer.array();
} else {
bytes = new byte[byteBuffer.remaining()];
byteBuffer.get(bytes);
}

ExportMetricsServiceRequest req = ExportMetricsServiceRequest.parseFrom(bytes);

stub.export(req);
logger.info("Successfully exported {} ResourceMetrics to {}",
req.getResourceMetricsCount(), endpoint);
} catch (Exception e) {
logger.error("Failed to export OTLP metrics to {}: {}", endpoint, e.getMessage(), e);
}
}

/**
* Call this on shutdown to cleanly close the gRPC channel.
*/
public void shutdown() throws InterruptedException {
channel.shutdown().awaitTermination(this.timeout, TimeUnit.MILLISECONDS);
}
}
Loading