Skip to content

Commit ab64982

Browse files
authored
2.x: Make observeOn not let worker.dispose() called prematurely (#6167)
* 2.x: Make observeOn not let worker.dispose() called prematurely * Merge in master.
1 parent 5c67fe4 commit ab64982

File tree

4 files changed

+262
-7
lines changed

4 files changed

+262
-7
lines changed

src/main/java/io/reactivex/internal/operators/flowable/FlowableObserveOn.java

+13
Original file line numberDiff line numberDiff line change
@@ -191,6 +191,7 @@ final boolean checkTerminated(boolean d, boolean empty, Subscriber<?> a) {
191191
if (d) {
192192
if (delayError) {
193193
if (empty) {
194+
cancelled = true;
194195
Throwable e = error;
195196
if (e != null) {
196197
a.onError(e);
@@ -203,12 +204,14 @@ final boolean checkTerminated(boolean d, boolean empty, Subscriber<?> a) {
203204
} else {
204205
Throwable e = error;
205206
if (e != null) {
207+
cancelled = true;
206208
clear();
207209
a.onError(e);
208210
worker.dispose();
209211
return true;
210212
} else
211213
if (empty) {
214+
cancelled = true;
212215
a.onComplete();
213216
worker.dispose();
214217
return true;
@@ -314,6 +317,7 @@ void runSync() {
314317
v = q.poll();
315318
} catch (Throwable ex) {
316319
Exceptions.throwIfFatal(ex);
320+
cancelled = true;
317321
upstream.cancel();
318322
a.onError(ex);
319323
worker.dispose();
@@ -324,6 +328,7 @@ void runSync() {
324328
return;
325329
}
326330
if (v == null) {
331+
cancelled = true;
327332
a.onComplete();
328333
worker.dispose();
329334
return;
@@ -339,6 +344,7 @@ void runSync() {
339344
}
340345

341346
if (q.isEmpty()) {
347+
cancelled = true;
342348
a.onComplete();
343349
worker.dispose();
344350
return;
@@ -379,6 +385,7 @@ void runAsync() {
379385
} catch (Throwable ex) {
380386
Exceptions.throwIfFatal(ex);
381387

388+
cancelled = true;
382389
upstream.cancel();
383390
q.clear();
384391

@@ -441,6 +448,7 @@ void runBackfused() {
441448
downstream.onNext(null);
442449

443450
if (d) {
451+
cancelled = true;
444452
Throwable e = error;
445453
if (e != null) {
446454
downstream.onError(e);
@@ -552,6 +560,7 @@ void runSync() {
552560
v = q.poll();
553561
} catch (Throwable ex) {
554562
Exceptions.throwIfFatal(ex);
563+
cancelled = true;
555564
upstream.cancel();
556565
a.onError(ex);
557566
worker.dispose();
@@ -562,6 +571,7 @@ void runSync() {
562571
return;
563572
}
564573
if (v == null) {
574+
cancelled = true;
565575
a.onComplete();
566576
worker.dispose();
567577
return;
@@ -577,6 +587,7 @@ void runSync() {
577587
}
578588

579589
if (q.isEmpty()) {
590+
cancelled = true;
580591
a.onComplete();
581592
worker.dispose();
582593
return;
@@ -617,6 +628,7 @@ void runAsync() {
617628
} catch (Throwable ex) {
618629
Exceptions.throwIfFatal(ex);
619630

631+
cancelled = true;
620632
upstream.cancel();
621633
q.clear();
622634

@@ -680,6 +692,7 @@ void runBackfused() {
680692
downstream.onNext(null);
681693

682694
if (d) {
695+
cancelled = true;
683696
Throwable e = error;
684697
if (e != null) {
685698
downstream.onError(e);

src/main/java/io/reactivex/internal/operators/observable/ObservableObserveOn.java

+12-6
Original file line numberDiff line numberDiff line change
@@ -62,7 +62,7 @@ static final class ObserveOnObserver<T> extends BasicIntQueueDisposable<T>
6262
Throwable error;
6363
volatile boolean done;
6464

65-
volatile boolean cancelled;
65+
volatile boolean disposed;
6666

6767
int sourceMode;
6868

@@ -141,8 +141,8 @@ public void onComplete() {
141141

142142
@Override
143143
public void dispose() {
144-
if (!cancelled) {
145-
cancelled = true;
144+
if (!disposed) {
145+
disposed = true;
146146
upstream.dispose();
147147
worker.dispose();
148148
if (getAndIncrement() == 0) {
@@ -153,7 +153,7 @@ public void dispose() {
153153

154154
@Override
155155
public boolean isDisposed() {
156-
return cancelled;
156+
return disposed;
157157
}
158158

159159
void schedule() {
@@ -181,6 +181,7 @@ void drainNormal() {
181181
v = q.poll();
182182
} catch (Throwable ex) {
183183
Exceptions.throwIfFatal(ex);
184+
disposed = true;
184185
upstream.dispose();
185186
q.clear();
186187
a.onError(ex);
@@ -211,14 +212,15 @@ void drainFused() {
211212
int missed = 1;
212213

213214
for (;;) {
214-
if (cancelled) {
215+
if (disposed) {
215216
return;
216217
}
217218

218219
boolean d = done;
219220
Throwable ex = error;
220221

221222
if (!delayError && d && ex != null) {
223+
disposed = true;
222224
downstream.onError(error);
223225
worker.dispose();
224226
return;
@@ -227,6 +229,7 @@ void drainFused() {
227229
downstream.onNext(null);
228230

229231
if (d) {
232+
disposed = true;
230233
ex = error;
231234
if (ex != null) {
232235
downstream.onError(ex);
@@ -254,14 +257,15 @@ public void run() {
254257
}
255258

256259
boolean checkTerminated(boolean d, boolean empty, Observer<? super T> a) {
257-
if (cancelled) {
260+
if (disposed) {
258261
queue.clear();
259262
return true;
260263
}
261264
if (d) {
262265
Throwable e = error;
263266
if (delayError) {
264267
if (empty) {
268+
disposed = true;
265269
if (e != null) {
266270
a.onError(e);
267271
} else {
@@ -272,12 +276,14 @@ boolean checkTerminated(boolean d, boolean empty, Observer<? super T> a) {
272276
}
273277
} else {
274278
if (e != null) {
279+
disposed = true;
275280
queue.clear();
276281
a.onError(e);
277282
worker.dispose();
278283
return true;
279284
} else
280285
if (empty) {
286+
disposed = true;
281287
a.onComplete();
282288
worker.dispose();
283289
return true;

src/test/java/io/reactivex/internal/operators/flowable/FlowableObserveOnTest.java

+161-1
Original file line numberDiff line numberDiff line change
@@ -21,12 +21,13 @@
2121
import java.util.concurrent.*;
2222
import java.util.concurrent.atomic.*;
2323

24-
import io.reactivex.annotations.Nullable;
2524
import org.junit.Test;
2625
import org.mockito.InOrder;
2726
import org.reactivestreams.*;
2827

2928
import io.reactivex.*;
29+
import io.reactivex.annotations.Nullable;
30+
import io.reactivex.disposables.*;
3031
import io.reactivex.exceptions.*;
3132
import io.reactivex.functions.*;
3233
import io.reactivex.internal.functions.Functions;
@@ -1781,4 +1782,163 @@ public void syncFusedRequestOneByOneConditional() {
17811782
.test()
17821783
.assertResult(1, 2, 3, 4, 5);
17831784
}
1785+
1786+
public static final class DisposeTrackingScheduler extends Scheduler {
1787+
1788+
public final AtomicInteger disposedCount = new AtomicInteger();
1789+
1790+
@Override
1791+
public Worker createWorker() {
1792+
return new TrackingWorker();
1793+
}
1794+
1795+
final class TrackingWorker extends Scheduler.Worker {
1796+
1797+
@Override
1798+
public void dispose() {
1799+
disposedCount.getAndIncrement();
1800+
}
1801+
1802+
@Override
1803+
public boolean isDisposed() {
1804+
return false;
1805+
}
1806+
1807+
@Override
1808+
public Disposable schedule(Runnable run, long delay,
1809+
TimeUnit unit) {
1810+
run.run();
1811+
return Disposables.empty();
1812+
}
1813+
}
1814+
}
1815+
1816+
@Test
1817+
public void workerNotDisposedPrematurelyNormalInNormalOut() {
1818+
DisposeTrackingScheduler s = new DisposeTrackingScheduler();
1819+
1820+
Flowable.concat(
1821+
Flowable.just(1).hide().observeOn(s),
1822+
Flowable.just(2)
1823+
)
1824+
.test()
1825+
.assertResult(1, 2);
1826+
1827+
assertEquals(1, s.disposedCount.get());
1828+
}
1829+
1830+
@Test
1831+
public void workerNotDisposedPrematurelySyncInNormalOut() {
1832+
DisposeTrackingScheduler s = new DisposeTrackingScheduler();
1833+
1834+
Flowable.concat(
1835+
Flowable.just(1).observeOn(s),
1836+
Flowable.just(2)
1837+
)
1838+
.test()
1839+
.assertResult(1, 2);
1840+
1841+
assertEquals(1, s.disposedCount.get());
1842+
}
1843+
1844+
@Test
1845+
public void workerNotDisposedPrematurelyAsyncInNormalOut() {
1846+
DisposeTrackingScheduler s = new DisposeTrackingScheduler();
1847+
1848+
UnicastProcessor<Integer> up = UnicastProcessor.create();
1849+
up.onNext(1);
1850+
up.onComplete();
1851+
1852+
Flowable.concat(
1853+
up.observeOn(s),
1854+
Flowable.just(2)
1855+
)
1856+
.test()
1857+
.assertResult(1, 2);
1858+
1859+
assertEquals(1, s.disposedCount.get());
1860+
}
1861+
1862+
static final class TestSubscriberFusedCanceling
1863+
extends TestSubscriber<Integer> {
1864+
1865+
public TestSubscriberFusedCanceling() {
1866+
super();
1867+
initialFusionMode = QueueFuseable.ANY;
1868+
}
1869+
1870+
@Override
1871+
public void onComplete() {
1872+
cancel();
1873+
super.onComplete();
1874+
}
1875+
}
1876+
1877+
@Test
1878+
public void workerNotDisposedPrematurelyNormalInAsyncOut() {
1879+
DisposeTrackingScheduler s = new DisposeTrackingScheduler();
1880+
1881+
TestSubscriber<Integer> ts = new TestSubscriberFusedCanceling();
1882+
1883+
Flowable.just(1).hide().observeOn(s).subscribe(ts);
1884+
1885+
assertEquals(1, s.disposedCount.get());
1886+
}
1887+
1888+
@Test
1889+
public void workerNotDisposedPrematurelyNormalInNormalOutConditional() {
1890+
DisposeTrackingScheduler s = new DisposeTrackingScheduler();
1891+
1892+
Flowable.concat(
1893+
Flowable.just(1).hide().observeOn(s).filter(Functions.alwaysTrue()),
1894+
Flowable.just(2)
1895+
)
1896+
.test()
1897+
.assertResult(1, 2);
1898+
1899+
assertEquals(1, s.disposedCount.get());
1900+
}
1901+
1902+
@Test
1903+
public void workerNotDisposedPrematurelySyncInNormalOutConditional() {
1904+
DisposeTrackingScheduler s = new DisposeTrackingScheduler();
1905+
1906+
Flowable.concat(
1907+
Flowable.just(1).observeOn(s).filter(Functions.alwaysTrue()),
1908+
Flowable.just(2)
1909+
)
1910+
.test()
1911+
.assertResult(1, 2);
1912+
1913+
assertEquals(1, s.disposedCount.get());
1914+
}
1915+
1916+
@Test
1917+
public void workerNotDisposedPrematurelyAsyncInNormalOutConditional() {
1918+
DisposeTrackingScheduler s = new DisposeTrackingScheduler();
1919+
1920+
UnicastProcessor<Integer> up = UnicastProcessor.create();
1921+
up.onNext(1);
1922+
up.onComplete();
1923+
1924+
Flowable.concat(
1925+
up.observeOn(s).filter(Functions.alwaysTrue()),
1926+
Flowable.just(2)
1927+
)
1928+
.test()
1929+
.assertResult(1, 2);
1930+
1931+
assertEquals(1, s.disposedCount.get());
1932+
}
1933+
1934+
@Test
1935+
public void workerNotDisposedPrematurelyNormalInAsyncOutConditional() {
1936+
DisposeTrackingScheduler s = new DisposeTrackingScheduler();
1937+
1938+
TestSubscriber<Integer> ts = new TestSubscriberFusedCanceling();
1939+
1940+
Flowable.just(1).hide().observeOn(s).filter(Functions.alwaysTrue()).subscribe(ts);
1941+
1942+
assertEquals(1, s.disposedCount.get());
1943+
}
17841944
}

0 commit comments

Comments
 (0)