Skip to content

Commit 0d680ce

Browse files
author
Yaroslav Derman
committed
Merge branch 'master' into master-play-route-name-generator
2 parents b774d8c + d858ad1 commit 0d680ce

File tree

28 files changed

+302
-160
lines changed

28 files changed

+302
-160
lines changed

.github/workflows/ci.yml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,6 @@ jobs:
1111
- run: git fetch --depth=1 origin '+refs/tags/*:refs/tags/*'
1212
- uses: olafurpg/setup-scala@v10
1313
with:
14-
java-version: [email protected]242
14+
java-version: [email protected]252
1515
- name: Test
1616
run: csbt +test

build.sbt

Lines changed: 11 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -392,6 +392,12 @@ lazy val `kamon-akka` = (project in file("instrumentation/kamon-akka"))
392392
)
393393

394394

395+
def akkaHttpVersion(scalaVersion: String) = scalaVersion match {
396+
case "2.11" => "10.1.12"
397+
case _ => "10.2.3"
398+
}
399+
400+
395401
lazy val `kamon-akka-http` = (project in file("instrumentation/kamon-akka-http"))
396402
.enablePlugins(JavaAgent)
397403
.disablePlugins(AssemblyPlugin)
@@ -401,18 +407,18 @@ lazy val `kamon-akka-http` = (project in file("instrumentation/kamon-akka-http")
401407
javaAgents += "org.mortbay.jetty.alpn" % "jetty-alpn-agent" % "2.0.10" % "test",
402408
libraryDependencies ++= Seq(
403409
kanelaAgent % "provided",
404-
"com.typesafe.akka" %% "akka-http" % "10.1.12" % "provided",
405-
"com.typesafe.akka" %% "akka-http2-support" % "10.1.12" % "provided",
406-
"com.typesafe.akka" %% "akka-stream" % "2.5.31" % "provided",
410+
"com.typesafe.akka" %% "akka-http" % akkaHttpVersion(scalaBinaryVersion.value) % "provided",
411+
"com.typesafe.akka" %% "akka-http2-support" % akkaHttpVersion(scalaBinaryVersion.value) % "provided",
412+
"com.typesafe.akka" %% "akka-stream" % "2.5.32" % "provided",
407413

408414
scalatest % "test",
409415
slf4jApi % "test",
410416
slf4jnop % "test",
411417
okHttp % "test",
412-
"com.typesafe.akka" %% "akka-http-testkit" % "10.1.12" % "test",
418+
"com.typesafe.akka" %% "akka-http-testkit" % akkaHttpVersion(scalaBinaryVersion.value) % "test",
413419
"de.heikoseeberger" %% "akka-http-json4s" % "1.27.0" % "test",
414420
"org.json4s" %% "json4s-native" % "3.6.7" % "test",
415-
)
421+
),
416422
)).dependsOn(`kamon-akka`, `kamon-testkit` % "test")
417423

418424

core/kamon-core-tests/src/test/scala/kamon/KamonLifecycleSpec.scala

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -61,5 +61,5 @@ object KamonWithTemporaryReporter extends App {
6161
Kamon.registerModule("dummy span reporter", new DummySpanReporter())
6262

6363
Thread.sleep(5000)
64-
Kamon.stopModules()
64+
Kamon.stop()
6565
}

core/kamon-core/src/main/scala/kamon/Init.scala

Lines changed: 9 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -20,11 +20,13 @@ import com.typesafe.config.Config
2020
import kamon.status.InstrumentationStatus
2121
import org.slf4j.LoggerFactory
2222

