From 25c04354f18dc52ddbb163d5572d2b2e2294cc46 Mon Sep 17 00:00:00 2001
From: Daniel Leon
Date: Tue, 28 Jul 2020 01:26:50 +0300
Subject: [PATCH] Added KafkaMetrics internal support with MetricsReporter

---
 .../src/main/resources/reference.conf         | 10 +++
 .../kafka/internal/KafkaGauge.scala           | 28 +++++++
 .../KafkaInternalInstrumentation.scala        | 76 +++++++++++++++++++
 .../KafkaInternalInstrumentationSpec.scala    | 41 ++++++++++
 4 files changed, 155 insertions(+)
 create mode 100644 instrumentation/kamon-kafka/src/main/scala/kamon/instrumentation/kafka/internal/KafkaGauge.scala
 create mode 100644 instrumentation/kamon-kafka/src/main/scala/kamon/instrumentation/kafka/internal/KafkaInternalInstrumentation.scala
 create mode 100644 instrumentation/kamon-kafka/src/test/scala/kamon/instrumentation/kafka/internal/KafkaInternalInstrumentationSpec.scala

diff --git a/instrumentation/kamon-kafka/src/main/resources/reference.conf b/instrumentation/kamon-kafka/src/main/resources/reference.conf
index d0e6a2b49..76dc79c6b 100644
--- a/instrumentation/kamon-kafka/src/main/resources/reference.conf
+++ b/instrumentation/kamon-kafka/src/main/resources/reference.conf
@@ -17,6 +17,16 @@ kamon.instrumentation.kafka {
       use-delayed-spans = no
     }
   }
