Skip to content

Commit 4c22f96

Browse files
SleimanJneidiakarnokd
authored andcommitted
2.x: WIP removes anonymous inner classes. (ReactiveX#5174)
* removes anonymous inner classes from flowable * address comments * remove remaining private classes * cleanups in observable * cleanups in operators
1 parent f059ded commit 4c22f96

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

49 files changed

+2033
-1366
lines changed

Diff for: src/main/java/io/reactivex/internal/operators/completable/CompletableAmb.java

+37-25
Original file line numberDiff line numberDiff line change
@@ -63,31 +63,7 @@ public void subscribeActual(final CompletableObserver s) {
6363

6464
final AtomicBoolean once = new AtomicBoolean();
6565

66-
CompletableObserver inner = new CompletableObserver() {
67-
@Override
68-
public void onComplete() {
69-
if (once.compareAndSet(false, true)) {
70-
set.dispose();
71-
s.onComplete();
72-
}
73-
}
74-
75-
@Override
76-
public void onError(Throwable e) {
77-
if (once.compareAndSet(false, true)) {
78-
set.dispose();
79-
s.onError(e);
80-
} else {
81-
RxJavaPlugins.onError(e);
82-
}
83-
}
84-
85-
@Override
86-
public void onSubscribe(Disposable d) {
87-
set.add(d);
88-
}
89-
90-
};
66+
CompletableObserver inner = new Amb(once, set, s);
9167

9268
for (int i = 0; i < count; i++) {
9369
CompletableSource c = sources[i];
@@ -113,4 +89,40 @@ public void onSubscribe(Disposable d) {
11389
s.onComplete();
11490
}
11591
}
92+
93+
static final class Amb implements CompletableObserver {
94+
private final AtomicBoolean once;
95+
private final CompositeDisposable set;
96+
private final CompletableObserver s;
97+
98+
Amb(AtomicBoolean once, CompositeDisposable set, CompletableObserver s) {
99+
this.once = once;
100+
this.set = set;
101+
this.s = s;
102+
}
103+
104+
@Override
105+
public void onComplete() {
106+
if (once.compareAndSet(false, true)) {
107+
set.dispose();
108+
s.onComplete();
109+
}
110+
}
111+
112+
@Override
113+
public void onError(Throwable e) {
114+
if (once.compareAndSet(false, true)) {
115+
set.dispose();
116+
s.onError(e);
117+
} else {
118+
RxJavaPlugins.onError(e);
119+
}
120+
}
121+
122+
@Override
123+
public void onSubscribe(Disposable d) {
124+
set.add(d);
125+
}
126+
127+
}
116128
}

Diff for: src/main/java/io/reactivex/internal/operators/completable/CompletableDelay.java

+39-22
Original file line numberDiff line numberDiff line change
@@ -42,36 +42,53 @@ public CompletableDelay(CompletableSource source, long delay, TimeUnit unit, Sch
4242
protected void subscribeActual(final CompletableObserver s) {
4343
final CompositeDisposable set = new CompositeDisposable();
4444

45-
source.subscribe(new CompletableObserver() {
45+
source.subscribe(new Delay(set, s));
46+
}
47+
48+
final class Delay implements CompletableObserver {
49+
50+
private final CompositeDisposable set;
51+
private final CompletableObserver s;
52+
53+
Delay(CompositeDisposable set, CompletableObserver s) {
54+
this.set = set;
55+
this.s = s;
56+
}
57+
58+
@Override
59+
public void onComplete() {
60+
set.add(scheduler.scheduleDirect(new OnComplete(), delay, unit));
61+
}
4662

63+
@Override
64+
public void onError(final Throwable e) {
65+
set.add(scheduler.scheduleDirect(new OnError(e), delayError ? delay : 0, unit));
66+
}
4767

68+
@Override
69+
public void onSubscribe(Disposable d) {
70+
set.add(d);
71+
s.onSubscribe(set);
72+
}
73+
74+
final class OnComplete implements Runnable {
4875
@Override
49-
public void onComplete() {
50-
set.add(scheduler.scheduleDirect(new Runnable() {
51-
@Override
52-
public void run() {
53-
s.onComplete();
54-
}
55-
}, delay, unit));
76+
public void run() {
77+
s.onComplete();
5678
}
79+
}
5780

58-
@Override
59-
public void onError(final Throwable e) {
60-
set.add(scheduler.scheduleDirect(new Runnable() {
61-
@Override
62-
public void run() {
63-
s.onError(e);
64-
}
65-
}, delayError ? delay : 0, unit));
81+
final class OnError implements Runnable {
82+
private final Throwable e;
83+
84+
OnError(Throwable e) {
85+
this.e = e;
6686
}
6787

6888
@Override
69-
public void onSubscribe(Disposable d) {
70-
set.add(d);
71-
s.onSubscribe(set);
89+
public void run() {
90+
s.onError(e);
7291
}
73-
74-
});
92+
}
7593
}
76-
7794
}

Diff for: src/main/java/io/reactivex/internal/operators/completable/CompletableDoOnEvent.java

+33-25
Original file line numberDiff line numberDiff line change
@@ -32,36 +32,44 @@ public CompletableDoOnEvent(final CompletableSource source, final Consumer<? sup
3232

3333
@Override
3434
protected void subscribeActual(final CompletableObserver s) {
35-
source.subscribe(new CompletableObserver() {
36-
@Override
37-
public void onComplete() {
38-
try {
39-
onEvent.accept(null);
40-
} catch (Throwable e) {
41-
Exceptions.throwIfFatal(e);
42-
s.onError(e);
43-
return;
44-
}
35+
source.subscribe(new DoOnEvent(s));
36+
}
4537

46-
s.onComplete();
47-
}
38+
final class DoOnEvent implements CompletableObserver {
39+
private final CompletableObserver observer;
4840

49-
@Override
50-
public void onError(Throwable e) {
51-
try {
52-
onEvent.accept(e);
53-
} catch (Throwable ex) {
54-
Exceptions.throwIfFatal(ex);
55-
e = new CompositeException(e, ex);
56-
}
41+
DoOnEvent(CompletableObserver observer) {
42+
this.observer = observer;
43+
}
5744

58-
s.onError(e);
45+
@Override
46+
public void onComplete() {
47+
try {
48+
onEvent.accept(null);
49+
} catch (Throwable e) {
50+
Exceptions.throwIfFatal(e);
51+
observer.onError(e);
52+
return;
5953
}
6054

61-
@Override
62-
public void onSubscribe(final Disposable d) {
63-
s.onSubscribe(d);
55+
observer.onComplete();
56+
}
57+
58+
@Override
59+
public void onError(Throwable e) {
60+
try {
61+
onEvent.accept(e);
62+
} catch (Throwable ex) {
63+
Exceptions.throwIfFatal(ex);
64+
e = new CompositeException(e, ex);
6465
}
65-
});
66+
67+
observer.onError(e);
68+
}
69+
70+
@Override
71+
public void onSubscribe(final Disposable d) {
72+
observer.onSubscribe(d);
73+
}
6674
}
6775
}

Diff for: src/main/java/io/reactivex/internal/operators/completable/CompletableOnErrorComplete.java

+34-27
Original file line numberDiff line numberDiff line change
@@ -32,38 +32,45 @@ public CompletableOnErrorComplete(CompletableSource source, Predicate<? super Th
3232
@Override
3333
protected void subscribeActual(final CompletableObserver s) {
3434

35-
source.subscribe(new CompletableObserver() {
35+
source.subscribe(new OnError(s));
36+
}
3637

37-
@Override
38-
public void onComplete() {
39-
s.onComplete();
40-
}
38+
final class OnError implements CompletableObserver {
39+
40+
private final CompletableObserver s;
41+
42+
OnError(CompletableObserver s) {
43+
this.s = s;
44+
}
4145

42-
@Override
43-
public void onError(Throwable e) {
44-
boolean b;
45-
46-
try {
47-
b = predicate.test(e);
48-
} catch (Throwable ex) {
49-
Exceptions.throwIfFatal(ex);
50-
s.onError(new CompositeException(e, ex));
51-
return;
52-
}
53-
54-
if (b) {
55-
s.onComplete();
56-
} else {
57-
s.onError(e);
58-
}
46+
@Override
47+
public void onComplete() {
48+
s.onComplete();
49+
}
50+
51+
@Override
52+
public void onError(Throwable e) {
53+
boolean b;
54+
55+
try {
56+
b = predicate.test(e);
57+
} catch (Throwable ex) {
58+
Exceptions.throwIfFatal(ex);
59+
s.onError(new CompositeException(e, ex));
60+
return;
5961
}
6062

61-
@Override
62-
public void onSubscribe(Disposable d) {
63-
s.onSubscribe(d);
63+
if (b) {
64+
s.onComplete();
65+
} else {
66+
s.onError(e);
6467
}
68+
}
6569

66-
});
67-
}
70+
@Override
71+
public void onSubscribe(Disposable d) {
72+
s.onSubscribe(d);
73+
}
6874

75+
}
6976
}

Diff for: src/main/java/io/reactivex/internal/operators/completable/CompletableResumeNext.java

+48-38
Original file line numberDiff line numberDiff line change
@@ -38,7 +38,52 @@ protected void subscribeActual(final CompletableObserver s) {
3838

3939
final SequentialDisposable sd = new SequentialDisposable();
4040
s.onSubscribe(sd);
41-
source.subscribe(new CompletableObserver() {
41+
source.subscribe(new ResumeNext(s, sd));
42+
}
43+
44+
final class ResumeNext implements CompletableObserver {
45+
46+
private final CompletableObserver s;
47+
private final SequentialDisposable sd;
48+
49+
ResumeNext(CompletableObserver s, SequentialDisposable sd) {
50+
this.s = s;
51+
this.sd = sd;
52+
}
53+
54+
@Override
55+
public void onComplete() {
56+
s.onComplete();
57+
}
58+
59+
@Override
60+
public void onError(Throwable e) {
61+
CompletableSource c;
62+
63+
try {
64+
c = errorMapper.apply(e);
65+
} catch (Throwable ex) {
66+
Exceptions.throwIfFatal(ex);
67+
s.onError(new CompositeException(ex, e));
68+
return;
69+
}
70+
71+
if (c == null) {
72+
NullPointerException npe = new NullPointerException("The CompletableConsumable returned is null");
73+
npe.initCause(e);
74+
s.onError(npe);
75+
return;
76+
}
77+
78+
c.subscribe(new OnErrorObserver());
79+
}
80+
81+
@Override
82+
public void onSubscribe(Disposable d) {
83+
sd.update(d);
84+
}
85+
86+
final class OnErrorObserver implements CompletableObserver {
4287

4388
@Override
4489
public void onComplete() {
@@ -47,49 +92,14 @@ public void onComplete() {
4792

4893
@Override
4994
public void onError(Throwable e) {
50-
CompletableSource c;
51-
52-
try {
53-
c = errorMapper.apply(e);
54-
} catch (Throwable ex) {
55-
Exceptions.throwIfFatal(ex);
56-
s.onError(new CompositeException(ex, e));
57-
return;
58-
}
59-
60-
if (c == null) {
61-
NullPointerException npe = new NullPointerException("The CompletableConsumable returned is null");
62-
npe.initCause(e);
63-
s.onError(npe);
64-
return;
65-
}
66-
67-
c.subscribe(new CompletableObserver() {
68-
69-
@Override
70-
public void onComplete() {
71-
s.onComplete();
72-
}
73-
74-
@Override
75-
public void onError(Throwable e) {
76-
s.onError(e);
77-
}
78-
79-
@Override
80-
public void onSubscribe(Disposable d) {
81-
sd.update(d);
82-
}
83-
84-
});
95+
s.onError(e);
8596
}
8697

8798
@Override
8899
public void onSubscribe(Disposable d) {
89100
sd.update(d);
90101
}
91102

92-
});
103+
}
93104
}
94-
95105
}

0 commit comments

Comments
 (0)