From bf694bda8db374e0f68a6100d91030c7abc88eca Mon Sep 17 00:00:00 2001 From: Jason Koch Date: Mon, 3 Feb 2025 13:46:32 -0800 Subject: [PATCH] Eliminate lock in ConcurrencyLimitingRequestThrottler Following from 6d3ba47 this changes the throttler to a complete lock-free implementation. Update the related comments and README now that it is lock-free. --- .../session/throttling/RequestThrottler.java | 8 +- .../ConcurrencyLimitingRequestThrottler.java | 180 ++++++++---------- manual/core/non_blocking/README.md | 14 +- 3 files changed, 90 insertions(+), 112 deletions(-) diff --git a/core/src/main/java/com/datastax/oss/driver/api/core/session/throttling/RequestThrottler.java b/core/src/main/java/com/datastax/oss/driver/api/core/session/throttling/RequestThrottler.java index 7e2b41ebbdb..73d347d533e 100644 --- a/core/src/main/java/com/datastax/oss/driver/api/core/session/throttling/RequestThrottler.java +++ b/core/src/main/java/com/datastax/oss/driver/api/core/session/throttling/RequestThrottler.java @@ -23,10 +23,10 @@ /** * Limits the number of concurrent requests executed by the driver. * - *

Usage in non-blocking applications: beware that all built-in implementations of this interface - * use locks for internal coordination, and do not qualify as lock-free, with the obvious exception - * of {@code PassThroughRequestThrottler}. If your application enforces strict lock-freedom, then - * request throttling should not be enabled. + *

Usage in non-blocking applications: beware that some implementations of this interface use + * locks for internal coordination, and do not qualify as lock-free. If your application enforces + * strict lock-freedom, then you should use the {@code PassThroughRequestThrottler} or the {@code + * ConcurrencyLimitingRequestThrottler}. */ public interface RequestThrottler extends Closeable { diff --git a/core/src/main/java/com/datastax/oss/driver/internal/core/session/throttling/ConcurrencyLimitingRequestThrottler.java b/core/src/main/java/com/datastax/oss/driver/internal/core/session/throttling/ConcurrencyLimitingRequestThrottler.java index ffe0ffe9650..8146c5b113a 100644 --- a/core/src/main/java/com/datastax/oss/driver/internal/core/session/throttling/ConcurrencyLimitingRequestThrottler.java +++ b/core/src/main/java/com/datastax/oss/driver/internal/core/session/throttling/ConcurrencyLimitingRequestThrottler.java @@ -26,10 +26,9 @@ import com.datastax.oss.driver.shaded.guava.common.annotations.VisibleForTesting; import edu.umd.cs.findbugs.annotations.NonNull; import edu.umd.cs.findbugs.annotations.Nullable; -import java.util.ArrayDeque; import java.util.Deque; -import java.util.concurrent.locks.ReentrantLock; -import net.jcip.annotations.GuardedBy; +import java.util.concurrent.ConcurrentLinkedDeque; +import java.util.concurrent.atomic.AtomicInteger; import net.jcip.annotations.ThreadSafe; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -61,17 +60,12 @@ public class ConcurrencyLimitingRequestThrottler implements RequestThrottler { private final String logPrefix; private final int maxConcurrentRequests; private final int maxQueueSize; - - private final ReentrantLock lock = new ReentrantLock(); - - @GuardedBy("lock") - private int concurrentRequests; - - @GuardedBy("lock") - private final Deque queue = new ArrayDeque<>(); - - @GuardedBy("lock") - private boolean closed; + private final AtomicInteger concurrentRequests = new AtomicInteger(0); + // CLQ is not O(1) for size(), as it forces a full iteration of the queue. So, we track + // the size of the queue explicitly. + private final Deque queue = new ConcurrentLinkedDeque<>(); + private final AtomicInteger queueSize = new AtomicInteger(0); + private volatile boolean closed = false; public ConcurrencyLimitingRequestThrottler(DriverContext context) { this.logPrefix = context.getSessionName(); @@ -88,50 +82,62 @@ public ConcurrencyLimitingRequestThrottler(DriverContext context) { @Override public void register(@NonNull Throttled request) { - boolean notifyReadyRequired = false; + if (closed) { + LOG.trace("[{}] Rejecting request after shutdown", logPrefix); + fail(request, "The session is shutting down"); + return; + } - lock.lock(); - try { - if (closed) { - LOG.trace("[{}] Rejecting request after shutdown", logPrefix); - fail(request, "The session is shutting down"); - } else if (queue.isEmpty() && concurrentRequests < maxConcurrentRequests) { - // We have capacity for one more concurrent request + // Implementation note: Technically the "concurrent requests" or "queue size" + // could read transiently over the limit, but the queue itself will never grow + // beyond the limit since we always check for that condition and revert if + // over-limit. We do this instead of a CAS-loop to avoid the potential loop. + + // If no backlog exists AND we get capacity, we can execute immediately + if (queueSize.get() == 0) { + // Take a claim first, and then check if we are OK to proceed + int newConcurrent = concurrentRequests.incrementAndGet(); + if (newConcurrent <= maxConcurrentRequests) { LOG.trace("[{}] Starting newly registered request", logPrefix); - concurrentRequests += 1; - notifyReadyRequired = true; - } else if (queue.size() < maxQueueSize) { - LOG.trace("[{}] Enqueuing request", logPrefix); - queue.add(request); + request.onThrottleReady(false); + return; } else { - LOG.trace("[{}] Rejecting request because of full queue", logPrefix); - fail( - request, - String.format( - "The session has reached its maximum capacity " - + "(concurrent requests: %d, queue size: %d)", - maxConcurrentRequests, maxQueueSize)); + // We exceeded the limit, decrement the count and fall through to the queuing logic + concurrentRequests.decrementAndGet(); } - } finally { - lock.unlock(); } - // no need to hold the lock while allowing the task to progress - if (notifyReadyRequired) { - request.onThrottleReady(false); + // If we have a backlog, or we failed to claim capacity, try to enqueue + int newQueueSize = queueSize.incrementAndGet(); + if (newQueueSize <= maxQueueSize) { + LOG.trace("[{}] Enqueuing request", logPrefix); + queue.offer(request); + + // Double-check that we were still supposed to be enqueued; it is possible + // that the session was closed while we were enqueuing, it's also possible + // that it is right now removing the request, so we need to check both + if (closed) { + if (queue.remove(request)) { + queueSize.decrementAndGet(); + LOG.trace("[{}] Rejecting late request after shutdown", logPrefix); + fail(request, "The session is shutting down"); + } + } + } else { + LOG.trace("[{}] Rejecting request because of full queue", logPrefix); + queueSize.decrementAndGet(); + fail( + request, + String.format( + "The session has reached its maximum capacity " + + "(concurrent requests: %d, queue size: %d)", + maxConcurrentRequests, maxQueueSize)); } } @Override public void signalSuccess(@NonNull Throttled request) { - Throttled nextRequest = null; - lock.lock(); - try { - nextRequest = onRequestDoneAndDequeNext(); - } finally { - lock.unlock(); - } - + Throttled nextRequest = onRequestDoneAndDequeNext(); if (nextRequest != null) { nextRequest.onThrottleReady(true); } @@ -145,17 +151,13 @@ public void signalError(@NonNull Throttled request, @NonNull Throwable error) { @Override public void signalTimeout(@NonNull Throttled request) { Throttled nextRequest = null; - lock.lock(); - try { - if (!closed) { - if (queue.remove(request)) { // The request timed out before it was active - LOG.trace("[{}] Removing timed out request from the queue", logPrefix); - } else { - nextRequest = onRequestDoneAndDequeNext(); - } + if (!closed) { + if (queue.remove(request)) { // The request timed out before it was active + queueSize.decrementAndGet(); + LOG.trace("[{}] Removing timed out request from the queue", logPrefix); + } else { + nextRequest = onRequestDoneAndDequeNext(); } - } finally { - lock.unlock(); } if (nextRequest != null) { @@ -166,17 +168,13 @@ public void signalTimeout(@NonNull Throttled request) { @Override public void signalCancel(@NonNull Throttled request) { Throttled nextRequest = null; - lock.lock(); - try { - if (!closed) { - if (queue.remove(request)) { // The request has been cancelled before it was active - LOG.trace("[{}] Removing cancelled request from the queue", logPrefix); - } else { - nextRequest = onRequestDoneAndDequeNext(); - } + if (!closed) { + if (queue.remove(request)) { // The request has been cancelled before it was active + queueSize.decrementAndGet(); + LOG.trace("[{}] Removing cancelled request from the queue", logPrefix); + } else { + nextRequest = onRequestDoneAndDequeNext(); } - } finally { - lock.unlock(); } if (nextRequest != null) { @@ -184,17 +182,16 @@ public void signalCancel(@NonNull Throttled request) { } } - @SuppressWarnings("GuardedBy") // this method is only called with the lock held @Nullable private Throttled onRequestDoneAndDequeNext() { - assert lock.isHeldByCurrentThread(); if (!closed) { - if (queue.isEmpty()) { - concurrentRequests -= 1; + Throttled nextRequest = queue.poll(); + if (nextRequest == null) { + concurrentRequests.decrementAndGet(); } else { + queueSize.decrementAndGet(); LOG.trace("[{}] Starting dequeued request", logPrefix); - // don't touch concurrentRequests since we finished one but started another - return queue.poll(); + return nextRequest; } } @@ -204,45 +201,28 @@ private Throttled onRequestDoneAndDequeNext() { @Override public void close() { - lock.lock(); - try { - closed = true; - LOG.debug("[{}] Rejecting {} queued requests after shutdown", logPrefix, queue.size()); - for (Throttled request : queue) { - fail(request, "The session is shutting down"); - } - } finally { - lock.unlock(); + closed = true; + + LOG.debug("[{}] Rejecting {} queued requests after shutdown", logPrefix, queueSize.get()); + Throttled request; + while ((request = queue.poll()) != null) { + queueSize.decrementAndGet(); + fail(request, "The session is shutting down"); } } public int getQueueSize() { - lock.lock(); - try { - return queue.size(); - } finally { - lock.unlock(); - } + return queueSize.get(); } @VisibleForTesting int getConcurrentRequests() { - lock.lock(); - try { - return concurrentRequests; - } finally { - lock.unlock(); - } + return concurrentRequests.get(); } @VisibleForTesting Deque getQueue() { - lock.lock(); - try { - return queue; - } finally { - lock.unlock(); - } + return queue; } private static void fail(Throttled request, String message) { diff --git a/manual/core/non_blocking/README.md b/manual/core/non_blocking/README.md index 7abe9d856a3..f320ffd13d2 100644 --- a/manual/core/non_blocking/README.md +++ b/manual/core/non_blocking/README.md @@ -152,15 +152,13 @@ should not be used if strict lock-freedom is enforced. [`SafeInitNodeStateListener`]: https://docs.datastax.com/en/drivers/java/4.17/com/datastax/oss/driver/api/core/metadata/SafeInitNodeStateListener.html -The same is valid for both built-in [request throttlers]: +The `RateLimitingRequestThrottler` is currently blocking. The `ConcurrencyLimitingRequestThrottler` +is lock-free. -* `ConcurrencyLimitingRequestThrottler` -* `RateLimitingRequestThrottler` - -See the section about [throttling](../throttling) for details about these components. Again, they -use locks internally, and depending on how many requests are being executed in parallel, the thread -contention on these locks can be high: in short, if your application enforces strict lock-freedom, -then these components should not be used. +See the section about [throttling](../throttling) for details about these components. Depending on +how many requests are being executed in parallel, the thread contention on these locks can be high: +in short, if your application enforces strict lock-freedom, then you should not use the +`RateLimitingRequestThrottler`. [request throttlers]: https://docs.datastax.com/en/drivers/java/4.17/com/datastax/oss/driver/api/core/session/throttling/RequestThrottler.html