Skip to content

Commit 640c4e4

Browse files
GH-8: Refactor how various operations are retried
This commit replaces RetryableException with a more consistent pattern based on a RetryableContext, which allows operations to specify various aspects of how a retry should be handled. Specifically:
- the total timeout for the operation
- the waiting interval between operation attempts
- the result to return for a failed operation
- the specific means of handling the interval: either sleep()ing or wait()ing

This change also fixes possible hangs during bucket moves, as it ensures that no locks are being held while the operation waits.
1 parent 6f26175 commit 640c4e4

16 files changed

+375
-218
lines changed

build.gradle

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -61,7 +61,7 @@ dependencies {
6161

6262
implementation("com.vmware.gemfire:gemfire-membership:${gemfireVersion}")
6363
implementation("com.vmware.gemfire:gemfire-logging:${gemfireVersion}")
64-
implementation("com.vmware.gemfire:gemfire-log4j:${gemfireVersion}")
64+
implementation("com.vmware.gemfire:gemfire-logging:${gemfireVersion}")
6565

6666
distribution('org.apache.commons:commons-collections4:4.4')
6767

src/main/java/dev/gemfire/dtype/internal/AbstractDType.java

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,7 @@
1414
import java.io.IOException;
1515
import java.io.UncheckedIOException;
1616
import java.util.Objects;
17+
import java.util.concurrent.atomic.AtomicReference;
1718

1819
import dev.gemfire.dtype.DType;
1920

@@ -34,6 +35,9 @@ public abstract class AbstractDType implements Delta, DataSerializable, DType {
3435
private transient DTypeCollectionsFunction deltaOperation = null;
3536
private transient OperationPerformer operationPerformer;
3637

38+
protected static final ThreadLocal<AtomicReference<RetryableContext>> retryableContext =
39+
ThreadLocal.withInitial(AtomicReference::new);
40+
3741
public AbstractDType() {}
3842

3943
public AbstractDType(String name) {
@@ -158,6 +162,10 @@ public void fromDelta(DataInput in) throws IOException, InvalidDeltaException {
158162
}
159163
}
160164

165+
public void clearThreadLocal() {
166+
retryableContext.remove();
167+
}
168+
161169
protected static byte[] serialize(Object o) {
162170
HeapDataOutputStream heap = new HeapDataOutputStream(0);
163171
try {

src/main/java/dev/gemfire/dtype/internal/CollectionsBackendFunction.java

Lines changed: 30 additions & 24 deletions
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,8 @@
44

55
package dev.gemfire.dtype.internal;
66

7-
import static dev.gemfire.dtype.internal.OperationType.*;
7+
import static dev.gemfire.dtype.internal.OperationType.QUERY;
8+
import static dev.gemfire.dtype.internal.OperationType.UPDATE;
89

910
import java.util.concurrent.Callable;
1011

@@ -52,38 +53,43 @@ public void execute(FunctionContext<Object> context) {
5253
return innerResult;
5354
};
5455

55-
Object result = null;
56-
int retrySleepTime;
57-
long startTime = System.currentTimeMillis();
58-
do {
59-
retrySleepTime = 0;
60-
synchronized (entry) {
61-
try {
62-
result = ((PartitionedRegion) region).computeWithPrimaryLocked(name, wrappingFn);
63-
} catch (RetryableException rex) {
64-
retrySleepTime = rex.getRetrySleepTime();
65-
long elapsedTime = System.currentTimeMillis() - startTime;
66-
if (elapsedTime > rex.getMaxTimeToRetryMs()) {
67-
result = rex.getFailingResult();
68-
break;
56+
Object result;
57+
RetryableContext rex;
58+
try {
59+
for (;;) {
60+
synchronized (entry) {
61+
try {
62+
result = ((PartitionedRegion) region).computeWithPrimaryLocked(name, wrappingFn);
63+
if (result instanceof RetryableContext) {
64+
rex = (RetryableContext) result;
65+
if (rex.isTimedOut()) {
66+
result = rex.getFailingResult();
67+
break;
68+
}
69+
// If we're waiting we need to do so while synchronized on 'entry'
70+
rex.wait(entry);
71+
} else {
72+
break;
73+
}
74+
} catch (PrimaryBucketLockException | BucketMovedException | RegionDestroyedException ex) {
75+
throw ex;
76+
} catch (Exception ex) {
77+
context.getResultSender().sendException(ex);
78+
return;
6979
}
70-
} catch (PrimaryBucketLockException | BucketMovedException | RegionDestroyedException ex) {
71-
throw ex;
72-
} catch (Exception ex) {
73-
context.getResultSender().sendException(ex);
74-
return;
7580
}
76-
}
77-
if (retrySleepTime > 0) {
7881
try {
79-
Thread.sleep(retrySleepTime);
82+
// If we're sleeping we cannot be holding any locks
83+
rex.sleep();
8084
} catch (InterruptedException e) {
8185
Thread.currentThread().interrupt();
8286
context.getResultSender().sendException(new UncheckedInterruptedException(e));
8387
break;
8488
}
8589
}
86-
} while (retrySleepTime > 0);
90+
} finally {
91+
entry.clearThreadLocal();
92+
}
8793

8894
context.getResultSender().lastResult(result);
8995
}

src/main/java/dev/gemfire/dtype/internal/DBlockingQueueImpl.java

Lines changed: 22 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -45,14 +45,16 @@ public class DBlockingQueueImpl<E> extends AbstractDType implements DBlockingQue
4545
private final DTypeCollectionsFunction TAKE_FIRST_FN = x -> {
4646
E result = ((DBlockingQueueImpl<E>) x).deque.pollFirst();
4747
if (result == null) {
48-
throw new RetryableException(100);
48+
return retryableContext.get()
49+
.updateAndGet(y -> y == null ? new SleepingRetryable(100) : y);
4950
}
5051
return result;
5152
};
5253
private final DTypeCollectionsFunction TAKE_LAST_FN = x -> {
5354
E result = ((DBlockingQueueImpl<E>) x).deque.pollLast();
5455
if (result == null) {
55-
throw new RetryableException(100);
56+
return retryableContext.get()
57+
.updateAndGet(y -> y == null ? new SleepingRetryable(100) : y);
5658
}
5759
return result;
5860
};
@@ -171,7 +173,8 @@ public E peekLast() {
171173
public void putFirst(E e) throws InterruptedException {
172174
DTypeCollectionsFunction fn = x -> {
173175
if (!((DBlockingQueueImpl<E>) x).deque.offerFirst(e)) {
174-
throw new RetryableException(100);
176+
return retryableContext.get()
177+
.updateAndGet(y -> y == null ? new SleepingRetryable(100) : y);
175178
}
176179
return null;
177180
};
@@ -184,7 +187,8 @@ public void putLast(E e) throws InterruptedException {
184187
byte[] arg = serialize(e);
185188
DTypeCollectionsFunction fn = x -> {
186189
if (!((DBlockingQueueImpl<E>) x).deque.offerLast(deserialize(arg))) {
187-
throw new RetryableException(100);
190+
return retryableContext.get()
191+
.updateAndGet(y -> y == null ? new SleepingRetryable(100) : y);
188192
}
189193
return null;
190194
};
@@ -199,7 +203,8 @@ public boolean offerFirst(E e, long timeout, TimeUnit unit) throws InterruptedEx
199203
if (((DBlockingQueueImpl<E>) x).deque.offerFirst(deserialize(arg))) {
200204
return true;
201205
}
202-
throw new RetryableException(100, timeout, unit, () -> false);
206+
return retryableContext.get()
207+
.updateAndGet(y -> y == null ? new SleepingRetryable(100, timeout, unit, () -> false) : y);
203208
};
204209
return updateInterruptibly(fn, CollectionsBackendFunction.ID);
205210
}
@@ -212,7 +217,8 @@ public boolean offerLast(E e, long timeout, TimeUnit unit) throws InterruptedExc
212217
if (((DBlockingQueueImpl<E>) x).deque.offerLast(deserialize(arg))) {
213218
return true;
214219
}
215-
throw new RetryableException(100, timeout, unit, () -> false);
220+
return retryableContext.get()
221+
.updateAndGet(y -> y == null ? new SleepingRetryable(100, timeout, unit, () -> false) : y);
216222
};
217223
return updateInterruptibly(fn, CollectionsBackendFunction.ID);
218224
}
@@ -235,7 +241,8 @@ public E pollFirst(long timeout, TimeUnit unit) throws InterruptedException {
235241
DTypeCollectionsFunction fn = x -> {
236242
E result = ((DBlockingQueueImpl<E>) x).deque.pollFirst();
237243
if (result == null) {
238-
throw new RetryableException(100, timeout, unit);
244+
return retryableContext.get()
245+
.updateAndGet(y -> y == null ? new SleepingRetryable(100, timeout, unit) : y);
239246
}
240247
return result;
241248
};
@@ -247,7 +254,8 @@ public E pollLast(long timeout, TimeUnit unit) throws InterruptedException {
247254
DTypeCollectionsFunction fn = x -> {
248255
E result = ((DBlockingQueueImpl<E>) x).deque.pollLast();
249256
if (result == null) {
250-
throw new RetryableException(100, timeout, unit);
257+
return retryableContext.get()
258+
.updateAndGet(y -> y == null ? new SleepingRetryable(100, timeout, unit) : y);
251259
}
252260
return result;
253261
};
@@ -294,7 +302,8 @@ public void put(E e) throws InterruptedException {
294302
byte[] arg = serialize(e);
295303
DTypeCollectionsFunction fn = x -> {
296304
if (!((DBlockingQueueImpl<E>) x).deque.offerLast(deserialize(arg))) {
297-
throw new RetryableException(100);
305+
return retryableContext.get()
306+
.updateAndGet(y -> y == null ? new SleepingRetryable(100) : y);
298307
}
299308
return null;
300309
};
@@ -309,7 +318,8 @@ public boolean offer(E e, long timeout, TimeUnit unit) throws InterruptedExcepti
309318
if (((DBlockingQueueImpl<E>) x).deque.offer(deserialize(arg))) {
310319
return true;
311320
}
312-
throw new RetryableException(100, timeout, unit, () -> false);
321+
return retryableContext.get()
322+
.updateAndGet(y -> y == null ? new SleepingRetryable(100, timeout, unit, () -> false) : y);
313323
};
314324
return updateInterruptibly(fn, CollectionsBackendFunction.ID);
315325
}
@@ -337,7 +347,8 @@ public E poll(long timeout, TimeUnit unit) throws InterruptedException {
337347
DTypeCollectionsFunction fn = x -> {
338348
E result = ((DBlockingQueueImpl<E>) x).deque.poll();
339349
if (result == null) {
340-
throw new RetryableException(100, timeout, unit);
350+
return retryableContext.get()
351+
.updateAndGet(y -> y == null ? new SleepingRetryable(100, timeout, unit) : y);
341352
}
342353
return result;
343354
};

src/main/java/dev/gemfire/dtype/internal/DCountDownLatchImpl.java

Lines changed: 20 additions & 33 deletions
Original file line numberDiff line numberDiff line change
@@ -14,23 +14,23 @@ public class DCountDownLatchImpl extends AbstractDType implements DCountDownLatc
1414
private boolean isDestroyed = false;
1515
private int waiters = 0;
1616

17-
private final DTypeCollectionsFunction AWAIT_FN = x -> {
17+
private static final DTypeCollectionsFunction AWAIT_FN = x -> {
1818
DCountDownLatchImpl latch = (DCountDownLatchImpl) x;
19-
while (latch.count > 0) {
20-
latch.ensureUsable();
21-
try {
19+
latch.ensureUsable();
20+
if (latch.count > 0) {
21+
if (retryableContext.get().get() == null) {
2222
latch.waiters++;
23-
latch.wait(100);
24-
} catch (InterruptedException e) {
25-
throw new RuntimeException(e);
26-
} finally {
27-
latch.waiters--;
2823
}
24+
return retryableContext.get()
25+
.updateAndGet(y -> y == null ? new WaitingRetryable(100) : y);
26+
}
27+
if (retryableContext.get().get() != null) {
28+
latch.waiters--;
2929
}
3030
return null;
3131
};
3232

33-
private final DTypeCollectionsFunction COUNTDOWN_FN = x -> {
33+
private static final DTypeCollectionsFunction COUNTDOWN_FN = x -> {
3434
DCountDownLatchImpl latch = (DCountDownLatchImpl) x;
3535
latch.ensureUsable();
3636
if (latch.count > 0) {
@@ -66,31 +66,18 @@ public void await() {
6666
public boolean await(long timeout, TimeUnit unit) throws InterruptedException {
6767
DTypeCollectionsFunction fn = x -> {
6868
DCountDownLatchImpl latch = (DCountDownLatchImpl) x;
69-
if (latch.count == 0) {
70-
return true;
71-
}
72-
73-
long overallTimeoutMs = unit.toMillis(timeout);
74-
long waitTimeoutMs = overallTimeoutMs;
75-
long start = System.currentTimeMillis();
76-
77-
while (latch.count > 0 && System.currentTimeMillis() - start < overallTimeoutMs) {
78-
latch.ensureUsable();
79-
try {
69+
latch.ensureUsable();
70+
if (latch.count > 0) {
71+
if (retryableContext.get().get() == null) {
8072
latch.waiters++;
81-
long waitStart = System.currentTimeMillis();
82-
latch.wait(waitTimeoutMs);
83-
overallTimeoutMs -= waitStart;
84-
if (latch.count == 0) {
85-
return true;
86-
}
87-
} catch (InterruptedException e) {
88-
throw new UncheckedInterruptedException(e);
89-
} finally {
90-
latch.waiters--;
9173
}
74+
return retryableContext.get()
75+
.updateAndGet(y -> y == null ? new WaitingRetryable(100, timeout, unit, () -> false) : y);
9276
}
93-
return false;
77+
if (retryableContext.get().get() != null) {
78+
latch.waiters--;
79+
}
80+
return true;
9481
};
9582
return noDeltaUpdateInterruptibly(fn, CollectionsBackendFunction.ID);
9683
}
@@ -133,7 +120,7 @@ public int getWaiters() {
133120

134121
@Override
135122
public String toString() {
136-
return String.format("DCountDownLatchImpl{name=%s, count=%d}", getName(), count);
123+
return String.format("DCountDownLatchImpl{name=%s, count=%d, waiters=%d}", getName(), count, waiters);
137124
}
138125

139126
@Override

src/main/java/dev/gemfire/dtype/internal/DSemaphoreBackend.java

Lines changed: 12 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -30,7 +30,7 @@ public class DSemaphoreBackend extends AbstractDType {
3030
// Map of client member names (uniqueTags) and corresponding permits held for this semaphore
3131
private Map<String, Integer> permitHolders = new HashMap<>();
3232
private boolean isInitialized = false;
33-
int queueLength = 0;
33+
private int queueLength = 0;
3434
// This is set when serialization has transferred state to another member and requires tracking
3535
// to be re-established.
3636
private boolean requiresRecovery;
@@ -78,7 +78,17 @@ public int getQueueLength() {
7878
* to acquire the requested permits
7979
*/
8080
public Object acquire(DTypeFunctionContext context, int permits) {
81-
return _acquire(context, permits) ? null : AbstractDType.RETRY_TOKEN;
81+
if (_acquire(context, permits)) {
82+
if (retryableContext.get().get() != null) {
83+
queueLength--;
84+
}
85+
return true;
86+
}
87+
if (retryableContext.get().get() == null) {
88+
queueLength++;
89+
}
90+
return retryableContext.get()
91+
.updateAndGet(y -> y == null ? new WaitingRetryable(100) : y);
8292
}
8393

8494
private boolean _acquire(DTypeFunctionContext context, int permits) {

0 commit comments

Comments
 (0)