Skip to content

Commit 39e5d91

Browse files
authored
2.x: concat to report isDisposed consistently with termination (#5440)
1 parent 4c19753 commit 39e5d91

File tree

3 files changed

+186
-3
lines changed

3 files changed

+186
-3
lines changed

src/main/java/io/reactivex/internal/operators/observable/ObservableConcatMap.java

+6-2
Original file line numberDiff line numberDiff line change
@@ -202,6 +202,7 @@ void drain() {
202202
boolean empty = t == null;
203203

204204
if (d && empty) {
205+
disposed = true;
205206
actual.onComplete();
206207
return;
207208
}
@@ -367,7 +368,7 @@ public void onComplete() {
367368

368369
@Override
369370
public boolean isDisposed() {
370-
return d.isDisposed();
371+
return cancelled;
371372
}
372373

373374
@Override
@@ -400,7 +401,7 @@ void drain() {
400401
Throwable ex = error.get();
401402
if (ex != null) {
402403
queue.clear();
403-
404+
cancelled = true;
404405
actual.onError(error.terminate());
405406
return;
406407
}
@@ -414,6 +415,7 @@ void drain() {
414415
v = queue.poll();
415416
} catch (Throwable ex) {
416417
Exceptions.throwIfFatal(ex);
418+
cancelled = true;
417419
this.d.dispose();
418420
error.addThrowable(ex);
419421
actual.onError(error.terminate());
@@ -423,6 +425,7 @@ void drain() {
423425
boolean empty = v == null;
424426

425427
if (d && empty) {
428+
cancelled = true;
426429
Throwable ex = error.terminate();
427430
if (ex != null) {
428431
actual.onError(ex);
@@ -440,6 +443,7 @@ void drain() {
440443
o = ObjectHelper.requireNonNull(mapper.apply(v), "The mapper returned a null ObservableSource");
441444
} catch (Throwable ex) {
442445
Exceptions.throwIfFatal(ex);
446+
cancelled = true;
443447
this.d.dispose();
444448
queue.clear();
445449
error.addThrowable(ex);

src/test/java/io/reactivex/internal/operators/observable/ObservableConcatMapTest.java

+66-1
Original file line numberDiff line numberDiff line change
@@ -13,15 +13,18 @@
1313

1414
package io.reactivex.internal.operators.observable;
1515

16+
import static org.junit.Assert.assertTrue;
17+
1618
import java.util.List;
1719
import java.util.concurrent.Callable;
1820

1921
import org.junit.Test;
2022

2123
import io.reactivex.*;
22-
import io.reactivex.disposables.Disposables;
24+
import io.reactivex.disposables.*;
2325
import io.reactivex.exceptions.TestException;
2426
import io.reactivex.functions.Function;
27+
import io.reactivex.internal.functions.Functions;
2528
import io.reactivex.observers.TestObserver;
2629
import io.reactivex.plugins.RxJavaPlugins;
2730
import io.reactivex.schedulers.Schedulers;
@@ -367,4 +370,66 @@ protected void subscribeActual(Observer<? super Integer> observer) {
367370
RxJavaPlugins.reset();
368371
}
369372
}
373+
374+
@SuppressWarnings("unchecked")
375+
@Test
376+
public void concatReportsDisposedOnComplete() {
377+
final Disposable[] disposable = { null };
378+
379+
Observable.fromArray(Observable.just(1), Observable.just(2))
380+
.hide()
381+
.concatMap(Functions.<Observable<Integer>>identity())
382+
.subscribe(new Observer<Integer>() {
383+
384+
@Override
385+
public void onSubscribe(Disposable d) {
386+
disposable[0] = d;
387+
}
388+
389+
@Override
390+
public void onNext(Integer t) {
391+
}
392+
393+
@Override
394+
public void onError(Throwable e) {
395+
}
396+
397+
@Override
398+
public void onComplete() {
399+
}
400+
});
401+
402+
assertTrue(disposable[0].isDisposed());
403+
}
404+
405+
@Test
406+
@SuppressWarnings("unchecked")
407+
public void concatReportsDisposedOnError() {
408+
final Disposable[] disposable = { null };
409+
410+
Observable.fromArray(Observable.just(1), Observable.<Integer>error(new TestException()))
411+
.hide()
412+
.concatMap(Functions.<Observable<Integer>>identity())
413+
.subscribe(new Observer<Integer>() {
414+
415+
@Override
416+
public void onSubscribe(Disposable d) {
417+
disposable[0] = d;
418+
}
419+
420+
@Override
421+
public void onNext(Integer t) {
422+
}
423+
424+
@Override
425+
public void onError(Throwable e) {
426+
}
427+
428+
@Override
429+
public void onComplete() {
430+
}
431+
});
432+
433+
assertTrue(disposable[0].isDisposed());
434+
}
370435
}

src/test/java/io/reactivex/internal/operators/observable/ObservableConcatTest.java

+114
Original file line numberDiff line numberDiff line change
@@ -1041,4 +1041,118 @@ public void subscribe(ObservableEmitter<Integer> s) throws Exception {
10411041

10421042
assertEquals(1, calls[0]);
10431043
}
1044+
1045+
@Test
1046+
public void concatReportsDisposedOnComplete() {
1047+
final Disposable[] disposable = { null };
1048+
1049+
Observable.concat(Observable.just(1), Observable.just(2))
1050+
.subscribe(new Observer<Integer>() {
1051+
1052+
@Override
1053+
public void onSubscribe(Disposable d) {
1054+
disposable[0] = d;
1055+
}
1056+
1057+
@Override
1058+
public void onNext(Integer t) {
1059+
}
1060+
1061+
@Override
1062+
public void onError(Throwable e) {
1063+
}
1064+
1065+
@Override
1066+
public void onComplete() {
1067+
}
1068+
});
1069+
1070+
assertTrue(disposable[0].isDisposed());
1071+
}
1072+
1073+
@Test
1074+
@SuppressWarnings("unchecked")
1075+
public void concatReportsDisposedOnCompleteDelayError() {
1076+
final Disposable[] disposable = { null };
1077+
1078+
Observable.concatArrayDelayError(Observable.just(1), Observable.just(2))
1079+
.subscribe(new Observer<Integer>() {
1080+
1081+
@Override
1082+
public void onSubscribe(Disposable d) {
1083+
disposable[0] = d;
1084+
}
1085+
1086+
@Override
1087+
public void onNext(Integer t) {
1088+
}
1089+
1090+
@Override
1091+
public void onError(Throwable e) {
1092+
}
1093+
1094+
@Override
1095+
public void onComplete() {
1096+
}
1097+
});
1098+
1099+
assertTrue(disposable[0].isDisposed());
1100+
}
1101+
1102+
@Test
1103+
public void concatReportsDisposedOnError() {
1104+
final Disposable[] disposable = { null };
1105+
1106+
Observable.concat(Observable.just(1), Observable.<Integer>error(new TestException()))
1107+
.subscribe(new Observer<Integer>() {
1108+
1109+
@Override
1110+
public void onSubscribe(Disposable d) {
1111+
disposable[0] = d;
1112+
}
1113+
1114+
@Override
1115+
public void onNext(Integer t) {
1116+
}
1117+
1118+
@Override
1119+
public void onError(Throwable e) {
1120+
}
1121+
1122+
@Override
1123+
public void onComplete() {
1124+
}
1125+
});
1126+
1127+
assertTrue(disposable[0].isDisposed());
1128+
}
1129+
1130+
@Test
1131+
@SuppressWarnings("unchecked")
1132+
public void concatReportsDisposedOnErrorDelayError() {
1133+
final Disposable[] disposable = { null };
1134+
1135+
Observable.concatArrayDelayError(Observable.just(1), Observable.<Integer>error(new TestException()))
1136+
.subscribe(new Observer<Integer>() {
1137+
1138+
@Override
1139+
public void onSubscribe(Disposable d) {
1140+
disposable[0] = d;
1141+
}
1142+
1143+
@Override
1144+
public void onNext(Integer t) {
1145+
}
1146+
1147+
@Override
1148+
public void onError(Throwable e) {
1149+
}
1150+
1151+
@Override
1152+
public void onComplete() {
1153+
}
1154+
});
1155+
1156+
assertTrue(disposable[0].isDisposed());
1157+
}
10441158
}

0 commit comments

Comments
 (0)