Skip to content

3.x: Sync up with 2.2.10 snapshot #6507

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 2 commits into from
Jun 17, 2019
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Prev Previous commit
Add the Flowable javadoc changes too.
  • Loading branch information
akarnokd committed Jun 17, 2019
commit 7cd99ea8966399dbbac808a3e3d26de902901773
108 changes: 100 additions & 8 deletions src/main/java/io/reactivex/Flowable.java
Original file line number Diff line number Diff line change
Expand Up @@ -36,12 +36,12 @@
import io.reactivex.subscribers.*;

/**
* The Flowable class that implements the Reactive-Streams Pattern and offers factory methods,
* intermediate operators and the ability to consume reactive dataflows.
* The Flowable class that implements the <a href="https://github.com/reactive-streams/reactive-streams-jvm">Reactive Streams</a>
* Pattern and offers factory methods, intermediate operators and the ability to consume reactive dataflows.
* <p>
* Reactive-Streams operates with {@code Publisher}s which {@code Flowable} extends. Many operators
* Reactive Streams operates with {@link Publisher}s which {@code Flowable} extends. Many operators
* therefore accept general {@code Publisher}s directly and allow direct interoperation with other
* Reactive-Streams implementations.
* Reactive Streams implementations.
* <p>
* The Flowable hosts the default buffer size of 128 elements for operators, accessible via {@link #bufferSize()},
* that can be overridden globally via the system parameter {@code rx2.buffer-size}. Most operators, however, have
Expand All @@ -51,11 +51,103 @@
* <p>
* <img width="640" height="317" src="https://raw.github.com/wiki/ReactiveX/RxJava/images/rx-operators/legend.png" alt="">
* <p>
* The {@code Flowable} follows the protocol
* <pre><code>
* onSubscribe onNext* (onError | onComplete)?
* </code></pre>
* where the stream can be disposed through the {@link Subscription} instance provided to consumers through
* {@link Subscriber#onSubscribe(Subscription)}.
* Unlike the {@code Observable.subscribe()} of version 1.x, {@link #subscribe(Subscriber)} does not allow external cancellation
* of a subscription and the {@link Subscriber} instance is expected to expose such capability if needed.
* <p>
* Flowables support backpressure and require {@link Subscriber}s to signal demand via {@link Subscription#request(long)}.
* <p>
* Example:
* <pre><code>
* Disposable d = Flowable.just("Hello world!")
* .delay(1, TimeUnit.SECONDS)
* .subscribeWith(new DisposableSubscriber&lt;String&gt;() {
* &#64;Override public void onStart() {
* System.out.println("Start!");
* request(1);
* }
* &#64;Override public void onNext(String t) {
* System.out.println(t);
* request(1);
* }
* &#64;Override public void onError(Throwable t) {
* t.printStackTrace();
* }
* &#64;Override public void onComplete() {
* System.out.println("Done!");
* }
* });
*
* Thread.sleep(500);
* // the sequence can now be cancelled via dispose()
* d.dispose();
* </code></pre>
* <p>
* The Reactive Streams specification is relatively strict when defining interactions between {@code Publisher}s and {@code Subscriber}s, so much so
* that there is a significant performance penalty due certain timing requirements and the need to prepare for invalid
* request amounts via {@link Subscription#request(long)}.
* Therefore, RxJava has introduced the {@link FlowableSubscriber} interface that indicates the consumer can be driven with relaxed rules.
* All RxJava operators are implemented with these relaxed rules in mind.
* If the subscribing {@code Subscriber} does not implement this interface, for example, due to it being from another Reactive Streams compliant
* library, the Flowable will automatically apply a compliance wrapper around it.
* <p>
* {@code Flowable} is an abstract class, but it is not advised to implement sources and custom operators by extending the class directly due
* to the large amounts of <a href="https://github.com/reactive-streams/reactive-streams-jvm#specification">Reactive Streams</a>
* rules to be followed to the letter. See <a href="https://github.com/ReactiveX/RxJava/wiki/Writing-operators-for-2.0">the wiki</a> for
* some guidance if such custom implementations are necessary.
* <p>
* The recommended way of creating custom {@code Flowable}s is by using the {@link #create(FlowableOnSubscribe, BackpressureStrategy)} factory method:
* <pre><code>
* Flowable&lt;String&gt; source = Flowable.create(new FlowableOnSubscribe&lt;String&gt;() {
* &#64;Override
* public void subscribe(FlowableEmitter&lt;String&gt; emitter) throws Exception {
*
* // signal an item
* emitter.onNext("Hello");
*
* // could be some blocking operation
* Thread.sleep(1000);
*
* // the consumer might have cancelled the flow
* if (emitter.isCancelled() {
* return;
* }
*
* emitter.onNext("World");
*
* Thread.sleep(1000);
*
* // the end-of-sequence has to be signaled, otherwise the
* // consumers may never finish
* emitter.onComplete();
* }
* }, BackpressureStrategy.BUFFER);
*
* System.out.println("Subscribe!");
*
* source.subscribe(System.out::println);
*
* System.out.println("Done!");
* </code></pre>
* <p>
* RxJava reactive sources, such as {@code Flowable}, are generally synchronous and sequential in nature. In the ReactiveX design, the location (thread)
* where operators run is <i>orthogonal</i> to when the operators can work with data. This means that asynchrony and parallelism
* has to be explicitly expressed via operators such as {@link #subscribeOn(Scheduler)}, {@link #observeOn(Scheduler)} and {@link #parallel()}. In general,
* operators featuring a {@link Scheduler} parameter are introducing this type of asynchrony into the flow.
* <p>
* For more information see the <a href="http://reactivex.io/documentation/Publisher.html">ReactiveX
* documentation</a>.
*
* @param <T>
* the type of the items emitted by the Flowable
* @see Observable
* @see ParallelFlowable
* @see io.reactivex.subscribers.DisposableSubscriber
*/
public abstract class Flowable<T> implements Publisher<T> {
/** The default buffer size. */
Expand Down Expand Up @@ -2199,7 +2291,7 @@ public static <T> Flowable<T> fromIterable(Iterable<? extends T> source) {
}

/**
* Converts an arbitrary Reactive-Streams Publisher into a Flowable if not already a
* Converts an arbitrary Reactive Streams Publisher into a Flowable if not already a
* Flowable.
* <p>
* The {@link Publisher} must follow the
Expand Down Expand Up @@ -4385,7 +4477,7 @@ public static Flowable<Long> timer(long delay, TimeUnit unit, Scheduler schedule

/**
* Create a Flowable by wrapping a Publisher <em>which has to be implemented according
* to the Reactive-Streams specification by handling backpressure and
* to the Reactive Streams specification by handling backpressure and
* cancellation correctly; no safeguards are provided by the Flowable itself</em>.
* <dl>
* <dt><b>Backpressure:</b></dt>
Expand Down Expand Up @@ -13569,7 +13661,7 @@ public final Flowable<T> retryWhen(
* Subscribes to the current Flowable and wraps the given Subscriber into a SafeSubscriber
* (if not already a SafeSubscriber) that
* deals with exceptions thrown by a misbehaving Subscriber (that doesn't follow the
* Reactive-Streams specification).
* Reactive Streams specification).
* <dl>
* <dt><b>Backpressure:</b></dt>
* <dd>This operator leaves the reactive world and the backpressure behavior depends on the Subscriber's behavior.</dd>
Expand Down Expand Up @@ -14792,7 +14884,7 @@ public final void subscribe(Subscriber<? super T> s) {
* If the {@link Flowable} rejects the subscription attempt or otherwise fails it will signal
* the error via {@link FlowableSubscriber#onError(Throwable)}.
* <p>
* This subscribe method relaxes the following Reactive-Streams rules:
* This subscribe method relaxes the following Reactive Streams rules:
* <ul>
* <li>§1.3: onNext should not be called concurrently until onSubscribe returns.
* <b>FlowableSubscriber.onSubscribe should make sure a sync or async call triggered by request() is safe.</b></li>
Expand Down
2 changes: 1 addition & 1 deletion src/main/java/io/reactivex/Observable.java
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@
* Many operators in the class accept {@code ObservableSource}(s), the base reactive interface
* for such non-backpressured flows, which {@code Observable} itself implements as well.
* <p>
* The Observable's operators, by default, run with a buffer size of 128 elements (see {@link Flowable#bufferSize()},
* The Observable's operators, by default, run with a buffer size of 128 elements (see {@link Flowable#bufferSize()}),
* that can be overridden globally via the system parameter {@code rx2.buffer-size}. Most operators, however, have
* overloads that allow setting their internal buffer size explicitly.
* <p>
Expand Down