Skip to content

Commit fc0ca6e

Browse files
RomanWuattierakarnokd
authored andcommitted
Fix Flowable.blockingSubscribe is unbounded and can lead to OOME (#6026)
1 parent ba06bff commit fc0ca6e

File tree

7 files changed

+816
-0
lines changed

7 files changed

+816
-0
lines changed

src/main/java/io/reactivex/Flowable.java

+85
Original file line numberDiff line numberDiff line change
@@ -5853,6 +5853,38 @@ public final void blockingSubscribe(Consumer<? super T> onNext) {
58535853
FlowableBlockingSubscribe.subscribe(this, onNext, Functions.ON_ERROR_MISSING, Functions.EMPTY_ACTION);
58545854
}
58555855

5856+
/**
5857+
* Subscribes to the source and calls the given callbacks <strong>on the current thread</strong>.
5858+
* <p>
5859+
* If the Flowable emits an error, it is wrapped into an
5860+
* {@link io.reactivex.exceptions.OnErrorNotImplementedException OnErrorNotImplementedException}
5861+
* and routed to the RxJavaPlugins.onError handler.
5862+
* Using the overloads {@link #blockingSubscribe(Consumer, Consumer)}
5863+
* or {@link #blockingSubscribe(Consumer, Consumer, Action)} instead is recommended.
5864+
* <p>
5865+
* Note that calling this method will block the caller thread until the upstream terminates
5866+
* normally or with an error. Therefore, calling this method from special threads such as the
5867+
* Android Main Thread or the Swing Event Dispatch Thread is not recommended.
5868+
* <dl>
5869+
* <dt><b>Backpressure:</b></dt>
5870+
* <dd>The operator consumes the source {@code Flowable} in an bounded manner (up to bufferSize
5871+
* outstanding request amount for items).</dd>
5872+
* <dt><b>Scheduler:</b></dt>
5873+
* <dd>{@code blockingSubscribe} does not operate by default on a particular {@link Scheduler}.</dd>
5874+
* </dl>
5875+
* @param onNext the callback action for each source value
5876+
* @param bufferSize the size of the buffer
5877+
* @since 2.1.15 - experimental
5878+
* @see #blockingSubscribe(Consumer, Consumer)
5879+
* @see #blockingSubscribe(Consumer, Consumer, Action)
5880+
*/
5881+
@BackpressureSupport(BackpressureKind.FULL)
5882+
@SchedulerSupport(SchedulerSupport.NONE)
5883+
@Experimental
5884+
public final void blockingSubscribe(Consumer<? super T> onNext, int bufferSize) {
5885+
FlowableBlockingSubscribe.subscribe(this, onNext, Functions.ON_ERROR_MISSING, Functions.EMPTY_ACTION, bufferSize);
5886+
}
5887+
58565888
/**
58575889
* Subscribes to the source and calls the given callbacks <strong>on the current thread</strong>.
58585890
* <p>
@@ -5877,6 +5909,32 @@ public final void blockingSubscribe(Consumer<? super T> onNext, Consumer<? super
58775909
FlowableBlockingSubscribe.subscribe(this, onNext, onError, Functions.EMPTY_ACTION);
58785910
}
58795911

5912+
/**
5913+
* Subscribes to the source and calls the given callbacks <strong>on the current thread</strong>.
5914+
* <p>
5915+
* Note that calling this method will block the caller thread until the upstream terminates
5916+
* normally or with an error. Therefore, calling this method from special threads such as the
5917+
* Android Main Thread or the Swing Event Dispatch Thread is not recommended.
5918+
* <dl>
5919+
* <dt><b>Backpressure:</b></dt>
5920+
* <dd>The operator consumes the source {@code Flowable} in an bounded manner (up to bufferSize
5921+
* outstanding request amount for items).</dd>
5922+
* <dt><b>Scheduler:</b></dt>
5923+
* <dd>{@code blockingSubscribe} does not operate by default on a particular {@link Scheduler}.</dd>
5924+
* </dl>
5925+
* @param onNext the callback action for each source value
5926+
* @param onError the callback action for an error event
5927+
* @param bufferSize the size of the buffer
5928+
* @since 2.1.15 - experimental
5929+
* @see #blockingSubscribe(Consumer, Consumer, Action)
5930+
*/
5931+
@BackpressureSupport(BackpressureKind.FULL)
5932+
@SchedulerSupport(SchedulerSupport.NONE)
5933+
@Experimental
5934+
public final void blockingSubscribe(Consumer<? super T> onNext, Consumer<? super Throwable> onError,
5935+
int bufferSize) {
5936+
FlowableBlockingSubscribe.subscribe(this, onNext, onError, Functions.EMPTY_ACTION, bufferSize);
5937+
}
58805938

58815939
/**
58825940
* Subscribes to the source and calls the given callbacks <strong>on the current thread</strong>.
@@ -5902,6 +5960,33 @@ public final void blockingSubscribe(Consumer<? super T> onNext, Consumer<? super
59025960
FlowableBlockingSubscribe.subscribe(this, onNext, onError, onComplete);
59035961
}
59045962

5963+
/**
5964+
* Subscribes to the source and calls the given callbacks <strong>on the current thread</strong>.
5965+
* <p>
5966+
* Note that calling this method will block the caller thread until the upstream terminates
5967+
* normally or with an error. Therefore, calling this method from special threads such as the
5968+
* Android Main Thread or the Swing Event Dispatch Thread is not recommended.
5969+
* <dl>
5970+
* <dt><b>Backpressure:</b></dt>
5971+
* <dd>The operator consumes the source {@code Flowable} in an bounded manner (up to bufferSize
5972+
* outstanding request amount for items).</dd>
5973+
* <dt><b>Scheduler:</b></dt>
5974+
* <dd>{@code blockingSubscribe} does not operate by default on a particular {@link Scheduler}.</dd>
5975+
* </dl>
5976+
* @param onNext the callback action for each source value
5977+
* @param onError the callback action for an error event
5978+
* @param onComplete the callback action for the completion event.
5979+
* @param bufferSize the size of the buffer
5980+
* @since 2.1.15 - experimental
5981+
*/
5982+
@BackpressureSupport(BackpressureKind.FULL)
5983+
@SchedulerSupport(SchedulerSupport.NONE)
5984+
@Experimental
5985+
public final void blockingSubscribe(Consumer<? super T> onNext, Consumer<? super Throwable> onError, Action onComplete,
5986+
int bufferSize) {
5987+
FlowableBlockingSubscribe.subscribe(this, onNext, onError, onComplete, bufferSize);
5988+
}
5989+
59055990
/**
59065991
* Subscribes to the source and calls the {@link Subscriber} methods <strong>on the current thread</strong>.
59075992
* <p>

src/main/java/io/reactivex/internal/functions/Functions.java

+19
Original file line numberDiff line numberDiff line change
@@ -745,4 +745,23 @@ public void accept(Subscription t) throws Exception {
745745
t.request(Long.MAX_VALUE);
746746
}
747747
}
748+
749+
@SuppressWarnings("unchecked")
750+
public static <T> Consumer<T> boundedConsumer(int bufferSize) {
751+
return (Consumer<T>) new BoundedConsumer(bufferSize);
752+
}
753+
754+
public static class BoundedConsumer implements Consumer<Subscription> {
755+
756+
final int bufferSize;
757+
758+
BoundedConsumer(int bufferSize) {
759+
this.bufferSize = bufferSize;
760+
}
761+
762+
@Override
763+
public void accept(Subscription s) throws Exception {
764+
s.request(bufferSize);
765+
}
766+
}
748767
}

src/main/java/io/reactivex/internal/operators/flowable/FlowableBlockingSubscribe.java

+19
Original file line numberDiff line numberDiff line change
@@ -108,4 +108,23 @@ public static <T> void subscribe(Publisher<? extends T> o, final Consumer<? supe
108108
ObjectHelper.requireNonNull(onComplete, "onComplete is null");
109109
subscribe(o, new LambdaSubscriber<T>(onNext, onError, onComplete, Functions.REQUEST_MAX));
110110
}
111+
112+
/**
113+
* Subscribes to the source and calls the given actions on the current thread.
114+
* @param o the source publisher
115+
* @param onNext the callback action for each source value
116+
* @param onError the callback action for an error event
117+
* @param onComplete the callback action for the completion event.
118+
* @param bufferSize the number of elements to prefetch from the source Publisher
119+
* @param <T> the value type
120+
*/
121+
public static <T> void subscribe(Publisher<? extends T> o, final Consumer<? super T> onNext,
122+
final Consumer<? super Throwable> onError, final Action onComplete, int bufferSize) {
123+
ObjectHelper.requireNonNull(onNext, "onNext is null");
124+
ObjectHelper.requireNonNull(onError, "onError is null");
125+
ObjectHelper.requireNonNull(onComplete, "onComplete is null");
126+
ObjectHelper.verifyPositive(bufferSize, "number > 0 required");
127+
subscribe(o, new BoundedSubscriber<T>(onNext, onError, onComplete, Functions.boundedConsumer(bufferSize),
128+
bufferSize));
129+
}
111130
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,140 @@
1+
/**
2+
* Copyright (c) 2016-present, RxJava Contributors.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in
5+
* compliance with the License. You may obtain a copy of the License at
6+
*
7+
* http://www.apache.org/licenses/LICENSE-2.0
8+
*
9+
* Unless required by applicable law or agreed to in writing, software distributed under the License is
10+
* distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See
11+
* the License for the specific language governing permissions and limitations under the License.
12+
*/
13+
14+
package io.reactivex.internal.subscribers;
15+
16+
import io.reactivex.FlowableSubscriber;
17+
import io.reactivex.disposables.Disposable;
18+
import io.reactivex.exceptions.CompositeException;
19+
import io.reactivex.exceptions.Exceptions;
20+
import io.reactivex.functions.Action;
21+
import io.reactivex.functions.Consumer;
22+
import io.reactivex.internal.functions.Functions;
23+
import io.reactivex.internal.subscriptions.SubscriptionHelper;
24+
import io.reactivex.observers.LambdaConsumerIntrospection;
25+
import io.reactivex.plugins.RxJavaPlugins;
26+
import org.reactivestreams.Subscription;
27+
28+
import java.util.concurrent.atomic.AtomicReference;
29+
30+
public final class BoundedSubscriber<T> extends AtomicReference<Subscription>
31+
implements FlowableSubscriber<T>, Subscription, Disposable, LambdaConsumerIntrospection {
32+
33+
private static final long serialVersionUID = -7251123623727029452L;
34+
final Consumer<? super T> onNext;
35+
final Consumer<? super Throwable> onError;
36+
final Action onComplete;
37+
final Consumer<? super Subscription> onSubscribe;
38+
39+
final int bufferSize;
40+
int consumed;
41+
final int limit;
42+
43+
public BoundedSubscriber(Consumer<? super T> onNext, Consumer<? super Throwable> onError,
44+
Action onComplete, Consumer<? super Subscription> onSubscribe, int bufferSize) {
45+
super();
46+
this.onNext = onNext;
47+
this.onError = onError;
48+
this.onComplete = onComplete;
49+
this.onSubscribe = onSubscribe;
50+
this.bufferSize = bufferSize;
51+
this.limit = bufferSize - (bufferSize >> 2);
52+
}
53+
54+
@Override
55+
public void onSubscribe(Subscription s) {
56+
if (SubscriptionHelper.setOnce(this, s)) {
57+
try {
58+
onSubscribe.accept(this);
59+
} catch (Throwable e) {
60+
Exceptions.throwIfFatal(e);
61+
s.cancel();
62+
onError(e);
63+
}
64+
}
65+
}
66+
67+
@Override
68+
public void onNext(T t) {
69+
if (!isDisposed()) {
70+
try {
71+
onNext.accept(t);
72+
73+
int c = consumed + 1;
74+
if (c == limit) {
75+
consumed = 0;
76+
get().request(limit);
77+
} else {
78+
consumed = c;
79+
}
80+
} catch (Throwable e) {
81+
Exceptions.throwIfFatal(e);
82+
get().cancel();
83+
onError(e);
84+
}
85+
}
86+
}
87+
88+
@Override
89+
public void onError(Throwable t) {
90+
if (get() != SubscriptionHelper.CANCELLED) {
91+
lazySet(SubscriptionHelper.CANCELLED);
92+
try {
93+
onError.accept(t);
94+
} catch (Throwable e) {
95+
Exceptions.throwIfFatal(e);
96+
RxJavaPlugins.onError(new CompositeException(t, e));
97+
}
98+
} else {
99+
RxJavaPlugins.onError(t);
100+
}
101+
}
102+
103+
@Override
104+
public void onComplete() {
105+
if (get() != SubscriptionHelper.CANCELLED) {
106+
lazySet(SubscriptionHelper.CANCELLED);
107+
try {
108+
onComplete.run();
109+
} catch (Throwable e) {
110+
Exceptions.throwIfFatal(e);
111+
RxJavaPlugins.onError(e);
112+
}
113+
}
114+
}
115+
116+
@Override
117+
public void dispose() {
118+
cancel();
119+
}
120+
121+
@Override
122+
public boolean isDisposed() {
123+
return get() == SubscriptionHelper.CANCELLED;
124+
}
125+
126+
@Override
127+
public void request(long n) {
128+
get().request(n);
129+
}
130+
131+
@Override
132+
public void cancel() {
133+
SubscriptionHelper.cancel(this);
134+
}
135+
136+
@Override
137+
public boolean hasCustomOnError() {
138+
return onError != Functions.ON_ERROR_MISSING;
139+
}
140+
}

src/test/java/io/reactivex/exceptions/OnErrorNotImplementedExceptionTest.java

+6
Original file line numberDiff line numberDiff line change
@@ -66,6 +66,12 @@ public void flowableBlockingSubscribe1() {
6666
.blockingSubscribe(Functions.emptyConsumer());
6767
}
6868

69+
@Test
70+
public void flowableBoundedBlockingSubscribe1() {
71+
Flowable.error(new TestException())
72+
.blockingSubscribe(Functions.emptyConsumer(), 128);
73+
}
74+
6975
@Test
7076
public void observableSubscribe0() {
7177
Observable.error(new TestException())

0 commit comments

Comments
 (0)