@@ -64,13 +64,13 @@ public static <U, R> Flowable<R> multicastSelector(
64
64
* Child Subscribers will observe the events of the ConnectableObservable on the
65
65
* specified scheduler.
66
66
* @param <T> the value type
67
- * @param co the ConnectableFlowable to wrap
67
+ * @param cf the ConnectableFlowable to wrap
68
68
* @param scheduler the target scheduler
69
69
* @return the new ConnectableObservable instance
70
70
*/
71
- public static <T > ConnectableFlowable <T > observeOn (final ConnectableFlowable <T > co , final Scheduler scheduler ) {
72
- final Flowable <T > observable = co .observeOn (scheduler );
73
- return RxJavaPlugins .onAssembly (new ConnectableFlowableReplay <T >(co , observable ));
71
+ public static <T > ConnectableFlowable <T > observeOn (final ConnectableFlowable <T > cf , final Scheduler scheduler ) {
72
+ final Flowable <T > observable = cf .observeOn (scheduler );
73
+ return RxJavaPlugins .onAssembly (new ConnectableFlowableReplay <T >(cf , observable ));
74
74
}
75
75
76
76
/**
@@ -1100,9 +1100,9 @@ static final class MulticastFlowable<R, U> extends Flowable<R> {
1100
1100
1101
1101
@ Override
1102
1102
protected void subscribeActual (Subscriber <? super R > child ) {
1103
- ConnectableFlowable <U > co ;
1103
+ ConnectableFlowable <U > cf ;
1104
1104
try {
1105
- co = ObjectHelper .requireNonNull (connectableFactory .call (), "The connectableFactory returned null" );
1105
+ cf = ObjectHelper .requireNonNull (connectableFactory .call (), "The connectableFactory returned null" );
1106
1106
} catch (Throwable e ) {
1107
1107
Exceptions .throwIfFatal (e );
1108
1108
EmptySubscription .error (e , child );
@@ -1111,7 +1111,7 @@ protected void subscribeActual(Subscriber<? super R> child) {
1111
1111
1112
1112
Publisher <R > observable ;
1113
1113
try {
1114
- observable = ObjectHelper .requireNonNull (selector .apply (co ), "The selector returned a null Publisher" );
1114
+ observable = ObjectHelper .requireNonNull (selector .apply (cf ), "The selector returned a null Publisher" );
1115
1115
} catch (Throwable e ) {
1116
1116
Exceptions .throwIfFatal (e );
1117
1117
EmptySubscription .error (e , child );
@@ -1122,7 +1122,7 @@ protected void subscribeActual(Subscriber<? super R> child) {
1122
1122
1123
1123
observable .subscribe (srw );
1124
1124
1125
- co .connect (new DisposableConsumer (srw ));
1125
+ cf .connect (new DisposableConsumer (srw ));
1126
1126
}
1127
1127
1128
1128
final class DisposableConsumer implements Consumer <Disposable > {
@@ -1140,17 +1140,17 @@ public void accept(Disposable r) {
1140
1140
}
1141
1141
1142
1142
static final class ConnectableFlowableReplay <T > extends ConnectableFlowable <T > {
1143
- private final ConnectableFlowable <T > co ;
1143
+ private final ConnectableFlowable <T > cf ;
1144
1144
private final Flowable <T > observable ;
1145
1145
1146
- ConnectableFlowableReplay (ConnectableFlowable <T > co , Flowable <T > observable ) {
1147
- this .co = co ;
1146
+ ConnectableFlowableReplay (ConnectableFlowable <T > cf , Flowable <T > observable ) {
1147
+ this .cf = cf ;
1148
1148
this .observable = observable ;
1149
1149
}
1150
1150
1151
1151
@ Override
1152
1152
public void connect (Consumer <? super Disposable > connection ) {
1153
- co .connect (connection );
1153
+ cf .connect (connection );
1154
1154
}
1155
1155
1156
1156
@ Override
0 commit comments