Skip to content

Commit

Permalink
[CELEBORN-1743] Resolve the metrics data interruption and the job fai…
Browse files Browse the repository at this point in the history
…lure caused by locked resources

### What changes were proposed in this pull request?
Remove the ConcurrentLinkedQueue and the lock in AbstractSource, which might cause metrics data interruption and job failure.

### Why are the changes needed?
Current problems: [JIRA CELEBORN-1743](https://issues.apache.org/jira/browse/CELEBORN-1743)
The lock in [[CELEBORN-1453]](#2548) might block the thread.

### Does this PR introduce _any_ user-facing change?
No

### How was this patch tested?
Manual test.
The result is the same as with CELEBORN-1453.
![image](https://github.com/user-attachments/assets/3e3a4c53-1cf6-48f6-8c37-67d875d675af)

Closes #2956 from zaynt4606/clb1743.

Authored-by: zhengtao <[email protected]>
Signed-off-by: Shuang <[email protected]>
  • Loading branch information
zaynt4606 authored and RexXiong committed Dec 4, 2024
1 parent 782393a commit cc04d13
Showing 1 changed file with 85 additions and 49 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ import java.util.concurrent.{ConcurrentHashMap, ConcurrentLinkedQueue, Scheduled

import scala.collection.JavaConverters._
import scala.collection.mutable
import scala.collection.mutable.ArrayBuffer
import scala.util.Random

import com.codahale.metrics._
Expand Down Expand Up @@ -59,8 +60,6 @@ abstract class AbstractSource(conf: CelebornConf, role: String)

val metricsCapacity: Int = conf.metricsCapacity

val innerMetrics: ConcurrentLinkedQueue[String] = new ConcurrentLinkedQueue[String]()

val timerSupplier = new TimerSupplier(metricsSlidingWindowSize)

val metricsCleaner: ScheduledExecutorService =
Expand All @@ -79,12 +78,26 @@ abstract class AbstractSource(conf: CelebornConf, role: String)

val applicationLabel = "applicationId"

// Queue of already-rendered timer snapshot strings. Entries are appended by
// addTimerMetrics and drained by getAndClearTimerMetrics.
val timerMetrics: ConcurrentLinkedQueue[String] = new ConcurrentLinkedQueue[String]()

// Registered gauges. gaugeExists looks entries up by the key produced by
// metricNameWithCustomizedLabels(name, labels).
protected val namedGauges: ConcurrentHashMap[String, NamedGauge[_]] =
JavaUtils.newConcurrentHashMap[String, NamedGauge[_]]()

// Registered timers, each paired with a String -> Long map.
// NOTE(review): the inner map presumably holds per-key start timestamps in
// nanoseconds -- confirm against startTimer/stopTimer.
protected val namedTimers
: ConcurrentHashMap[String, (NamedTimer, ConcurrentHashMap[String, Long])] =
JavaUtils.newConcurrentHashMap[String, (NamedTimer, ConcurrentHashMap[String, Long])]()

// Registered counters.
protected val namedCounters: ConcurrentHashMap[String, NamedCounter] =
JavaUtils.newConcurrentHashMap[String, NamedCounter]()

// Registered meters.
protected val namedMeters: ConcurrentHashMap[String, NamedMeter] =
JavaUtils.newConcurrentHashMap[String, NamedMeter]()

/**
 * Renders the current snapshot of `namedTimer` and queues it in `timerMetrics`
 * until the next `getAndClearTimerMetrics()` call drains it.
 *
 * The queue is kept bounded by `metricsCapacity`: once it grows past that size
 * the oldest snapshots are discarded. Without the bound the queue grows without
 * limit whenever nobody drains it (for example when the metrics endpoint is
 * never scraped). The trimming is lock-free and best effort; concurrent callers
 * may occasionally drop one entry more than strictly necessary.
 *
 * @param namedTimer the timer whose snapshot should be queued
 */
def addTimerMetrics(namedTimer: NamedTimer): Unit = {
  val timerMetricsString = getTimerMetrics(namedTimer)
  timerMetrics.add(timerMetricsString)
  // poll() returns null on an empty queue, which also terminates the loop when
  // metricsCapacity is zero or negative.
  while (timerMetrics.size() > metricsCapacity && timerMetrics.poll() != null) {}
}

def addGauge[T](
name: String,
labels: Map[String, String],
Expand Down Expand Up @@ -145,10 +158,6 @@ abstract class AbstractSource(conf: CelebornConf, role: String)
addMeter(name, Map.empty[String, String], meter)
}

protected val namedTimers
: ConcurrentHashMap[String, (NamedTimer, ConcurrentHashMap[String, Long])] =
JavaUtils.newConcurrentHashMap[String, (NamedTimer, ConcurrentHashMap[String, Long])]()

/** Registers a timer called `name` with an empty label map. */
def addTimer(name: String): Unit = {
  addTimer(name, Map.empty[String, String])
}

def addTimer(name: String, labels: Map[String, String]): Unit = {
Expand All @@ -165,9 +174,6 @@ abstract class AbstractSource(conf: CelebornConf, role: String)
})
}

protected val namedCounters: ConcurrentHashMap[String, NamedCounter] =
JavaUtils.newConcurrentHashMap[String, NamedCounter]()

/** Registers a counter called `name` with an empty label map. */
def addCounter(name: String): Unit = {
  addCounter(name, Map.empty[String, String])
}

def addCounter(name: String, labels: Map[String, String]): Unit = {
Expand Down Expand Up @@ -197,6 +203,18 @@ abstract class AbstractSource(conf: CelebornConf, role: String)
namedTimers.values().asScala.toList.map(_._1)
}

/**
 * Removes and returns the timer snapshot strings currently queued in
 * `timerMetrics`.
 *
 * At most the number of entries present when the call starts is drained, so a
 * producer that keeps appending cannot make this loop run forever. The monitor
 * on `timerMetrics` only serialises concurrent drainers; `addTimerMetrics` and
 * `destroy()` do not take it. The queue can therefore shrink underneath this
 * method, in which case `poll()` returns null. Draining stops at that point so
 * that no null entry is handed to the caller.
 *
 * @return the drained snapshot strings in queue order; never contains null
 */
def getAndClearTimerMetrics(): List[String] = {
  timerMetrics.synchronized {
    val drained = ArrayBuffer[String]()
    var remaining = timerMetrics.size()
    while (remaining > 0) {
      val snapshot = timerMetrics.poll()
      if (snapshot == null) {
        // Queue was emptied concurrently (e.g. by destroy()); nothing left to drain.
        remaining = 0
      } else {
        drained.append(snapshot)
        remaining -= 1
      }
    }
    drained.toList
  }
}

/**
 * Tells whether a gauge is registered under the key built from `name` and
 * `labels`.
 *
 * @param name   gauge name
 * @param labels labels combined with the name to form the lookup key
 * @return true when `namedGauges` contains that key
 */
def gaugeExists(name: String, labels: Map[String, String]): Boolean = {
  val gaugeKey = metricNameWithCustomizedLabels(name, labels)
  namedGauges.containsKey(gaugeKey)
}
Expand Down Expand Up @@ -282,7 +300,7 @@ abstract class AbstractSource(conf: CelebornConf, role: String)
case Some(t) =>
namedTimer.timer.update(System.nanoTime() - t, TimeUnit.NANOSECONDS)
if (namedTimer.timer.getCount % metricsSlidingWindowSize == 0) {
recordTimer(namedTimer)
addTimerMetrics(namedTimer)
}
case None =>
}
Expand Down Expand Up @@ -347,31 +365,22 @@ abstract class AbstractSource(conf: CelebornConf, role: String)
metricsCleaner.scheduleWithFixedDelay(cleanTask, 10, 10, TimeUnit.MINUTES)
}

