Skip to content

Commit 5ce21f4

Browse files
authored
2.x: Add throttleLatest operator (#5979)
1 parent 8a28169 commit 5ce21f4

File tree

7 files changed

+1259
-0
lines changed

7 files changed

+1259
-0
lines changed

src/main/java/io/reactivex/Flowable.java

+152
Original file line numberDiff line numberDiff line change
@@ -15536,6 +15536,158 @@ public final Flowable<T> throttleLast(long intervalDuration, TimeUnit unit, Sche
1553615536
return sample(intervalDuration, unit, scheduler);
1553715537
}
1553815538

15539+
/**
15540+
* Throttles items from the upstream {@code Flowable} by first emitting the next
15541+
* item from upstream, then periodically emitting the latest item (if any) when
15542+
* the specified timeout elapses between them.
15543+
* <p>
15544+
* <img width="640" height="325" src="https://raw.github.com/wiki/ReactiveX/RxJava/images/rx-operators/throttleLatest.png" alt="">
15545+
* <p>
15546+
* Unlike the option with {@link #throttleLatest(long, TimeUnit, boolean)}, the very last item being held back
15547+
* (if any) is not emitted when the upstream completes.
15548+
* <p>
15549+
* If no items were emitted from the upstream during this timeout phase, the next
15550+
* upstream item is emitted immediately and the timeout window starts from then.
15551+
* <dl>
15552+
* <dt><b>Backpressure:</b></dt>
15553+
* <dd>This operator does not support backpressure as it uses time to control data flow.
15554+
* If the downstream is not ready to receive items, a
15555+
* {@link io.reactivex.exceptions.MissingBackpressureException MissingBackpressureException}
15556+
* will be signaled.</dd>
15557+
* <dt><b>Scheduler:</b></dt>
15558+
* <dd>{@code throttleLatest} operates by default on the {@code computation} {@link Scheduler}.</dd>
15559+
* </dl>
15560+
* @param timeout the time to wait after an item emission towards the downstream
15561+
* before trying to emit the latest item from upstream again
15562+
* @param unit the time unit
15563+
* @return the new Flowable instance
15564+
* @since 2.1.14 - experimental
15565+
* @see #throttleLatest(long, TimeUnit, boolean)
15566+
* @see #throttleLatest(long, TimeUnit, Scheduler)
15567+
*/
15568+
@Experimental
15569+
@CheckReturnValue
15570+
@BackpressureSupport(BackpressureKind.ERROR)
15571+
@SchedulerSupport(SchedulerSupport.COMPUTATION)
15572+
public final Flowable<T> throttleLatest(long timeout, TimeUnit unit) {
15573+
return throttleLatest(timeout, unit, Schedulers.computation(), false);
15574+
}
15575+
15576+
/**
15577+
* Throttles items from the upstream {@code Flowable} by first emitting the next
15578+
* item from upstream, then periodically emitting the latest item (if any) when
15579+
* the specified timeout elapses between them.
15580+
* <p>
15581+
* <img width="640" height="325" src="https://raw.github.com/wiki/ReactiveX/RxJava/images/rx-operators/throttleLatest.e.png" alt="">
15582+
* <p>
15583+
* If no items were emitted from the upstream during this timeout phase, the next
15584+
* upstream item is emitted immediately and the timeout window starts from then.
15585+
* <dl>
15586+
* <dt><b>Backpressure:</b></dt>
15587+
* <dd>This operator does not support backpressure as it uses time to control data flow.
15588+
* If the downstream is not ready to receive items, a
15589+
* {@link io.reactivex.exceptions.MissingBackpressureException MissingBackpressureException}
15590+
* will be signaled.</dd>
15591+
* <dt><b>Scheduler:</b></dt>
15592+
* <dd>{@code throttleLatest} operates by default on the {@code computation} {@link Scheduler}.</dd>
15593+
* </dl>
15594+
* @param timeout the time to wait after an item emission towards the downstream
15595+
* before trying to emit the latest item from upstream again
15596+
* @param unit the time unit
15597+
* @param emitLast If {@code true}, the very last item from the upstream will be emitted
15598+
* immediately when the upstream completes, regardless if there is
15599+
* a timeout window active or not. If {@code false}, the very last
15600+
* upstream item is ignored and the flow terminates.
15601+
* @return the new Flowable instance
15602+
* @since 2.1.14 - experimental
15603+
* @see #throttleLatest(long, TimeUnit, Scheduler, boolean)
15604+
*/
15605+
@Experimental
15606+
@CheckReturnValue
15607+
@BackpressureSupport(BackpressureKind.ERROR)
15608+
@SchedulerSupport(SchedulerSupport.COMPUTATION)
15609+
public final Flowable<T> throttleLatest(long timeout, TimeUnit unit, boolean emitLast) {
15610+
return throttleLatest(timeout, unit, Schedulers.computation(), emitLast);
15611+
}
15612+
15613+
/**
15614+
* Throttles items from the upstream {@code Flowable} by first emitting the next
15615+
* item from upstream, then periodically emitting the latest item (if any) when
15616+
* the specified timeout elapses between them.
15617+
* <p>
15618+
* <img width="640" height="325" src="https://raw.github.com/wiki/ReactiveX/RxJava/images/rx-operators/throttleLatest.s.png" alt="">
15619+
* <p>
15620+
* Unlike the option with {@link #throttleLatest(long, TimeUnit, Scheduler, boolean)}, the very last item being held back
15621+
* (if any) is not emitted when the upstream completes.
15622+
* <p>
15623+
* If no items were emitted from the upstream during this timeout phase, the next
15624+
* upstream item is emitted immediately and the timeout window starts from then.
15625+
* <dl>
15626+
* <dt><b>Backpressure:</b></dt>
15627+
* <dd>This operator does not support backpressure as it uses time to control data flow.
15628+
* If the downstream is not ready to receive items, a
15629+
* {@link io.reactivex.exceptions.MissingBackpressureException MissingBackpressureException}
15630+
* will be signaled.</dd>
15631+
* <dt><b>Scheduler:</b></dt>
15632+
* <dd>You specify which {@link Scheduler} this operator will use.</dd>
15633+
* </dl>
15634+
* @param timeout the time to wait after an item emission towards the downstream
15635+
* before trying to emit the latest item from upstream again
15636+
* @param unit the time unit
15637+
* @param scheduler the {@link Scheduler} where the timed wait and latest item
15638+
* emission will be performed
15639+
* @return the new Flowable instance
15640+
* @since 2.1.14 - experimental
15641+
* @see #throttleLatest(long, TimeUnit, Scheduler, boolean)
15642+
*/
15643+
@Experimental
15644+
@CheckReturnValue
15645+
@BackpressureSupport(BackpressureKind.ERROR)
15646+
@SchedulerSupport(SchedulerSupport.CUSTOM)
15647+
public final Flowable<T> throttleLatest(long timeout, TimeUnit unit, Scheduler scheduler) {
15648+
return throttleLatest(timeout, unit, scheduler, false);
15649+
}
15650+
15651+
/**
15652+
* Throttles items from the upstream {@code Flowable} by first emitting the next
15653+
* item from upstream, then periodically emitting the latest item (if any) when
15654+
* the specified timeout elapses between them.
15655+
* <p>
15656+
* <img width="640" height="325" src="https://raw.github.com/wiki/ReactiveX/RxJava/images/rx-operators/throttleLatest.se.png" alt="">
15657+
* <p>
15658+
* If no items were emitted from the upstream during this timeout phase, the next
15659+
* upstream item is emitted immediately and the timeout window starts from then.
15660+
* <dl>
15661+
* <dt><b>Backpressure:</b></dt>
15662+
* <dd>This operator does not support backpressure as it uses time to control data flow.
15663+
* If the downstream is not ready to receive items, a
15664+
* {@link io.reactivex.exceptions.MissingBackpressureException MissingBackpressureException}
15665+
* will be signaled.</dd>
15666+
* <dt><b>Scheduler:</b></dt>
15667+
* <dd>You specify which {@link Scheduler} this operator will use.</dd>
15668+
* </dl>
15669+
* @param timeout the time to wait after an item emission towards the downstream
15670+
* before trying to emit the latest item from upstream again
15671+
* @param unit the time unit
15672+
* @param scheduler the {@link Scheduler} where the timed wait and latest item
15673+
* emission will be performed
15674+
* @param emitLast If {@code true}, the very last item from the upstream will be emitted
15675+
* immediately when the upstream completes, regardless if there is
15676+
* a timeout window active or not. If {@code false}, the very last
15677+
* upstream item is ignored and the flow terminates.
15678+
* @return the new Flowable instance
15679+
* @since 2.1.14 - experimental
15680+
*/
15681+
@Experimental
15682+
@CheckReturnValue
15683+
@BackpressureSupport(BackpressureKind.ERROR)
15684+
@SchedulerSupport(SchedulerSupport.CUSTOM)
15685+
public final Flowable<T> throttleLatest(long timeout, TimeUnit unit, Scheduler scheduler, boolean emitLast) {
15686+
ObjectHelper.requireNonNull(unit, "unit is null");
15687+
ObjectHelper.requireNonNull(scheduler, "scheduler is null");
15688+
return RxJavaPlugins.onAssembly(new FlowableThrottleLatest<T>(this, timeout, unit, scheduler, emitLast));
15689+
}
15690+
1553915691
/**
1554015692
* Returns a Flowable that only emits those items emitted by the source Publisher that are not followed
1554115693
* by another emitted item within a specified time window.

src/main/java/io/reactivex/Observable.java

+128
Original file line numberDiff line numberDiff line change
@@ -13055,6 +13055,134 @@ public final Observable<T> throttleLast(long intervalDuration, TimeUnit unit, Sc
1305513055
return sample(intervalDuration, unit, scheduler);
1305613056
}
1305713057

13058+
/**
13059+
* Throttles items from the upstream {@code Observable} by first emitting the next
13060+
* item from upstream, then periodically emitting the latest item (if any) when
13061+
* the specified timeout elapses between them.
13062+
* <p>
13063+
* <img width="640" height="325" src="https://raw.github.com/wiki/ReactiveX/RxJava/images/rx-operators/throttleLatest.png" alt="">
13064+
* <p>
13065+
* Unlike the option with {@link #throttleLatest(long, TimeUnit, boolean)}, the very last item being held back
13066+
* (if any) is not emitted when the upstream completes.
13067+
* <p>
13068+
* If no items were emitted from the upstream during this timeout phase, the next
13069+
* upstream item is emitted immediately and the timeout window starts from then.
13070+
* <dl>
13071+
* <dt><b>Scheduler:</b></dt>
13072+
* <dd>{@code throttleLatest} operates by default on the {@code computation} {@link Scheduler}.</dd>
13073+
* </dl>
13074+
* @param timeout the time to wait after an item emission towards the downstream
13075+
* before trying to emit the latest item from upstream again
13076+
* @param unit the time unit
13077+
* @return the new Observable instance
13078+
* @since 2.1.14 - experimental
13079+
* @see #throttleLatest(long, TimeUnit, boolean)
13080+
* @see #throttleLatest(long, TimeUnit, Scheduler)
13081+
*/
13082+
@Experimental
13083+
@CheckReturnValue
13084+
@SchedulerSupport(SchedulerSupport.COMPUTATION)
13085+
public final Observable<T> throttleLatest(long timeout, TimeUnit unit) {
13086+
return throttleLatest(timeout, unit, Schedulers.computation(), false);
13087+
}
13088+
13089+
/**
13090+
* Throttles items from the upstream {@code Observable} by first emitting the next
13091+
* item from upstream, then periodically emitting the latest item (if any) when
13092+
* the specified timeout elapses between them.
13093+
* <p>
13094+
* <img width="640" height="325" src="https://raw.github.com/wiki/ReactiveX/RxJava/images/rx-operators/throttleLatest.e.png" alt="">
13095+
* <p>
13096+
* If no items were emitted from the upstream during this timeout phase, the next
13097+
* upstream item is emitted immediately and the timeout window starts from then.
13098+
* <dl>
13099+
* <dt><b>Scheduler:</b></dt>
13100+
* <dd>{@code throttleLatest} operates by default on the {@code computation} {@link Scheduler}.</dd>
13101+
* </dl>
13102+
* @param timeout the time to wait after an item emission towards the downstream
13103+
* before trying to emit the latest item from upstream again
13104+
* @param unit the time unit
13105+
* @param emitLast If {@code true}, the very last item from the upstream will be emitted
13106+
* immediately when the upstream completes, regardless if there is
13107+
* a timeout window active or not. If {@code false}, the very last
13108+
* upstream item is ignored and the flow terminates.
13109+
* @return the new Observable instance
13110+
* @since 2.1.14 - experimental
13111+
* @see #throttleLatest(long, TimeUnit, Scheduler, boolean)
13112+
*/
13113+
@Experimental
13114+
@CheckReturnValue
13115+
@SchedulerSupport(SchedulerSupport.COMPUTATION)
13116+
public final Observable<T> throttleLatest(long timeout, TimeUnit unit, boolean emitLast) {
13117+
return throttleLatest(timeout, unit, Schedulers.computation(), emitLast);
13118+
}
13119+
13120+
/**
13121+
* Throttles items from the upstream {@code Observable} by first emitting the next
13122+
* item from upstream, then periodically emitting the latest item (if any) when
13123+
* the specified timeout elapses between them.
13124+
* <p>
13125+
* <img width="640" height="325" src="https://raw.github.com/wiki/ReactiveX/RxJava/images/rx-operators/throttleLatest.s.png" alt="">
13126+
* <p>
13127+
* Unlike the option with {@link #throttleLatest(long, TimeUnit, Scheduler, boolean)}, the very last item being held back
13128+
* (if any) is not emitted when the upstream completes.
13129+
* <p>
13130+
* If no items were emitted from the upstream during this timeout phase, the next
13131+
* upstream item is emitted immediately and the timeout window starts from then.
13132+
* <dl>
13133+
* <dt><b>Scheduler:</b></dt>
13134+
* <dd>You specify which {@link Scheduler} this operator will use.</dd>
13135+
* </dl>
13136+
* @param timeout the time to wait after an item emission towards the downstream
13137+
* before trying to emit the latest item from upstream again
13138+
* @param unit the time unit
13139+
* @param scheduler the {@link Scheduler} where the timed wait and latest item
13140+
* emission will be performed
13141+
* @return the new Observable instance
13142+
* @since 2.1.14 - experimental
13143+
* @see #throttleLatest(long, TimeUnit, Scheduler, boolean)
13144+
*/
13145+
@Experimental
13146+
@CheckReturnValue
13147+
@SchedulerSupport(SchedulerSupport.CUSTOM)
13148+
public final Observable<T> throttleLatest(long timeout, TimeUnit unit, Scheduler scheduler) {
13149+
return throttleLatest(timeout, unit, scheduler, false);
13150+
}
13151+
13152+
/**
13153+
* Throttles items from the upstream {@code Observable} by first emitting the next
13154+
* item from upstream, then periodically emitting the latest item (if any) when
13155+
* the specified timeout elapses between them.
13156+
* <p>
13157+
* <img width="640" height="325" src="https://raw.github.com/wiki/ReactiveX/RxJava/images/rx-operators/throttleLatest.se.png" alt="">
13158+
* <p>
13159+
* If no items were emitted from the upstream during this timeout phase, the next
13160+
* upstream item is emitted immediately and the timeout window starts from then.
13161+
* <dl>
13162+
* <dt><b>Scheduler:</b></dt>
13163+
* <dd>You specify which {@link Scheduler} this operator will use.</dd>
13164+
* </dl>
13165+
* @param timeout the time to wait after an item emission towards the downstream
13166+
* before trying to emit the latest item from upstream again
13167+
* @param unit the time unit
13168+
* @param scheduler the {@link Scheduler} where the timed wait and latest item
13169+
* emission will be performed
13170+
* @param emitLast If {@code true}, the very last item from the upstream will be emitted
13171+
* immediately when the upstream completes, regardless if there is
13172+
* a timeout window active or not. If {@code false}, the very last
13173+
* upstream item is ignored and the flow terminates.
13174+
* @return the new Observable instance
13175+
* @since 2.1.14 - experimental
13176+
*/
13177+
@Experimental
13178+
@CheckReturnValue
13179+
@SchedulerSupport(SchedulerSupport.CUSTOM)
13180+
public final Observable<T> throttleLatest(long timeout, TimeUnit unit, Scheduler scheduler, boolean emitLast) {
13181+
ObjectHelper.requireNonNull(unit, "unit is null");
13182+
ObjectHelper.requireNonNull(scheduler, "scheduler is null");
13183+
return RxJavaPlugins.onAssembly(new ObservableThrottleLatest<T>(this, timeout, unit, scheduler, emitLast));
13184+
}
13185+
1305813186
/**
1305913187
* Returns an Observable that only emits those items emitted by the source ObservableSource that are not followed
1306013188
* by another emitted item within a specified time window.

0 commit comments

Comments
 (0)