13
13
14
14
package io .reactivex .internal .operators .flowable ;
15
15
16
+ import io .reactivex .internal .functions .ObjectHelper ;
16
17
import java .util .Collection ;
17
18
import java .util .concurrent .Callable ;
18
19
import java .util .concurrent .atomic .AtomicReference ;
@@ -76,7 +77,7 @@ public void onSubscribe(Subscription s) {
76
77
U b ;
77
78
78
79
try {
79
- b = bufferSupplier .call ();
80
+ b = ObjectHelper . requireNonNull ( bufferSupplier .call (), "The buffer supplied is null" );
80
81
} catch (Throwable e ) {
81
82
Exceptions .throwIfFatal (e );
82
83
cancelled = true ;
@@ -85,18 +86,12 @@ public void onSubscribe(Subscription s) {
85
86
return ;
86
87
}
87
88
88
- if (b == null ) {
89
- cancelled = true ;
90
- s .cancel ();
91
- EmptySubscription .error (new NullPointerException ("The buffer supplied is null" ), actual );
92
- return ;
93
- }
94
89
buffer = b ;
95
90
96
91
Publisher <B > boundary ;
97
92
98
93
try {
99
- boundary = boundarySupplier .call ();
94
+ boundary = ObjectHelper . requireNonNull ( boundarySupplier .call (), "The boundary publisher supplied is null" );
100
95
} catch (Throwable ex ) {
101
96
Exceptions .throwIfFatal (ex );
102
97
cancelled = true ;
@@ -105,13 +100,6 @@ public void onSubscribe(Subscription s) {
105
100
return ;
106
101
}
107
102
108
- if (boundary == null ) {
109
- cancelled = true ;
110
- s .cancel ();
111
- EmptySubscription .error (new NullPointerException ("The boundary publisher supplied is null" ), actual );
112
- return ;
113
- }
114
-
115
103
BufferBoundarySubscriber <T , U , B > bs = new BufferBoundarySubscriber <T , U , B >(this );
116
104
other .set (bs );
117
105
@@ -185,24 +173,18 @@ void next() {
185
173
U next ;
186
174
187
175
try {
188
- next = bufferSupplier .call ();
176
+ next = ObjectHelper . requireNonNull ( bufferSupplier .call (), "The buffer supplied is null" );
189
177
} catch (Throwable e ) {
190
178
Exceptions .throwIfFatal (e );
191
179
cancel ();
192
180
actual .onError (e );
193
181
return ;
194
182
}
195
183
196
- if (next == null ) {
197
- cancel ();
198
- actual .onError (new NullPointerException ("The buffer supplied is null" ));
199
- return ;
200
- }
201
-
202
184
Publisher <B > boundary ;
203
185
204
186
try {
205
- boundary = boundarySupplier .call ();
187
+ boundary = ObjectHelper . requireNonNull ( boundarySupplier .call (), "The boundary publisher supplied is null" );
206
188
} catch (Throwable ex ) {
207
189
Exceptions .throwIfFatal (ex );
208
190
cancelled = true ;
@@ -211,13 +193,6 @@ void next() {
211
193
return ;
212
194
}
213
195
214
- if (boundary == null ) {
215
- cancelled = true ;
216
- s .cancel ();
217
- actual .onError (new NullPointerException ("The boundary publisher supplied is null" ));
218
- return ;
219
- }
220
-
221
196
BufferBoundarySubscriber <T , U , B > bs = new BufferBoundarySubscriber <T , U , B >(this );
222
197
223
198
Disposable o = other .get ();
0 commit comments