Skip to content

Commit c1bd534

Browse files
Merge pull request #924 from benjchristensen/lift-error-handling
Localized Operator Error Handling
2 parents dd2d410 + f18e4d2 commit c1bd534

File tree

4 files changed

+111
-5
lines changed

4 files changed

+111
-5
lines changed

rxjava-core/src/main/java/rx/Observable.java

Lines changed: 11 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -266,7 +266,17 @@ public <R> Observable<R> lift(final Operator<? extends R, ? super T> lift) {
266266
return new Observable<R>(new OnSubscribe<R>() {
267267
@Override
268268
public void call(Subscriber<? super R> o) {
269-
f.call(hook.onLift(lift).call(o));
269+
try {
270+
f.call(hook.onLift(lift).call(o));
271+
} catch (Throwable e) {
272+
// localized capture of errors rather than it skipping all operators
273+
// and ending up in the try/catch of the subscribe method which then
274+
// prevents onErrorResumeNext and other similar approaches to error handling
275+
if (e instanceof OnErrorNotImplementedException) {
276+
throw (OnErrorNotImplementedException) e;
277+
}
278+
o.onError(e);
279+
}
270280
}
271281
});
272282
}

rxjava-core/src/main/java/rx/exceptions/OnErrorThrowable.java

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -59,6 +59,14 @@ public static OnErrorThrowable from(Throwable t) {
5959
* @return Throwable e passed in
6060
*/
6161
public static Throwable addValueAsLastCause(Throwable e, Object value) {
62+
Throwable lastCause = Exceptions.getFinalCause(e);
63+
if (lastCause != null && lastCause instanceof OnNextValue) {
64+
// purposefully using == for object reference check
65+
if (((OnNextValue) lastCause).getValue() == value) {
66+
// don't add another
67+
return e;
68+
}
69+
}
6270
Exceptions.addCause(e, new OnNextValue(value));
6371
return e;
6472
}

rxjava-core/src/test/java/rx/operators/OperatorMapTest.java

Lines changed: 9 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -29,6 +29,7 @@
2929
import rx.Observable;
3030
import rx.Observer;
3131
import rx.Subscriber;
32+
import rx.exceptions.OnErrorNotImplementedException;
3233
import rx.functions.Action1;
3334
import rx.functions.Func1;
3435
import rx.functions.Func2;
@@ -171,7 +172,7 @@ public String call(String s) {
171172
public void call(Throwable t1) {
172173
t1.printStackTrace();
173174
}
174-
175+
175176
});
176177

177178
m.subscribe(stringObserver);
@@ -255,7 +256,7 @@ public Integer call(Integer i) {
255256
}).toBlockingObservable().single();
256257
}
257258

258-
@Test(expected = RuntimeException.class)
259+
@Test(expected = OnErrorNotImplementedException.class)
259260
public void verifyExceptionIsThrownIfThereIsNoExceptionHandler() {
260261

261262
Observable.OnSubscribe<Object> creator = new Observable.OnSubscribe<Object>() {
@@ -273,7 +274,6 @@ public void call(Subscriber<? super Object> observer) {
273274

274275
@Override
275276
public Observable<Object> call(Object object) {
276-
277277
return Observable.from(object);
278278
}
279279
};
@@ -299,7 +299,12 @@ public void call(Object object) {
299299
}
300300
};
301301

302-
Observable.create(creator).flatMap(manyMapper).map(mapper).subscribe(onNext);
302+
try {
303+
Observable.create(creator).flatMap(manyMapper).map(mapper).subscribe(onNext);
304+
} catch (RuntimeException e) {
305+
e.printStackTrace();
306+
throw e;
307+
}
303308
}
304309

305310
private static Map<String, String> getMap(String prefix) {

rxjava-core/src/test/java/rx/operators/OperatorOnErrorResumeNextViaFunctionTest.java

Lines changed: 83 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -26,7 +26,9 @@
2626
import org.mockito.Mockito;
2727

2828
import rx.Observable;
29+
import rx.Observable.Operator;
2930
import rx.Observer;
31+
import rx.Subscriber;
3032
import rx.Subscription;
3133
import rx.functions.Func1;
3234
import rx.observers.TestSubscriber;
@@ -143,6 +145,87 @@ public Observable<String> call(Throwable t1) {
143145
verify(observer, times(0)).onCompleted();
144146
}
145147

148+
/**
149+
* Test that we receive the onError if an exception is thrown from an operator that
150+
* does not have manual try/catch handling like map does.
151+
*/
152+
@Test
153+
public void testOnErrorResumeReceivesErrorFromPreviousNonProtectedOperator() {
154+
TestSubscriber<String> ts = new TestSubscriber<String>();
155+
Observable.from(1).lift(new Operator<String, Integer>() {
156+
157+
@Override
158+
public Subscriber<? super Integer> call(Subscriber<? super String> t1) {
159+
throw new RuntimeException("failed");
160+
}
161+
162+
}).onErrorResumeNext(new Func1<Throwable, Observable<String>>() {
163+
164+
@Override
165+
public Observable<String> call(Throwable t1) {
166+
if (t1.getMessage().equals("failed")) {
167+
return Observable.from("success");
168+
} else {
169+
return Observable.error(t1);
170+
}
171+
}
172+
173+
}).subscribe(ts);
174+
175+
ts.assertTerminalEvent();
176+
System.out.println(ts.getOnNextEvents());
177+
ts.assertReceivedOnNext(Arrays.asList("success"));
178+
}
179+
180+
/**
181+
* Test that we receive the onError if an exception is thrown from an operator that
182+
* does not have manual try/catch handling like map does.
183+
*/
184+
@Test
185+
public void testOnErrorResumeReceivesErrorFromPreviousNonProtectedOperatorOnNext() {
186+
TestSubscriber<String> ts = new TestSubscriber<String>();
187+
Observable.from(1).lift(new Operator<String, Integer>() {
188+
189+
@Override
190+
public Subscriber<? super Integer> call(Subscriber<? super String> t1) {
191+
return new Subscriber<Integer>() {
192+
193+
@Override
194+
public void onCompleted() {
195+
throw new RuntimeException("failed");
196+
}
197+
198+
@Override
199+
public void onError(Throwable e) {
200+
throw new RuntimeException("failed");
201+
}
202+
203+
@Override
204+
public void onNext(Integer t) {
205+
throw new RuntimeException("failed");
206+
}
207+
208+
};
209+
}
210+
211+
}).onErrorResumeNext(new Func1<Throwable, Observable<String>>() {
212+
213+
@Override
214+
public Observable<String> call(Throwable t1) {
215+
if (t1.getMessage().equals("failed")) {
216+
return Observable.from("success");
217+
} else {
218+
return Observable.error(t1);
219+
}
220+
}
221+
222+
}).subscribe(ts);
223+
224+
ts.assertTerminalEvent();
225+
System.out.println(ts.getOnNextEvents());
226+
ts.assertReceivedOnNext(Arrays.asList("success"));
227+
}
228+
146229
private static class TestObservable implements Observable.OnSubscribeFunc<String> {
147230

148231
final Subscription s;

0 commit comments

Comments
 (0)