Skip to content

Commit 49b9d91

Browse files
sebthompisv
andauthored
feat(jsonrpc): add JsonRpcRequestFuture with explicit protocol cancel (#907)
Fixes #423. Return a `JsonRpcRequestFuture` from `RemoteEndpoint.request(...)`, enabling `cancelRequest()` to send `$/cancelRequest` exactly once per request chain. Design highlights: - `cancelRequest()` sends the LSP `$/cancelRequest` exactly once for the entire request chain and does not cancel any future. - `getRoot()` returns the root request future; calling `cancel(boolean)` on the root ensures `cancelRequest()` is invoked exactly once. - `cancel(boolean)` on dependent stages cancels only that stage and does not send. - Protocol cancel emission is deduplicated across the chain under concurrency and skipped if the request is already completed. --------- Co-authored-by: Vladimir Piskarev <[email protected]>
1 parent 4e0cc84 commit 49b9d91

File tree

3 files changed

+473
-13
lines changed

3 files changed

+473
-13
lines changed
Lines changed: 329 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,329 @@
1+
/**
2+
* Copyright (c) 2025 Vegard IT GmbH and others.
3+
*
4+
* This program and the accompanying materials are made available under the
5+
* terms of the Eclipse Public License v. 2.0 which is available at
6+
* http://www.eclipse.org/legal/epl-2.0,
7+
* or the Eclipse Distribution License v. 1.0 which is available at
8+
* http://www.eclipse.org/org/documents/edl-v10.php.
9+
*
10+
* SPDX-License-Identifier: EPL-2.0 OR BSD-3-Clause
11+
*/
12+
package org.eclipse.lsp4j.jsonrpc;
13+
14+
import java.util.concurrent.CompletableFuture;
15+
import java.util.concurrent.CompletionStage;
16+
import java.util.concurrent.Executor;
17+
import java.util.concurrent.TimeUnit;
18+
import java.util.concurrent.atomic.AtomicBoolean;
19+
import java.util.function.BiConsumer;
20+
import java.util.function.BiFunction;
21+
import java.util.function.Consumer;
22+
import java.util.function.Function;
23+
import java.util.function.Supplier;
24+
import java.util.logging.Level;
25+
import java.util.logging.Logger;
26+
27+
/**
28+
* A {@link CompletableFuture} representing an outbound JSON-RPC request.
29+
* <p>
30+
* Design highlights:
31+
* <ul>
32+
* <li>{@link #cancelRequest()} sends the LSP <code>$/cancelRequest</code> exactly once for the
33+
* entire request chain and does not cancel any future.</li>
34+
* <li>{@link #getRoot()} returns the root request future; calling {@link #cancel(boolean)} on the
35+
* root ensures {@link #cancelRequest()} is invoked exactly once.</li>
36+
* <li>{@link #cancel(boolean)} on dependent stages cancels only that stage and does not send.</li>
37+
* <li>Protocol cancel emission is deduplicated across the chain under concurrency and skipped if the
38+
* request is already completed.</li>
39+
* </ul>
40+
*/
41+
@SuppressWarnings("unchecked")
42+
public final class JsonRpcRequestFuture<T> extends CompletableFuture<T> {
43+
44+
private static final Logger LOG = Logger.getLogger(JsonRpcRequestFuture.class.getName());
45+
46+
private final Runnable cancelAction;
47+
private final AtomicBoolean cancelSent;
48+
private final JsonRpcRequestFuture<?> root;
49+
50+
public JsonRpcRequestFuture(final Runnable cancelAction) {
51+
root = this;
52+
this.cancelAction = cancelAction;
53+
cancelSent = new AtomicBoolean(false);
54+
}
55+
56+
private JsonRpcRequestFuture(final JsonRpcRequestFuture<?> root) {
57+
this.root = root;
58+
cancelAction = null;
59+
cancelSent = null;
60+
}
61+
62+
@Override
63+
public JsonRpcRequestFuture<Void> acceptEither(final CompletionStage<? extends T> other, final Consumer<? super T> action) {
64+
return (JsonRpcRequestFuture<Void>) super.acceptEither(other, action);
65+
}
66+
67+
@Override
68+
public JsonRpcRequestFuture<Void> acceptEitherAsync(final CompletionStage<? extends T> other, final Consumer<? super T> action) {
69+
return (JsonRpcRequestFuture<Void>) super.acceptEitherAsync(other, action);
70+
}
71+
72+
@Override
73+
public JsonRpcRequestFuture<Void> acceptEitherAsync(final CompletionStage<? extends T> other, final Consumer<? super T> action,
74+
final Executor executor) {
75+
return (JsonRpcRequestFuture<Void>) super.acceptEitherAsync(other, action, executor);
76+
}
77+
78+
@Override
79+
public <U> JsonRpcRequestFuture<U> applyToEither(final CompletionStage<? extends T> other, final Function<? super T, U> fn) {
80+
return (JsonRpcRequestFuture<U>) super.applyToEither(other, fn);
81+
}
82+
83+
@Override
84+
public <U> JsonRpcRequestFuture<U> applyToEitherAsync(final CompletionStage<? extends T> other, final Function<? super T, U> fn) {
85+
return (JsonRpcRequestFuture<U>) super.applyToEitherAsync(other, fn);
86+
}
87+
88+
@Override
89+
public <U> JsonRpcRequestFuture<U> applyToEitherAsync(final CompletionStage<? extends T> other, final Function<? super T, U> fn,
90+
final Executor executor) {
91+
return (JsonRpcRequestFuture<U>) super.applyToEitherAsync(other, fn, executor);
92+
}
93+
94+
@Override
95+
public boolean cancel(final boolean mayInterruptIfRunning) {
96+
// Root cancel ensures cancelRequest() behavior once before local cancellation
97+
if (root == this) {
98+
try {
99+
cancelRequest();
100+
} catch (final RuntimeException ex) {
101+
// catching potential exception to ensure local cancel still proceeds
102+
LOG.log(Level.WARNING, ex.getMessage(), ex);
103+
}
104+
}
105+
return super.cancel(mayInterruptIfRunning);
106+
}
107+
108+
/**
109+
* Returns the root future for this request chain. For the root, this returns {@code this}.
110+
*/
111+
public JsonRpcRequestFuture<?> getRoot() {
112+
return root;
113+
}
114+
115+
/**
116+
* Sends the protocol cancel (<code>$/cancelRequest</code>) exactly once for this request chain.
117+
* Does not cancel any future locally. Returns {@code true} if the notification was sent by this call,
118+
* or {@code false} if it was already sent before or the request is already completed.
119+
*/
120+
public boolean cancelRequest() {
121+
final JsonRpcRequestFuture<?> root = getRoot();
122+
if (root.isDone()) {
123+
return false;
124+
}
125+
if (root.cancelSent.compareAndSet(false, true)) {
126+
root.cancelAction.run();
127+
return true;
128+
}
129+
return false;
130+
}
131+
132+
@Override
133+
public JsonRpcRequestFuture<T> completeAsync(final Supplier<? extends T> supplier) {
134+
return (JsonRpcRequestFuture<T>) super.completeAsync(supplier);
135+
}
136+
137+
@Override
138+
public JsonRpcRequestFuture<T> completeAsync(final Supplier<? extends T> supplier, final Executor executor) {
139+
return (JsonRpcRequestFuture<T>) super.completeAsync(supplier, executor);
140+
}
141+
142+
@Override
143+
public JsonRpcRequestFuture<T> completeOnTimeout(final T value, final long timeout, final TimeUnit unit) {
144+
return (JsonRpcRequestFuture<T>) super.completeOnTimeout(value, timeout, unit);
145+
}
146+
147+
@Override
148+
public JsonRpcRequestFuture<T> copy() {
149+
return (JsonRpcRequestFuture<T>) super.copy();
150+
}
151+
152+
@Override
153+
public JsonRpcRequestFuture<T> exceptionally(final Function<Throwable, ? extends T> fn) {
154+
return (JsonRpcRequestFuture<T>) super.exceptionally(fn);
155+
}
156+
157+
@Override
158+
public <U> JsonRpcRequestFuture<U> handle(final BiFunction<? super T, Throwable, ? extends U> fn) {
159+
return (JsonRpcRequestFuture<U>) super.handle(fn);
160+
}
161+
162+
@Override
163+
public <U> JsonRpcRequestFuture<U> handleAsync(final BiFunction<? super T, Throwable, ? extends U> fn) {
164+
return (JsonRpcRequestFuture<U>) super.handleAsync(fn);
165+
}
166+
167+
@Override
168+
public <U> JsonRpcRequestFuture<U> handleAsync(final BiFunction<? super T, Throwable, ? extends U> fn, final Executor executor) {
169+
return (JsonRpcRequestFuture<U>) super.handleAsync(fn, executor);
170+
}
171+
172+
@Override
173+
public JsonRpcRequestFuture<T> newIncompleteFuture() {
174+
// Dependent stages share the same cancel state but are not roots
175+
return new JsonRpcRequestFuture<>(root);
176+
}
177+
178+
@Override
179+
public JsonRpcRequestFuture<T> orTimeout(final long timeout, final TimeUnit unit) {
180+
return (JsonRpcRequestFuture<T>) super.orTimeout(timeout, unit);
181+
}
182+
183+
@Override
184+
public JsonRpcRequestFuture<Void> runAfterBoth(final CompletionStage<?> other, final Runnable action) {
185+
return (JsonRpcRequestFuture<Void>) super.runAfterBoth(other, action);
186+
}
187+
188+
@Override
189+
public JsonRpcRequestFuture<Void> runAfterBothAsync(final CompletionStage<?> other, final Runnable action) {
190+
return (JsonRpcRequestFuture<Void>) super.runAfterBothAsync(other, action);
191+
}
192+
193+
@Override
194+
public JsonRpcRequestFuture<Void> runAfterBothAsync(final CompletionStage<?> other, final Runnable action, final Executor executor) {
195+
return (JsonRpcRequestFuture<Void>) super.runAfterBothAsync(other, action, executor);
196+
}
197+
198+
@Override
199+
public JsonRpcRequestFuture<Void> runAfterEither(final CompletionStage<?> other, final Runnable action) {
200+
return (JsonRpcRequestFuture<Void>) super.runAfterEither(other, action);
201+
}
202+
203+
@Override
204+
public JsonRpcRequestFuture<Void> runAfterEitherAsync(final CompletionStage<?> other, final Runnable action) {
205+
return (JsonRpcRequestFuture<Void>) super.runAfterEitherAsync(other, action);
206+
}
207+
208+
@Override
209+
public JsonRpcRequestFuture<Void> runAfterEitherAsync(final CompletionStage<?> other, final Runnable action, final Executor executor) {
210+
return (JsonRpcRequestFuture<Void>) super.runAfterEitherAsync(other, action, executor);
211+
}
212+
213+
@Override
214+
public JsonRpcRequestFuture<Void> thenAccept(final Consumer<? super T> action) {
215+
return (JsonRpcRequestFuture<Void>) super.thenAccept(action);
216+
}
217+
218+
@Override
219+
public JsonRpcRequestFuture<Void> thenAcceptAsync(final Consumer<? super T> action) {
220+
return (JsonRpcRequestFuture<Void>) super.thenAcceptAsync(action);
221+
}
222+
223+
@Override
224+
public JsonRpcRequestFuture<Void> thenAcceptAsync(final Consumer<? super T> action, final Executor executor) {
225+
return (JsonRpcRequestFuture<Void>) super.thenAcceptAsync(action, executor);
226+
}
227+
228+
@Override
229+
public <U> JsonRpcRequestFuture<Void> thenAcceptBoth(final CompletionStage<? extends U> other,
230+
final BiConsumer<? super T, ? super U> action) {
231+
return (JsonRpcRequestFuture<Void>) super.thenAcceptBoth(other, action);
232+
}
233+
234+
@Override
235+
public <U> JsonRpcRequestFuture<Void> thenAcceptBothAsync(final CompletionStage<? extends U> other,
236+
final BiConsumer<? super T, ? super U> action) {
237+
return (JsonRpcRequestFuture<Void>) super.thenAcceptBothAsync(other, action);
238+
}
239+
240+
@Override
241+
public <U> JsonRpcRequestFuture<Void> thenAcceptBothAsync(final CompletionStage<? extends U> other,
242+
final BiConsumer<? super T, ? super U> action, final Executor executor) {
243+
return (JsonRpcRequestFuture<Void>) super.thenAcceptBothAsync(other, action, executor);
244+
}
245+
246+
@Override
247+
public <U> JsonRpcRequestFuture<U> thenApply(final Function<? super T, ? extends U> fn) {
248+
return (JsonRpcRequestFuture<U>) super.thenApply(fn);
249+
}
250+
251+
@Override
252+
public <U> JsonRpcRequestFuture<U> thenApplyAsync(final Function<? super T, ? extends U> fn) {
253+
return (JsonRpcRequestFuture<U>) super.thenApplyAsync(fn);
254+
}
255+
256+
@Override
257+
public <U> JsonRpcRequestFuture<U> thenApplyAsync(final Function<? super T, ? extends U> fn, final Executor executor) {
258+
return (JsonRpcRequestFuture<U>) super.thenApplyAsync(fn, executor);
259+
}
260+
261+
@Override
262+
public <U, V> JsonRpcRequestFuture<V> thenCombine(final CompletionStage<? extends U> other,
263+
final BiFunction<? super T, ? super U, ? extends V> fn) {
264+
return (JsonRpcRequestFuture<V>) super.thenCombine(other, fn);
265+
}
266+
267+
@Override
268+
public <U, V> JsonRpcRequestFuture<V> thenCombineAsync(final CompletionStage<? extends U> other,
269+
final BiFunction<? super T, ? super U, ? extends V> fn) {
270+
return (JsonRpcRequestFuture<V>) super.thenCombineAsync(other, fn);
271+
}
272+
273+
@Override
274+
public <U, V> JsonRpcRequestFuture<V> thenCombineAsync(final CompletionStage<? extends U> other,
275+
final BiFunction<? super T, ? super U, ? extends V> fn, final Executor executor) {
276+
return (JsonRpcRequestFuture<V>) super.thenCombineAsync(other, fn, executor);
277+
}
278+
279+
@Override
280+
public <U> JsonRpcRequestFuture<U> thenCompose(final Function<? super T, ? extends CompletionStage<U>> fn) {
281+
return (JsonRpcRequestFuture<U>) super.thenCompose(fn);
282+
}
283+
284+
@Override
285+
public <U> JsonRpcRequestFuture<U> thenComposeAsync(final Function<? super T, ? extends CompletionStage<U>> fn) {
286+
return (JsonRpcRequestFuture<U>) super.thenComposeAsync(fn);
287+
}
288+
289+
@Override
290+
public <U> JsonRpcRequestFuture<U> thenComposeAsync(final Function<? super T, ? extends CompletionStage<U>> fn,
291+
final Executor executor) {
292+
return (JsonRpcRequestFuture<U>) super.thenComposeAsync(fn, executor);
293+
}
294+
295+
@Override
296+
public JsonRpcRequestFuture<Void> thenRun(final Runnable action) {
297+
return (JsonRpcRequestFuture<Void>) super.thenRun(action);
298+
}
299+
300+
@Override
301+
public JsonRpcRequestFuture<Void> thenRunAsync(final Runnable action) {
302+
return (JsonRpcRequestFuture<Void>) super.thenRunAsync(action);
303+
}
304+
305+
@Override
306+
public JsonRpcRequestFuture<Void> thenRunAsync(final Runnable action, final Executor executor) {
307+
return (JsonRpcRequestFuture<Void>) super.thenRunAsync(action, executor);
308+
}
309+
310+
@Override
311+
public JsonRpcRequestFuture<T> toCompletableFuture() {
312+
return (JsonRpcRequestFuture<T>) super.toCompletableFuture();
313+
}
314+
315+
@Override
316+
public JsonRpcRequestFuture<T> whenComplete(final BiConsumer<? super T, ? super Throwable> action) {
317+
return (JsonRpcRequestFuture<T>) super.whenComplete(action);
318+
}
319+
320+
@Override
321+
public JsonRpcRequestFuture<T> whenCompleteAsync(final BiConsumer<? super T, ? super Throwable> action) {
322+
return (JsonRpcRequestFuture<T>) super.whenCompleteAsync(action);
323+
}
324+
325+
@Override
326+
public JsonRpcRequestFuture<T> whenCompleteAsync(final BiConsumer<? super T, ? super Throwable> action, final Executor executor) {
327+
return (JsonRpcRequestFuture<T>) super.whenCompleteAsync(action, executor);
328+
}
329+
}

org.eclipse.lsp4j.jsonrpc/src/main/java/org/eclipse/lsp4j/jsonrpc/RemoteEndpoint.java

Lines changed: 2 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -152,15 +152,9 @@ protected NotificationMessage createNotificationMessage(String method, Object pa
152152
* Send a request to the remote endpoint.
153153
*/
154154
@Override
155-
public CompletableFuture<Object> request(String method, Object parameter) {
155+
public JsonRpcRequestFuture<Object> request(String method, Object parameter) {
156156
final RequestMessage requestMessage = createRequestMessage(method, parameter);
157-
final var result = new CompletableFuture<>() {
158-
@Override
159-
public boolean cancel(boolean mayInterruptIfRunning) {
160-
sendCancelNotification(requestMessage.getRawId());
161-
return super.cancel(mayInterruptIfRunning);
162-
}
163-
};
157+
final var result = new JsonRpcRequestFuture<>(() -> sendCancelNotification(requestMessage.getRawId()));
164158
synchronized(sentRequestMap) {
165159
// Store request information so it can be handled when the response is received
166160
sentRequestMap.put(requestMessage.getId(), new PendingRequestInfo(requestMessage, result));

0 commit comments

Comments
 (0)