Skip to content

Commit dd2d410

Browse files
Merge pull request #922 from abersnaze/debug-updates
Changes made while integrating it with our internal system
2 parents 747f97e + 4258d77 commit dd2d410

File tree

5 files changed

+211
-100
lines changed

5 files changed

+211
-100
lines changed

rxjava-contrib/rxjava-debug/src/main/java/rx/operators/DebugSubscriber.java

Lines changed: 39 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -4,47 +4,72 @@
44
import rx.Observer;
55
import rx.Subscriber;
66
import rx.functions.Action1;
7+
import rx.functions.Action2;
78
import rx.functions.Func1;
89
import rx.plugins.DebugNotification;
910

10-
public final class DebugSubscriber<T> extends Subscriber<T> {
11+
public final class DebugSubscriber<T, C> extends Subscriber<T> {
1112
private final Func1<T, T> onNextHook;
12-
final Action1<DebugNotification> events;
13-
final Observer<? super T> o;
14-
Operator<? extends T, ?> from = null;
15-
Operator<?, ? super T> to = null;
13+
private final Func1<DebugNotification, C> start;
14+
private final Action1<C> complete;
15+
private final Action2<C, Throwable> error;
16+
private final Observer<? super T> o;
17+
private Operator<? extends T, ?> from = null;
18+
private Operator<?, ? super T> to = null;
1619

1720
public DebugSubscriber(
1821
Func1<T, T> onNextHook,
19-
Action1<DebugNotification> _events,
22+
Func1<DebugNotification, C> start,
23+
Action1<C> complete,
24+
Action2<C, Throwable> error,
2025
Subscriber<? super T> _o,
2126
Operator<? extends T, ?> _out,
2227
Operator<?, ? super T> _in) {
2328
super(_o);
24-
this.events = _events;
29+
this.start = start;
30+
this.complete = complete;
31+
this.error = error;
2532
this.o = _o;
2633
this.onNextHook = onNextHook;
2734
this.from = _out;
2835
this.to = _in;
29-
this.add(new DebugSubscription<T>(this));
36+
this.add(new DebugSubscription<T, C>(this, start, complete, error));
3037
}
3138

3239
@Override
3340
public void onCompleted() {
34-
events.call(DebugNotification.createOnCompleted(o, from, to));
35-
o.onCompleted();
41+
final DebugNotification<T, C> n = DebugNotification.createOnCompleted(o, from, to);
42+
C context = start.call(n);
43+
try {
44+
o.onCompleted();
45+
complete.call(context);
46+
} catch (Throwable e) {
47+
error.call(context, e);
48+
}
3649
}
3750

3851
@Override
3952
public void onError(Throwable e) {
40-
events.call(DebugNotification.createOnError(o, from, e, to));
41-
o.onError(e);
53+
final DebugNotification<T, C> n = DebugNotification.createOnError(o, from, e, to);
54+
C context = start.call(n);
55+
try {
56+
o.onError(e);
57+
complete.call(context);
58+
} catch (Throwable e2) {
59+
error.call(context, e2);
60+
}
4261
}
4362

4463
@Override
4564
public void onNext(T t) {
46-
events.call(DebugNotification.createOnNext(o, from, t, to));
47-
o.onNext(onNextHook.call(t));
65+
final DebugNotification<T, C> n = DebugNotification.createOnNext(o, from, t, to);
66+
C context = start.call(n);
67+
try {
68+
o.onNext(onNextHook.call(t));
69+
complete.call(context);
70+
} catch (Throwable e) {
71+
error.call(context, e);
72+
}
4873
}
4974

5075
public Operator<? extends T, ?> getFrom() {

rxjava-contrib/rxjava-debug/src/main/java/rx/operators/DebugSubscription.java

Lines changed: 20 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -1,19 +1,34 @@
11
package rx.operators;
22

33
import rx.Subscription;
4+
import rx.functions.Action1;
5+
import rx.functions.Action2;
6+
import rx.functions.Func1;
47
import rx.plugins.DebugNotification;
58

6-
final class DebugSubscription<T> implements Subscription {
7-
private final DebugSubscriber<T> debugObserver;
9+
final class DebugSubscription<T, C> implements Subscription {
10+
private final DebugSubscriber<T, C> debugObserver;
11+
private final Func1<DebugNotification, C> start;
12+
private final Action1<C> complete;
13+
private final Action2<C, Throwable> error;
814

9-
DebugSubscription(DebugSubscriber<T> debugObserver) {
15+
DebugSubscription(DebugSubscriber<T, C> debugObserver, Func1<DebugNotification, C> start, Action1<C> complete, Action2<C, Throwable> error) {
1016
this.debugObserver = debugObserver;
17+
this.start = start;
18+
this.complete = complete;
19+
this.error = error;
1120
}
1221

1322
@Override
1423
public void unsubscribe() {
15-
debugObserver.events.call(DebugNotification.<T> createUnsubscribe(debugObserver.o, debugObserver.from, debugObserver.to));
16-
debugObserver.unsubscribe();
24+
final DebugNotification<T, C> n = DebugNotification.<T, C> createUnsubscribe(debugObserver.getActual(), debugObserver.getFrom(), debugObserver.getTo());
25+
C context = start.call(n);
26+
try {
27+
debugObserver.unsubscribe();
28+
complete.call(context);
29+
} catch (Throwable e) {
30+
error.call(context, e);
31+
}
1732
}
1833

1934
@Override

rxjava-contrib/rxjava-debug/src/main/java/rx/plugins/DebugHook.java

Lines changed: 40 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,7 @@
66
import rx.Subscriber;
77
import rx.Subscription;
88
import rx.functions.Action1;
9+
import rx.functions.Action2;
910
import rx.functions.Actions;
1011
import rx.functions.Func1;
1112
import rx.functions.Functions;
@@ -17,9 +18,11 @@
1718
*
1819
* @author gscampbell
1920
*/
20-
public class DebugHook extends RxJavaObservableExecutionHook {
21+
public class DebugHook<C> extends RxJavaObservableExecutionHook {
2122
private final Func1 onNextHook;
22-
private final Action1<DebugNotification> events;
23+
private final Func1<DebugNotification, C> start;
24+
private final Action1<C> complete;
25+
private final Action2<C, Throwable> error;
2326

2427
/**
2528
* Creates a new instance of the DebugHook RxJava plug-in that can be passed into
@@ -31,18 +34,26 @@ public class DebugHook extends RxJavaObservableExecutionHook {
3134
* @param events
3235
* This action is invoked as each notification is generated
3336
*/
34-
public DebugHook(Func1 onNextDataHook, Action1<DebugNotification> events) {
37+
public DebugHook(Func1 onNextDataHook, Func1<DebugNotification, C> start, Action1<C> complete, Action2<C, Throwable> error) {
38+
this.complete = complete;
39+
this.error = error;
3540
this.onNextHook = onNextDataHook == null ? Functions.identity() : onNextDataHook;
36-
this.events = events == null ? Actions.empty() : events;
41+
this.start = (Func1<DebugNotification, C>) (start == null ? Actions.empty() : start);
3742
}
3843

3944
@Override
40-
public <T> OnSubscribe<T> onSubscribeStart(Observable<? extends T> observableInstance, final OnSubscribe<T> f) {
45+
public <T> OnSubscribe<T> onSubscribeStart(final Observable<? extends T> observableInstance, final OnSubscribe<T> f) {
4146
return new OnSubscribe<T>() {
4247
@Override
4348
public void call(Subscriber<? super T> o) {
44-
events.call(DebugNotification.createSubscribe(o, f));
45-
f.call(wrapOutbound(null, o));
49+
C context = start.call(DebugNotification.createSubscribe(o, observableInstance, f));
50+
try {
51+
f.call(wrapOutbound(null, o));
52+
complete.call(context);
53+
}
54+
catch(Throwable e) {
55+
error.call(context, e);
56+
}
4657
}
4758
};
4859
}
@@ -54,12 +65,7 @@ public <T> Subscription onSubscribeReturn(Observable<? extends T> observableInst
5465

5566
@Override
5667
public <T> OnSubscribe<T> onCreate(final OnSubscribe<T> f) {
57-
return new OnSubscribe<T>() {
58-
@Override
59-
public void call(Subscriber<? super T> o) {
60-
f.call(wrapInbound(null, o));
61-
}
62-
};
68+
return new OnCreateWrapper<T>(f);
6369
}
6470

6571
@Override
@@ -81,19 +87,36 @@ public <T> Subscription onAdd(Subscriber<T> subscriber, Subscription s) {
8187
private <R> Subscriber<? super R> wrapOutbound(Operator<? extends R, ?> bind, Subscriber<? super R> o) {
8288
if (o instanceof DebugSubscriber) {
8389
if (bind != null)
84-
((DebugSubscriber<R>) o).setFrom(bind);
90+
((DebugSubscriber<R, C>) o).setFrom(bind);
8591
return o;
8692
}
87-
return new DebugSubscriber<R>(onNextHook, events, o, bind, null);
93+
return new DebugSubscriber<R, C>(onNextHook, start, complete, error, o, bind, null);
8894
}
8995

9096
@SuppressWarnings("unchecked")
9197
private <T> Subscriber<? super T> wrapInbound(Operator<?, ? super T> bind, Subscriber<? super T> o) {
9298
if (o instanceof DebugSubscriber) {
9399
if (bind != null)
94-
((DebugSubscriber<T>) o).setTo(bind);
100+
((DebugSubscriber<T, C>) o).setTo(bind);
95101
return o;
96102
}
97-
return new DebugSubscriber<T>(onNextHook, events, o, null, bind);
103+
return new DebugSubscriber<T, C>(onNextHook, start, complete, error, o, null, bind);
104+
}
105+
106+
public final class OnCreateWrapper<T> implements OnSubscribe<T> {
107+
private final OnSubscribe<T> f;
108+
109+
private OnCreateWrapper(OnSubscribe<T> f) {
110+
this.f = f;
111+
}
112+
113+
@Override
114+
public void call(Subscriber<? super T> o) {
115+
f.call(wrapInbound(null, o));
116+
}
117+
118+
public OnSubscribe<T> getActual() {
119+
return f;
120+
}
98121
}
99122
}

rxjava-contrib/rxjava-debug/src/main/java/rx/plugins/DebugNotification.java

Lines changed: 59 additions & 43 deletions
Original file line numberDiff line numberDiff line change
@@ -1,103 +1,119 @@
11
package rx.plugins;
22

3-
import rx.Notification;
3+
import rx.Observable;
44
import rx.Observable.OnSubscribe;
55
import rx.Observable.Operator;
66
import rx.Observer;
77
import rx.observers.SafeSubscriber;
88
import rx.operators.DebugSubscriber;
99

10-
public class DebugNotification<T> {
10+
public class DebugNotification<T, C> {
1111
public static enum Kind {
1212
OnNext, OnError, OnCompleted, Subscribe, Unsubscribe
1313
}
1414

15-
private final OnSubscribe<T> source;
15+
private final Observable<? extends T> source;
16+
private final OnSubscribe<T> sourceFunc;
1617
private final Operator<? extends T, ?> from;
1718
private final Kind kind;
18-
private final Notification<T> notification;
1919
private final Operator<?, ? super T> to;
20-
private final long nanoTime;
21-
private final long threadId;
22-
private Observer o;
20+
private final Throwable throwable;
21+
private final T value;
22+
private final Observer observer;
2323

24-
public static <T> DebugNotification<T> createSubscribe(Observer<? super T> o, OnSubscribe<T> source) {
24+
public static <T, C> DebugNotification<T, C> createSubscribe(Observer<? super T> o, Observable<? extends T> source, OnSubscribe<T> sourceFunc) {
2525
Operator<?, ? super T> to = null;
2626
Operator<? extends T, ?> from = null;
27+
if (o instanceof SafeSubscriber) {
28+
o = ((SafeSubscriber) o).getActual();
29+
}
2730
if (o instanceof DebugSubscriber) {
28-
to = ((DebugSubscriber<T>) o).getTo();
29-
from = ((DebugSubscriber<T>) o).getFrom();
31+
to = ((DebugSubscriber<T, C>) o).getTo();
32+
from = ((DebugSubscriber<T, C>) o).getFrom();
3033
o = ((DebugSubscriber) o).getActual();
3134
}
32-
return new DebugNotification<T>(o, from, Kind.Subscribe, null, to, source);
35+
if (sourceFunc instanceof DebugHook.OnCreateWrapper) {
36+
sourceFunc = ((DebugHook.OnCreateWrapper) sourceFunc).getActual();
37+
}
38+
return new DebugNotification<T, C>(o, from, Kind.Subscribe, null, null, to, source, sourceFunc);
3339
}
3440

35-
public static <T> DebugNotification<T> createOnNext(Observer<? super T> o, Operator<? extends T, ?> from, T t, Operator<?, ? super T> to) {
36-
return new DebugNotification<T>(o, from, Kind.OnNext, Notification.createOnNext(t), to, null);
41+
public static <T, C> DebugNotification<T, C> createOnNext(Observer<? super T> o, Operator<? extends T, ?> from, T t, Operator<?, ? super T> to) {
42+
return new DebugNotification<T, C>(o, from, Kind.OnNext, t, null, to, null, null);
3743
}
3844

39-
public static <T> DebugNotification<T> createOnError(Observer<? super T> o, Operator<? extends T, ?> from, Throwable e, Operator<?, ? super T> to) {
40-
return new DebugNotification<T>(o, from, Kind.OnError, Notification.<T> createOnError(e), to, null);
45+
public static <T, C> DebugNotification<T, C> createOnError(Observer<? super T> o, Operator<? extends T, ?> from, Throwable e, Operator<?, ? super T> to) {
46+
return new DebugNotification<T, C>(o, from, Kind.OnError, null, e, to, null, null);
4147
}
4248

43-
public static <T> DebugNotification<T> createOnCompleted(Observer<? super T> o, Operator<? extends T, ?> from, Operator<?, ? super T> to) {
44-
return new DebugNotification<T>(o, from, Kind.OnCompleted, Notification.<T> createOnCompleted(), to, null);
49+
public static <T, C> DebugNotification<T, C> createOnCompleted(Observer<? super T> o, Operator<? extends T, ?> from, Operator<?, ? super T> to) {
50+
return new DebugNotification<T, C>(o, from, Kind.OnCompleted, null, null, to, null, null);
4551
}
4652

47-
public static <T> DebugNotification<T> createUnsubscribe(Observer<? super T> o, Operator<? extends T, ?> from, Operator<?, ? super T> to) {
48-
return new DebugNotification<T>(o, from, Kind.Unsubscribe, null, to, null);
53+
public static <T, C> DebugNotification<T, C> createUnsubscribe(Observer<? super T> o, Operator<? extends T, ?> from, Operator<?, ? super T> to) {
54+
return new DebugNotification<T, C>(o, from, Kind.Unsubscribe, null, null, to, null, null);
4955
}
5056

51-
private DebugNotification(Observer o, Operator<? extends T, ?> from, Kind kind, Notification<T> notification, Operator<?, ? super T> to, OnSubscribe<T> source) {
52-
this.o = (o instanceof SafeSubscriber) ? ((SafeSubscriber) o).getActual() : o;
57+
private DebugNotification(Observer o, Operator<? extends T, ?> from, Kind kind, T value, Throwable throwable, Operator<?, ? super T> to, Observable<? extends T> source, OnSubscribe<T> sourceFunc) {
58+
this.observer = (o instanceof SafeSubscriber) ? ((SafeSubscriber) o).getActual() : o;
5359
this.from = from;
5460
this.kind = kind;
55-
this.notification = notification;
61+
this.value = value;
62+
this.throwable = throwable;
5663
this.to = to;
5764
this.source = source;
58-
this.nanoTime = System.nanoTime();
59-
this.threadId = Thread.currentThread().getId();
65+
this.sourceFunc = sourceFunc;
66+
}
67+
68+
public Observer getObserver() {
69+
return observer;
6070
}
6171

6272
public Operator<? extends T, ?> getFrom() {
6373
return from;
6474
}
6575

66-
public Notification<T> getNotification() {
67-
return notification;
76+
public T getValue() {
77+
return value;
6878
}
6979

70-
public Operator<?, ? super T> getTo() {
71-
return to;
80+
public Throwable getThrowable() {
81+
return throwable;
7282
}
7383

74-
public long getNanoTime() {
75-
return nanoTime;
76-
}
77-
78-
public long getThreadId() {
79-
return threadId;
84+
public Operator<?, ? super T> getTo() {
85+
return to;
8086
}
8187

8288
public Kind getKind() {
8389
return kind;
8490
}
91+
92+
public Observable<? extends T> getSource() {
93+
return source;
94+
}
95+
96+
public OnSubscribe<T> getSourceFunc() {
97+
return sourceFunc;
98+
}
8599

86100
@Override
101+
/**
102+
* Does a very bad job of making JSON like string.
103+
*/
87104
public String toString() {
88105
final StringBuilder s = new StringBuilder("{");
89-
s.append(" \"nano\": ").append(nanoTime);
90-
s.append(", \"thread\": ").append(threadId);
91-
s.append(", \"observer\": \"").append(o.getClass().getName()).append("@").append(Integer.toHexString(o.hashCode())).append("\"");
106+
s.append("\"observer\": \"").append(observer.getClass().getName()).append("@").append(Integer.toHexString(observer.hashCode())).append("\"");
92107
s.append(", \"type\": \"").append(kind).append("\"");
93-
if (notification != null) {
94-
if (notification.hasValue())
95-
s.append(", \"value\": \"").append(notification.getValue()).append("\"");
96-
if (notification.hasThrowable())
97-
s.append(", \"exception\": \"").append(notification.getThrowable().getMessage().replace("\\", "\\\\").replace("\"", "\\\"")).append("\"");
98-
}
108+
if (kind == Kind.OnNext)
109+
// not json safe
110+
s.append(", \"value\": \"").append(value).append("\"");
111+
if (kind == Kind.OnError)
112+
s.append(", \"exception\": \"").append(throwable.getMessage().replace("\\", "\\\\").replace("\"", "\\\"")).append("\"");
99113
if (source != null)
100114
s.append(", \"source\": \"").append(source.getClass().getName()).append("@").append(Integer.toHexString(source.hashCode())).append("\"");
115+
if (sourceFunc != null)
116+
s.append(", \"sourceFunc\": \"").append(sourceFunc.getClass().getName()).append("@").append(Integer.toHexString(sourceFunc.hashCode())).append("\"");
101117
if (from != null)
102118
s.append(", \"from\": \"").append(from.getClass().getName()).append("@").append(Integer.toHexString(from.hashCode())).append("\"");
103119
if (to != null)

0 commit comments

Comments
 (0)