Skip to content

Commit

Permalink
ref: Moves the packet queue classes in their own package.
Browse files Browse the repository at this point in the history
  • Loading branch information
bgrozev committed Jun 3, 2019
1 parent bda71d5 commit 6c316f4
Show file tree
Hide file tree
Showing 8 changed files with 135 additions and 89 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@
* limitations under the License.
*/

package org.jitsi.utils;
package org.jitsi.utils.queue;

import org.jitsi.utils.concurrent.*;
import org.jitsi.utils.logging.*;
Expand All @@ -32,7 +32,7 @@
*
* @author Yura Yaroshevich
*/
public final class AsyncQueueHandler<T>
final class AsyncQueueHandler<T>
{
/**
* The {@link java.util.logging.Logger} used by the
Expand Down Expand Up @@ -158,7 +158,7 @@ public void run()
* @param id optional identifier of current handler for debug purpose
* @param executor optional executor service to borrow threads from
*/
public AsyncQueueHandler(
AsyncQueueHandler(
BlockingQueue<T> queue,
Handler<T> handler,
String id,
Expand All @@ -180,7 +180,7 @@ public AsyncQueueHandler(
* handled on thread borrowed from {@link #executor} before temporary
* releasing thread and re-acquiring it from {@link #executor}.
*/
public AsyncQueueHandler(
AsyncQueueHandler(
BlockingQueue<T> queue,
Handler<T> handler,
String id,
Expand All @@ -207,7 +207,7 @@ public AsyncQueueHandler(
/**
* Attempts to stop execution of {@link #reader} if running
*/
public void cancel()
void cancel()
{
cancel(true);
}
Expand All @@ -216,7 +216,7 @@ public void cancel()
* Checks if {@link #reader} is running on one of {@link #executor}
* thread and if no submits execution of {@link #reader} on executor.
*/
public void handleQueueItemsUntilEmpty()
void handleQueueItemsUntilEmpty()
{
synchronized (syncRoot)
{
Expand Down Expand Up @@ -276,7 +276,7 @@ private void rescheduleReader()
* A simple interface to handle enqueued {@link T} items.
* @param <T> the type of the item.
*/
public interface Handler<T>
interface Handler<T>
{
/**
* Does something with an item.
Expand Down
83 changes: 83 additions & 0 deletions src/main/java/org/jitsi/utils/queue/CountingErrorHandler.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,83 @@
/*
* Copyright @ 2019 - present 8x8, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.jitsi.utils.queue;

import org.jitsi.utils.logging.*;

import java.util.concurrent.atomic.*;

/**
* An {@link ErrorHandler} implementation which counts the number of
* dropped packets and exceptions.
*
* @author Boris Grozev
*/
public class CountingErrorHandler implements ErrorHandler
{
/**
* The {@link Logger} used by the {@link PacketQueue} class and its
* instances for logging output.
*/
private static final Logger logger
= Logger.getLogger(PacketQueue.class.getName());

/**
* The number of dropped packets.
*/
private final AtomicLong numPacketsDropped = new AtomicLong();

/**
* The number of exceptions.
*/
private final AtomicLong numExceptions = new AtomicLong();

/**
* {@inheritDoc}
*/
@Override
public void packetDropped()
{
numPacketsDropped.incrementAndGet();
}

/**
* {@inheritDoc}
*/
@Override
public void packetHandlingFailed(Throwable t)
{
logger.warn("Failed to handle packet: ", t);
numExceptions.incrementAndGet();
}

/**
* Get the number of dropped packets.
* @return
*/
public long getNumPacketsDropped()
{
return numPacketsDropped.get();
}

/**
* Get the number of exceptions.
* @return
*/
public long getNumExceptions()
{
return numExceptions.get();
}
}
36 changes: 36 additions & 0 deletions src/main/java/org/jitsi/utils/queue/ErrorHandler.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,36 @@
/*
* Copyright @ 2019 - present 8x8, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.jitsi.utils.queue;

/**
* An interface for handling the two types of error conditions from a queue.
*
* @author Boris Grozev
*/
public interface ErrorHandler
{
/**
* Called when a packet is dropped from the queue because a new packet
* was added while it was full.
*/
default void packetDropped() {}

/**
* Called when handling of a packet produces an exception.
* @param t
*/
default void packetHandlingFailed(Throwable t) {}
}
Original file line number Diff line number Diff line change
Expand Up @@ -13,15 +13,13 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.jitsi.utils;
package org.jitsi.utils.queue;

import org.jitsi.utils.logging.*;
import org.jitsi.utils.stats.*;
import org.json.simple.*;
import org.jetbrains.annotations.*;

import java.util.concurrent.*;
import java.util.concurrent.atomic.*;

/**
* An abstract queue of packets.
Expand Down Expand Up @@ -578,76 +576,4 @@ public void handleItem(T item)
}
}
}

/**
* An interface for handling the two types of error conditions from a queue.
*/
public interface ErrorHandler
{
/**
* Called when a packet is dropped from the queue because a new packet
* was added while it was full.
*/
default void packetDropped() {}

/**
* Called when handling of a packet produces an exception.
* @param t
*/
default void packetHandlingFailed(Throwable t) {}
}

/**
* An {@link ErrorHandler} implementation which counts the number of
* dropped packets and exceptions.
*/
public class CountingErrorHandler implements ErrorHandler
{
/**
* The number of dropped packets.
*/
private final AtomicLong numPacketsDropped = new AtomicLong();

/**
* The number of exceptions.
*/
private final AtomicLong numExceptions = new AtomicLong();

/**
* {@inheritDoc}
*/
@Override
public void packetDropped()
{
numPacketsDropped.incrementAndGet();
}

/**
* {@inheritDoc}
*/
@Override
public void packetHandlingFailed(Throwable t)
{
numExceptions.incrementAndGet();
logger.warn("Failed to handle a packet: ", t);
}

/**
* Get the number of dropped packets.
* @return
*/
public long getNumPacketsDropped()
{
return numPacketsDropped.get();
}

/**
* Get the number of exceptions.
* @return
*/
public long getNumExceptions()
{
return numExceptions.get();
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -13,8 +13,9 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.jitsi.utils.stats;
package org.jitsi.utils.queue;

import org.jitsi.utils.stats.*;
import org.json.simple.*;

import java.util.concurrent.atomic.*;
Expand Down Expand Up @@ -97,7 +98,7 @@ public JSONObject getStats()
* @param now the time (in milliseconds since the epoch) at which the
* packet was added.
*/
public void add(long now)
void add(long now)
{
addRate.update(1, now);
totalPacketsAdded.incrementAndGet();
Expand All @@ -108,7 +109,7 @@ public void add(long now)
* @param now the time (in milliseconds since the epoch) at which the
* packet was removed.
*/
public void remove(long now)
void remove(long now)
{
removeRate.update(1, now);
totalPacketsRemoved.incrementAndGet();
Expand All @@ -119,7 +120,7 @@ public void remove(long now)
* @param now the time (in milliseconds since the epoch) at which the
* packet was dropped.
*/
public void drop(long now)
void drop(long now)
{
dropRate.update(1, now);
totalPacketsDropped.incrementAndGet();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.jitsi.utils;
package org.jitsi.utils.queue;

import java.util.concurrent.*;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.jitsi.utils;
package org.jitsi.utils.queue;

import org.junit.*;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.jitsi.utils;
package org.jitsi.utils.queue;

import org.junit.*;

Expand Down

0 comments on commit 6c316f4

Please sign in to comment.