File tree 4 files changed +16
-62
lines changed
internal/subscribers/flowable
test/java/io/reactivex/internal/operators/flowable
4 files changed +16
-62
lines changed Load Diff This file was deleted.
Original file line number Diff line number Diff line change 18
18
import io .reactivex .plugins .RxJavaPlugins ;
19
19
20
20
/**
21
- * A subscriber that ignores all events (onError is forwarded to RxJavaPlugins though) .
21
+ * A subscriber that ignores all events.
22
22
*/
23
23
public enum EmptySubscriber implements Subscriber <Object > {
24
24
/** Empty instance that reports error to the plugins. */
25
- INSTANCE (true ),
25
+ INSTANCE (true , false ),
26
26
/** Empty instance that doesn't report to the plugins to avoid flooding the test output. */
27
- INSTANCE_NOERROR (false );
27
+ INSTANCE_NOERROR (false , false ),
28
+ /** Empty instance that cancels subscriptions. */
29
+ CANCELLED (true , true );
28
30
29
- final boolean reportError ;
31
+ private final boolean reportError ;
32
+ private final boolean cancelSubscription ;
30
33
31
- EmptySubscriber (boolean reportError ) {
34
+ EmptySubscriber (boolean reportError , boolean cancelSubscription ) {
32
35
this .reportError = reportError ;
36
+ this .cancelSubscription = cancelSubscription ;
33
37
}
34
38
35
39
@ Override
36
40
public void onSubscribe (Subscription s ) {
37
-
41
+ if (cancelSubscription ) {
42
+ s .cancel ();
43
+ }
38
44
}
39
45
40
46
@ Override
Original file line number Diff line number Diff line change @@ -37,7 +37,7 @@ public static <T> Subscriber<T> empty() {
37
37
38
38
@ SuppressWarnings ("unchecked" )
39
39
public static <T > Subscriber <T > cancelled () {
40
- return (Subscriber <T >)CancelledSubscriber . INSTANCE ;
40
+ return (Subscriber <T >)EmptySubscriber . CANCELLED ;
41
41
}
42
42
43
43
public static <T > DisposableSubscriber <T > emptyDisposable () {
Original file line number Diff line number Diff line change 29
29
import io .reactivex .disposables .Disposable ;
30
30
import io .reactivex .flowable .TestHelper ;
31
31
import io .reactivex .functions .*;
32
- import io .reactivex .internal .subscribers .flowable .CancelledSubscriber ;
33
32
import io .reactivex .processors .ReplayProcessor ;
34
33
import io .reactivex .schedulers .*;
35
- import io .reactivex .subscribers .TestSubscriber ;
34
+ import io .reactivex .subscribers .* ;
36
35
37
36
public class FlowableRefCountTest {
38
37
@@ -441,7 +440,7 @@ public void accept(Long t1) {
441
440
442
441
@ Test
443
442
public void testAlreadyUnsubscribedClient () {
444
- Subscriber <Integer > done = CancelledSubscriber . instance ();
443
+ Subscriber <Integer > done = Subscribers . cancelled ();
445
444
446
445
Subscriber <Integer > o = TestHelper .mockSubscriber ();
447
446
@@ -460,7 +459,7 @@ public void testAlreadyUnsubscribedClient() {
460
459
public void testAlreadyUnsubscribedInterleavesWithClient () {
461
460
ReplayProcessor <Integer > source = ReplayProcessor .create ();
462
461
463
- Subscriber <Integer > done = CancelledSubscriber . instance ();
462
+ Subscriber <Integer > done = Subscribers . cancelled ();
464
463
465
464
Subscriber <Integer > o = TestHelper .mockSubscriber ();
466
465
InOrder inOrder = inOrder (o );
You can’t perform that action at this time.
0 commit comments