Skip to content

Commit a00ea07

Browse files
authored
2.x: Flowable as a Publisher to be fully RS compliant (ReactiveX#5112)
1 parent 8700965 commit a00ea07

File tree

315 files changed

+1702
-734
lines changed

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

315 files changed

+1702
-734
lines changed

Diff for: src/main/java/io/reactivex/Flowable.java

+58-7
Original file line numberDiff line numberDiff line change
@@ -23,8 +23,9 @@
2323
import io.reactivex.flowables.*;
2424
import io.reactivex.functions.*;
2525
import io.reactivex.internal.functions.*;
26-
import io.reactivex.internal.fuseable.ScalarCallable;
26+
import io.reactivex.internal.fuseable.*;
2727
import io.reactivex.internal.operators.flowable.*;
28+
import io.reactivex.internal.operators.flowable.FlowableStrict.StrictSubscriber;
2829
import io.reactivex.internal.operators.observable.ObservableFromPublisher;
2930
import io.reactivex.internal.schedulers.ImmediateThinScheduler;
3031
import io.reactivex.internal.subscribers.*;
@@ -1557,7 +1558,7 @@ public static <T> Flowable<T> concatEager(Publisher<? extends Publisher<? extend
15571558
@SchedulerSupport(SchedulerSupport.NONE)
15581559
@SuppressWarnings({ "rawtypes", "unchecked" })
15591560
public static <T> Flowable<T> concatEager(Publisher<? extends Publisher<? extends T>> sources, int maxConcurrency, int prefetch) {
1560-
return RxJavaPlugins.onAssembly(new FlowableConcatMapEager(sources, Functions.identity(), maxConcurrency, prefetch, ErrorMode.IMMEDIATE));
1561+
return RxJavaPlugins.onAssembly(new FlowableConcatMapEagerPublisher(sources, Functions.identity(), maxConcurrency, prefetch, ErrorMode.IMMEDIATE));
15611562
}
15621563

15631564
/**
@@ -11679,7 +11680,7 @@ public final Flowable<T> retryWhen(
1167911680
public final void safeSubscribe(Subscriber<? super T> s) {
1168011681
ObjectHelper.requireNonNull(s, "s is null");
1168111682
if (s instanceof SafeSubscriber) {
11682-
subscribe(s);
11683+
subscribe((SafeSubscriber<? super T>)s);
1168311684
} else {
1168411685
subscribe(new SafeSubscriber<T>(s));
1168511686
}
@@ -12713,13 +12714,15 @@ public final Flowable<T> startWithArray(T... items) {
1271312714
* </dl>
1271412715
* @return the new Flowable instance
1271512716
* @since 2.0.5 - experimental
12717+
* @deprecated 2.0.7, will be removed in 2.1.0; by default, the Publisher interface is always strict
1271612718
*/
1271712719
@BackpressureSupport(BackpressureKind.PASS_THROUGH)
1271812720
@SchedulerSupport(SchedulerSupport.NONE)
1271912721
@Experimental
1272012722
@CheckReturnValue
12723+
@Deprecated
1272112724
public final Flowable<T> strict() {
12722-
return RxJavaPlugins.onAssembly(new FlowableStrict<T>(this));
12725+
return this;
1272312726
}
1272412727

1272512728
/**
@@ -12892,13 +12895,61 @@ public final Disposable subscribe(Consumer<? super T> onNext, Consumer<? super T
1289212895
@SchedulerSupport(SchedulerSupport.NONE)
1289312896
@Override
1289412897
public final void subscribe(Subscriber<? super T> s) {
12898+
if (s instanceof FlowableSubscriber) {
12899+
subscribe((FlowableSubscriber<? super T>)s);
12900+
} else {
12901+
ObjectHelper.requireNonNull(s, "s is null");
12902+
subscribe(new StrictSubscriber<T>(s));
12903+
}
12904+
}
12905+
12906+
/**
12907+
* Establish a connection between this Flowable and the given FlowableSubscriber and
12908+
* start streaming events based on the demand of the FlowableSubscriber.
12909+
* <p>
12910+
* This is a "factory method" and can be called multiple times, each time starting a new {@link Subscription}.
12911+
* <p>
12912+
* Each {@link Subscription} will work for only a single {@link FlowableSubscriber}.
12913+
* <p>
12914+
* If the same {@link FlowableSubscriber} instance is subscribed to multiple {@link Flowable}s and/or the
12915+
* same {@link Flowable} multiple times, it must ensure the serialization over its {@code onXXX}
12916+
* methods manually.
12917+
* <p>
12918+
* If the {@link Flowable} rejects the subscription attempt or otherwise fails it will signal
12919+
* the error via {@link FlowableSubscriber#onError(Throwable)}.
12920+
* <p>
12921+
* This subscribe method relaxes the following Reactive-Streams rules:
12922+
* <ul>
12923+
* <li>§1.3: onNext should not be called concurrently until onSubscribe returns.
12924+
* <b>FlowableSubscriber.onSubscribe should make sure a sync or async call triggered by request() is safe.</b></li>
12925+
* <li>§2.3: onError or onComplete must not call cancel.
12926+
* <b>Calling request() or cancel() is NOP at this point.</b></li>
12927+
* <li>§2.12: onSubscribe must be called at most once on the same instance.
12928+
* <b>FlowableSubscriber reuse is not checked and if happens, it is the responsibility of
12929+
* the FlowableSubscriber to ensure proper serialization of its onXXX methods.</b></li>
12930+
* <li>§3.9: negative requests should emit an onError(IllegalArgumentException).
12931+
* <b>Non-positive requests signal via RxJavaPlugins.onError and the stream is not affected.</b></li>
12932+
* </ul>
12933+
* <dl>
12934+
* <dt><b>Backpressure:</b></dt>
12935+
* <dd>The backpressure behavior/expectation is determined by the supplied {@code FlowableSubscriber}.</dd>
12936+
* <dt><b>Scheduler:</b></dt>
12937+
* <dd>{@code subscribe} does not operate by default on a particular {@link Scheduler}.</dd>
12938+
* </dl>
12939+
* @param s the FlowableSubscriber that will consume signals from this Flowable
12940+
* @since 2.0.7 - experimental
12941+
*/
12942+
@BackpressureSupport(BackpressureKind.SPECIAL)
12943+
@SchedulerSupport(SchedulerSupport.NONE)
12944+
@Experimental
12945+
public final void subscribe(FlowableSubscriber<? super T> s) {
1289512946
ObjectHelper.requireNonNull(s, "s is null");
1289612947
try {
12897-
s = RxJavaPlugins.onSubscribe(this, s);
12948+
Subscriber<? super T> z = RxJavaPlugins.onSubscribe(this, s);
1289812949

12899-
ObjectHelper.requireNonNull(s, "Plugin returned null Subscriber");
12950+
ObjectHelper.requireNonNull(z, "Plugin returned null Subscriber");
1290012951

12901-
subscribeActual(s);
12952+
subscribeActual(z);
1290212953
} catch (NullPointerException e) { // NOPMD
1290312954
throw e;
1290412955
} catch (Throwable e) {

Diff for: src/main/java/io/reactivex/FlowableSubscriber.java

+41
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,41 @@
1+
/**
2+
* Copyright (c) 2016-present, RxJava Contributors.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in
5+
* compliance with the License. You may obtain a copy of the License at
6+
*
7+
* http://www.apache.org/licenses/LICENSE-2.0
8+
*
9+
* Unless required by applicable law or agreed to in writing, software distributed under the License is
10+
* distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See
11+
* the License for the specific language governing permissions and limitations under the License.
12+
*/
13+
14+
package io.reactivex;
15+
16+
import org.reactivestreams.*;
17+
18+
import io.reactivex.annotations.Experimental;
19+
20+
/**
21+
* Represents a Reactive-Streams inspired Subscriber that is RxJava 2 only
22+
* and weakens rules §1.3 and §3.9 of the specification for gaining performance.
23+
*
24+
* @param <T> the value type
25+
* @since 2.0.7 - experimental
26+
*/
27+
@Experimental
28+
public interface FlowableSubscriber<T> extends Subscriber<T> {
29+
30+
/**
31+
* Implementors of this method should make sure everything that needs
32+
* to be visible in {@link #onNext(Object)} is established before
33+
* calling {@link Subscription#request(long)}. In practice this means
34+
* no initialization should happen after the {@code request()} call and
35+
* additional behavior is thread safe in respect to {@code onNext}.
36+
*
37+
* {@inheritDoc}
38+
*/
39+
@Override
40+
void onSubscribe(Subscription s);
41+
}

Diff for: src/main/java/io/reactivex/Maybe.java

+2-2
Original file line numberDiff line numberDiff line change
@@ -254,7 +254,7 @@ public static <T> Flowable<T> concat(Publisher<? extends MaybeSource<? extends T
254254
public static <T> Flowable<T> concat(Publisher<? extends MaybeSource<? extends T>> sources, int prefetch) {
255255
ObjectHelper.requireNonNull(sources, "sources is null");
256256
ObjectHelper.verifyPositive(prefetch, "prefetch");
257-
return RxJavaPlugins.onAssembly(new FlowableConcatMap(sources, MaybeToPublisher.instance(), prefetch, ErrorMode.IMMEDIATE));
257+
return RxJavaPlugins.onAssembly(new FlowableConcatMapPublisher(sources, MaybeToPublisher.instance(), prefetch, ErrorMode.IMMEDIATE));
258258
}
259259

260260
/**
@@ -827,7 +827,7 @@ public static <T> Flowable<T> merge(Publisher<? extends MaybeSource<? extends T>
827827
@SchedulerSupport(SchedulerSupport.NONE)
828828
@SuppressWarnings({ "unchecked", "rawtypes" })
829829
public static <T> Flowable<T> merge(Publisher<? extends MaybeSource<? extends T>> sources, int maxConcurrency) {
830-
return RxJavaPlugins.onAssembly(new FlowableFlatMap(sources, MaybeToPublisher.instance(), false, maxConcurrency, Flowable.bufferSize()));
830+
return RxJavaPlugins.onAssembly(new FlowableFlatMapPublisher(sources, MaybeToPublisher.instance(), false, maxConcurrency, Flowable.bufferSize()));
831831
}
832832

833833
/**

Diff for: src/main/java/io/reactivex/Single.java

+3-2
Original file line numberDiff line numberDiff line change
@@ -188,8 +188,9 @@ public static <T> Flowable<T> concat(Publisher<? extends SingleSource<? extends
188188
@SchedulerSupport(SchedulerSupport.NONE)
189189
@SuppressWarnings({ "unchecked", "rawtypes" })
190190
public static <T> Flowable<T> concat(Publisher<? extends SingleSource<? extends T>> sources, int prefetch) {
191+
ObjectHelper.requireNonNull(sources, "sources is null");
191192
ObjectHelper.verifyPositive(prefetch, "prefetch");
192-
return RxJavaPlugins.onAssembly(new FlowableConcatMap(sources, SingleInternalHelper.toFlowable(), prefetch, ErrorMode.IMMEDIATE));
193+
return RxJavaPlugins.onAssembly(new FlowableConcatMapPublisher(sources, SingleInternalHelper.toFlowable(), prefetch, ErrorMode.IMMEDIATE));
193194
}
194195

195196
/**
@@ -684,7 +685,7 @@ public static <T> Flowable<T> merge(Iterable<? extends SingleSource<? extends T>
684685
@SchedulerSupport(SchedulerSupport.NONE)
685686
@SuppressWarnings({ "unchecked", "rawtypes" })
686687
public static <T> Flowable<T> merge(Publisher<? extends SingleSource<? extends T>> sources) {
687-
return RxJavaPlugins.onAssembly(new FlowableFlatMap(sources, SingleInternalHelper.toFlowable(), false, Integer.MAX_VALUE, Flowable.bufferSize()));
688+
return RxJavaPlugins.onAssembly(new FlowableFlatMapPublisher(sources, SingleInternalHelper.toFlowable(), false, Integer.MAX_VALUE, Flowable.bufferSize()));
688689
}
689690

690691
/**

Diff for: src/main/java/io/reactivex/internal/fuseable/ConditionalSubscriber.java

+2-2
Original file line numberDiff line numberDiff line change
@@ -13,7 +13,7 @@
1313

1414
package io.reactivex.internal.fuseable;
1515

16-
import org.reactivestreams.Subscriber;
16+
import io.reactivex.FlowableSubscriber;
1717

1818
/**
1919
* A Subscriber with an additional onNextIf(T) method that
@@ -25,7 +25,7 @@
2525
*
2626
* @param <T> the value type
2727
*/
28-
public interface ConditionalSubscriber<T> extends Subscriber<T> {
28+
public interface ConditionalSubscriber<T> extends FlowableSubscriber<T> {
2929
/**
3030
* Conditionally takes the value.
3131
* @param t the value to deliver

Diff for: src/main/java/io/reactivex/internal/operators/completable/CompletableConcat.java

+1-1
Original file line numberDiff line numberDiff line change
@@ -42,7 +42,7 @@ public void subscribeActual(CompletableObserver s) {
4242

4343
static final class CompletableConcatSubscriber
4444
extends AtomicInteger
45-
implements Subscriber<CompletableSource>, Disposable {
45+
implements FlowableSubscriber<CompletableSource>, Disposable {
4646
private static final long serialVersionUID = 9032184911934499404L;
4747

4848
final CompletableObserver actual;

Diff for: src/main/java/io/reactivex/internal/operators/completable/CompletableFromPublisher.java

+1-1
Original file line numberDiff line numberDiff line change
@@ -32,7 +32,7 @@ protected void subscribeActual(final CompletableObserver cs) {
3232
flowable.subscribe(new FromPublisherSubscriber<T>(cs));
3333
}
3434

35-
static final class FromPublisherSubscriber<T> implements Subscriber<T>, Disposable {
35+
static final class FromPublisherSubscriber<T> implements FlowableSubscriber<T>, Disposable {
3636

3737
final CompletableObserver cs;
3838

Diff for: src/main/java/io/reactivex/internal/operators/completable/CompletableMerge.java

+1-1
Original file line numberDiff line numberDiff line change
@@ -43,7 +43,7 @@ public void subscribeActual(CompletableObserver s) {
4343

4444
static final class CompletableMergeSubscriber
4545
extends AtomicInteger
46-
implements Subscriber<CompletableSource>, Disposable {
46+
implements FlowableSubscriber<CompletableSource>, Disposable {
4747

4848
private static final long serialVersionUID = -2108443387387077490L;
4949

Diff for: src/main/java/io/reactivex/internal/operators/flowable/AbstractFlowableWithUpstream.java

+2-2
Original file line numberDiff line numberDiff line change
@@ -31,14 +31,14 @@ abstract class AbstractFlowableWithUpstream<T, R> extends Flowable<R> implements
3131
/**
3232
* The upstream source Publisher.
3333
*/
34-
protected final Publisher<T> source;
34+
protected final Flowable<T> source;
3535

3636
/**
3737
* Constructs a FlowableSource wrapping the given non-null (verified)
3838
* source Publisher.
3939
* @param source the source (upstream) Publisher instance, not null (verified)
4040
*/
41-
AbstractFlowableWithUpstream(Publisher<T> source) {
41+
AbstractFlowableWithUpstream(Flowable<T> source) {
4242
this.source = ObjectHelper.requireNonNull(source, "source is null");
4343
}
4444

Diff for: src/main/java/io/reactivex/internal/operators/flowable/BlockingFlowableIterable.java

+5-4
Original file line numberDiff line numberDiff line change
@@ -17,20 +17,21 @@
1717
import java.util.concurrent.atomic.AtomicReference;
1818
import java.util.concurrent.locks.*;
1919

20-
import org.reactivestreams.*;
20+
import org.reactivestreams.Subscription;
2121

22+
import io.reactivex.*;
2223
import io.reactivex.disposables.Disposable;
2324
import io.reactivex.exceptions.MissingBackpressureException;
2425
import io.reactivex.internal.queue.SpscArrayQueue;
2526
import io.reactivex.internal.subscriptions.SubscriptionHelper;
2627
import io.reactivex.internal.util.*;
2728

2829
public final class BlockingFlowableIterable<T> implements Iterable<T> {
29-
final Publisher<? extends T> source;
30+
final Flowable<? extends T> source;
3031

3132
final int bufferSize;
3233

33-
public BlockingFlowableIterable(Publisher<? extends T> source, int bufferSize) {
34+
public BlockingFlowableIterable(Flowable<? extends T> source, int bufferSize) {
3435
this.source = source;
3536
this.bufferSize = bufferSize;
3637
}
@@ -44,7 +45,7 @@ public Iterator<T> iterator() {
4445

4546
static final class BlockingFlowableIterator<T>
4647
extends AtomicReference<Subscription>
47-
implements Subscriber<T>, Iterator<T>, Runnable, Disposable {
48+
implements FlowableSubscriber<T>, Iterator<T>, Runnable, Disposable {
4849

4950
private static final long serialVersionUID = 6695226475494099826L;
5051

Diff for: src/main/java/io/reactivex/internal/operators/flowable/BlockingFlowableMostRecent.java

+3-4
Original file line numberDiff line numberDiff line change
@@ -15,8 +15,7 @@
1515

1616
import java.util.*;
1717

18-
import org.reactivestreams.Publisher;
19-
18+
import io.reactivex.Flowable;
2019
import io.reactivex.internal.util.*;
2120
import io.reactivex.subscribers.DefaultSubscriber;
2221

@@ -30,11 +29,11 @@
3029
*/
3130
public final class BlockingFlowableMostRecent<T> implements Iterable<T> {
3231

33-
final Publisher<? extends T> source;
32+
final Flowable<? extends T> source;
3433

3534
final T initialValue;
3635

37-
public BlockingFlowableMostRecent(Publisher<? extends T> source, T initialValue) {
36+
public BlockingFlowableMostRecent(Flowable<? extends T> source, T initialValue) {
3837
this.source = source;
3938
this.initialValue = initialValue;
4039
}

Diff for: src/main/java/io/reactivex/internal/operators/flowable/FlowableAll.java

+3-2
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,7 @@
1414

1515
import org.reactivestreams.*;
1616

17+
import io.reactivex.*;
1718
import io.reactivex.exceptions.Exceptions;
1819
import io.reactivex.functions.Predicate;
1920
import io.reactivex.internal.subscriptions.*;
@@ -23,7 +24,7 @@ public final class FlowableAll<T> extends AbstractFlowableWithUpstream<T, Boolea
2324

2425
final Predicate<? super T> predicate;
2526

26-
public FlowableAll(Publisher<T> source, Predicate<? super T> predicate) {
27+
public FlowableAll(Flowable<T> source, Predicate<? super T> predicate) {
2728
super(source);
2829
this.predicate = predicate;
2930
}
@@ -33,7 +34,7 @@ protected void subscribeActual(Subscriber<? super Boolean> s) {
3334
source.subscribe(new AllSubscriber<T>(s, predicate));
3435
}
3536

36-
static final class AllSubscriber<T> extends DeferredScalarSubscription<Boolean> implements Subscriber<T> {
37+
static final class AllSubscriber<T> extends DeferredScalarSubscription<Boolean> implements FlowableSubscriber<T> {
3738

3839
private static final long serialVersionUID = -3521127104134758517L;
3940
final Predicate<? super T> predicate;

Diff for: src/main/java/io/reactivex/internal/operators/flowable/FlowableAllSingle.java

+3-3
Original file line numberDiff line numberDiff line change
@@ -24,11 +24,11 @@
2424

2525
public final class FlowableAllSingle<T> extends Single<Boolean> implements FuseToFlowable<Boolean> {
2626

27-
final Publisher<T> source;
27+
final Flowable<T> source;
2828

2929
final Predicate<? super T> predicate;
3030

31-
public FlowableAllSingle(Publisher<T> source, Predicate<? super T> predicate) {
31+
public FlowableAllSingle(Flowable<T> source, Predicate<? super T> predicate) {
3232
this.source = source;
3333
this.predicate = predicate;
3434
}
@@ -43,7 +43,7 @@ public Flowable<Boolean> fuseToFlowable() {
4343
return RxJavaPlugins.onAssembly(new FlowableAll<T>(source, predicate));
4444
}
4545

46-
static final class AllSubscriber<T> implements Subscriber<T>, Disposable {
46+
static final class AllSubscriber<T> implements FlowableSubscriber<T>, Disposable {
4747

4848
final SingleObserver<? super Boolean> actual;
4949

Diff for: src/main/java/io/reactivex/internal/operators/flowable/FlowableAmb.java

+2-2
Original file line numberDiff line numberDiff line change
@@ -17,7 +17,7 @@
1717

1818
import org.reactivestreams.*;
1919

20-
import io.reactivex.Flowable;
20+
import io.reactivex.*;
2121
import io.reactivex.exceptions.Exceptions;
2222
import io.reactivex.internal.subscriptions.*;
2323
import io.reactivex.plugins.RxJavaPlugins;
@@ -147,7 +147,7 @@ public void cancel() {
147147
}
148148
}
149149

150-
static final class AmbInnerSubscriber<T> extends AtomicReference<Subscription> implements Subscriber<T>, Subscription {
150+
static final class AmbInnerSubscriber<T> extends AtomicReference<Subscription> implements FlowableSubscriber<T>, Subscription {
151151

152152
private static final long serialVersionUID = -1185974347409665484L;
153153
final AmbCoordinator<T> parent;

0 commit comments

Comments
 (0)