Skip to content

Commit d35ac97

Browse files
authored
3.x: remove buffer/window with supplier & onExceptionResumeNext (#6564)
1 parent 71945f3 commit d35ac97

19 files changed

+12
-4313
lines changed

src/main/java/io/reactivex/Flowable.java

+1-192
Original file line numberDiff line numberDiff line change
@@ -6943,79 +6943,6 @@ public final <B, U extends Collection<? super T>> Flowable<U> buffer(Publisher<B
69436943
return RxJavaPlugins.onAssembly(new FlowableBufferExactBoundary<T, U, B>(this, boundaryIndicator, bufferSupplier));
69446944
}
69456945

6946-
/**
6947-
* Returns a Flowable that emits buffers of items it collects from the source Publisher. The resulting
6948-
* Publisher emits connected, non-overlapping buffers. It emits the current buffer and replaces it with a
6949-
* new buffer whenever the Publisher produced by the specified {@code boundaryIndicatorSupplier} emits an item.
6950-
* <p>
6951-
* <img width="640" height="395" src="https://raw.github.com/wiki/ReactiveX/RxJava/images/rx-operators/buffer1.png" alt="">
6952-
* <p>
6953-
* If either the source {@code Publisher} or the boundary {@code Publisher} issues an {@code onError} notification the event is passed on
6954-
* immediately without first emitting the buffer it is in the process of assembling.
6955-
* <dl>
6956-
* <dt><b>Backpressure:</b></dt>
6957-
* <dd>This operator does not support backpressure as it is instead controlled by the given Publishers and
6958-
* buffers data. It requests {@code Long.MAX_VALUE} upstream and does not obey downstream requests.</dd>
6959-
* <dt><b>Scheduler:</b></dt>
6960-
* <dd>This version of {@code buffer} does not operate by default on a particular {@link Scheduler}.</dd>
6961-
* </dl>
6962-
*
6963-
* @param <B> the value type of the boundary-providing Publisher
6964-
* @param boundaryIndicatorSupplier
6965-
* a {@link Supplier} that produces a Publisher that governs the boundary between buffers.
6966-
* Whenever the supplied {@code Publisher} emits an item, {@code buffer} emits the current buffer and
6967-
* begins to fill a new one
6968-
* @return a Flowable that emits a connected, non-overlapping buffer of items from the source Publisher
6969-
* each time the Publisher created with the {@code closingIndicator} argument emits an item
6970-
* @see <a href="http://reactivex.io/documentation/operators/buffer.html">ReactiveX operators documentation: Buffer</a>
6971-
*/
6972-
@CheckReturnValue
6973-
@BackpressureSupport(BackpressureKind.ERROR)
6974-
@SchedulerSupport(SchedulerSupport.NONE)
6975-
public final <B> Flowable<List<T>> buffer(Supplier<? extends Publisher<B>> boundaryIndicatorSupplier) {
6976-
return buffer(boundaryIndicatorSupplier, ArrayListSupplier.<T>asSupplier());
6977-
}
6978-
6979-
/**
6980-
* Returns a Flowable that emits buffers of items it collects from the source Publisher. The resulting
6981-
* Publisher emits connected, non-overlapping buffers. It emits the current buffer and replaces it with a
6982-
* new buffer whenever the Publisher produced by the specified {@code boundaryIndicatorSupplier} emits an item.
6983-
* <p>
6984-
* <img width="640" height="395" src="https://raw.github.com/wiki/ReactiveX/RxJava/images/rx-operators/buffer1.png" alt="">
6985-
* <p>
6986-
* If either the source {@code Publisher} or the boundary {@code Publisher} issues an {@code onError} notification the event is passed on
6987-
* immediately without first emitting the buffer it is in the process of assembling.
6988-
* <dl>
6989-
* <dt><b>Backpressure:</b></dt>
6990-
* <dd>This operator does not support backpressure as it is instead controlled by the given Publishers and
6991-
* buffers data. It requests {@code Long.MAX_VALUE} upstream and does not obey downstream requests.</dd>
6992-
* <dt><b>Scheduler:</b></dt>
6993-
* <dd>This version of {@code buffer} does not operate by default on a particular {@link Scheduler}.</dd>
6994-
* </dl>
6995-
*
6996-
* @param <U> the collection subclass type to buffer into
6997-
* @param <B> the value type of the boundary-providing Publisher
6998-
* @param boundaryIndicatorSupplier
6999-
* a {@link Callable} that produces a Publisher that governs the boundary between buffers.
7000-
* Whenever the supplied {@code Publisher} emits an item, {@code buffer} emits the current buffer and
7001-
* begins to fill a new one
7002-
* @param bufferSupplier
7003-
* a factory function that returns an instance of the collection subclass to be used and returned
7004-
* as the buffer
7005-
* @return a Flowable that emits a connected, non-overlapping buffer of items from the source Publisher
7006-
* each time the Publisher created with the {@code closingIndicator} argument emits an item
7007-
* @see <a href="http://reactivex.io/documentation/operators/buffer.html">ReactiveX operators documentation: Buffer</a>
7008-
*/
7009-
@CheckReturnValue
7010-
@BackpressureSupport(BackpressureKind.ERROR)
7011-
@SchedulerSupport(SchedulerSupport.NONE)
7012-
public final <B, U extends Collection<? super T>> Flowable<U> buffer(Supplier<? extends Publisher<B>> boundaryIndicatorSupplier,
7013-
Supplier<U> bufferSupplier) {
7014-
ObjectHelper.requireNonNull(boundaryIndicatorSupplier, "boundaryIndicatorSupplier is null");
7015-
ObjectHelper.requireNonNull(bufferSupplier, "bufferSupplier is null");
7016-
return RxJavaPlugins.onAssembly(new FlowableBufferBoundarySupplier<T, U, B>(this, boundaryIndicatorSupplier, bufferSupplier));
7017-
}
7018-
70196946
/**
70206947
* Returns a Flowable that subscribes to this Publisher lazily, caches all of its events
70216948
* and replays them, in the same order as received, to all the downstream subscribers.
@@ -12186,7 +12113,7 @@ public final Flowable<T> onBackpressureLatest() {
1218612113
@SchedulerSupport(SchedulerSupport.NONE)
1218712114
public final Flowable<T> onErrorResumeNext(Function<? super Throwable, ? extends Publisher<? extends T>> resumeFunction) {
1218812115
ObjectHelper.requireNonNull(resumeFunction, "resumeFunction is null");
12189-
return RxJavaPlugins.onAssembly(new FlowableOnErrorNext<T>(this, resumeFunction, false));
12116+
return RxJavaPlugins.onAssembly(new FlowableOnErrorNext<T>(this, resumeFunction));
1219012117
}
1219112118

1219212119
/**
@@ -12313,53 +12240,6 @@ public final Flowable<T> onErrorReturnItem(final T item) {
1231312240
return onErrorReturn(Functions.justFunction(item));
1231412241
}
1231512242

12316-
/**
12317-
* Instructs a Publisher to pass control to another Publisher rather than invoking
12318-
* {@link Subscriber#onError onError} if it encounters an {@link java.lang.Exception}.
12319-
* <p>
12320-
* This differs from {@link #onErrorResumeNext} in that this one does not handle {@link java.lang.Throwable}
12321-
* or {@link java.lang.Error} but lets those continue through.
12322-
* <p>
12323-
* <img width="640" height="310" src="https://raw.github.com/wiki/ReactiveX/RxJava/images/rx-operators/onExceptionResumeNextViaPublisher.png" alt="">
12324-
* <p>
12325-
* By default, when a Publisher encounters an exception that prevents it from emitting the expected item
12326-
* to its {@link Subscriber}, the Publisher invokes its Subscriber's {@code onError} method, and then quits
12327-
* without invoking any more of its Subscriber's methods. The {@code onExceptionResumeNext} method changes
12328-
* this behavior. If you pass another Publisher ({@code resumeSequence}) to a Publisher's
12329-
* {@code onExceptionResumeNext} method, if the original Publisher encounters an exception, instead of
12330-
* invoking its Subscriber's {@code onError} method, it will instead relinquish control to
12331-
* {@code resumeSequence} which will invoke the Subscriber's {@link Subscriber#onNext onNext} method if it is
12332-
* able to do so. In such a case, because no Publisher necessarily invokes {@code onError}, the Subscriber
12333-
* may never know that an exception happened.
12334-
* <p>
12335-
* You can use this to prevent exceptions from propagating or to supply fallback data should exceptions be
12336-
* encountered.
12337-
* <dl>
12338-
* <dt><b>Backpressure:</b></dt>
12339-
* <dd>The operator honors backpressure from downstream. This and the resuming {@code Publisher}s
12340-
* are expected to honor backpressure as well.
12341-
* If any of them violate this expectation, the operator <em>may</em> throw an
12342-
* {@code IllegalStateException} when the source {@code Publisher} completes or
12343-
* {@code MissingBackpressureException} is signaled somewhere downstream.</dd>
12344-
* <dt><b>Scheduler:</b></dt>
12345-
* <dd>{@code onExceptionResumeNext} does not operate by default on a particular {@link Scheduler}.</dd>
12346-
* </dl>
12347-
*
12348-
* @param next
12349-
* the next Publisher that will take over if the source Publisher encounters
12350-
* an exception
12351-
* @return the original Publisher, with appropriately modified behavior
12352-
* @see <a href="http://reactivex.io/documentation/operators/catch.html">ReactiveX operators documentation: Catch</a>
12353-
*/
12354-
@CheckReturnValue
12355-
@NonNull
12356-
@BackpressureSupport(BackpressureKind.FULL)
12357-
@SchedulerSupport(SchedulerSupport.NONE)
12358-
public final Flowable<T> onExceptionResumeNext(final Publisher<? extends T> next) {
12359-
ObjectHelper.requireNonNull(next, "next is null");
12360-
return RxJavaPlugins.onAssembly(new FlowableOnErrorNext<T>(this, Functions.justFunction(next), true));
12361-
}
12362-
1236312243
/**
1236412244
* Nulls out references to the upstream producer and downstream Subscriber if
1236512245
* the sequence is terminated or downstream cancels.
@@ -18290,77 +18170,6 @@ public final <U, V> Flowable<Flowable<T>> window(
1829018170
return RxJavaPlugins.onAssembly(new FlowableWindowBoundarySelector<T, U, V>(this, openingIndicator, closingIndicator, bufferSize));
1829118171
}
1829218172

18293-
/**
18294-
* Returns a Flowable that emits windows of items it collects from the source Publisher. The resulting
18295-
* Publisher emits connected, non-overlapping windows. It emits the current window and opens a new one
18296-
* whenever the Publisher produced by the specified {@code closingSelector} emits an item.
18297-
* <p>
18298-
* <img width="640" height="455" src="https://raw.github.com/wiki/ReactiveX/RxJava/images/rx-operators/window1.png" alt="">
18299-
* <dl>
18300-
* <dt><b>Backpressure:</b></dt>
18301-
* <dd>The operator consumes the source {@code Publisher} in an unbounded manner.
18302-
* The returned {@code Publisher} doesn't support backpressure as it uses
18303-
* the {@code closingSelector} to control the creation of windows. The returned inner {@code Publisher}s honor
18304-
* backpressure but have an unbounded inner buffer that <em>may</em> lead to {@code OutOfMemoryError}
18305-
* if left unconsumed.</dd>
18306-
* <dt><b>Scheduler:</b></dt>
18307-
* <dd>This version of {@code window} does not operate by default on a particular {@link Scheduler}.</dd>
18308-
* </dl>
18309-
*
18310-
* @param <B> the element type of the boundary Publisher
18311-
* @param boundaryIndicatorSupplier
18312-
* a {@link Supplier} that returns a {@code Publisher} that governs the boundary between windows.
18313-
* When the source {@code Publisher} emits an item, {@code window} emits the current window and begins
18314-
* a new one.
18315-
* @return a Flowable that emits connected, non-overlapping windows of items from the source Publisher
18316-
* whenever {@code closingSelector} emits an item
18317-
* @see <a href="http://reactivex.io/documentation/operators/window.html">ReactiveX operators documentation: Window</a>
18318-
*/
18319-
@CheckReturnValue
18320-
@BackpressureSupport(BackpressureKind.ERROR)
18321-
@SchedulerSupport(SchedulerSupport.NONE)
18322-
public final <B> Flowable<Flowable<T>> window(Supplier<? extends Publisher<B>> boundaryIndicatorSupplier) {
18323-
return window(boundaryIndicatorSupplier, bufferSize());
18324-
}
18325-
18326-
/**
18327-
* Returns a Flowable that emits windows of items it collects from the source Publisher. The resulting
18328-
* Publisher emits connected, non-overlapping windows. It emits the current window and opens a new one
18329-
* whenever the Publisher produced by the specified {@code closingSelector} emits an item.
18330-
* <p>
18331-
* <img width="640" height="455" src="https://raw.github.com/wiki/ReactiveX/RxJava/images/rx-operators/window1.png" alt="">
18332-
* <dl>
18333-
* <dt><b>Backpressure:</b></dt>
18334-
* <dd>The operator consumes the source {@code Publisher} in an unbounded manner.
18335-
* The returned {@code Publisher} doesn't support backpressure as it uses
18336-
* the {@code closingSelector} to control the creation of windows. The returned inner {@code Publisher}s honor
18337-
* backpressure but have an unbounded inner buffer that <em>may</em> lead to {@code OutOfMemoryError}
18338-
* if left unconsumed.</dd>
18339-
* <dt><b>Scheduler:</b></dt>
18340-
* <dd>This version of {@code window} does not operate by default on a particular {@link Scheduler}.</dd>
18341-
* </dl>
18342-
*
18343-
* @param <B> the element type of the boundary Publisher
18344-
* @param boundaryIndicatorSupplier
18345-
* a {@link Supplier} that returns a {@code Publisher} that governs the boundary between windows.
18346-
* When the source {@code Publisher} emits an item, {@code window} emits the current window and begins
18347-
* a new one.
18348-
* @param bufferSize
18349-
* the capacity hint for the buffer in the inner windows
18350-
* @return a Flowable that emits connected, non-overlapping windows of items from the source Publisher
18351-
* whenever {@code closingSelector} emits an item
18352-
* @see <a href="http://reactivex.io/documentation/operators/window.html">ReactiveX operators documentation: Window</a>
18353-
*/
18354-
@CheckReturnValue
18355-
@NonNull
18356-
@BackpressureSupport(BackpressureKind.ERROR)
18357-
@SchedulerSupport(SchedulerSupport.NONE)
18358-
public final <B> Flowable<Flowable<T>> window(Supplier<? extends Publisher<B>> boundaryIndicatorSupplier, int bufferSize) {
18359-
ObjectHelper.requireNonNull(boundaryIndicatorSupplier, "boundaryIndicatorSupplier is null");
18360-
ObjectHelper.verifyPositive(bufferSize, "bufferSize");
18361-
return RxJavaPlugins.onAssembly(new FlowableWindowBoundarySupplier<T, B>(this, boundaryIndicatorSupplier, bufferSize));
18362-
}
18363-
1836418173
/**
1836518174
* Merges the specified Publisher into this Publisher sequence by using the {@code resultSelector}
1836618175
* function only when the source Publisher (this instance) emits an item.

0 commit comments

Comments
 (0)