Skip to content

Commit 7b31d68

Browse files
committed
Added quantiles to summary metric (same algorithm as in client_golang).
1 parent ff3c8d7 commit 7b31d68

File tree

7 files changed

+563
-13
lines changed

7 files changed

+563
-13
lines changed

NOTICE

+3
Original file line numberDiff line numberDiff line change
@@ -6,3 +6,6 @@ Boxever Ltd. (http://www.boxever.com/).
66

77
This product includes software developed at
88
SoundCloud Ltd. (http://soundcloud.com/).
9+
10+
This product includes software developed as part of the
11+
Ocelli project by Netflix Inc. (https://github.com/Netflix/ocelli/).
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,293 @@
1+
package io.prometheus.client;
2+
3+
// Copied from https://raw.githubusercontent.com/Netflix/ocelli/master/ocelli-core/src/main/java/netflix/ocelli/stats/CKMSQuantiles.java
4+
// Revision d0357b8bf5c17a173ce94d6b26823775b3f999f6 from Jan 21, 2015.
5+
//
6+
// This is the original code except for the following modifications:
7+
//
8+
// - Changed the type of the observed values from int to double.
9+
// - Removed the Quantiles interface and corresponding @Override annotations.
10+
// - Changed the package name.
11+
// - Make get() return NaN when no sample was observed.
12+
// - Make class package private
13+
14+
/*
15+
Copyright 2012 Andrew Wang ([email protected])
16+
17+
Licensed under the Apache License, Version 2.0 (the "License");
18+
you may not use this file except in compliance with the License.
19+
You may obtain a copy of the License at
20+
21+
http://www.apache.org/licenses/LICENSE-2.0
22+
23+
Unless required by applicable law or agreed to in writing, software
24+
distributed under the License is distributed on an "AS IS" BASIS,
25+
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
26+
See the License for the specific language governing permissions and
27+
limitations under the License.
28+
*/
29+
30+
import java.util.Arrays;
31+
import java.util.LinkedList;
32+
import java.util.ListIterator;
33+
34+
/**
35+
* Implementation of the Cormode, Korn, Muthukrishnan, and Srivastava algorithm
36+
* for streaming calculation of targeted high-percentile epsilon-approximate
37+
* quantiles.
38+
*
39+
* This is a generalization of the earlier work by Greenwald and Khanna (GK),
40+
* which essentially allows different error bounds on the targeted quantiles,
41+
* which allows for far more efficient calculation of high-percentiles.
42+
*
43+
*
44+
* See: Cormode, Korn, Muthukrishnan, and Srivastava
45+
* "Effective Computation of Biased Quantiles over Data Streams" in ICDE 2005
46+
*
47+
* Greenwald and Khanna,
48+
* "Space-efficient online computation of quantile summaries" in SIGMOD 2001
49+
*
50+
*/
51+
class CKMSQuantiles {
52+
/**
53+
* Total number of items in stream.
54+
*/
55+
private int count = 0;
56+
57+
/**
58+
* Used for tracking incremental compression.
59+
*/
60+
private int compressIdx = 0;
61+
62+
/**
63+
* Current list of sampled items, maintained in sorted order with error
64+
* bounds.
65+
*/
66+
protected LinkedList<Item> sample;
67+
68+
/**
69+
* Buffers incoming items to be inserted in batch.
70+
*/
71+
private double[] buffer = new double[500];
72+
73+
private int bufferCount = 0;
74+
75+
/**
76+
* Array of Quantiles that we care about, along with desired error.
77+
*/
78+
private final Quantile quantiles[];
79+
80+
public CKMSQuantiles(Quantile[] quantiles) {
81+
this.quantiles = quantiles;
82+
this.sample = new LinkedList<Item>();
83+
}
84+
85+
/**
86+
* Add a new value from the stream.
87+
*
88+
* @param value
89+
*/
90+
public synchronized void insert(double value) {
91+
buffer[bufferCount] = value;
92+
bufferCount++;
93+
94+
if (bufferCount == buffer.length) {
95+
insertBatch();
96+
compress();
97+
}
98+
}
99+
100+
/**
101+
* Get the estimated value at the specified quantile.
102+
*
103+
* @param q
104+
* Queried quantile, e.g. 0.50 or 0.99.
105+
* @return Estimated value at that quantile.
106+
*/
107+
public synchronized double get(double q) {
108+
// clear the buffer
109+
insertBatch();
110+
compress();
111+
112+
if (sample.size() == 0) {
113+
return Double.NaN;
114+
}
115+
116+
int rankMin = 0;
117+
int desired = (int) (q * count);
118+
119+
ListIterator<Item> it = sample.listIterator();
120+
Item prev, cur;
121+
cur = it.next();
122+
while (it.hasNext()) {
123+
prev = cur;
124+
cur = it.next();
125+
126+
rankMin += prev.g;
127+
128+
if (rankMin + cur.g + cur.delta > desired
129+
+ (allowableError(desired) / 2)) {
130+
return prev.value;
131+
}
132+
}
133+
134+
// edge case of wanting max value
135+
return sample.getLast().value;
136+
}
137+
138+
/**
139+
* Specifies the allowable error for this rank, depending on which quantiles
140+
* are being targeted.
141+
*
142+
* This is the f(r_i, n) function from the CKMS paper. It's basically how
143+
* wide the range of this rank can be.
144+
*
145+
* @param rank
146+
* the index in the list of samples
147+
*/
148+
private double allowableError(int rank) {
149+
// NOTE: according to CKMS, this should be count, not size, but this
150+
// leads
151+
// to error larger than the error bounds. Leaving it like this is
152+
// essentially a HACK, and blows up memory, but does "work".
153+
// int size = count;
154+
int size = sample.size();
155+
double minError = size + 1;
156+
157+
for (Quantile q : quantiles) {
158+
double error;
159+
if (rank <= q.quantile * size) {
160+
error = q.u * (size - rank);
161+
} else {
162+
error = q.v * rank;
163+
}
164+
if (error < minError) {
165+
minError = error;
166+
}
167+
}
168+
169+
return minError;
170+
}
171+
172+
private boolean insertBatch() {
173+
if (bufferCount == 0) {
174+
return false;
175+
}
176+
177+
Arrays.sort(buffer, 0, bufferCount);
178+
179+
// Base case: no samples
180+
int start = 0;
181+
if (sample.size() == 0) {
182+
Item newItem = new Item(buffer[0], 1, 0);
183+
sample.add(newItem);
184+
start++;
185+
count++;
186+
}
187+
188+
ListIterator<Item> it = sample.listIterator();
189+
Item item = it.next();
190+
191+
for (int i = start; i < bufferCount; i++) {
192+
double v = buffer[i];
193+
while (it.nextIndex() < sample.size() && item.value < v) {
194+
item = it.next();
195+
}
196+
197+
// If we found that bigger item, back up so we insert ourselves
198+
// before it
199+
if (item.value > v) {
200+
it.previous();
201+
}
202+
203+
// We use different indexes for the edge comparisons, because of the
204+
// above
205+
// if statement that adjusts the iterator
206+
int delta;
207+
if (it.previousIndex() == 0 || it.nextIndex() == sample.size()) {
208+
delta = 0;
209+
}
210+
else {
211+
delta = ((int) Math.floor(allowableError(it.nextIndex()))) - 1;
212+
}
213+
214+
Item newItem = new Item(v, 1, delta);
215+
it.add(newItem);
216+
count++;
217+
item = newItem;
218+
}
219+
220+
bufferCount = 0;
221+
return true;
222+
}
223+
224+
/**
225+
* Try to remove extraneous items from the set of sampled items. This checks
226+
* if an item is unnecessary based on the desired error bounds, and merges
227+
* it with the adjacent item if it is.
228+
*/
229+
private void compress() {
230+
if (sample.size() < 2) {
231+
return;
232+
}
233+
234+
ListIterator<Item> it = sample.listIterator();
235+
int removed = 0;
236+
237+
Item prev = null;
238+
Item next = it.next();
239+
240+
while (it.hasNext()) {
241+
prev = next;
242+
next = it.next();
243+
244+
if (prev.g + next.g + next.delta <= allowableError(it.previousIndex())) {
245+
next.g += prev.g;
246+
// Remove prev. it.remove() kills the last thing returned.
247+
it.previous();
248+
it.previous();
249+
it.remove();
250+
// it.next() is now equal to next, skip it back forward again
251+
it.next();
252+
removed++;
253+
}
254+
}
255+
}
256+
257+
private class Item {
258+
public final double value;
259+
public int g;
260+
public final int delta;
261+
262+
public Item(double value, int lower_delta, int delta) {
263+
this.value = value;
264+
this.g = lower_delta;
265+
this.delta = delta;
266+
}
267+
268+
@Override
269+
public String toString() {
270+
return String.format("%d, %d, %d", value, g, delta);
271+
}
272+
}
273+
274+
public static class Quantile {
275+
public final double quantile;
276+
public final double error;
277+
public final double u;
278+
public final double v;
279+
280+
public Quantile(double quantile, double error) {
281+
this.quantile = quantile;
282+
this.error = error;
283+
u = 2.0 * error / (1.0 - quantile);
284+
v = 2.0 * error / quantile;
285+
}
286+
287+
@Override
288+
public String toString() {
289+
return String.format("Q{q=%.3f, eps=%.3f})", quantile, error);
290+
}
291+
}
292+
293+
}

simpleclient/src/main/java/io/prometheus/client/Collector.java

+3
Original file line numberDiff line numberDiff line change
@@ -186,6 +186,9 @@ public static String doubleToGoString(double d) {
186186
if (d == Double.NEGATIVE_INFINITY) {
187187
return "-Inf";
188188
}
189+
if (Double.isNaN(d)) {
190+
return "NaN";
191+
}
189192
return Double.toString(d);
190193
}
191194
}

0 commit comments

Comments
 (0)