Skip to content

Commit c41e68a

Browse files
Prefer to use a special RETRY_TOKEN to trigger retrying a semaphore acquire
Previously this would throw a RetryException, but this seems cleaner. Throwing exceptions to control flow is typically not good, especially because normal exceptions would also require filling in a stack trace which adds additional resource consumption. In this case the RetryException does not do that, but still this approach should be better.
1 parent acade07 commit c41e68a

File tree

5 files changed

+16
-15
lines changed

5 files changed

+16
-15
lines changed

src/main/java/dev/gemfire/dtype/internal/AbstractDType.java

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -27,6 +27,8 @@
2727

2828
public abstract class AbstractDType implements Delta, DataSerializable, DType {
2929

30+
static final Object RETRY_TOKEN = new Object();
31+
3032
private String name;
3133
private transient Region<String, Object> region;
3234
private transient DTypeCollectionsFunction deltaOperation = null;

src/main/java/dev/gemfire/dtype/internal/CollectionsBackendFunction.java

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -22,15 +22,14 @@ public class CollectionsBackendFunction implements Function<Object> {
2222
public static final String ID = "dtype-collections-function";
2323

2424
@Override
25-
@SuppressWarnings("unchecked")
2625
public void execute(FunctionContext<Object> context) {
2726
Object[] args = (Object[]) context.getArguments();
2827
String name = (String) args[0];
2928
String memberTag = (String) args[1];
3029
DTypeCollectionsFunction fn = (DTypeCollectionsFunction) args[2];
3130
OperationType operationType = (OperationType) args[3];
3231

33-
Region<String, AbstractDType> region = ((RegionFunctionContext) context).getDataSet();
32+
Region<String, AbstractDType> region = ((RegionFunctionContext<?>) context).getDataSet();
3433
AbstractDType entry = region.get(name);
3534

3635
Callable<Object> wrappingFn = () -> {

src/main/java/dev/gemfire/dtype/internal/DSemaphoreBackend.java

Lines changed: 7 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -68,16 +68,17 @@ public int getQueueLength() {
6868
}
6969

7070
/**
71-
* Acquire permits or throw an exception if unable to do so. The exception is processed in the
72-
* {@link DSemaphoreBackendFunction} where the request is retried.
71+
* Acquire permits or return a special 'retry token' if unable to do so. The
72+
* token is processed in the {@link DSemaphoreBackendFunction} where the
73+
* request is retried.
7374
*
7475
* @param context the relevant context for DSemaphore calls (DSemaphoreFunctionContext)
7576
* @param permits the number of permits to acquire
77+
* @return null if successful or {@link AbstractDType#RETRY_TOKEN} if unable
78+
* to acquire the requested permits
7679
*/
77-
public void acquire(DTypeFunctionContext context, int permits) {
78-
if (!_acquire(context, permits)) {
79-
throw new RetryableException(0);
80-
}
80+
public Object acquire(DTypeFunctionContext context, int permits) {
81+
return _acquire(context, permits) ? null : AbstractDType.RETRY_TOKEN;
8182
}
8283

8384
private boolean _acquire(DTypeFunctionContext context, int permits) {

src/main/java/dev/gemfire/dtype/internal/DSemaphoreBackendFunction.java

Lines changed: 5 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,7 @@
88

99
import java.util.concurrent.Callable;
1010

11+
import dev.gemfire.dtype.DType;
1112
import org.apache.logging.log4j.Logger;
1213

1314
import org.apache.geode.cache.Region;
@@ -63,9 +64,10 @@ public void execute(FunctionContext<Object> context) {
6364
synchronized (finalEntry) {
6465
try {
6566
result = ((PartitionedRegion) region).computeWithPrimaryLocked(name, wrappingFn);
66-
break;
67-
} catch (RetryableException rex) {
68-
// This exception is only thrown when an acquire() fails. We need to ensure that we wait()
67+
if (result != AbstractDType.RETRY_TOKEN) {
68+
break;
69+
}
70+
// This path is only reached when an acquire() fails. We need to ensure that we wait()
6971
// without holding the read lock on the region otherwise threads could get stuck. For
7072
// example, during bucket movement.
7173
if (logger.isDebugEnabled()) {

src/main/java/dev/gemfire/dtype/internal/DSemaphoreImpl.java

Lines changed: 1 addition & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -42,10 +42,7 @@ public void acquire() {
4242
@Override
4343
public void acquire(int permits) {
4444
validatePermits(permits);
45-
DTypeContextualFunction fn = (sem, ctx) -> {
46-
((DSemaphoreBackend) sem).acquire(ctx, permits);
47-
return null;
48-
};
45+
DTypeContextualFunction fn = (sem, ctx) -> ((DSemaphoreBackend) sem).acquire(ctx, permits);
4946
update(fn, DSemaphoreBackendFunction.ID);
5047
}
5148

0 commit comments

Comments
 (0)