Skip to content

Commit 2037399

Browse files
vanniktechakarnokd
authored andcommitted
2.x: Rename Flowable Base Interface Types for consistency (ReactiveX#4299)
1 parent ed9a2c4 commit 2037399

File tree

12 files changed

+91
-53
lines changed

12 files changed

+91
-53
lines changed

src/main/java/io/reactivex/Flowable.java

Lines changed: 4 additions & 24 deletions
Original file line numberDiff line numberDiff line change
@@ -44,26 +44,6 @@ public abstract class Flowable<T> implements Publisher<T> {
4444
static {
4545
BUFFER_SIZE = Math.max(16, Integer.getInteger("rx2.buffer-size", 128));
4646
}
47-
48-
/**
49-
* Interface to map/wrap a downstream subscriber to an upstream subscriber.
50-
*
51-
* @param <Downstream> the value type of the downstream
52-
* @param <Upstream> the value type of the upstream
53-
*/
54-
public interface Operator<Downstream, Upstream> extends Function<Subscriber<? super Downstream>, Subscriber<? super Upstream>> {
55-
56-
}
57-
58-
/**
59-
* Interface to compose observables.
60-
*
61-
* @param <T> the upstream value type
62-
* @param <R> the downstream value type
63-
*/
64-
public interface Transformer<T, R> extends Function<Flowable<T>, Publisher<? extends R>> {
65-
66-
}
6747

6848
/** A never observable instance as there is no need to instantiate this more than once. */
6949
static final Flowable<Object> NEVER = new Flowable<Object>() { // FIXME factor out
@@ -1525,7 +1505,7 @@ public U call() {
15251505
@BackpressureSupport(BackpressureKind.PASS_THROUGH)
15261506
@SchedulerSupport(SchedulerSupport.NONE)
15271507
// TODO generics
1528-
public final <R> Flowable<R> compose(Transformer<T, R> composer) {
1508+
public final <R> Flowable<R> compose(FlowableTransformer<T, R> composer) {
15291509
return fromPublisher(to(composer));
15301510
}
15311511

@@ -1929,7 +1909,7 @@ public final Flowable<T> doOnLifecycle(final Consumer<? super Subscription> onSu
19291909
Objects.requireNonNull(onSubscribe, "onSubscribe is null");
19301910
Objects.requireNonNull(onRequest, "onRequest is null");
19311911
Objects.requireNonNull(onCancel, "onCancel is null");
1932-
return lift(new Operator<T, T>() {
1912+
return lift(new FlowableOperator<T, T>() {
19331913
@Override
19341914
public Subscriber<? super T> apply(Subscriber<? super T> s) {
19351915
return new SubscriptionLambdaSubscriber<T>(s, onSubscribe, onRequest, onCancel);
@@ -2335,7 +2315,7 @@ public final Flowable<T> last(T defaultValue) {
23352315

23362316
@BackpressureSupport(BackpressureKind.SPECIAL)
23372317
@SchedulerSupport(SchedulerSupport.NONE)
2338-
public final <R> Flowable<R> lift(Operator<? extends R, ? super T> lifter) {
2318+
public final <R> Flowable<R> lift(FlowableOperator<? extends R, ? super T> lifter) {
23392319
Objects.requireNonNull(lifter, "lifter is null");
23402320
// using onSubscribe so the fusing has access to the underlying raw Publisher
23412321
return new FlowableLift<R, T>(this, lifter);
@@ -2918,7 +2898,7 @@ public final <R> Flowable<R> scanWith(Callable<R> seedSupplier, BiFunction<R, ?
29182898
@BackpressureSupport(BackpressureKind.PASS_THROUGH)
29192899
@SchedulerSupport(SchedulerSupport.NONE)
29202900
public final Flowable<T> serialize() {
2921-
return lift(new Operator<T, T>() {
2901+
return lift(new FlowableOperator<T, T>() {
29222902
@Override
29232903
public Subscriber<? super T> apply(Subscriber<? super T> s) {
29242904
return new SerializedSubscriber<T>(s);
Lines changed: 28 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,28 @@
1+
/**
2+
* Copyright 2016 Netflix, Inc.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in
5+
* compliance with the License. You may obtain a copy of the License at
6+
*
7+
* http://www.apache.org/licenses/LICENSE-2.0
8+
*
9+
* Unless required by applicable law or agreed to in writing, software distributed under the License is
10+
* distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See
11+
* the License for the specific language governing permissions and limitations under the License.
12+
*/
13+
14+
package io.reactivex;
15+
16+
import org.reactivestreams.Subscriber;
17+
18+
import io.reactivex.functions.Function;
19+
20+
/**
21+
* Interface to map/wrap a downstream subscriber to an upstream subscriber.
22+
*
23+
* @param <Downstream> the value type of the downstream
24+
* @param <Upstream> the value type of the upstream
25+
*/
26+
public interface FlowableOperator<Downstream, Upstream> extends Function<Subscriber<? super Downstream>, Subscriber<? super Upstream>> {
27+
28+
}
Lines changed: 28 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,28 @@
1+
/**
2+
* Copyright 2016 Netflix, Inc.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in
5+
* compliance with the License. You may obtain a copy of the License at
6+
*
7+
* http://www.apache.org/licenses/LICENSE-2.0
8+
*
9+
* Unless required by applicable law or agreed to in writing, software distributed under the License is
10+
* distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See
11+
* the License for the specific language governing permissions and limitations under the License.
12+
*/
13+
14+
package io.reactivex;
15+
16+
import org.reactivestreams.Publisher;
17+
18+
import io.reactivex.functions.Function;
19+
20+
/**
21+
* Interface to compose observables.
22+
*
23+
* @param <T> the upstream value type
24+
* @param <R> the downstream value type
25+
*/
26+
public interface FlowableTransformer<T, R> extends Function<Flowable<T>, Publisher<? extends R>> {
27+
28+
}

src/main/java/io/reactivex/internal/operators/flowable/FlowableLift.java

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,7 @@
1616
import org.reactivestreams.*;
1717

1818
import io.reactivex.Flowable;
19+
import io.reactivex.FlowableOperator;
1920
import io.reactivex.plugins.RxJavaPlugins;
2021

2122
/**
@@ -29,11 +30,11 @@
2930
*/
3031
public final class FlowableLift<R, T> extends Flowable<R> {
3132
/** The actual operator. */
32-
final Operator<? extends R, ? super T> operator;
33+
final FlowableOperator<? extends R, ? super T> operator;
3334
/** The source publisher. */
3435
final Publisher<? extends T> source;
3536

36-
public FlowableLift(Publisher<? extends T> source, Operator<? extends R, ? super T> operator) {
37+
public FlowableLift(Publisher<? extends T> source, FlowableOperator<? extends R, ? super T> operator) {
3738
this.source = source;
3839
this.operator = operator;
3940
}
@@ -42,7 +43,7 @@ public FlowableLift(Publisher<? extends T> source, Operator<? extends R, ? super
4243
* Returns the operator of this lift publisher.
4344
* @return the operator of this lift publisher
4445
*/
45-
public Operator<? extends R, ? super T> operator() {
46+
public FlowableOperator<? extends R, ? super T> operator() {
4647
return operator;
4748
}
4849

src/test/java/io/reactivex/flowable/FlowableConversionTest.java

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -21,7 +21,7 @@
2121
import org.reactivestreams.*;
2222

2323
import io.reactivex.Flowable;
24-
import io.reactivex.Flowable.Operator;
24+
import io.reactivex.FlowableOperator;
2525
import io.reactivex.exceptions.Exceptions;
2626
import io.reactivex.functions.*;
2727
import io.reactivex.internal.operators.flowable.*;
@@ -55,7 +55,7 @@ public void subscribe(Subscriber<T> subscriber) {
5555
onSubscribe.subscribe(subscriber);
5656
}
5757

58-
public <R> CylonDetectorObservable<R> lift(Operator<? extends R, ? super T> operator) {
58+
public <R> CylonDetectorObservable<R> lift(FlowableOperator<? extends R, ? super T> operator) {
5959
return x(new RobotConversionFunc<T, R>(operator));
6060
}
6161

@@ -105,9 +105,9 @@ private static void throwOutTheAirlock(Object cylon) {
105105
}
106106

107107
public static class RobotConversionFunc<T, R> implements Function<Publisher<T>, CylonDetectorObservable<R>> {
108-
private Operator<? extends R, ? super T> operator;
108+
private FlowableOperator<? extends R, ? super T> operator;
109109

110-
public RobotConversionFunc(Operator<? extends R, ? super T> operator) {
110+
public RobotConversionFunc(FlowableOperator<? extends R, ? super T> operator) {
111111
this.operator = operator;
112112
}
113113

src/test/java/io/reactivex/flowable/FlowableCovarianceTest.java

Lines changed: 7 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -24,7 +24,7 @@
2424
import org.reactivestreams.Publisher;
2525

2626
import io.reactivex.Flowable;
27-
import io.reactivex.Flowable.Transformer;
27+
import io.reactivex.FlowableTransformer;
2828
import io.reactivex.flowables.GroupedFlowable;
2929
import io.reactivex.functions.*;
3030
import io.reactivex.subscribers.TestSubscriber;
@@ -92,7 +92,7 @@ public void accept(Movie v) {
9292
System.out.println(v);
9393
}
9494
})
95-
.compose(new Transformer<Movie, Object>() {
95+
.compose(new FlowableTransformer<Movie, Object>() {
9696
@Override
9797
public Publisher<? extends Object> apply(Flowable<Movie> m) {
9898
return m.concatWith(Flowable.just(new ActionMovie()));
@@ -118,7 +118,7 @@ public String apply(Object v) {
118118
@Test
119119
public void testCovarianceOfCompose() {
120120
Flowable<HorrorMovie> movie = Flowable.just(new HorrorMovie());
121-
Flowable<Movie> movie2 = movie.compose(new Transformer<HorrorMovie, Movie>() {
121+
Flowable<Movie> movie2 = movie.compose(new FlowableTransformer<HorrorMovie, Movie>() {
122122
@Override
123123
public Publisher<? extends Movie> apply(Flowable<HorrorMovie> t) {
124124
return Flowable.just(new Movie());
@@ -130,7 +130,7 @@ public Publisher<? extends Movie> apply(Flowable<HorrorMovie> t) {
130130
@Test
131131
public void testCovarianceOfCompose2() {
132132
Flowable<Movie> movie = Flowable.<Movie> just(new HorrorMovie());
133-
Flowable<HorrorMovie> movie2 = movie.compose(new Transformer<Movie, HorrorMovie>() {
133+
Flowable<HorrorMovie> movie2 = movie.compose(new FlowableTransformer<Movie, HorrorMovie>() {
134134
@Override
135135
public Publisher<? extends HorrorMovie> apply(Flowable<Movie> t) {
136136
return Flowable.just(new HorrorMovie());
@@ -142,7 +142,7 @@ public Publisher<? extends HorrorMovie> apply(Flowable<Movie> t) {
142142
@Test
143143
public void testCovarianceOfCompose3() {
144144
Flowable<Movie> movie = Flowable.<Movie>just(new HorrorMovie());
145-
Flowable<HorrorMovie> movie2 = movie.compose(new Transformer<Movie, HorrorMovie>() {
145+
Flowable<HorrorMovie> movie2 = movie.compose(new FlowableTransformer<Movie, HorrorMovie>() {
146146
@Override
147147
public Publisher<? extends HorrorMovie> apply(Flowable<Movie> t) {
148148
return Flowable.just(new HorrorMovie()).map(new Function<HorrorMovie, HorrorMovie>() {
@@ -160,7 +160,7 @@ public HorrorMovie apply(HorrorMovie v) {
160160
@Test
161161
public void testCovarianceOfCompose4() {
162162
Flowable<HorrorMovie> movie = Flowable.just(new HorrorMovie());
163-
Flowable<HorrorMovie> movie2 = movie.compose(new Transformer<HorrorMovie, HorrorMovie>() {
163+
Flowable<HorrorMovie> movie2 = movie.compose(new FlowableTransformer<HorrorMovie, HorrorMovie>() {
164164
@Override
165165
public Publisher<? extends HorrorMovie> apply(Flowable<HorrorMovie> t1) {
166166
return t1.map(new Function<HorrorMovie, HorrorMovie>() {
@@ -209,7 +209,7 @@ public Flowable<Movie> apply(List<List<Movie>> listOfLists) {
209209
}
210210
};
211211

212-
static Transformer<List<Movie>, Movie> deltaTransformer = new Transformer<List<Movie>, Movie>() {
212+
static FlowableTransformer<List<Movie>, Movie> deltaTransformer = new FlowableTransformer<List<Movie>, Movie>() {
213213
@Override
214214
public Publisher<? extends Movie> apply(Flowable<List<Movie>> movieList) {
215215
return movieList

src/test/java/io/reactivex/flowable/FlowableNullTests.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -21,7 +21,7 @@
2121
import org.reactivestreams.*;
2222

2323
import io.reactivex.*;
24-
import io.reactivex.Flowable.Operator;
24+
import io.reactivex.FlowableOperator;
2525
import io.reactivex.Optional;
2626
import io.reactivex.exceptions.TestException;
2727
import io.reactivex.functions.*;
@@ -1642,7 +1642,7 @@ public void liftNull() {
16421642

16431643
@Test(expected = NullPointerException.class)
16441644
public void liftReturnsNull() {
1645-
just1.lift(new Operator<Object, Integer>() {
1645+
just1.lift(new FlowableOperator<Object, Integer>() {
16461646
@Override
16471647
public Subscriber<? super Integer> apply(Subscriber<? super Object> s) {
16481648
return null;

src/test/java/io/reactivex/flowable/FlowableSubscriberTest.java

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -23,7 +23,7 @@
2323
import org.reactivestreams.*;
2424

2525
import io.reactivex.Flowable;
26-
import io.reactivex.Flowable.Operator;
26+
import io.reactivex.FlowableOperator;
2727
import io.reactivex.internal.functions.Functions;
2828
import io.reactivex.subscribers.*;
2929

@@ -79,7 +79,7 @@ public void cancel() {
7979
@Test
8080
public void testRequestFromChainedOperator() throws Exception {
8181
TestSubscriber<String> s = new TestSubscriber<String>();
82-
Operator<String, String> o = new Operator<String, String>() {
82+
FlowableOperator<String, String> o = new FlowableOperator<String, String>() {
8383
@Override
8484
public Subscriber<? super String> apply(final Subscriber<? super String> s1) {
8585
return new Subscriber<String>() {
@@ -132,7 +132,7 @@ public void cancel() {
132132
@Test
133133
public void testRequestFromDecoupledOperator() throws Exception {
134134
TestSubscriber<String> s = new TestSubscriber<String>(0L);
135-
Operator<String, String> o = new Operator<String, String>() {
135+
FlowableOperator<String, String> o = new FlowableOperator<String, String>() {
136136
@Override
137137
public Subscriber<? super String> apply(final Subscriber<? super String> s1) {
138138
return new Subscriber<String>() {
@@ -186,7 +186,7 @@ public void cancel() {
186186
public void testRequestFromDecoupledOperatorThatRequestsN() throws Exception {
187187
TestSubscriber<String> s = new TestSubscriber<String>();
188188
final AtomicLong innerR = new AtomicLong();
189-
Operator<String, String> o = new Operator<String, String>() {
189+
FlowableOperator<String, String> o = new FlowableOperator<String, String>() {
190190
@Override
191191
public Subscriber<? super String> apply(Subscriber<? super String> child) {
192192
// we want to decouple the chain so set our own Producer on the child instead of it coming from the parent
@@ -424,7 +424,7 @@ public void onNext(Integer t) {
424424
@Test
425425
public void testOnStartCalledOnceViaLift() {
426426
final AtomicInteger c = new AtomicInteger();
427-
Flowable.just(1, 2, 3, 4).lift(new Operator<Integer, Integer>() {
427+
Flowable.just(1, 2, 3, 4).lift(new FlowableOperator<Integer, Integer>() {
428428

429429
@Override
430430
public Subscriber<? super Integer> apply(final Subscriber<? super Integer> child) {

src/test/java/io/reactivex/flowable/FlowableTests.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -26,7 +26,7 @@
2626
import org.reactivestreams.*;
2727

2828
import io.reactivex.*;
29-
import io.reactivex.Flowable.Transformer;
29+
import io.reactivex.FlowableTransformer;
3030
import io.reactivex.disposables.Disposable;
3131
import io.reactivex.flowables.ConnectableFlowable;
3232
import io.reactivex.functions.*;
@@ -999,7 +999,7 @@ public void accept(List<Boolean> booleans) {
999999
@Test
10001000
public void testCompose() {
10011001
TestSubscriber<String> ts = new TestSubscriber<String>();
1002-
Flowable.just(1, 2, 3).compose(new Transformer<Integer, String>() {
1002+
Flowable.just(1, 2, 3).compose(new FlowableTransformer<Integer, String>() {
10031003
@Override
10041004
public Publisher<String> apply(Flowable<Integer> t1) {
10051005
return t1.map(new Function<Integer, String>() {

src/test/java/io/reactivex/internal/operators/flowable/FlowableOnErrorResumeNextViaFunctionTest.java

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -24,7 +24,7 @@
2424
import org.reactivestreams.*;
2525

2626
import io.reactivex.*;
27-
import io.reactivex.Flowable.Operator;
27+
import io.reactivex.FlowableOperator;
2828
import io.reactivex.functions.Function;
2929
import io.reactivex.internal.subscriptions.BooleanSubscription;
3030
import io.reactivex.schedulers.Schedulers;
@@ -151,7 +151,7 @@ public Flowable<String> apply(Throwable t1) {
151151
@Ignore("Failed operator may leave the child subscriber in an inconsistent state which prevents further error delivery.")
152152
public void testOnErrorResumeReceivesErrorFromPreviousNonProtectedOperator() {
153153
TestSubscriber<String> ts = new TestSubscriber<String>();
154-
Flowable.just(1).lift(new Operator<String, Integer>() {
154+
Flowable.just(1).lift(new FlowableOperator<String, Integer>() {
155155

156156
@Override
157157
public Subscriber<? super Integer> apply(Subscriber<? super String> t1) {
@@ -184,7 +184,7 @@ public Flowable<String> apply(Throwable t1) {
184184
@Ignore("A crashing operator may leave the downstream in an inconsistent state and not suitable for event delivery")
185185
public void testOnErrorResumeReceivesErrorFromPreviousNonProtectedOperatorOnNext() {
186186
TestSubscriber<String> ts = new TestSubscriber<String>();
187-
Flowable.just(1).lift(new Operator<String, Integer>() {
187+
Flowable.just(1).lift(new FlowableOperator<String, Integer>() {
188188

189189
@Override
190190
public Subscriber<? super Integer> apply(final Subscriber<? super String> t1) {

0 commit comments

Comments
 (0)