File tree 2 files changed +34
-2
lines changed
main/java/io/reactivex/internal/operators/flowable
test/java/io/reactivex/internal/operators/flowable
2 files changed +34
-2
lines changed Original file line number Diff line number Diff line change 21
21
import io .reactivex .exceptions .Exceptions ;
22
22
import io .reactivex .internal .functions .ObjectHelper ;
23
23
import io .reactivex .internal .subscriptions .DeferredScalarSubscription ;
24
+ import io .reactivex .plugins .RxJavaPlugins ;
24
25
25
26
public final class FlowableFromCallable <T > extends Flowable <T > implements Callable <T > {
26
27
final Callable <? extends T > callable ;
@@ -38,7 +39,11 @@ public void subscribeActual(Subscriber<? super T> s) {
38
39
t = ObjectHelper .requireNonNull (callable .call (), "The callable returned a null value" );
39
40
} catch (Throwable ex ) {
40
41
Exceptions .throwIfFatal (ex );
41
- s .onError (ex );
42
+ if (deferred .isCancelled ()) {
43
+ RxJavaPlugins .onError (ex );
44
+ } else {
45
+ s .onError (ex );
46
+ }
42
47
return ;
43
48
}
44
49
Original file line number Diff line number Diff line change 16
16
17
17
package io .reactivex .internal .operators .flowable ;
18
18
19
+ import static org .junit .Assert .assertEquals ;
20
+ import static org .mockito .ArgumentMatchers .any ;
19
21
import static org .mockito .Mockito .*;
20
- import static org .junit .Assert .*;
21
22
23
+ import java .util .List ;
22
24
import java .util .concurrent .*;
23
25
24
26
import org .junit .Test ;
27
29
import org .reactivestreams .*;
28
30
29
31
import io .reactivex .*;
32
+ import io .reactivex .exceptions .TestException ;
30
33
import io .reactivex .functions .Function ;
34
+ import io .reactivex .plugins .RxJavaPlugins ;
31
35
import io .reactivex .schedulers .Schedulers ;
32
36
import io .reactivex .subscribers .TestSubscriber ;
33
37
@@ -238,4 +242,27 @@ public Object call() throws Exception {
238
242
.test ()
239
243
.assertFailure (NullPointerException .class );
240
244
}
245
+
246
+ @ Test (timeout = 5000 )
247
+ public void undeliverableUponCancellation () throws Exception {
248
+ List <Throwable > errors = TestHelper .trackPluginErrors ();
249
+ try {
250
+ final TestSubscriber <Integer > ts = new TestSubscriber <Integer >();
251
+
252
+ Flowable .fromCallable (new Callable <Integer >() {
253
+ @ Override
254
+ public Integer call () throws Exception {
255
+ ts .cancel ();
256
+ throw new TestException ();
257
+ }
258
+ })
259
+ .subscribe (ts );
260
+
261
+ ts .assertEmpty ();
262
+
263
+ TestHelper .assertUndeliverable (errors , 0 , TestException .class );
264
+ } finally {
265
+ RxJavaPlugins .reset ();
266
+ }
267
+ }
241
268
}
You can’t perform that action at this time.
0 commit comments