Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 10 additions & 0 deletions instrumentation/kamon-kafka/src/main/resources/reference.conf
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,16 @@ kamon.instrumentation.kafka {
use-delayed-spans = no
}
}

internal {
metrics {
# NOTE(review): this key resolves to `kamon.instrumentation.kafka.internal.metrics.sample-interval.ms`,
# but KafkaInternalInstrumentation reads `kamon.instrumentation.kafka.internal.metrics-sample-interval.ms`
# from the client properties — confirm the intended key and align the two.
sample-interval.ms = 1000

# Sets the default metric name prefix for the exported internal Kafka metrics.
# Details on the available metrics: https://docs.confluent.io/current/kafka/monitoring.html
prefix = "kafka"
}
}
}

kanela.modules {
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,28 @@
package kamon.instrumentation.kafka.internal

import kamon.Kamon
import kamon.tag.TagSet
import org.apache.kafka.common.metrics.KafkaMetric

import scala.collection.JavaConverters._

/**
 * Bridges a single Kafka client metric into a Kamon gauge.
 *
 * @param kafkaMetric the Kafka client metric to export
 * @param prefix      prefix prepended to the generated Kamon metric name
 */
case class KafkaGauge(kafkaMetric: KafkaMetric, prefix: String) {
  // Kafka metric tags (client-id, node-id, ...) carried over as Kamon tags.
  private[internal] val tags = TagSet.from(
    kafkaMetric.metricName.tags.asScala.mapValues(v => v.asInstanceOf[Any]).toMap
  )
  // Gauge is created eagerly so update() only has to push the current value.
  private[internal] val gauge = Kamon.gauge(name(), "").withTags(tags)

  /** Kamon-compatible metric name: `<prefix>_<group>_<name>`, dashes replaced by underscores. */
  def name(): String = {
    val mn = kafkaMetric.metricName
    s"${prefix}_${mn.group}_${mn.name}".replaceAll("-", "_")
  }

  /**
   * Current value of the Kafka metric as a Long.
   *
   * `KafkaMetric.metricValue` is typed as `Object`: measurable metrics report
   * `java.lang.Double`, but gauge-style metrics may report other `Number` subtypes.
   * Accept any `Number` so those are not silently exported as 0; non-numeric
   * values (e.g. String gauges such as version info) still fall back to 0.
   */
  def metricValue: Long = kafkaMetric.metricValue match {
    case n: java.lang.Number => n.longValue()
    case _ => 0L
  }

  /** Pushes the current Kafka metric value into the Kamon gauge. */
  def update(): Unit = gauge.update(metricValue)

  /** Removes the gauge instrument from Kamon (called on metric removal). */
  def remove(): Unit = gauge.remove()
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,76 @@
package kamon.instrumentation.kafka.internal

import java.util
import java.util.concurrent.atomic.AtomicReference
import java.util.concurrent.{ScheduledFuture, TimeUnit}

import kamon.Kamon
import org.apache.kafka.common.metrics.{KafkaMetric, MetricsReporter}

import scala.concurrent.duration._
import scala.util.Try

/**
* Using kafka Client default metric listener to extract data.
* In order to have access to them, first you must register KafkaInternalInstrumentation as a listener
* using the "metric.reporters" property when creating a KafkaProducer or KafkaConsumer
*
* The key name is also available at org.apache.kafka.clients.CommonClientConfigs.METRIC_REPORTER_CLASSES_CONFIG
*
* More details about the exposed metrics: https://docs.confluent.io/current/kafka/monitoring.html
*/
class KafkaInternalInstrumentation extends MetricsReporter {

  import KafkaInternalInstrumentation._

  // Prefix for exported metric names; set from config in configure(), which the
  // Kafka client invokes before init()/metricChange(), so add() sees the final value.
  private[internal] var metricPrefix = ""
  // Currently exported gauges, maintained copy-on-write so doUpdate can iterate safely.
  private[internal] val metrics: AtomicReference[List[KafkaGauge]] = new AtomicReference(List.empty)
  // Handle on the periodic update task so it can be cancelled on close().
  private[internal] val updater: AtomicReference[Option[ScheduledFuture[_]]] = new AtomicReference(None)
  // Pushes the current value of every registered Kafka metric into its Kamon gauge.
  private[internal] val doUpdate: Runnable = () => metrics.get().foreach(_.update())

  /** Registers a Kamon gauge for `metric` unless it is blacklisted (non-numeric build info). */
  def add(metric: KafkaMetric): Unit = {
    val mn = metric.metricName
    if (!metricBlacklist.contains(mn.name)) {
      val bridge = KafkaGauge(metric, metricPrefix)
      metrics.getAndUpdate((l: List[KafkaGauge]) => bridge +: l)
    }
  }

  /**
   * Reads the report interval and metric prefix from the client configuration and
   * schedules the periodic gauge update on Kamon's scheduler.
   */
  override def configure(configs: util.Map[String, _]): Unit = {
    // Set the prefix before anything can call add(), so gauges get the configured name.
    metricPrefix = Option(configs.get(metricPrefixKey)).map(_.toString).getOrElse("kafka")

    val interval = (for {
      configValue <- Option(configs.get(reportIntervalKey))
      parsed <- Try(configValue.toString.toLong).toOption
    } yield parsed).getOrElse(defaultReportInterval)

    // Swap in the new task and cancel any previous one so a repeated
    // configure() call cannot leak a still-running scheduled task.
    val previous = updater.getAndSet(
      Some(Kamon.scheduler().scheduleAtFixedRate(doUpdate, interval, interval, TimeUnit.MILLISECONDS)))
    previous.foreach(_.cancel(true))
  }

  /** Registers all metrics that already exist when the reporter is attached. */
  override def init(metrics: util.List[KafkaMetric]): Unit = metrics.forEach(add)

  /** Kafka reports changes by re-sending the metric: drop the old gauge and re-add. */
  override def metricChange(metric: KafkaMetric): Unit = {
    metricRemoval(metric)
    add(metric)
  }

  /** Unregisters the metric and removes its Kamon gauge instrument. */
  override def metricRemoval(metric: KafkaMetric): Unit = {
    val oldMetrics = metrics.getAndUpdate((l: List[KafkaGauge]) => l.filterNot(_.kafkaMetric == metric))
    oldMetrics.find(_.kafkaMetric == metric).foreach(_.remove())
  }

  /** Stops the periodic update task and clears the handle so close() is idempotent. */
  override def close(): Unit = updater.getAndSet(None).foreach(_.cancel(true))
}

object KafkaInternalInstrumentation {

  // Non-numeric build-info metrics that cannot be exported as gauges.
  private[internal] val metricBlacklist = Set("commit-id", "version")
  // Fallback update interval (milliseconds) when no interval is configured.
  private[internal] val defaultReportInterval = 1.second.toMillis

  // NOTE(review): reference.conf declares `internal.metrics.sample-interval.ms`
  // (dot before `sample-interval`), while this key uses `metrics-sample-interval.ms` —
  // confirm which spelling the client properties are expected to carry and align them.
  private[internal] val reportIntervalKey = "kamon.instrumentation.kafka.internal.metrics-sample-interval.ms"
  private[internal] val metricPrefixKey = "kamon.instrumentation.kafka.internal.metrics.prefix"

  // Fully-qualified class name suitable for Kafka's "metric.reporters" property:
  // the object's runtime class name ends in '$', which the split drops.
  def kafkaReporterValue(): String = {
    KafkaInternalInstrumentation.getClass.getCanonicalName.split("\\$").last
  }
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,41 @@
package kamon.instrumentation.kafka.internal

import kamon.Kamon
import kamon.instrumentation.kafka.testutil.{SpanReportingTestScope, TestSpanReporting, TestTopicScope}
import net.manub.embeddedkafka.{EmbeddedKafka, EmbeddedKafkaConfig}
import org.apache.kafka.clients.CommonClientConfigs
import org.scalatest._

class KafkaInternalInstrumentationSpec extends WordSpec
    with Matchers
    with BeforeAndAfter
    with BeforeAndAfterAll
    with EmbeddedKafka
    with TestTopicScope
    with TestSpanReporting {

  // increase zk connection timeout to avoid failing tests in "slow" environments,
  // and register KafkaInternalInstrumentation as the broker's metric reporter
  implicit val defaultConfig: EmbeddedKafkaConfig = {
    val brokerProperties = EmbeddedKafkaConfig.apply().customBrokerProperties ++ Map(
      "zookeeper.connection.timeout.ms" -> "20000",
      "auto.create.topics.enable" -> "false",
      CommonClientConfigs.METRIC_REPORTER_CLASSES_CONFIG -> KafkaInternalInstrumentation.kafkaReporterValue()
    )
    EmbeddedKafkaConfig.apply(customBrokerProperties = brokerProperties)
  }

  override protected def beforeAll(): Unit = EmbeddedKafka.start()(defaultConfig)
  override def afterAll(): Unit = EmbeddedKafka.stop()

  "The Kafka Internal Instrumentation" should {
    "have the reporter class value" in {
      KafkaInternalInstrumentation.kafkaReporterValue() shouldBe "kamon.instrumentation.kafka.internal.KafkaInternalInstrumentation"
    }

    "export base metric" in new SpanReportingTestScope(reporter) with TestTopicScope {
      publishStringMessageToKafka(testTopicName, "message")(defaultConfig)

      val registered = Kamon.status().metricRegistry().metrics
      registered.size should be > 0
      registered.exists(_.name.contains("kafka")) shouldBe true
      registered.exists(_.name == "kafka_controller_channel_metrics_connection_count") shouldBe true
    }
  }
}