From bda71d55c14e851380ffa4aff49d43a8b33287be Mon Sep 17 00:00:00 2001 From: Boris Grozev Date: Fri, 31 May 2019 14:51:38 +0100 Subject: [PATCH] Keeps the queue stats (if enabled) per-instance. Removes the global id-based stats. Tracks the dropped packets and exceptions separately. --- pom.xml | 5 + .../java/org/jitsi/utils/PacketQueue.java | 155 ++++++++++++------ .../jitsi/utils/stats/QueueStatistics.java | 32 +--- 3 files changed, 113 insertions(+), 79 deletions(-) diff --git a/pom.xml b/pom.xml index d578882c..4c84820b 100644 --- a/pom.xml +++ b/pom.xml @@ -26,6 +26,11 @@ json-simple 1.1.1 + + org.jetbrains + annotations + 15.0 + junit diff --git a/src/main/java/org/jitsi/utils/PacketQueue.java b/src/main/java/org/jitsi/utils/PacketQueue.java index 24bd8d74..f2e5af83 100644 --- a/src/main/java/org/jitsi/utils/PacketQueue.java +++ b/src/main/java/org/jitsi/utils/PacketQueue.java @@ -18,19 +18,13 @@ import org.jitsi.utils.logging.*; import org.jitsi.utils.stats.*; import org.json.simple.*; +import org.jetbrains.annotations.*; import java.util.concurrent.*; import java.util.concurrent.atomic.*; /** - * An abstract queue of packets. This is meant to eventually be able to be used - * in the following classes (in ice4j and libjitsi) in place of their ad-hoc - * queue implementations (which is the reason the class is parameterized): - *
RTPConnectorOutputStream.Queue - *
PushSourceStreamImpl#readQ - *
OutputDataStreamImpl#writeQ - *
SinglePortHarvester.MySocket#queue - *
MultiplexingSocket#received (and the rest of Multiplex* classes). + * An abstract queue of packets. * * @author Boris Grozev * @author Yura Yaroshevich @@ -65,20 +59,6 @@ public static void setEnableStatisticsDefault(boolean enable) enableStatisticsDefault = enable; } - /** - * Returns true if a warning should be logged after a queue has dropped - * {@code numDroppedPackets} packets. - * @param numDroppedPackets the number of dropped packets. - * @return {@code true} if a warning should be logged. - */ - public static boolean logDroppedPacket(int numDroppedPackets) - { - return - numDroppedPackets == 1 || - (numDroppedPackets <= 1000 && numDroppedPackets % 100 == 0) || - numDroppedPackets % 1000 == 0; - } - /** * The underlying {@link BlockingQueue} which holds packets. * Used as synchronization object between {@link #close()}, {@link #get()} @@ -124,10 +104,10 @@ public static boolean logDroppedPacket(int numDroppedPackets) private final int capacity; /** - * The number of packets which were dropped from this {@link PacketQueue} as - * a result of a packet being added while the queue is at full capacity. + * Handles dropped packets and exceptions thrown while processing. */ - private final AtomicInteger numDroppedPackets = new AtomicInteger(); + @NotNull + private ErrorHandler errorHandler = new ErrorHandler(){}; /** * Initializes a new {@link PacketQueue} instance. @@ -215,11 +195,11 @@ public PacketQueue( enableStatistics = enableStatisticsDefault; } queueStatistics - = enableStatistics ? QueueStatistics.get(id) : null; + = enableStatistics ? new QueueStatistics() : null; if (packetHandler != null) { - asyncQueueHandler = new AsyncQueueHandler( + asyncQueueHandler = new AsyncQueueHandler<>( queue, new HandlerAdapter(packetHandler), id, @@ -315,13 +295,7 @@ private void doAdd(T pkt) { queueStatistics.drop(System.currentTimeMillis()); } - final int numDroppedPackets = - this.numDroppedPackets.incrementAndGet(); - if (logDroppedPacket(numDroppedPackets)) - { - logger.warn( - "Packets dropped (id=" + id + "): " + numDroppedPackets); - } + errorHandler.packetDropped(); // Call release on dropped packet to allow proper implementation // of object pooling by PacketQueue users @@ -500,6 +474,35 @@ protected void releasePacket(T pkt) { } + /** + * Gets a JSON representation of the parts of this object's state that + * are deemed useful for debugging. + */ + public JSONObject getDebugState() + { + JSONObject debugState = new JSONObject(); + debugState.put("id", id); + debugState.put("capacity", capacity); + debugState.put("copy", copy); + debugState.put("closed", closed); + debugState.put( + "statistics", + queueStatistics == null + ? null : queueStatistics.getStats()); + + return debugState; + } + + /** + * Sets the handler of errors (packets dropped or exceptions caught while + * processing). + * @param errorHandler the handler to set. + */ + public void setErrorHandler(@NotNull ErrorHandler errorHandler) + { + this.errorHandler = errorHandler; + } + /** * A simple interface to handle packets. * @param the type of the packets. @@ -565,6 +568,10 @@ public void handleItem(T item) { handler.handlePacket(item); } + catch (Throwable t) + { + errorHandler.packetHandlingFailed(t); + } finally { releasePacket(item); @@ -573,22 +580,74 @@ public void handleItem(T item) } /** - * Gets a JSON representation of the parts of this object's state that - * are deemed useful for debugging. + * An interface for handling the two types of error conditions from a queue. */ - public JSONObject getDebugState() + public interface ErrorHandler { - JSONObject debugState = new JSONObject(); - debugState.put("id", id); - debugState.put("capacity", capacity); - debugState.put("copy", copy); - debugState.put("closed", closed); - debugState.put( - "statistics", - queueStatistics == null - ? null : queueStatistics.getInstanceStats()); - debugState.put("num_dropped_packets", numDroppedPackets.get()); + /** + * Called when a packet is dropped from the queue because a new packet + * was added while it was full. + */ + default void packetDropped() {} - return debugState; + /** + * Called when handling of a packet produces an exception. + * @param t + */ + default void packetHandlingFailed(Throwable t) {} + } + + /** + * An {@link ErrorHandler} implementation which counts the number of + * dropped packets and exceptions. + */ + public class CountingErrorHandler implements ErrorHandler + { + /** + * The number of dropped packets. + */ + private final AtomicLong numPacketsDropped = new AtomicLong(); + + /** + * The number of exceptions. + */ + private final AtomicLong numExceptions = new AtomicLong(); + + /** + * {@inheritDoc} + */ + @Override + public void packetDropped() + { + numPacketsDropped.incrementAndGet(); + } + + /** + * {@inheritDoc} + */ + @Override + public void packetHandlingFailed(Throwable t) + { + numExceptions.incrementAndGet(); + logger.warn("Failed to handle a packet: ", t); + } + + /** + * Get the number of dropped packets. + * @return + */ + public long getNumPacketsDropped() + { + return numPacketsDropped.get(); + } + + /** + * Get the number of exceptions. + * @return + */ + public long getNumExceptions() + { + return numExceptions.get(); + } } } diff --git a/src/main/java/org/jitsi/utils/stats/QueueStatistics.java b/src/main/java/org/jitsi/utils/stats/QueueStatistics.java index 14cfda6b..27a1c821 100644 --- a/src/main/java/org/jitsi/utils/stats/QueueStatistics.java +++ b/src/main/java/org/jitsi/utils/stats/QueueStatistics.java @@ -17,40 +17,10 @@ import org.json.simple.*; -import java.util.*; -import java.util.concurrent.*; import java.util.concurrent.atomic.*; public class QueueStatistics { - /** - * Gets the {@link QueueStatistics} instance for a given ID. - * @param id the ID. - */ - public static QueueStatistics get(String id) - { - return statisticsMap.computeIfAbsent(id, x -> new QueueStatistics()); - } - - /** - * Gets a snapshot of all queue statistics in JSON format. - * @return - */ - public static JSONObject getStatsJson() - { - JSONObject stats = new JSONObject(); - statisticsMap.forEach((id, queueStatistics) -> - stats.put(id, queueStatistics.getInstanceStats())); - - return stats; - } - - /** - * Maps an ID to a {@link QueueStatistics} instance. - */ - private static Map statisticsMap - = new ConcurrentHashMap<>(); - /** * The scale to use for {@link RateStatistics}. 1000 means units * (e.g. packets) per second. @@ -108,7 +78,7 @@ public QueueStatistics() /** * Gets a snapshot of the stats in JSON format. */ - public JSONObject getInstanceStats() + public JSONObject getStats() { JSONObject stats = new JSONObject(); long now = System.currentTimeMillis();