+
```java
Observable
+
```java
Observable
+
```java
Observable
+
```java
Observable
+
```java
// we have to multicast the original bursty Observable so we can use it
// both as our source and as the source for our buffer closing selector:
@@ -86,14 +86,14 @@ Observable
+
```java
Observable
+
```java
Observable



+
#### see also:
* javadoc: `BlockingObservable`
diff --git a/docs/Conditional-and-Boolean-Operators.md b/docs/Conditional-and-Boolean-Operators.md
index f7ed64ce34..e6d1358e31 100644
--- a/docs/Conditional-and-Boolean-Operators.md
+++ b/docs/Conditional-and-Boolean-Operators.md
@@ -1,21 +1,169 @@
This section explains operators with which you conditionally emit or transform Observables, or can do boolean evaluations of them:
### Conditional Operators
-* [**`amb( )`**](http://reactivex.io/documentation/operators/amb.html) — given two or more source Observables, emits all of the items from the first of these Observables to emit an item
-* [**`defaultIfEmpty( )`**](http://reactivex.io/documentation/operators/defaultifempty.html) — emit items from the source Observable, or emit a default item if the source Observable completes after emitting no items
-* (`rxjava-computation-expressions`) [**`doWhile( )`**](http://reactivex.io/documentation/operators/repeat.html) — emit the source Observable's sequence, and then repeat the sequence as long as a condition remains true
-* (`rxjava-computation-expressions`) [**`ifThen( )`**](http://reactivex.io/documentation/operators/defer.html) — only emit the source Observable's sequence if a condition is true, otherwise emit an empty or default sequence
-* [**`skipUntil( )`**](http://reactivex.io/documentation/operators/skipuntil.html) — discard items emitted by a source Observable until a second Observable emits an item, then emit the remainder of the source Observable's items
-* [**`skipWhile( )`**](http://reactivex.io/documentation/operators/skipwhile.html) — discard items emitted by an Observable until a specified condition is false, then emit the remainder
-* (`rxjava-computation-expressions`) [**`switchCase( )`**](http://reactivex.io/documentation/operators/defer.html) — emit the sequence from a particular Observable based on the results of an evaluation
-* [**`takeUntil( )`**](http://reactivex.io/documentation/operators/takeuntil.html) — emits the items from the source Observable until a second Observable emits an item or issues a notification
-* [**`takeWhile( )` and `takeWhileWithIndex( )`**](http://reactivex.io/documentation/operators/takewhile.html) — emit items emitted by an Observable as long as a specified condition is true, then skip the remainder
-* (`rxjava-computation-expressions`) [**`whileDo( )`**](http://reactivex.io/documentation/operators/repeat.html) — if a condition is true, emit the source Observable's sequence and then repeat the sequence as long as the condition remains true
-
-> (`rxjava-computation-expressions`) — indicates that this operator is currently part of the optional `rxjava-computation-expressions` package under `rxjava-contrib` and is not included with the standard RxJava set of operators
+
+### Outline
+
+- [`amb`](#all)
+- [`defaultIfEmpty`](#defaultIfEmpty)
+- [`skipUntil`](#skipUntil)
+- [`skipWhile`](#skipWhile)
+- [`takeUntil`](#takeUntil)
+- [`takeWhile`](#takeUntil)
+
+## amb
+
+**Available in:**  `Flowable`,  `Observable`,  `Maybe`,  `Single`,  `Completable`
+
+**ReactiveX documentation:** [http://reactivex.io/documentation/operators/amb.html](http://reactivex.io/documentation/operators/amb.html)
+
+given two or more source Observables, emits all of the items from the first of these Observables to emit an item
+
+```java
+ Observable source1 = Observable.range(1, 5);
+ Observable source2 = Observable.range(6, 5);
+ Observable.amb(new ArrayList(Arrays.asList(source1, source2)))
+ .subscribe(next -> System.out.printf("next: %s\n", next), // onNext
+ throwable -> System.out.printf("error: %s\n", throwable), //onError
+ () -> System.out.println("Completed") //onComplete
+ );
+```
+## defaultIfEmpty
+
+**Available in:**  `Flowable`,  `Observable`,  `Maybe`,  `Single`,  `Completable`
+
+**ReactiveX documentation:** [http://reactivex.io/documentation/operators/defaultifempty.html](http://reactivex.io/documentation/operators/defaultifempty.html)
+
+emit items from the source Observable, or emit a default item if the source Observable completes after emitting no items
+
+```java
+ Observable.empty().defaultIfEmpty(1).blockingSubscribe(next -> System.out.printf("next: %s\n", next), // onNext
+ throwable -> System.out.printf("error: %s", throwable), //onError
+ () -> System.out.println("Completed") //onComplete
+ );
+```
+
+## skipUntil
+
+**Available in:**  `Flowable`,  `Observable`,  `Maybe`,  `Single`,  `Completable`
+
+**ReactiveX documentation:** [http://reactivex.io/documentation/operators/skipuntil.html](http://reactivex.io/documentation/operators/skipuntil.html)
+
+discard items emitted by a source Observable until a second Observable emits an item, then emit the remainder of the source Observable's items
+
+```java
+Observable observable1 = Observable.range(1, 10).doOnNext(next -> Thread.sleep(1000));
+
+observable1.skipUntil(Observable.timer(3, TimeUnit.SECONDS))
+ .subscribe(next -> System.out.printf("next: %s\n", next), // onNext
+ throwable -> System.out.printf("error: %s", throwable), //onError
+ () -> System.out.println("Completed") //onComplete
+ );
+```
+## skipWhile
+
+**Available in:**  `Flowable`,  `Observable`,  `Maybe`,  `Single`,  `Completable`
+
+**ReactiveX documentation:** [http://reactivex.io/documentation/operators/skipwhile.html](http://reactivex.io/documentation/operators/skipwhile.html)
+
+discard items emitted by an Observable until a specified condition is false, then emit the remainder
+
+```java
+Observable.range(1, 10).skipWhile(next -> next < 5)
+ .subscribe(next -> System.out.printf("next: %s\n", next), // onNext
+ throwable -> System.out.printf("error: %s", throwable), //onError
+ () -> System.out.println("Completed") //onComplete
+ );
+```
+
+## takeUntil
+
+**Available in:**  `Flowable`,  `Observable`,  `Maybe`,  `Single`,  `Completable`
+
+**ReactiveX documentation:** [http://reactivex.io/documentation/operators/takeuntil.html](http://reactivex.io/documentation/operators/takeuntil.html)
+
+emits the items from the source Observable until a second Observable emits an item or issues a notification
+
+```java
+Observable.range(1, 10).takeUntil(value -> value >= 5)
+ .subscribe(next -> System.out.printf("next: %s\n", next), // onNext
+ throwable -> System.out.printf("error: %s", throwable), //onError
+ () -> System.out.println("Completed") //onComplete
+ );
+```
+
+## takeWhile
+
+**Available in:**  `Flowable`,  `Observable`,  `Maybe`,  `Single`,  `Completable`
+
+**ReactiveX documentation:** [http://reactivex.io/documentation/operators/takewhile.html](http://reactivex.io/documentation/operators/takewhile.html)
+
+emit items emitted by an Observable as long as a specified condition is true, then skip the remainder
+
+```java
+ Observable.range(1, 10).takeWhile(value -> value <= 5)
+ .subscribe(next -> System.out.printf("next: %s\n", next), // onNext
+ throwable -> System.out.printf("error: %s", throwable), //onError
+ () -> System.out.println("Completed") //onComplete
+ );
+```
### Boolean Operators
-* [**`all( )`**](http://reactivex.io/documentation/operators/all.html) — determine whether all items emitted by an Observable meet some criteria
-* [**`contains( )`**](http://reactivex.io/documentation/operators/contains.html) — determine whether an Observable emits a particular item or not
-* [**`exists( )` and `isEmpty( )`**](http://reactivex.io/documentation/operators/contains.html) — determine whether an Observable emits any items or not
-* [**`sequenceEqual( )`**](http://reactivex.io/documentation/operators/sequenceequal.html) — test the equality of the sequences emitted by two Observables
+
+### Outline
+
+- [`all`](#all)
+- [`contains`](#contains)
+- [`isEmpty`](#isEmpty)
+- [`sequenceEqual`](#sequenceEqual)
+
+## all
+**Available in:**  `Flowable`,  `Observable`,  `Maybe`,  `Single`,  `Completable`
+
+**ReactiveX documentation:** [http://reactivex.io/documentation/operators/all.html](http://reactivex.io/documentation/operators/all.html)
+
+determine whether all items emitted by an Observable meet some criteria
+
+```java
+Flowable.range(0,10).doOnNext(next -> System.out.println(next)).all(integer -> integer<10).
+ blockingSubscribe(success->System.out.println("Success: "+success));
+```
+
+## contains
+**Available in:**  `Flowable`,  `Observable`,  `Maybe`,  `Single`,  `Completable`
+
+**ReactiveX documentation:** [http://reactivex.io/documentation/operators/contains.html](http://reactivex.io/documentation/operators/contains.html)
+
+determine whether an Observable emits a particular item or not
+
+```java
+Flowable.range(1,10).doOnNext(next->System.out.println(next))
+ .contains(4).blockingSubscribe(contains->System.out.println("contains: "+contains));
+```
+
+## isEmpty
+**Available in:**  `Flowable`,  `Observable`,  `Maybe`,  `Single`,  `Completable`
+
+**ReactiveX documentation:** [http://reactivex.io/documentation/operators/contains.html](http://reactivex.io/documentation/operators/contains.html)
+
+determine whether the source Publisher is empty
+
+```java
+Flowable.empty().isEmpty().subscribe(isEmpty -> System.out.printf("isEmpty: %s", isEmpty));
+```
+
+## sequenceEqual
+**Available in:**  `Flowable`,  `Observable`,  `Maybe`,  `Single`,  `Completable`
+
+**ReactiveX documentation:** [http://reactivex.io/documentation/operators/sequenceequal.html](http://reactivex.io/documentation/operators/sequenceequal.html)
+
+test the equality of the sequences emitted by two Observables
+
+```java
+Flowable
+
The following example code shows two Subscribers subscribing to the same Observable. In the first case, they subscribe to an ordinary Observable; in the second case, they subscribe to a Connectable Observable that only connects after both Subscribers subscribe. Note the difference in the output:
**Example #1:**
-```groovy
-def firstMillion = Observable.range( 1, 1000000 ).sample(7, java.util.concurrent.TimeUnit.MILLISECONDS);
+```java
+Observable firstMillion = Observable.range(1, 1000000).sample(7, java.util.concurrent.TimeUnit.MILLISECONDS);
-firstMillion.subscribe(
- { println("Subscriber #1:" + it); }, // onNext
- { println("Error: " + it.getMessage()); }, // onError
- { println("Sequence #1 complete"); } // onCompleted
-);
-
-firstMillion.subscribe(
- { println("Subscriber #2:" + it); }, // onNext
- { println("Error: " + it.getMessage()); }, // onError
- { println("Sequence #2 complete"); } // onCompleted
-);
+firstMillion.subscribe(next -> System.out.println("Subscriber #1: " + next), // onNext
+ throwable -> System.out.println("Error: " + throwable), // onError
+ () -> System.out.println("Sequence #1 complete") // onComplete
+ );
+firstMillion.subscribe(next -> System.out.println("Subscriber #2: " + next), // onNext
+ throwable -> System.out.println("Error: " + throwable), // onError
+ () -> System.out.println("Sequence #2 complete") // onComplete
+ );
```
```
Subscriber #1:211128
@@ -40,20 +37,18 @@ Subscriber #2:826996
Sequence #2 complete
```
**Example #2:**
-```groovy
-def firstMillion = Observable.range( 1, 1000000 ).sample(7, java.util.concurrent.TimeUnit.MILLISECONDS).publish();
+```java
+ConnectableObservable firstMillion = Observable.range(1, 1000000).sample(7, java.util.concurrent.TimeUnit.MILLISECONDS).publish();
-firstMillion.subscribe(
- { println("Subscriber #1:" + it); }, // onNext
- { println("Error: " + it.getMessage()); }, // onError
- { println("Sequence #1 complete"); } // onCompleted
-);
+firstMillion.subscribe(next -> System.out.println("Subscriber #1: " + next), // onNext
+ throwable -> System.out.println("Error: " + throwable), // onError
+ () -> System.out.println("Sequence #1 complete") // onComplete
+ );
-firstMillion.subscribe(
- { println("Subscriber #2:" + it); }, // onNext
- { println("Error: " + it.getMessage()); }, // onError
- { println("Sequence #2 complete"); } // onCompleted
-);
+firstMillion.subscribe(next -> System.out.println("Subscriber #2: " + next), // onNext
+ throwable -> System.out.println("Error: " + throwable), // onError
+ () -> System.out.println("Sequence #2 complete") // onComplete
+ );
firstMillion.connect();
```
diff --git a/docs/Filtering-Observables.md b/docs/Filtering-Observables.md
index 620800dc8c..512b69ba8a 100644
--- a/docs/Filtering-Observables.md
+++ b/docs/Filtering-Observables.md
@@ -259,7 +259,7 @@ firstOrError.subscribe(
**ReactiveX documentation:** [http://reactivex.io/documentation/operators/ignoreelements.html](http://reactivex.io/documentation/operators/ignoreelements.html)
-Ignores the single item emitted by a `Single` or `Maybe` source, and returns a `Completable` that signals only the error or completion event from the the source.
+Ignores the single item emitted by a `Single` or `Maybe` source, and returns a `Completable` that signals only the error or completion event from the source.
### ignoreElement example
diff --git a/docs/Getting-Started.md b/docs/Getting-Started.md
index fb9baa47dd..69b69a8ca6 100644
--- a/docs/Getting-Started.md
+++ b/docs/Getting-Started.md
@@ -1,20 +1,20 @@
## Getting Binaries
-You can find binaries and dependency information for Maven, Ivy, Gradle, SBT, and others at [http://search.maven.org](http://search.maven.org/#search%7Cga%7C1%7Cg%3A"io.reactivex.rxjava2"%20AND%20"rxjava2").
+You can find binaries and dependency information for Maven, Ivy, Gradle, SBT, and others at [http://search.maven.org](https://search.maven.org/search?q=g:io.reactivex.rxjava3%20AND%20rxjava).
Example for Maven:
```xml
+
This next example, in Clojure, consumes three asynchronous Observables, including a dependency from one to another, and emits a single response item by combining the items emitted by each of the three Observables with the [`zip`](http://reactivex.io/documentation/operators/zip.html) operator and then transforming the result with [`map`](http://reactivex.io/documentation/operators/map.html):
@@ -333,7 +333,7 @@ The response looks like this:
And here is a marble diagram that illustrates how that code produces that response:
-
+
The following example, in Groovy, comes from [Ben Christensen’s QCon presentation on the evolution of the Netflix API](https://speakerdeck.com/benjchristensen/evolution-of-the-netflix-api-qcon-sf-2013). It combines two Observables with the [`merge`](http://reactivex.io/documentation/operators/merge.html) operator, then uses the [`reduce`](http://reactivex.io/documentation/operators/reduce.html) operator to construct a single item out of the resulting sequence, then transforms that item with [`map`](http://reactivex.io/documentation/operators/map.html) before emitting it:
@@ -350,7 +350,7 @@ public Observable getVideoSummary(APIVideo video) {
And here is a marble diagram that illustrates how that code uses the [`reduce`](http://reactivex.io/documentation/operators/reduce.html) operator to bring the results from multiple Observables together in one structure:
-
+
## Error Handling
diff --git a/docs/Phantom-Operators.md b/docs/Phantom-Operators.md
index 60da4a1a40..5193147a22 100644
--- a/docs/Phantom-Operators.md
+++ b/docs/Phantom-Operators.md
@@ -19,7 +19,7 @@ These operators have been proposed but are not part of the 1.0 release of RxJava
## chunkify( )
#### returns an iterable that periodically returns a list of items emitted by the source Observable since the last list
-
+
The `chunkify( )` operator represents a blocking observable as an Iterable, that, each time you iterate over it, returns a list of items emitted by the source Observable since the previous iteration. These lists may be empty if there have been no such items emitted.
@@ -27,7 +27,7 @@ The `chunkify( )` operator represents a blocking observable as an Iterable, th
## fromFuture( )
#### convert a Future into an Observable, but do not attempt to get the Future's value until a Subscriber subscribes
-
+
The `fromFuture( )` method also converts a Future into an Observable, but it obtains this Future indirectly, by means of a function you provide. It creates the Observable immediately, but waits to call the function and to obtain the Future until a Subscriber subscribes to it.
@@ -35,7 +35,7 @@ The `fromFuture( )` method also converts a Future into an Observable, but it o
## forEachFuture( )
#### create a futureTask that will invoke a specified function on each item emitted by an Observable
-
+
The `forEachFuture( )` returns a `FutureTask` for each item emitted by the source Observable (or each item and each notification) that, when executed, will apply a function you specify to each such item (or item and notification).
@@ -43,7 +43,7 @@ The `forEachFuture( )` returns a `FutureTask` for each item emitted by the sou
## forIterable( )
#### apply a function to the elements of an Iterable to create Observables which are then concatenated
-
+
`forIterable( )` is similar to `from(Iterable )` but instead of the resulting Observable emitting the elements of the Iterable as its own emitted items, it applies a specified function to each of these elements to generate one Observable per element, and then concatenates the emissions of these Observables to be its own sequence of emitted items.
@@ -58,7 +58,7 @@ If the a subscriber to the Observable that results when a Future is converted to
## generate( ) and generateAbsoluteTime( )
#### create an Observable that emits a sequence of items as generated by a function of your choosing
-
+
The basic form of `generate( )` takes four parameters. These are `initialState` and three functions: `iterate( )`, `condition( )`, and `resultSelector( )`. `generate( )` uses these four parameters to generate an Observable sequence, which is its return value. It does so in the following way.
@@ -66,7 +66,7 @@ The basic form of `generate( )` takes four parameters. These are `initialState
There are also versions of `generate( )` that allow you to do the work of generating the sequence on a particular `Scheduler` and that allow you to set the time interval between emissions by applying a function to the current state. The `generateAbsoluteTime( )` allows you to control the time at which an item is emitted by applying a function to the state to get an absolute system clock time (rather than an interval from the previous emission).
-
+
#### see also:
* Introduction to Rx: Generate
@@ -79,7 +79,7 @@ There are also versions of `generate( )` that allow you to do the work of gene
This version of `groupBy` adds another parameter: an Observable that emits duration markers. When a duration marker is emitted by this Observable, any grouped Observables that have been opened are closed, and `groupByUntil( )` will create new grouped Observables for any subsequent emissions by the source Observable.
-
+
Another variety of `groupByUntil( )` limits the number of groups that can be active at any particular time. If an item is emitted by the source Observable that would cause the number of groups to exceed this maximum, before the new group is emitted, one of the existing groups is closed (that is, the Observable it represents terminates by calling its Subscribers' `onCompleted` methods and then expires).
@@ -101,7 +101,7 @@ To represent an Observable as a Connectable Observable, use the `multicast( )`
## onErrorFlatMap( )
#### instructs an Observable to emit a sequence of items whenever it encounters an error
-
+
The `onErrorFlatMap( )` method is similar to `onErrorResumeNext( )` except that it does not assume the source Observable will correctly terminate when it issues an error. Because of this, after emitting its backup sequence of items, `onErrorFlatMap( )` relinquishes control of the emitted sequence back to the source Observable. If that Observable again issues an error, `onErrorFlatMap( )` will again emit its backup sequence.
@@ -111,13 +111,13 @@ Because `onErrorFlatMap( )` is designed to work with pathological source Obser
Note that you should apply `onErrorFlatMap( )` directly to the pathological source Observable, and not to that Observable after it has been modified by additional operators, as such operators may effectively renormalize the source Observable by unsubscribing from it immediately after it issues an error. Below, for example, is an illustration showing how `onErrorFlatMap( )` will respond to two error-generating Observables that have been merged by the `merge( )` operator. Note that it will *not* react to both errors generated by both Observables, but only to the single error passed along by `merge( )`:
-
+
***
## parallel( )
#### split the work done on the emissions from an Observable into multiple Observables each operating on its own parallel thread
-
+
The `parallel( )` method splits an Observable into as many Observables as there are available processors, and does work in parallel on each of these Observables. `parallel( )` then merges the results of these parallel computations back into a single, well-behaved Observable sequence.
@@ -127,7 +127,7 @@ streamOfItems.flatMap(item -> {
itemToObservable(item).subscribeOn(Schedulers.io());
});
```
-Kick off your work for each item inside [`flatMap`](Transforming-Observables#flatmap-concatmap-and-flatmapiterable) using [`subscribeOn`](Observable-Utility-Operators#subscribeon) to make it asynchronous, or by using a function that already makes asychronous calls.
+Kick off your work for each item inside [`flatMap`](Transforming-Observables#flatmap-concatmap-and-flatmapiterable) using [`subscribeOn`](Observable-Utility-Operators#subscribeon) to make it asynchronous, or by using a function that already makes asynchronous calls.
#### see also:
* RxJava Threading Examples by Graham Lea
@@ -136,7 +136,7 @@ Kick off your work for each item inside [`flatMap`](Transforming-Observables#fla
## parallelMerge( )
#### combine multiple Observables into a smaller number of Observables, to facilitate parallelism
-
+
Use the `parallelMerge( )` method to take an Observable that emits a large number of Observables and to reduce it to an Observable that emits a particular, smaller number of Observables that emit the same set of items as the original larger set of Observables: for instance a number of Observables that matches the number of parallel processes that you want to use when processing the emissions from the complete set of Observables.
@@ -144,7 +144,7 @@ Use the `parallelMerge( )` method to take an Observable that emits a large num
## pivot( )
#### combine multiple sets of grouped observables so that they are arranged primarily by group rather than by set
-
+
If you combine multiple sets of grouped observables, such as those created by [`groupBy( )` and `groupByUntil( )`](Transforming-Observables#wiki-groupby-and-groupbyuntil), then even if those grouped observables have been grouped by a similar differentiation function, the resulting grouping will be primarily based on which set the observable came from, not on which group the observable belonged to.
@@ -152,13 +152,13 @@ An example may make this clearer. Imagine you use `groupBy( )` to group the em
The result will be a grouped observable that emits two groups: the grouped observable resulting from transforming Observable1, and the grouped observable resulting from transforming Observable2. Each of those grouped observables emit observables that in turn emit the odds and evens from the source observables. You can use `pivot( )` to change this around: by applying `pivot( )` to this grouped observable it will transform into one that emits two different groups: the odds group and the evens group, with each of these groups emitting a separate observable corresponding to which source observable its set of integers came from. Here is an illustration:
-
+
***
## publishLast( )
#### represent an Observable as a Connectable Observable that emits only the last item emitted by the source Observable
-
+
#### see also:
* RxJS: `publishLast`
diff --git a/docs/What's-different-in-2.0.md b/docs/What's-different-in-2.0.md
index fac50df56d..edc681fa7f 100644
--- a/docs/What's-different-in-2.0.md
+++ b/docs/What's-different-in-2.0.md
@@ -450,12 +450,12 @@ Before 2.0.7, the operator `strict()` had to be applied in order to achieve the
As one of the primary goals of RxJava 2, the design focuses on performance and in order enable it, RxJava 2.0.7 adds a custom `io.reactivex.FlowableSubscriber` interface (extends `org.reactivestreams.Subscriber`) but adds no new methods to it. The new interface is **constrained to RxJava 2** and represents a consumer to `Flowable` that is able to work in a mode that relaxes the Reactive-Streams version 1.0.0 specification in rules §1.3, §2.3, §2.12 and §3.9:
- - §1.3 relaxation: `onSubscribe` may run concurrently with `onNext` in case the `FlowableSubscriber` calls `request()` from inside `onSubscribe` and it is the resposibility of `FlowableSubscriber` to ensure thread-safety between the remaining instructions in `onSubscribe` and `onNext`.
+ - §1.3 relaxation: `onSubscribe` may run concurrently with `onNext` in case the `FlowableSubscriber` calls `request()` from inside `onSubscribe` and it is the responsibility of `FlowableSubscriber` to ensure thread-safety between the remaining instructions in `onSubscribe` and `onNext`.
- §2.3 relaxation: calling `Subscription.cancel` and `Subscription.request` from `FlowableSubscriber.onComplete()` or `FlowableSubscriber.onError()` is considered a no-operation.
- §2.12 relaxation: if the same `FlowableSubscriber` instance is subscribed to multiple sources, it must ensure its `onXXX` methods remain thread safe.
- §3.9 relaxation: issuing a non-positive `request()` will not stop the current stream but signal an error via `RxJavaPlugins.onError`.
-From a user's perspective, if one was using the the `subscribe` methods other than `Flowable.subscribe(Subscriber super T>)`, there is no need to do anything regarding this change and there is no extra penalty for it.
+From a user's perspective, if one was using the `subscribe` methods other than `Flowable.subscribe(Subscriber super T>)`, there is no need to do anything regarding this change and there is no extra penalty for it.
If one was using `Flowable.subscribe(Subscriber super T>)` with the built-in RxJava `Subscriber` implementations such as `DisposableSubscriber`, `TestSubscriber` and `ResourceSubscriber`, there is a small runtime overhead (one `instanceof` check) associated when the code is not recompiled against 2.0.7.
diff --git a/docs/Writing-operators-for-2.0.md b/docs/Writing-operators-for-2.0.md
index e8486564b1..1a51664880 100644
--- a/docs/Writing-operators-for-2.0.md
+++ b/docs/Writing-operators-for-2.0.md
@@ -565,7 +565,7 @@ Version 2.0.7 introduced a new interface, `FlowableSubscriber` that extends `Sub
The rule relaxations are as follows:
-- §1.3 relaxation: `onSubscribe` may run concurrently with onNext in case the `FlowableSubscriber` calls `request()` from inside `onSubscribe` and it is the resposibility of `FlowableSubscriber` to ensure thread-safety between the remaining instructions in `onSubscribe` and `onNext`.
+- §1.3 relaxation: `onSubscribe` may run concurrently with onNext in case the `FlowableSubscriber` calls `request()` from inside `onSubscribe` and it is the responsibility of `FlowableSubscriber` to ensure thread-safety between the remaining instructions in `onSubscribe` and `onNext`.
- §2.3 relaxation: calling `Subscription.cancel` and `Subscription.request` from `FlowableSubscriber.onComplete()` or `FlowableSubscriber.onError()` is considered a no-operation.
- §2.12 relaxation: if the same `FlowableSubscriber` instance is subscribed to multiple sources, it must ensure its `onXXX` methods remain thread safe.
- §3.9 relaxation: issuing a non-positive `request()` will not stop the current stream but signal an error via `RxJavaPlugins.onError`.
diff --git a/docs/_Sidebar.md b/docs/_Sidebar.md
index 32e201fc46..22961acec3 100644
--- a/docs/_Sidebar.md
+++ b/docs/_Sidebar.md
@@ -27,6 +27,6 @@
* [Writing operators](https://github.com/ReactiveX/RxJava/wiki/Writing-operators-for-2.0)
* [Backpressure](https://github.com/ReactiveX/RxJava/wiki/Backpressure-(2.0))
* [another explanation](https://github.com/ReactiveX/RxJava/wiki/Backpressure)
-* [JavaDoc](http://reactivex.io/RxJava/2.x/javadoc)
+* JavaDoc: [1.x](http://reactivex.io/RxJava/1.x/javadoc), [2.x](http://reactivex.io/RxJava/2.x/javadoc), [3.x](http://reactivex.io/RxJava/3.x/javadoc)
* [Coming from RxJava 1](https://github.com/ReactiveX/RxJava/wiki/What's-different-in-2.0)
* [Additional Reading](https://github.com/ReactiveX/RxJava/wiki/Additional-Reading)
diff --git a/gradle.properties b/gradle.properties
index 820b2a5bc1..e685b8103a 100644
--- a/gradle.properties
+++ b/gradle.properties
@@ -1,2 +1,24 @@
-release.scope=patch
-release.version=3.0.0-SNAPSHOT
+group=io.reactivex.rxjava3
+version=3.0.0-SNAPSHOT
+description=RxJava: Reactive Extensions for the JVM – a library for composing asynchronous and event-based programs using observable sequences for the Java VM.
+
+POM_ARTIFACT_ID=rxjava
+POM_NAME=RxJava
+POM_PACKAGING=jar
+
+POM_DESCRIPTION=Reactive Extensions for Java
+POM_INCEPTION_YEAR=2013
+
+POM_URL=https://github.com/ReactiveX/RxJava
+POM_SCM_URL=https://github.com/ReactiveX/RxJava
+POM_SCM_CONNECTION=scm:git:git://github.com/ReactiveX/RxJava.git
+POM_SCM_DEV_CONNECTION=scm:git:ssh://git@github.com/ReactiveX/RxJava.git
+
+POM_LICENCE_NAME=The Apache Software License, Version 2.0
+POM_LICENCE_URL=https://www.apache.org/licenses/LICENSE-2.0.txt
+POM_LICENCE_DIST=repo
+
+POM_DEVELOPER_ID=akarnokd
+POM_DEVELOPER_NAME=David Karnok
+POM_DEVELOPER_URL=https://github.com/akarnokd/
+POM_DEVELOPER_EMAIL=akarnokd@gmail.com
diff --git a/gradle/buildViaTravis.sh b/gradle/buildViaTravis.sh
deleted file mode 100755
index ea385c3e92..0000000000
--- a/gradle/buildViaTravis.sh
+++ /dev/null
@@ -1,29 +0,0 @@
-#!/bin/bash
-# This script will build the project.
-
-buildTag="$TRAVIS_TAG"
-
-if [ "$buildTag" != "" ] && [ "${buildTag:0:3}" != "v3." ]; then
- echo -e "Wrong tag on the 3.x brach: $buildTag : build stopped"
- exit 1
-fi
-
-export GRADLE_OPTS=-Xmx1024m
-
-if [ "$TRAVIS_PULL_REQUEST" != "false" ]; then
- echo -e "Build Pull Request #$TRAVIS_PULL_REQUEST => Branch [$TRAVIS_BRANCH]"
- ./gradlew -PreleaseMode=pr build --stacktrace
-elif [ "$TRAVIS_PULL_REQUEST" == "false" ] && [ "$TRAVIS_TAG" == "" ]; then
- if [ "$TRAVIS_BRANCH" != "3.x" ]; then
- echo -e 'Build secondary Branch (no snapshot) => Branch ['$TRAVIS_BRANCH']'
- ./gradlew -PreleaseMode=pr build --stacktrace
- else
- echo -e 'Build Branch with Snapshot => Branch ['$TRAVIS_BRANCH']'
- ./gradlew -PreleaseMode=branch -PbintrayUser="${bintrayUser}" -PbintrayKey="${bintrayKey}" -PsonatypeUsername="${sonatypeUsername}" -PsonatypePassword="${sonatypePassword}" build --stacktrace
- fi
-elif [ "$TRAVIS_PULL_REQUEST" == "false" ] && [ "$TRAVIS_TAG" != "" ]; then
- echo -e 'Build Branch for Release => Branch ['$TRAVIS_BRANCH'] Tag ['$TRAVIS_TAG']'
- ./gradlew -PreleaseMode=full -PbintrayUser="${bintrayUser}" -PbintrayKey="${bintrayKey}" -PsonatypeUsername="${sonatypeUsername}" -PsonatypePassword="${sonatypePassword}" build --stacktrace
-else
- echo -e 'WARN: Should not be here => Branch ['$TRAVIS_BRANCH'] Tag ['$TRAVIS_TAG'] Pull Request ['$TRAVIS_PULL_REQUEST']'
-fi
diff --git a/gradle/javadoc_cleanup.gradle b/gradle/javadoc_cleanup.gradle
index 12216464a7..63b4f7f045 100644
--- a/gradle/javadoc_cleanup.gradle
+++ b/gradle/javadoc_cleanup.gradle
@@ -1,36 +1,74 @@
// remove the excessive whitespaces between method arguments in the javadocs
task javadocCleanup(dependsOn: "javadoc") doLast {
- fixJavadocFile(rootProject.file('build/docs/javadoc/io/reactivex/rxjava3/core/Flowable.html'));
- fixJavadocFile(rootProject.file('build/docs/javadoc/io/reactivex/rxjava3/core/Observable.html'));
- fixJavadocFile(rootProject.file('build/docs/javadoc/io/reactivex/rxjava3/core/Single.html'));
- fixJavadocFile(rootProject.file('build/docs/javadoc/io/reactivex/rxjava3/core/Maybe.html'));
- fixJavadocFile(rootProject.file('build/docs/javadoc/io/reactivex/rxjava3/core/Completable.html'));
+ fixJavadocFile(rootProject.file('build/docs/javadoc/io/reactivex/rxjava3/core/Flowable.html'))
+ fixJavadocFile(rootProject.file('build/docs/javadoc/io/reactivex/rxjava3/core/Observable.html'))
+ fixJavadocFile(rootProject.file('build/docs/javadoc/io/reactivex/rxjava3/core/Single.html'))
+ fixJavadocFile(rootProject.file('build/docs/javadoc/io/reactivex/rxjava3/core/Maybe.html'))
+ fixJavadocFile(rootProject.file('build/docs/javadoc/io/reactivex/rxjava3/core/Completable.html'))
- fixJavadocFile(rootProject.file('build/docs/javadoc/io/reactivex/rxjava3/flowables/ConnectableFlowable.html'));
- fixJavadocFile(rootProject.file('build/docs/javadoc/io/reactivex/rxjava3/observables/ConnectableObservable.html'));
+ fixJavadocFile(rootProject.file('build/docs/javadoc/io/reactivex/rxjava3/flowables/ConnectableFlowable.html'))
+ fixJavadocFile(rootProject.file('build/docs/javadoc/io/reactivex/rxjava3/observables/ConnectableObservable.html'))
- fixJavadocFile(rootProject.file('build/docs/javadoc/io/reactivex/rxjava3/subjects/ReplaySubject.html'));
- fixJavadocFile(rootProject.file('build/docs/javadoc/io/reactivex/rxjava3/processors/ReplayProcessor.html'));
- fixJavadocFile(rootProject.file('build/docs/javadoc/io/reactivex/rxjava3/plugins/RxJavaPlugins.html'));
+ fixJavadocFile(rootProject.file('build/docs/javadoc/io/reactivex/rxjava3/subjects/ReplaySubject.html'))
+ fixJavadocFile(rootProject.file('build/docs/javadoc/io/reactivex/rxjava3/processors/ReplayProcessor.html'))
+ fixJavadocFile(rootProject.file('build/docs/javadoc/io/reactivex/rxjava3/subjects/PublishSubject.html'))
+ fixJavadocFile(rootProject.file('build/docs/javadoc/io/reactivex/rxjava3/processors/PublishProcessor.html'))
+ fixJavadocFile(rootProject.file('build/docs/javadoc/io/reactivex/rxjava3/subjects/AsyncSubject.html'))
+ fixJavadocFile(rootProject.file('build/docs/javadoc/io/reactivex/rxjava3/processors/AsyncProcessor.html'))
+ fixJavadocFile(rootProject.file('build/docs/javadoc/io/reactivex/rxjava3/subjects/BehaviorSubject.html'))
+ fixJavadocFile(rootProject.file('build/docs/javadoc/io/reactivex/rxjava3/processors/BehaviorProcessor.html'))
+ fixJavadocFile(rootProject.file('build/docs/javadoc/io/reactivex/rxjava3/processors/MulticastProcessor.html'))
+ fixJavadocFile(rootProject.file('build/docs/javadoc/io/reactivex/rxjava3/subjects/UnicastSubject.html'))
+ fixJavadocFile(rootProject.file('build/docs/javadoc/io/reactivex/rxjava3/processors/UnicastProcessor.html'))
- fixJavadocFile(rootProject.file('build/docs/javadoc/io/reactivex/rxjava3/parallel/ParallelFlowable.html'));
+ fixJavadocFile(rootProject.file('build/docs/javadoc/io/reactivex/rxjava3/plugins/RxJavaPlugins.html'))
+
+ fixJavadocFile(rootProject.file('build/docs/javadoc/io/reactivex/rxjava3/parallel/ParallelFlowable.html'))
+
+ fixJavadocFile(rootProject.file('build/docs/javadoc/io/reactivex/rxjava3/disposables/Disposable.html'))
+
+ fixJavadocFile(rootProject.file('build/docs/javadoc/io/reactivex/rxjava3/observers/TestObserver.html'))
+ fixJavadocFile(rootProject.file('build/docs/javadoc/io/reactivex/rxjava3/observers/BaseTestConsumer.html'))
+ fixJavadocFile(rootProject.file('build/docs/javadoc/io/reactivex/rxjava3/subscribers/TestSubscriber.html'))
}
def fixJavadocFile(file) {
- println("Cleaning up: " + file);
+ logger.lifecycle("Cleaning up: " + file)
String fileContents = file.getText('UTF-8')
// lots of spaces after the previous method argument
- fileContents = fileContents.replaceAll(",\\s{4,}", ",\n ");
+ fileContents = fileContents.replaceAll(",\\s{4,}", ",\n ")
// lots of spaces after the @NonNull annotations
- fileContents = fileContents.replaceAll("@NonNull\\s{4,}", "@NonNull ");
+ fileContents = fileContents.replaceAll("@NonNull\\s{4,}", "@NonNull ")
// lots of spaces after the @Nullable annotations
- fileContents = fileContents.replaceAll("@Nullable\\s{4,}", "@Nullable ");
+ fileContents = fileContents.replaceAll("@Nullable\\s{4,}", "@Nullable ")
- file.setText(fileContents, 'UTF-8');
-}
+ // javadoc bug: duplicates the link to @NonNull for some reason
+ def nonNullText1 = "@NonNull"
+
+ fileContents = fileContents.replace(nonNullText1 + " " + nonNullText1, nonNullText1)
+ fileContents = fileContents.replace(nonNullText1 + "\n " + nonNullText1, nonNullText1)
+ fileContents = fileContents.replace(nonNullText1 + "\r\n " + nonNullText1, nonNullText1)
+
+ def nonNullText2 = "@NonNull"
+ fileContents = fileContents.replace(nonNullText2 + " " + nonNullText2, nonNullText2)
+ fileContents = fileContents.replace(nonNullText2 + "\n " + nonNullText2, nonNullText2)
+ fileContents = fileContents.replace(nonNullText2 + "\r\n " + nonNullText2, nonNullText2)
-javadocJar.dependsOn javadocCleanup
-build.dependsOn javadocCleanup
\ No newline at end of file
+ // javadoc bug: duplicates the link to @Nullable for some reason
+ def nullableText1 = "@Nullable"
+
+ fileContents = fileContents.replace(nullableText1 + " " + nullableText1, nullableText1)
+ fileContents = fileContents.replace(nullableText1 + "\n " + nullableText1, nullableText1)
+ fileContents = fileContents.replace(nullableText1 + "\r\n " + nullableText1, nullableText1)
+
+ def nullableText2 = "@Nullable"
+
+ fileContents = fileContents.replace(nullableText2 + " " + nullableText2, nullableText2)
+ fileContents = fileContents.replace(nullableText2 + "\n " + nullableText2, nullableText2)
+ fileContents = fileContents.replace(nullableText2 + "\r\n " + nullableText2, nullableText2)
+
+ file.setText(fileContents, 'UTF-8')
+}
diff --git a/gradle/wrapper/gradle-wrapper.jar b/gradle/wrapper/gradle-wrapper.jar
index 5c2d1cf016..afba109285 100644
Binary files a/gradle/wrapper/gradle-wrapper.jar and b/gradle/wrapper/gradle-wrapper.jar differ
diff --git a/gradle/wrapper/gradle-wrapper.properties b/gradle/wrapper/gradle-wrapper.properties
index 94920145f3..3c44eb1b6f 100644
--- a/gradle/wrapper/gradle-wrapper.properties
+++ b/gradle/wrapper/gradle-wrapper.properties
@@ -1,5 +1,6 @@
distributionBase=GRADLE_USER_HOME
distributionPath=wrapper/dists
-distributionUrl=https\://services.gradle.org/distributions/gradle-6.0.1-bin.zip
+distributionUrl=https\://services.gradle.org/distributions/gradle-8.14-bin.zip
+networkTimeout=10000
zipStoreBase=GRADLE_USER_HOME
zipStorePath=wrapper/dists
diff --git a/gradlew b/gradlew
index 83f2acfdc3..65dcd68d65 100755
--- a/gradlew
+++ b/gradlew
@@ -1,7 +1,7 @@
-#!/usr/bin/env sh
+#!/bin/sh
#
-# Copyright 2015 the original author or authors.
+# Copyright © 2015-2021 the original authors.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
@@ -17,78 +17,113 @@
#
##############################################################################
-##
-## Gradle start up script for UN*X
-##
+#
+# Gradle start up script for POSIX generated by Gradle.
+#
+# Important for running:
+#
+# (1) You need a POSIX-compliant shell to run this script. If your /bin/sh is
+# noncompliant, but you have some other compliant shell such as ksh or
+# bash, then to run this script, type that shell name before the whole
+# command line, like:
+#
+# ksh Gradle
+#
+# Busybox and similar reduced shells will NOT work, because this script
+# requires all of these POSIX shell features:
+# * functions;
+# * expansions «$var», «${var}», «${var:-default}», «${var+SET}»,
+# «${var#prefix}», «${var%suffix}», and «$( cmd )»;
+# * compound commands having a testable exit status, especially «case»;
+# * various built-in commands including «command», «set», and «ulimit».
+#
+# Important for patching:
+#
+# (2) This script targets any POSIX shell, so it avoids extensions provided
+# by Bash, Ksh, etc; in particular arrays are avoided.
+#
+# The "traditional" practice of packing multiple parameters into a
+# space-separated string is a well documented source of bugs and security
+# problems, so this is (mostly) avoided, by progressively accumulating
+# options in "$@", and eventually passing that to Java.
+#
+# Where the inherited environment variables (DEFAULT_JVM_OPTS, JAVA_OPTS,
+# and GRADLE_OPTS) rely on word-splitting, this is performed explicitly;
+# see the in-line comments for details.
+#
+# There are tweaks for specific operating systems such as AIX, CygWin,
+# Darwin, MinGW, and NonStop.
+#
+# (3) This script is generated from the Groovy template
+# https://github.com/gradle/gradle/blob/HEAD/subprojects/plugins/src/main/resources/org/gradle/api/internal/plugins/unixStartScript.txt
+# within the Gradle project.
+#
+# You can find Gradle at https://github.com/gradle/gradle/.
+#
##############################################################################
# Attempt to set APP_HOME
+
# Resolve links: $0 may be a link
-PRG="$0"
-# Need this for relative symlinks.
-while [ -h "$PRG" ] ; do
- ls=`ls -ld "$PRG"`
- link=`expr "$ls" : '.*-> \(.*\)$'`
- if expr "$link" : '/.*' > /dev/null; then
- PRG="$link"
- else
- PRG=`dirname "$PRG"`"/$link"
- fi
+app_path=$0
+
+# Need this for daisy-chained symlinks.
+while
+ APP_HOME=${app_path%"${app_path##*/}"} # leaves a trailing /; empty if no leading path
+ [ -h "$app_path" ]
+do
+ ls=$( ls -ld "$app_path" )
+ link=${ls#*' -> '}
+ case $link in #(
+ /*) app_path=$link ;; #(
+ *) app_path=$APP_HOME$link ;;
+ esac
done
-SAVED="`pwd`"
-cd "`dirname \"$PRG\"`/" >/dev/null
-APP_HOME="`pwd -P`"
-cd "$SAVED" >/dev/null
-APP_NAME="Gradle"
-APP_BASE_NAME=`basename "$0"`
+# This is normally unused
+# shellcheck disable=SC2034
+APP_BASE_NAME=${0##*/}
+APP_HOME=$( cd "${APP_HOME:-./}" && pwd -P ) || exit
# Add default JVM options here. You can also use JAVA_OPTS and GRADLE_OPTS to pass JVM options to this script.
DEFAULT_JVM_OPTS='"-Xmx64m" "-Xms64m"'
# Use the maximum available, or set MAX_FD != -1 to use that value.
-MAX_FD="maximum"
+MAX_FD=maximum
warn () {
echo "$*"
-}
+} >&2
die () {
echo
echo "$*"
echo
exit 1
-}
+} >&2
# OS specific support (must be 'true' or 'false').
cygwin=false
msys=false
darwin=false
nonstop=false
-case "`uname`" in
- CYGWIN* )
- cygwin=true
- ;;
- Darwin* )
- darwin=true
- ;;
- MINGW* )
- msys=true
- ;;
- NONSTOP* )
- nonstop=true
- ;;
+case "$( uname )" in #(
+ CYGWIN* ) cygwin=true ;; #(
+ Darwin* ) darwin=true ;; #(
+ MSYS* | MINGW* ) msys=true ;; #(
+ NONSTOP* ) nonstop=true ;;
esac
CLASSPATH=$APP_HOME/gradle/wrapper/gradle-wrapper.jar
+
# Determine the Java command to use to start the JVM.
if [ -n "$JAVA_HOME" ] ; then
if [ -x "$JAVA_HOME/jre/sh/java" ] ; then
# IBM's JDK on AIX uses strange locations for the executables
- JAVACMD="$JAVA_HOME/jre/sh/java"
+ JAVACMD=$JAVA_HOME/jre/sh/java
else
- JAVACMD="$JAVA_HOME/bin/java"
+ JAVACMD=$JAVA_HOME/bin/java
fi
if [ ! -x "$JAVACMD" ] ; then
die "ERROR: JAVA_HOME is set to an invalid directory: $JAVA_HOME
@@ -97,7 +132,7 @@ Please set the JAVA_HOME variable in your environment to match the
location of your Java installation."
fi
else
- JAVACMD="java"
+ JAVACMD=java
which java >/dev/null 2>&1 || die "ERROR: JAVA_HOME is not set and no 'java' command could be found in your PATH.
Please set the JAVA_HOME variable in your environment to match the
@@ -105,84 +140,105 @@ location of your Java installation."
fi
# Increase the maximum file descriptors if we can.
-if [ "$cygwin" = "false" -a "$darwin" = "false" -a "$nonstop" = "false" ] ; then
- MAX_FD_LIMIT=`ulimit -H -n`
- if [ $? -eq 0 ] ; then
- if [ "$MAX_FD" = "maximum" -o "$MAX_FD" = "max" ] ; then
- MAX_FD="$MAX_FD_LIMIT"
- fi
- ulimit -n $MAX_FD
- if [ $? -ne 0 ] ; then
- warn "Could not set maximum file descriptor limit: $MAX_FD"
- fi
- else
- warn "Could not query maximum file descriptor limit: $MAX_FD_LIMIT"
- fi
+if ! "$cygwin" && ! "$darwin" && ! "$nonstop" ; then
+ case $MAX_FD in #(
+ max*)
+ # In POSIX sh, ulimit -H is undefined. That's why the result is checked to see if it worked.
+ # shellcheck disable=SC3045
+ MAX_FD=$( ulimit -H -n ) ||
+ warn "Could not query maximum file descriptor limit"
+ esac
+ case $MAX_FD in #(
+ '' | soft) :;; #(
+ *)
+ # In POSIX sh, ulimit -n is undefined. That's why the result is checked to see if it worked.
+ # shellcheck disable=SC3045
+ ulimit -n "$MAX_FD" ||
+ warn "Could not set maximum file descriptor limit to $MAX_FD"
+ esac
fi
-# For Darwin, add options to specify how the application appears in the dock
-if $darwin; then
- GRADLE_OPTS="$GRADLE_OPTS \"-Xdock:name=$APP_NAME\" \"-Xdock:icon=$APP_HOME/media/gradle.icns\""
-fi
+# Collect all arguments for the java command, stacking in reverse order:
+# * args from the command line
+# * the main class name
+# * -classpath
+# * -D...appname settings
+# * --module-path (only if needed)
+# * DEFAULT_JVM_OPTS, JAVA_OPTS, and GRADLE_OPTS environment variables.
# For Cygwin or MSYS, switch paths to Windows format before running java
-if [ "$cygwin" = "true" -o "$msys" = "true" ] ; then
- APP_HOME=`cygpath --path --mixed "$APP_HOME"`
- CLASSPATH=`cygpath --path --mixed "$CLASSPATH"`
- JAVACMD=`cygpath --unix "$JAVACMD"`
-
- # We build the pattern for arguments to be converted via cygpath
- ROOTDIRSRAW=`find -L / -maxdepth 1 -mindepth 1 -type d 2>/dev/null`
- SEP=""
- for dir in $ROOTDIRSRAW ; do
- ROOTDIRS="$ROOTDIRS$SEP$dir"
- SEP="|"
- done
- OURCYGPATTERN="(^($ROOTDIRS))"
- # Add a user-defined pattern to the cygpath arguments
- if [ "$GRADLE_CYGPATTERN" != "" ] ; then
- OURCYGPATTERN="$OURCYGPATTERN|($GRADLE_CYGPATTERN)"
- fi
+if "$cygwin" || "$msys" ; then
+ APP_HOME=$( cygpath --path --mixed "$APP_HOME" )
+ CLASSPATH=$( cygpath --path --mixed "$CLASSPATH" )
+
+ JAVACMD=$( cygpath --unix "$JAVACMD" )
+
# Now convert the arguments - kludge to limit ourselves to /bin/sh
- i=0
- for arg in "$@" ; do
- CHECK=`echo "$arg"|egrep -c "$OURCYGPATTERN" -`
- CHECK2=`echo "$arg"|egrep -c "^-"` ### Determine if an option
-
- if [ $CHECK -ne 0 ] && [ $CHECK2 -eq 0 ] ; then ### Added a condition
- eval `echo args$i`=`cygpath --path --ignore --mixed "$arg"`
- else
- eval `echo args$i`="\"$arg\""
+ for arg do
+ if
+ case $arg in #(
+ -*) false ;; # don't mess with options #(
+ /?*) t=${arg#/} t=/${t%%/*} # looks like a POSIX filepath
+ [ -e "$t" ] ;; #(
+ *) false ;;
+ esac
+ then
+ arg=$( cygpath --path --ignore --mixed "$arg" )
fi
- i=$((i+1))
+ # Roll the args list around exactly as many times as the number of
+ # args, so each arg winds up back in the position where it started, but
+ # possibly modified.
+ #
+ # NB: a `for` loop captures its iteration list before it begins, so
+ # changing the positional parameters here affects neither the number of
+ # iterations, nor the values presented in `arg`.
+ shift # remove old arg
+ set -- "$@" "$arg" # push replacement arg
done
- case $i in
- (0) set -- ;;
- (1) set -- "$args0" ;;
- (2) set -- "$args0" "$args1" ;;
- (3) set -- "$args0" "$args1" "$args2" ;;
- (4) set -- "$args0" "$args1" "$args2" "$args3" ;;
- (5) set -- "$args0" "$args1" "$args2" "$args3" "$args4" ;;
- (6) set -- "$args0" "$args1" "$args2" "$args3" "$args4" "$args5" ;;
- (7) set -- "$args0" "$args1" "$args2" "$args3" "$args4" "$args5" "$args6" ;;
- (8) set -- "$args0" "$args1" "$args2" "$args3" "$args4" "$args5" "$args6" "$args7" ;;
- (9) set -- "$args0" "$args1" "$args2" "$args3" "$args4" "$args5" "$args6" "$args7" "$args8" ;;
- esac
fi
-# Escape application args
-save () {
- for i do printf %s\\n "$i" | sed "s/'/'\\\\''/g;1s/^/'/;\$s/\$/' \\\\/" ; done
- echo " "
-}
-APP_ARGS=$(save "$@")
+# Collect all arguments for the java command;
+# * $DEFAULT_JVM_OPTS, $JAVA_OPTS, and $GRADLE_OPTS can contain fragments of
+# shell script including quotes and variable substitutions, so put them in
+# double quotes to make sure that they get re-expanded; and
+# * put everything else in single quotes, so that it's not re-expanded.
+
+set -- \
+ "-Dorg.gradle.appname=$APP_BASE_NAME" \
+ -classpath "$CLASSPATH" \
+ org.gradle.wrapper.GradleWrapperMain \
+ "$@"
+
+# Stop when "xargs" is not available.
+if ! command -v xargs >/dev/null 2>&1
+then
+ die "xargs is not available"
+fi
-# Collect all arguments for the java command, following the shell quoting and substitution rules
-eval set -- $DEFAULT_JVM_OPTS $JAVA_OPTS $GRADLE_OPTS "\"-Dorg.gradle.appname=$APP_BASE_NAME\"" -classpath "\"$CLASSPATH\"" org.gradle.wrapper.GradleWrapperMain "$APP_ARGS"
+# Use "xargs" to parse quoted args.
+#
+# With -n1 it outputs one arg per line, with the quotes and backslashes removed.
+#
+# In Bash we could simply go:
+#
+# readarray ARGS < <( xargs -n1 <<<"$var" ) &&
+# set -- "${ARGS[@]}" "$@"
+#
+# but POSIX shell has neither arrays nor command substitution, so instead we
+# post-process each arg (as a line of input to sed) to backslash-escape any
+# character that might be a shell metacharacter, then use eval to reverse
+# that process (while maintaining the separation between arguments), and wrap
+# the whole thing up as a single "set" statement.
+#
+# This will of course break if any of these variables contains a newline or
+# an unmatched quote.
+#
-# by default we should be in the correct project dir, but when run from Finder on Mac, the cwd is wrong
-if [ "$(uname)" = "Darwin" ] && [ "$HOME" = "$PWD" ]; then
- cd "$(dirname "$0")"
-fi
+eval "set -- $(
+ printf '%s\n' "$DEFAULT_JVM_OPTS $JAVA_OPTS $GRADLE_OPTS" |
+ xargs -n1 |
+ sed ' s~[^-[:alnum:]+,./:=@_]~\\&~g; ' |
+ tr '\n' ' '
+ )" '"$@"'
exec "$JAVACMD" "$@"
diff --git a/gradlew.bat b/gradlew.bat
index 9618d8d960..93e3f59f13 100644
--- a/gradlew.bat
+++ b/gradlew.bat
@@ -14,7 +14,7 @@
@rem limitations under the License.
@rem
-@if "%DEBUG%" == "" @echo off
+@if "%DEBUG%"=="" @echo off
@rem ##########################################################################
@rem
@rem Gradle startup script for Windows
@@ -25,10 +25,14 @@
if "%OS%"=="Windows_NT" setlocal
set DIRNAME=%~dp0
-if "%DIRNAME%" == "" set DIRNAME=.
+if "%DIRNAME%"=="" set DIRNAME=.
+@rem This is normally unused
set APP_BASE_NAME=%~n0
set APP_HOME=%DIRNAME%
+@rem Resolve any "." and ".." in APP_HOME to make it shorter.
+for %%i in ("%APP_HOME%") do set APP_HOME=%%~fi
+
@rem Add default JVM options here. You can also use JAVA_OPTS and GRADLE_OPTS to pass JVM options to this script.
set DEFAULT_JVM_OPTS="-Xmx64m" "-Xms64m"
@@ -37,7 +41,7 @@ if defined JAVA_HOME goto findJavaFromJavaHome
set JAVA_EXE=java.exe
%JAVA_EXE% -version >NUL 2>&1
-if "%ERRORLEVEL%" == "0" goto init
+if %ERRORLEVEL% equ 0 goto execute
echo.
echo ERROR: JAVA_HOME is not set and no 'java' command could be found in your PATH.
@@ -51,7 +55,7 @@ goto fail
set JAVA_HOME=%JAVA_HOME:"=%
set JAVA_EXE=%JAVA_HOME%/bin/java.exe
-if exist "%JAVA_EXE%" goto init
+if exist "%JAVA_EXE%" goto execute
echo.
echo ERROR: JAVA_HOME is set to an invalid directory: %JAVA_HOME%
@@ -61,38 +65,26 @@ echo location of your Java installation.
goto fail
-:init
-@rem Get command-line arguments, handling Windows variants
-
-if not "%OS%" == "Windows_NT" goto win9xME_args
-
-:win9xME_args
-@rem Slurp the command line arguments.
-set CMD_LINE_ARGS=
-set _SKIP=2
-
-:win9xME_args_slurp
-if "x%~1" == "x" goto execute
-
-set CMD_LINE_ARGS=%*
-
:execute
@rem Setup the command line
set CLASSPATH=%APP_HOME%\gradle\wrapper\gradle-wrapper.jar
+
@rem Execute Gradle
-"%JAVA_EXE%" %DEFAULT_JVM_OPTS% %JAVA_OPTS% %GRADLE_OPTS% "-Dorg.gradle.appname=%APP_BASE_NAME%" -classpath "%CLASSPATH%" org.gradle.wrapper.GradleWrapperMain %CMD_LINE_ARGS%
+"%JAVA_EXE%" %DEFAULT_JVM_OPTS% %JAVA_OPTS% %GRADLE_OPTS% "-Dorg.gradle.appname=%APP_BASE_NAME%" -classpath "%CLASSPATH%" org.gradle.wrapper.GradleWrapperMain %*
:end
@rem End local scope for the variables with windows NT shell
-if "%ERRORLEVEL%"=="0" goto mainEnd
+if %ERRORLEVEL% equ 0 goto mainEnd
:fail
rem Set variable GRADLE_EXIT_CONSOLE if you need the _script_ return code instead of
rem the _cmd.exe /c_ return code!
-if not "" == "%GRADLE_EXIT_CONSOLE%" exit 1
-exit /b 1
+set EXIT_CODE=%ERRORLEVEL%
+if %EXIT_CODE% equ 0 set EXIT_CODE=1
+if not ""=="%GRADLE_EXIT_CONSOLE%" exit %EXIT_CODE%
+exit /b %EXIT_CODE%
:mainEnd
if "%OS%"=="Windows_NT" endlocal
diff --git a/gradle/push_javadoc.sh b/push_javadoc.sh
similarity index 78%
rename from gradle/push_javadoc.sh
rename to push_javadoc.sh
index c8f648258e..28ce74f1db 100644
--- a/gradle/push_javadoc.sh
+++ b/push_javadoc.sh
@@ -8,21 +8,9 @@
targetRepo=github.com/ReactiveX/RxJava.git
# =======================================================================
-# only for main pushes, for now
-if [ "$TRAVIS_PULL_REQUEST" != "false" ]; then
- echo -e "Pull request detected, skipping JavaDocs pushback."
- exit 0
-fi
-
-# only when on the 3.x branch and not tagged
-if [ "$TRAVIS_BRANCH" != "3.x" ] && [ "$TRAVIS_TAG" == "" ]; then
- echo -e "On a secondary branch '$TRAVIS_BRANCH', skipping JavaDocs pushback."
- exit 0
-fi
-
# get the current build tag if any
-buildTag="$TRAVIS_TAG"
-echo -e "Travis tag: '$buildTag'"
+buildTag="$BUILD_TAG"
+echo -e "Build tag: '$buildTag'"
if [ "$buildTag" == "" ]; then
buildTag="snapshot"
@@ -33,18 +21,18 @@ fi
echo -e "JavaDocs pushback for tag: $buildTag"
# check if the token is actually there
-if [ "$GITHUB_TOKEN" == "" ]; then
+if [ "$JAVADOCS_TOKEN" == "" ]; then
echo -e "No access to GitHub, skipping JavaDocs pushback."
exit 0
fi
# prepare the git information
-git config --global user.email "travis@travis-ci.org"
-git config --global user.name "Travis CI"
+git config --global user.email "akarnokd+ci@gmail.com"
+git config --global user.name "akarnokd+ci"
# setup the remote
echo -e "Adding the target repository to git"
-git remote add origin-pages https://${GITHUB_TOKEN}@${targetRepo} > /dev/null 2>&1
+git remote add origin-pages https://${JAVADOCS_TOKEN}@${targetRepo} > /dev/null 2>&1
# stash changes due to chmod
echo -e "Stashing any local non-ignored changes"
@@ -119,8 +107,8 @@ echo -e "Removing deleted files"
git add -u
# commit all
-echo -e "commit Travis build: $TRAVIS_BUILD_NUMBER for $buildTag"
-git commit --message "Travis build: $TRAVIS_BUILD_NUMBER for $buildTag"
+echo -e "commit CI build: $CI_BUILD_NUMBER for $buildTag"
+git commit --message "CI build: $CI_BUILD_NUMBER for $buildTag"
# debug file list
#find -name "*.html"
diff --git a/src/jmh/java/io/reactivex/rxjava3/core/BinaryFlatMapPerf.java b/src/jmh/java/io/reactivex/rxjava3/core/BinaryFlatMapPerf.java
index 836957192e..6e6ae7e3c6 100644
--- a/src/jmh/java/io/reactivex/rxjava3/core/BinaryFlatMapPerf.java
+++ b/src/jmh/java/io/reactivex/rxjava3/core/BinaryFlatMapPerf.java
@@ -1,4 +1,4 @@
-/**
+/*
* Copyright (c) 2016-present, RxJava Contributors.
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in
@@ -139,9 +139,9 @@ public Observable extends Integer> apply(Integer v) {
}
});
- singleFlatMapHideObservable = Single.just(1).flatMapObservable(new Function* Like {@code Observable}, a running {@code Completable} can be stopped through the {@link Disposable} instance - * provided to consumers through {@link SingleObserver#onSubscribe}. + * provided to consumers through {@link CompletableObserver#onSubscribe}. *
* Like an {@code Observable}, a {@code Completable} is lazy, can be either "hot" or "cold", synchronous or * asynchronous. {@code Completable} instances returned by the methods of this class are cold @@ -479,7 +479,7 @@ public static Completable unsafeCreate(@NonNull CompletableSource onSubscribe) { @CheckReturnValue @NonNull @SchedulerSupport(SchedulerSupport.NONE) - public static Completable defer(@NonNull Supplier extends CompletableSource> supplier) { + public static Completable defer(@NonNull Supplier extends @NonNull CompletableSource> supplier) { Objects.requireNonNull(supplier, "supplier is null"); return RxJavaPlugins.onAssembly(new CompletableDefer(supplier)); } @@ -503,7 +503,7 @@ public static Completable defer(@NonNull Supplier extends CompletableSource> s @CheckReturnValue @NonNull @SchedulerSupport(SchedulerSupport.NONE) - public static Completable error(@NonNull Supplier extends Throwable> supplier) { + public static Completable error(@NonNull Supplier extends @NonNull Throwable> supplier) { Objects.requireNonNull(supplier, "supplier is null"); return RxJavaPlugins.onAssembly(new CompletableErrorSupplier(supplier)); } @@ -529,8 +529,8 @@ public static Completable error(@NonNull Throwable throwable) { } /** - * Returns a {@code Completable} instance that runs the given {@link Action} for each subscriber and - * emits either an unchecked exception or simply completes. + * Returns a {@code Completable} instance that runs the given {@link Action} for each {@link CompletableObserver} and + * emits either an exception or simply completes. *
*
*
*
+ *
+ * If the code to be wrapped needs to throw a checked or more broader {@link Throwable} exception, that + * exception has to be converted to an unchecked exception by the wrapped code itself. Alternatively, + * use the {@link #fromAction(Action)} method which allows the wrapped code to throw any {@code Throwable} + * exception and will signal it to observers as-is. *
- *
+ *
*
+ * The {@code CompletableObserver} will be removed after the callback for the terminal event has been invoked. + *
* * Note that the operator takes an already instantiated, running or terminated {@code CompletionStage}. - * If the optional is to be created per consumer upon subscription, use {@link #defer(Supplier)} + * If the {@code CompletionStage} is to be created per consumer upon subscription, use {@link #defer(Supplier)} * around {@code fromCompletionStage}: *
* Maybe.defer(() -> Completable.fromCompletionStage(createCompletionStage()));
@@ -3421,7 +3466,7 @@ public static Completable fromCompletionStage(@NonNull CompletionStage> stage)
@CheckReturnValue
@SchedulerSupport(SchedulerSupport.NONE)
@NonNull
- public final CompletionStage toCompletionStage(@Nullable T defaultItem) {
+ public final <@Nullable T> CompletionStage toCompletionStage(T defaultItem) {
return subscribeWith(new CompletionStageConsumer<>(true, defaultItem));
}
}
diff --git a/src/main/java/io/reactivex/rxjava3/core/CompletableConverter.java b/src/main/java/io/reactivex/rxjava3/core/CompletableConverter.java
index 83e8f693eb..a213c6f5ab 100644
--- a/src/main/java/io/reactivex/rxjava3/core/CompletableConverter.java
+++ b/src/main/java/io/reactivex/rxjava3/core/CompletableConverter.java
@@ -1,4 +1,4 @@
-/**
+/*
* Copyright (c) 2016-present, RxJava Contributors.
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in
diff --git a/src/main/java/io/reactivex/rxjava3/core/CompletableEmitter.java b/src/main/java/io/reactivex/rxjava3/core/CompletableEmitter.java
index 5ebb4e1877..2c5a9811c4 100644
--- a/src/main/java/io/reactivex/rxjava3/core/CompletableEmitter.java
+++ b/src/main/java/io/reactivex/rxjava3/core/CompletableEmitter.java
@@ -1,4 +1,4 @@
-/**
+/*
* Copyright (c) 2016-present, RxJava Contributors.
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in
diff --git a/src/main/java/io/reactivex/rxjava3/core/CompletableObserver.java b/src/main/java/io/reactivex/rxjava3/core/CompletableObserver.java
index 7c01ebfee1..9339307047 100644
--- a/src/main/java/io/reactivex/rxjava3/core/CompletableObserver.java
+++ b/src/main/java/io/reactivex/rxjava3/core/CompletableObserver.java
@@ -1,4 +1,4 @@
-/**
+/*
* Copyright (c) 2016-present, RxJava Contributors.
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in
diff --git a/src/main/java/io/reactivex/rxjava3/core/CompletableOnSubscribe.java b/src/main/java/io/reactivex/rxjava3/core/CompletableOnSubscribe.java
index 70d79e62b6..e73fae2d5c 100644
--- a/src/main/java/io/reactivex/rxjava3/core/CompletableOnSubscribe.java
+++ b/src/main/java/io/reactivex/rxjava3/core/CompletableOnSubscribe.java
@@ -1,4 +1,4 @@
-/**
+/*
* Copyright (c) 2016-present, RxJava Contributors.
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in
@@ -10,13 +10,14 @@
* distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See
* the License for the specific language governing permissions and limitations under the License.
*/
+
package io.reactivex.rxjava3.core;
import io.reactivex.rxjava3.annotations.NonNull;
/**
* A functional interface that has a {@code subscribe()} method that receives
- * an instance of a {@link CompletableEmitter} instance that allows pushing
+ * a {@link CompletableEmitter} instance that allows pushing
* an event in a cancellation-safe manner.
*/
@FunctionalInterface
diff --git a/src/main/java/io/reactivex/rxjava3/core/CompletableOperator.java b/src/main/java/io/reactivex/rxjava3/core/CompletableOperator.java
index e9b1df83cb..f06a94f36a 100644
--- a/src/main/java/io/reactivex/rxjava3/core/CompletableOperator.java
+++ b/src/main/java/io/reactivex/rxjava3/core/CompletableOperator.java
@@ -1,4 +1,4 @@
-/**
+/*
* Copyright (c) 2016-present, RxJava Contributors.
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in
diff --git a/src/main/java/io/reactivex/rxjava3/core/CompletableSource.java b/src/main/java/io/reactivex/rxjava3/core/CompletableSource.java
index 58edf9471c..90d3853b8a 100644
--- a/src/main/java/io/reactivex/rxjava3/core/CompletableSource.java
+++ b/src/main/java/io/reactivex/rxjava3/core/CompletableSource.java
@@ -1,4 +1,4 @@
-/**
+/*
* Copyright (c) 2016-present, RxJava Contributors.
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in
@@ -10,6 +10,7 @@
* distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See
* the License for the specific language governing permissions and limitations under the License.
*/
+
package io.reactivex.rxjava3.core;
import io.reactivex.rxjava3.annotations.NonNull;
diff --git a/src/main/java/io/reactivex/rxjava3/core/CompletableTransformer.java b/src/main/java/io/reactivex/rxjava3/core/CompletableTransformer.java
index 98a9e2aa19..2887c4717c 100644
--- a/src/main/java/io/reactivex/rxjava3/core/CompletableTransformer.java
+++ b/src/main/java/io/reactivex/rxjava3/core/CompletableTransformer.java
@@ -1,4 +1,4 @@
-/**
+/*
* Copyright (c) 2016-present, RxJava Contributors.
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in
diff --git a/src/main/java/io/reactivex/rxjava3/core/Emitter.java b/src/main/java/io/reactivex/rxjava3/core/Emitter.java
index e9ea2a1827..83410f056e 100644
--- a/src/main/java/io/reactivex/rxjava3/core/Emitter.java
+++ b/src/main/java/io/reactivex/rxjava3/core/Emitter.java
@@ -1,4 +1,4 @@
-/**
+/*
* Copyright (c) 2016-present, RxJava Contributors.
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in
@@ -10,6 +10,7 @@
* distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See
* the License for the specific language governing permissions and limitations under the License.
*/
+
package io.reactivex.rxjava3.core;
import io.reactivex.rxjava3.annotations.NonNull;
diff --git a/src/main/java/io/reactivex/rxjava3/core/Flowable.java b/src/main/java/io/reactivex/rxjava3/core/Flowable.java
index ddf5b3cf7b..39f3c63b43 100644
--- a/src/main/java/io/reactivex/rxjava3/core/Flowable.java
+++ b/src/main/java/io/reactivex/rxjava3/core/Flowable.java
@@ -1,4 +1,4 @@
-/**
+/*
* Copyright (c) 2016-present, RxJava Contributors.
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in
@@ -10,6 +10,7 @@
* distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See
* the License for the specific language governing permissions and limitations under the License.
*/
+
package io.reactivex.rxjava3.core;
import java.util.*;
@@ -19,12 +20,11 @@
import org.reactivestreams.*;
import io.reactivex.rxjava3.annotations.*;
-import io.reactivex.rxjava3.disposables.Disposable;
+import io.reactivex.rxjava3.disposables.*;
import io.reactivex.rxjava3.exceptions.*;
import io.reactivex.rxjava3.flowables.*;
import io.reactivex.rxjava3.functions.*;
import io.reactivex.rxjava3.internal.functions.*;
-import io.reactivex.rxjava3.internal.fuseable.ScalarSupplier;
import io.reactivex.rxjava3.internal.jdk8.*;
import io.reactivex.rxjava3.internal.operators.flowable.*;
import io.reactivex.rxjava3.internal.operators.maybe.MaybeToFlowable;
@@ -34,6 +34,7 @@
import io.reactivex.rxjava3.internal.schedulers.ImmediateThinScheduler;
import io.reactivex.rxjava3.internal.subscribers.*;
import io.reactivex.rxjava3.internal.util.*;
+import io.reactivex.rxjava3.operators.ScalarSupplier;
import io.reactivex.rxjava3.parallel.ParallelFlowable;
import io.reactivex.rxjava3.plugins.RxJavaPlugins;
import io.reactivex.rxjava3.schedulers.*;
@@ -53,7 +54,7 @@
*
* The documentation for this class makes use of marble diagrams. The following legend explains these diagrams:
*
- *
+ *
*
* The {@code Flowable} follows the protocol
*
@@ -163,13 +164,21 @@ public abstract class Flowable<@NonNull T> implements Publisher {
* Mirrors the one {@link Publisher} in an {@link Iterable} of several {@code Publisher}s that first either emits an item or sends
* a termination notification.
*
- *
+ *
+ *
+ * When one of the {@code Publisher}s signal an item or terminates first, all subscriptions to the other
+ * {@code Publisher}s are canceled.
*
* - Backpressure:
* - The operator itself doesn't interfere with backpressure which is determined by the winning
* {@code Publisher}'s backpressure behavior.
* - Scheduler:
* - {@code amb} does not operate by default on a particular {@link Scheduler}.
+ * - Error handling:
+ * -
+ * If any of the losing {@code Publisher}s signals an error, the error is routed to the global
+ * error handler via {@link RxJavaPlugins#onError(Throwable)}.
+ *
*
*
* @param the common element type
@@ -184,7 +193,7 @@ public abstract class Flowable<@NonNull T> implements Publisher {
@NonNull
@BackpressureSupport(BackpressureKind.PASS_THROUGH)
@SchedulerSupport(SchedulerSupport.NONE)
- public static Flowable amb(@NonNull Iterable<@NonNull ? extends Publisher<@NonNull ? extends T>> sources) {
+ public static <@NonNull T> Flowable amb(@NonNull Iterable<@NonNull ? extends Publisher extends T>> sources) {
Objects.requireNonNull(sources, "sources is null");
return RxJavaPlugins.onAssembly(new FlowableAmb<>(null, sources));
}
@@ -193,13 +202,21 @@ public static Flowable amb(@NonNull Iterable<@NonNull ? extends Publisher
* Mirrors the one {@link Publisher} in an array of several {@code Publisher}s that first either emits an item or sends
* a termination notification.
*
- *
+ *
+ *
+ * When one of the {@code Publisher}s signal an item or terminates first, all subscriptions to the other
+ * {@code Publisher}s are canceled.
*
* - Backpressure:
* - The operator itself doesn't interfere with backpressure which is determined by the winning
* {@code Publisher}'s backpressure behavior.
* - Scheduler:
* - {@code ambArray} does not operate by default on a particular {@link Scheduler}.
+ * - Error handling:
+ * -
+ * If any of the losing {@code Publisher}s signals an error, the error is routed to the global
+ * error handler via {@link RxJavaPlugins#onError(Throwable)}.
+ *
*
*
* @param the common element type
@@ -215,7 +232,7 @@ public static Flowable amb(@NonNull Iterable<@NonNull ? extends Publisher
@BackpressureSupport(BackpressureKind.PASS_THROUGH)
@SchedulerSupport(SchedulerSupport.NONE)
@SafeVarargs
- public static Flowable ambArray(@NonNull Publisher<@NonNull ? extends T>... sources) {
+ public static <@NonNull T> Flowable ambArray(@NonNull Publisher extends T>... sources) {
Objects.requireNonNull(sources, "sources is null");
int len = sources.length;
if (len == 0) {
@@ -279,7 +296,7 @@ public static int bufferSize() {
@CheckReturnValue
@BackpressureSupport(BackpressureKind.FULL)
@NonNull
- public static Flowable combineLatestArray(@NonNull Publisher<@NonNull ? extends T>[] sources, @NonNull Function super Object[], ? extends R> combiner) {
+ public static <@NonNull T, @NonNull R> Flowable combineLatestArray(@NonNull Publisher extends T>[] sources, @NonNull Function super Object[], ? extends R> combiner) {
return combineLatestArray(sources, combiner, bufferSize());
}
@@ -327,7 +344,7 @@ public static int bufferSize() {
@CheckReturnValue
@NonNull
@BackpressureSupport(BackpressureKind.FULL)
- public static Flowable combineLatestArray(@NonNull Publisher<@NonNull ? extends T>[] sources, @NonNull Function super Object[], ? extends R> combiner, int bufferSize) {
+ public static <@NonNull T, @NonNull R> Flowable combineLatestArray(@NonNull Publisher extends T>[] sources, @NonNull Function super Object[], ? extends R> combiner, int bufferSize) {
Objects.requireNonNull(sources, "sources is null");
if (sources.length == 0) {
return empty();
@@ -378,7 +395,7 @@ public static Flowable combineLatestArray(@NonNull Publisher<@NonNull
@CheckReturnValue
@BackpressureSupport(BackpressureKind.FULL)
@NonNull
- public static Flowable combineLatest(@NonNull Iterable<@NonNull ? extends Publisher<@NonNull ? extends T>> sources,
+ public static <@NonNull T, @NonNull R> Flowable combineLatest(@NonNull Iterable<@NonNull ? extends Publisher extends T>> sources,
@NonNull Function super Object[], ? extends R> combiner) {
return combineLatest(sources, combiner, bufferSize());
}
@@ -427,7 +444,7 @@ public static Flowable combineLatestArray(@NonNull Publisher<@NonNull
@CheckReturnValue
@NonNull
@BackpressureSupport(BackpressureKind.FULL)
- public static Flowable combineLatest(@NonNull Iterable<@NonNull ? extends Publisher<@NonNull ? extends T>> sources,
+ public static <@NonNull T, @NonNull R> Flowable combineLatest(@NonNull Iterable<@NonNull ? extends Publisher extends T>> sources,
@NonNull Function super Object[], ? extends R> combiner, int bufferSize) {
Objects.requireNonNull(sources, "sources is null");
Objects.requireNonNull(combiner, "combiner is null");
@@ -476,7 +493,7 @@ public static Flowable combineLatest(@NonNull Iterable<@NonNull ? exte
@CheckReturnValue
@BackpressureSupport(BackpressureKind.FULL)
@NonNull
- public static Flowable combineLatestArrayDelayError(@NonNull Publisher<@NonNull ? extends T>[] sources,
+ public static <@NonNull T, @NonNull R> Flowable combineLatestArrayDelayError(@NonNull Publisher extends T>[] sources,
@NonNull Function super Object[], ? extends R> combiner) {
return combineLatestArrayDelayError(sources, combiner, bufferSize());
}
@@ -526,7 +543,7 @@ public static Flowable combineLatest(@NonNull Iterable<@NonNull ? exte
@CheckReturnValue
@NonNull
@BackpressureSupport(BackpressureKind.FULL)
- public static Flowable combineLatestArrayDelayError(@NonNull Publisher<@NonNull ? extends T>[] sources,
+ public static <@NonNull T, @NonNull R> Flowable combineLatestArrayDelayError(@NonNull Publisher extends T>[] sources,
@NonNull Function super Object[], ? extends R> combiner, int bufferSize) {
Objects.requireNonNull(sources, "sources is null");
Objects.requireNonNull(combiner, "combiner is null");
@@ -579,7 +596,7 @@ public static Flowable combineLatest(@NonNull Iterable<@NonNull ? exte
@CheckReturnValue
@BackpressureSupport(BackpressureKind.FULL)
@NonNull
- public static Flowable combineLatestDelayError(@NonNull Iterable<@NonNull ? extends Publisher<@NonNull ? extends T>> sources,
+ public static <@NonNull T, @NonNull R> Flowable combineLatestDelayError(@NonNull Iterable<@NonNull ? extends Publisher extends T>> sources,
@NonNull Function super Object[], ? extends R> combiner) {
return combineLatestDelayError(sources, combiner, bufferSize());
}
@@ -629,7 +646,7 @@ public static Flowable combineLatest(@NonNull Iterable<@NonNull ? exte
@CheckReturnValue
@BackpressureSupport(BackpressureKind.FULL)
@NonNull
- public static Flowable combineLatestDelayError(@NonNull Iterable<@NonNull ? extends Publisher<@NonNull ? extends T>> sources,
+ public static <@NonNull T, @NonNull R> Flowable combineLatestDelayError(@NonNull Iterable<@NonNull ? extends Publisher extends T>> sources,
@NonNull Function super Object[], ? extends R> combiner, int bufferSize) {
Objects.requireNonNull(sources, "sources is null");
Objects.requireNonNull(combiner, "combiner is null");
@@ -646,7 +663,7 @@ public static Flowable combineLatestDelayError(@NonNull Iterable<@NonN
* resulting sequence terminates immediately (normally or with all the errors accumulated until that point).
* If that input source is also synchronous, other sources after it will not be subscribed to.
*
- *
+ *
*
* - Backpressure:
* - The returned {@code Publisher} honors backpressure from downstream. The source {@code Publisher}s
@@ -674,8 +691,8 @@ public static
Flowable combineLatestDelayError(@NonNull Iterable<@NonN
@BackpressureSupport(BackpressureKind.FULL)
@SchedulerSupport(SchedulerSupport.NONE)
@NonNull
- public static Flowable combineLatest(
- @NonNull Publisher<@NonNull ? extends T1> source1, @NonNull Publisher<@NonNull ? extends T2> source2,
+ public static <@NonNull T1, @NonNull T2, @NonNull R> Flowable combineLatest(
+ @NonNull Publisher extends T1> source1, @NonNull Publisher extends T2> source2,
@NonNull BiFunction super T1, ? super T2, ? extends R> combiner) {
Objects.requireNonNull(source1, "source1 is null");
Objects.requireNonNull(source2, "source2 is null");
@@ -692,7 +709,7 @@ public static Flowable combineLatest(
* resulting sequence terminates immediately (normally or with all the errors accumulated until that point).
* If that input source is also synchronous, other sources after it will not be subscribed to.
*
- *
+ *
*
* - Backpressure:
* - The returned {@code Publisher} honors backpressure from downstream. The source {@code Publisher}s
@@ -723,9 +740,9 @@ public static
Flowable combineLatest(
@NonNull
@BackpressureSupport(BackpressureKind.FULL)
@SchedulerSupport(SchedulerSupport.NONE)
- public static Flowable combineLatest(
- @NonNull Publisher<@NonNull ? extends T1> source1, @NonNull Publisher<@NonNull ? extends T2> source2,
- @NonNull Publisher<@NonNull ? extends T3> source3,
+ public static <@NonNull T1, @NonNull T2, @NonNull T3, @NonNull R> Flowable combineLatest(
+ @NonNull Publisher extends T1> source1, @NonNull Publisher extends T2> source2,
+ @NonNull Publisher extends T3> source3,
@NonNull Function3 super T1, ? super T2, ? super T3, ? extends R> combiner) {
Objects.requireNonNull(source1, "source1 is null");
Objects.requireNonNull(source2, "source2 is null");
@@ -743,7 +760,7 @@ public static Flowable combineLatest(
* resulting sequence terminates immediately (normally or with all the errors accumulated until that point).
* If that input source is also synchronous, other sources after it will not be subscribed to.
*