Skip to content

Commit

Permalink
Keeps the queue stats (if enabled) per-instance.
Browse files Browse the repository at this point in the history
Removes the global id-based stats. Tracks the dropped packets
and exceptions separately.
  • Loading branch information
bgrozev committed Jun 3, 2019
1 parent 39d03b3 commit bda71d5
Show file tree
Hide file tree
Showing 3 changed files with 113 additions and 79 deletions.
5 changes: 5 additions & 0 deletions pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,11 @@
<artifactId>json-simple</artifactId>
<version>1.1.1</version>
</dependency>
<dependency>
<groupId>org.jetbrains</groupId>
<artifactId>annotations</artifactId>
<version>15.0</version>
</dependency>

<dependency>
<groupId>junit</groupId>
Expand Down
155 changes: 107 additions & 48 deletions src/main/java/org/jitsi/utils/PacketQueue.java
Original file line number Diff line number Diff line change
Expand Up @@ -18,19 +18,13 @@
import org.jitsi.utils.logging.*;
import org.jitsi.utils.stats.*;
import org.json.simple.*;
import org.jetbrains.annotations.*;

import java.util.concurrent.*;
import java.util.concurrent.atomic.*;

/**
* An abstract queue of packets. This is meant to eventually be able to be used
* in the following classes (in ice4j and libjitsi) in place of their ad-hoc
* queue implementations (which is the reason the class is parameterized):
* <br> RTPConnectorOutputStream.Queue
* <br> PushSourceStreamImpl#readQ
* <br> OutputDataStreamImpl#writeQ
* <br> SinglePortHarvester.MySocket#queue
* <br> MultiplexingSocket#received (and the rest of Multiplex* classes).
* An abstract queue of packets.
*
* @author Boris Grozev
* @author Yura Yaroshevich
Expand Down Expand Up @@ -65,20 +59,6 @@ public static void setEnableStatisticsDefault(boolean enable)
enableStatisticsDefault = enable;
}

/**
* Returns true if a warning should be logged after a queue has dropped
* {@code numDroppedPackets} packets.
* @param numDroppedPackets the number of dropped packets.
* @return {@code true} if a warning should be logged.
*/
public static boolean logDroppedPacket(int numDroppedPackets)
{
return
numDroppedPackets == 1 ||
(numDroppedPackets <= 1000 && numDroppedPackets % 100 == 0) ||
numDroppedPackets % 1000 == 0;
}

/**
* The underlying {@link BlockingQueue} which holds packets.
* Used as synchronization object between {@link #close()}, {@link #get()}
Expand Down Expand Up @@ -124,10 +104,10 @@ public static boolean logDroppedPacket(int numDroppedPackets)
private final int capacity;

/**
* The number of packets which were dropped from this {@link PacketQueue} as
* a result of a packet being added while the queue is at full capacity.
* Handles dropped packets and exceptions thrown while processing.
*/
private final AtomicInteger numDroppedPackets = new AtomicInteger();
@NotNull
private ErrorHandler errorHandler = new ErrorHandler(){};

/**
* Initializes a new {@link PacketQueue} instance.
Expand Down Expand Up @@ -215,11 +195,11 @@ public PacketQueue(
enableStatistics = enableStatisticsDefault;
}
queueStatistics
= enableStatistics ? QueueStatistics.get(id) : null;
= enableStatistics ? new QueueStatistics() : null;

if (packetHandler != null)
{
asyncQueueHandler = new AsyncQueueHandler<T>(
asyncQueueHandler = new AsyncQueueHandler<>(
queue,
new HandlerAdapter(packetHandler),
id,
Expand Down Expand Up @@ -315,13 +295,7 @@ private void doAdd(T pkt)
{
queueStatistics.drop(System.currentTimeMillis());
}
final int numDroppedPackets =
this.numDroppedPackets.incrementAndGet();
if (logDroppedPacket(numDroppedPackets))
{
logger.warn(
"Packets dropped (id=" + id + "): " + numDroppedPackets);
}
errorHandler.packetDropped();

// Call release on dropped packet to allow proper implementation
// of object pooling by PacketQueue users
Expand Down Expand Up @@ -500,6 +474,35 @@ protected void releasePacket(T pkt)
{
}

/**
* Gets a JSON representation of the parts of this object's state that
* are deemed useful for debugging.
*/
public JSONObject getDebugState()
{
JSONObject debugState = new JSONObject();
debugState.put("id", id);
debugState.put("capacity", capacity);
debugState.put("copy", copy);
debugState.put("closed", closed);
debugState.put(
"statistics",
queueStatistics == null
? null : queueStatistics.getStats());

return debugState;
}

