Skip to content

Commit ae2aaa6

Browse files
committed
2.x: introduce op-fusion to Observable + disposable-chaining fixes
1 parent 7a5320f commit ae2aaa6

File tree

58 files changed

+1276
-299
lines changed

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

58 files changed

+1276
-299
lines changed

src/main/java/io/reactivex/Observable.java

+2-2
Original file line numberDiff line numberDiff line change
@@ -1659,7 +1659,7 @@ public final Observable<T> doOnLifecycle(final Consumer<? super Disposable> onSu
16591659
return lift(new NbpOperator<T, T>() {
16601660
@Override
16611661
public Observer<? super T> apply(Observer<? super T> s) {
1662-
return new NbpSubscriptionLambdaSubscriber<T>(s, onSubscribe, onCancel);
1662+
return new SubscriptionLambdaObserver<T>(s, onSubscribe, onCancel);
16631663
}
16641664
});
16651665
}
@@ -2626,7 +2626,7 @@ public final Disposable subscribe(Consumer<? super T> onNext, Consumer<? super T
26262626
Objects.requireNonNull(onComplete, "onComplete is null");
26272627
Objects.requireNonNull(onSubscribe, "onSubscribe is null");
26282628

2629-
NbpLambdaSubscriber<T> ls = new NbpLambdaSubscriber<T>(onNext, onError, onComplete, onSubscribe);
2629+
LambdaObserver<T> ls = new LambdaObserver<T>(onNext, onError, onComplete, onSubscribe);
26302630

26312631
unsafeSubscribe(ls);
26322632

src/main/java/io/reactivex/internal/operators/observable/ObservableAll.java

+12-2
Original file line numberDiff line numberDiff line change
@@ -32,7 +32,7 @@ protected void subscribeActual(Observer<? super Boolean> t) {
3232
source.subscribe(new AllSubscriber<T>(t, predicate));
3333
}
3434

35-
static final class AllSubscriber<T> implements Observer<T> {
35+
static final class AllSubscriber<T> implements Observer<T>, Disposable {
3636
final Observer<? super Boolean> actual;
3737
final Predicate<? super T> predicate;
3838

@@ -48,7 +48,7 @@ public AllSubscriber(Observer<? super Boolean> actual, Predicate<? super T> pred
4848
public void onSubscribe(Disposable s) {
4949
if (DisposableHelper.validate(this.s, s)) {
5050
this.s = s;
51-
actual.onSubscribe(s);
51+
actual.onSubscribe(this);
5252
}
5353
}
5454

@@ -93,5 +93,15 @@ public void onComplete() {
9393
actual.onNext(true);
9494
actual.onComplete();
9595
}
96+
97+
@Override
98+
public void dispose() {
99+
s.dispose();
100+
}
101+
102+
@Override
103+
public boolean isDisposed() {
104+
return s.isDisposed();
105+
}
96106
}
97107
}

src/main/java/io/reactivex/internal/operators/observable/ObservableAny.java

+12-2
Original file line numberDiff line numberDiff line change
@@ -30,7 +30,7 @@ protected void subscribeActual(Observer<? super Boolean> t) {
3030
source.subscribe(new AnySubscriber<T>(t, predicate));
3131
}
3232

33-
static final class AnySubscriber<T> implements Observer<T> {
33+
static final class AnySubscriber<T> implements Observer<T>, Disposable {
3434

3535
final Observer<? super Boolean> actual;
3636
final Predicate<? super T> predicate;
@@ -47,7 +47,7 @@ public AnySubscriber(Observer<? super Boolean> actual, Predicate<? super T> pred
4747
public void onSubscribe(Disposable s) {
4848
if (DisposableHelper.validate(this.s, s)) {
4949
this.s = s;
50-
actual.onSubscribe(s);
50+
actual.onSubscribe(this);
5151
}
5252
}
5353

@@ -89,5 +89,15 @@ public void onComplete() {
8989
actual.onComplete();
9090
}
9191
}
92+
93+
@Override
94+
public void dispose() {
95+
s.dispose();
96+
}
97+
98+
@Override
99+
public boolean isDisposed() {
100+
return s.isDisposed();
101+
}
92102
}
93103
}

src/main/java/io/reactivex/internal/operators/observable/ObservableBuffer.java

+25-4
Original file line numberDiff line numberDiff line change
@@ -48,7 +48,7 @@ protected void subscribeActual(Observer<? super U> t) {
4848
}
4949
}
5050

51-
static final class BufferExactSubscriber<T, U extends Collection<? super T>> implements Observer<T> {
51+
static final class BufferExactSubscriber<T, U extends Collection<? super T>> implements Observer<T>, Disposable {
5252
final Observer<? super U> actual;
5353
final int count;
5454
final Supplier<U> bufferSupplier;
@@ -98,10 +98,20 @@ boolean createBuffer() {
9898
public void onSubscribe(Disposable s) {
9999
if (DisposableHelper.validate(this.s, s)) {
100100
this.s = s;
101-
actual.onSubscribe(s);
101+
actual.onSubscribe(this);
102102
}
103103
}
104+
105+
@Override
106+
public void dispose() {
107+
s.dispose();
108+
}
104109

110+
@Override
111+
public boolean isDisposed() {
112+
return s.isDisposed();
113+
}
114+
105115
@Override
106116
public void onNext(T t) {
107117
U b = buffer;
@@ -137,7 +147,7 @@ public void onComplete() {
137147
}
138148

139149
static final class BufferSkipSubscriber<T, U extends Collection<? super T>>
140-
extends AtomicBoolean implements Observer<T> {
150+
extends AtomicBoolean implements Observer<T>, Disposable {
141151
/** */
142152
private static final long serialVersionUID = -8223395059921494546L;
143153
final Observer<? super U> actual;
@@ -163,10 +173,21 @@ public BufferSkipSubscriber(Observer<? super U> actual, int count, int skip, Sup
163173
public void onSubscribe(Disposable s) {
164174
if (DisposableHelper.validate(this.s, s)) {
165175
this.s = s;
166-
actual.onSubscribe(s);
176+
actual.onSubscribe(this);
167177
}
168178
}
169179

180+
181+
@Override
182+
public void dispose() {
183+
s.dispose();
184+
}
185+
186+
@Override
187+
public boolean isDisposed() {
188+
return s.isDisposed();
189+
}
190+
170191
@Override
171192
public void onNext(T t) {
172193
if (index++ % skip == 0) {

src/main/java/io/reactivex/internal/operators/observable/ObservableBufferBoundary.java

+1-1
Original file line numberDiff line numberDiff line change
@@ -52,7 +52,7 @@ protected void subscribeActual(Observer<? super U> t) {
5252
}
5353

5454
static final class BufferBoundarySubscriber<T, U extends Collection<? super T>, Open, Close>
55-
extends NbpQueueDrainSubscriber<T, U, U> implements Disposable {
55+
extends QueueDrainObserver<T, U, U> implements Disposable {
5656
final ObservableConsumable<? extends Open> bufferOpen;
5757
final Function<? super Open, ? extends ObservableConsumable<? extends Close>> bufferClose;
5858
final Supplier<U> bufferSupplier;

src/main/java/io/reactivex/internal/operators/observable/ObservableBufferBoundarySupplier.java

+1-1
Original file line numberDiff line numberDiff line change
@@ -44,7 +44,7 @@ protected void subscribeActual(Observer<? super U> t) {
4444
}
4545

4646
static final class BufferBondarySupplierSubscriber<T, U extends Collection<? super T>, B>
47-
extends NbpQueueDrainSubscriber<T, U, U> implements Observer<T>, Disposable {
47+
extends QueueDrainObserver<T, U, U> implements Observer<T>, Disposable {
4848
/** */
4949
final Supplier<U> bufferSupplier;
5050
final Supplier<? extends ObservableConsumable<B>> boundarySupplier;

src/main/java/io/reactivex/internal/operators/observable/ObservableBufferExactBoundary.java

+1-1
Original file line numberDiff line numberDiff line change
@@ -42,7 +42,7 @@ protected void subscribeActual(Observer<? super U> t) {
4242
}
4343

4444
static final class BufferExactBondarySubscriber<T, U extends Collection<? super T>, B>
45-
extends NbpQueueDrainSubscriber<T, U, U> implements Observer<T>, Disposable {
45+
extends QueueDrainObserver<T, U, U> implements Observer<T>, Disposable {
4646
/** */
4747
final Supplier<U> bufferSupplier;
4848
final ObservableConsumable<B> boundary;

src/main/java/io/reactivex/internal/operators/observable/ObservableBufferTimed.java

+4-4
Original file line numberDiff line numberDiff line change
@@ -25,7 +25,7 @@
2525
import io.reactivex.functions.Supplier;
2626
import io.reactivex.internal.disposables.*;
2727
import io.reactivex.internal.queue.MpscLinkedQueue;
28-
import io.reactivex.internal.subscribers.observable.NbpQueueDrainSubscriber;
28+
import io.reactivex.internal.subscribers.observable.QueueDrainObserver;
2929
import io.reactivex.internal.util.QueueDrainHelper;
3030
import io.reactivex.observers.SerializedObserver;
3131

@@ -80,7 +80,7 @@ protected void subscribeActual(Observer<? super U> t) {
8080
}
8181

8282
static final class BufferExactUnboundedSubscriber<T, U extends Collection<? super T>>
83-
extends NbpQueueDrainSubscriber<T, U, U> implements Runnable, Disposable {
83+
extends QueueDrainObserver<T, U, U> implements Runnable, Disposable {
8484
final Supplier<U> bufferSupplier;
8585
final long timespan;
8686
final TimeUnit unit;
@@ -242,7 +242,7 @@ public void accept(Observer<? super U> a, U v) {
242242
}
243243

244244
static final class BufferSkipBoundedSubscriber<T, U extends Collection<? super T>>
245-
extends NbpQueueDrainSubscriber<T, U, U> implements Runnable, Disposable {
245+
extends QueueDrainObserver<T, U, U> implements Runnable, Disposable {
246246
final Supplier<U> bufferSupplier;
247247
final long timespan;
248248
final long timeskip;
@@ -408,7 +408,7 @@ public void accept(Observer<? super U> a, U v) {
408408
}
409409

410410
static final class BufferExactBoundedSubscriber<T, U extends Collection<? super T>>
411-
extends NbpQueueDrainSubscriber<T, U, U> implements Runnable, Disposable {
411+
extends QueueDrainObserver<T, U, U> implements Runnable, Disposable {
412412
final Supplier<U> bufferSupplier;
413413
final long timespan;
414414
final TimeUnit unit;

src/main/java/io/reactivex/internal/operators/observable/ObservableCollect.java

+14-2
Original file line numberDiff line numberDiff line change
@@ -47,7 +47,7 @@ protected void subscribeActual(Observer<? super U> t) {
4747

4848
}
4949

50-
static final class CollectSubscriber<T, U> implements Observer<T> {
50+
static final class CollectSubscriber<T, U> implements Observer<T>, Disposable {
5151
final Observer<? super U> actual;
5252
final BiConsumer<? super U, ? super T> collector;
5353
final U u;
@@ -64,10 +64,22 @@ public CollectSubscriber(Observer<? super U> actual, U u, BiConsumer<? super U,
6464
public void onSubscribe(Disposable s) {
6565
if (DisposableHelper.validate(this.s, s)) {
6666
this.s = s;
67-
actual.onSubscribe(s);
67+
actual.onSubscribe(this);
6868
}
6969
}
7070

71+
72+
@Override
73+
public void dispose() {
74+
s.dispose();
75+
}
76+
77+
@Override
78+
public boolean isDisposed() {
79+
return s.isDisposed();
80+
}
81+
82+
7183
@Override
7284
public void onNext(T t) {
7385
try {

0 commit comments

Comments
 (0)