Skip to content

Commit 1e29958

Browse files
authored
3.x: Change Flowable.groupBy to signal MBE instead of possibly hanging (#6740)
* 3.x: Change Flowable.groupBy to signal MBE instead of possibly hanging * Update groupBy javadocs * Update backpressure annotation to ERROR, add some see links * Correct javadocs typos * Improve coverage of changes
1 parent 292dc62 commit 1e29958

File tree

5 files changed

+291
-379
lines changed

5 files changed

+291
-379
lines changed

src/main/java/io/reactivex/rxjava3/core/Flowable.java

+61-37
Original file line numberDiff line numberDiff line change
@@ -19,7 +19,7 @@
1919

2020
import io.reactivex.rxjava3.annotations.*;
2121
import io.reactivex.rxjava3.disposables.Disposable;
22-
import io.reactivex.rxjava3.exceptions.Exceptions;
22+
import io.reactivex.rxjava3.exceptions.*;
2323
import io.reactivex.rxjava3.flowables.*;
2424
import io.reactivex.rxjava3.functions.*;
2525
import io.reactivex.rxjava3.internal.functions.*;
@@ -10421,13 +10421,16 @@ public final Disposable forEachWhile(final Predicate<? super T> onNext, final Co
1042110421
*
1042210422
* <dl>
1042310423
* <dt><b>Backpressure:</b></dt>
10424-
* <dd>Both the returned and its inner {@code Publisher}s honor backpressure and the source {@code Publisher}
10425-
* is consumed in a bounded mode (i.e., requested a fixed amount upfront and replenished based on
10426-
* downstream consumption). Note that both the returned and its inner {@code Publisher}s use
10427-
* unbounded internal buffers and if the source {@code Publisher} doesn't honor backpressure, that <em>may</em>
10428-
* lead to {@code OutOfMemoryError}.</dd>
10424+
* <dd>The consumer of the returned {@code Flowable} has to be ready to receive new {@code GroupedFlowable}s or else
10425+
* this operator will signal {@link MissingBackpressureException}. To avoid this exception, make
10426+
* sure a combining operator (such as {@code flatMap}) has adequate amount of buffering/prefetch configured.
10427+
* The inner {@code GroupedFlowable}s honor backpressure but due to the single-source multiple consumer
10428+
* nature of this operator, each group must be consumed so the whole operator can make progress and not hang.</dd>
1042910429
* <dt><b>Scheduler:</b></dt>
1043010430
* <dd>{@code groupBy} does not operate by default on a particular {@link Scheduler}.</dd>
10431+
* <dt><b>Error handling:</b></dt>
10432+
* <dd>If the upstream signals or the callback(s) throw an exception, the returned {@code Flowable} and
10433+
* all active inner {@code GroupedFlowable}s will signal the same exception.</dd>
1043110434
* </dl>
1043210435
*
1043310436
* @param keySelector
@@ -10438,9 +10441,11 @@ public final Disposable forEachWhile(final Predicate<? super T> onNext, final Co
1043810441
* unique key value and each of which emits those items from the source Publisher that share that
1043910442
* key value
1044010443
* @see <a href="http://reactivex.io/documentation/operators/groupby.html">ReactiveX operators documentation: GroupBy</a>
10444+
* @see #groupBy(Function, boolean)
10445+
* @see #groupBy(Function, Function)
1044110446
*/
1044210447
@CheckReturnValue
10443-
@BackpressureSupport(BackpressureKind.FULL)
10448+
@BackpressureSupport(BackpressureKind.ERROR)
1044410449
@SchedulerSupport(SchedulerSupport.NONE)
1044510450
public final <K> Flowable<GroupedFlowable<K, T>> groupBy(Function<? super T, ? extends K> keySelector) {
1044610451
return groupBy(keySelector, Functions.<T>identity(), false, bufferSize());
@@ -10474,13 +10479,16 @@ public final <K> Flowable<GroupedFlowable<K, T>> groupBy(Function<? super T, ? e
1047410479
*
1047510480
* <dl>
1047610481
* <dt><b>Backpressure:</b></dt>
10477-
* <dd>Both the returned and its inner {@code Publisher}s honor backpressure and the source {@code Publisher}
10478-
* is consumed in a bounded mode (i.e., requested a fixed amount upfront and replenished based on
10479-
* downstream consumption). Note that both the returned and its inner {@code Publisher}s use
10480-
* unbounded internal buffers and if the source {@code Publisher} doesn't honor backpressure, that <em>may</em>
10481-
* lead to {@code OutOfMemoryError}.</dd>
10482+
* <dd>The consumer of the returned {@code Flowable} has to be ready to receive new {@code GroupedFlowable}s or else
10483+
* this operator will signal {@link MissingBackpressureException}. To avoid this exception, make
10484+
* sure a combining operator (such as {@code flatMap}) has adequate amount of buffering/prefetch configured.
10485+
* The inner {@code GroupedFlowable}s honor backpressure but due to the single-source multiple consumer
10486+
* nature of this operator, each group must be consumed so the whole operator can make progress and not hang.</dd>
1048210487
* <dt><b>Scheduler:</b></dt>
1048310488
* <dd>{@code groupBy} does not operate by default on a particular {@link Scheduler}.</dd>
10489+
* <dt><b>Error handling:</b></dt>
10490+
* <dd>If the upstream signals or the callback(s) throw an exception, the returned {@code Flowable} and
10491+
* all active inner {@code GroupedFlowable}s will signal the same exception.</dd>
1048410492
* </dl>
1048510493
*
1048610494
* @param keySelector
@@ -10496,7 +10504,7 @@ public final <K> Flowable<GroupedFlowable<K, T>> groupBy(Function<? super T, ? e
1049610504
* @see <a href="http://reactivex.io/documentation/operators/groupby.html">ReactiveX operators documentation: GroupBy</a>
1049710505
*/
1049810506
@CheckReturnValue
10499-
@BackpressureSupport(BackpressureKind.FULL)
10507+
@BackpressureSupport(BackpressureKind.ERROR)
1050010508
@SchedulerSupport(SchedulerSupport.NONE)
1050110509
public final <K> Flowable<GroupedFlowable<K, T>> groupBy(Function<? super T, ? extends K> keySelector, boolean delayError) {
1050210510
return groupBy(keySelector, Functions.<T>identity(), delayError, bufferSize());
@@ -10530,13 +10538,16 @@ public final <K> Flowable<GroupedFlowable<K, T>> groupBy(Function<? super T, ? e
1053010538
*
1053110539
* <dl>
1053210540
* <dt><b>Backpressure:</b></dt>
10533-
* <dd>Both the returned and its inner {@code Publisher}s honor backpressure and the source {@code Publisher}
10534-
* is consumed in a bounded mode (i.e., requested a fixed amount upfront and replenished based on
10535-
* downstream consumption). Note that both the returned and its inner {@code Publisher}s use
10536-
* unbounded internal buffers and if the source {@code Publisher} doesn't honor backpressure, that <em>may</em>
10537-
* lead to {@code OutOfMemoryError}.</dd>
10541+
* <dd>The consumer of the returned {@code Flowable} has to be ready to receive new {@code GroupedFlowable}s or else
10542+
* this operator will signal {@link MissingBackpressureException}. To avoid this exception, make
10543+
* sure a combining operator (such as {@code flatMap}) has adequate amount of buffering/prefetch configured.
10544+
* The inner {@code GroupedFlowable}s honor backpressure but due to the single-source multiple consumer
10545+
* nature of this operator, each group must be consumed so the whole operator can make progress and not hang.</dd>
1053810546
* <dt><b>Scheduler:</b></dt>
1053910547
* <dd>{@code groupBy} does not operate by default on a particular {@link Scheduler}.</dd>
10548+
* <dt><b>Error handling:</b></dt>
10549+
* <dd>If the upstream signals or the callback(s) throw an exception, the returned {@code Flowable} and
10550+
* all active inner {@code GroupedFlowable}s will signal the same exception.</dd>
1054010551
* </dl>
1054110552
*
1054210553
* @param keySelector
@@ -10551,9 +10562,12 @@ public final <K> Flowable<GroupedFlowable<K, T>> groupBy(Function<? super T, ? e
1055110562
* unique key value and each of which emits those items from the source Publisher that share that
1055210563
* key value
1055310564
* @see <a href="http://reactivex.io/documentation/operators/groupby.html">ReactiveX operators documentation: GroupBy</a>
10565+
* @see #groupBy(Function, Function, boolean)
10566+
* @see #groupBy(Function, Function, boolean, int)
10567+
* @see #groupBy(Function, Function, boolean, int, Function)
1055410568
*/
1055510569
@CheckReturnValue
10556-
@BackpressureSupport(BackpressureKind.FULL)
10570+
@BackpressureSupport(BackpressureKind.ERROR)
1055710571
@SchedulerSupport(SchedulerSupport.NONE)
1055810572
public final <K, V> Flowable<GroupedFlowable<K, V>> groupBy(Function<? super T, ? extends K> keySelector,
1055910573
Function<? super T, ? extends V> valueSelector) {
@@ -10588,13 +10602,16 @@ public final <K, V> Flowable<GroupedFlowable<K, V>> groupBy(Function<? super T,
1058810602
*
1058910603
* <dl>
1059010604
* <dt><b>Backpressure:</b></dt>
10591-
* <dd>Both the returned and its inner {@code Publisher}s honor backpressure and the source {@code Publisher}
10592-
* is consumed in a bounded mode (i.e., requested a fixed amount upfront and replenished based on
10593-
* downstream consumption). Note that both the returned and its inner {@code Publisher}s use
10594-
* unbounded internal buffers and if the source {@code Publisher} doesn't honor backpressure, that <em>may</em>
10595-
* lead to {@code OutOfMemoryError}.</dd>
10605+
* <dd>The consumer of the returned {@code Flowable} has to be ready to receive new {@code GroupedFlowable}s or else
10606+
* this operator will signal {@link MissingBackpressureException}. To avoid this exception, make
10607+
* sure a combining operator (such as {@code flatMap}) has adequate amount of buffering/prefetch configured.
10608+
* The inner {@code GroupedFlowable}s honor backpressure but due to the single-source multiple consumer
10609+
* nature of this operator, each group must be consumed so the whole operator can make progress and not hang.</dd>
1059610610
* <dt><b>Scheduler:</b></dt>
1059710611
* <dd>{@code groupBy} does not operate by default on a particular {@link Scheduler}.</dd>
10612+
* <dt><b>Error handling:</b></dt>
10613+
* <dd>If the upstream signals or the callback(s) throw an exception, the returned {@code Flowable} and
10614+
* all active inner {@code GroupedFlowable}s will signal the same exception.</dd>
1059810615
* </dl>
1059910616
*
1060010617
* @param keySelector
@@ -10612,9 +10629,10 @@ public final <K, V> Flowable<GroupedFlowable<K, V>> groupBy(Function<? super T,
1061210629
* unique key value and each of which emits those items from the source Publisher that share that
1061310630
* key value
1061410631
* @see <a href="http://reactivex.io/documentation/operators/groupby.html">ReactiveX operators documentation: GroupBy</a>
10632+
* @see #groupBy(Function, Function, boolean, int)
1061510633
*/
1061610634
@CheckReturnValue
10617-
@BackpressureSupport(BackpressureKind.FULL)
10635+
@BackpressureSupport(BackpressureKind.ERROR)
1061810636
@SchedulerSupport(SchedulerSupport.NONE)
1061910637
public final <K, V> Flowable<GroupedFlowable<K, V>> groupBy(Function<? super T, ? extends K> keySelector,
1062010638
Function<? super T, ? extends V> valueSelector, boolean delayError) {
@@ -10649,13 +10667,16 @@ public final <K, V> Flowable<GroupedFlowable<K, V>> groupBy(Function<? super T,
1064910667
*
1065010668
* <dl>
1065110669
* <dt><b>Backpressure:</b></dt>
10652-
* <dd>Both the returned and its inner {@code Publisher}s honor backpressure and the source {@code Publisher}
10653-
* is consumed in a bounded mode (i.e., requested a fixed amount upfront and replenished based on
10654-
* downstream consumption). Note that both the returned and its inner {@code Publisher}s use
10655-
* unbounded internal buffers and if the source {@code Publisher} doesn't honor backpressure, that <em>may</em>
10656-
* lead to {@code OutOfMemoryError}.</dd>
10670+
* <dd>The consumer of the returned {@code Flowable} has to be ready to receive new {@code GroupedFlowable}s or else
10671+
* this operator will signal {@link MissingBackpressureException}. To avoid this exception, make
10672+
* sure a combining operator (such as {@code flatMap}) has adequate amount of buffering/prefetch configured.
10673+
* The inner {@code GroupedFlowable}s honor backpressure but due to the single-source multiple consumer
10674+
* nature of this operator, each group must be consumed so the whole operator can make progress and not hang.</dd>
1065710675
* <dt><b>Scheduler:</b></dt>
1065810676
* <dd>{@code groupBy} does not operate by default on a particular {@link Scheduler}.</dd>
10677+
* <dt><b>Error handling:</b></dt>
10678+
* <dd>If the upstream signals or the callback(s) throw an exception, the returned {@code Flowable} and
10679+
* all active inner {@code GroupedFlowable}s will signal the same exception.</dd>
1065910680
* </dl>
1066010681
*
1066110682
* @param keySelector
@@ -10678,7 +10699,7 @@ public final <K, V> Flowable<GroupedFlowable<K, V>> groupBy(Function<? super T,
1067810699
*/
1067910700
@CheckReturnValue
1068010701
@NonNull
10681-
@BackpressureSupport(BackpressureKind.FULL)
10702+
@BackpressureSupport(BackpressureKind.SPECIAL)
1068210703
@SchedulerSupport(SchedulerSupport.NONE)
1068310704
public final <K, V> Flowable<GroupedFlowable<K, V>> groupBy(Function<? super T, ? extends K> keySelector,
1068410705
Function<? super T, ? extends V> valueSelector,
@@ -10759,13 +10780,16 @@ public final <K, V> Flowable<GroupedFlowable<K, V>> groupBy(Function<? super T,
1075910780
*
1076010781
* <dl>
1076110782
* <dt><b>Backpressure:</b></dt>
10762-
* <dd>Both the returned and its inner {@code GroupedFlowable}s honor backpressure and the source {@code Publisher}
10763-
* is consumed in a bounded mode (i.e., requested a fixed amount upfront and replenished based on
10764-
* downstream consumption). Note that both the returned and its inner {@code GroupedFlowable}s use
10765-
* unbounded internal buffers and if the source {@code Publisher} doesn't honor backpressure, that <em>may</em>
10766-
* lead to {@code OutOfMemoryError}.</dd>
10783+
* <dd>The consumer of the returned {@code Flowable} has to be ready to receive new {@code GroupedFlowable}s or else
10784+
* this operator will signal {@link MissingBackpressureException}. To avoid this exception, make
10785+
* sure a combining operator (such as {@code flatMap}) has adequate amount of buffering/prefetch configured.
10786+
* The inner {@code GroupedFlowable}s honor backpressure but due to the single-source multiple consumer
10787+
* nature of this operator, each group must be consumed so the whole operator can make progress and not hang.</dd>
1076710788
* <dt><b>Scheduler:</b></dt>
1076810789
* <dd>{@code groupBy} does not operate by default on a particular {@link Scheduler}.</dd>
10790+
* <dt><b>Error handling:</b></dt>
10791+
* <dd>If the upstream signals or the callback(s) throw an exception, the returned {@code Flowable} and
10792+
* all active inner {@code GroupedFlowable}s will signal the same exception.</dd>
1076910793
* </dl>
1077010794
* <p>History: 2.1.10 - beta
1077110795
* @param keySelector
@@ -10796,7 +10820,7 @@ public final <K, V> Flowable<GroupedFlowable<K, V>> groupBy(Function<? super T,
1079610820
*/
1079710821
@CheckReturnValue
1079810822
@NonNull
10799-
@BackpressureSupport(BackpressureKind.FULL)
10823+
@BackpressureSupport(BackpressureKind.SPECIAL)
1080010824
@SchedulerSupport(SchedulerSupport.NONE)
1080110825
public final <K, V> Flowable<GroupedFlowable<K, V>> groupBy(Function<? super T, ? extends K> keySelector,
1080210826
Function<? super T, ? extends V> valueSelector,

0 commit comments

Comments
 (0)