diff --git a/pom.xml b/pom.xml index 95866ded..13eb4936 100644 --- a/pom.xml +++ b/pom.xml @@ -20,6 +20,12 @@ + + com.googlecode.json-simple + json-simple + 1.1.1 + + junit junit diff --git a/src/main/java/org/jitsi/utils/PacketQueue.java b/src/main/java/org/jitsi/utils/PacketQueue.java index e4ad55b9..7a04b6d5 100644 --- a/src/main/java/org/jitsi/utils/PacketQueue.java +++ b/src/main/java/org/jitsi/utils/PacketQueue.java @@ -185,7 +185,7 @@ public PacketQueue( queue = new ArrayBlockingQueue<>(capacity); queueStatistics - = enableStatistics ? new QueueStatistics(id) : null; + = enableStatistics ? QueueStatistics.get(id) : null; if (packetHandler != null) { @@ -283,7 +283,7 @@ private void doAdd(T pkt) { if (queueStatistics != null) { - queueStatistics.remove(System.currentTimeMillis()); + queueStatistics.drop(System.currentTimeMillis()); } final int numDroppedPackets = this.numDroppedPackets.incrementAndGet(); diff --git a/src/main/java/org/jitsi/utils/QueueStatistics.java b/src/main/java/org/jitsi/utils/QueueStatistics.java index 440ccb2d..368872fb 100644 --- a/src/main/java/org/jitsi/utils/QueueStatistics.java +++ b/src/main/java/org/jitsi/utils/QueueStatistics.java @@ -15,47 +15,85 @@ */ package org.jitsi.utils; -import java.util.logging.*; -import java.util.logging.*; +import org.json.simple.*; + +import java.util.*; +import java.util.concurrent.*; +import java.util.concurrent.atomic.*; public class QueueStatistics { /** - * The scale to use for {@link RateStatistics}. This makes their output in - * units (e.g. packets) per second. + * Gets the {@link QueueStatistics} instance for a given ID. + * @param id the ID. + */ + public static QueueStatistics get(String id) + { + return statisticsMap.computeIfAbsent(id, x -> new QueueStatistics()); + } + + /** + * Gets a snapshot of all queue statistics in JSON format. + * @return + */ + public static JSONObject getStatsJson() + { + JSONObject stats = new JSONObject(); + statisticsMap.forEach((id, queueStatistics) -> + stats.put(id, queueStatistics.getInstanceStats())); + + return stats; + } + + /** + * Maps an ID to a {@link QueueStatistics} instance. + */ + private static Map statisticsMap + = new ConcurrentHashMap<>(); + + /** + * The scale to use for {@link RateStatistics}. 1000 means units + * (e.g. packets) per second. */ private static final int SCALE = 1000; /** - * The interval (in number of calls to {@link #add(long)} or - * {@link #remove(long)}) at which the gathered statistics will be printed. + * The interval for which to calculate rates in milliseconds. */ - private static final int DEFAULT_PRINT_INTERVAL = 500; + private static final int INTERVAL_MS = 5000; /** - * Calculate the average rate of addition of packets in a 200ms window. + * Rate of addition of packets in pps. */ - private final RateStatistics addRateStatistics = new RateStatistics(200, SCALE); + private final RateStatistics addRate + = new RateStatistics(INTERVAL_MS, SCALE); /** - * Calculate the average rate of removal of packets in a 200ms window. + * Rate of removal of packets in pps. */ - private final RateStatistics removeRateStatistics = new RateStatistics(200, SCALE); + private final RateStatistics removeRate + = new RateStatistics(INTERVAL_MS, SCALE); /** - * The {@link Logger} instance used for logging output. + * Rate of packets being dropped in pps. */ - private final Logger logger; + private final RateStatistics dropRate + = new RateStatistics(INTERVAL_MS, SCALE); - private int head = 0; - private int size = 0; - private int[] sizes = new int[DEFAULT_PRINT_INTERVAL]; - private long[] timestamps = new long[DEFAULT_PRINT_INTERVAL]; - private long[] addRates = new long[DEFAULT_PRINT_INTERVAL]; - private long[] removeRates = new long[DEFAULT_PRINT_INTERVAL]; - private int[] totalPacketsAddedHistory = new int[DEFAULT_PRINT_INTERVAL]; - private int totalPacketsAdded = 0; - private String logHeader; + /** + * Total packets added to the queue. + */ + private AtomicInteger totalPacketsAdded = new AtomicInteger(); + + /** + * Total packets removed to the queue. + */ + private AtomicInteger totalPacketsRemoved = new AtomicInteger(); + + /** + * Total packets dropped from the queue. + */ + private AtomicInteger totalPacketsDropped = new AtomicInteger(); /** * Initializes a new {@link QueueStatistics} instance. @@ -63,69 +101,58 @@ public class QueueStatistics * @param id Identifier to distinguish the log output of multiple * {@link QueueStatistics} instances. */ - public QueueStatistics(String id) + public QueueStatistics() + { + } + + /** + * Gets a snapshot of the stats in JSON format. + */ + public JSONObject getInstanceStats() { - logger = Logger.getLogger("QueueStatistics-" + id); - logHeader = "QueueStatistics-" + id + ": "; + JSONObject stats = new JSONObject(); + long now = System.currentTimeMillis(); + stats.put("added", totalPacketsAdded.get()); + stats.put("removed", totalPacketsRemoved.get()); + stats.put("dropped", totalPacketsDropped.get()); + stats.put("add_rate", addRate.getRate(now)); + stats.put("remove_rate", removeRate.getRate(now)); + stats.put("drop_rate", dropRate.getRate(now)); - // We let the users of this class decide whether to enable logging (by - // creating a QueueStatistic instance) or not. - logger.setLevel(Level.ALL); + return stats; } /** - * Registers the addition of a packet to the corresponding queue. + * Registers the addition of a packet. * @param now the time (in milliseconds since the epoch) at which the * packet was added. */ - public synchronized void add(long now) + public void add(long now) { - addRateStatistics.update(1, now); - size++; - totalPacketsAdded++; - update(now); + addRate.update(1, now); + totalPacketsAdded.incrementAndGet(); } /** - * Registers the removal of a packet from the corresponding queue. + * Registers the removal of a packet. * @param now the time (in milliseconds since the epoch) at which the * packet was removed. */ - public synchronized void remove(long now) + public void remove(long now) { - removeRateStatistics.update(1, now); - size--; - update(now); + removeRate.update(1, now); + totalPacketsRemoved.incrementAndGet(); } - private synchronized void update(long now) + /** + * Registers that a packet was dropped. + * @param now the time (in milliseconds since the epoch) at which the + * packet was dropped. + */ + public void drop(long now) { - if (head == sizes.length) - { - print(); - head = 0; - } - - sizes[head] = size; - timestamps[head] = now; - addRates[head] = addRateStatistics.getRate(now); - removeRates[head] = removeRateStatistics.getRate(now); - totalPacketsAddedHistory[head] = totalPacketsAdded; - head++; + dropRate.update(1, now); + totalPacketsDropped.incrementAndGet(); } - private void print() - { - StringBuilder s = new StringBuilder(); - for (int i =0; i