/**
 * Appends `str` to `innerMetrics`, first evicting the oldest entry when the
 * queue already holds `metricsCapacity` or more entries. The check and the
 * insert run under the `innerMetrics` monitor.
 *
 * @param str rendered metric text to enqueue
 */
private def updateInnerMetrics(str: String): Unit = {
  innerMetrics.synchronized {
    if (innerMetrics.size() >= metricsCapacity) {
      // poll() instead of remove(): remove() throws NoSuchElementException on an
      // empty queue, which happens when metricsCapacity is zero or negative.
      innerMetrics.poll()
    }
    innerMetrics.offer(str)
  }
}

def recordCounter(nc: NamedCounter): Unit = {
def getCounterMetrics(nc: NamedCounter): String = {
val timestamp = System.currentTimeMillis
val label = nc.labelString
updateInnerMetrics(s"${normalizeKey(nc.name)}Count$label ${nc.counter.getCount} $timestamp\n")
val str = s"${normalizeKey(nc.name)}Count$label ${nc.counter.getCount} $timestamp\n"
str
}

def recordGauge(ng: NamedGauge[_]): Unit = {
def getGaugeMetrics(ng: NamedGauge[_]): String = {
val timestamp = System.currentTimeMillis
val sb = new StringBuilder
val label = ng.labelString
sb.append(s"${normalizeKey(ng.name)}Value$label ${ng.gauge.getValue} $timestamp\n")

updateInnerMetrics(sb.toString())
sb.toString()
}

def recordMeter(nm: NamedMeter): Unit = {
def getMeterMetrics(nm: NamedMeter): String = {
val timestamp = System.currentTimeMillis
val sb = new StringBuilder
val label = nm.labelString
Expand All @@ -383,11 +392,10 @@ abstract class AbstractSource(conf: CelebornConf, role: String)
s"${normalizeKey(nm.name)}FiveMinuteRate$label ${nm.meter.getFiveMinuteRate} $timestamp\n")
sb.append(
s"${normalizeKey(nm.name)}FifteenMinuteRate$label ${nm.meter.getFifteenMinuteRate} $timestamp\n")

updateInnerMetrics(sb.toString())
sb.toString()
}

def recordHistogram(nh: NamedHistogram): Unit = {
def getHistogramMetrics(nh: NamedHistogram): String = {
val timestamp = System.currentTimeMillis
val sb = new mutable.StringBuilder
val snapshot = nh.histogram.getSnapshot
Expand All @@ -409,11 +417,10 @@ abstract class AbstractSource(conf: CelebornConf, role: String)
s" ${reportNanosAsMills(snapshot.get99thPercentile)} $timestamp\n")
sb.append(s"${prefix}999thPercentile$label" +
s" ${reportNanosAsMills(snapshot.get999thPercentile)} $timestamp\n")

updateInnerMetrics(sb.toString())
sb.toString()
}

def recordTimer(nt: NamedTimer): Unit = {
def getTimerMetrics(nt: NamedTimer): String = {
val timestamp = System.currentTimeMillis
val sb = new mutable.StringBuilder
val snapshot = nt.timer.getSnapshot
Expand All @@ -435,32 +442,61 @@ abstract class AbstractSource(conf: CelebornConf, role: String)
s" ${reportNanosAsMills(snapshot.get99thPercentile)} $timestamp\n")
sb.append(s"${prefix}999thPercentile$label" +
s" ${reportNanosAsMills(snapshot.get999thPercentile)} $timestamp\n")
sb.toString()
}

updateInnerMetrics(sb.toString())
/**
 * Adds up the number of queued timer snapshot strings and the number of
 * registered timers, meters, gauges and counters.
 *
 * @return the combined size of those five collections
 */
def getAllMetricsNum: Int = {
  Seq(
    timerMetrics.size(),
    namedTimers.size(),
    namedMeters.size(),
    namedGauges.size(),
    namedCounters.size()).sum
}

override def getMetrics(): String = {
innerMetrics.synchronized {
counters().foreach(c => recordCounter(c))
gauges().foreach(g => recordGauge(g))
meters().foreach(m => recordMeter(m))
histograms().foreach(h => {
recordHistogram(h)
var leftMetricsNum = metricsCapacity
val sb = new mutable.StringBuilder
leftMetricsNum = fillInnerMetricsSnapshot(getAndClearTimerMetrics(), leftMetricsNum, sb)
leftMetricsNum = fillInnerMetricsSnapshot(timers(), leftMetricsNum, sb)
leftMetricsNum = fillInnerMetricsSnapshot(histograms(), leftMetricsNum, sb)
leftMetricsNum = fillInnerMetricsSnapshot(meters(), leftMetricsNum, sb)
leftMetricsNum = fillInnerMetricsSnapshot(gauges(), leftMetricsNum, sb)
leftMetricsNum = fillInnerMetricsSnapshot(counters(), leftMetricsNum, sb)
if (leftMetricsNum <= 0) {
logWarning(
s"The number of metrics exceed the output metrics strings capacity! All metrics Num: $getAllMetricsNum")
}
sb.toString()
}

private def fillInnerMetricsSnapshot(
metricList: List[AnyRef],
leftNum: Int,
sb: mutable.StringBuilder): Int = {
if (leftNum <= 0) {
return 0
}
val addList = metricList.take(leftNum)
addList.foreach {
case c: NamedCounter =>
sb.append(getCounterMetrics(c))
case g: NamedGauge[_] =>
sb.append(getGaugeMetrics(g))
case m: NamedMeter =>
sb.append(getMeterMetrics(m))
case h: NamedHistogram =>
sb.append(getHistogramMetrics(h))
h.asInstanceOf[CelebornHistogram].reservoir
.asInstanceOf[ResettableSlidingWindowReservoir].reset()
})
timers().foreach(t => {
recordTimer(t)
case t: NamedTimer =>
sb.append(getTimerMetrics(t))
t.timer.asInstanceOf[CelebornTimer].reservoir
.asInstanceOf[ResettableSlidingWindowReservoir].reset()
})
val sb = new mutable.StringBuilder
while (!innerMetrics.isEmpty) {
sb.append(innerMetrics.poll())
}
innerMetrics.clear()
sb.toString()
case s =>
sb.append(s.toString)
}
leftNum - addList.size
}

override def destroy(): Unit = {
Expand All @@ -469,7 +505,7 @@ abstract class AbstractSource(conf: CelebornConf, role: String)
namedGauges.clear()
namedMeters.clear()
namedTimers.clear()
innerMetrics.clear()
timerMetrics.clear()
metricRegistry.removeMatching(new MetricFilter {
override def matches(s: String, metric: Metric): Boolean = true
})
Expand Down

0 comments on commit cc04d13

Please sign in to comment.