Skip to content

Commit 73a85c1

Browse files
authored
2.x: more eager cancellation in flatMapX (#5422)
* 2.x: more eager cancellation in flatMapX * Add more eager check to Observable.flatMapX
1 parent b7086ef commit 73a85c1

8 files changed

+20
-8
lines changed

src/main/java/io/reactivex/internal/operators/flowable/FlowableFlatMapCompletable.java

+4-1
Original file line numberDiff line numberDiff line change
@@ -72,6 +72,8 @@ static final class FlatMapCompletableMainSubscriber<T> extends BasicIntQueueSubs
7272

7373
Subscription s;
7474

75+
volatile boolean cancelled;
76+
7577
FlatMapCompletableMainSubscriber(Subscriber<? super T> observer,
7678
Function<? super T, ? extends CompletableSource> mapper, boolean delayErrors,
7779
int maxConcurrency) {
@@ -117,7 +119,7 @@ public void onNext(T value) {
117119

118120
InnerConsumer inner = new InnerConsumer();
119121

120-
if (set.add(inner)) {
122+
if (!cancelled && set.add(inner)) {
121123
cs.subscribe(inner);
122124
}
123125
}
@@ -164,6 +166,7 @@ public void onComplete() {
164166

165167
@Override
166168
public void cancel() {
169+
cancelled = true;
167170
s.cancel();
168171
set.dispose();
169172
}

src/main/java/io/reactivex/internal/operators/flowable/FlowableFlatMapCompletableCompletable.java

+4-1
Original file line numberDiff line numberDiff line change
@@ -79,6 +79,8 @@ static final class FlatMapCompletableMainSubscriber<T> extends AtomicInteger
7979

8080
Subscription s;
8181

82+
volatile boolean disposed;
83+
8284
FlatMapCompletableMainSubscriber(CompletableObserver observer,
8385
Function<? super T, ? extends CompletableSource> mapper, boolean delayErrors,
8486
int maxConcurrency) {
@@ -124,7 +126,7 @@ public void onNext(T value) {
124126

125127
InnerObserver inner = new InnerObserver();
126128

127-
if (set.add(inner)) {
129+
if (!disposed && set.add(inner)) {
128130
cs.subscribe(inner);
129131
}
130132
}
@@ -171,6 +173,7 @@ public void onComplete() {
171173

172174
@Override
173175
public void dispose() {
176+
disposed = true;
174177
s.cancel();
175178
set.dispose();
176179
}

src/main/java/io/reactivex/internal/operators/flowable/FlowableFlatMapMaybe.java

+1-1
Original file line numberDiff line numberDiff line change
@@ -128,7 +128,7 @@ public void onNext(T t) {
128128

129129
InnerObserver inner = new InnerObserver();
130130

131-
if (set.add(inner)) {
131+
if (!cancelled && set.add(inner)) {
132132
ms.subscribe(inner);
133133
}
134134
}

src/main/java/io/reactivex/internal/operators/flowable/FlowableFlatMapSingle.java

+1-1
Original file line numberDiff line numberDiff line change
@@ -128,7 +128,7 @@ public void onNext(T t) {
128128

129129
InnerObserver inner = new InnerObserver();
130130

131-
if (set.add(inner)) {
131+
if (!cancelled && set.add(inner)) {
132132
ms.subscribe(inner);
133133
}
134134
}

src/main/java/io/reactivex/internal/operators/observable/ObservableFlatMapCompletable.java

+4-1
Original file line numberDiff line numberDiff line change
@@ -64,6 +64,8 @@ static final class FlatMapCompletableMainObserver<T> extends BasicIntQueueDispos
6464

6565
Disposable d;
6666

67+
volatile boolean disposed;
68+
6769
FlatMapCompletableMainObserver(Observer<? super T> observer, Function<? super T, ? extends CompletableSource> mapper, boolean delayErrors) {
6870
this.actual = observer;
6971
this.mapper = mapper;
@@ -99,7 +101,7 @@ public void onNext(T value) {
99101

100102
InnerObserver inner = new InnerObserver();
101103

102-
if (set.add(inner)) {
104+
if (!disposed && set.add(inner)) {
103105
cs.subscribe(inner);
104106
}
105107
}
@@ -138,6 +140,7 @@ public void onComplete() {
138140

139141
@Override
140142
public void dispose() {
143+
disposed = true;
141144
d.dispose();
142145
set.dispose();
143146
}

src/main/java/io/reactivex/internal/operators/observable/ObservableFlatMapCompletableCompletable.java

+4-1
Original file line numberDiff line numberDiff line change
@@ -69,6 +69,8 @@ static final class FlatMapCompletableMainObserver<T> extends AtomicInteger imple
6969

7070
Disposable d;
7171

72+
volatile boolean disposed;
73+
7274
FlatMapCompletableMainObserver(CompletableObserver observer, Function<? super T, ? extends CompletableSource> mapper, boolean delayErrors) {
7375
this.actual = observer;
7476
this.mapper = mapper;
@@ -104,7 +106,7 @@ public void onNext(T value) {
104106

105107
InnerObserver inner = new InnerObserver();
106108

107-
if (set.add(inner)) {
109+
if (!disposed && set.add(inner)) {
108110
cs.subscribe(inner);
109111
}
110112
}
@@ -143,6 +145,7 @@ public void onComplete() {
143145

144146
@Override
145147
public void dispose() {
148+
disposed = true;
146149
d.dispose();
147150
set.dispose();
148151
}

src/main/java/io/reactivex/internal/operators/observable/ObservableFlatMapMaybe.java

+1-1
Original file line numberDiff line numberDiff line change
@@ -109,7 +109,7 @@ public void onNext(T t) {
109109

110110
InnerObserver inner = new InnerObserver();
111111

112-
if (set.add(inner)) {
112+
if (!cancelled && set.add(inner)) {
113113
ms.subscribe(inner);
114114
}
115115
}

src/main/java/io/reactivex/internal/operators/observable/ObservableFlatMapSingle.java

+1-1
Original file line numberDiff line numberDiff line change
@@ -109,7 +109,7 @@ public void onNext(T t) {
109109

110110
InnerObserver inner = new InnerObserver();
111111

112-
if (set.add(inner)) {
112+
if (!cancelled && set.add(inner)) {
113113
ms.subscribe(inner);
114114
}
115115
}

0 commit comments

Comments
 (0)