Skip to content

Commit

Permalink
Refactors the queue statistics, exports them as JSON.
Browse files Browse the repository at this point in the history
  • Loading branch information
bgrozev committed Mar 28, 2019
1 parent f388010 commit 9080ccd
Show file tree
Hide file tree
Showing 3 changed files with 101 additions and 68 deletions.
6 changes: 6 additions & 0 deletions pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,12 @@
</properties>

<dependencies>
<dependency>
<groupId>com.googlecode.json-simple</groupId>
<artifactId>json-simple</artifactId>
<version>1.1.1</version>
</dependency>

<dependency>
<groupId>junit</groupId>
<artifactId>junit</artifactId>
Expand Down
4 changes: 2 additions & 2 deletions src/main/java/org/jitsi/utils/PacketQueue.java
Original file line number Diff line number Diff line change
Expand Up @@ -185,7 +185,7 @@ public PacketQueue(
queue = new ArrayBlockingQueue<>(capacity);

queueStatistics
= enableStatistics ? new QueueStatistics(id) : null;
= enableStatistics ? QueueStatistics.get(id) : null;

if (packetHandler != null)
{
Expand Down Expand Up @@ -283,7 +283,7 @@ private void doAdd(T pkt)
{
if (queueStatistics != null)
{
queueStatistics.remove(System.currentTimeMillis());
queueStatistics.drop(System.currentTimeMillis());
}
final int numDroppedPackets =
this.numDroppedPackets.incrementAndGet();
Expand Down
159 changes: 93 additions & 66 deletions src/main/java/org/jitsi/utils/QueueStatistics.java
Original file line number Diff line number Diff line change
Expand Up @@ -15,117 +15,144 @@
*/
package org.jitsi.utils;

import java.util.logging.*;
import java.util.logging.*;
import org.json.simple.*;

import java.util.*;
import java.util.concurrent.*;
import java.util.concurrent.atomic.*;

public class QueueStatistics
{
/**
* The scale to use for {@link RateStatistics}. This makes their output in
* units (e.g. packets) per second.
* Gets the {@link QueueStatistics} instance for a given ID.
* @param id the ID.
*/
public static QueueStatistics get(String id)
{
return statisticsMap.computeIfAbsent(id, x -> new QueueStatistics());
}

/**
* Gets a snapshot of all queue statistics in JSON format.
* @return
*/
public static JSONObject getStatsJson()
{
JSONObject stats = new JSONObject();
statisticsMap.forEach((id, queueStatistics) ->
stats.put(id, queueStatistics.getInstanceStats()));

return stats;
}

/**
* Maps an ID to a {@link QueueStatistics} instance.
*/
private static Map<String, QueueStatistics> statisticsMap
= new ConcurrentHashMap<>();

/**
* The scale to use for {@link RateStatistics}. 1000 means units
* (e.g. packets) per second.
*/
private static final int SCALE = 1000;

/**
* The interval (in number of calls to {@link #add(long)} or
* {@link #remove(long)}) at which the gathered statistics will be printed.
* The interval for which to calculate rates in milliseconds.
*/
private static final int DEFAULT_PRINT_INTERVAL = 500;
private static final int INTERVAL_MS = 5000;

/**
* Calculate the average rate of addition of packets in a 200ms window.
* Rate of addition of packets in pps.
*/
private final RateStatistics addRateStatistics = new RateStatistics(200, SCALE);
private final RateStatistics addRate
= new RateStatistics(INTERVAL_MS, SCALE);

/**
* Calculate the average rate of removal of packets in a 200ms window.
* Rate of removal of packets in pps.
*/
private final RateStatistics removeRateStatistics = new RateStatistics(200, SCALE);
private final RateStatistics removeRate
= new RateStatistics(INTERVAL_MS, SCALE);

/**
* The {@link Logger} instance used for logging output.
* Rate of packets being dropped in pps.
*/
private final Logger logger;
private final RateStatistics dropRate
= new RateStatistics(INTERVAL_MS, SCALE);

private int head = 0;
private int size = 0;
private int[] sizes = new int[DEFAULT_PRINT_INTERVAL];
private long[] timestamps = new long[DEFAULT_PRINT_INTERVAL];
private long[] addRates = new long[DEFAULT_PRINT_INTERVAL];
private long[] removeRates = new long[DEFAULT_PRINT_INTERVAL];
private int[] totalPacketsAddedHistory = new int[DEFAULT_PRINT_INTERVAL];
private int totalPacketsAdded = 0;
private String logHeader;
/**
* Total packets added to the queue.
*/
private AtomicInteger totalPacketsAdded = new AtomicInteger();

/**
* Total packets removed to the queue.
*/
private AtomicInteger totalPacketsRemoved = new AtomicInteger();

/**
* Total packets dropped from the queue.
*/
private AtomicInteger totalPacketsDropped = new AtomicInteger();

/**
* Initializes a new {@link QueueStatistics} instance.
*
* @param id Identifier to distinguish the log output of multiple
* {@link QueueStatistics} instances.
*/
public QueueStatistics(String id)
public QueueStatistics()
{
}

/**
* Gets a snapshot of the stats in JSON format.
*/
public JSONObject getInstanceStats()
{
logger = Logger.getLogger("QueueStatistics-" + id);
logHeader = "QueueStatistics-" + id + ": ";
JSONObject stats = new JSONObject();
long now = System.currentTimeMillis();
stats.put("added", totalPacketsAdded.get());
stats.put("removed", totalPacketsRemoved.get());
stats.put("dropped", totalPacketsDropped.get());
stats.put("add_rate", addRate.getRate(now));
stats.put("remove_rate", removeRate.getRate(now));
stats.put("drop_rate", dropRate.getRate(now));

// We let the users of this class decide whether to enable logging (by
// creating a QueueStatistic instance) or not.
logger.setLevel(Level.ALL);
return stats;
}

/**
* Registers the addition of a packet to the corresponding queue.
* Registers the addition of a packet.
* @param now the time (in milliseconds since the epoch) at which the
* packet was added.
*/
public synchronized void add(long now)
public void add(long now)
{
addRateStatistics.update(1, now);
size++;
totalPacketsAdded++;
update(now);
addRate.update(1, now);
totalPacketsAdded.incrementAndGet();
}

/**
* Registers the removal of a packet from the corresponding queue.
* Registers the removal of a packet.
* @param now the time (in milliseconds since the epoch) at which the
* packet was removed.
*/
public synchronized void remove(long now)
public void remove(long now)
{
removeRateStatistics.update(1, now);
size--;
update(now);
removeRate.update(1, now);
totalPacketsRemoved.incrementAndGet();
}

private synchronized void update(long now)
/**
* Registers that a packet was dropped.
* @param now the time (in milliseconds since the epoch) at which the
* packet was dropped.
*/
public void drop(long now)
{
if (head == sizes.length)
{
print();
head = 0;
}

sizes[head] = size;
timestamps[head] = now;
addRates[head] = addRateStatistics.getRate(now);
removeRates[head] = removeRateStatistics.getRate(now);
totalPacketsAddedHistory[head] = totalPacketsAdded;
head++;
dropRate.update(1, now);
totalPacketsDropped.incrementAndGet();
}

private void print()
{
StringBuilder s = new StringBuilder();
for (int i =0; i<sizes.length; i++)
{
s.append(logHeader).
append(timestamps[i]).append(' ').
append(sizes[i]).append(' ').
append(addRates[i]).append(' ').
append(removeRates[i]).append(' ').
append(totalPacketsAddedHistory[i]).append('\n');
}
logger.fine(s.toString());
}
}

0 comments on commit 9080ccd

Please sign in to comment.