File tree 4 files changed +34
-3
lines changed
main/java/io/reactivex/internal/operators
test/java/io/reactivex/internal/operators
4 files changed +34
-3
lines changed Original file line number Diff line number Diff line change @@ -175,6 +175,7 @@ public void request(long n) {
175
175
public void cancel () {
176
176
if (!cancelled ) {
177
177
cancelled = true ;
178
+ s .cancel ();
178
179
179
180
disposeInner ();
180
181
}
@@ -186,7 +187,7 @@ void disposeInner() {
186
187
if (a != CANCELLED ) {
187
188
a = active .getAndSet ((SwitchMapInnerSubscriber <T , R >)CANCELLED );
188
189
if (a != CANCELLED && a != null ) {
189
- s .cancel ();
190
+ a .cancel ();
190
191
}
191
192
}
192
193
}
Original file line number Diff line number Diff line change @@ -160,7 +160,7 @@ public void onComplete() {
160
160
public void dispose () {
161
161
if (!cancelled ) {
162
162
cancelled = true ;
163
-
163
+ s . dispose ();
164
164
disposeInner ();
165
165
}
166
166
}
@@ -176,7 +176,7 @@ void disposeInner() {
176
176
if (a != CANCELLED ) {
177
177
a = active .getAndSet ((SwitchMapInnerSubscriber <T , R >)CANCELLED );
178
178
if (a != CANCELLED && a != null ) {
179
- s . dispose ();
179
+ a . cancel ();
180
180
}
181
181
}
182
182
}
Original file line number Diff line number Diff line change @@ -797,4 +797,18 @@ public Publisher<Integer> apply(Object v) throws Exception {
797
797
798
798
}
799
799
800
+ @ Test
801
+ public void switchMapInnerCancelled () {
802
+ PublishProcessor <Integer > pp = PublishProcessor .create ();
803
+
804
+ TestSubscriber <Integer > ts = Flowable .just (1 )
805
+ .switchMap (Functions .justFunction (pp ))
806
+ .test ();
807
+
808
+ assertTrue (pp .hasSubscribers ());
809
+
810
+ ts .cancel ();
811
+
812
+ assertFalse (pp .hasSubscribers ());
813
+ }
800
814
}
Original file line number Diff line number Diff line change @@ -607,4 +607,20 @@ public ObservableSource<Integer> apply(Object v) throws Exception {
607
607
608
608
}
609
609
610
+
611
+ @ Test
612
+ public void switchMapInnerCancelled () {
613
+ PublishSubject <Integer > pp = PublishSubject .create ();
614
+
615
+ TestObserver <Integer > ts = Observable .just (1 )
616
+ .switchMap (Functions .justFunction (pp ))
617
+ .test ();
618
+
619
+ assertTrue (pp .hasObservers ());
620
+
621
+ ts .cancel ();
622
+
623
+ assertFalse (pp .hasObservers ());
624
+ }
625
+
610
626
}
You can’t perform that action at this time.
0 commit comments