Skip to content

Commit 6926fa6

Browse files
Synchronize -> Serialize
- migrate all usage to Serialized instead of Synchronized - remove implementations of SerializedObserver that lost the competition (performance and testing in production)
1 parent 76bbefc commit 6926fa6

14 files changed

+192
-559
lines changed

rxjava-core/src/main/java/rx/observers/SafeSubscriber.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -52,7 +52,7 @@
5252
* <li>When onError or onComplete occur it will unsubscribe from the Observable (if executing asynchronously).</li>
5353
* </ul>
5454
* <p>
55-
* It will not synchronize onNext execution. Use the {@link SynchronizedObserver} to do that.
55+
* It will not synchronize onNext execution. Use the {@link SerializedSubscriber} to do that.
5656
*
5757
* @param <T>
5858
*/
Lines changed: 156 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,7 @@
11
package rx.observers;
22

3+
import java.util.ArrayList;
4+
35
import rx.Observer;
46

57
/**
@@ -15,28 +17,170 @@
1517
* @param <T>
1618
*/
1719
public class SerializedObserver<T> implements Observer<T> {
18-
/*
19-
* Facade to actual implementation until final decision is made
20-
* on the implementation.
21-
*/
22-
private final SerializedObserverViaQueueAndLock<T> actual;
23-
24-
public SerializedObserver(Observer<? super T> observer) {
25-
this.actual = new SerializedObserverViaQueueAndLock<T>(observer);
20+
private final Observer<? super T> actual;
21+
22+
private boolean emitting = false;
23+
private boolean terminated = false;
24+
private ArrayList<Object> queue = new ArrayList<Object>();
25+
26+
private static Sentinel NULL_SENTINEL = new Sentinel();
27+
private static Sentinel COMPLETE_SENTINEL = new Sentinel();
28+
29+
private static class Sentinel {
30+
31+
}
32+
33+
private static class ErrorSentinel extends Sentinel {
34+
final Throwable e;
35+
36+
ErrorSentinel(Throwable e) {
37+
this.e = e;
38+
}
39+
}
40+
41+
public SerializedObserver(Observer<? super T> s) {
42+
this.actual = s;
2643
}
2744

2845
@Override
2946
public void onCompleted() {
30-
actual.onCompleted();
47+
boolean canEmit = false;
48+
ArrayList<Object> list = null;
49+
synchronized (this) {
50+
if (terminated) {
51+
return;
52+
}
53+
terminated = true;
54+
if (!emitting) {
55+
// emit immediately
56+
emitting = true;
57+
canEmit = true;
58+
if (queue.size() > 0) {
59+
list = queue; // copy reference
60+
queue = new ArrayList<Object>(); // new version;
61+
}
62+
} else {
63+
// someone else is already emitting so just queue it
64+
queue.add(COMPLETE_SENTINEL);
65+
}
66+
}
67+
if (canEmit) {
68+
// we won the right to emit
69+
try {
70+
drainQueue(list);
71+
actual.onCompleted();
72+
} finally {
73+
synchronized (this) {
74+
emitting = false;
75+
}
76+
}
77+
}
3178
}
3279

3380
@Override
34-
public void onError(Throwable e) {
35-
actual.onError(e);
81+
public void onError(final Throwable e) {
82+
boolean canEmit = false;
83+
ArrayList<Object> list = null;
84+
synchronized (this) {
85+
if (terminated) {
86+
return;
87+
}
88+
terminated = true;
89+
if (!emitting) {
90+
// emit immediately
91+
emitting = true;
92+
canEmit = true;
93+
if (queue.size() > 0) {
94+
list = queue; // copy reference
95+
queue = new ArrayList<Object>(); // new version;
96+
}
97+
} else {
98+
// someone else is already emitting so just queue it ... after eliminating the queue to shortcut
99+
queue.clear();
100+
queue.add(new ErrorSentinel(e));
101+
}
102+
}
103+
if (canEmit) {
104+
// we won the right to emit
105+
try {
106+
drainQueue(list);
107+
actual.onError(e);
108+
} finally {
109+
synchronized (this) {
110+
emitting = false;
111+
}
112+
}
113+
}
36114
}
37115

38116
@Override
39117
public void onNext(T t) {
40-
actual.onNext(t);
118+
boolean canEmit = false;
119+
ArrayList<Object> list = null;
120+
synchronized (this) {
121+
if (terminated) {
122+
return;
123+
}
124+
if (!emitting) {
125+
// emit immediately
126+
emitting = true;
127+
canEmit = true;
128+
if (queue.size() > 0) {
129+
list = queue; // copy reference
130+
queue = new ArrayList<Object>(); // new version;
131+
}
132+
} else {
133+
// someone else is already emitting so just queue it
134+
if (t == null) {
135+
queue.add(NULL_SENTINEL);
136+
} else {
137+
queue.add(t);
138+
}
139+
}
140+
}
141+
if (canEmit) {
142+
// we won the right to emit
143+
try {
144+
drainQueue(list);
145+
actual.onNext(t);
146+
} finally {
147+
synchronized (this) {
148+
if (terminated) {
149+
list = queue; // copy reference
150+
queue = new ArrayList<Object>(); // new version;
151+
} else {
152+
// release this thread
153+
emitting = false;
154+
canEmit = false;
155+
}
156+
}
157+
}
158+
}
159+
160+
// if terminated this will still be true so let's drain the rest of the queue
161+
if (canEmit) {
162+
drainQueue(list);
163+
}
164+
}
165+
166+
public void drainQueue(ArrayList<Object> list) {
167+
if (list == null || list.size() == 0) {
168+
return;
169+
}
170+
for (Object v : list) {
171+
if (v != null) {
172+
if (v instanceof Sentinel) {
173+
if (v == NULL_SENTINEL) {
174+
actual.onNext(null);
175+
} else if (v == COMPLETE_SENTINEL) {
176+
actual.onCompleted();
177+
} else if (v instanceof ErrorSentinel) {
178+
actual.onError(((ErrorSentinel) v).e);
179+
}
180+
} else {
181+
actual.onNext((T) v);
182+
}
183+
}
184+
}
41185
}
42186
}

rxjava-core/src/main/java/rx/observers/SerializedObserverViaQueueAndCounter.java

Lines changed: 0 additions & 70 deletions
This file was deleted.

0 commit comments

Comments
 (0)