Skip to content

Commit 7f07f46

Browse files
JakeWhartonakarnokd
authored andcommitted
Extract 'WithUpstream' interfaces. (#4326)
This allows use with types that do not extend directly from their base stream types.
1 parent 344453f commit 7f07f46

File tree

153 files changed

+250
-208
lines changed

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

153 files changed

+250
-208
lines changed
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,48 @@
1+
/**
2+
* Copyright 2016 Netflix, Inc.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in
5+
* compliance with the License. You may obtain a copy of the License at
6+
*
7+
* http://www.apache.org/licenses/LICENSE-2.0
8+
*
9+
* Unless required by applicable law or agreed to in writing, software distributed under the License is
10+
* distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See
11+
* the License for the specific language governing permissions and limitations under the License.
12+
*/
13+
14+
package io.reactivex.internal.operators.flowable;
15+
16+
import org.reactivestreams.Publisher;
17+
18+
import io.reactivex.Flowable;
19+
import io.reactivex.internal.functions.Objects;
20+
21+
/**
22+
* Abstract base class for operators that take an upstream
23+
* source {@link Publisher}.
24+
*
25+
* @param <T> the upstream value type
26+
* @param <R> the output value type
27+
*/
28+
abstract class AbstractFlowableWithUpstream<T, R> extends Flowable<R> implements FlowableWithUpstream<T> {
29+
30+
/**
31+
* The upstream source Publisher.
32+
*/
33+
protected final Publisher<T> source;
34+
35+
/**
36+
* Constructs a FlowableSource wrapping the given non-null (verified)
37+
* source Publisher.
38+
* @param source the source (upstream) Publisher instance, not null (verified)
39+
*/
40+
public AbstractFlowableWithUpstream(Publisher<T> source) {
41+
this.source = Objects.requireNonNull(source, "source is null");
42+
}
43+
44+
@Override
45+
public final Publisher<T> source() {
46+
return source;
47+
}
48+
}

src/main/java/io/reactivex/internal/operators/flowable/FlowableAll.java

+1-1
Original file line numberDiff line numberDiff line change
@@ -18,7 +18,7 @@
1818
import io.reactivex.internal.subscriptions.*;
1919
import io.reactivex.plugins.RxJavaPlugins;
2020

21-
public final class FlowableAll<T> extends FlowableWithUpstream<T, Boolean> {
21+
public final class FlowableAll<T> extends AbstractFlowableWithUpstream<T, Boolean> {
2222

2323
final Predicate<? super T> predicate;
2424

src/main/java/io/reactivex/internal/operators/flowable/FlowableAny.java

+1-1
Original file line numberDiff line numberDiff line change
@@ -17,7 +17,7 @@
1717
import io.reactivex.functions.Predicate;
1818
import io.reactivex.internal.subscriptions.*;
1919

20-
public final class FlowableAny<T> extends FlowableWithUpstream<T, Boolean> {
20+
public final class FlowableAny<T> extends AbstractFlowableWithUpstream<T, Boolean> {
2121
final Predicate<? super T> predicate;
2222
public FlowableAny(Publisher<T> source, Predicate<? super T> predicate) {
2323
super(source);

src/main/java/io/reactivex/internal/operators/flowable/FlowableBuffer.java

+1-1
Original file line numberDiff line numberDiff line change
@@ -26,7 +26,7 @@
2626
import io.reactivex.internal.util.*;
2727
import io.reactivex.plugins.RxJavaPlugins;
2828

29-
public final class FlowableBuffer<T, C extends Collection<? super T>> extends FlowableWithUpstream<T, C> {
29+
public final class FlowableBuffer<T, C extends Collection<? super T>> extends AbstractFlowableWithUpstream<T, C> {
3030
final int size;
3131

3232
final int skip;

src/main/java/io/reactivex/internal/operators/flowable/FlowableBufferBoundary.java

+1-1
Original file line numberDiff line numberDiff line change
@@ -30,7 +30,7 @@
3030
import io.reactivex.subscribers.*;
3131

3232
public final class FlowableBufferBoundary<T, U extends Collection<? super T>, Open, Close>
33-
extends FlowableWithUpstream<T, U> {
33+
extends AbstractFlowableWithUpstream<T, U> {
3434
final Callable<U> bufferSupplier;
3535
final Publisher<? extends Open> bufferOpen;
3636
final Function<? super Open, ? extends Publisher<? extends Close>> bufferClose;

src/main/java/io/reactivex/internal/operators/flowable/FlowableBufferBoundarySupplier.java

+1-1
Original file line numberDiff line numberDiff line change
@@ -29,7 +29,7 @@
2929
import io.reactivex.subscribers.*;
3030

3131
public final class FlowableBufferBoundarySupplier<T, U extends Collection<? super T>, B>
32-
extends FlowableWithUpstream<T, U> {
32+
extends AbstractFlowableWithUpstream<T, U> {
3333
final Callable<? extends Publisher<B>> boundarySupplier;
3434
final Callable<U> bufferSupplier;
3535

src/main/java/io/reactivex/internal/operators/flowable/FlowableBufferExactBoundary.java

+1-1
Original file line numberDiff line numberDiff line change
@@ -26,7 +26,7 @@
2626
import io.reactivex.subscribers.*;
2727

2828
public final class FlowableBufferExactBoundary<T, U extends Collection<? super T>, B>
29-
extends FlowableWithUpstream<T, U> {
29+
extends AbstractFlowableWithUpstream<T, U> {
3030
final Publisher<B> boundary;
3131
final Callable<U> bufferSupplier;
3232

src/main/java/io/reactivex/internal/operators/flowable/FlowableBufferTimed.java

+1-1
Original file line numberDiff line numberDiff line change
@@ -29,7 +29,7 @@
2929
import io.reactivex.internal.util.QueueDrainHelper;
3030
import io.reactivex.subscribers.SerializedSubscriber;
3131

32-
public final class FlowableBufferTimed<T, U extends Collection<? super T>> extends FlowableWithUpstream<T, U> {
32+
public final class FlowableBufferTimed<T, U extends Collection<? super T>> extends AbstractFlowableWithUpstream<T, U> {
3333

3434
final long timespan;
3535
final long timeskip;

src/main/java/io/reactivex/internal/operators/flowable/FlowableCache.java

+1-1
Original file line numberDiff line numberDiff line change
@@ -28,7 +28,7 @@
2828
*
2929
* @param <T> the source element type
3030
*/
31-
public final class FlowableCache<T> extends FlowableWithUpstream<T, T> {
31+
public final class FlowableCache<T> extends AbstractFlowableWithUpstream<T, T> {
3232
/** The cache and replay state. */
3333
final CacheState<T> state;
3434

src/main/java/io/reactivex/internal/operators/flowable/FlowableCollect.java

+1-1
Original file line numberDiff line numberDiff line change
@@ -20,7 +20,7 @@
2020
import io.reactivex.functions.BiConsumer;
2121
import io.reactivex.internal.subscriptions.*;
2222

23-
public final class FlowableCollect<T, U> extends FlowableWithUpstream<T, U> {
23+
public final class FlowableCollect<T, U> extends AbstractFlowableWithUpstream<T, U> {
2424

2525
final Callable<? extends U> initialSupplier;
2626
final BiConsumer<? super U, ? super T> collector;

src/main/java/io/reactivex/internal/operators/flowable/FlowableConcatMap.java

+1-1
Original file line numberDiff line numberDiff line change
@@ -26,7 +26,7 @@
2626
import io.reactivex.internal.util.ExceptionHelper;
2727
import io.reactivex.plugins.RxJavaPlugins;
2828

29-
public final class FlowableConcatMap<T, R> extends FlowableWithUpstream<T, R> {
29+
public final class FlowableConcatMap<T, R> extends AbstractFlowableWithUpstream<T, R> {
3030

3131
final Function<? super T, ? extends Publisher<? extends R>> mapper;
3232

src/main/java/io/reactivex/internal/operators/flowable/FlowableConcatMapEager.java

+1-1
Original file line numberDiff line numberDiff line change
@@ -28,7 +28,7 @@
2828
import io.reactivex.internal.util.*;
2929
import io.reactivex.plugins.RxJavaPlugins;
3030

31-
public class FlowableConcatMapEager<T, R> extends FlowableWithUpstream<T, R> {
31+
public class FlowableConcatMapEager<T, R> extends AbstractFlowableWithUpstream<T, R> {
3232

3333
final Function<? super T, ? extends Publisher<? extends R>> mapper;
3434

src/main/java/io/reactivex/internal/operators/flowable/FlowableCount.java

+1-1
Original file line numberDiff line numberDiff line change
@@ -17,7 +17,7 @@
1717

1818
import io.reactivex.internal.subscriptions.*;
1919

20-
public final class FlowableCount<T> extends FlowableWithUpstream<T, Long> {
20+
public final class FlowableCount<T> extends AbstractFlowableWithUpstream<T, Long> {
2121

2222
public FlowableCount(Publisher<T> source) {
2323
super(source);

src/main/java/io/reactivex/internal/operators/flowable/FlowableDebounce.java

+1-1
Original file line numberDiff line numberDiff line change
@@ -25,7 +25,7 @@
2525
import io.reactivex.plugins.RxJavaPlugins;
2626
import io.reactivex.subscribers.*;
2727

28-
public final class FlowableDebounce<T, U> extends FlowableWithUpstream<T, T> {
28+
public final class FlowableDebounce<T, U> extends AbstractFlowableWithUpstream<T, T> {
2929
final Function<? super T, ? extends Publisher<U>> debounceSelector;
3030

3131
public FlowableDebounce(Publisher<T> source, Function<? super T, ? extends Publisher<U>> debounceSelector) {

src/main/java/io/reactivex/internal/operators/flowable/FlowableDebounceTimed.java

+1-1
Original file line numberDiff line numberDiff line change
@@ -27,7 +27,7 @@
2727
import io.reactivex.plugins.RxJavaPlugins;
2828
import io.reactivex.subscribers.SerializedSubscriber;
2929

30-
public final class FlowableDebounceTimed<T> extends FlowableWithUpstream<T, T> {
30+
public final class FlowableDebounceTimed<T> extends AbstractFlowableWithUpstream<T, T> {
3131
final long timeout;
3232
final TimeUnit unit;
3333
final Scheduler scheduler;

src/main/java/io/reactivex/internal/operators/flowable/FlowableDelay.java

+1-1
Original file line numberDiff line numberDiff line change
@@ -22,7 +22,7 @@
2222
import io.reactivex.internal.subscriptions.SubscriptionHelper;
2323
import io.reactivex.subscribers.SerializedSubscriber;
2424

25-
public final class FlowableDelay<T> extends FlowableWithUpstream<T, T> {
25+
public final class FlowableDelay<T> extends AbstractFlowableWithUpstream<T, T> {
2626
final long delay;
2727
final TimeUnit unit;
2828
final Scheduler scheduler;

src/main/java/io/reactivex/internal/operators/flowable/FlowableDematerialize.java

+1-1
Original file line numberDiff line numberDiff line change
@@ -19,7 +19,7 @@
1919
import io.reactivex.internal.subscriptions.SubscriptionHelper;
2020
import io.reactivex.plugins.RxJavaPlugins;
2121

22-
public final class FlowableDematerialize<T> extends FlowableWithUpstream<Try<Optional<T>>, T> {
22+
public final class FlowableDematerialize<T> extends AbstractFlowableWithUpstream<Try<Optional<T>>, T> {
2323

2424
public FlowableDematerialize(Publisher<Try<Optional<T>>> source) {
2525
super(source);

src/main/java/io/reactivex/internal/operators/flowable/FlowableDetach.java

+1-1
Original file line numberDiff line numberDiff line change
@@ -18,7 +18,7 @@
1818
import io.reactivex.internal.subscribers.flowable.EmptyComponent;
1919
import io.reactivex.internal.subscriptions.SubscriptionHelper;
2020

21-
public final class FlowableDetach<T> extends FlowableWithUpstream<T, T> {
21+
public final class FlowableDetach<T> extends AbstractFlowableWithUpstream<T, T> {
2222

2323
public FlowableDetach(Publisher<T> source) {
2424
super(source);

src/main/java/io/reactivex/internal/operators/flowable/FlowableDistinct.java

+1-1
Original file line numberDiff line numberDiff line change
@@ -23,7 +23,7 @@
2323
import io.reactivex.internal.functions.*;
2424
import io.reactivex.internal.subscriptions.*;
2525

26-
public final class FlowableDistinct<T, K> extends FlowableWithUpstream<T, T> {
26+
public final class FlowableDistinct<T, K> extends AbstractFlowableWithUpstream<T, T> {
2727
final Function<? super T, K> keySelector;
2828
final Callable<? extends Predicate<? super K>> predicateSupplier;
2929

src/main/java/io/reactivex/internal/operators/flowable/FlowableDistinctUntilChanged.java

+1-1
Original file line numberDiff line numberDiff line change
@@ -19,7 +19,7 @@
1919
import io.reactivex.internal.fuseable.ConditionalSubscriber;
2020
import io.reactivex.internal.subscribers.flowable.*;
2121

22-
public final class FlowableDistinctUntilChanged<T> extends FlowableWithUpstream<T, T> {
22+
public final class FlowableDistinctUntilChanged<T> extends AbstractFlowableWithUpstream<T, T> {
2323

2424
final BiPredicate<? super T, ? super T> comparer;
2525

src/main/java/io/reactivex/internal/operators/flowable/FlowableDoOnEach.java

+1-1
Original file line numberDiff line numberDiff line change
@@ -21,7 +21,7 @@
2121
import io.reactivex.internal.subscribers.flowable.*;
2222
import io.reactivex.plugins.RxJavaPlugins;
2323

24-
public final class FlowableDoOnEach<T> extends FlowableWithUpstream<T, T> {
24+
public final class FlowableDoOnEach<T> extends AbstractFlowableWithUpstream<T, T> {
2525
final Consumer<? super T> onNext;
2626
final Consumer<? super Throwable> onError;
2727
final Runnable onComplete;

src/main/java/io/reactivex/internal/operators/flowable/FlowableDoOnLifecycle.java

+1-1
Original file line numberDiff line numberDiff line change
@@ -19,7 +19,7 @@
1919
import org.reactivestreams.Subscriber;
2020
import org.reactivestreams.Subscription;
2121

22-
public final class FlowableDoOnLifecycle<T> extends FlowableWithUpstream<T, T> {
22+
public final class FlowableDoOnLifecycle<T> extends AbstractFlowableWithUpstream<T, T> {
2323
private final Consumer<? super Subscription> onSubscribe;
2424
private final LongConsumer onRequest;
2525
private final Runnable onCancel;

src/main/java/io/reactivex/internal/operators/flowable/FlowableElementAt.java

+1-1
Original file line numberDiff line numberDiff line change
@@ -17,7 +17,7 @@
1717

1818
import io.reactivex.internal.subscriptions.*;
1919

20-
public final class FlowableElementAt<T> extends FlowableWithUpstream<T, T> {
20+
public final class FlowableElementAt<T> extends AbstractFlowableWithUpstream<T, T> {
2121
final long index;
2222
final T defaultValue;
2323
public FlowableElementAt(Publisher<T> source, long index, T defaultValue) {

src/main/java/io/reactivex/internal/operators/flowable/FlowableFilter.java

+1-1
Original file line numberDiff line numberDiff line change
@@ -19,7 +19,7 @@
1919
import io.reactivex.internal.fuseable.*;
2020
import io.reactivex.internal.subscribers.flowable.*;
2121

22-
public final class FlowableFilter<T> extends FlowableWithUpstream<T, T> {
22+
public final class FlowableFilter<T> extends AbstractFlowableWithUpstream<T, T> {
2323
final Predicate<? super T> predicate;
2424
public FlowableFilter(Publisher<T> source, Predicate<? super T> predicate) {
2525
super(source);

src/main/java/io/reactivex/internal/operators/flowable/FlowableFlatMap.java

+1-1
Original file line numberDiff line numberDiff line change
@@ -26,7 +26,7 @@
2626
import io.reactivex.internal.subscriptions.SubscriptionHelper;
2727
import io.reactivex.internal.util.BackpressureHelper;
2828

29-
public final class FlowableFlatMap<T, U> extends FlowableWithUpstream<T, U> {
29+
public final class FlowableFlatMap<T, U> extends AbstractFlowableWithUpstream<T, U> {
3030
final Function<? super T, ? extends Publisher<? extends U>> mapper;
3131
final boolean delayErrors;
3232
final int maxConcurrency;

src/main/java/io/reactivex/internal/operators/flowable/FlowableFlattenIterable.java

+1-1
Original file line numberDiff line numberDiff line change
@@ -28,7 +28,7 @@
2828
import io.reactivex.internal.util.*;
2929
import io.reactivex.plugins.RxJavaPlugins;
3030

31-
public final class FlowableFlattenIterable<T, R> extends FlowableWithUpstream<T, R> {
31+
public final class FlowableFlattenIterable<T, R> extends AbstractFlowableWithUpstream<T, R> {
3232

3333
final Function<? super T, ? extends Iterable<? extends R>> mapper;
3434

src/main/java/io/reactivex/internal/operators/flowable/FlowableGroupBy.java

+1-1
Original file line numberDiff line numberDiff line change
@@ -28,7 +28,7 @@
2828
import io.reactivex.internal.util.BackpressureHelper;
2929
import io.reactivex.plugins.RxJavaPlugins;
3030

31-
public final class FlowableGroupBy<T, K, V> extends FlowableWithUpstream<T, GroupedFlowable<K, V>> {
31+
public final class FlowableGroupBy<T, K, V> extends AbstractFlowableWithUpstream<T, GroupedFlowable<K, V>> {
3232
final Function<? super T, ? extends K> keySelector;
3333
final Function<? super T, ? extends V> valueSelector;
3434
final int bufferSize;

src/main/java/io/reactivex/internal/operators/flowable/FlowableGroupJoin.java

+1-1
Original file line numberDiff line numberDiff line change
@@ -33,7 +33,7 @@
3333
import io.reactivex.plugins.RxJavaPlugins;
3434
import io.reactivex.processors.UnicastProcessor;
3535

36-
public class FlowableGroupJoin<TLeft, TRight, TLeftEnd, TRightEnd, R> extends FlowableWithUpstream<TLeft, R> {
36+
public class FlowableGroupJoin<TLeft, TRight, TLeftEnd, TRightEnd, R> extends AbstractFlowableWithUpstream<TLeft, R> {
3737

3838
final Publisher<? extends TRight> other;
3939

src/main/java/io/reactivex/internal/operators/flowable/FlowableHide.java

+1-1
Original file line numberDiff line numberDiff line change
@@ -23,7 +23,7 @@
2323
*
2424
* @since 2.0
2525
*/
26-
public class FlowableHide<T> extends FlowableWithUpstream<T, T> {
26+
public class FlowableHide<T> extends AbstractFlowableWithUpstream<T, T> {
2727

2828
public FlowableHide(Publisher<T> source) {
2929
super(source);

src/main/java/io/reactivex/internal/operators/flowable/FlowableIgnoreElements.java

+1-1
Original file line numberDiff line numberDiff line change
@@ -18,7 +18,7 @@
1818
import io.reactivex.internal.fuseable.QueueSubscription;
1919
import io.reactivex.internal.subscriptions.SubscriptionHelper;
2020

21-
public final class FlowableIgnoreElements<T> extends FlowableWithUpstream<T, T> {
21+
public final class FlowableIgnoreElements<T> extends AbstractFlowableWithUpstream<T, T> {
2222

2323
public FlowableIgnoreElements(Publisher<T> source) {
2424
super(source);

src/main/java/io/reactivex/internal/operators/flowable/FlowableJoin.java

+1-1
Original file line numberDiff line numberDiff line change
@@ -29,7 +29,7 @@
2929
import io.reactivex.internal.util.*;
3030
import io.reactivex.plugins.RxJavaPlugins;
3131

32-
public class FlowableJoin<TLeft, TRight, TLeftEnd, TRightEnd, R> extends FlowableWithUpstream<TLeft, R> {
32+
public class FlowableJoin<TLeft, TRight, TLeftEnd, TRightEnd, R> extends AbstractFlowableWithUpstream<TLeft, R> {
3333

3434
final Publisher<? extends TRight> other;
3535

src/main/java/io/reactivex/internal/operators/flowable/FlowableLift.java

+1-1
Original file line numberDiff line numberDiff line change
@@ -27,7 +27,7 @@
2727
* @param <T> the upstream value type
2828
* @param <R> the downstream parameter type
2929
*/
30-
public final class FlowableLift<R, T> extends FlowableWithUpstream<T, R> {
30+
public final class FlowableLift<R, T> extends AbstractFlowableWithUpstream<T, R> {
3131
/** The actual operator. */
3232
final FlowableOperator<? extends R, ? super T> operator;
3333

src/main/java/io/reactivex/internal/operators/flowable/FlowableMap.java

+1-1
Original file line numberDiff line numberDiff line change
@@ -20,7 +20,7 @@
2020
import io.reactivex.internal.fuseable.ConditionalSubscriber;
2121
import io.reactivex.internal.subscribers.flowable.*;
2222

23-
public final class FlowableMap<T, U> extends FlowableWithUpstream<T, U> {
23+
public final class FlowableMap<T, U> extends AbstractFlowableWithUpstream<T, U> {
2424
final Function<? super T, ? extends U> mapper;
2525
public FlowableMap(Publisher<T> source, Function<? super T, ? extends U> mapper) {
2626
super(source);

src/main/java/io/reactivex/internal/operators/flowable/FlowableMapNotification.java

+1-1
Original file line numberDiff line numberDiff line change
@@ -22,7 +22,7 @@
2222
import io.reactivex.internal.subscriptions.SubscriptionHelper;
2323
import io.reactivex.internal.util.BackpressureHelper;
2424

25-
public final class FlowableMapNotification<T, R> extends FlowableWithUpstream<T, R> {
25+
public final class FlowableMapNotification<T, R> extends AbstractFlowableWithUpstream<T, R> {
2626

2727
final Function<? super T, ? extends R> onNextMapper;
2828
final Function<? super Throwable, ? extends R> onErrorMapper;

src/main/java/io/reactivex/internal/operators/flowable/FlowableMaterialize.java

+1-1
Original file line numberDiff line numberDiff line change
@@ -21,7 +21,7 @@
2121
import io.reactivex.internal.subscriptions.SubscriptionHelper;
2222
import io.reactivex.internal.util.BackpressureHelper;
2323

24-
public final class FlowableMaterialize<T> extends FlowableWithUpstream<T, Try<Optional<T>>> {
24+
public final class FlowableMaterialize<T> extends AbstractFlowableWithUpstream<T, Try<Optional<T>>> {
2525

2626
public FlowableMaterialize(Publisher<T> source) {
2727
super(source);

src/main/java/io/reactivex/internal/operators/flowable/FlowableObserveOn.java

+1-1
Original file line numberDiff line numberDiff line change
@@ -26,7 +26,7 @@
2626
import io.reactivex.internal.subscriptions.*;
2727
import io.reactivex.internal.util.BackpressureHelper;
2828

29-
public final class FlowableObserveOn<T> extends FlowableWithUpstream<T, T> {
29+
public final class FlowableObserveOn<T> extends AbstractFlowableWithUpstream<T, T> {
3030
final Scheduler scheduler;
3131

3232
final boolean delayError;

src/main/java/io/reactivex/internal/operators/flowable/FlowableOnBackpressureBuffer.java

+1-1
Original file line numberDiff line numberDiff line change
@@ -23,7 +23,7 @@
2323
import io.reactivex.internal.subscriptions.SubscriptionHelper;
2424
import io.reactivex.internal.util.BackpressureHelper;
2525

26-
public final class FlowableOnBackpressureBuffer<T> extends FlowableWithUpstream<T, T> {
26+
public final class FlowableOnBackpressureBuffer<T> extends AbstractFlowableWithUpstream<T, T> {
2727
final int bufferSize;
2828
final boolean unbounded;
2929
final boolean delayError;

src/main/java/io/reactivex/internal/operators/flowable/FlowableOnBackpressureDrop.java

+1-1
Original file line numberDiff line numberDiff line change
@@ -21,7 +21,7 @@
2121
import io.reactivex.internal.subscriptions.SubscriptionHelper;
2222
import io.reactivex.internal.util.BackpressureHelper;
2323

24-
public final class FlowableOnBackpressureDrop<T> extends FlowableWithUpstream<T, T> implements Consumer<T> {
24+
public final class FlowableOnBackpressureDrop<T> extends AbstractFlowableWithUpstream<T, T> implements Consumer<T> {
2525

2626
final Consumer<? super T> onDrop;
2727

src/main/java/io/reactivex/internal/operators/flowable/FlowableOnBackpressureLatest.java

+1-1
Original file line numberDiff line numberDiff line change
@@ -20,7 +20,7 @@
2020
import io.reactivex.internal.subscriptions.SubscriptionHelper;
2121
import io.reactivex.internal.util.BackpressureHelper;
2222

23-
public final class FlowableOnBackpressureLatest<T> extends FlowableWithUpstream<T, T> {
23+
public final class FlowableOnBackpressureLatest<T> extends AbstractFlowableWithUpstream<T, T> {
2424

2525
public FlowableOnBackpressureLatest(Publisher<T> source) {
2626
super(source);

src/main/java/io/reactivex/internal/operators/flowable/FlowableOnErrorNext.java

+1-1
Original file line numberDiff line numberDiff line change
@@ -20,7 +20,7 @@
2020
import io.reactivex.internal.subscriptions.SubscriptionArbiter;
2121
import io.reactivex.plugins.RxJavaPlugins;
2222

23-
public final class FlowableOnErrorNext<T> extends FlowableWithUpstream<T, T> {
23+
public final class FlowableOnErrorNext<T> extends AbstractFlowableWithUpstream<T, T> {
2424
final Function<? super Throwable, ? extends Publisher<? extends T>> nextSupplier;
2525
final boolean allowFatal;
2626

0 commit comments

Comments
 (0)