+
+  internal {
+    metrics {
+      sample-interval.ms = 1000
+
+      # Sets the default metric prefix for the exported internal kafka metrics
+      # More details about the available metrics are available at https://docs.confluent.io/current/kafka/monitoring.html
+      prefix = "kafka"
+    }
+  }
 }
 
 kanela.modules {
diff --git a/instrumentation/kamon-kafka/src/main/scala/kamon/instrumentation/kafka/internal/KafkaGauge.scala b/instrumentation/kamon-kafka/src/main/scala/kamon/instrumentation/kafka/internal/KafkaGauge.scala
new file mode 100644
index 000000000..d0af6ba02
--- /dev/null
+++ b/instrumentation/kamon-kafka/src/main/scala/kamon/instrumentation/kafka/internal/KafkaGauge.scala
@@ -0,0 +1,28 @@
+package kamon.instrumentation.kafka.internal
+
+import kamon.Kamon
+import kamon.tag.TagSet
+import org.apache.kafka.common.metrics.KafkaMetric
+
+import scala.collection.JavaConverters._
+
+case class KafkaGauge(kafkaMetric: KafkaMetric, prefix: String) {
+  private[internal] val tags = TagSet.from(
+    kafkaMetric.metricName.tags.asScala.mapValues(v => v.asInstanceOf[Any]).toMap
+  )
+  private[internal] val gauge = Kamon.gauge(name(), "").withTags(tags)
+
+  def name(): String = {
+    val mn = kafkaMetric.metricName
+    s"${prefix}_${mn.group}_${mn.name}".replaceAll("-", "_")
+  }
+
+  def metricValue: Long = kafkaMetric.metricValue match {
+    case d: java.lang.Double => d.toLong
+    case _ => 0L
+  }
+
+  def update(): Unit = gauge.update(metricValue)
+
+  def remove(): Unit = gauge.remove()
+}
diff --git a/instrumentation/kamon-kafka/src/main/scala/kamon/instrumentation/kafka/internal/KafkaInternalInstrumentation.scala b/instrumentation/kamon-kafka/src/main/scala/kamon/instrumentation/kafka/internal/KafkaInternalInstrumentation.scala
new file mode 100644
index 000000000..60a5c8727
--- /dev/null
+++ b/instrumentation/kamon-kafka/src/main/scala/kamon/instrumentation/kafka/internal/KafkaInternalInstrumentation.scala
@@ -0,0 +1,76 @@
+package kamon.instrumentation.kafka.internal
+
+import java.util
+import java.util.concurrent.atomic.AtomicReference
+import java.util.concurrent.{ScheduledFuture, TimeUnit}
+
+import kamon.Kamon
+import org.apache.kafka.common.metrics.{KafkaMetric, MetricsReporter}
+
+import scala.concurrent.duration._
+import scala.util.Try
+
+/**
+ * Using kafka Client default metric listener to extract data.
+ * In order to have access to them, first you must register KafkaInternalInstrumentation as a listener
+ * using the "metric.reporters" property when creating a KafkaProducer or KafkaConsumer
+ *
+ * The key name is also available at org.apache.kafka.clients.CommonClientConfigs.METRIC_REPORTER_CLASSES_CONFIG
+ *
+ * More details about the exposed metrics: https://docs.confluent.io/current/kafka/monitoring.html
+ */
+class KafkaInternalInstrumentation extends MetricsReporter {
+
+  import KafkaInternalInstrumentation._
+
+  private[internal] var metricPrefix = ""
+  private[internal] val metrics: AtomicReference[List[KafkaGauge]] = new AtomicReference(List.empty)
+  private[internal] val updater: AtomicReference[Option[ScheduledFuture[_]]] = new AtomicReference(None)
+  private[internal] val doUpdate: Runnable = () => metrics.get().foreach(_.update())
+
+  def add(metric: KafkaMetric): Unit = {
+    val mn = metric.metricName
+    if (!metricBlacklist.contains(mn.name)) {
+      val bridge = KafkaGauge(metric, metricPrefix)
+      metrics.getAndUpdate((l: List[KafkaGauge]) => bridge +: l)
+    }
+  }
+
+  override def configure(configs: util.Map[String, _]): Unit = {
+    val configValue = for {
+      configValue <- Option(configs.get(reportIntervalKey))
+      interval <- Try(configValue.toString.toLong).toOption
+    } yield interval
+    val interval = configValue.getOrElse(defaultReportInterval)
+
+    updater.set(Some(Kamon.scheduler().scheduleAtFixedRate(doUpdate, interval, interval, TimeUnit.MILLISECONDS)))
+    metricPrefix = Option(configs.get(metricPrefixKey)).map(_.toString).getOrElse("kafka")
+  }
+
+  override def init(metrics: util.List[KafkaMetric]): Unit = metrics.forEach(add)
+
+  override def metricChange(metric: KafkaMetric): Unit = {
+    metricRemoval(metric)
+    add(metric)
+  }
+
+  override def metricRemoval(metric: KafkaMetric): Unit = {
+    val oldMetrics = metrics.getAndUpdate((l: List[KafkaGauge]) => l.filterNot(_.kafkaMetric == metric))
+    oldMetrics.find(_.kafkaMetric == metric).foreach(_.remove())
+  }
+
+  override def close(): Unit = updater.get().foreach(_.cancel(true))
+}
+
+object KafkaInternalInstrumentation {
+
+  private[internal] val metricBlacklist = Set("commit-id", "version")
+  private[internal] val defaultReportInterval = 1.second.toMillis
+
+  private[internal] val reportIntervalKey = "kamon.instrumentation.kafka.internal.metrics.sample-interval.ms"
+  private[internal] val metricPrefixKey = "kamon.instrumentation.kafka.internal.metrics.prefix"
+
+  def kafkaReporterValue(): String = {
+    KafkaInternalInstrumentation.getClass.getCanonicalName.split("\\$").last
+  }
+}
diff --git a/instrumentation/kamon-kafka/src/test/scala/kamon/instrumentation/kafka/internal/KafkaInternalInstrumentationSpec.scala b/instrumentation/kamon-kafka/src/test/scala/kamon/instrumentation/kafka/internal/KafkaInternalInstrumentationSpec.scala
new file mode 100644
index 000000000..7bef33d93
--- /dev/null
+++ b/instrumentation/kamon-kafka/src/test/scala/kamon/instrumentation/kafka/internal/KafkaInternalInstrumentationSpec.scala
@@ -0,0 +1,41 @@
+package kamon.instrumentation.kafka.internal
+
+import kamon.Kamon
+import kamon.instrumentation.kafka.testutil.{SpanReportingTestScope, TestSpanReporting, TestTopicScope}
+import net.manub.embeddedkafka.{EmbeddedKafka, EmbeddedKafkaConfig}
+import org.apache.kafka.clients.CommonClientConfigs
+import org.scalatest._
+
+class KafkaInternalInstrumentationSpec extends WordSpec
+  with Matchers
+  with BeforeAndAfter
+  with BeforeAndAfterAll
+  with EmbeddedKafka
+  with TestTopicScope
+  with TestSpanReporting {
+
+  // increase zk connection timeout to avoid failing tests in "slow" environments
+  implicit val defaultConfig: EmbeddedKafkaConfig =
+    EmbeddedKafkaConfig.apply(customBrokerProperties = EmbeddedKafkaConfig.apply().customBrokerProperties
+      + ("zookeeper.connection.timeout.ms" -> "20000")
+      + ("auto.create.topics.enable" -> "false")
+      + (CommonClientConfigs.METRIC_REPORTER_CLASSES_CONFIG -> KafkaInternalInstrumentation.kafkaReporterValue())
+    )
+
+  override protected def beforeAll(): Unit = EmbeddedKafka.start()(defaultConfig)
+  override def afterAll(): Unit = EmbeddedKafka.stop()
+
+  "The Kafka Internal Instrumentation" should {
+    "have the reporter class value" in {
+      KafkaInternalInstrumentation.kafkaReporterValue() should be("kamon.instrumentation.kafka.internal.KafkaInternalInstrumentation")
+    }
+
+    "export base metric" in new SpanReportingTestScope(reporter) with TestTopicScope {
+      publishStringMessageToKafka(testTopicName, "message")(defaultConfig)
+
+      Kamon.status().metricRegistry().metrics.size should be > 0
+      Kamon.status().metricRegistry().metrics.exists(metric => metric.name.contains("kafka")) should be (true)
+      Kamon.status().metricRegistry().metrics.exists(metric => metric.name.equals("kafka_controller_channel_metrics_connection_count")) should be (true)
+    }
+  }
+}