28
28
public abstract class BasicFuseableObserver <T , R > implements Observer <T >, QueueDisposable <R > {
29
29
30
30
/** The downstream subscriber. */
31
- protected final Observer <? super R > actual ;
31
+ protected final Observer <? super R > downstream ;
32
32
33
33
/** The upstream subscription. */
34
- protected Disposable s ;
34
+ protected Disposable upstream ;
35
35
36
36
/** The upstream's QueueDisposable if not null. */
37
- protected QueueDisposable <T > qs ;
37
+ protected QueueDisposable <T > qd ;
38
38
39
39
/** Flag indicating no further onXXX event should be accepted. */
40
40
protected boolean done ;
@@ -44,26 +44,26 @@ public abstract class BasicFuseableObserver<T, R> implements Observer<T>, QueueD
44
44
45
45
/**
46
46
* Construct a BasicFuseableObserver by wrapping the given subscriber.
47
- * @param actual the subscriber, not null (not verified)
47
+ * @param downstream the subscriber, not null (not verified)
48
48
*/
49
- public BasicFuseableObserver (Observer <? super R > actual ) {
50
- this .actual = actual ;
49
+ public BasicFuseableObserver (Observer <? super R > downstream ) {
50
+ this .downstream = downstream ;
51
51
}
52
52
53
53
// final: fixed protocol steps to support fuseable and non-fuseable upstream
54
54
@ SuppressWarnings ("unchecked" )
55
55
@ Override
56
- public final void onSubscribe (Disposable s ) {
57
- if (DisposableHelper .validate (this .s , s )) {
56
+ public final void onSubscribe (Disposable d ) {
57
+ if (DisposableHelper .validate (this .upstream , d )) {
58
58
59
- this .s = s ;
60
- if (s instanceof QueueDisposable ) {
61
- this .qs = (QueueDisposable <T >)s ;
59
+ this .upstream = d ;
60
+ if (d instanceof QueueDisposable ) {
61
+ this .qd = (QueueDisposable <T >)d ;
62
62
}
63
63
64
64
if (beforeDownstream ()) {
65
65
66
- actual .onSubscribe (this );
66
+ downstream .onSubscribe (this );
67
67
68
68
afterDownstream ();
69
69
}
@@ -97,7 +97,7 @@ public void onError(Throwable t) {
97
97
return ;
98
98
}
99
99
done = true ;
100
- actual .onError (t );
100
+ downstream .onError (t );
101
101
}
102
102
103
103
/**
@@ -106,7 +106,7 @@ public void onError(Throwable t) {
106
106
*/
107
107
protected final void fail (Throwable t ) {
108
108
Exceptions .throwIfFatal (t );
109
- s .dispose ();
109
+ upstream .dispose ();
110
110
onError (t );
111
111
}
112
112
@@ -116,24 +116,24 @@ public void onComplete() {
116
116
return ;
117
117
}
118
118
done = true ;
119
- actual .onComplete ();
119
+ downstream .onComplete ();
120
120
}
121
121
122
122
/**
123
123
* Calls the upstream's QueueDisposable.requestFusion with the mode and
124
124
* saves the established mode in {@link #sourceMode} if that mode doesn't
125
125
* have the {@link QueueDisposable#BOUNDARY} flag set.
126
126
* <p>
127
- * If the upstream doesn't support fusion ({@link #qs } is null), the method
127
+ * If the upstream doesn't support fusion ({@link #qd } is null), the method
128
128
* returns {@link QueueDisposable#NONE}.
129
129
* @param mode the fusion mode requested
130
130
* @return the established fusion mode
131
131
*/
132
132
protected final int transitiveBoundaryFusion (int mode ) {
133
- QueueDisposable <T > qs = this .qs ;
134
- if (qs != null ) {
133
+ QueueDisposable <T > qd = this .qd ;
134
+ if (qd != null ) {
135
135
if ((mode & BOUNDARY ) == 0 ) {
136
- int m = qs .requestFusion (mode );
136
+ int m = qd .requestFusion (mode );
137
137
if (m != NONE ) {
138
138
sourceMode = m ;
139
139
}
@@ -149,22 +149,22 @@ protected final int transitiveBoundaryFusion(int mode) {
149
149
150
150
@ Override
151
151
public void dispose () {
152
- s .dispose ();
152
+ upstream .dispose ();
153
153
}
154
154
155
155
@ Override
156
156
public boolean isDisposed () {
157
- return s .isDisposed ();
157
+ return upstream .isDisposed ();
158
158
}
159
159
160
160
@ Override
161
161
public boolean isEmpty () {
162
- return qs .isEmpty ();
162
+ return qd .isEmpty ();
163
163
}
164
164
165
165
@ Override
166
166
public void clear () {
167
- qs .clear ();
167
+ qd .clear ();
168
168
}
169
169
170
170
// -----------------------------------------------------------
0 commit comments