+
This next example, in Clojure, consumes three asynchronous Observables, including a dependency from one to another, and emits a single response item by combining the items emitted by each of the three Observables with the [`zip`](http://reactivex.io/documentation/operators/zip.html) operator and then transforming the result with [`map`](http://reactivex.io/documentation/operators/map.html):
@@ -333,7 +333,7 @@ The response looks like this:
And here is a marble diagram that illustrates how that code produces that response:
-
+
The following example, in Groovy, comes from [Ben Christensen’s QCon presentation on the evolution of the Netflix API](https://speakerdeck.com/benjchristensen/evolution-of-the-netflix-api-qcon-sf-2013). It combines two Observables with the [`merge`](http://reactivex.io/documentation/operators/merge.html) operator, then uses the [`reduce`](http://reactivex.io/documentation/operators/reduce.html) operator to construct a single item out of the resulting sequence, then transforms that item with [`map`](http://reactivex.io/documentation/operators/map.html) before emitting it:
@@ -350,7 +350,7 @@ public Observable getVideoSummary(APIVideo video) {
And here is a marble diagram that illustrates how that code uses the [`reduce`](http://reactivex.io/documentation/operators/reduce.html) operator to bring the results from multiple Observables together in one structure:
-
+
## Error Handling
diff --git a/docs/Phantom-Operators.md b/docs/Phantom-Operators.md
index b01ac28ff2..5193147a22 100644
--- a/docs/Phantom-Operators.md
+++ b/docs/Phantom-Operators.md
@@ -127,7 +127,7 @@ streamOfItems.flatMap(item -> {
itemToObservable(item).subscribeOn(Schedulers.io());
});
```
-Kick off your work for each item inside [`flatMap`](Transforming-Observables#flatmap-concatmap-and-flatmapiterable) using [`subscribeOn`](Observable-Utility-Operators#subscribeon) to make it asynchronous, or by using a function that already makes asychronous calls.
+Kick off your work for each item inside [`flatMap`](Transforming-Observables#flatmap-concatmap-and-flatmapiterable) using [`subscribeOn`](Observable-Utility-Operators#subscribeon) to make it asynchronous, or by using a function that already makes asynchronous calls.
#### see also:
* RxJava Threading Examples by Graham Lea
diff --git a/docs/What's-different-in-2.0.md b/docs/What's-different-in-2.0.md
index fac50df56d..edc681fa7f 100644
--- a/docs/What's-different-in-2.0.md
+++ b/docs/What's-different-in-2.0.md
@@ -450,12 +450,12 @@ Before 2.0.7, the operator `strict()` had to be applied in order to achieve the
As one of the primary goals of RxJava 2, the design focuses on performance and in order enable it, RxJava 2.0.7 adds a custom `io.reactivex.FlowableSubscriber` interface (extends `org.reactivestreams.Subscriber`) but adds no new methods to it. The new interface is **constrained to RxJava 2** and represents a consumer to `Flowable` that is able to work in a mode that relaxes the Reactive-Streams version 1.0.0 specification in rules §1.3, §2.3, §2.12 and §3.9:
- - §1.3 relaxation: `onSubscribe` may run concurrently with `onNext` in case the `FlowableSubscriber` calls `request()` from inside `onSubscribe` and it is the resposibility of `FlowableSubscriber` to ensure thread-safety between the remaining instructions in `onSubscribe` and `onNext`.
+ - §1.3 relaxation: `onSubscribe` may run concurrently with `onNext` in case the `FlowableSubscriber` calls `request()` from inside `onSubscribe` and it is the responsibility of `FlowableSubscriber` to ensure thread-safety between the remaining instructions in `onSubscribe` and `onNext`.
- §2.3 relaxation: calling `Subscription.cancel` and `Subscription.request` from `FlowableSubscriber.onComplete()` or `FlowableSubscriber.onError()` is considered a no-operation.
- §2.12 relaxation: if the same `FlowableSubscriber` instance is subscribed to multiple sources, it must ensure its `onXXX` methods remain thread safe.
- §3.9 relaxation: issuing a non-positive `request()` will not stop the current stream but signal an error via `RxJavaPlugins.onError`.
-From a user's perspective, if one was using the the `subscribe` methods other than `Flowable.subscribe(Subscriber super T>)`, there is no need to do anything regarding this change and there is no extra penalty for it.
+From a user's perspective, if one was using the `subscribe` methods other than `Flowable.subscribe(Subscriber super T>)`, there is no need to do anything regarding this change and there is no extra penalty for it.
If one was using `Flowable.subscribe(Subscriber super T>)` with the built-in RxJava `Subscriber` implementations such as `DisposableSubscriber`, `TestSubscriber` and `ResourceSubscriber`, there is a small runtime overhead (one `instanceof` check) associated when the code is not recompiled against 2.0.7.
diff --git a/docs/Writing-operators-for-2.0.md b/docs/Writing-operators-for-2.0.md
index e8486564b1..1a51664880 100644
--- a/docs/Writing-operators-for-2.0.md
+++ b/docs/Writing-operators-for-2.0.md
@@ -565,7 +565,7 @@ Version 2.0.7 introduced a new interface, `FlowableSubscriber` that extends `Sub
The rule relaxations are as follows:
-- §1.3 relaxation: `onSubscribe` may run concurrently with onNext in case the `FlowableSubscriber` calls `request()` from inside `onSubscribe` and it is the resposibility of `FlowableSubscriber` to ensure thread-safety between the remaining instructions in `onSubscribe` and `onNext`.
+- §1.3 relaxation: `onSubscribe` may run concurrently with onNext in case the `FlowableSubscriber` calls `request()` from inside `onSubscribe` and it is the responsibility of `FlowableSubscriber` to ensure thread-safety between the remaining instructions in `onSubscribe` and `onNext`.
- §2.3 relaxation: calling `Subscription.cancel` and `Subscription.request` from `FlowableSubscriber.onComplete()` or `FlowableSubscriber.onError()` is considered a no-operation.
- §2.12 relaxation: if the same `FlowableSubscriber` instance is subscribed to multiple sources, it must ensure its `onXXX` methods remain thread safe.
- §3.9 relaxation: issuing a non-positive `request()` will not stop the current stream but signal an error via `RxJavaPlugins.onError`.
diff --git a/gradle/javadoc_cleanup.gradle b/gradle/javadoc_cleanup.gradle
index 5f120656e0..63b4f7f045 100644
--- a/gradle/javadoc_cleanup.gradle
+++ b/gradle/javadoc_cleanup.gradle
@@ -24,6 +24,12 @@ task javadocCleanup(dependsOn: "javadoc") doLast {
fixJavadocFile(rootProject.file('build/docs/javadoc/io/reactivex/rxjava3/plugins/RxJavaPlugins.html'))
fixJavadocFile(rootProject.file('build/docs/javadoc/io/reactivex/rxjava3/parallel/ParallelFlowable.html'))
+
+ fixJavadocFile(rootProject.file('build/docs/javadoc/io/reactivex/rxjava3/disposables/Disposable.html'))
+
+ fixJavadocFile(rootProject.file('build/docs/javadoc/io/reactivex/rxjava3/observers/TestObserver.html'))
+ fixJavadocFile(rootProject.file('build/docs/javadoc/io/reactivex/rxjava3/observers/BaseTestConsumer.html'))
+ fixJavadocFile(rootProject.file('build/docs/javadoc/io/reactivex/rxjava3/subscribers/TestSubscriber.html'))
}
def fixJavadocFile(file) {
diff --git a/gradle/wrapper/gradle-wrapper.jar b/gradle/wrapper/gradle-wrapper.jar
index 7454180f2a..afba109285 100644
Binary files a/gradle/wrapper/gradle-wrapper.jar and b/gradle/wrapper/gradle-wrapper.jar differ
diff --git a/gradle/wrapper/gradle-wrapper.properties b/gradle/wrapper/gradle-wrapper.properties
index 2e6e5897b5..3c44eb1b6f 100644
--- a/gradle/wrapper/gradle-wrapper.properties
+++ b/gradle/wrapper/gradle-wrapper.properties
@@ -1,5 +1,6 @@
distributionBase=GRADLE_USER_HOME
distributionPath=wrapper/dists
-distributionUrl=https\://services.gradle.org/distributions/gradle-7.3.3-bin.zip
+distributionUrl=https\://services.gradle.org/distributions/gradle-8.14-bin.zip
+networkTimeout=10000
zipStoreBase=GRADLE_USER_HOME
zipStorePath=wrapper/dists
diff --git a/gradlew b/gradlew
index c53aefaa5f..65dcd68d65 100755
--- a/gradlew
+++ b/gradlew
@@ -1,7 +1,7 @@
#!/bin/sh
#
-# Copyright © 2015-2021 the original authors.
+# Copyright © 2015-2021 the original authors.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
@@ -32,10 +32,10 @@
# Busybox and similar reduced shells will NOT work, because this script
# requires all of these POSIX shell features:
# * functions;
-# * expansions «$var», «${var}», «${var:-default}», «${var+SET}»,
-# «${var#prefix}», «${var%suffix}», and «$( cmd )»;
-# * compound commands having a testable exit status, especially «case»;
-# * various built-in commands including «command», «set», and «ulimit».
+# * expansions «$var», «${var}», «${var:-default}», «${var+SET}»,
+# «${var#prefix}», «${var%suffix}», and «$( cmd )»;
+# * compound commands having a testable exit status, especially «case»;
+# * various built-in commands including «command», «set», and «ulimit».
#
# Important for patching:
#
@@ -55,7 +55,7 @@
# Darwin, MinGW, and NonStop.
#
# (3) This script is generated from the Groovy template
-# https://github.com/gradle/gradle/blob/master/subprojects/plugins/src/main/resources/org/gradle/api/internal/plugins/unixStartScript.txt
+# https://github.com/gradle/gradle/blob/HEAD/subprojects/plugins/src/main/resources/org/gradle/api/internal/plugins/unixStartScript.txt
# within the Gradle project.
#
# You can find Gradle at https://github.com/gradle/gradle/.
@@ -80,10 +80,10 @@ do
esac
done
-APP_HOME=$( cd "${APP_HOME:-./}" && pwd -P ) || exit
-
-APP_NAME="Gradle"
+# This is normally unused
+# shellcheck disable=SC2034
APP_BASE_NAME=${0##*/}
+APP_HOME=$( cd "${APP_HOME:-./}" && pwd -P ) || exit
# Add default JVM options here. You can also use JAVA_OPTS and GRADLE_OPTS to pass JVM options to this script.
DEFAULT_JVM_OPTS='"-Xmx64m" "-Xms64m"'
@@ -143,12 +143,16 @@ fi
if ! "$cygwin" && ! "$darwin" && ! "$nonstop" ; then
case $MAX_FD in #(
max*)
+ # In POSIX sh, ulimit -H is undefined. That's why the result is checked to see if it worked.
+ # shellcheck disable=SC3045
MAX_FD=$( ulimit -H -n ) ||
warn "Could not query maximum file descriptor limit"
esac
case $MAX_FD in #(
'' | soft) :;; #(
*)
+ # In POSIX sh, ulimit -n is undefined. That's why the result is checked to see if it worked.
+ # shellcheck disable=SC3045
ulimit -n "$MAX_FD" ||
warn "Could not set maximum file descriptor limit to $MAX_FD"
esac
@@ -205,6 +209,12 @@ set -- \
org.gradle.wrapper.GradleWrapperMain \
"$@"
+# Stop when "xargs" is not available.
+if ! command -v xargs >/dev/null 2>&1
+then
+ die "xargs is not available"
+fi
+
# Use "xargs" to parse quoted args.
#
# With -n1 it outputs one arg per line, with the quotes and backslashes removed.
diff --git a/gradlew.bat b/gradlew.bat
index 107acd32c4..93e3f59f13 100644
--- a/gradlew.bat
+++ b/gradlew.bat
@@ -14,7 +14,7 @@
@rem limitations under the License.
@rem
-@if "%DEBUG%" == "" @echo off
+@if "%DEBUG%"=="" @echo off
@rem ##########################################################################
@rem
@rem Gradle startup script for Windows
@@ -25,7 +25,8 @@
if "%OS%"=="Windows_NT" setlocal
set DIRNAME=%~dp0
-if "%DIRNAME%" == "" set DIRNAME=.
+if "%DIRNAME%"=="" set DIRNAME=.
+@rem This is normally unused
set APP_BASE_NAME=%~n0
set APP_HOME=%DIRNAME%
@@ -40,7 +41,7 @@ if defined JAVA_HOME goto findJavaFromJavaHome
set JAVA_EXE=java.exe
%JAVA_EXE% -version >NUL 2>&1
-if "%ERRORLEVEL%" == "0" goto execute
+if %ERRORLEVEL% equ 0 goto execute
echo.
echo ERROR: JAVA_HOME is not set and no 'java' command could be found in your PATH.
@@ -75,13 +76,15 @@ set CLASSPATH=%APP_HOME%\gradle\wrapper\gradle-wrapper.jar
:end
@rem End local scope for the variables with windows NT shell
-if "%ERRORLEVEL%"=="0" goto mainEnd
+if %ERRORLEVEL% equ 0 goto mainEnd
:fail
rem Set variable GRADLE_EXIT_CONSOLE if you need the _script_ return code instead of
rem the _cmd.exe /c_ return code!
-if not "" == "%GRADLE_EXIT_CONSOLE%" exit 1
-exit /b 1
+set EXIT_CODE=%ERRORLEVEL%
+if %EXIT_CODE% equ 0 set EXIT_CODE=1
+if not ""=="%GRADLE_EXIT_CONSOLE%" exit %EXIT_CODE%
+exit /b %EXIT_CODE%
:mainEnd
if "%OS%"=="Windows_NT" endlocal
diff --git a/src/jmh/java/io/reactivex/rxjava3/core/BinaryFlatMapPerf.java b/src/jmh/java/io/reactivex/rxjava3/core/BinaryFlatMapPerf.java
index c4a0a385a5..6e6ae7e3c6 100644
--- a/src/jmh/java/io/reactivex/rxjava3/core/BinaryFlatMapPerf.java
+++ b/src/jmh/java/io/reactivex/rxjava3/core/BinaryFlatMapPerf.java
@@ -139,9 +139,9 @@ public Observable extends Integer> apply(Integer v) {
}
});
- singleFlatMapHideObservable = Single.just(1).flatMapObservable(new Function
* * Note that the operator takes an already instantiated, running or terminated {@code CompletionStage}. - * If the optional is to be created per consumer upon subscription, use {@link #defer(Supplier)} + * If the {@code CompletionStage} is to be created per consumer upon subscription, use {@link #defer(Supplier)} * around {@code fromCompletionStage}: *
* Maybe.defer(() -> Completable.fromCompletionStage(createCompletionStage()));
diff --git a/src/main/java/io/reactivex/rxjava3/core/Flowable.java b/src/main/java/io/reactivex/rxjava3/core/Flowable.java
index 6cbdf0a412..39f3c63b43 100644
--- a/src/main/java/io/reactivex/rxjava3/core/Flowable.java
+++ b/src/main/java/io/reactivex/rxjava3/core/Flowable.java
@@ -2040,7 +2040,7 @@ public static int bufferSize() {
@NonNull
@BackpressureSupport(BackpressureKind.PASS_THROUGH)
@SchedulerSupport(SchedulerSupport.NONE)
- public static <@NonNull T> Flowable defer(@NonNull Supplier extends Publisher extends T>> supplier) {
+ public static <@NonNull T> Flowable defer(@NonNull Supplier extends @NonNull Publisher extends T>> supplier) {
Objects.requireNonNull(supplier, "supplier is null");
return RxJavaPlugins.onAssembly(new FlowableDefer<>(supplier));
}
@@ -2095,7 +2095,7 @@ public static int bufferSize() {
@NonNull
@BackpressureSupport(BackpressureKind.PASS_THROUGH)
@SchedulerSupport(SchedulerSupport.NONE)
- public static <@NonNull T> Flowable error(@NonNull Supplier extends Throwable> supplier) {
+ public static <@NonNull T> Flowable error(@NonNull Supplier extends @NonNull Throwable> supplier) {
Objects.requireNonNull(supplier, "supplier is null");
return RxJavaPlugins.onAssembly(new FlowableError<>(supplier));
}
@@ -7475,8 +7475,10 @@ public final Flowable cacheWithInitialCapacity(int initialCapacity) {
}
/**
- * Returns a {@code Flowable} that emits the items emitted by the current {@code Flowable}, converted to the specified
- * type.
+ * Returns a {@code Flowable} that emits the upstream items while
+ * they can be cast via {@link Class#cast(Object)} until the upstream terminates,
+ * or until the upstream signals an item which can't be cast,
+ * resulting in a {@link ClassCastException} to be signaled to the downstream.
*
*
*
@@ -7489,8 +7491,7 @@ public final Flowable cacheWithInitialCapacity(int initialCapacity) {
*
* @param the output value type cast to
* @param clazz
- * the target class type that {@code cast} will cast the items emitted by the current {@code Flowable}
- * into before emitting them from the resulting {@code Flowable}
+ * the target class to use to try and cast the upstream items into
* @return the new {@code Flowable} instance
* @throws NullPointerException if {@code clazz} is {@code null}
* @see ReactiveX operators documentation: Map
@@ -8265,7 +8266,7 @@ public final Completable concatMapCompletableDelayError(@NonNull Function supe
* other succeeds or completes, emits their success value if available or terminates immediately if
* either this {@code Flowable} or the current inner {@code MaybeSource} fail.
*
- *
+ *
*
* - Backpressure:
* - The operator expects the upstream to support backpressure and honors
@@ -8298,7 +8299,7 @@ public final Completable concatMapCompletableDelayError(@NonNull Function supe
* other succeeds or completes, emits their success value if available or terminates immediately if
* either this {@code Flowable} or the current inner {@code MaybeSource} fail.
*
- *
+ *
*
* - Backpressure:
* - The operator expects the upstream to support backpressure and honors
@@ -8338,7 +8339,7 @@ public final Completable concatMapCompletableDelayError(@NonNull Function supe
* other terminates, emits their success value if available and delaying all errors
* till both this {@code Flowable} and all inner {@code MaybeSource}s terminate.
*
- *
+ *
*
* - Backpressure:
* - The operator expects the upstream to support backpressure and honors
@@ -8371,7 +8372,7 @@ public final Completable concatMapCompletableDelayError(@NonNull Function supe
* other terminates, emits their success value if available and optionally delaying all errors
* till both this {@code Flowable} and all inner {@code MaybeSource}s terminate.
*
- *
+ *
*
* - Backpressure:
* - The operator expects the upstream to support backpressure and honors
@@ -8410,7 +8411,7 @@ public final Completable concatMapCompletableDelayError(@NonNull Function supe
* other terminates, emits their success value if available and optionally delaying all errors
* till both this {@code Flowable} and all inner {@code MaybeSource}s terminate.
*
- *
+ *
*
* - Backpressure:
* - The operator expects the upstream to support backpressure and honors
@@ -8455,7 +8456,7 @@ public final Completable concatMapCompletableDelayError(@NonNull Function supe
* other succeeds, emits their success values or terminates immediately if
* either this {@code Flowable} or the current inner {@code SingleSource} fail.
*
- *
+ *
*
* - Backpressure:
* - The operator expects the upstream to support backpressure and honors
@@ -8488,7 +8489,7 @@ public final Completable concatMapCompletableDelayError(@NonNull Function supe
* other succeeds, emits their success values or terminates immediately if
* either this {@code Flowable} or the current inner {@code SingleSource} fail.
*
- *
+ *
*
* - Backpressure:
* - The operator expects the upstream to support backpressure and honors
@@ -8528,7 +8529,7 @@ public final Completable concatMapCompletableDelayError(@NonNull Function supe
* other succeeds or fails, emits their success values and delays all errors
* till both this {@code Flowable} and all inner {@code SingleSource}s terminate.
*
- *
+ *
*
* - Backpressure:
* - The operator expects the upstream to support backpressure and honors
@@ -8561,7 +8562,7 @@ public final Completable concatMapCompletableDelayError(@NonNull Function supe
* other succeeds or fails, emits their success values and optionally delays all errors
* till both this {@code Flowable} and all inner {@code SingleSource}s terminate.
*
- *
+ *
*
* - Backpressure:
* - The operator expects the upstream to support backpressure and honors
@@ -8600,7 +8601,7 @@ public final Completable concatMapCompletableDelayError(@NonNull Function supe
* other succeeds or fails, emits their success values and optionally delays errors
* till both this {@code Flowable} and all inner {@code SingleSource}s terminate.
*
- *
+ *
*
* - Backpressure:
* - The operator expects the upstream to support backpressure and honors
@@ -8930,7 +8931,59 @@ public final Flowable
debounce(long timeout, @NonNull TimeUnit unit) {
public final Flowable debounce(long timeout, @NonNull TimeUnit unit, @NonNull Scheduler scheduler) {
Objects.requireNonNull(unit, "unit is null");
Objects.requireNonNull(scheduler, "scheduler is null");
- return RxJavaPlugins.onAssembly(new FlowableDebounceTimed<>(this, timeout, unit, scheduler));
+ return RxJavaPlugins.onAssembly(new FlowableDebounceTimed<>(this, timeout, unit, scheduler, null));
+ }
+
+ /**
+ * Returns a {@code Flowable} that mirrors the current {@code Flowable}, except that it drops items emitted by the
+ * current {@code Flowable} that are followed by newer items before a timeout value expires on a specified
+ * {@link Scheduler}. The timer resets on each emission.
+ *
+ * Note: If items keep being emitted by the current {@code Flowable} faster than the timeout then no items
+ * will be emitted by the resulting {@code Flowable}.
+ *
+ *
+ *
+ * Delivery of the item after the grace period happens on the given {@code Scheduler}'s
+ * {@code Worker} which if takes too long, a newer item may arrive from the upstream, causing the
+ * {@code Worker}'s task to get disposed, which may also interrupt any downstream blocking operation
+ * (yielding an {@code InterruptedException}). It is recommended processing items
+ * that may take long time to be moved to another thread via {@link #observeOn} applied after
+ * {@code debounce} itself.
+ *
+ * - Backpressure:
+ * - This operator does not support backpressure as it uses time to control data flow.
+ * - Scheduler:
+ * - You specify which {@code Scheduler} this operator will use.
+ *
+ *
+ * @param timeout
+ * the time each item has to be "the most recent" of those emitted by the current {@code Flowable} to
+ * ensure that it's not dropped
+ * @param unit
+ * the unit of time for the specified {@code timeout}
+ * @param scheduler
+ * the {@code Scheduler} to use internally to manage the timers that handle the timeout for each
+ * item
+ * @param onDropped
+ * called with the current entry when it has been replaced by a new one
+ * @return the new {@code Flowable} instance
+ * @throws NullPointerException if {@code unit} or {@code scheduler} is {@code null} or {@code onDropped} is {@code null}
+ * @see ReactiveX operators documentation: Debounce
+ * @see RxJava wiki: Backpressure
+ * @see #throttleWithTimeout(long, TimeUnit, Scheduler, Consumer)
+ * @since 3.1.6 - Experimental
+ */
+ @CheckReturnValue
+ @NonNull
+ @BackpressureSupport(BackpressureKind.ERROR)
+ @SchedulerSupport(SchedulerSupport.CUSTOM)
+ @Experimental
+ public final Flowable debounce(long timeout, @NonNull TimeUnit unit, @NonNull Scheduler scheduler, @NonNull Consumer super T> onDropped) {
+ Objects.requireNonNull(unit, "unit is null");
+ Objects.requireNonNull(scheduler, "scheduler is null");
+ Objects.requireNonNull(onDropped, "onDropped is null");
+ return RxJavaPlugins.onAssembly(new FlowableDebounceTimed<>(this, timeout, unit, scheduler, onDropped));
}
/**
@@ -10148,7 +10201,7 @@ public final Maybe firstElement() {
* Returns a {@link Single} that emits only the very first item emitted by this {@code Flowable}, or a default
* item if this {@code Flowable} completes without emitting anything.
*
- *
+ *
*
* - Backpressure:
* - The operator honors backpressure from downstream and consumes the current {@code Flowable} in a bounded manner.
@@ -10904,6 +10957,8 @@ public final Completable flatMapCompletable(@NonNull Function super T, ? exten
/**
* Maps each element of the upstream {@code Flowable} into {@link MaybeSource}s, subscribes to all of them
* and merges their {@code onSuccess} values, in no particular order, into a single {@code Flowable} sequence.
+ *
+ *
*
* - Backpressure:
* - The operator consumes the upstream in an unbounded manner.
@@ -10927,6 +10982,8 @@ public final Completable flatMapCompletable(@NonNull Function super T, ? exten
* Maps each element of the upstream {@code Flowable} into {@link MaybeSource}s, subscribes to at most
* {@code maxConcurrency} {@code MaybeSource}s at a time and merges their {@code onSuccess} values,
* in no particular order, into a single {@code Flowable} sequence, optionally delaying all errors.
+ *
+ *
*
* - Backpressure:
* - If {@code maxConcurrency == }{@link Integer#MAX_VALUE} the operator consumes the upstream in an unbounded manner.
@@ -12494,7 +12551,7 @@ public final Flowable
onBackpressureBuffer(int capacity, boolean delayError)
@NonNull
public final Flowable onBackpressureBuffer(int capacity, boolean delayError, boolean unbounded) {
ObjectHelper.verifyPositive(capacity, "capacity");
- return RxJavaPlugins.onAssembly(new FlowableOnBackpressureBuffer<>(this, capacity, unbounded, delayError, Functions.EMPTY_ACTION));
+ return RxJavaPlugins.onAssembly(new FlowableOnBackpressureBuffer<>(this, capacity, unbounded, delayError, Functions.EMPTY_ACTION, Functions.emptyConsumer()));
}
/**
@@ -12525,6 +12582,7 @@ public final Flowable onBackpressureBuffer(int capacity, boolean delayError,
* @throws NullPointerException if {@code onOverflow} is {@code null}
* @throws IllegalArgumentException if {@code capacity} is non-positive
* @see ReactiveX operators documentation: backpressure operators
+ * @see #onBackpressureBuffer(int, boolean, boolean, Action, Consumer)
* @since 1.1.0
*/
@CheckReturnValue
@@ -12535,7 +12593,51 @@ public final Flowable onBackpressureBuffer(int capacity, boolean delayError,
@NonNull Action onOverflow) {
Objects.requireNonNull(onOverflow, "onOverflow is null");
ObjectHelper.verifyPositive(capacity, "capacity");
- return RxJavaPlugins.onAssembly(new FlowableOnBackpressureBuffer<>(this, capacity, unbounded, delayError, onOverflow));
+ return RxJavaPlugins.onAssembly(new FlowableOnBackpressureBuffer<>(this, capacity, unbounded, delayError, onOverflow, Functions.emptyConsumer()));
+ }
+
+ /**
+ * Buffers an optionally unlimited number of items from the current {@code Flowable} and allows it to emit as fast it can while allowing the
+ * downstream to consume the items at its own place.
+ * If {@code unbounded} is {@code true}, the resulting {@code Flowable} will signal a
+ * {@link MissingBackpressureException} via {@code onError} as soon as the buffer's capacity is exceeded, dropping all undelivered
+ * items, canceling the flow and calling the {@code onOverflow} action.
+ *
+ *
+ *
+ * - Backpressure:
+ * - The operator honors backpressure from downstream and consumes the current {@code Flowable} in an unbounded
+ * manner (i.e., not applying backpressure to it).
+ * - Scheduler:
+ * - {@code onBackpressureBuffer} does not operate by default on a particular {@link Scheduler}.
+ *
+ *
+ * @param capacity number of slots available in the buffer.
+ * @param delayError
+ * if {@code true}, an exception from the current {@code Flowable} is delayed until all buffered elements have been
+ * consumed by the downstream; if {@code false}, an exception is immediately signaled to the downstream, skipping
+ * any buffered element
+ * @param unbounded
+ * if {@code true}, the capacity value is interpreted as the internal "island" size of the unbounded buffer
+ * @param onOverflow action to execute if an item needs to be buffered, but there are no available slots.
+ * @param onDropped the {@link Consumer} to be called with the item that could not be buffered due to capacity constraints.
+ * @return the new {@code Flowable} instance
+ * @throws NullPointerException if {@code onOverflow} or {@code onDropped} is {@code null}
+ * @throws IllegalArgumentException if {@code capacity} is non-positive
+ * @see ReactiveX operators documentation: backpressure operators
+ * @since 3.1.7
+ */
+ @CheckReturnValue
+ @NonNull
+ @BackpressureSupport(BackpressureKind.SPECIAL)
+ @SchedulerSupport(SchedulerSupport.NONE)
+ @Experimental
+ public final Flowable onBackpressureBuffer(int capacity, boolean delayError, boolean unbounded,
+ @NonNull Action onOverflow, @NonNull Consumer super T> onDropped) {
+ Objects.requireNonNull(onOverflow, "onOverflow is null");
+ Objects.requireNonNull(onDropped, "onDropped is null");
+ ObjectHelper.verifyPositive(capacity, "capacity");
+ return RxJavaPlugins.onAssembly(new FlowableOnBackpressureBuffer<>(this, capacity, unbounded, delayError, onOverflow, onDropped));
}
/**
@@ -12601,6 +12703,7 @@ public final Flowable onBackpressureBuffer(int capacity, @NonNull Action onOv
* @throws NullPointerException if {@code onOverflow} or {@code overflowStrategy} is {@code null}
* @throws IllegalArgumentException if {@code capacity} is non-positive
* @see ReactiveX operators documentation: backpressure operators
+ * @see #onBackpressureBuffer(long, Action, BackpressureOverflowStrategy)
* @since 2.0
*/
@CheckReturnValue
@@ -12610,9 +12713,55 @@ public final Flowable onBackpressureBuffer(int capacity, @NonNull Action onOv
public final Flowable onBackpressureBuffer(long capacity, @Nullable Action onOverflow, @NonNull BackpressureOverflowStrategy overflowStrategy) {
Objects.requireNonNull(overflowStrategy, "overflowStrategy is null");
ObjectHelper.verifyPositive(capacity, "capacity");
- return RxJavaPlugins.onAssembly(new FlowableOnBackpressureBufferStrategy<>(this, capacity, onOverflow, overflowStrategy));
+ return RxJavaPlugins.onAssembly(new FlowableOnBackpressureBufferStrategy<>(this, capacity, onOverflow, overflowStrategy, null));
}
+ /**
+ * Buffers an optionally unlimited number of items from the current {@code Flowable} and allows it to emit as fast it can while allowing the
+ * downstream to consume the items at its own place.
+ * The resulting {@code Flowable} will behave as determined by {@code overflowStrategy} if the buffer capacity is exceeded:
+ *
+ * - {@link BackpressureOverflowStrategy#ERROR} (default) will call {@code onError} dropping all undelivered items,
+ * canceling the source, and notifying the producer with {@code onOverflow}.
+ * - {@link BackpressureOverflowStrategy#DROP_LATEST} will drop any new items emitted by the producer while
+ * the buffer is full, without generating any {@code onError}. Each drop will, however, invoke {@code onOverflow}
+ * to signal the overflow to the producer.
+ * - {@link BackpressureOverflowStrategy#DROP_OLDEST} will drop the oldest items in the buffer in order to make
+ * room for newly emitted ones. Overflow will not generate an {@code onError}, but each drop will invoke
+ * {@code onOverflow} to signal the overflow to the producer.
+ *
+ *
+ *
+ *
+ *
+ * - Backpressure:
+ * - The operator honors backpressure from downstream and consumes the current {@code Flowable} in an unbounded
+ * manner (i.e., not applying backpressure to it).
+ * - Scheduler:
+ * - {@code onBackpressureBuffer} does not operate by default on a particular {@link Scheduler}.
+ *
+ *
+ * @param capacity number of slots available in the buffer.
+ * @param onOverflow action to execute if an item needs to be buffered, but there are no available slots, {@code null} is allowed.
+ * @param overflowStrategy how should the resulting {@code Flowable} react to buffer overflows, {@code null} is not allowed.
+ * @param onDropped the {@link Consumer} to be called with the item that could not be buffered due to capacity constraints.
+ * @return the new {@code Flowable} instance
+ * @throws NullPointerException if {@code onOverflow}, {@code overflowStrategy} or {@code onDropped} is {@code null}
+ * @throws IllegalArgumentException if {@code capacity} is non-positive
+ * @see ReactiveX operators documentation: backpressure operators
+ * @since 3.1.7
+ */
+ @CheckReturnValue
+ @NonNull
+ @BackpressureSupport(BackpressureKind.SPECIAL)
+ @SchedulerSupport(SchedulerSupport.NONE)
+ @Experimental
+ public final Flowable onBackpressureBuffer(long capacity, @Nullable Action onOverflow, @NonNull BackpressureOverflowStrategy overflowStrategy, @NonNull Consumer super T> onDropped) {
+ Objects.requireNonNull(overflowStrategy, "overflowStrategy is null");
+ Objects.requireNonNull(onDropped, "onDropped is null");
+ ObjectHelper.verifyPositive(capacity, "capacity");
+ return RxJavaPlugins.onAssembly(new FlowableOnBackpressureBufferStrategy<>(this, capacity, onOverflow, overflowStrategy, onDropped));
+ }
/**
* Drops items from the current {@code Flowable} if the downstream is not ready to receive new items (indicated
* by a lack of {@link Subscription#request(long)} calls from it).
@@ -12703,7 +12852,46 @@ public final Flowable onBackpressureDrop(@NonNull Consumer super T> onDrop)
@SchedulerSupport(SchedulerSupport.NONE)
@NonNull
public final Flowable onBackpressureLatest() {
- return RxJavaPlugins.onAssembly(new FlowableOnBackpressureLatest<>(this));
+ return RxJavaPlugins.onAssembly(new FlowableOnBackpressureLatest<>(this, null));
+ }
+
+ /**
+ * Drops all but the latest item emitted by the current {@code Flowable} if the downstream is not ready to receive
+ * new items (indicated by a lack of {@link Subscription#request(long)} calls from it) and emits this latest
+ * item when the downstream becomes ready.
+ *
+ *
+ *
+ * Its behavior is logically equivalent to {@code blockingLatest()} with the exception that
+ * the downstream is not blocking while requesting more values.
+ *
+ * Note that if the current {@code Flowable} does support backpressure, this operator ignores that capability
+ * and doesn't propagate any backpressure requests from downstream.
+ *
+ * Note that due to the nature of how backpressure requests are propagated through subscribeOn/observeOn,
+ * requesting more than 1 from downstream doesn't guarantee a continuous delivery of {@code onNext} events.
+ *
+ * - Backpressure:
+ * - The operator honors backpressure from downstream and consumes the current {@code Flowable} in an unbounded
+ * manner (i.e., not applying backpressure to it).
+ * - Scheduler:
+ * - {@code onBackpressureLatest} does not operate by default on a particular {@link Scheduler}.
+ *
+ *
+ * @param onDropped
+ * called with the current entry when it has been replaced by a new one
+ * @throws NullPointerException if {@code onDropped} is {@code null}
+ * @return the new {@code Flowable} instance
+ * @since 3.1.7
+ */
+ @CheckReturnValue
+ @BackpressureSupport(BackpressureKind.UNBOUNDED_IN)
+ @SchedulerSupport(SchedulerSupport.NONE)
+ @NonNull
+ @Experimental
+ public final Flowable onBackpressureLatest(@NonNull Consumer super T> onDropped) {
+ Objects.requireNonNull(onDropped, "onDropped is null");
+ return RxJavaPlugins.onAssembly(new FlowableOnBackpressureLatest<>(this, onDropped));
}
/**
@@ -14672,7 +14860,7 @@ public final Flowable sample(long period, @NonNull TimeUnit unit, boolean emi
public final Flowable sample(long period, @NonNull TimeUnit unit, @NonNull Scheduler scheduler) {
Objects.requireNonNull(unit, "unit is null");
Objects.requireNonNull(scheduler, "scheduler is null");
- return RxJavaPlugins.onAssembly(new FlowableSampleTimed<>(this, period, unit, scheduler, false));
+ return RxJavaPlugins.onAssembly(new FlowableSampleTimed<>(this, period, unit, scheduler, false, null));
}
/**
@@ -14713,7 +14901,51 @@ public final Flowable sample(long period, @NonNull TimeUnit unit, @NonNull Sc
public final Flowable sample(long period, @NonNull TimeUnit unit, @NonNull Scheduler scheduler, boolean emitLast) {
Objects.requireNonNull(unit, "unit is null");
Objects.requireNonNull(scheduler, "scheduler is null");
- return RxJavaPlugins.onAssembly(new FlowableSampleTimed<>(this, period, unit, scheduler, emitLast));
+ return RxJavaPlugins.onAssembly(new FlowableSampleTimed<>(this, period, unit, scheduler, emitLast, null));
+ }
+
+ /**
+ * Returns a {@code Flowable} that emits the most recently emitted item (if any) emitted by the current {@code Flowable}
+ * within periodic time intervals, where the intervals are defined on a particular {@link Scheduler}
+ * and optionally emit the very last upstream item when the upstream completes.
+ *
+ *
+ *
+ * - Backpressure:
+ * - This operator does not support backpressure as it uses time to control data flow.
+ * - Scheduler:
+ * - You specify which {@code Scheduler} this operator will use.
+ *
+ *
+ * @param period
+ * the sampling rate
+ * @param unit
+ * the {@link TimeUnit} in which {@code period} is defined
+ * @param scheduler
+ * the {@code Scheduler} to use when sampling
+ * @param emitLast
+ * if {@code true} and the upstream completes while there is still an unsampled item available,
+ * that item is emitted to downstream before completion
+ * if {@code false}, an unsampled last item is ignored.
+ * @param onDropped
+ * called with the current entry when it has been replaced by a new one
+ * @return the new {@code Flowable} instance
+ * @throws NullPointerException if {@code unit} or {@code scheduler} is {@code null} or {@code onDropped} is {@code null}
+ * @see ReactiveX operators documentation: Sample
+ * @see RxJava wiki: Backpressure
+ * @see #throttleLast(long, TimeUnit, Scheduler)
+ * @since 3.1.6 - Experimental
+ */
+ @CheckReturnValue
+ @NonNull
+ @BackpressureSupport(BackpressureKind.ERROR)
+ @SchedulerSupport(SchedulerSupport.CUSTOM)
+ @Experimental
+ public final Flowable sample(long period, @NonNull TimeUnit unit, @NonNull Scheduler scheduler, boolean emitLast, @NonNull Consumer super T> onDropped) {
+ Objects.requireNonNull(unit, "unit is null");
+ Objects.requireNonNull(scheduler, "scheduler is null");
+ Objects.requireNonNull(onDropped, "onDropped is null");
+ return RxJavaPlugins.onAssembly(new FlowableSampleTimed<>(this, period, unit, scheduler, emitLast, onDropped));
}
/**
@@ -15816,7 +16048,7 @@ public final Disposable subscribe(@NonNull Consumer super T> onNext, @NonNull
* terminates or this particular {@code Disposable} is disposed, the {@code Subscriber} is removed
* from the given container.
*
- * The {@coded Subscriber} will be removed after the callback for the terminal event has been invoked.
+ * The {@code Subscriber} will be removed after the callback for the terminal event has been invoked.
*
* - Backpressure:
* - The operator consumes the current {@code Flowable} in an unbounded manner (i.e., no
@@ -17096,7 +17328,50 @@ public final Flowable
throttleFirst(long windowDuration, @NonNull TimeUnit un
public final Flowable throttleFirst(long skipDuration, @NonNull TimeUnit unit, @NonNull Scheduler scheduler) {
Objects.requireNonNull(unit, "unit is null");
Objects.requireNonNull(scheduler, "scheduler is null");
- return RxJavaPlugins.onAssembly(new FlowableThrottleFirstTimed<>(this, skipDuration, unit, scheduler));
+ return RxJavaPlugins.onAssembly(new FlowableThrottleFirstTimed<>(this, skipDuration, unit, scheduler, null));
+ }
+
+ /**
+ * Returns a {@code Flowable} that emits only the first item emitted by the current {@code Flowable} during sequential
+ * time windows of a specified duration, where the windows are managed by a specified {@link Scheduler}.
+ *
+ * This differs from {@link #throttleLast} in that this only tracks the passage of time whereas
+ * {@link #throttleLast} ticks at scheduled intervals.
+ *
+ *
+ *
+ * - Backpressure:
+ * - This operator does not support backpressure as it uses time to control data flow.
+ * - Scheduler:
+ * - You specify which {@code Scheduler} this operator will use.
+ *
+ *
+ * @param skipDuration
+ * time to wait before emitting another item after emitting the last item
+ * @param unit
+ * the unit of time of {@code skipDuration}
+ * @param scheduler
+ * the {@code Scheduler} to use internally to manage the timers that handle timeout for each
+ * event
+ * @param onDropped
+ * called when an item doesn't get delivered to the downstream
+ *
+ * @return the new {@code Flowable} instance
+ * @throws NullPointerException if {@code unit} or {@code scheduler} or {@code onDropped} is {@code null}
+ * @see ReactiveX operators documentation: Sample
+ * @see RxJava wiki: Backpressure
+ * @since 3.1.6 - Experimental
+ */
+ @CheckReturnValue
+ @NonNull
+ @BackpressureSupport(BackpressureKind.ERROR)
+ @SchedulerSupport(SchedulerSupport.CUSTOM)
+ @Experimental
+ public final Flowable throttleFirst(long skipDuration, @NonNull TimeUnit unit, @NonNull Scheduler scheduler, @NonNull Consumer super T> onDropped) {
+ Objects.requireNonNull(unit, "unit is null");
+ Objects.requireNonNull(scheduler, "scheduler is null");
+ Objects.requireNonNull(onDropped, "onDropped is null");
+ return RxJavaPlugins.onAssembly(new FlowableThrottleFirstTimed<>(this, skipDuration, unit, scheduler, onDropped));
}
/**
@@ -17170,6 +17445,47 @@ public final Flowable throttleLast(long intervalDuration, @NonNull TimeUnit u
return sample(intervalDuration, unit, scheduler);
}
+ /**
+ * Returns a {@code Flowable} that emits only the last item emitted by the current {@code Flowable} during sequential
+ * time windows of a specified duration, where the duration is governed by a specified {@link Scheduler}.
+ *
+ * This differs from {@link #throttleFirst(long, TimeUnit, Scheduler)} in that this ticks along at a scheduled interval whereas
+ * {@code throttleFirst} does not tick, it just tracks the passage of time.
+ *
+ *
+ *
+ * - Backpressure:
+ * - This operator does not support backpressure as it uses time to control data flow.
+ * - Scheduler:
+ * - You specify which {@code Scheduler} this operator will use.
+ *
+ *
+ * @param intervalDuration
+ * duration of windows within which the last item emitted by the current {@code Flowable} will be
+ * emitted
+ * @param unit
+ * the unit of time of {@code intervalDuration}
+ * @param scheduler
+ * the {@code Scheduler} to use internally to manage the timers that handle timeout for each
+ * event
+ * @param onDropped
+ * called with the current entry when it has been replaced by a new one
+ * @return the new {@code Flowable} instance
+ * @throws NullPointerException if {@code unit} or {@code scheduler} is {@code null} or {@code onDropped} is {@code null}
+ * @see ReactiveX operators documentation: Sample
+ * @see RxJava wiki: Backpressure
+ * @see #sample(long, TimeUnit, Scheduler)
+ * @since 3.1.6 - Experimental
+ */
+ @CheckReturnValue
+ @BackpressureSupport(BackpressureKind.ERROR)
+ @SchedulerSupport(SchedulerSupport.CUSTOM)
+ @NonNull
+ @Experimental
+ public final Flowable throttleLast(long intervalDuration, @NonNull TimeUnit unit, @NonNull Scheduler scheduler, @NonNull Consumer super T> onDropped) {
+ return sample(intervalDuration, unit, scheduler, false, onDropped);
+ }
+
/**
* Throttles items from the upstream {@code Flowable} by first emitting the next
* item from upstream, then periodically emitting the latest item (if any) when
@@ -17327,7 +17643,61 @@ public final Flowable throttleLatest(long timeout, @NonNull TimeUnit unit, @N
public final Flowable throttleLatest(long timeout, @NonNull TimeUnit unit, @NonNull Scheduler scheduler, boolean emitLast) {
Objects.requireNonNull(unit, "unit is null");
Objects.requireNonNull(scheduler, "scheduler is null");
- return RxJavaPlugins.onAssembly(new FlowableThrottleLatest<>(this, timeout, unit, scheduler, emitLast));
+ return RxJavaPlugins.onAssembly(new FlowableThrottleLatest<>(this, timeout, unit, scheduler, emitLast, null));
+ }
+
+ /**
+ * Throttles items from the upstream {@code Flowable} by first emitting the next
+ * item from upstream, then periodically emitting the latest item (if any) when
+ * the specified timeout elapses between them, invoking the consumer for any dropped item.
+ *
+ *
+ *
+ * If no items were emitted from the upstream during this timeout phase, the next
+ * upstream item is emitted immediately and the timeout window starts from then.
+ *
+ * - Backpressure:
+ * - This operator does not support backpressure as it uses time to control data flow.
+ * If the downstream is not ready to receive items, a
+ * {@link io.reactivex.rxjava3.exceptions.MissingBackpressureException MissingBackpressureException}
+ * will be signaled.
+ * - Scheduler:
+ * - You specify which {@link Scheduler} this operator will use.
+ * - Error handling:
+ * -
+ * If the upstream signals an {@code onError} or {@code onDropped} callback crashes,
+ * the error is delivered immediately to the downstream. If both happen, a {@link CompositeException}
+ * is created, containing both the upstream and the callback error.
+ * If the {@code onDropped} callback crashes during cancellation, the exception is forwarded
+ * to the global error handler via {@link RxJavaPlugins#onError(Throwable)}.
+ *
+ *
+ * @param timeout the time to wait after an item emission towards the downstream
+ * before trying to emit the latest item from upstream again
+ * @param unit the time unit
+ * @param scheduler the {@code Scheduler} where the timed wait and latest item
+ * emission will be performed
+ * @param emitLast If {@code true}, the very last item from the upstream will be emitted
+ * immediately when the upstream completes, regardless if there is
+ * a timeout window active or not. If {@code false}, the very last
+ * upstream item is ignored and the flow terminates.
+ * @param onDropped called when an item is replaced by a newer item that doesn't get delivered
+ * to the downstream, including the very last item if {@code emitLast} is {@code false}
+ * and the current undelivered item when the sequence gets canceled.
+ * @return the new {@code Flowable} instance
+ * @throws NullPointerException if {@code unit}, {@code scheduler} or {@code onDropped} is {@code null}
+ * @since 3.1.6 - Experimental
+ */
+ @CheckReturnValue
+ @NonNull
+ @BackpressureSupport(BackpressureKind.ERROR)
+ @SchedulerSupport(SchedulerSupport.CUSTOM)
+ @Experimental
+ public final Flowable throttleLatest(long timeout, @NonNull TimeUnit unit, @NonNull Scheduler scheduler, boolean emitLast, @NonNull Consumer super T> onDropped) {
+ Objects.requireNonNull(unit, "unit is null");
+ Objects.requireNonNull(scheduler, "scheduler is null");
+ Objects.requireNonNull(onDropped, "onDropped is null");
+ return RxJavaPlugins.onAssembly(new FlowableThrottleLatest<>(this, timeout, unit, scheduler, emitLast, onDropped));
}
/**
@@ -17405,6 +17775,49 @@ public final Flowable throttleWithTimeout(long timeout, @NonNull TimeUnit uni
return debounce(timeout, unit, scheduler);
}
+ /**
+ * Returns a {@code Flowable} that mirrors the current {@code Flowable}, except that it drops items emitted by the
+ * current {@code Flowable} that are followed by newer items before a timeout value expires on a specified
+ * {@link Scheduler}. The timer resets on each emission (alias to {@link #debounce(long, TimeUnit, Scheduler, Consumer)}).
+ *
+ * Note: If items keep being emitted by the current {@code Flowable} faster than the timeout then no items
+ * will be emitted by the resulting {@code Flowable}.
+ *
+ *
+ *
+ * - Backpressure:
+ * - This operator does not support backpressure as it uses time to control data flow.
+ * - Scheduler:
+ * - You specify which {@code Scheduler} this operator will use.
+ *
+ *
+ * @param timeout
+ * the length of the window of time that must pass after the emission of an item from the current
+ * {@code Flowable} in which it emits no items in order for the item to be emitted by the
+ * resulting {@code Flowable}
+ * @param unit
+ * the unit of time for the specified {@code timeout}
+ * @param scheduler
+ * the {@code Scheduler} to use internally to manage the timers that handle the timeout for each
+ * item
+ * @param onDropped
+ * called with the current entry when it has been replaced by a new one
+ * @return the new {@code Flowable} instance
+ * @throws NullPointerException if {@code unit} or {@code scheduler} is {@code null} or {@code onDropped} is {@code null}
+ * @see ReactiveX operators documentation: Debounce
+ * @see RxJava wiki: Backpressure
+ * @see #debounce(long, TimeUnit, Scheduler, Consumer)
+ * @since 3.1.6 - Experimental
+ */
+ @CheckReturnValue
+ @BackpressureSupport(BackpressureKind.ERROR)
+ @SchedulerSupport(SchedulerSupport.CUSTOM)
+ @NonNull
+ @Experimental
+ public final Flowable throttleWithTimeout(long timeout, @NonNull TimeUnit unit, @NonNull Scheduler scheduler, @NonNull Consumer super T> onDropped) {
+ return debounce(timeout, unit, scheduler, onDropped);
+ }
+
/**
* Returns a {@code Flowable} that emits records of the time interval between consecutive items emitted by the
* current {@code Flowable}.
@@ -19851,7 +20264,7 @@ public final TestSubscriber test(long initialRequest, boolean cancel) { // No
*
*
* Note that the operator takes an already instantiated, running or terminated {@code CompletionStage}.
- * If the optional is to be created per consumer upon subscription, use {@link #defer(Supplier)}
+ * If the {@code CompletionStage} is to be created per consumer upon subscription, use {@link #defer(Supplier)}
* around {@code fromCompletionStage}:
*
* Flowable.defer(() -> Flowable.fromCompletionStage(createCompletionStage()));
@@ -19986,7 +20399,7 @@ public final TestSubscriber test(long initialRequest, boolean cancel) { // No
@BackpressureSupport(BackpressureKind.UNBOUNDED_IN)
@SchedulerSupport(SchedulerSupport.NONE)
@NonNull
- public final <@NonNull R, @NonNull A> Single collect(@NonNull Collector super T, A, R> collector) {
+ public final <@NonNull R, @Nullable A> Single collect(@NonNull Collector super T, A, R> collector) {
Objects.requireNonNull(collector, "collector is null");
return RxJavaPlugins.onAssembly(new FlowableCollectWithCollectorSingle<>(this, collector));
}
diff --git a/src/main/java/io/reactivex/rxjava3/core/Maybe.java b/src/main/java/io/reactivex/rxjava3/core/Maybe.java
index bbb3a10d4b..8ae4137c83 100644
--- a/src/main/java/io/reactivex/rxjava3/core/Maybe.java
+++ b/src/main/java/io/reactivex/rxjava3/core/Maybe.java
@@ -110,7 +110,7 @@
* @since 2.0
* @see io.reactivex.rxjava3.observers.DisposableMaybeObserver
*/
-public abstract class Maybe implements MaybeSource {
+public abstract class Maybe<@NonNull T> implements MaybeSource {
/**
* Runs multiple {@link MaybeSource}s provided by an {@link Iterable} sequence and
@@ -889,7 +889,7 @@ public abstract class Maybe implements MaybeSource {
@CheckReturnValue
@NonNull
@SchedulerSupport(SchedulerSupport.NONE)
- public static <@NonNull T> Maybe defer(@NonNull Supplier extends MaybeSource extends T>> supplier) {
+ public static <@NonNull T> Maybe defer(@NonNull Supplier extends @NonNull MaybeSource extends T>> supplier) {
Objects.requireNonNull(supplier, "supplier is null");
return RxJavaPlugins.onAssembly(new MaybeDefer<>(supplier));
}
@@ -961,7 +961,7 @@ public abstract class Maybe implements MaybeSource {
@CheckReturnValue
@NonNull
@SchedulerSupport(SchedulerSupport.NONE)
- public static <@NonNull T> Maybe error(@NonNull Supplier extends Throwable> supplier) {
+ public static <@NonNull T> Maybe error(@NonNull Supplier extends @NonNull Throwable> supplier) {
Objects.requireNonNull(supplier, "supplier is null");
return RxJavaPlugins.onAssembly(new MaybeErrorCallable<>(supplier));
}
@@ -1079,7 +1079,7 @@ public abstract class Maybe implements MaybeSource {
@CheckReturnValue
@NonNull
@SchedulerSupport(SchedulerSupport.NONE)
- public static <@NonNull T> Maybe fromCallable(@NonNull Callable extends T> callable) {
+ public static Maybe<@NonNull T> fromCallable(@NonNull Callable extends @Nullable T> callable) {
Objects.requireNonNull(callable, "callable is null");
return RxJavaPlugins.onAssembly(new MaybeFromCallable<>(callable));
}
@@ -1109,6 +1109,7 @@ public abstract class Maybe implements MaybeSource {
* @return the new {@code Maybe} instance
* @throws NullPointerException if {@code future} is {@code null}
* @see ReactiveX operators documentation: From
+ * @see #fromCompletionStage(CompletionStage)
*/
@CheckReturnValue
@NonNull
@@ -1147,6 +1148,7 @@ public abstract class Maybe implements MaybeSource {
* @return the new {@code Maybe} instance
* @throws NullPointerException if {@code future} or {@code unit} is {@code null}
* @see ReactiveX operators documentation: From
+ * @see #fromCompletionStage(CompletionStage)
*/
@CheckReturnValue
@NonNull
@@ -1285,7 +1287,7 @@ public abstract class Maybe implements MaybeSource {
@CheckReturnValue
@NonNull
@SchedulerSupport(SchedulerSupport.NONE)
- public static <@NonNull T> Maybe fromSupplier(@NonNull Supplier extends T> supplier) {
+ public static Maybe<@NonNull T> fromSupplier(@NonNull Supplier extends @Nullable T> supplier) {
Objects.requireNonNull(supplier, "supplier is null");
return RxJavaPlugins.onAssembly(new MaybeFromSupplier<>(supplier));
}
@@ -6145,7 +6147,7 @@ public final TestObserver test(boolean dispose) {
*
*
* Note that the operator takes an already instantiated, running or terminated {@code CompletionStage}.
- * If the optional is to be created per consumer upon subscription, use {@link #defer(Supplier)}
+ * If the {@code CompletionStage} is to be created per consumer upon subscription, use {@link #defer(Supplier)}
* around {@code fromCompletionStage}:
*
* Maybe.defer(() -> Maybe.fromCompletionStage(createCompletionStage()));
diff --git a/src/main/java/io/reactivex/rxjava3/core/Observable.java b/src/main/java/io/reactivex/rxjava3/core/Observable.java
index 2206319b18..fcf809cdf6 100644
--- a/src/main/java/io/reactivex/rxjava3/core/Observable.java
+++ b/src/main/java/io/reactivex/rxjava3/core/Observable.java
@@ -1790,7 +1790,7 @@ public static int bufferSize() {
@CheckReturnValue
@NonNull
@SchedulerSupport(SchedulerSupport.NONE)
- public static <@NonNull T> Observable defer(@NonNull Supplier extends ObservableSource extends T>> supplier) {
+ public static <@NonNull T> Observable defer(@NonNull Supplier extends @NonNull ObservableSource extends T>> supplier) {
Objects.requireNonNull(supplier, "supplier is null");
return RxJavaPlugins.onAssembly(new ObservableDefer<>(supplier));
}
@@ -1839,7 +1839,7 @@ public static int bufferSize() {
@CheckReturnValue
@NonNull
@SchedulerSupport(SchedulerSupport.NONE)
- public static <@NonNull T> Observable error(@NonNull Supplier extends Throwable> supplier) {
+ public static <@NonNull T> Observable error(@NonNull Supplier extends @NonNull Throwable> supplier) {
Objects.requireNonNull(supplier, "supplier is null");
return RxJavaPlugins.onAssembly(new ObservableError<>(supplier));
}
@@ -2020,6 +2020,7 @@ public static int bufferSize() {
* @return the new {@code Observable} instance
* @throws NullPointerException if {@code future} is {@code null}
* @see ReactiveX operators documentation: From
+ * @see #fromCompletionStage(CompletionStage)
*/
@CheckReturnValue
@NonNull
@@ -2062,6 +2063,7 @@ public static int bufferSize() {
* @return the new {@code Observable} instance
* @throws NullPointerException if {@code future} or {@code unit} is {@code null}
* @see ReactiveX operators documentation: From
+ * @see #fromCompletionStage(CompletionStage)
*/
@CheckReturnValue
@NonNull
@@ -5520,7 +5522,6 @@ public final T blockingFirst(@NonNull T defaultItem) {
* @see #blockingForEach(Consumer, int)
*/
@SchedulerSupport(SchedulerSupport.NONE)
- @NonNull
public final void blockingForEach(@NonNull Consumer super T> onNext) {
blockingForEach(onNext, bufferSize());
}
@@ -5560,7 +5561,6 @@ public final void blockingForEach(@NonNull Consumer super T> onNext) {
* @see #subscribe(Consumer)
*/
@SchedulerSupport(SchedulerSupport.NONE)
- @NonNull
public final void blockingForEach(@NonNull Consumer super T> onNext, int capacityHint) {
Objects.requireNonNull(onNext, "onNext is null");
Iterator it = blockingIterable(capacityHint).iterator();
@@ -6668,8 +6668,10 @@ public final Observable cacheWithInitialCapacity(int initialCapacity) {
}
/**
- * Returns an {@code Observable} that emits the items emitted by the current {@code Observable}, converted to the specified
- * type.
+ * Returns an {@code Observable} that emits the upstream items while
+ * they can be cast via {@link Class#cast(Object)} until the upstream terminates,
+ * or until the upstream signals an item which can't be cast,
+ * resulting in a {@link ClassCastException} to be signaled to the downstream.
*
*
*
@@ -6679,8 +6681,7 @@ public final Observable cacheWithInitialCapacity(int initialCapacity) {
*
* @param the output value type cast to
* @param clazz
- * the target class type that {@code cast} will cast the items emitted by the current {@code Observable}
- * into before emitting them from the resulting {@code Observable}
+ * the target class to use to try and cast the upstream items into
* @return the new {@code Observable} instance
* @throws NullPointerException if {@code clazz} is {@code null}
* @see ReactiveX operators documentation: Map
@@ -7328,7 +7329,7 @@ public final Completable concatMapCompletableDelayError(@NonNull Function supe
* other succeeds or completes, emits their success value if available or terminates immediately if
* either the current {@code Observable} or the current inner {@code MaybeSource} fail.
*
- *
+ *
*
* - Scheduler:
* - {@code concatMapMaybe} does not operate by default on a particular {@link Scheduler}.
@@ -7356,7 +7357,7 @@ public final Completable concatMapCompletableDelayError(@NonNull Function supe
* other succeeds or completes, emits their success value if available or terminates immediately if
* either the current {@code Observable} or the current inner {@code MaybeSource} fail.
*
- *
+ *
*
*