Skip to content

Commit af3107c

Browse files
authored
2.x: Enable fusion-consumers (ReactiveX#4157)
* 2.x: Enable fusion-consumers * Fix observeOn leaking the worker * Code cleanup
1 parent 4dbd735 commit af3107c

File tree

143 files changed

+3387
-1464
lines changed

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

143 files changed

+3387
-1464
lines changed

Diff for: src/main/java/io/reactivex/Flowable.java

+5-20
Original file line numberDiff line numberDiff line change
@@ -27,6 +27,7 @@
2727
import io.reactivex.internal.functions.Objects;
2828
import io.reactivex.internal.fuseable.ScalarCallable;
2929
import io.reactivex.internal.operators.flowable.*;
30+
import io.reactivex.internal.operators.flowable.FlowableConcatMap.ErrorMode;
3031
import io.reactivex.internal.subscribers.flowable.*;
3132
import io.reactivex.internal.subscriptions.EmptySubscription;
3233
import io.reactivex.plugins.RxJavaPlugins;
@@ -1467,7 +1468,7 @@ public final <R> Flowable<R> concatMap(Function<? super T, ? extends Publisher<?
14671468
if (prefetch <= 0) {
14681469
throw new IllegalArgumentException("prefetch > 0 required but it was " + prefetch);
14691470
}
1470-
return new FlowableConcatMap<T, R>(this, mapper, prefetch);
1471+
return new FlowableConcatMap<T, R>(this, mapper, prefetch, ErrorMode.IMMEDIATE);
14711472
}
14721473

14731474
@BackpressureSupport(BackpressureKind.FULL)
@@ -1480,12 +1481,7 @@ public final <U> Flowable<U> concatMapIterable(Function<? super T, ? extends Ite
14801481
@SchedulerSupport(SchedulerSupport.NONE)
14811482
public final <U> Flowable<U> concatMapIterable(final Function<? super T, ? extends Iterable<? extends U>> mapper, int prefetch) {
14821483
Objects.requireNonNull(mapper, "mapper is null");
1483-
return concatMap(new Function<T, Publisher<U>>() {
1484-
@Override
1485-
public Publisher<U> apply(T v) {
1486-
return new FlowableFromIterable<U>(mapper.apply(v));
1487-
}
1488-
}, prefetch);
1484+
return new FlowableFlattenIterable<T, U>(this, mapper, prefetch);
14891485
}
14901486

14911487
@BackpressureSupport(BackpressureKind.FULL)
@@ -2029,13 +2025,7 @@ public final <U, R> Flowable<R> flatMap(Function<? super T, ? extends Publisher<
20292025
@BackpressureSupport(BackpressureKind.FULL)
20302026
@SchedulerSupport(SchedulerSupport.NONE)
20312027
public final <U> Flowable<U> flatMapIterable(final Function<? super T, ? extends Iterable<? extends U>> mapper) {
2032-
Objects.requireNonNull(mapper, "mapper is null");
2033-
return flatMap(new Function<T, Publisher<U>>() {
2034-
@Override
2035-
public Publisher<U> apply(T v) {
2036-
return new FlowableFromIterable<U>(mapper.apply(v));
2037-
}
2038-
});
2028+
return flatMapIterable(mapper, bufferSize());
20392029
}
20402030

20412031
@BackpressureSupport(BackpressureKind.FULL)
@@ -2054,12 +2044,7 @@ public Publisher<U> apply(T t) {
20542044
@BackpressureSupport(BackpressureKind.FULL)
20552045
@SchedulerSupport(SchedulerSupport.NONE)
20562046
public final <U> Flowable<U> flatMapIterable(final Function<? super T, ? extends Iterable<? extends U>> mapper, int bufferSize) {
2057-
return flatMap(new Function<T, Publisher<U>>() {
2058-
@Override
2059-
public Publisher<U> apply(T v) {
2060-
return new FlowableFromIterable<U>(mapper.apply(v));
2061-
}
2062-
}, false, bufferSize);
2047+
return new FlowableFlattenIterable<T, U>(this, mapper, bufferSize);
20632048
}
20642049

20652050
@BackpressureSupport(BackpressureKind.NONE)

Diff for: src/main/java/io/reactivex/internal/operators/completable/CompletableConcat.java

+1-1
Original file line numberDiff line numberDiff line change
@@ -68,7 +68,7 @@ public CompletableConcatSubscriber(CompletableSubscriber actual, int prefetch) {
6868

6969
@Override
7070
public void onSubscribe(Subscription s) {
71-
if (SubscriptionHelper.validateSubscription(this.s, s)) {
71+
if (SubscriptionHelper.validate(this.s, s)) {
7272
this.s = s;
7373
actual.onSubscribe(this);
7474
s.request(prefetch);

Diff for: src/main/java/io/reactivex/internal/operators/completable/CompletableMerge.java

+1-1
Original file line numberDiff line numberDiff line change
@@ -82,7 +82,7 @@ public boolean isDisposed() {
8282

8383
@Override
8484
public void onSubscribe(Subscription s) {
85-
if (SubscriptionHelper.validateSubscription(this.s, s)) {
85+
if (SubscriptionHelper.validate(this.s, s)) {
8686
this.s = s;
8787
set.add(Disposables.from(s));
8888
actual.onSubscribe(this);

Diff for: src/main/java/io/reactivex/internal/operators/flowable/FlowableAll.java

+1-1
Original file line numberDiff line numberDiff line change
@@ -50,7 +50,7 @@ public AllSubscriber(Subscriber<? super Boolean> actual, Predicate<? super T> pr
5050
}
5151
@Override
5252
public void onSubscribe(Subscription s) {
53-
if (SubscriptionHelper.validateSubscription(this.s, s)) {
53+
if (SubscriptionHelper.validate(this.s, s)) {
5454
this.s = s;
5555
actual.onSubscribe(this);
5656
s.request(Long.MAX_VALUE);

Diff for: src/main/java/io/reactivex/internal/operators/flowable/FlowableAmb.java

+2-2
Original file line numberDiff line numberDiff line change
@@ -95,7 +95,7 @@ public void subscribe(Publisher<? extends T>[] sources) {
9595

9696
@Override
9797
public void request(long n) {
98-
if (!SubscriptionHelper.validateRequest(n)) {
98+
if (!SubscriptionHelper.validate(n)) {
9999
return;
100100
}
101101

@@ -172,7 +172,7 @@ public void request(long n) {
172172
Subscription s = get();
173173
if (s != null) {
174174
s.request(n);
175-
} else if (SubscriptionHelper.validateRequest(n)) {
175+
} else if (SubscriptionHelper.validate(n)) {
176176
BackpressureHelper.add(missedRequested, n);
177177
s = get();
178178
if (s != null && s != SubscriptionHelper.CANCELLED) {

Diff for: src/main/java/io/reactivex/internal/operators/flowable/FlowableAny.java

+1-1
Original file line numberDiff line numberDiff line change
@@ -47,7 +47,7 @@ public AnySubscriber(Subscriber<? super Boolean> actual, Predicate<? super T> pr
4747
}
4848
@Override
4949
public void onSubscribe(Subscription s) {
50-
if (SubscriptionHelper.validateSubscription(this.s, s)) {
50+
if (SubscriptionHelper.validate(this.s, s)) {
5151
this.s = s;
5252
actual.onSubscribe(this);
5353
s.request(Long.MAX_VALUE);

Diff for: src/main/java/io/reactivex/internal/operators/flowable/FlowableBuffer.java

+4-4
Original file line numberDiff line numberDiff line change
@@ -96,7 +96,7 @@ boolean createBuffer() {
9696

9797
@Override
9898
public void onSubscribe(Subscription s) {
99-
if (SubscriptionHelper.validateSubscription(this.s, s)) {
99+
if (SubscriptionHelper.validate(this.s, s)) {
100100
this.s = s;
101101
actual.onSubscribe(this);
102102
}
@@ -137,7 +137,7 @@ public void onComplete() {
137137

138138
@Override
139139
public void request(long n) {
140-
if (SubscriptionHelper.validateRequest(n)) {
140+
if (SubscriptionHelper.validate(n)) {
141141
long m = BackpressureHelper.multiplyCap(n, count);
142142
s.request(m);
143143
}
@@ -173,7 +173,7 @@ public BufferSkipSubscriber(Subscriber<? super U> actual, int count, int skip, S
173173

174174
@Override
175175
public void onSubscribe(Subscription s) {
176-
if (SubscriptionHelper.validateSubscription(this.s, s)) {
176+
if (SubscriptionHelper.validate(this.s, s)) {
177177
this.s = s;
178178
actual.onSubscribe(this);
179179
}
@@ -231,7 +231,7 @@ public void onComplete() {
231231

232232
@Override
233233
public void request(long n) {
234-
if (!SubscriptionHelper.validateRequest(n)) {
234+
if (!SubscriptionHelper.validate(n)) {
235235
return;
236236
}
237237
// requesting the first set of buffers must happen only once

Diff for: src/main/java/io/reactivex/internal/operators/flowable/FlowableBufferBoundary.java

+1-1
Original file line numberDiff line numberDiff line change
@@ -77,7 +77,7 @@ public BufferBoundarySubscriber(Subscriber<? super U> actual,
7777
}
7878
@Override
7979
public void onSubscribe(Subscription s) {
80-
if (SubscriptionHelper.validateSubscription(this.s, s)) {
80+
if (SubscriptionHelper.validate(this.s, s)) {
8181
this.s = s;
8282

8383
BufferOpenSubscriber<T, U, Open, Close> bos = new BufferOpenSubscriber<T, U, Open, Close>(this);

Diff for: src/main/java/io/reactivex/internal/operators/flowable/FlowableBufferBoundarySupplier.java

+1-1
Original file line numberDiff line numberDiff line change
@@ -67,7 +67,7 @@ public BufferBondarySupplierSubscriber(Subscriber<? super U> actual, Supplier<U>
6767

6868
@Override
6969
public void onSubscribe(Subscription s) {
70-
if (!SubscriptionHelper.validateSubscription(this.s, s)) {
70+
if (!SubscriptionHelper.validate(this.s, s)) {
7171
return;
7272
}
7373
this.s = s;

Diff for: src/main/java/io/reactivex/internal/operators/flowable/FlowableBufferExactBoundary.java

+1-1
Original file line numberDiff line numberDiff line change
@@ -64,7 +64,7 @@ public BufferExactBondarySubscriber(Subscriber<? super U> actual, Supplier<U> bu
6464

6565
@Override
6666
public void onSubscribe(Subscription s) {
67-
if (!SubscriptionHelper.validateSubscription(this.s, s)) {
67+
if (!SubscriptionHelper.validate(this.s, s)) {
6868
return;
6969
}
7070
this.s = s;

Diff for: src/main/java/io/reactivex/internal/operators/flowable/FlowableBufferTimed.java

+3-3
Original file line numberDiff line numberDiff line change
@@ -106,7 +106,7 @@ public BufferExactUnboundedSubscriber(
106106

107107
@Override
108108
public void onSubscribe(Subscription s) {
109-
if (!SubscriptionHelper.validateSubscription(this.s, s)) {
109+
if (!SubscriptionHelper.validate(this.s, s)) {
110110
return;
111111
}
112112
this.s = s;
@@ -283,7 +283,7 @@ public BufferSkipBoundedSubscriber(Subscriber<? super U> actual,
283283

284284
@Override
285285
public void onSubscribe(Subscription s) {
286-
if (!SubscriptionHelper.validateSubscription(this.s, s)) {
286+
if (!SubscriptionHelper.validate(this.s, s)) {
287287
return;
288288
}
289289
this.s = s;
@@ -459,7 +459,7 @@ public BufferExactBoundedSubscriber(
459459

460460
@Override
461461
public void onSubscribe(Subscription s) {
462-
if (!SubscriptionHelper.validateSubscription(this.s, s)) {
462+
if (!SubscriptionHelper.validate(this.s, s)) {
463463
return;
464464
}
465465
this.s = s;

Diff for: src/main/java/io/reactivex/internal/operators/flowable/FlowableCache.java

+1-1
Original file line numberDiff line numberDiff line change
@@ -275,7 +275,7 @@ public ReplaySubscription(Subscriber<? super T> child, CacheState<T> state) {
275275
}
276276
@Override
277277
public void request(long n) {
278-
if (!SubscriptionHelper.validateRequest(n)) {
278+
if (!SubscriptionHelper.validate(n)) {
279279
return;
280280
}
281281
for (;;) {

Diff for: src/main/java/io/reactivex/internal/operators/flowable/FlowableCollect.java

+1-1
Original file line numberDiff line numberDiff line change
@@ -66,7 +66,7 @@ public CollectSubscriber(Subscriber<? super U> actual, U u, BiConsumer<? super U
6666

6767
@Override
6868
public void onSubscribe(Subscription s) {
69-
if (SubscriptionHelper.validateSubscription(this.s, s)) {
69+
if (SubscriptionHelper.validate(this.s, s)) {
7070
this.s = s;
7171
actual.onSubscribe(this);
7272
s.request(Long.MAX_VALUE);

Diff for: src/main/java/io/reactivex/internal/operators/flowable/FlowableCombineLatest.java

+1-1
Original file line numberDiff line numberDiff line change
@@ -128,7 +128,7 @@ public void subscribe(Publisher<? extends T>[] sources) {
128128

129129
@Override
130130
public void request(long n) {
131-
if (SubscriptionHelper.validateRequest(n)) {
131+
if (SubscriptionHelper.validate(n)) {
132132
BackpressureHelper.add(requested, n);
133133
drain();
134134
}

0 commit comments

Comments
 (0)