Skip to content

Commit a008e03

Browse files
authored
2.x: Improve Completable.onErrorResumeNext internals (#6123)
* 2.x: Improve Completable.onErrorResumeNext internals * Use ObjectHelper
1 parent 7ade77a commit a008e03

File tree

3 files changed

+104
-40
lines changed

3 files changed

+104
-40
lines changed

src/main/java/io/reactivex/internal/operators/completable/CompletableResumeNext.java

+38-38
Original file line numberDiff line numberDiff line change
@@ -13,11 +13,14 @@
1313

1414
package io.reactivex.internal.operators.completable;
1515

16+
import java.util.concurrent.atomic.AtomicReference;
17+
1618
import io.reactivex.*;
1719
import io.reactivex.disposables.Disposable;
1820
import io.reactivex.exceptions.*;
1921
import io.reactivex.functions.Function;
20-
import io.reactivex.internal.disposables.SequentialDisposable;
22+
import io.reactivex.internal.disposables.DisposableHelper;
23+
import io.reactivex.internal.functions.ObjectHelper;
2124

2225
public final class CompletableResumeNext extends Completable {
2326

@@ -35,20 +38,32 @@ public CompletableResumeNext(CompletableSource source,
3538

3639
@Override
3740
protected void subscribeActual(final CompletableObserver observer) {
38-
39-
final SequentialDisposable sd = new SequentialDisposable();
40-
observer.onSubscribe(sd);
41-
source.subscribe(new ResumeNext(observer, sd));
41+
ResumeNextObserver parent = new ResumeNextObserver(observer, errorMapper);
42+
observer.onSubscribe(parent);
43+
source.subscribe(parent);
4244
}
4345

44-
final class ResumeNext implements CompletableObserver {
46+
static final class ResumeNextObserver
47+
extends AtomicReference<Disposable>
48+
implements CompletableObserver, Disposable {
49+
50+
private static final long serialVersionUID = 5018523762564524046L;
4551

4652
final CompletableObserver downstream;
47-
final SequentialDisposable sd;
4853

49-
ResumeNext(CompletableObserver observer, SequentialDisposable sd) {
54+
final Function<? super Throwable, ? extends CompletableSource> errorMapper;
55+
56+
boolean once;
57+
58+
ResumeNextObserver(CompletableObserver observer, Function<? super Throwable, ? extends CompletableSource> errorMapper) {
5059
this.downstream = observer;
51-
this.sd = sd;
60+
this.errorMapper = errorMapper;
61+
}
62+
63+
64+
@Override
65+
public void onSubscribe(Disposable d) {
66+
DisposableHelper.replace(this, d);
5267
}
5368

5469
@Override
@@ -58,48 +73,33 @@ public void onComplete() {
5873

5974
@Override
6075
public void onError(Throwable e) {
76+
if (once) {
77+
downstream.onError(e);
78+
return;
79+
}
80+
once = true;
81+
6182
CompletableSource c;
6283

6384
try {
64-
c = errorMapper.apply(e);
85+
c = ObjectHelper.requireNonNull(errorMapper.apply(e), "The errorMapper returned a null CompletableSource");
6586
} catch (Throwable ex) {
6687
Exceptions.throwIfFatal(ex);
67-
downstream.onError(new CompositeException(ex, e));
88+
downstream.onError(new CompositeException(e, ex));
6889
return;
6990
}
7091

71-
if (c == null) {
72-
NullPointerException npe = new NullPointerException("The CompletableConsumable returned is null");
73-
npe.initCause(e);
74-
downstream.onError(npe);
75-
return;
76-
}
77-
78-
c.subscribe(new OnErrorObserver());
92+
c.subscribe(this);
7993
}
8094

8195
@Override
82-
public void onSubscribe(Disposable d) {
83-
sd.update(d);
96+
public boolean isDisposed() {
97+
return DisposableHelper.isDisposed(get());
8498
}
8599

86-
final class OnErrorObserver implements CompletableObserver {
87-
88-
@Override
89-
public void onComplete() {
90-
downstream.onComplete();
91-
}
92-
93-
@Override
94-
public void onError(Throwable e) {
95-
downstream.onError(e);
96-
}
97-
98-
@Override
99-
public void onSubscribe(Disposable d) {
100-
sd.update(d);
101-
}
102-
100+
@Override
101+
public void dispose() {
102+
DisposableHelper.dispose(this);
103103
}
104104
}
105105
}

src/test/java/io/reactivex/completable/CompletableTest.java

+5-2
Original file line numberDiff line numberDiff line change
@@ -2170,8 +2170,11 @@ public Completable apply(Throwable e) {
21702170
try {
21712171
c.blockingAwait();
21722172
Assert.fail("Did not throw an exception");
2173-
} catch (NullPointerException ex) {
2174-
Assert.assertTrue(ex.getCause() instanceof TestException);
2173+
} catch (CompositeException ex) {
2174+
List<Throwable> errors = ex.getExceptions();
2175+
TestHelper.assertError(errors, 0, TestException.class);
2176+
TestHelper.assertError(errors, 1, NullPointerException.class);
2177+
assertEquals(2, errors.size());
21752178
}
21762179
}
21772180

Original file line numberDiff line numberDiff line change
@@ -0,0 +1,61 @@
1+
/**
2+
* Copyright (c) 2016-present, RxJava Contributors.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in
5+
* compliance with the License. You may obtain a copy of the License at
6+
*
7+
* http://www.apache.org/licenses/LICENSE-2.0
8+
*
9+
* Unless required by applicable law or agreed to in writing, software distributed under the License is
10+
* distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See
11+
* the License for the specific language governing permissions and limitations under the License.
12+
*/
13+
14+
package io.reactivex.internal.operators.completable;
15+
16+
import org.junit.Test;
17+
18+
import io.reactivex.*;
19+
import io.reactivex.exceptions.TestException;
20+
import io.reactivex.functions.Function;
21+
import io.reactivex.internal.functions.Functions;
22+
23+
public class CompletableResumeNextTest {
24+
25+
@Test
26+
public void resumeWithError() {
27+
Completable.error(new TestException())
28+
.onErrorResumeNext(Functions.justFunction(Completable.error(new TestException("second"))))
29+
.test()
30+
.assertFailureAndMessage(TestException.class, "second");
31+
}
32+
33+
@Test
34+
public void disposeInMain() {
35+
TestHelper.checkDisposedCompletable(new Function<Completable, CompletableSource>() {
36+
@Override
37+
public CompletableSource apply(Completable c) throws Exception {
38+
return c.onErrorResumeNext(Functions.justFunction(Completable.complete()));
39+
}
40+
});
41+
}
42+
43+
44+
@Test
45+
public void disposeInResume() {
46+
TestHelper.checkDisposedCompletable(new Function<Completable, CompletableSource>() {
47+
@Override
48+
public CompletableSource apply(Completable c) throws Exception {
49+
return Completable.error(new TestException()).onErrorResumeNext(Functions.justFunction(c));
50+
}
51+
});
52+
}
53+
54+
@Test
55+
public void disposed() {
56+
TestHelper.checkDisposed(
57+
Completable.error(new TestException())
58+
.onErrorResumeNext(Functions.justFunction(Completable.never()))
59+
);
60+
}
61+
}

0 commit comments

Comments
 (0)