+
```java
Observable
+
```java
Observable
+
```java
Observable
+
```java
Observable
+
```java
// we have to multicast the original bursty Observable so we can use it
// both as our source and as the source for our buffer closing selector:
@@ -86,14 +86,14 @@ Observable
+
```java
Observable
+
```java
Observable



+
#### see also:
* javadoc: `BlockingObservable`
diff --git a/docs/Conditional-and-Boolean-Operators.md b/docs/Conditional-and-Boolean-Operators.md
index f7ed64ce34..e6d1358e31 100644
--- a/docs/Conditional-and-Boolean-Operators.md
+++ b/docs/Conditional-and-Boolean-Operators.md
@@ -1,21 +1,169 @@
This section explains operators with which you conditionally emit or transform Observables, or can do boolean evaluations of them:
### Conditional Operators
-* [**`amb( )`**](http://reactivex.io/documentation/operators/amb.html) — given two or more source Observables, emits all of the items from the first of these Observables to emit an item
-* [**`defaultIfEmpty( )`**](http://reactivex.io/documentation/operators/defaultifempty.html) — emit items from the source Observable, or emit a default item if the source Observable completes after emitting no items
-* (`rxjava-computation-expressions`) [**`doWhile( )`**](http://reactivex.io/documentation/operators/repeat.html) — emit the source Observable's sequence, and then repeat the sequence as long as a condition remains true
-* (`rxjava-computation-expressions`) [**`ifThen( )`**](http://reactivex.io/documentation/operators/defer.html) — only emit the source Observable's sequence if a condition is true, otherwise emit an empty or default sequence
-* [**`skipUntil( )`**](http://reactivex.io/documentation/operators/skipuntil.html) — discard items emitted by a source Observable until a second Observable emits an item, then emit the remainder of the source Observable's items
-* [**`skipWhile( )`**](http://reactivex.io/documentation/operators/skipwhile.html) — discard items emitted by an Observable until a specified condition is false, then emit the remainder
-* (`rxjava-computation-expressions`) [**`switchCase( )`**](http://reactivex.io/documentation/operators/defer.html) — emit the sequence from a particular Observable based on the results of an evaluation
-* [**`takeUntil( )`**](http://reactivex.io/documentation/operators/takeuntil.html) — emits the items from the source Observable until a second Observable emits an item or issues a notification
-* [**`takeWhile( )` and `takeWhileWithIndex( )`**](http://reactivex.io/documentation/operators/takewhile.html) — emit items emitted by an Observable as long as a specified condition is true, then skip the remainder
-* (`rxjava-computation-expressions`) [**`whileDo( )`**](http://reactivex.io/documentation/operators/repeat.html) — if a condition is true, emit the source Observable's sequence and then repeat the sequence as long as the condition remains true
-
-> (`rxjava-computation-expressions`) — indicates that this operator is currently part of the optional `rxjava-computation-expressions` package under `rxjava-contrib` and is not included with the standard RxJava set of operators
+
+### Outline
+
+- [`amb`](#all)
+- [`defaultIfEmpty`](#defaultIfEmpty)
+- [`skipUntil`](#skipUntil)
+- [`skipWhile`](#skipWhile)
+- [`takeUntil`](#takeUntil)
+- [`takeWhile`](#takeUntil)
+
+## amb
+
+**Available in:**  `Flowable`,  `Observable`,  `Maybe`,  `Single`,  `Completable`
+
+**ReactiveX documentation:** [http://reactivex.io/documentation/operators/amb.html](http://reactivex.io/documentation/operators/amb.html)
+
+given two or more source Observables, emits all of the items from the first of these Observables to emit an item
+
+```java
+ Observable source1 = Observable.range(1, 5);
+ Observable source2 = Observable.range(6, 5);
+ Observable.amb(new ArrayList(Arrays.asList(source1, source2)))
+ .subscribe(next -> System.out.printf("next: %s\n", next), // onNext
+ throwable -> System.out.printf("error: %s\n", throwable), //onError
+ () -> System.out.println("Completed") //onComplete
+ );
+```
+## defaultIfEmpty
+
+**Available in:**  `Flowable`,  `Observable`,  `Maybe`,  `Single`,  `Completable`
+
+**ReactiveX documentation:** [http://reactivex.io/documentation/operators/defaultifempty.html](http://reactivex.io/documentation/operators/defaultifempty.html)
+
+emit items from the source Observable, or emit a default item if the source Observable completes after emitting no items
+
+```java
+ Observable.empty().defaultIfEmpty(1).blockingSubscribe(next -> System.out.printf("next: %s\n", next), // onNext
+ throwable -> System.out.printf("error: %s", throwable), //onError
+ () -> System.out.println("Completed") //onComplete
+ );
+```
+
+## skipUntil
+
+**Available in:**  `Flowable`,  `Observable`,  `Maybe`,  `Single`,  `Completable`
+
+**ReactiveX documentation:** [http://reactivex.io/documentation/operators/skipuntil.html](http://reactivex.io/documentation/operators/skipuntil.html)
+
+discard items emitted by a source Observable until a second Observable emits an item, then emit the remainder of the source Observable's items
+
+```java
+Observable observable1 = Observable.range(1, 10).doOnNext(next -> Thread.sleep(1000));
+
+observable1.skipUntil(Observable.timer(3, TimeUnit.SECONDS))
+ .subscribe(next -> System.out.printf("next: %s\n", next), // onNext
+ throwable -> System.out.printf("error: %s", throwable), //onError
+ () -> System.out.println("Completed") //onComplete
+ );
+```
+## skipWhile
+
+**Available in:**  `Flowable`,  `Observable`,  `Maybe`,  `Single`,  `Completable`
+
+**ReactiveX documentation:** [http://reactivex.io/documentation/operators/skipwhile.html](http://reactivex.io/documentation/operators/skipwhile.html)
+
+discard items emitted by an Observable until a specified condition is false, then emit the remainder
+
+```java
+Observable.range(1, 10).skipWhile(next -> next < 5)
+ .subscribe(next -> System.out.printf("next: %s\n", next), // onNext
+ throwable -> System.out.printf("error: %s", throwable), //onError
+ () -> System.out.println("Completed") //onComplete
+ );
+```
+
+## takeUntil
+
+**Available in:**  `Flowable`,  `Observable`,  `Maybe`,  `Single`,  `Completable`
+
+**ReactiveX documentation:** [http://reactivex.io/documentation/operators/takeuntil.html](http://reactivex.io/documentation/operators/takeuntil.html)
+
+emits the items from the source Observable until a second Observable emits an item or issues a notification
+
+```java
+Observable.range(1, 10).takeUntil(value -> value >= 5)
+ .subscribe(next -> System.out.printf("next: %s\n", next), // onNext
+ throwable -> System.out.printf("error: %s", throwable), //onError
+ () -> System.out.println("Completed") //onComplete
+ );
+```
+
+## takeWhile
+
+**Available in:**  `Flowable`,  `Observable`,  `Maybe`,  `Single`,  `Completable`
+
+**ReactiveX documentation:** [http://reactivex.io/documentation/operators/takewhile.html](http://reactivex.io/documentation/operators/takewhile.html)
+
+emit items emitted by an Observable as long as a specified condition is true, then skip the remainder
+
+```java
+ Observable.range(1, 10).takeWhile(value -> value <= 5)
+ .subscribe(next -> System.out.printf("next: %s\n", next), // onNext
+ throwable -> System.out.printf("error: %s", throwable), //onError
+ () -> System.out.println("Completed") //onComplete
+ );
+```
### Boolean Operators
-* [**`all( )`**](http://reactivex.io/documentation/operators/all.html) — determine whether all items emitted by an Observable meet some criteria
-* [**`contains( )`**](http://reactivex.io/documentation/operators/contains.html) — determine whether an Observable emits a particular item or not
-* [**`exists( )` and `isEmpty( )`**](http://reactivex.io/documentation/operators/contains.html) — determine whether an Observable emits any items or not
-* [**`sequenceEqual( )`**](http://reactivex.io/documentation/operators/sequenceequal.html) — test the equality of the sequences emitted by two Observables
+
+### Outline
+
+- [`all`](#all)
+- [`contains`](#contains)
+- [`isEmpty`](#isEmpty)
+- [`sequenceEqual`](#sequenceEqual)
+
+## all
+**Available in:**  `Flowable`,  `Observable`,  `Maybe`,  `Single`,  `Completable`
+
+**ReactiveX documentation:** [http://reactivex.io/documentation/operators/all.html](http://reactivex.io/documentation/operators/all.html)
+
+determine whether all items emitted by an Observable meet some criteria
+
+```java
+Flowable.range(0,10).doOnNext(next -> System.out.println(next)).all(integer -> integer<10).
+ blockingSubscribe(success->System.out.println("Success: "+success));
+```
+
+## contains
+**Available in:**  `Flowable`,  `Observable`,  `Maybe`,  `Single`,  `Completable`
+
+**ReactiveX documentation:** [http://reactivex.io/documentation/operators/contains.html](http://reactivex.io/documentation/operators/contains.html)
+
+determine whether an Observable emits a particular item or not
+
+```java
+Flowable.range(1,10).doOnNext(next->System.out.println(next))
+ .contains(4).blockingSubscribe(contains->System.out.println("contains: "+contains));
+```
+
+## isEmpty
+**Available in:**  `Flowable`,  `Observable`,  `Maybe`,  `Single`,  `Completable`
+
+**ReactiveX documentation:** [http://reactivex.io/documentation/operators/contains.html](http://reactivex.io/documentation/operators/contains.html)
+
+determine whether the source Publisher is empty
+
+```java
+Flowable.empty().isEmpty().subscribe(isEmpty -> System.out.printf("isEmpty: %s", isEmpty));
+```
+
+## sequenceEqual
+**Available in:**  `Flowable`,  `Observable`,  `Maybe`,  `Single`,  `Completable`
+
+**ReactiveX documentation:** [http://reactivex.io/documentation/operators/sequenceequal.html](http://reactivex.io/documentation/operators/sequenceequal.html)
+
+test the equality of the sequences emitted by two Observables
+
+```java
+Flowable
+
The following example code shows two Subscribers subscribing to the same Observable. In the first case, they subscribe to an ordinary Observable; in the second case, they subscribe to a Connectable Observable that only connects after both Subscribers subscribe. Note the difference in the output:
**Example #1:**
-```groovy
-def firstMillion = Observable.range( 1, 1000000 ).sample(7, java.util.concurrent.TimeUnit.MILLISECONDS);
+```java
+Observable firstMillion = Observable.range(1, 1000000).sample(7, java.util.concurrent.TimeUnit.MILLISECONDS);
-firstMillion.subscribe(
- { println("Subscriber #1:" + it); }, // onNext
- { println("Error: " + it.getMessage()); }, // onError
- { println("Sequence #1 complete"); } // onCompleted
-);
-
-firstMillion.subscribe(
- { println("Subscriber #2:" + it); }, // onNext
- { println("Error: " + it.getMessage()); }, // onError
- { println("Sequence #2 complete"); } // onCompleted
-);
+firstMillion.subscribe(next -> System.out.println("Subscriber #1: " + next), // onNext
+ throwable -> System.out.println("Error: " + throwable), // onError
+ () -> System.out.println("Sequence #1 complete") // onComplete
+ );
+firstMillion.subscribe(next -> System.out.println("Subscriber #2: " + next), // onNext
+ throwable -> System.out.println("Error: " + throwable), // onError
+ () -> System.out.println("Sequence #2 complete") // onComplete
+ );
```
```
Subscriber #1:211128
@@ -40,20 +37,18 @@ Subscriber #2:826996
Sequence #2 complete
```
**Example #2:**
-```groovy
-def firstMillion = Observable.range( 1, 1000000 ).sample(7, java.util.concurrent.TimeUnit.MILLISECONDS).publish();
+```java
+ConnectableObservable firstMillion = Observable.range(1, 1000000).sample(7, java.util.concurrent.TimeUnit.MILLISECONDS).publish();
-firstMillion.subscribe(
- { println("Subscriber #1:" + it); }, // onNext
- { println("Error: " + it.getMessage()); }, // onError
- { println("Sequence #1 complete"); } // onCompleted
-);
+firstMillion.subscribe(next -> System.out.println("Subscriber #1: " + next), // onNext
+ throwable -> System.out.println("Error: " + throwable), // onError
+ () -> System.out.println("Sequence #1 complete") // onComplete
+ );
-firstMillion.subscribe(
- { println("Subscriber #2:" + it); }, // onNext
- { println("Error: " + it.getMessage()); }, // onError
- { println("Sequence #2 complete"); } // onCompleted
-);
+firstMillion.subscribe(next -> System.out.println("Subscriber #2: " + next), // onNext
+ throwable -> System.out.println("Error: " + throwable), // onError
+ () -> System.out.println("Sequence #2 complete") // onComplete
+ );
firstMillion.connect();
```
diff --git a/docs/Filtering-Observables.md b/docs/Filtering-Observables.md
index 620800dc8c..512b69ba8a 100644
--- a/docs/Filtering-Observables.md
+++ b/docs/Filtering-Observables.md
@@ -259,7 +259,7 @@ firstOrError.subscribe(
**ReactiveX documentation:** [http://reactivex.io/documentation/operators/ignoreelements.html](http://reactivex.io/documentation/operators/ignoreelements.html)
-Ignores the single item emitted by a `Single` or `Maybe` source, and returns a `Completable` that signals only the error or completion event from the the source.
+Ignores the single item emitted by a `Single` or `Maybe` source, and returns a `Completable` that signals only the error or completion event from the source.
### ignoreElement example
diff --git a/docs/Getting-Started.md b/docs/Getting-Started.md
index fb9baa47dd..69b69a8ca6 100644
--- a/docs/Getting-Started.md
+++ b/docs/Getting-Started.md
@@ -1,20 +1,20 @@
## Getting Binaries
-You can find binaries and dependency information for Maven, Ivy, Gradle, SBT, and others at [http://search.maven.org](http://search.maven.org/#search%7Cga%7C1%7Cg%3A"io.reactivex.rxjava2"%20AND%20"rxjava2").
+You can find binaries and dependency information for Maven, Ivy, Gradle, SBT, and others at [http://search.maven.org](https://search.maven.org/search?q=g:io.reactivex.rxjava3%20AND%20rxjava).
Example for Maven:
```xml
+
This next example, in Clojure, consumes three asynchronous Observables, including a dependency from one to another, and emits a single response item by combining the items emitted by each of the three Observables with the [`zip`](http://reactivex.io/documentation/operators/zip.html) operator and then transforming the result with [`map`](http://reactivex.io/documentation/operators/map.html):
@@ -333,7 +333,7 @@ The response looks like this:
And here is a marble diagram that illustrates how that code produces that response:
-
+
The following example, in Groovy, comes from [Ben Christensen’s QCon presentation on the evolution of the Netflix API](https://speakerdeck.com/benjchristensen/evolution-of-the-netflix-api-qcon-sf-2013). It combines two Observables with the [`merge`](http://reactivex.io/documentation/operators/merge.html) operator, then uses the [`reduce`](http://reactivex.io/documentation/operators/reduce.html) operator to construct a single item out of the resulting sequence, then transforms that item with [`map`](http://reactivex.io/documentation/operators/map.html) before emitting it:
@@ -350,7 +350,7 @@ public Observable getVideoSummary(APIVideo video) {
And here is a marble diagram that illustrates how that code uses the [`reduce`](http://reactivex.io/documentation/operators/reduce.html) operator to bring the results from multiple Observables together in one structure:
-
+
## Error Handling
diff --git a/docs/Phantom-Operators.md b/docs/Phantom-Operators.md
index 60da4a1a40..5193147a22 100644
--- a/docs/Phantom-Operators.md
+++ b/docs/Phantom-Operators.md
@@ -19,7 +19,7 @@ These operators have been proposed but are not part of the 1.0 release of RxJava
## chunkify( )
#### returns an iterable that periodically returns a list of items emitted by the source Observable since the last list
-
+
The `chunkify( )` operator represents a blocking observable as an Iterable, that, each time you iterate over it, returns a list of items emitted by the source Observable since the previous iteration. These lists may be empty if there have been no such items emitted.
@@ -27,7 +27,7 @@ The `chunkify( )` operator represents a blocking observable as an Iterable, th
## fromFuture( )
#### convert a Future into an Observable, but do not attempt to get the Future's value until a Subscriber subscribes
-
+
The `fromFuture( )` method also converts a Future into an Observable, but it obtains this Future indirectly, by means of a function you provide. It creates the Observable immediately, but waits to call the function and to obtain the Future until a Subscriber subscribes to it.
@@ -35,7 +35,7 @@ The `fromFuture( )` method also converts a Future into an Observable, but it o
## forEachFuture( )
#### create a futureTask that will invoke a specified function on each item emitted by an Observable
-
+
The `forEachFuture( )` returns a `FutureTask` for each item emitted by the source Observable (or each item and each notification) that, when executed, will apply a function you specify to each such item (or item and notification).
@@ -43,7 +43,7 @@ The `forEachFuture( )` returns a `FutureTask` for each item emitted by the sou
## forIterable( )
#### apply a function to the elements of an Iterable to create Observables which are then concatenated
-
+
`forIterable( )` is similar to `from(Iterable )` but instead of the resulting Observable emitting the elements of the Iterable as its own emitted items, it applies a specified function to each of these elements to generate one Observable per element, and then concatenates the emissions of these Observables to be its own sequence of emitted items.
@@ -58,7 +58,7 @@ If the a subscriber to the Observable that results when a Future is converted to
## generate( ) and generateAbsoluteTime( )
#### create an Observable that emits a sequence of items as generated by a function of your choosing
-
+
The basic form of `generate( )` takes four parameters. These are `initialState` and three functions: `iterate( )`, `condition( )`, and `resultSelector( )`. `generate( )` uses these four parameters to generate an Observable sequence, which is its return value. It does so in the following way.
@@ -66,7 +66,7 @@ The basic form of `generate( )` takes four parameters. These are `initialState
There are also versions of `generate( )` that allow you to do the work of generating the sequence on a particular `Scheduler` and that allow you to set the time interval between emissions by applying a function to the current state. The `generateAbsoluteTime( )` allows you to control the time at which an item is emitted by applying a function to the state to get an absolute system clock time (rather than an interval from the previous emission).
-
+
#### see also:
* Introduction to Rx: Generate
@@ -79,7 +79,7 @@ There are also versions of `generate( )` that allow you to do the work of gene
This version of `groupBy` adds another parameter: an Observable that emits duration markers. When a duration marker is emitted by this Observable, any grouped Observables that have been opened are closed, and `groupByUntil( )` will create new grouped Observables for any subsequent emissions by the source Observable.
-
+
Another variety of `groupByUntil( )` limits the number of groups that can be active at any particular time. If an item is emitted by the source Observable that would cause the number of groups to exceed this maximum, before the new group is emitted, one of the existing groups is closed (that is, the Observable it represents terminates by calling its Subscribers' `onCompleted` methods and then expires).
@@ -101,7 +101,7 @@ To represent an Observable as a Connectable Observable, use the `multicast( )`
## onErrorFlatMap( )
#### instructs an Observable to emit a sequence of items whenever it encounters an error
-
+
The `onErrorFlatMap( )` method is similar to `onErrorResumeNext( )` except that it does not assume the source Observable will correctly terminate when it issues an error. Because of this, after emitting its backup sequence of items, `onErrorFlatMap( )` relinquishes control of the emitted sequence back to the source Observable. If that Observable again issues an error, `onErrorFlatMap( )` will again emit its backup sequence.
@@ -111,13 +111,13 @@ Because `onErrorFlatMap( )` is designed to work with pathological source Obser
Note that you should apply `onErrorFlatMap( )` directly to the pathological source Observable, and not to that Observable after it has been modified by additional operators, as such operators may effectively renormalize the source Observable by unsubscribing from it immediately after it issues an error. Below, for example, is an illustration showing how `onErrorFlatMap( )` will respond to two error-generating Observables that have been merged by the `merge( )` operator. Note that it will *not* react to both errors generated by both Observables, but only to the single error passed along by `merge( )`:
-
+
***
## parallel( )
#### split the work done on the emissions from an Observable into multiple Observables each operating on its own parallel thread
-
+
The `parallel( )` method splits an Observable into as many Observables as there are available processors, and does work in parallel on each of these Observables. `parallel( )` then merges the results of these parallel computations back into a single, well-behaved Observable sequence.
@@ -127,7 +127,7 @@ streamOfItems.flatMap(item -> {
itemToObservable(item).subscribeOn(Schedulers.io());
});
```
-Kick off your work for each item inside [`flatMap`](Transforming-Observables#flatmap-concatmap-and-flatmapiterable) using [`subscribeOn`](Observable-Utility-Operators#subscribeon) to make it asynchronous, or by using a function that already makes asychronous calls.
+Kick off your work for each item inside [`flatMap`](Transforming-Observables#flatmap-concatmap-and-flatmapiterable) using [`subscribeOn`](Observable-Utility-Operators#subscribeon) to make it asynchronous, or by using a function that already makes asynchronous calls.
#### see also:
* RxJava Threading Examples by Graham Lea
@@ -136,7 +136,7 @@ Kick off your work for each item inside [`flatMap`](Transforming-Observables#fla
## parallelMerge( )
#### combine multiple Observables into a smaller number of Observables, to facilitate parallelism
-
+
Use the `parallelMerge( )` method to take an Observable that emits a large number of Observables and to reduce it to an Observable that emits a particular, smaller number of Observables that emit the same set of items as the original larger set of Observables: for instance a number of Observables that matches the number of parallel processes that you want to use when processing the emissions from the complete set of Observables.
@@ -144,7 +144,7 @@ Use the `parallelMerge( )` method to take an Observable that emits a large num
## pivot( )
#### combine multiple sets of grouped observables so that they are arranged primarily by group rather than by set
-
+
If you combine multiple sets of grouped observables, such as those created by [`groupBy( )` and `groupByUntil( )`](Transforming-Observables#wiki-groupby-and-groupbyuntil), then even if those grouped observables have been grouped by a similar differentiation function, the resulting grouping will be primarily based on which set the observable came from, not on which group the observable belonged to.
@@ -152,13 +152,13 @@ An example may make this clearer. Imagine you use `groupBy( )` to group the em
The result will be a grouped observable that emits two groups: the grouped observable resulting from transforming Observable1, and the grouped observable resulting from transforming Observable2. Each of those grouped observables emit observables that in turn emit the odds and evens from the source observables. You can use `pivot( )` to change this around: by applying `pivot( )` to this grouped observable it will transform into one that emits two different groups: the odds group and the evens group, with each of these groups emitting a separate observable corresponding to which source observable its set of integers came from. Here is an illustration:
-
+
***
## publishLast( )
#### represent an Observable as a Connectable Observable that emits only the last item emitted by the source Observable
-
+
#### see also:
* RxJS: `publishLast`
diff --git a/docs/What's-different-in-2.0.md b/docs/What's-different-in-2.0.md
index fac50df56d..edc681fa7f 100644
--- a/docs/What's-different-in-2.0.md
+++ b/docs/What's-different-in-2.0.md
@@ -450,12 +450,12 @@ Before 2.0.7, the operator `strict()` had to be applied in order to achieve the
As one of the primary goals of RxJava 2, the design focuses on performance and in order enable it, RxJava 2.0.7 adds a custom `io.reactivex.FlowableSubscriber` interface (extends `org.reactivestreams.Subscriber`) but adds no new methods to it. The new interface is **constrained to RxJava 2** and represents a consumer to `Flowable` that is able to work in a mode that relaxes the Reactive-Streams version 1.0.0 specification in rules §1.3, §2.3, §2.12 and §3.9:
- - §1.3 relaxation: `onSubscribe` may run concurrently with `onNext` in case the `FlowableSubscriber` calls `request()` from inside `onSubscribe` and it is the resposibility of `FlowableSubscriber` to ensure thread-safety between the remaining instructions in `onSubscribe` and `onNext`.
+ - §1.3 relaxation: `onSubscribe` may run concurrently with `onNext` in case the `FlowableSubscriber` calls `request()` from inside `onSubscribe` and it is the responsibility of `FlowableSubscriber` to ensure thread-safety between the remaining instructions in `onSubscribe` and `onNext`.
- §2.3 relaxation: calling `Subscription.cancel` and `Subscription.request` from `FlowableSubscriber.onComplete()` or `FlowableSubscriber.onError()` is considered a no-operation.
- §2.12 relaxation: if the same `FlowableSubscriber` instance is subscribed to multiple sources, it must ensure its `onXXX` methods remain thread safe.
- §3.9 relaxation: issuing a non-positive `request()` will not stop the current stream but signal an error via `RxJavaPlugins.onError`.
-From a user's perspective, if one was using the the `subscribe` methods other than `Flowable.subscribe(Subscriber super T>)`, there is no need to do anything regarding this change and there is no extra penalty for it.
+From a user's perspective, if one was using the `subscribe` methods other than `Flowable.subscribe(Subscriber super T>)`, there is no need to do anything regarding this change and there is no extra penalty for it.
If one was using `Flowable.subscribe(Subscriber super T>)` with the built-in RxJava `Subscriber` implementations such as `DisposableSubscriber`, `TestSubscriber` and `ResourceSubscriber`, there is a small runtime overhead (one `instanceof` check) associated when the code is not recompiled against 2.0.7.
diff --git a/docs/Writing-operators-for-2.0.md b/docs/Writing-operators-for-2.0.md
index e8486564b1..1a51664880 100644
--- a/docs/Writing-operators-for-2.0.md
+++ b/docs/Writing-operators-for-2.0.md
@@ -565,7 +565,7 @@ Version 2.0.7 introduced a new interface, `FlowableSubscriber` that extends `Sub
The rule relaxations are as follows:
-- §1.3 relaxation: `onSubscribe` may run concurrently with onNext in case the `FlowableSubscriber` calls `request()` from inside `onSubscribe` and it is the resposibility of `FlowableSubscriber` to ensure thread-safety between the remaining instructions in `onSubscribe` and `onNext`.
+- §1.3 relaxation: `onSubscribe` may run concurrently with onNext in case the `FlowableSubscriber` calls `request()` from inside `onSubscribe` and it is the responsibility of `FlowableSubscriber` to ensure thread-safety between the remaining instructions in `onSubscribe` and `onNext`.
- §2.3 relaxation: calling `Subscription.cancel` and `Subscription.request` from `FlowableSubscriber.onComplete()` or `FlowableSubscriber.onError()` is considered a no-operation.
- §2.12 relaxation: if the same `FlowableSubscriber` instance is subscribed to multiple sources, it must ensure its `onXXX` methods remain thread safe.
- §3.9 relaxation: issuing a non-positive `request()` will not stop the current stream but signal an error via `RxJavaPlugins.onError`.
diff --git a/docs/_Sidebar.md b/docs/_Sidebar.md
index 32e201fc46..22961acec3 100644
--- a/docs/_Sidebar.md
+++ b/docs/_Sidebar.md
@@ -27,6 +27,6 @@
* [Writing operators](https://github.com/ReactiveX/RxJava/wiki/Writing-operators-for-2.0)
* [Backpressure](https://github.com/ReactiveX/RxJava/wiki/Backpressure-(2.0))
* [another explanation](https://github.com/ReactiveX/RxJava/wiki/Backpressure)
-* [JavaDoc](http://reactivex.io/RxJava/2.x/javadoc)
+* JavaDoc: [1.x](http://reactivex.io/RxJava/1.x/javadoc), [2.x](http://reactivex.io/RxJava/2.x/javadoc), [3.x](http://reactivex.io/RxJava/3.x/javadoc)
* [Coming from RxJava 1](https://github.com/ReactiveX/RxJava/wiki/What's-different-in-2.0)
* [Additional Reading](https://github.com/ReactiveX/RxJava/wiki/Additional-Reading)
diff --git a/gradle.properties b/gradle.properties
index 820b2a5bc1..e685b8103a 100644
--- a/gradle.properties
+++ b/gradle.properties
@@ -1,2 +1,24 @@
-release.scope=patch
-release.version=3.0.0-SNAPSHOT
+group=io.reactivex.rxjava3
+version=3.0.0-SNAPSHOT
+description=RxJava: Reactive Extensions for the JVM – a library for composing asynchronous and event-based programs using observable sequences for the Java VM.
+
+POM_ARTIFACT_ID=rxjava
+POM_NAME=RxJava
+POM_PACKAGING=jar
+
+POM_DESCRIPTION=Reactive Extensions for Java
+POM_INCEPTION_YEAR=2013
+
+POM_URL=https://github.com/ReactiveX/RxJava
+POM_SCM_URL=https://github.com/ReactiveX/RxJava
+POM_SCM_CONNECTION=scm:git:git://github.com/ReactiveX/RxJava.git
+POM_SCM_DEV_CONNECTION=scm:git:ssh://git@github.com/ReactiveX/RxJava.git
+
+POM_LICENCE_NAME=The Apache Software License, Version 2.0
+POM_LICENCE_URL=https://www.apache.org/licenses/LICENSE-2.0.txt
+POM_LICENCE_DIST=repo
+
+POM_DEVELOPER_ID=akarnokd
+POM_DEVELOPER_NAME=David Karnok
+POM_DEVELOPER_URL=https://github.com/akarnokd/
+POM_DEVELOPER_EMAIL=akarnokd@gmail.com
diff --git a/gradle/buildViaTravis.sh b/gradle/buildViaTravis.sh
deleted file mode 100755
index ea385c3e92..0000000000
--- a/gradle/buildViaTravis.sh
+++ /dev/null
@@ -1,29 +0,0 @@
-#!/bin/bash
-# This script will build the project.
-
-buildTag="$TRAVIS_TAG"
-
-if [ "$buildTag" != "" ] && [ "${buildTag:0:3}" != "v3." ]; then
- echo -e "Wrong tag on the 3.x brach: $buildTag : build stopped"
- exit 1
-fi
-
-export GRADLE_OPTS=-Xmx1024m
-
-if [ "$TRAVIS_PULL_REQUEST" != "false" ]; then
- echo -e "Build Pull Request #$TRAVIS_PULL_REQUEST => Branch [$TRAVIS_BRANCH]"
- ./gradlew -PreleaseMode=pr build --stacktrace
-elif [ "$TRAVIS_PULL_REQUEST" == "false" ] && [ "$TRAVIS_TAG" == "" ]; then
- if [ "$TRAVIS_BRANCH" != "3.x" ]; then
- echo -e 'Build secondary Branch (no snapshot) => Branch ['$TRAVIS_BRANCH']'
- ./gradlew -PreleaseMode=pr build --stacktrace
- else
- echo -e 'Build Branch with Snapshot => Branch ['$TRAVIS_BRANCH']'
- ./gradlew -PreleaseMode=branch -PbintrayUser="${bintrayUser}" -PbintrayKey="${bintrayKey}" -PsonatypeUsername="${sonatypeUsername}" -PsonatypePassword="${sonatypePassword}" build --stacktrace
- fi
-elif [ "$TRAVIS_PULL_REQUEST" == "false" ] && [ "$TRAVIS_TAG" != "" ]; then
- echo -e 'Build Branch for Release => Branch ['$TRAVIS_BRANCH'] Tag ['$TRAVIS_TAG']'
- ./gradlew -PreleaseMode=full -PbintrayUser="${bintrayUser}" -PbintrayKey="${bintrayKey}" -PsonatypeUsername="${sonatypeUsername}" -PsonatypePassword="${sonatypePassword}" build --stacktrace
-else
- echo -e 'WARN: Should not be here => Branch ['$TRAVIS_BRANCH'] Tag ['$TRAVIS_TAG'] Pull Request ['$TRAVIS_PULL_REQUEST']'
-fi
diff --git a/gradle/javadoc_cleanup.gradle b/gradle/javadoc_cleanup.gradle
index 12216464a7..63b4f7f045 100644
--- a/gradle/javadoc_cleanup.gradle
+++ b/gradle/javadoc_cleanup.gradle
@@ -1,36 +1,74 @@
// remove the excessive whitespaces between method arguments in the javadocs
task javadocCleanup(dependsOn: "javadoc") doLast {
- fixJavadocFile(rootProject.file('build/docs/javadoc/io/reactivex/rxjava3/core/Flowable.html'));
- fixJavadocFile(rootProject.file('build/docs/javadoc/io/reactivex/rxjava3/core/Observable.html'));
- fixJavadocFile(rootProject.file('build/docs/javadoc/io/reactivex/rxjava3/core/Single.html'));
- fixJavadocFile(rootProject.file('build/docs/javadoc/io/reactivex/rxjava3/core/Maybe.html'));
- fixJavadocFile(rootProject.file('build/docs/javadoc/io/reactivex/rxjava3/core/Completable.html'));
+ fixJavadocFile(rootProject.file('build/docs/javadoc/io/reactivex/rxjava3/core/Flowable.html'))
+ fixJavadocFile(rootProject.file('build/docs/javadoc/io/reactivex/rxjava3/core/Observable.html'))
+ fixJavadocFile(rootProject.file('build/docs/javadoc/io/reactivex/rxjava3/core/Single.html'))
+ fixJavadocFile(rootProject.file('build/docs/javadoc/io/reactivex/rxjava3/core/Maybe.html'))
+ fixJavadocFile(rootProject.file('build/docs/javadoc/io/reactivex/rxjava3/core/Completable.html'))
- fixJavadocFile(rootProject.file('build/docs/javadoc/io/reactivex/rxjava3/flowables/ConnectableFlowable.html'));
- fixJavadocFile(rootProject.file('build/docs/javadoc/io/reactivex/rxjava3/observables/ConnectableObservable.html'));
+ fixJavadocFile(rootProject.file('build/docs/javadoc/io/reactivex/rxjava3/flowables/ConnectableFlowable.html'))
+ fixJavadocFile(rootProject.file('build/docs/javadoc/io/reactivex/rxjava3/observables/ConnectableObservable.html'))
- fixJavadocFile(rootProject.file('build/docs/javadoc/io/reactivex/rxjava3/subjects/ReplaySubject.html'));
- fixJavadocFile(rootProject.file('build/docs/javadoc/io/reactivex/rxjava3/processors/ReplayProcessor.html'));
- fixJavadocFile(rootProject.file('build/docs/javadoc/io/reactivex/rxjava3/plugins/RxJavaPlugins.html'));
+ fixJavadocFile(rootProject.file('build/docs/javadoc/io/reactivex/rxjava3/subjects/ReplaySubject.html'))
+ fixJavadocFile(rootProject.file('build/docs/javadoc/io/reactivex/rxjava3/processors/ReplayProcessor.html'))
+ fixJavadocFile(rootProject.file('build/docs/javadoc/io/reactivex/rxjava3/subjects/PublishSubject.html'))
+ fixJavadocFile(rootProject.file('build/docs/javadoc/io/reactivex/rxjava3/processors/PublishProcessor.html'))
+ fixJavadocFile(rootProject.file('build/docs/javadoc/io/reactivex/rxjava3/subjects/AsyncSubject.html'))
+ fixJavadocFile(rootProject.file('build/docs/javadoc/io/reactivex/rxjava3/processors/AsyncProcessor.html'))
+ fixJavadocFile(rootProject.file('build/docs/javadoc/io/reactivex/rxjava3/subjects/BehaviorSubject.html'))
+ fixJavadocFile(rootProject.file('build/docs/javadoc/io/reactivex/rxjava3/processors/BehaviorProcessor.html'))
+ fixJavadocFile(rootProject.file('build/docs/javadoc/io/reactivex/rxjava3/processors/MulticastProcessor.html'))
+ fixJavadocFile(rootProject.file('build/docs/javadoc/io/reactivex/rxjava3/subjects/UnicastSubject.html'))
+ fixJavadocFile(rootProject.file('build/docs/javadoc/io/reactivex/rxjava3/processors/UnicastProcessor.html'))
- fixJavadocFile(rootProject.file('build/docs/javadoc/io/reactivex/rxjava3/parallel/ParallelFlowable.html'));
+ fixJavadocFile(rootProject.file('build/docs/javadoc/io/reactivex/rxjava3/plugins/RxJavaPlugins.html'))
+
+ fixJavadocFile(rootProject.file('build/docs/javadoc/io/reactivex/rxjava3/parallel/ParallelFlowable.html'))
+
+ fixJavadocFile(rootProject.file('build/docs/javadoc/io/reactivex/rxjava3/disposables/Disposable.html'))
+
+ fixJavadocFile(rootProject.file('build/docs/javadoc/io/reactivex/rxjava3/observers/TestObserver.html'))
+ fixJavadocFile(rootProject.file('build/docs/javadoc/io/reactivex/rxjava3/observers/BaseTestConsumer.html'))
+ fixJavadocFile(rootProject.file('build/docs/javadoc/io/reactivex/rxjava3/subscribers/TestSubscriber.html'))
}
def fixJavadocFile(file) {
- println("Cleaning up: " + file);
+ logger.lifecycle("Cleaning up: " + file)
String fileContents = file.getText('UTF-8')
// lots of spaces after the previous method argument
- fileContents = fileContents.replaceAll(",\\s{4,}", ",\n ");
+ fileContents = fileContents.replaceAll(",\\s{4,}", ",\n ")
// lots of spaces after the @NonNull annotations
- fileContents = fileContents.replaceAll("@NonNull* Like {@code Observable}, a running {@code Completable} can be stopped through the {@link Disposable} instance - * provided to consumers through {@link SingleObserver#onSubscribe}. + * provided to consumers through {@link CompletableObserver#onSubscribe}. *
* Like an {@code Observable}, a {@code Completable} is lazy, can be either "hot" or "cold", synchronous or * asynchronous. {@code Completable} instances returned by the methods of this class are cold @@ -478,7 +479,7 @@ public static Completable unsafeCreate(@NonNull CompletableSource onSubscribe) { @CheckReturnValue @NonNull @SchedulerSupport(SchedulerSupport.NONE) - public static Completable defer(@NonNull Supplier extends CompletableSource> supplier) { + public static Completable defer(@NonNull Supplier extends @NonNull CompletableSource> supplier) { Objects.requireNonNull(supplier, "supplier is null"); return RxJavaPlugins.onAssembly(new CompletableDefer(supplier)); } @@ -502,7 +503,7 @@ public static Completable defer(@NonNull Supplier extends CompletableSource> s @CheckReturnValue @NonNull @SchedulerSupport(SchedulerSupport.NONE) - public static Completable error(@NonNull Supplier extends Throwable> supplier) { + public static Completable error(@NonNull Supplier extends @NonNull Throwable> supplier) { Objects.requireNonNull(supplier, "supplier is null"); return RxJavaPlugins.onAssembly(new CompletableErrorSupplier(supplier)); } @@ -528,8 +529,8 @@ public static Completable error(@NonNull Throwable throwable) { } /** - * Returns a {@code Completable} instance that runs the given {@link Action} for each subscriber and - * emits either an unchecked exception or simply completes. + * Returns a {@code Completable} instance that runs the given {@link Action} for each {@link CompletableObserver} and + * emits either an exception or simply completes. *
*
*
*
+ *
+ * If the code to be wrapped needs to throw a checked or more broader {@link Throwable} exception, that + * exception has to be converted to an unchecked exception by the wrapped code itself. Alternatively, + * use the {@link #fromAction(Action)} method which allows the wrapped code to throw any {@code Throwable} + * exception and will signal it to observers as-is. *
- *
+ *
*
+ * The {@code CompletableObserver} will be removed after the callback for the terminal event has been invoked. + *
* * Note that the operator takes an already instantiated, running or terminated {@code CompletionStage}. - * If the optional is to be created per consumer upon subscription, use {@link #defer(Supplier)} + * If the {@code CompletionStage} is to be created per consumer upon subscription, use {@link #defer(Supplier)} * around {@code fromCompletionStage}: *
* Maybe.defer(() -> Completable.fromCompletionStage(createCompletionStage()));
@@ -3420,7 +3466,7 @@ public static Completable fromCompletionStage(@NonNull CompletionStage> stage)
@CheckReturnValue
@SchedulerSupport(SchedulerSupport.NONE)
@NonNull
- public final CompletionStage toCompletionStage(@Nullable T defaultItem) {
+ public final <@Nullable T> CompletionStage toCompletionStage(T defaultItem) {
return subscribeWith(new CompletionStageConsumer<>(true, defaultItem));
}
}
diff --git a/src/main/java/io/reactivex/rxjava3/core/CompletableConverter.java b/src/main/java/io/reactivex/rxjava3/core/CompletableConverter.java
index 83e8f693eb..a213c6f5ab 100644
--- a/src/main/java/io/reactivex/rxjava3/core/CompletableConverter.java
+++ b/src/main/java/io/reactivex/rxjava3/core/CompletableConverter.java
@@ -1,4 +1,4 @@
-/**
+/*
* Copyright (c) 2016-present, RxJava Contributors.
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in
diff --git a/src/main/java/io/reactivex/rxjava3/core/CompletableEmitter.java b/src/main/java/io/reactivex/rxjava3/core/CompletableEmitter.java
index 5ebb4e1877..2c5a9811c4 100644
--- a/src/main/java/io/reactivex/rxjava3/core/CompletableEmitter.java
+++ b/src/main/java/io/reactivex/rxjava3/core/CompletableEmitter.java
@@ -1,4 +1,4 @@
-/**
+/*
* Copyright (c) 2016-present, RxJava Contributors.
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in
diff --git a/src/main/java/io/reactivex/rxjava3/core/CompletableObserver.java b/src/main/java/io/reactivex/rxjava3/core/CompletableObserver.java
index 7c01ebfee1..9339307047 100644
--- a/src/main/java/io/reactivex/rxjava3/core/CompletableObserver.java
+++ b/src/main/java/io/reactivex/rxjava3/core/CompletableObserver.java
@@ -1,4 +1,4 @@
-/**
+/*
* Copyright (c) 2016-present, RxJava Contributors.
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in
diff --git a/src/main/java/io/reactivex/rxjava3/core/CompletableOnSubscribe.java b/src/main/java/io/reactivex/rxjava3/core/CompletableOnSubscribe.java
index 70d79e62b6..e73fae2d5c 100644
--- a/src/main/java/io/reactivex/rxjava3/core/CompletableOnSubscribe.java
+++ b/src/main/java/io/reactivex/rxjava3/core/CompletableOnSubscribe.java
@@ -1,4 +1,4 @@
-/**
+/*
* Copyright (c) 2016-present, RxJava Contributors.
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in
@@ -10,13 +10,14 @@
* distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See
* the License for the specific language governing permissions and limitations under the License.
*/
+
package io.reactivex.rxjava3.core;
import io.reactivex.rxjava3.annotations.NonNull;
/**
* A functional interface that has a {@code subscribe()} method that receives
- * an instance of a {@link CompletableEmitter} instance that allows pushing
+ * a {@link CompletableEmitter} instance that allows pushing
* an event in a cancellation-safe manner.
*/
@FunctionalInterface
diff --git a/src/main/java/io/reactivex/rxjava3/core/CompletableOperator.java b/src/main/java/io/reactivex/rxjava3/core/CompletableOperator.java
index e9b1df83cb..f06a94f36a 100644
--- a/src/main/java/io/reactivex/rxjava3/core/CompletableOperator.java
+++ b/src/main/java/io/reactivex/rxjava3/core/CompletableOperator.java
@@ -1,4 +1,4 @@
-/**
+/*
* Copyright (c) 2016-present, RxJava Contributors.
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in
diff --git a/src/main/java/io/reactivex/rxjava3/core/CompletableSource.java b/src/main/java/io/reactivex/rxjava3/core/CompletableSource.java
index 58edf9471c..90d3853b8a 100644
--- a/src/main/java/io/reactivex/rxjava3/core/CompletableSource.java
+++ b/src/main/java/io/reactivex/rxjava3/core/CompletableSource.java
@@ -1,4 +1,4 @@
-/**
+/*
* Copyright (c) 2016-present, RxJava Contributors.
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in
@@ -10,6 +10,7 @@
* distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See
* the License for the specific language governing permissions and limitations under the License.
*/
+
package io.reactivex.rxjava3.core;
import io.reactivex.rxjava3.annotations.NonNull;
diff --git a/src/main/java/io/reactivex/rxjava3/core/CompletableTransformer.java b/src/main/java/io/reactivex/rxjava3/core/CompletableTransformer.java
index 98a9e2aa19..2887c4717c 100644
--- a/src/main/java/io/reactivex/rxjava3/core/CompletableTransformer.java
+++ b/src/main/java/io/reactivex/rxjava3/core/CompletableTransformer.java
@@ -1,4 +1,4 @@
-/**
+/*
* Copyright (c) 2016-present, RxJava Contributors.
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in
diff --git a/src/main/java/io/reactivex/rxjava3/core/Emitter.java b/src/main/java/io/reactivex/rxjava3/core/Emitter.java
index e9ea2a1827..83410f056e 100644
--- a/src/main/java/io/reactivex/rxjava3/core/Emitter.java
+++ b/src/main/java/io/reactivex/rxjava3/core/Emitter.java
@@ -1,4 +1,4 @@
-/**
+/*
* Copyright (c) 2016-present, RxJava Contributors.
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in
@@ -10,6 +10,7 @@
* distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See
* the License for the specific language governing permissions and limitations under the License.
*/
+
package io.reactivex.rxjava3.core;
import io.reactivex.rxjava3.annotations.NonNull;
diff --git a/src/main/java/io/reactivex/rxjava3/core/Flowable.java b/src/main/java/io/reactivex/rxjava3/core/Flowable.java
index 33c89e1343..39f3c63b43 100644
--- a/src/main/java/io/reactivex/rxjava3/core/Flowable.java
+++ b/src/main/java/io/reactivex/rxjava3/core/Flowable.java
@@ -1,4 +1,4 @@
-/**
+/*
* Copyright (c) 2016-present, RxJava Contributors.
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in
@@ -10,6 +10,7 @@
* distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See
* the License for the specific language governing permissions and limitations under the License.
*/
+
package io.reactivex.rxjava3.core;
import java.util.*;
@@ -19,12 +20,11 @@
import org.reactivestreams.*;
import io.reactivex.rxjava3.annotations.*;
-import io.reactivex.rxjava3.disposables.Disposable;
+import io.reactivex.rxjava3.disposables.*;
import io.reactivex.rxjava3.exceptions.*;
import io.reactivex.rxjava3.flowables.*;
import io.reactivex.rxjava3.functions.*;
import io.reactivex.rxjava3.internal.functions.*;
-import io.reactivex.rxjava3.internal.fuseable.ScalarSupplier;
import io.reactivex.rxjava3.internal.jdk8.*;
import io.reactivex.rxjava3.internal.operators.flowable.*;
import io.reactivex.rxjava3.internal.operators.maybe.MaybeToFlowable;
@@ -34,6 +34,7 @@
import io.reactivex.rxjava3.internal.schedulers.ImmediateThinScheduler;
import io.reactivex.rxjava3.internal.subscribers.*;
import io.reactivex.rxjava3.internal.util.*;
+import io.reactivex.rxjava3.operators.ScalarSupplier;
import io.reactivex.rxjava3.parallel.ParallelFlowable;
import io.reactivex.rxjava3.plugins.RxJavaPlugins;
import io.reactivex.rxjava3.schedulers.*;
@@ -53,7 +54,7 @@
*
* The documentation for this class makes use of marble diagrams. The following legend explains these diagrams:
*
- *
+ *
*
* The {@code Flowable} follows the protocol
*
@@ -163,13 +164,21 @@ public abstract class Flowable<@NonNull T> implements Publisher {
* Mirrors the one {@link Publisher} in an {@link Iterable} of several {@code Publisher}s that first either emits an item or sends
* a termination notification.
*
- *
+ *
+ *
+ * When one of the {@code Publisher}s signal an item or terminates first, all subscriptions to the other
+ * {@code Publisher}s are canceled.
*
* - Backpressure:
* - The operator itself doesn't interfere with backpressure which is determined by the winning
* {@code Publisher}'s backpressure behavior.
* - Scheduler:
* - {@code amb} does not operate by default on a particular {@link Scheduler}.
+ * - Error handling:
+ * -
+ * If any of the losing {@code Publisher}s signals an error, the error is routed to the global
+ * error handler via {@link RxJavaPlugins#onError(Throwable)}.
+ *
*
*
* @param the common element type
@@ -184,7 +193,7 @@ public abstract class Flowable<@NonNull T> implements Publisher {
@NonNull
@BackpressureSupport(BackpressureKind.PASS_THROUGH)
@SchedulerSupport(SchedulerSupport.NONE)
- public static Flowable amb(@NonNull Iterable<@NonNull ? extends Publisher<@NonNull ? extends T>> sources) {
+ public static <@NonNull T> Flowable amb(@NonNull Iterable<@NonNull ? extends Publisher extends T>> sources) {
Objects.requireNonNull(sources, "sources is null");
return RxJavaPlugins.onAssembly(new FlowableAmb<>(null, sources));
}
@@ -193,13 +202,21 @@ public static Flowable amb(@NonNull Iterable<@NonNull ? extends Publisher
* Mirrors the one {@link Publisher} in an array of several {@code Publisher}s that first either emits an item or sends
* a termination notification.
*
- *
+ *
+ *
+ * When one of the {@code Publisher}s signal an item or terminates first, all subscriptions to the other
+ * {@code Publisher}s are canceled.
*
* - Backpressure:
* - The operator itself doesn't interfere with backpressure which is determined by the winning
* {@code Publisher}'s backpressure behavior.
* - Scheduler:
* - {@code ambArray} does not operate by default on a particular {@link Scheduler}.
+ * - Error handling:
+ * -
+ * If any of the losing {@code Publisher}s signals an error, the error is routed to the global
+ * error handler via {@link RxJavaPlugins#onError(Throwable)}.
+ *
*
*
* @param the common element type
@@ -215,7 +232,7 @@ public static Flowable amb(@NonNull Iterable<@NonNull ? extends Publisher
@BackpressureSupport(BackpressureKind.PASS_THROUGH)
@SchedulerSupport(SchedulerSupport.NONE)
@SafeVarargs
- public static Flowable ambArray(@NonNull Publisher<@NonNull ? extends T>... sources) {
+ public static <@NonNull T> Flowable ambArray(@NonNull Publisher extends T>... sources) {
Objects.requireNonNull(sources, "sources is null");
int len = sources.length;
if (len == 0) {
@@ -279,7 +296,7 @@ public static int bufferSize() {
@CheckReturnValue
@BackpressureSupport(BackpressureKind.FULL)
@NonNull
- public static Flowable combineLatestArray(@NonNull Publisher<@NonNull ? extends T>[] sources, @NonNull Function super Object[], ? extends R> combiner) {
+ public static <@NonNull T, @NonNull R> Flowable combineLatestArray(@NonNull Publisher extends T>[] sources, @NonNull Function super Object[], ? extends R> combiner) {
return combineLatestArray(sources, combiner, bufferSize());
}
@@ -327,7 +344,7 @@ public static int bufferSize() {
@CheckReturnValue
@NonNull
@BackpressureSupport(BackpressureKind.FULL)
- public static Flowable combineLatestArray(@NonNull Publisher<@NonNull ? extends T>[] sources, @NonNull Function super Object[], ? extends R> combiner, int bufferSize) {
+ public static <@NonNull T, @NonNull R> Flowable combineLatestArray(@NonNull Publisher extends T>[] sources, @NonNull Function super Object[], ? extends R> combiner, int bufferSize) {
Objects.requireNonNull(sources, "sources is null");
if (sources.length == 0) {
return empty();
@@ -378,7 +395,7 @@ public static Flowable combineLatestArray(@NonNull Publisher<@NonNull
@CheckReturnValue
@BackpressureSupport(BackpressureKind.FULL)
@NonNull
- public static Flowable combineLatest(@NonNull Iterable<@NonNull ? extends Publisher<@NonNull ? extends T>> sources,
+ public static <@NonNull T, @NonNull R> Flowable combineLatest(@NonNull Iterable<@NonNull ? extends Publisher extends T>> sources,
@NonNull Function super Object[], ? extends R> combiner) {
return combineLatest(sources, combiner, bufferSize());
}
@@ -427,7 +444,7 @@ public static Flowable combineLatestArray(@NonNull Publisher<@NonNull
@CheckReturnValue
@NonNull
@BackpressureSupport(BackpressureKind.FULL)
- public static Flowable combineLatest(@NonNull Iterable<@NonNull ? extends Publisher<@NonNull ? extends T>> sources,
+ public static <@NonNull T, @NonNull R> Flowable combineLatest(@NonNull Iterable<@NonNull ? extends Publisher extends T>> sources,
@NonNull Function super Object[], ? extends R> combiner, int bufferSize) {
Objects.requireNonNull(sources, "sources is null");
Objects.requireNonNull(combiner, "combiner is null");
@@ -476,7 +493,7 @@ public static Flowable combineLatest(@NonNull Iterable<@NonNull ? exte
@CheckReturnValue
@BackpressureSupport(BackpressureKind.FULL)
@NonNull
- public static Flowable combineLatestArrayDelayError(@NonNull Publisher<@NonNull ? extends T>[] sources,
+ public static <@NonNull T, @NonNull R> Flowable combineLatestArrayDelayError(@NonNull Publisher extends T>[] sources,
@NonNull Function super Object[], ? extends R> combiner) {
return combineLatestArrayDelayError(sources, combiner, bufferSize());
}
@@ -526,7 +543,7 @@ public static Flowable combineLatest(@NonNull Iterable<@NonNull ? exte
@CheckReturnValue
@NonNull
@BackpressureSupport(BackpressureKind.FULL)
- public static Flowable combineLatestArrayDelayError(@NonNull Publisher<@NonNull ? extends T>[] sources,
+ public static <@NonNull T, @NonNull R> Flowable combineLatestArrayDelayError(@NonNull Publisher extends T>[] sources,
@NonNull Function super Object[], ? extends R> combiner, int bufferSize) {
Objects.requireNonNull(sources, "sources is null");
Objects.requireNonNull(combiner, "combiner is null");
@@ -579,7 +596,7 @@ public static Flowable combineLatest(@NonNull Iterable<@NonNull ? exte
@CheckReturnValue
@BackpressureSupport(BackpressureKind.FULL)
@NonNull
- public static Flowable combineLatestDelayError(@NonNull Iterable<@NonNull ? extends Publisher<@NonNull ? extends T>> sources,
+ public static <@NonNull T, @NonNull R> Flowable combineLatestDelayError(@NonNull Iterable<@NonNull ? extends Publisher extends T>> sources,
@NonNull Function super Object[], ? extends R> combiner) {
return combineLatestDelayError(sources, combiner, bufferSize());
}
@@ -629,7 +646,7 @@ public static Flowable combineLatest(@NonNull Iterable<@NonNull ? exte
@CheckReturnValue
@BackpressureSupport(BackpressureKind.FULL)
@NonNull
- public static Flowable combineLatestDelayError(@NonNull Iterable<@NonNull ? extends Publisher<@NonNull ? extends T>> sources,
+ public static <@NonNull T, @NonNull R> Flowable combineLatestDelayError(@NonNull Iterable<@NonNull ? extends Publisher extends T>> sources,
@NonNull Function super Object[], ? extends R> combiner, int bufferSize) {
Objects.requireNonNull(sources, "sources is null");
Objects.requireNonNull(combiner, "combiner is null");
@@ -646,7 +663,7 @@ public static Flowable combineLatestDelayError(@NonNull Iterable<@NonN
* resulting sequence terminates immediately (normally or with all the errors accumulated until that point).
* If that input source is also synchronous, other sources after it will not be subscribed to.
*
- *
+ *
*
* - Backpressure:
* - The returned {@code Publisher} honors backpressure from downstream. The source {@code Publisher}s
@@ -674,8 +691,8 @@ public static
Flowable combineLatestDelayError(@NonNull Iterable<@NonN
@BackpressureSupport(BackpressureKind.FULL)
@SchedulerSupport(SchedulerSupport.NONE)
@NonNull
- public static Flowable combineLatest(
- @NonNull Publisher<@NonNull ? extends T1> source1, @NonNull Publisher<@NonNull ? extends T2> source2,
+ public static <@NonNull T1, @NonNull T2, @NonNull R> Flowable combineLatest(
+ @NonNull Publisher extends T1> source1, @NonNull Publisher extends T2> source2,
@NonNull BiFunction super T1, ? super T2, ? extends R> combiner) {
Objects.requireNonNull(source1, "source1 is null");
Objects.requireNonNull(source2, "source2 is null");
@@ -692,7 +709,7 @@ public static Flowable combineLatest(
* resulting sequence terminates immediately (normally or with all the errors accumulated until that point).
* If that input source is also synchronous, other sources after it will not be subscribed to.
*
- *
+ *
*
* - Backpressure:
* - The returned {@code Publisher} honors backpressure from downstream. The source {@code Publisher}s
@@ -723,9 +740,9 @@ public static
Flowable combineLatest(
@NonNull
@BackpressureSupport(BackpressureKind.FULL)
@SchedulerSupport(SchedulerSupport.NONE)
- public static Flowable combineLatest(
- @NonNull Publisher<@NonNull ? extends T1> source1, @NonNull Publisher<@NonNull ? extends T2> source2,
- @NonNull Publisher<@NonNull ? extends T3> source3,
+ public static <@NonNull T1, @NonNull T2, @NonNull T3, @NonNull R> Flowable combineLatest(
+ @NonNull Publisher extends T1> source1, @NonNull Publisher extends T2> source2,
+ @NonNull Publisher extends T3> source3,
@NonNull Function3 super T1, ? super T2, ? super T3, ? extends R> combiner) {
Objects.requireNonNull(source1, "source1 is null");
Objects.requireNonNull(source2, "source2 is null");
@@ -743,7 +760,7 @@ public static Flowable combineLatest(
* resulting sequence terminates immediately (normally or with all the errors accumulated until that point).
* If that input source is also synchronous, other sources after it will not be subscribed to.
*
- *
+ *
*
* - Backpressure:
* - The returned {@code Publisher} honors backpressure from downstream. The source {@code Publisher}s
@@ -778,9 +795,9 @@ public static
Flowable combineLatest(
@NonNull
@BackpressureSupport(BackpressureKind.FULL)
@SchedulerSupport(SchedulerSupport.NONE)
- public static Flowable combineLatest(
- @NonNull Publisher<@NonNull ? extends T1> source1, @NonNull Publisher<@NonNull ? extends T2> source2,
- @NonNull Publisher<@NonNull ? extends T3> source3, @NonNull Publisher<@NonNull ? extends T4> source4,
+ public static <@NonNull T1, @NonNull T2, @NonNull T3, @NonNull T4, @NonNull R> Flowable combineLatest(
+ @NonNull Publisher extends T1> source1, @NonNull Publisher extends T2> source2,
+ @NonNull Publisher extends T3> source3, @NonNull Publisher extends T4> source4,
@NonNull Function4 super T1, ? super T2, ? super T3, ? super T4, ? extends R> combiner) {
Objects.requireNonNull(source1, "source1 is null");
Objects.requireNonNull(source2, "source2 is null");
@@ -799,7 +816,7 @@ public static Flowable combineLatest(
* resulting sequence terminates immediately (normally or with all the errors accumulated until that point).
* If that input source is also synchronous, other sources after it will not be subscribed to.
*
- *
+ *
*
* - Backpressure:
* - The returned {@code Publisher} honors backpressure from downstream. The source {@code Publisher}s
@@ -837,10 +854,10 @@ public static
Flowable