Skip to content

Commit 2c3ec38

Browse files
authored
2.x: move DisposableObserver to public area, add some javadocs (#4337)
1 parent 125cf73 commit 2c3ec38

13 files changed

+38
-17
lines changed

src/main/java/io/reactivex/internal/operators/observable/BlockingObservableLatest.java

+1-1
Original file line numberDiff line numberDiff line change
@@ -21,7 +21,7 @@
2121
import io.reactivex.Observable;
2222
import io.reactivex.Optional;
2323
import io.reactivex.exceptions.Exceptions;
24-
import io.reactivex.internal.subscribers.observable.DisposableObserver;
24+
import io.reactivex.observers.DisposableObserver;
2525

2626
/**
2727
* Wait for and iterate over the latest values of the source observable. If the source works faster than the

src/main/java/io/reactivex/internal/operators/observable/BlockingObservableNext.java

+1-1
Original file line numberDiff line numberDiff line change
@@ -21,7 +21,7 @@
2121
import io.reactivex.Optional;
2222
import io.reactivex.Try;
2323
import io.reactivex.exceptions.Exceptions;
24-
import io.reactivex.internal.subscribers.observable.DisposableObserver;
24+
import io.reactivex.observers.DisposableObserver;
2525

2626
/**
2727
* Returns an Iterable that blocks until the Observable emits another item, then returns that item.

src/main/java/io/reactivex/internal/operators/observable/ObservableBufferBoundary.java

+1-1
Original file line numberDiff line numberDiff line change
@@ -26,7 +26,7 @@
2626
import io.reactivex.internal.queue.MpscLinkedQueue;
2727
import io.reactivex.internal.subscribers.observable.*;
2828
import io.reactivex.internal.util.QueueDrainHelper;
29-
import io.reactivex.observers.SerializedObserver;
29+
import io.reactivex.observers.*;
3030
import io.reactivex.plugins.RxJavaPlugins;
3131

3232
public final class ObservableBufferBoundary<T, U extends Collection<? super T>, Open, Close>

src/main/java/io/reactivex/internal/operators/observable/ObservableBufferBoundarySupplier.java

+1-1
Original file line numberDiff line numberDiff line change
@@ -23,7 +23,7 @@
2323
import io.reactivex.internal.queue.MpscLinkedQueue;
2424
import io.reactivex.internal.subscribers.observable.*;
2525
import io.reactivex.internal.util.QueueDrainHelper;
26-
import io.reactivex.observers.SerializedObserver;
26+
import io.reactivex.observers.*;
2727
import io.reactivex.plugins.RxJavaPlugins;
2828

2929
public final class ObservableBufferBoundarySupplier<T, U extends Collection<? super T>, B>

src/main/java/io/reactivex/internal/operators/observable/ObservableBufferExactBoundary.java

+1-1
Original file line numberDiff line numberDiff line change
@@ -22,7 +22,7 @@
2222
import io.reactivex.internal.queue.MpscLinkedQueue;
2323
import io.reactivex.internal.subscribers.observable.*;
2424
import io.reactivex.internal.util.QueueDrainHelper;
25-
import io.reactivex.observers.SerializedObserver;
25+
import io.reactivex.observers.*;
2626

2727
public final class ObservableBufferExactBoundary<T, U extends Collection<? super T>, B>
2828
extends AbstractObservableWithUpstream<T, U> {

src/main/java/io/reactivex/internal/operators/observable/ObservableDebounce.java

+1-2
Original file line numberDiff line numberDiff line change
@@ -19,8 +19,7 @@
1919
import io.reactivex.disposables.Disposable;
2020
import io.reactivex.functions.Function;
2121
import io.reactivex.internal.disposables.DisposableHelper;
22-
import io.reactivex.internal.subscribers.observable.DisposableObserver;
23-
import io.reactivex.observers.SerializedObserver;
22+
import io.reactivex.observers.*;
2423
import io.reactivex.plugins.RxJavaPlugins;
2524

2625
public final class ObservableDebounce<T, U> extends AbstractObservableWithUpstream<T, T> {

src/main/java/io/reactivex/internal/operators/observable/ObservableTimeout.java

+1-1
Original file line numberDiff line numberDiff line change
@@ -21,7 +21,7 @@
2121
import io.reactivex.functions.Function;
2222
import io.reactivex.internal.disposables.*;
2323
import io.reactivex.internal.subscribers.observable.*;
24-
import io.reactivex.observers.SerializedObserver;
24+
import io.reactivex.observers.*;
2525
import io.reactivex.plugins.RxJavaPlugins;
2626

2727
public final class ObservableTimeout<T, U, V> extends AbstractObservableWithUpstream<T, T> {

src/main/java/io/reactivex/internal/operators/observable/ObservableWindowBoundary.java

+1-1
Original file line numberDiff line numberDiff line change
@@ -23,7 +23,7 @@
2323
import io.reactivex.internal.queue.MpscLinkedQueue;
2424
import io.reactivex.internal.subscribers.observable.*;
2525
import io.reactivex.internal.util.NotificationLite;
26-
import io.reactivex.observers.SerializedObserver;
26+
import io.reactivex.observers.*;
2727
import io.reactivex.plugins.RxJavaPlugins;
2828
import io.reactivex.subjects.UnicastSubject;
2929

src/main/java/io/reactivex/internal/operators/observable/ObservableWindowBoundarySelector.java

+1-1
Original file line numberDiff line numberDiff line change
@@ -27,7 +27,7 @@
2727
import io.reactivex.internal.queue.MpscLinkedQueue;
2828
import io.reactivex.internal.subscribers.observable.*;
2929
import io.reactivex.internal.util.NotificationLite;
30-
import io.reactivex.observers.SerializedObserver;
30+
import io.reactivex.observers.*;
3131
import io.reactivex.plugins.RxJavaPlugins;
3232
import io.reactivex.subjects.UnicastSubject;
3333

src/main/java/io/reactivex/internal/operators/observable/ObservableWindowBoundarySupplier.java

+1-1
Original file line numberDiff line numberDiff line change
@@ -23,7 +23,7 @@
2323
import io.reactivex.internal.queue.MpscLinkedQueue;
2424
import io.reactivex.internal.subscribers.observable.*;
2525
import io.reactivex.internal.util.NotificationLite;
26-
import io.reactivex.observers.SerializedObserver;
26+
import io.reactivex.observers.*;
2727
import io.reactivex.plugins.RxJavaPlugins;
2828
import io.reactivex.subjects.UnicastSubject;
2929

src/main/java/io/reactivex/internal/subscribers/observable/DisposableObserver.java src/main/java/io/reactivex/observers/DisposableObserver.java

+6-3
Original file line numberDiff line numberDiff line change
@@ -11,7 +11,7 @@
1111
* the License for the specific language governing permissions and limitations under the License.
1212
*/
1313

14-
package io.reactivex.internal.subscribers.observable;
14+
package io.reactivex.observers;
1515

1616
import java.util.concurrent.atomic.AtomicReference;
1717

@@ -20,9 +20,9 @@
2020
import io.reactivex.internal.disposables.DisposableHelper;
2121

2222
/**
23-
* An abstract subscription that allows asynchronous cancellation.
23+
* An abstract Observer that allows asynchronous cancellation by implementing Disposable.
2424
*
25-
* @param <T>
25+
* @param <T> the received value type
2626
*/
2727
public abstract class DisposableObserver<T> implements Observer<T>, Disposable {
2828
final AtomicReference<Disposable> s = new AtomicReference<Disposable>();
@@ -34,6 +34,9 @@ public final void onSubscribe(Disposable s) {
3434
}
3535
}
3636

37+
/**
38+
* Called once the single upstream Disposable is set via onSubscribe.
39+
*/
3740
protected void onStart() {
3841
}
3942

src/main/java/io/reactivex/observers/Observers.java

-1
Original file line numberDiff line numberDiff line change
@@ -18,7 +18,6 @@
1818
import io.reactivex.functions.Consumer;
1919
import io.reactivex.internal.disposables.DisposableHelper;
2020
import io.reactivex.internal.functions.*;
21-
import io.reactivex.internal.subscribers.observable.DisposableObserver;
2221
import io.reactivex.plugins.RxJavaPlugins;
2322

2423
/**

src/main/java/io/reactivex/subscribers/DisposableSubscriber.java

+22-2
Original file line numberDiff line numberDiff line change
@@ -21,9 +21,9 @@
2121
import io.reactivex.internal.subscriptions.SubscriptionHelper;
2222

2323
/**
24-
* An abstract subscription that allows asynchronous cancellation.
24+
* An abstract Subscriber that allows asynchronous cancellation by implementing Disposable.
2525
*
26-
* @param <T>
26+
* @param <T> the received value type.
2727
*/
2828
public abstract class DisposableSubscriber<T> implements Subscriber<T>, Disposable {
2929
final AtomicReference<Subscription> s = new AtomicReference<Subscription>();
@@ -35,18 +35,38 @@ public final void onSubscribe(Subscription s) {
3535
}
3636
}
3737

38+
/**
39+
* Returns the current Subscription sent to this Subscriber via onSubscribe().
40+
* @return the current Subscription, may be null
41+
*/
3842
protected final Subscription subscription() {
3943
return s.get();
4044
}
4145

46+
/**
47+
* Called once the single upstream Subscription is set via onSubscribe.
48+
*/
4249
protected void onStart() {
4350
s.get().request(Long.MAX_VALUE);
4451
}
4552

53+
/**
54+
* Requests the specified amount from the upstream if its Subscription is set via
55+
* onSubscribe already.
56+
* <p>Note that calling this method before a Subscription is set via onSubscribe
57+
* leads to NullPointerException and meant to be called from inside onStart or
58+
* onNext.
59+
* @param n the request amount, positive
60+
*/
4661
protected final void request(long n) {
4762
s.get().request(n);
4863
}
4964

65+
/**
66+
* Cancels the Subscription set via onSubscribe or makes sure a
67+
* Subscription set asynchronously (later) is cancelled immediately.
68+
* <p>This method is thread-safe and can be exposed as a public API.
69+
*/
5070
protected final void cancel() {
5171
dispose();
5272
}

0 commit comments

Comments
 (0)