Skip to content

Commit 148378c

Browse files
authored
ScheduledRunnable to honor interrupt settings from Schedulers.from usage (#7745)
1 parent dc9764e commit 148378c

File tree

3 files changed

+163
-3
lines changed

3 files changed

+163
-3
lines changed

src/main/java/io/reactivex/rxjava3/internal/schedulers/ExecutorScheduler.java

+1-1
Original file line numberDiff line numberDiff line change
@@ -204,7 +204,7 @@ public Disposable schedule(@NonNull Runnable run, long delay, @NonNull TimeUnit
204204

205205
final Runnable decoratedRun = RxJavaPlugins.onSchedule(run);
206206

207-
ScheduledRunnable sr = new ScheduledRunnable(new SequentialDispose(mar, decoratedRun), tasks);
207+
ScheduledRunnable sr = new ScheduledRunnable(new SequentialDispose(mar, decoratedRun), tasks, interruptibleWorker);
208208
tasks.add(sr);
209209

210210
if (executor instanceof ScheduledExecutorService) {

src/main/java/io/reactivex/rxjava3/internal/schedulers/ScheduledRunnable.java

+17-2
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,7 @@ public final class ScheduledRunnable extends AtomicReferenceArray<Object>
2424

2525
private static final long serialVersionUID = -6120223772001106981L;
2626
final Runnable actual;
27+
final boolean interruptOnCancel;
2728

2829
/** Indicates that the parent tracking this task has been notified about its completion. */
2930
static final Object PARENT_DISPOSED = new Object();
@@ -41,12 +42,26 @@ public final class ScheduledRunnable extends AtomicReferenceArray<Object>
4142
/**
4243
* Creates a ScheduledRunnable by wrapping the given action and setting
4344
* up the optional parent.
45+
* The underlying future will be interrupted if the task is disposed asynchronously.
4446
* @param actual the runnable to wrap, not-null (not verified)
4547
* @param parent the parent tracking container or null if none
4648
*/
4749
public ScheduledRunnable(Runnable actual, DisposableContainer parent) {
50+
this(actual, parent, true);
51+
}
52+
53+
/**
54+
* Creates a ScheduledRunnable by wrapping the given action and setting
55+
* up the optional parent.
56+
* @param actual the runnable to wrap, not-null (not verified)
57+
* @param parent the parent tracking container or null if none
58+
* @param interruptOnCancel if true, the underlying future will be interrupted when disposing
59+
* this task from a different thread than it is running on.
60+
*/
61+
public ScheduledRunnable(Runnable actual, DisposableContainer parent, boolean interruptOnCancel) {
4862
super(3);
4963
this.actual = actual;
64+
this.interruptOnCancel = interruptOnCancel;
5065
this.lazySet(0, parent);
5166
}
5267

@@ -95,7 +110,7 @@ public void setFuture(Future<?> f) {
95110
return;
96111
}
97112
if (o == ASYNC_DISPOSED) {
98-
f.cancel(true);
113+
f.cancel(interruptOnCancel);
99114
return;
100115
}
101116
if (compareAndSet(FUTURE_INDEX, o, f)) {
@@ -114,7 +129,7 @@ public void dispose() {
114129
boolean async = get(THREAD_INDEX) != Thread.currentThread();
115130
if (compareAndSet(FUTURE_INDEX, o, async ? ASYNC_DISPOSED : SYNC_DISPOSED)) {
116131
if (o != null) {
117-
((Future<?>)o).cancel(async);
132+
((Future<?>)o).cancel(async && interruptOnCancel);
118133
}
119134
break;
120135
}

src/test/java/io/reactivex/rxjava3/schedulers/ExecutorSchedulerInterruptibleTest.java

+145
Original file line numberDiff line numberDiff line change
@@ -965,4 +965,149 @@ public void run() {
965965
exec.shutdown();
966966
}
967967
}
968+
969+
public static class TrackInterruptScheduledExecutor extends ScheduledThreadPoolExecutor {
970+
971+
public final AtomicBoolean interruptReceived = new AtomicBoolean();
972+
973+
public TrackInterruptScheduledExecutor() {
974+
super(10);
975+
}
976+
977+
@Override
978+
public <V> ScheduledFuture<V> schedule(Callable<V> callable, long delay, TimeUnit unit) {
979+
return new TrackingScheduledFuture<V>(super.schedule(callable, delay, unit));
980+
}
981+
982+
class TrackingScheduledFuture<V> implements ScheduledFuture<V> {
983+
984+
ScheduledFuture<V> original;
985+
986+
TrackingScheduledFuture(ScheduledFuture<V> original) {
987+
this.original = original;
988+
}
989+
990+
@Override
991+
public long getDelay(TimeUnit unit) {
992+
return original.getDelay(unit);
993+
}
994+
995+
@Override
996+
public int compareTo(Delayed o) {
997+
return original.compareTo(o);
998+
}
999+
1000+
@Override
1001+
public boolean cancel(boolean mayInterruptIfRunning) {
1002+
if (mayInterruptIfRunning) {
1003+
interruptReceived.set(true);
1004+
}
1005+
return original.cancel(mayInterruptIfRunning);
1006+
}
1007+
1008+
@Override
1009+
public boolean isCancelled() {
1010+
return original.isCancelled();
1011+
}
1012+
1013+
@Override
1014+
public boolean isDone() {
1015+
return original.isDone();
1016+
}
1017+
1018+
@Override
1019+
public V get() throws InterruptedException, ExecutionException {
1020+
return original.get();
1021+
}
1022+
1023+
@Override
1024+
public V get(long timeout, TimeUnit unit)
1025+
throws InterruptedException, ExecutionException, TimeoutException {
1026+
return get(timeout, unit);
1027+
}
1028+
}
1029+
}
1030+
1031+
@Test
1032+
public void noInterruptBeforeRunningDelayedWorker() throws Throwable {
1033+
TrackInterruptScheduledExecutor exec = new TrackInterruptScheduledExecutor();
1034+
1035+
try {
1036+
Scheduler sch = Schedulers.from(exec, false);
1037+
1038+
Worker worker = sch.createWorker();
1039+
1040+
Disposable d = worker.schedule(() -> { }, 1, TimeUnit.SECONDS);
1041+
1042+
d.dispose();
1043+
1044+
int i = 150;
1045+
1046+
while (i-- > 0) {
1047+
assertFalse("Task interrupt detected", exec.interruptReceived.get());
1048+
Thread.sleep(10);
1049+
}
1050+
1051+
} finally {
1052+
exec.shutdownNow();
1053+
}
1054+
}
1055+
1056+
@Test
1057+
public void hasInterruptBeforeRunningDelayedWorker() throws Throwable {
1058+
TrackInterruptScheduledExecutor exec = new TrackInterruptScheduledExecutor();
1059+
1060+
try {
1061+
Scheduler sch = Schedulers.from(exec, true);
1062+
1063+
Worker worker = sch.createWorker();
1064+
1065+
Disposable d = worker.schedule(() -> { }, 1, TimeUnit.SECONDS);
1066+
1067+
d.dispose();
1068+
1069+
Thread.sleep(100);
1070+
assertTrue("Task interrupt detected", exec.interruptReceived.get());
1071+
1072+
} finally {
1073+
exec.shutdownNow();
1074+
}
1075+
}
1076+
1077+
@Test
1078+
public void noInterruptAfterRunningDelayedWorker() throws Throwable {
1079+
TrackInterruptScheduledExecutor exec = new TrackInterruptScheduledExecutor();
1080+
1081+
try {
1082+
Scheduler sch = Schedulers.from(exec, false);
1083+
1084+
Worker worker = sch.createWorker();
1085+
AtomicBoolean taskRun = new AtomicBoolean();
1086+
1087+
Disposable d = worker.schedule(() -> {
1088+
taskRun.set(true);
1089+
try {
1090+
Thread.sleep(1000);
1091+
} catch (InterruptedException ex) {
1092+
exec.interruptReceived.set(true);
1093+
}
1094+
}, 100, TimeUnit.MILLISECONDS);
1095+
1096+
Thread.sleep(150);
1097+
;
1098+
d.dispose();
1099+
1100+
int i = 50;
1101+
1102+
while (i-- > 0) {
1103+
assertFalse("Task interrupt detected", exec.interruptReceived.get());
1104+
Thread.sleep(10);
1105+
}
1106+
1107+
assertTrue("Task run at all", taskRun.get());
1108+
1109+
} finally {
1110+
exec.shutdownNow();
1111+
}
1112+
}
9681113
}

0 commit comments

Comments
 (0)