Skip to content

Commit 75c40b5

Browse files
authored
2.x: enable op-fusion on GroupBy, doOnX, fix mistakes in map and filter (#4160)
1 parent af3107c commit 75c40b5

File tree

6 files changed

+477
-101
lines changed

6 files changed

+477
-101
lines changed

src/main/java/io/reactivex/internal/operators/flowable/FlowableDoOnEach.java

+172-24
Original file line numberDiff line numberDiff line change
@@ -18,7 +18,9 @@
1818
import io.reactivex.Flowable;
1919
import io.reactivex.exceptions.CompositeException;
2020
import io.reactivex.functions.Consumer;
21-
import io.reactivex.internal.subscriptions.SubscriptionHelper;
21+
import io.reactivex.internal.fuseable.ConditionalSubscriber;
22+
import io.reactivex.internal.subscribers.flowable.*;
23+
import io.reactivex.internal.util.Exceptions;
2224
import io.reactivex.plugins.RxJavaPlugins;
2325

2426
public final class FlowableDoOnEach<T> extends Flowable<T> {
@@ -41,51 +43,49 @@ public FlowableDoOnEach(Publisher<T> source, Consumer<? super T> onNext,
4143

4244
@Override
4345
protected void subscribeActual(Subscriber<? super T> s) {
44-
source.subscribe(new DoOnEachSubscriber<T>(s, onNext, onError, onComplete, onAfterTerminate));
46+
if (s instanceof ConditionalSubscriber) {
47+
source.subscribe(new DoOnEachConditionalSubscriber<T>(
48+
(ConditionalSubscriber<? super T>)s, onNext, onError, onComplete, onAfterTerminate));
49+
} else {
50+
source.subscribe(new DoOnEachSubscriber<T>(
51+
s, onNext, onError, onComplete, onAfterTerminate));
52+
}
4553
}
4654

47-
static final class DoOnEachSubscriber<T> implements Subscriber<T>, Subscription {
48-
final Subscriber<? super T> actual;
55+
static final class DoOnEachSubscriber<T> extends BasicFuseableSubscriber<T, T> {
4956
final Consumer<? super T> onNext;
5057
final Consumer<? super Throwable> onError;
5158
final Runnable onComplete;
5259
final Runnable onAfterTerminate;
5360

54-
Subscription s;
55-
56-
boolean done;
57-
5861
public DoOnEachSubscriber(
5962
Subscriber<? super T> actual,
6063
Consumer<? super T> onNext,
6164
Consumer<? super Throwable> onError,
6265
Runnable onComplete,
6366
Runnable onAfterTerminate) {
64-
this.actual = actual;
67+
super(actual);
6568
this.onNext = onNext;
6669
this.onError = onError;
6770
this.onComplete = onComplete;
6871
this.onAfterTerminate = onAfterTerminate;
6972
}
7073

71-
@Override
72-
public void onSubscribe(Subscription s) {
73-
if (SubscriptionHelper.validate(this.s, s)) {
74-
this.s = s;
75-
actual.onSubscribe(this);
76-
}
77-
}
78-
7974
@Override
8075
public void onNext(T t) {
8176
if (done) {
8277
return;
8378
}
79+
80+
if (sourceMode != NONE) {
81+
actual.onNext(null);
82+
return;
83+
}
84+
8485
try {
8586
onNext.accept(t);
8687
} catch (Throwable e) {
87-
s.cancel();
88-
onError(e);
88+
fail(e);
8989
return;
9090
}
9191

@@ -103,6 +103,7 @@ public void onError(Throwable t) {
103103
try {
104104
onError.accept(t);
105105
} catch (Throwable e) {
106+
Exceptions.throwIfFatal(e);
106107
actual.onError(new CompositeException(e, t));
107108
relay = false;
108109
}
@@ -126,7 +127,7 @@ public void onComplete() {
126127
try {
127128
onComplete.run();
128129
} catch (Throwable e) {
129-
onError(e);
130+
fail(e);
130131
return;
131132
}
132133

@@ -138,17 +139,164 @@ public void onComplete() {
138139
RxJavaPlugins.onError(e);
139140
}
140141
}
142+
143+
@Override
144+
public int requestFusion(int mode) {
145+
return transitiveBoundaryFusion(mode);
146+
}
147+
148+
@Override
149+
public T poll() {
150+
T v = qs.poll();
151+
152+
if (v != null) {
153+
try {
154+
onNext.accept(v);
155+
} finally {
156+
onAfterTerminate.run();
157+
}
158+
} else {
159+
if (sourceMode == SYNC) {
160+
onComplete.run();
161+
162+
onAfterTerminate.run();
163+
}
164+
}
165+
return v;
166+
}
167+
}
168+
169+
static final class DoOnEachConditionalSubscriber<T> extends BasicFuseableConditionalSubscriber<T, T> {
170+
final Consumer<? super T> onNext;
171+
final Consumer<? super Throwable> onError;
172+
final Runnable onComplete;
173+
final Runnable onAfterTerminate;
174+
175+
public DoOnEachConditionalSubscriber(
176+
ConditionalSubscriber<? super T> actual,
177+
Consumer<? super T> onNext,
178+
Consumer<? super Throwable> onError,
179+
Runnable onComplete,
180+
Runnable onAfterTerminate) {
181+
super(actual);
182+
this.onNext = onNext;
183+
this.onError = onError;
184+
this.onComplete = onComplete;
185+
this.onAfterTerminate = onAfterTerminate;
186+
}
187+
188+
@Override
189+
public void onNext(T t) {
190+
if (done) {
191+
return;
192+
}
193+
194+
if (sourceMode != NONE) {
195+
actual.onNext(null);
196+
return;
197+
}
198+
199+
try {
200+
onNext.accept(t);
201+
} catch (Throwable e) {
202+
fail(e);
203+
return;
204+
}
205+
206+
actual.onNext(t);
207+
}
141208

209+
@Override
210+
public boolean tryOnNext(T t) {
211+
if (done) {
212+
return false;
213+
}
214+
215+
if (sourceMode != NONE) {
216+
return actual.tryOnNext(null);
217+
}
218+
219+
try {
220+
onNext.accept(t);
221+
} catch (Throwable e) {
222+
fail(e);
223+
return false;
224+
}
225+
226+
return actual.tryOnNext(t);
227+
}
142228

143229
@Override
144-
public void request(long n) {
145-
s.request(n);
230+
public void onError(Throwable t) {
231+
if (done) {
232+
RxJavaPlugins.onError(t);
233+
return;
234+
}
235+
done = true;
236+
boolean relay = true;
237+
try {
238+
onError.accept(t);
239+
} catch (Throwable e) {
240+
Exceptions.throwIfFatal(e);
241+
actual.onError(new CompositeException(e, t));
242+
relay = false;
243+
}
244+
if (relay) {
245+
actual.onError(t);
246+
}
247+
248+
try {
249+
onAfterTerminate.run();
250+
} catch (Throwable e) {
251+
RxJavaPlugins.onError(e);
252+
}
146253
}
147254

148255
@Override
149-
public void cancel() {
150-
s.cancel();
256+
public void onComplete() {
257+
if (done) {
258+
return;
259+
}
260+
done = true;
261+
try {
262+
onComplete.run();
263+
} catch (Throwable e) {
264+
fail(e);
265+
return;
266+
}
267+
268+
actual.onComplete();
269+
270+
try {
271+
onAfterTerminate.run();
272+
} catch (Throwable e) {
273+
RxJavaPlugins.onError(e);
274+
}
275+
}
276+
277+
@Override
278+
public int requestFusion(int mode) {
279+
return transitiveBoundaryFusion(mode);
151280
}
152281

282+
@Override
283+
public T poll() {
284+
T v = qs.poll();
285+
286+
if (v != null) {
287+
try {
288+
onNext.accept(v);
289+
} finally {
290+
onAfterTerminate.run();
291+
}
292+
} else {
293+
if (sourceMode == SYNC) {
294+
onComplete.run();
295+
296+
onAfterTerminate.run();
297+
}
298+
}
299+
return v;
300+
}
153301
}
154302
}

src/main/java/io/reactivex/internal/operators/flowable/FlowableFilter.java

+10-3
Original file line numberDiff line numberDiff line change
@@ -54,6 +54,13 @@ public void onNext(T t) {
5454

5555
@Override
5656
public boolean tryOnNext(T t) {
57+
if (done) {
58+
return false;
59+
}
60+
if (sourceMode != NONE) {
61+
actual.onNext(null);
62+
return true;
63+
}
5764
boolean b;
5865
try {
5966
b = filter.test(t);
@@ -117,10 +124,10 @@ public boolean tryOnNext(T t) {
117124
return false;
118125
}
119126

120-
if (sourceMode == ASYNC) {
121-
actual.onNext(null);
122-
return true;
127+
if (sourceMode != NONE) {
128+
return actual.tryOnNext(null);
123129
}
130+
124131
boolean b;
125132
try {
126133
b = filter.test(t);

0 commit comments

Comments
 (0)