Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Added nullable annotations to subjects #5890

Merged
merged 1 commit into from
Mar 5, 2018
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
14 changes: 8 additions & 6 deletions src/main/java/io/reactivex/subjects/AsyncSubject.java
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@

package io.reactivex.subjects;

import io.reactivex.annotations.Nullable;
import java.util.Arrays;
import java.util.concurrent.atomic.AtomicReference;

Expand Down Expand Up @@ -85,22 +86,22 @@
* AsyncSubject<Object> subject = AsyncSubject.create();
*
* TestObserver<Object> to1 = subject.test();
*
*
* to1.assertEmpty();
*
*
* subject.onNext(1);
*
*
* // AsyncSubject only emits when onComplete was called.
* to1.assertEmpty();
*
* subject.onNext(2);
* subject.onComplete();
*
*
* // onComplete triggers the emission of the last cached item and the onComplete event.
* to1.assertResult(2);
*
*
* TestObserver<Object> to2 = subject.test();
*
*
* // late Observers receive the last cached item too
* to2.assertResult(2);
* </code></pre>
Expand Down Expand Up @@ -313,6 +314,7 @@ public boolean hasValue() {
* <p>The method is thread-safe.
* @return a single value the Subject currently has or null if no such value exists
*/
@Nullable
public T getValue() {
return subscribers.get() == TERMINATED ? value : null;
}
Expand Down
13 changes: 8 additions & 5 deletions src/main/java/io/reactivex/subjects/BehaviorSubject.java
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
package io.reactivex.subjects;

import io.reactivex.annotations.CheckReturnValue;
import io.reactivex.annotations.Nullable;
import java.lang.reflect.Array;
import java.util.concurrent.atomic.AtomicReference;
import java.util.concurrent.locks.*;
Expand Down Expand Up @@ -63,19 +64,19 @@
* observable.onNext(1);
* // this will "clear" the cache
* observable.onNext(EMPTY);
*
*
* TestObserver&lt;Integer&gt; to2 = observable.test();
*
*
* subject.onNext(2);
* subject.onComplete();
*
*
* // to1 received both non-empty items
* to1.assertResult(1, 2);
*
*
* // to2 received only 2 even though the current item was EMPTY
* // when it got subscribed
* to2.assertResult(2);
*
*
* // Observers coming after the subject was terminated receive
* // no items and only the onComplete event in this case.
* observable.test().assertResult();
Expand Down Expand Up @@ -300,6 +301,7 @@ public boolean hasObservers() {
}

@Override
@Nullable
public Throwable getThrowable() {
Object o = value.get();
if (NotificationLite.isError(o)) {
Expand All @@ -313,6 +315,7 @@ public Throwable getThrowable() {
* <p>The method is thread-safe.
* @return a single value the Subject currently has or null if no such value exists
*/
@Nullable
public T getValue() {
Object o = value.get();
if (NotificationLite.isComplete(o) || NotificationLite.isError(o)) {
Expand Down
6 changes: 4 additions & 2 deletions src/main/java/io/reactivex/subjects/CompletableSubject.java
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@

package io.reactivex.subjects;

import io.reactivex.annotations.Nullable;
import java.util.concurrent.atomic.*;

import io.reactivex.*;
Expand Down Expand Up @@ -65,12 +66,12 @@
* Example usage:
* <pre><code>
* CompletableSubject subject = CompletableSubject.create();
*
*
* TestObserver&lt;Void&gt; to1 = subject.test();
*
* // a fresh CompletableSubject is empty
* to1.assertEmpty();
*
*
* subject.onComplete();
*
* // a CompletableSubject is always void of items
Expand Down Expand Up @@ -213,6 +214,7 @@ void remove(CompletableDisposable inner) {
* Returns the terminal error if this CompletableSubject has been terminated with an error, null otherwise.
* @return the terminal error or null if not terminated or not with an error
*/
@Nullable
public Throwable getThrowable() {
if (observers.get() == TERMINATED) {
return error;
Expand Down
18 changes: 10 additions & 8 deletions src/main/java/io/reactivex/subjects/MaybeSubject.java
Original file line number Diff line number Diff line change
Expand Up @@ -72,20 +72,20 @@
* Example usage:
* <pre><code>
* MaybeSubject&lt;Integer&gt; subject1 = MaybeSubject.create();
*
*
* TestObserver&lt;Integer&gt; to1 = subject1.test();
*
*
* // MaybeSubjects are empty by default
* to1.assertEmpty();
*
*
* subject1.onSuccess(1);
*
*
* // onSuccess is a terminal event with MaybeSubjects
* // TestObserver converts onSuccess into onNext + onComplete
* to1.assertResult(1);
*
* TestObserver&lt;Integer&gt; to2 = subject1.test();
*
*
* // late Observers receive the terminal signal (onSuccess) too
* to2.assertResult(1);
*
Expand All @@ -94,14 +94,14 @@
* MaybeSubject&lt;Integer&gt; subject2 = MaybeSubject.create();
*
* TestObserver&lt;Integer&gt; to3 = subject2.test();
*
*
* subject2.onComplete();
*
*
* // a completed MaybeSubject completes its MaybeObservers
* to3.assertResult();
*
* TestObserver&lt;Integer&gt; to4 = subject1.test();
*
*
* // late Observers receive the terminal signal (onComplete) too
* to4.assertResult();
* </code></pre>
Expand Down Expand Up @@ -263,6 +263,7 @@ void remove(MaybeDisposable<T> inner) {
* Returns the success value if this MaybeSubject was terminated with a success value.
* @return the success value or null
*/
@Nullable
public T getValue() {
if (observers.get() == TERMINATED) {
return value;
Expand All @@ -282,6 +283,7 @@ public boolean hasValue() {
* Returns the terminal error if this MaybeSubject has been terminated with an error, null otherwise.
* @return the terminal error or null if not terminated or not with an error
*/
@Nullable
public Throwable getThrowable() {
if (observers.get() == TERMINATED) {
return error;
Expand Down
2 changes: 2 additions & 0 deletions src/main/java/io/reactivex/subjects/PublishSubject.java
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
package io.reactivex.subjects;

import io.reactivex.annotations.CheckReturnValue;
import io.reactivex.annotations.Nullable;
import java.util.concurrent.atomic.*;

import io.reactivex.Observer;
Expand Down Expand Up @@ -263,6 +264,7 @@ public boolean hasObservers() {
}

@Override
@Nullable
public Throwable getThrowable() {
if (subscribers.get() == TERMINATED) {
return error;
Expand Down
7 changes: 7 additions & 0 deletions src/main/java/io/reactivex/subjects/ReplaySubject.java
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@

package io.reactivex.subjects;

import io.reactivex.annotations.Nullable;
import java.lang.reflect.Array;
import java.util.*;
import java.util.concurrent.TimeUnit;
Expand Down Expand Up @@ -395,6 +396,7 @@ public boolean hasObservers() {
}

@Override
@Nullable
public Throwable getThrowable() {
Object o = buffer.get();
if (NotificationLite.isError(o)) {
Expand All @@ -408,6 +410,7 @@ public Throwable getThrowable() {
* <p>The method is thread-safe.
* @return a single value the Subject currently has or null if no such value exists
*/
@Nullable
public T getValue() {
return buffer.getValue();
}
Expand Down Expand Up @@ -542,6 +545,7 @@ interface ReplayBuffer<T> {

int size();

@Nullable
T getValue();

T[] getValues(T[] array);
Expand Down Expand Up @@ -620,6 +624,7 @@ public void addFinal(Object notificationLite) {
}

@Override
@Nullable
@SuppressWarnings("unchecked")
public T getValue() {
int s = size;
Expand Down Expand Up @@ -838,6 +843,7 @@ public void addFinal(Object notificationLite) {
}

@Override
@Nullable
@SuppressWarnings("unchecked")
public T getValue() {
Node<Object> prev = null;
Expand Down Expand Up @@ -1080,6 +1086,7 @@ public void addFinal(Object notificationLite) {
}

@Override
@Nullable
@SuppressWarnings("unchecked")
public T getValue() {
TimedNode<Object> prev = null;
Expand Down
2 changes: 2 additions & 0 deletions src/main/java/io/reactivex/subjects/SerializedSubject.java
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
package io.reactivex.subjects;

import io.reactivex.Observer;
import io.reactivex.annotations.Nullable;
import io.reactivex.disposables.Disposable;
import io.reactivex.internal.util.*;
import io.reactivex.internal.util.AppendOnlyLinkedArrayList.NonThrowingPredicate;
Expand Down Expand Up @@ -193,6 +194,7 @@ public boolean hasThrowable() {
}

@Override
@Nullable
public Throwable getThrowable() {
return actual.getThrowable();
}
Expand Down
25 changes: 13 additions & 12 deletions src/main/java/io/reactivex/subjects/UnicastSubject.java
Original file line number Diff line number Diff line change
Expand Up @@ -105,37 +105,37 @@
* Example usage:
* <pre><code>
* UnicastSubject&lt;Integer&gt; subject = UnicastSubject.create();
*
*
* TestObserver&lt;Integer&gt; to1 = subject.test();
*
*
* // fresh UnicastSubjects are empty
* to1.assertEmpty();
*
*
* TestObserver&lt;Integer&gt; to2 = subject.test();
*
*
* // A UnicastSubject only allows one Observer during its lifetime
* to2.assertFailure(IllegalStateException.class);
*
*
* subject.onNext(1);
* to1.assertValue(1);
*
*
* subject.onNext(2);
* to1.assertValues(1, 2);
*
*
* subject.onComplete();
* to1.assertResult(1, 2);
*
*
* // ----------------------------------------------------
*
*
* UnicastSubject&lt;Integer&gt; subject2 = UnicastSubject.create();
*
*
* // a UnicastSubject caches events util its single Observer subscribes
* subject.onNext(1);
* subject.onNext(2);
* subject.onComplete();
*
*
* TestObserver&lt;Integer&gt; to3 = subject2.test();
*
*
* // the cached events are emitted in order
* to3.assertResult(1, 2);
* </code></pre>
Expand Down Expand Up @@ -498,6 +498,7 @@ public boolean hasObservers() {
}

@Override
@Nullable
public Throwable getThrowable() {
if (done) {
return error;
Expand Down