26
26
import java .net .InetSocketAddress ;
27
27
import java .util .Enumeration ;
28
28
import java .util .Objects ;
29
+ import java .util .Optional ;
29
30
import java .util .Properties ;
31
+ import java .util .concurrent .BlockingQueue ;
30
32
import java .util .concurrent .ConcurrentHashMap ;
31
33
import java .util .concurrent .ConcurrentMap ;
34
+ import java .util .concurrent .ExecutorService ;
35
+ import java .util .concurrent .LinkedBlockingQueue ;
36
+ import java .util .concurrent .RejectedExecutionException ;
37
+ import java .util .concurrent .ThreadFactory ;
38
+ import java .util .concurrent .ThreadPoolExecutor ;
39
+ import java .util .concurrent .TimeUnit ;
40
+ import java .util .concurrent .atomic .AtomicInteger ;
32
41
import java .util .function .BiConsumer ;
33
42
import javax .servlet .ServletException ;
34
43
import javax .servlet .http .HttpServletRequest ;
40
49
import org .apache .zookeeper .metrics .MetricsProviderLifeCycleException ;
41
50
import org .apache .zookeeper .metrics .Summary ;
42
51
import org .apache .zookeeper .metrics .SummarySet ;
52
+ import org .apache .zookeeper .server .RateLogger ;
43
53
import org .eclipse .jetty .server .Server ;
44
54
import org .eclipse .jetty .servlet .ServletContextHandler ;
45
55
import org .eclipse .jetty .servlet .ServletHolder ;
@@ -56,6 +66,26 @@ public class PrometheusMetricsProvider implements MetricsProvider {
56
66
private static final Logger LOG = LoggerFactory .getLogger (PrometheusMetricsProvider .class );
57
67
private static final String LABEL = "key" ;
58
68
private static final String [] LABELS = {LABEL };
69
+
70
+ /**
71
+ * Number of worker threads for reporting Prometheus summary metrics.
72
+ * Default value is 1.
73
+ * If the number is less than 1, no worker threads are created and metrics are reported synchronously on the calling thread.
74
+ */
75
+ static final String NUM_WORKER_THREADS = "numWorkerThreads" ;
76
+
77
+ /**
78
+ * The max queue size for Prometheus summary metrics reporting task.
79
+ * Default value is 1000000.
80
+ */
81
+ static final String MAX_QUEUE_SIZE = "maxQueueSize" ;
82
+
83
+ /**
84
+ * The timeout in ms for Prometheus worker threads shutdown.
85
+ * Default value is 1000ms.
86
+ */
87
+ static final String WORKER_SHUTDOWN_TIMEOUT_MS = "workerShutdownTimeoutMs" ;
88
+
59
89
/**
60
90
* We are using the 'defaultRegistry'.
61
91
* <p>
@@ -64,23 +94,35 @@ public class PrometheusMetricsProvider implements MetricsProvider {
64
94
* </p>
65
95
*/
66
96
private final CollectorRegistry collectorRegistry = CollectorRegistry .defaultRegistry ;
97
+ private final RateLogger rateLogger = new RateLogger (LOG , 60 * 1000 );
67
98
private String host = "0.0.0.0" ;
68
99
private int port = 7000 ;
69
100
private boolean exportJvmInfo = true ;
70
101
private Server server ;
71
102
private final MetricsServletImpl servlet = new MetricsServletImpl ();
72
103
private final Context rootContext = new Context ();
104
+ private int numWorkerThreads = 1 ;
105
+ private int maxQueueSize = 1000000 ;
106
+ private long workerShutdownTimeoutMs = 1000 ;
107
+ private Optional <ExecutorService > executorOptional = Optional .empty ();
73
108
74
109
@ Override
75
110
public void configure (Properties configuration ) throws MetricsProviderLifeCycleException {
76
111
LOG .info ("Initializing metrics, configuration: {}" , configuration );
77
112
this .host = configuration .getProperty ("httpHost" , "0.0.0.0" );
78
113
this .port = Integer .parseInt (configuration .getProperty ("httpPort" , "7000" ));
79
114
this .exportJvmInfo = Boolean .parseBoolean (configuration .getProperty ("exportJvmInfo" , "true" ));
115
+ this .numWorkerThreads = Integer .parseInt (
116
+ configuration .getProperty (NUM_WORKER_THREADS , "1" ));
117
+ this .maxQueueSize = Integer .parseInt (
118
+ configuration .getProperty (MAX_QUEUE_SIZE , "1000000" ));
119
+ this .workerShutdownTimeoutMs = Long .parseLong (
120
+ configuration .getProperty (WORKER_SHUTDOWN_TIMEOUT_MS , "1000" ));
80
121
}
81
122
82
123
@ Override
83
124
public void start () throws MetricsProviderLifeCycleException {
125
+ this .executorOptional = createExecutor ();
84
126
try {
85
127
LOG .info ("Starting /metrics HTTP endpoint at host: {}, port: {}, exportJvmInfo: {}" ,
86
128
host , port , exportJvmInfo );
@@ -120,6 +162,7 @@ public MetricsContext getRootContext() {
120
162
121
163
@ Override
122
164
public void stop () {
165
+ shutdownExecutor ();
123
166
if (server != null ) {
124
167
try {
125
168
server .stop ();
@@ -331,7 +374,7 @@ private void unregister() {
331
374
332
375
}
333
376
334
- private class PrometheusSummary implements Summary {
377
+ class PrometheusSummary implements Summary {
335
378
336
379
private final io .prometheus .client .Summary inner ;
337
380
private final String name ;
@@ -355,16 +398,19 @@ public PrometheusSummary(String name, MetricsContext.DetailLevel level) {
355
398
356
399
@ Override
357
400
public void add (long delta ) {
401
+ reportMetrics (() -> observe (delta ));
402
+ }
403
+
404
+ void observe (final long delta ) {
358
405
try {
359
406
inner .observe (delta );
360
- } catch (IllegalArgumentException err ) {
407
+ } catch (final IllegalArgumentException err ) {
361
408
LOG .error ("invalid delta {} for metric {}" , delta , name , err );
362
409
}
363
410
}
364
-
365
411
}
366
412
367
- private class PrometheusLabelledSummary implements SummarySet {
413
+ class PrometheusLabelledSummary implements SummarySet {
368
414
369
415
private final io .prometheus .client .Summary inner ;
370
416
private final String name ;
@@ -390,9 +436,13 @@ public PrometheusLabelledSummary(String name, MetricsContext.DetailLevel level)
390
436
391
437
@ Override
392
438
public void add (String key , long value ) {
439
+ reportMetrics (() -> observe (key , value ));
440
+ }
441
+
442
+ void observe (final String key , final long value ) {
393
443
try {
394
444
inner .labels (key ).observe (value );
395
- } catch (IllegalArgumentException err ) {
445
+ } catch (final IllegalArgumentException err ) {
396
446
LOG .error ("invalid value {} for metric {} with key {}" , value , name , key , err );
397
447
}
398
448
}
@@ -410,4 +460,65 @@ protected void doGet(HttpServletRequest req, HttpServletResponse resp) throws Se
410
460
super .doGet (req , resp );
411
461
}
412
462
}
463
+
464
+ private Optional <ExecutorService > createExecutor () {
465
+ if (numWorkerThreads < 1 ) {
466
+ LOG .info ("Executor service was not created as numWorkerThreads {} is less than 1" , numWorkerThreads );
467
+ return Optional .empty ();
468
+ }
469
+
470
+ final BlockingQueue <Runnable > queue = new LinkedBlockingQueue <>(maxQueueSize );
471
+ final ThreadPoolExecutor executor = new ThreadPoolExecutor (numWorkerThreads ,
472
+ numWorkerThreads ,
473
+ 0L ,
474
+ TimeUnit .MILLISECONDS ,
475
+ queue , new PrometheusWorkerThreadFactory ());
476
+ LOG .info ("Executor service was created with numWorkerThreads {} and maxQueueSize {}" ,
477
+ numWorkerThreads ,
478
+ maxQueueSize );
479
+ return Optional .of (executor );
480
+ }
481
+
482
+ private void shutdownExecutor () {
483
+ if (executorOptional .isPresent ()) {
484
+ LOG .info ("Shutdown executor service with timeout {}" , workerShutdownTimeoutMs );
485
+ final ExecutorService executor = executorOptional .get ();
486
+ executor .shutdown ();
487
+ try {
488
+ if (!executor .awaitTermination (workerShutdownTimeoutMs , TimeUnit .MILLISECONDS )) {
489
+ LOG .error ("Not all the Prometheus worker threads terminated properly after {} timeout" ,
490
+ workerShutdownTimeoutMs );
491
+ executor .shutdownNow ();
492
+ }
493
+ } catch (final Exception e ) {
494
+ LOG .error ("Error occurred while terminating Prometheus worker threads" , e );
495
+ executor .shutdownNow ();
496
+ }
497
+ }
498
+ }
499
+
500
+ private static class PrometheusWorkerThreadFactory implements ThreadFactory {
501
+ private static final AtomicInteger workerCounter = new AtomicInteger (1 );
502
+
503
+ @ Override
504
+ public Thread newThread (final Runnable runnable ) {
505
+ final String threadName = "PrometheusMetricsProviderWorker-" + workerCounter .getAndIncrement ();
506
+ final Thread thread = new Thread (runnable , threadName );
507
+ thread .setDaemon (true );
508
+ return thread ;
509
+ }
510
+ }
511
+
512
+ private void reportMetrics (final Runnable task ) {
513
+ if (executorOptional .isPresent ()) {
514
+ try {
515
+ executorOptional .get ().submit (task );
516
+ } catch (final RejectedExecutionException e ) {
517
+ rateLogger .rateLimitLog ("Prometheus metrics reporting task queue size exceeded the max" ,
518
+ String .valueOf (maxQueueSize ));
519
+ }
520
+ } else {
521
+ task .run ();
522
+ }
523
+ }
413
524
}
0 commit comments