Skip to content

Commit f082ae9

Browse files
committed
Support trace context propagation on ListenableFuture returned by submitListenable.
1 parent 5a11a13 commit f082ae9

File tree

5 files changed

+420
-6
lines changed

5 files changed

+420
-6
lines changed
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,86 @@
1+
/**
2+
* Copyright 2017-2020 The OpenTracing Authors
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except
5+
* in compliance with the License. You may obtain a copy of the License at
6+
*
7+
* http://www.apache.org/licenses/LICENSE-2.0
8+
*
9+
* Unless required by applicable law or agreed to in writing, software distributed under the License
10+
* is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
11+
* or implied. See the License for the specific language governing permissions and limitations under
12+
* the License.
13+
*/
14+
package io.opentracing.contrib.spring.cloud.async.instrument;
15+
16+
import io.opentracing.Span;
17+
import io.opentracing.Tracer;
18+
import java.util.concurrent.CompletableFuture;
19+
import java.util.concurrent.ExecutionException;
20+
import java.util.concurrent.TimeUnit;
21+
import java.util.concurrent.TimeoutException;
22+
23+
import org.springframework.util.concurrent.FailureCallback;
24+
import org.springframework.util.concurrent.ListenableFuture;
25+
import org.springframework.util.concurrent.ListenableFutureCallback;
26+
import org.springframework.util.concurrent.SuccessCallback;
27+
28+
/**
29+
* @author MiNG
30+
*/
31+
public class TracedListenableFuture<T> implements ListenableFuture<T> {
32+
33+
private final ListenableFuture<T> delegate;
34+
private final Tracer tracer;
35+
private final Span span;
36+
37+
public TracedListenableFuture(ListenableFuture<T> delegate, Tracer tracer) {
38+
this(delegate, tracer, tracer.activeSpan());
39+
}
40+
41+
public TracedListenableFuture(ListenableFuture<T> delegate, Tracer tracer, Span span) {
42+
this.delegate = delegate;
43+
this.tracer = tracer;
44+
this.span = span;
45+
}
46+
47+
@Override
48+
public void addCallback(ListenableFutureCallback<? super T> callback) {
49+
delegate.addCallback(new TracedListenableFutureCallback<>(callback, tracer, span));
50+
}
51+
52+
@Override
53+
public void addCallback(SuccessCallback<? super T> successCallback, FailureCallback failureCallback) {
54+
delegate.addCallback(new TracedListenableFutureCallback<>(successCallback, failureCallback, tracer, span));
55+
}
56+
57+
@Override
58+
public CompletableFuture<T> completable() {
59+
return delegate.completable();
60+
}
61+
62+
@Override
63+
public boolean cancel(boolean mayInterruptIfRunning) {
64+
return delegate.cancel(mayInterruptIfRunning);
65+
}
66+
67+
@Override
68+
public boolean isCancelled() {
69+
return delegate.isCancelled();
70+
}
71+
72+
@Override
73+
public boolean isDone() {
74+
return delegate.isDone();
75+
}
76+
77+
@Override
78+
public T get() throws InterruptedException, ExecutionException {
79+
return delegate.get();
80+
}
81+
82+
@Override
83+
public T get(long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException {
84+
return delegate.get(timeout, unit);
85+
}
86+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,69 @@
1+
/**
2+
* Copyright 2017-2020 The OpenTracing Authors
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except
5+
* in compliance with the License. You may obtain a copy of the License at
6+
*
7+
* http://www.apache.org/licenses/LICENSE-2.0
8+
*
9+
* Unless required by applicable law or agreed to in writing, software distributed under the License
10+
* is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
11+
* or implied. See the License for the specific language governing permissions and limitations under
12+
* the License.
13+
*/
14+
package io.opentracing.contrib.spring.cloud.async.instrument;
15+
16+
import io.opentracing.Scope;
17+
import io.opentracing.Span;
18+
import io.opentracing.Tracer;
19+
import org.springframework.lang.Nullable;
20+
import org.springframework.util.Assert;
21+
import org.springframework.util.concurrent.FailureCallback;
22+
import org.springframework.util.concurrent.ListenableFutureCallback;
23+
import org.springframework.util.concurrent.SuccessCallback;
24+
25+
/**
26+
* @author MiNG
27+
*/
28+
public class TracedListenableFutureCallback<T> implements ListenableFutureCallback<T> {
29+
30+
private final SuccessCallback<T> successDelegate;
31+
private final FailureCallback failureDelegate;
32+
private final Tracer tracer;
33+
private final Span span;
34+
35+
public TracedListenableFutureCallback(ListenableFutureCallback<T> delegate, Tracer tracer) {
36+
this(delegate, delegate, tracer, tracer.activeSpan());
37+
}
38+
39+
public TracedListenableFutureCallback(ListenableFutureCallback<T> delegate, Tracer tracer, Span span) {
40+
this(delegate, delegate, tracer, span);
41+
}
42+
43+
public TracedListenableFutureCallback(SuccessCallback<T> successDelegate, FailureCallback failureDelegate, Tracer tracer) {
44+
this(successDelegate, failureDelegate, tracer, tracer.activeSpan());
45+
}
46+
47+
public TracedListenableFutureCallback(@Nullable SuccessCallback<T> successDelegate, @Nullable FailureCallback failureDelegate, Tracer tracer, Span span) {
48+
Assert.notNull(successDelegate, "'successDelegate' must not be null");
49+
Assert.notNull(failureDelegate, "'failureDelegate' must not be null");
50+
this.successDelegate = successDelegate;
51+
this.failureDelegate = failureDelegate;
52+
this.tracer = tracer;
53+
this.span = span;
54+
}
55+
56+
@Override
57+
public void onSuccess(T result) {
58+
try (Scope ignored = span == null ? null : tracer.scopeManager().activate(span)) {
59+
successDelegate.onSuccess(result);
60+
}
61+
}
62+
63+
@Override
64+
public void onFailure(Throwable ex) {
65+
try (Scope ignored = span == null ? null : tracer.scopeManager().activate(span)) {
66+
failureDelegate.onFailure(ex);
67+
}
68+
}
69+
}

instrument-starters/opentracing-spring-cloud-core/src/main/java/io/opentracing/contrib/spring/cloud/async/instrument/TracedThreadPoolTaskExecutor.java

+3-3
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/**
2-
* Copyright 2017-2019 The OpenTracing Authors
2+
* Copyright 2017-2020 The OpenTracing Authors
33
*
44
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except
55
* in compliance with the License. You may obtain a copy of the License at
@@ -59,12 +59,12 @@ public <T> Future<T> submit(Callable<T> task) {
5959

6060
@Override
6161
public ListenableFuture<?> submitListenable(Runnable task) {
62-
return this.delegate.submitListenable(new TracedRunnable(task, tracer));
62+
return new TracedListenableFuture<>(this.delegate.submitListenable(new TracedRunnable(task, tracer)), tracer);
6363
}
6464

6565
@Override
6666
public <T> ListenableFuture<T> submitListenable(Callable<T> task) {
67-
return this.delegate.submitListenable(new TracedCallable<>(task, tracer));
67+
return new TracedListenableFuture<>(this.delegate.submitListenable(new TracedCallable<>(task, tracer)), tracer);
6868
}
6969

7070
@Override

instrument-starters/opentracing-spring-cloud-core/src/main/java/io/opentracing/contrib/spring/cloud/async/instrument/TracedThreadPoolTaskScheduler.java

+4-3
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/**
2-
* Copyright 2017-2019 The OpenTracing Authors
2+
* Copyright 2017-2020 The OpenTracing Authors
33
*
44
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except
55
* in compliance with the License. You may obtain a copy of the License at
@@ -26,6 +26,7 @@
2626
import java.util.concurrent.ScheduledFuture;
2727
import java.util.concurrent.ScheduledThreadPoolExecutor;
2828
import java.util.concurrent.ThreadFactory;
29+
2930
import org.springframework.lang.Nullable;
3031
import org.springframework.scheduling.Trigger;
3132
import org.springframework.scheduling.concurrent.ThreadPoolTaskScheduler;
@@ -113,12 +114,12 @@ public <T> Future<T> submit(Callable<T> task) {
113114

114115
@Override
115116
public ListenableFuture<?> submitListenable(Runnable task) {
116-
return delegate.submitListenable(new TracedRunnable(task, tracer));
117+
return new TracedListenableFuture<>(delegate.submitListenable(new TracedRunnable(task, tracer)), tracer);
117118
}
118119

119120
@Override
120121
public <T> ListenableFuture<T> submitListenable(Callable<T> task) {
121-
return delegate.submitListenable(new TracedCallable<>(task, tracer));
122+
return new TracedListenableFuture<>(delegate.submitListenable(new TracedCallable<>(task, tracer)), tracer);
122123
}
123124

124125
@Override

0 commit comments

Comments
 (0)