/**
* Sets the handler of errors (packets dropped or exceptions caught while
* processing).
* @param errorHandler the handler to set.
*/
public void setErrorHandler(@NotNull ErrorHandler errorHandler)
{
this.errorHandler = errorHandler;
}

/**
* A simple interface to handle packets.
* @param <T> the type of the packets.
Expand Down Expand Up @@ -565,6 +568,10 @@ public void handleItem(T item)
{
handler.handlePacket(item);
}
catch (Throwable t)
{
errorHandler.packetHandlingFailed(t);
}
finally
{
releasePacket(item);
Expand All @@ -573,22 +580,74 @@ public void handleItem(T item)
}

/**
* Gets a JSON representation of the parts of this object's state that
* are deemed useful for debugging.
* An interface for handling the two types of error conditions from a queue.
*/
public JSONObject getDebugState()
public interface ErrorHandler
{
JSONObject debugState = new JSONObject();
debugState.put("id", id);
debugState.put("capacity", capacity);
debugState.put("copy", copy);
debugState.put("closed", closed);
debugState.put(
"statistics",
queueStatistics == null
? null : queueStatistics.getInstanceStats());
debugState.put("num_dropped_packets", numDroppedPackets.get());
/**
* Called when a packet is dropped from the queue because a new packet
* was added while it was full.
*/
default void packetDropped() {}

return debugState;
/**
* Called when handling of a packet produces an exception.
* @param t
*/
default void packetHandlingFailed(Throwable t) {}
}

/**
* An {@link ErrorHandler} implementation which counts the number of
* dropped packets and exceptions.
*/
public class CountingErrorHandler implements ErrorHandler
{
/**
* The number of dropped packets.
*/
private final AtomicLong numPacketsDropped = new AtomicLong();

/**
* The number of exceptions.
*/
private final AtomicLong numExceptions = new AtomicLong();

/**
* {@inheritDoc}
*/
@Override
public void packetDropped()
{
numPacketsDropped.incrementAndGet();
}

/**
* {@inheritDoc}
*/
@Override
public void packetHandlingFailed(Throwable t)
{
numExceptions.incrementAndGet();
logger.warn("Failed to handle a packet: ", t);
}

/**
* Get the number of dropped packets.
* @return
*/
public long getNumPacketsDropped()
{
return numPacketsDropped.get();
}

/**
* Get the number of exceptions.
* @return
*/
public long getNumExceptions()
{
return numExceptions.get();
}
}
}
32 changes: 1 addition & 31 deletions src/main/java/org/jitsi/utils/stats/QueueStatistics.java
Original file line number Diff line number Diff line change
Expand Up @@ -17,40 +17,10 @@

import org.json.simple.*;

import java.util.*;
import java.util.concurrent.*;
import java.util.concurrent.atomic.*;

public class QueueStatistics
{
/**
* Gets the {@link QueueStatistics} instance for a given ID.
* @param id the ID.
*/
public static QueueStatistics get(String id)
{
return statisticsMap.computeIfAbsent(id, x -> new QueueStatistics());
}

/**
* Gets a snapshot of all queue statistics in JSON format.
* @return
*/
public static JSONObject getStatsJson()
{
JSONObject stats = new JSONObject();
statisticsMap.forEach((id, queueStatistics) ->
stats.put(id, queueStatistics.getInstanceStats()));

return stats;
}

/**
* Maps an ID to a {@link QueueStatistics} instance.
*/
private static Map<String, QueueStatistics> statisticsMap
= new ConcurrentHashMap<>();

/**
* The scale to use for {@link RateStatistics}. 1000 means units
* (e.g. packets) per second.
Expand Down Expand Up @@ -108,7 +78,7 @@ public QueueStatistics()
/**
* Gets a snapshot of the stats in JSON format.
*/
public JSONObject getInstanceStats()
public JSONObject getStats()
{
JSONObject stats = new JSONObject();
long now = System.currentTimeMillis();
Expand Down

0 comments on commit bda71d5

Please sign in to comment.