Skip to content

Eliminate lock in ConcurrencyLimitingRequestThrottler #2025

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 1 commit into
base: 4.x
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -23,10 +23,10 @@
/**
* Limits the number of concurrent requests executed by the driver.
*
* <p>Usage in non-blocking applications: beware that all built-in implementations of this interface
* use locks for internal coordination, and do not qualify as lock-free, with the obvious exception
* of {@code PassThroughRequestThrottler}. If your application enforces strict lock-freedom, then
* request throttling should not be enabled.
* <p>Usage in non-blocking applications: beware that some implementations of this interface use
* locks for internal coordination, and do not qualify as lock-free. If your application enforces
* strict lock-freedom, then you should use the {@code PassThroughRequestThrottler} or the {@code
* ConcurrencyLimitingRequestThrottler}.
*/
public interface RequestThrottler extends Closeable {

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,10 +26,9 @@
import com.datastax.oss.driver.shaded.guava.common.annotations.VisibleForTesting;
import edu.umd.cs.findbugs.annotations.NonNull;
import edu.umd.cs.findbugs.annotations.Nullable;
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.concurrent.locks.ReentrantLock;
import net.jcip.annotations.GuardedBy;
import java.util.concurrent.ConcurrentLinkedDeque;
import java.util.concurrent.atomic.AtomicInteger;
import net.jcip.annotations.ThreadSafe;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
Expand Down Expand Up @@ -61,17 +60,12 @@ public class ConcurrencyLimitingRequestThrottler implements RequestThrottler {
private final String logPrefix;
private final int maxConcurrentRequests;
private final int maxQueueSize;

private final ReentrantLock lock = new ReentrantLock();

@GuardedBy("lock")
private int concurrentRequests;

@GuardedBy("lock")
private final Deque<Throttled> queue = new ArrayDeque<>();

@GuardedBy("lock")
private boolean closed;
private final AtomicInteger concurrentRequests = new AtomicInteger(0);
// CLQ is not O(1) for size(), as it forces a full iteration of the queue. So, we track
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

thanks, notable comment as would have questioned this otherwise 😛

// the size of the queue explicitly.
private final Deque<Throttled> queue = new ConcurrentLinkedDeque<>();
private final AtomicInteger queueSize = new AtomicInteger(0);
private volatile boolean closed = false;

public ConcurrencyLimitingRequestThrottler(DriverContext context) {
this.logPrefix = context.getSessionName();
Expand All @@ -88,50 +82,62 @@ public ConcurrencyLimitingRequestThrottler(DriverContext context) {

@Override
public void register(@NonNull Throttled request) {
boolean notifyReadyRequired = false;
if (closed) {
LOG.trace("[{}] Rejecting request after shutdown", logPrefix);
fail(request, "The session is shutting down");
return;
}

lock.lock();
try {
if (closed) {
LOG.trace("[{}] Rejecting request after shutdown", logPrefix);
fail(request, "The session is shutting down");
} else if (queue.isEmpty() && concurrentRequests < maxConcurrentRequests) {
// We have capacity for one more concurrent request
// Implementation note: Technically the "concurrent requests" or "queue size"
// could read transiently over the limit, but the queue itself will never grow
// beyond the limit since we always check for that condition and revert if
// over-limit. We do this instead of a CAS-loop to avoid the potential loop.

// If no backlog exists AND we get capacity, we can execute immediately
if (queueSize.get() == 0) {
// Take a claim first, and then check if we are OK to proceed
int newConcurrent = concurrentRequests.incrementAndGet();
if (newConcurrent <= maxConcurrentRequests) {
LOG.trace("[{}] Starting newly registered request", logPrefix);
concurrentRequests += 1;
notifyReadyRequired = true;
} else if (queue.size() < maxQueueSize) {
LOG.trace("[{}] Enqueuing request", logPrefix);
queue.add(request);
request.onThrottleReady(false);
return;
} else {
LOG.trace("[{}] Rejecting request because of full queue", logPrefix);
fail(
request,
String.format(
"The session has reached its maximum capacity "
+ "(concurrent requests: %d, queue size: %d)",
maxConcurrentRequests, maxQueueSize));
// We exceeded the limit, decrement the count and fall through to the queuing logic
concurrentRequests.decrementAndGet();
}
} finally {
lock.unlock();
}

// no need to hold the lock while allowing the task to progress
if (notifyReadyRequired) {
request.onThrottleReady(false);
// If we have a backlog, or we failed to claim capacity, try to enqueue
int newQueueSize = queueSize.incrementAndGet();
if (newQueueSize <= maxQueueSize) {
LOG.trace("[{}] Enqueuing request", logPrefix);
queue.offer(request);

// Double-check that we were still supposed to be enqueued; it is possible
// that the session was closed while we were enqueuing, it's also possible
// that it is right now removing the request, so we need to check both
if (closed) {
if (queue.remove(request)) {
queueSize.decrementAndGet();
LOG.trace("[{}] Rejecting late request after shutdown", logPrefix);
fail(request, "The session is shutting down");
}
}
} else {
LOG.trace("[{}] Rejecting request because of full queue", logPrefix);
queueSize.decrementAndGet();
fail(
request,
String.format(
"The session has reached its maximum capacity "
+ "(concurrent requests: %d, queue size: %d)",
maxConcurrentRequests, maxQueueSize));
}
}

@Override
public void signalSuccess(@NonNull Throttled request) {
Throttled nextRequest = null;
lock.lock();
try {
nextRequest = onRequestDoneAndDequeNext();
} finally {
lock.unlock();
}

Throttled nextRequest = onRequestDoneAndDequeNext();
if (nextRequest != null) {
nextRequest.onThrottleReady(true);
}
Expand All @@ -145,17 +151,13 @@ public void signalError(@NonNull Throttled request, @NonNull Throwable error) {
@Override
public void signalTimeout(@NonNull Throttled request) {
Throttled nextRequest = null;
lock.lock();
try {
if (!closed) {
if (queue.remove(request)) { // The request timed out before it was active
LOG.trace("[{}] Removing timed out request from the queue", logPrefix);
} else {
nextRequest = onRequestDoneAndDequeNext();
}
if (!closed) {
if (queue.remove(request)) { // The request timed out before it was active
queueSize.decrementAndGet();
LOG.trace("[{}] Removing timed out request from the queue", logPrefix);
} else {
nextRequest = onRequestDoneAndDequeNext();
}
} finally {
lock.unlock();
}

if (nextRequest != null) {
Expand All @@ -166,35 +168,30 @@ public void signalTimeout(@NonNull Throttled request) {
@Override
public void signalCancel(@NonNull Throttled request) {
Throttled nextRequest = null;
lock.lock();
try {
if (!closed) {
if (queue.remove(request)) { // The request has been cancelled before it was active
LOG.trace("[{}] Removing cancelled request from the queue", logPrefix);
} else {
nextRequest = onRequestDoneAndDequeNext();
}
if (!closed) {
if (queue.remove(request)) { // The request has been cancelled before it was active
queueSize.decrementAndGet();
LOG.trace("[{}] Removing cancelled request from the queue", logPrefix);
} else {
nextRequest = onRequestDoneAndDequeNext();
}
} finally {
lock.unlock();
}

if (nextRequest != null) {
nextRequest.onThrottleReady(true);
}
}

@SuppressWarnings("GuardedBy") // this method is only called with the lock held
@Nullable
private Throttled onRequestDoneAndDequeNext() {
assert lock.isHeldByCurrentThread();
if (!closed) {
if (queue.isEmpty()) {
concurrentRequests -= 1;
Throttled nextRequest = queue.poll();
if (nextRequest == null) {
concurrentRequests.decrementAndGet();
} else {
queueSize.decrementAndGet();
LOG.trace("[{}] Starting dequeued request", logPrefix);
// don't touch concurrentRequests since we finished one but started another
return queue.poll();
return nextRequest;
}
}

Expand All @@ -204,45 +201,28 @@ private Throttled onRequestDoneAndDequeNext() {

@Override
public void close() {
lock.lock();
try {
closed = true;
LOG.debug("[{}] Rejecting {} queued requests after shutdown", logPrefix, queue.size());
for (Throttled request : queue) {
fail(request, "The session is shutting down");
}
} finally {
lock.unlock();
closed = true;

LOG.debug("[{}] Rejecting {} queued requests after shutdown", logPrefix, queueSize.get());
Throttled request;
while ((request = queue.poll()) != null) {
queueSize.decrementAndGet();
fail(request, "The session is shutting down");
}
}

public int getQueueSize() {
lock.lock();
try {
return queue.size();
} finally {
lock.unlock();
}
return queueSize.get();
}

@VisibleForTesting
int getConcurrentRequests() {
lock.lock();
try {
return concurrentRequests;
} finally {
lock.unlock();
}
return concurrentRequests.get();
}

@VisibleForTesting
Deque<Throttled> getQueue() {
lock.lock();
try {
return queue;
} finally {
lock.unlock();
}
return queue;
}

private static void fail(Throttled request, String message) {
Expand Down
14 changes: 6 additions & 8 deletions manual/core/non_blocking/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -152,15 +152,13 @@ should not be used if strict lock-freedom is enforced.

[`SafeInitNodeStateListener`]: https://docs.datastax.com/en/drivers/java/4.17/com/datastax/oss/driver/api/core/metadata/SafeInitNodeStateListener.html

The same is valid for both built-in [request throttlers]:
The `RateLimitingRequestThrottler` is currently blocking. The `ConcurrencyLimitingRequestThrottler`
is lock-free.

* `ConcurrencyLimitingRequestThrottler`
* `RateLimitingRequestThrottler`

See the section about [throttling](../throttling) for details about these components. Again, they
use locks internally, and depending on how many requests are being executed in parallel, the thread
contention on these locks can be high: in short, if your application enforces strict lock-freedom,
then these components should not be used.
See the section about [throttling](../throttling) for details about these components. Depending on
how many requests are being executed in parallel, the thread contention on these locks can be high:
in short, if your application enforces strict lock-freedom, then you should not use the
`RateLimitingRequestThrottler`.

[request throttlers]: https://docs.datastax.com/en/drivers/java/4.17/com/datastax/oss/driver/api/core/session/throttling/RequestThrottler.html

Expand Down