23+
import scala.concurrent.Future
24+
2325
/**
2426
* Provides APIs for handling common initialization tasks like starting modules, attaching instrumentation and
2527
* reconfiguring Kamon.
2628
*/
27-
trait Init { self: ModuleLoading with Configuration with CurrentStatus =>
29+
trait Init { self: ModuleLoading with Configuration with CurrentStatus with Metrics with Tracing =>
2830
private val _logger = LoggerFactory.getLogger(classOf[Init])
2931

3032
/**
@@ -44,6 +46,12 @@ trait Init { self: ModuleLoading with Configuration with CurrentStatus =>
4446
self.reconfigure(config)
4547
self.loadModules()
4648
}
49+
50+
def stop(): Future[Unit] = {
51+
self.clearRegistry()
52+
self.stopTracer()
53+
self.stopModules()
54+
}
4755

4856
/**
4957
* Tries to attach the Kanela instrumentation agent, if the Kamon Bundle dependency is available on the classpath. If

core/kamon-core/src/main/scala/kamon/Metrics.scala

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -32,4 +32,7 @@ trait Metrics extends MetricBuilding { self: Configuration with Utilities =>
3232
protected def registry(): MetricRegistry =
3333
_metricRegistry
3434

35+
protected def clearRegistry(): Unit = {
36+
_metricRegistry.clear()
37+
}
3538
}

core/kamon-core/src/main/scala/kamon/Tracing.scala

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -89,4 +89,7 @@ trait Tracing { self: Configuration with Utilities with ContextStorage =>
8989
protected def tracer(): Tracer =
9090
_tracer
9191

92+
def stopTracer(): Unit = {
93+
_tracer.stop()
94+
}
9295
}

core/kamon-core/src/main/scala/kamon/Utilities.scala

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -28,7 +28,7 @@ import scala.collection.concurrent.TrieMap
2828
*/
2929
trait Utilities { self: Configuration =>
3030
private val _clock = new Clock.Anchored()
31-
private val _scheduler = Executors.newScheduledThreadPool(1, numberedThreadFactory("kamon-scheduler", daemon = true))
31+
private val _scheduler = newScheduledThreadPool(1, numberedThreadFactory("kamon-scheduler", daemon = true))
3232
private val _filters = TrieMap.empty[String, Filter]
3333

3434
reconfigureUtilities(self.config())

core/kamon-core/src/main/scala/kamon/metric/MetricRegistry.scala

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -27,7 +27,6 @@ import kamon.util.Clock
2727
import org.slf4j.LoggerFactory
2828

2929
import scala.collection.concurrent.TrieMap
30-
import scala.reflect.ClassTag
3130

3231
/**
3332
* Handles creation and snapshotting of metrics. If a metric is created twice, the very same instance will be returned.
@@ -204,4 +203,8 @@ class MetricRegistry(config: Config, scheduler: ScheduledExecutorService, clock:
204203
/** Returns the current status of all metrics contained in the registry */
205204
def status(): Status.MetricRegistry =
206205
Status.MetricRegistry(_metrics.values.map(_.status()).toSeq)
206+
207+
def clear(): Unit = {
208+
_metrics.values.foreach { metric => metric.status().instruments.foreach(i => metric.remove(i.tags)) }
209+
}
207210
}

core/kamon-core/src/main/scala/kamon/module/ModuleRegistry.scala

Lines changed: 7 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -18,9 +18,8 @@ package kamon
1818
package module
1919

2020
import java.time.{Duration, Instant}
21-
import java.util.concurrent.{CountDownLatch, Executors, ScheduledFuture, TimeUnit}
21+
import java.util.concurrent.{CountDownLatch, Executors, ForkJoinPool, ScheduledFuture, TimeUnit}
2222
import java.util.concurrent.atomic.AtomicReference
23-
2423
import com.typesafe.config.Config
2524
import kamon.module.Module.Registration
2625
import kamon.status.Status
@@ -39,8 +38,9 @@ import scala.util.control.NonFatal
3938
class ModuleRegistry(configuration: Configuration, clock: Clock, metricRegistry: MetricRegistry, tracer: Tracer) {
4039

4140
private val _logger = LoggerFactory.getLogger(classOf[ModuleRegistry])
42-
private val _metricsTickerExecutor = Executors.newScheduledThreadPool(1, threadFactory("kamon-metrics-ticker", daemon = true))
43-
private val _spansTickerExecutor = Executors.newScheduledThreadPool(1, threadFactory("kamon-spans-ticker", daemon = true))
41+
private val _moduleRegistryEC: ExecutionContext = ExecutionContext.fromExecutor(newScheduledThreadPool(1, threadFactory("kamon-module-registry", daemon = true)))
42+
private val _metricsTickerExecutor = newScheduledThreadPool(1, threadFactory("kamon-metrics-ticker", daemon = true))
43+
private val _spansTickerExecutor = newScheduledThreadPool(1, threadFactory("kamon-spans-ticker", daemon = true))
4444

4545
private val _metricsTickerSchedule = new AtomicReference[ScheduledFuture[_]]()
4646
private val _spansTickerSchedule = new AtomicReference[ScheduledFuture[_]]()
@@ -127,7 +127,7 @@ class ModuleRegistry(configuration: Configuration, clock: Clock, metricRegistry:
127127
* spans available until the call to stop.
128128
*/
129129
def stopModules(): Future[Unit] = synchronized {
130-
implicit val cleanupExecutor = ExecutionContext.Implicits.global
130+
implicit val cleanupExecutor = _moduleRegistryEC
131131
stopReporterTickers()
132132

133133
var stoppedSignals: List[Future[Unit]] = Nil
@@ -137,11 +137,7 @@ class ModuleRegistry(configuration: Configuration, clock: Clock, metricRegistry:
137137
true
138138
}
139139

140-
val latch = new CountDownLatch(stoppedSignals.size)
141-
stoppedSignals.foreach(f => f.onComplete(_ => latch.countDown()))
142-
143-
// TODO: Completely destroy modules that fail to stop within the 30 second timeout.
144-
Future(latch.await(30, TimeUnit.SECONDS))
140+
Future.sequence(stoppedSignals).map(_ => ())
145141
}
146142

147143

@@ -369,7 +365,7 @@ class ModuleRegistry(configuration: Configuration, clock: Clock, metricRegistry:
369365
* context. The returned future completes when the module finishes its stop procedure.
370366
*/
371367
private def stopModule(entry: Entry[Module]): Future[Unit] = synchronized {
372-
val cleanupExecutor = ExecutionContext.Implicits.global
368+
val cleanupExecutor = _moduleRegistryEC
373369

374370
// Remove the module from all registries
375371
_registeredModules = _registeredModules - entry.name

core/kamon-core/src/main/scala/kamon/package.scala

Lines changed: 13 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -15,14 +15,25 @@
1515
*/
1616

1717
import java.util.concurrent.atomic.AtomicLong
18-
import java.util.concurrent.{Executors, ThreadFactory}
19-
18+
import java.util.concurrent.{Executors, ScheduledExecutorService, ScheduledThreadPoolExecutor, ThreadFactory, TimeUnit}
2019
import com.typesafe.config.{Config, ConfigUtil}
2120

2221
import scala.collection.concurrent.TrieMap
2322

2423
package object kamon {
2524

25+
def newScheduledThreadPool(corePoolSize: Int, threadFactory: ThreadFactory): ScheduledExecutorService = {
26+
val tp = new ScheduledThreadPoolExecutor(corePoolSize, threadFactory)
27+
val timeout = 5000
28+
// Call in this order or it throws!
29+
tp.setKeepAliveTime(timeout, TimeUnit.MILLISECONDS)
30+
tp.allowCoreThreadTimeOut(true)
31+
32+
tp.setRemoveOnCancelPolicy(true)
33+
34+
tp
35+
}
36+
2637
/**
2738
* Creates a thread factory that assigns the specified name to all created Threads.
2839
*/

0 commit comments

Comments
 (0)