Skip to content

Commit d78add3

Browse files
Merge pull request #481 from zsxwing/using
Implement the 'Using' operator
2 parents ba1c4e8 + 2eded0a commit d78add3

File tree

3 files changed

+294
-0
lines changed

3 files changed

+294
-0
lines changed

rxjava-core/src/main/java/rx/Observable.java

+16
Original file line numberDiff line numberDiff line change
@@ -86,6 +86,7 @@
8686
import rx.operators.OperationToObservableIterable;
8787
import rx.operators.OperationToObservableList;
8888
import rx.operators.OperationToObservableSortedList;
89+
import rx.operators.OperationUsing;
8990
import rx.operators.OperationWindow;
9091
import rx.operators.OperationZip;
9192
import rx.operators.SafeObservableSubscription;
@@ -4774,6 +4775,21 @@ public Observable<TimeInterval<T>> timeInterval(Scheduler scheduler) {
47744775
return create(OperationTimeInterval.timeInterval(this, scheduler));
47754776
}
47764777

4778+
/**
4779+
* Constructs an observable sequence that depends on a resource object.
4780+
*
4781+
* @param resourceFactory
4782+
* The factory function to obtain a resource object.
4783+
* @param observableFactory
4784+
* The factory function to obtain an observable sequence that depends on the obtained resource.
4785+
* @return
4786+
* The observable sequence whose lifetime controls the lifetime of the dependent resource object.
4787+
* @see <a href="http://msdn.microsoft.com/en-us/library/hh229585(v=vs.103).aspx">MSDN: Observable.Using</a>
4788+
*/
4789+
public static <T, RESOURCE extends Subscription> Observable<T> using(Func0<RESOURCE> resourceFactory, Func1<RESOURCE, Observable<T>> observableFactory) {
4790+
return create(OperationUsing.using(resourceFactory, observableFactory));
4791+
}
4792+
47774793
/**
47784794
* Propagates the observable sequence that reacts first.
47794795
*
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,59 @@
1+
/**
2+
* Copyright 2013 Netflix, Inc.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
package rx.operators;
17+
18+
import rx.Observable;
19+
import rx.Observable.OnSubscribeFunc;
20+
import rx.Observer;
21+
import rx.Subscription;
22+
import rx.subscriptions.CompositeSubscription;
23+
import rx.subscriptions.Subscriptions;
24+
import rx.util.functions.Func0;
25+
import rx.util.functions.Func1;
26+
27+
/**
28+
* Constructs an observable sequence that depends on a resource object.
29+
*/
30+
public class OperationUsing {
31+
32+
public static <T, RESOURCE extends Subscription> OnSubscribeFunc<T> using(
33+
final Func0<RESOURCE> resourceFactory,
34+
final Func1<RESOURCE, Observable<T>> observableFactory) {
35+
return new OnSubscribeFunc<T>() {
36+
@Override
37+
public Subscription onSubscribe(Observer<? super T> observer) {
38+
Subscription resourceSubscription = Subscriptions.empty();
39+
try {
40+
RESOURCE resource = resourceFactory.call();
41+
if (resource != null) {
42+
resourceSubscription = resource;
43+
}
44+
Observable<T> observable = observableFactory.call(resource);
45+
SafeObservableSubscription subscription = new SafeObservableSubscription();
46+
// Use SafeObserver to guarantee resourceSubscription will
47+
// be unsubscribed.
48+
return subscription.wrap(new CompositeSubscription(
49+
observable.subscribe(new SafeObserver<T>(
50+
subscription, observer)),
51+
resourceSubscription));
52+
} catch (Throwable e) {
53+
resourceSubscription.unsubscribe();
54+
return Observable.<T> error(e).subscribe(observer);
55+
}
56+
}
57+
};
58+
}
59+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,219 @@
1+
/**
2+
* Copyright 2013 Netflix, Inc.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
package rx.operators;
17+
18+
import static org.junit.Assert.fail;
19+
import static org.mockito.Mockito.inOrder;
20+
import static org.mockito.Mockito.mock;
21+
import static org.mockito.Mockito.times;
22+
import static org.mockito.Mockito.verify;
23+
import static org.mockito.Mockito.when;
24+
import static rx.operators.OperationUsing.using;
25+
26+
import org.junit.Test;
27+
import org.mockito.InOrder;
28+
29+
import rx.Observable;
30+
import rx.Observable.OnSubscribeFunc;
31+
import rx.Observer;
32+
import rx.Subscription;
33+
import rx.subscriptions.Subscriptions;
34+
import rx.util.functions.Action0;
35+
import rx.util.functions.Func0;
36+
import rx.util.functions.Func1;
37+
38+
public class OperationUsingTest {
39+
40+
@SuppressWarnings("serial")
41+
private static class TestException extends RuntimeException {
42+
}
43+
44+
private static interface Resource extends Subscription {
45+
public String getTextFromWeb();
46+
47+
@Override
48+
public void unsubscribe();
49+
}
50+
51+
@Test
52+
public void testUsing() {
53+
final Resource resource = mock(Resource.class);
54+
when(resource.getTextFromWeb()).thenReturn("Hello world!");
55+
56+
Func0<Resource> resourceFactory = new Func0<Resource>() {
57+
@Override
58+
public Resource call() {
59+
return resource;
60+
}
61+
};
62+
63+
Func1<Resource, Observable<String>> observableFactory = new Func1<Resource, Observable<String>>() {
64+
@Override
65+
public Observable<String> call(Resource resource) {
66+
return Observable.from(resource.getTextFromWeb().split(" "));
67+
}
68+
};
69+
70+
@SuppressWarnings("unchecked")
71+
Observer<String> observer = (Observer<String>) mock(Observer.class);
72+
Observable<String> observable = Observable.create(using(
73+
resourceFactory, observableFactory));
74+
observable.subscribe(observer);
75+
76+
InOrder inOrder = inOrder(observer);
77+
inOrder.verify(observer, times(1)).onNext("Hello");
78+
inOrder.verify(observer, times(1)).onNext("world!");
79+
inOrder.verify(observer, times(1)).onCompleted();
80+
inOrder.verifyNoMoreInteractions();
81+
82+
// The resouce should be closed
83+
verify(resource, times(1)).unsubscribe();
84+
}
85+
86+
@Test
87+
public void testUsingWithSubscribingTwice() {
88+
// When subscribe is called, a new resource should be created.
89+
Func0<Resource> resourceFactory = new Func0<Resource>() {
90+
@Override
91+
public Resource call() {
92+
return new Resource() {
93+
94+
boolean first = true;
95+
96+
@Override
97+
public String getTextFromWeb() {
98+
if (first) {
99+
first = false;
100+
return "Hello world!";
101+
}
102+
return "Nothing";
103+
}
104+
105+
@Override
106+
public void unsubscribe() {
107+
}
108+
109+
};
110+
}
111+
};
112+
113+
Func1<Resource, Observable<String>> observableFactory = new Func1<Resource, Observable<String>>() {
114+
@Override
115+
public Observable<String> call(Resource resource) {
116+
return Observable.from(resource.getTextFromWeb().split(" "));
117+
}
118+
};
119+
120+
@SuppressWarnings("unchecked")
121+
Observer<String> observer = (Observer<String>) mock(Observer.class);
122+
Observable<String> observable = Observable.create(using(
123+
resourceFactory, observableFactory));
124+
observable.subscribe(observer);
125+
observable.subscribe(observer);
126+
127+
InOrder inOrder = inOrder(observer);
128+
129+
inOrder.verify(observer, times(1)).onNext("Hello");
130+
inOrder.verify(observer, times(1)).onNext("world!");
131+
inOrder.verify(observer, times(1)).onCompleted();
132+
133+
inOrder.verify(observer, times(1)).onNext("Hello");
134+
inOrder.verify(observer, times(1)).onNext("world!");
135+
inOrder.verify(observer, times(1)).onCompleted();
136+
inOrder.verifyNoMoreInteractions();
137+
}
138+
139+
@Test(expected = TestException.class)
140+
public void testUsingWithResourceFactoryError() {
141+
Func0<Subscription> resourceFactory = new Func0<Subscription>() {
142+
@Override
143+
public Subscription call() {
144+
throw new TestException();
145+
}
146+
};
147+
148+
Func1<Subscription, Observable<Integer>> observableFactory = new Func1<Subscription, Observable<Integer>>() {
149+
@Override
150+
public Observable<Integer> call(Subscription subscription) {
151+
return Observable.empty();
152+
}
153+
};
154+
155+
Observable.create(using(resourceFactory, observableFactory))
156+
.toBlockingObservable().last();
157+
}
158+
159+
@Test
160+
public void testUsingWithObservableFactoryError() {
161+
final Action0 unsubscribe = mock(Action0.class);
162+
Func0<Subscription> resourceFactory = new Func0<Subscription>() {
163+
@Override
164+
public Subscription call() {
165+
return Subscriptions.create(unsubscribe);
166+
}
167+
};
168+
169+
Func1<Subscription, Observable<Integer>> observableFactory = new Func1<Subscription, Observable<Integer>>() {
170+
@Override
171+
public Observable<Integer> call(Subscription subscription) {
172+
throw new TestException();
173+
}
174+
};
175+
176+
try {
177+
Observable.create(using(resourceFactory, observableFactory))
178+
.toBlockingObservable().last();
179+
fail("Should throw a TestException when the observableFactory throws it");
180+
} catch (TestException e) {
181+
// Make sure that unsubscribe is called so that users can close
182+
// the resource if some error happens.
183+
verify(unsubscribe, times(1)).call();
184+
}
185+
}
186+
187+
@Test
188+
public void testUsingWithObservableFactoryErrorInOnSubscribe() {
189+
final Action0 unsubscribe = mock(Action0.class);
190+
Func0<Subscription> resourceFactory = new Func0<Subscription>() {
191+
@Override
192+
public Subscription call() {
193+
return Subscriptions.create(unsubscribe);
194+
}
195+
};
196+
197+
Func1<Subscription, Observable<Integer>> observableFactory = new Func1<Subscription, Observable<Integer>>() {
198+
@Override
199+
public Observable<Integer> call(Subscription subscription) {
200+
return Observable.create(new OnSubscribeFunc<Integer>() {
201+
@Override
202+
public Subscription onSubscribe(Observer<? super Integer> t1) {
203+
throw new TestException();
204+
}
205+
});
206+
}
207+
};
208+
209+
try {
210+
Observable.create(using(resourceFactory, observableFactory))
211+
.toBlockingObservable().last();
212+
fail("Should throw a TestException when the observableFactory throws it");
213+
} catch (TestException e) {
214+
// Make sure that unsubscribe is called so that users can close
215+
// the resource if some error happens.
216+
verify(unsubscribe, times(1)).call();
217+
}
218+
}
219+
}

0 commit comments

Comments
 (0)