Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

2.x: add withLatestFrom many, cleanups and other enhancements #4368

Merged
merged 1 commit into from
Aug 18, 2016
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions src/main/java/io/reactivex/Completable.java
Original file line number Diff line number Diff line change
Expand Up @@ -188,6 +188,7 @@ public static Completable create(CompletableSource source) {
} catch (NullPointerException ex) { // NOPMD
throw ex;
} catch (Throwable ex) {
Exceptions.throwIfFatal(ex);
RxJavaPlugins.onError(ex);
throw toNpe(ex);
}
Expand Down Expand Up @@ -219,6 +220,7 @@ public static Completable unsafeCreate(CompletableSource source) {
} catch (NullPointerException ex) { // NOPMD
throw ex;
} catch (Throwable ex) {
Exceptions.throwIfFatal(ex);
RxJavaPlugins.onError(ex);
throw toNpe(ex);
}
Expand Down Expand Up @@ -1384,6 +1386,7 @@ public final void subscribe(CompletableObserver s) {
} catch (NullPointerException ex) { // NOPMD
throw ex;
} catch (Throwable ex) {
Exceptions.throwIfFatal(ex);
RxJavaPlugins.onError(ex);
throw toNpe(ex);
}
Expand Down Expand Up @@ -1606,6 +1609,7 @@ public final <U> U to(Function<? super Completable, U> converter) {
try {
return converter.apply(this);
} catch (Throwable ex) {
Exceptions.throwIfFatal(ex);
throw Exceptions.propagate(ex);
}
}
Expand Down
14 changes: 7 additions & 7 deletions src/main/java/io/reactivex/CompletableObserver.java
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,13 @@
* Represents the subscription API callbacks when subscribing to a Completable instance.
*/
public interface CompletableObserver {
/**
* Called once by the Completable to set a Disposable on this instance which
* then can be used to cancel the subscription at any time.
* @param d the Disposable instance to call dispose on for cancellation, not null
*/
void onSubscribe(Disposable d);

/**
* Called once the deferred computation completes normally.
*/
Expand All @@ -29,11 +36,4 @@ public interface CompletableObserver {
* @param e the exception, not null.
*/
void onError(Throwable e);

/**
* Called once by the Completable to set a Disposable on this instance which
* then can be used to cancel the subscription at any time.
* @param d the Disposable instance to call dispose on for cancellation, not null
*/
void onSubscribe(Disposable d);
}
229 changes: 110 additions & 119 deletions src/main/java/io/reactivex/Flowable.java

Large diffs are not rendered by default.

193 changes: 109 additions & 84 deletions src/main/java/io/reactivex/Observable.java

Large diffs are not rendered by default.

50 changes: 49 additions & 1 deletion src/main/java/io/reactivex/Observer.java
Original file line number Diff line number Diff line change
Expand Up @@ -15,14 +15,62 @@

import io.reactivex.disposables.Disposable;

/**
* Provides a mechanism for receiving push-based notifications.
* <p>
* After an Observer calls an {@link Observable}'s {@link Observable#subscribe subscribe} method,
* first the Observable calls {@link #onSubscribe(Disposable)} with a {@link Disposable} that allows
* cancelling the sequence at any time, then the
* {@code Observable} may call the Observer's {@link #onNext} method any number of times
* to provide notifications. A well-behaved
* {@code Observable} will call an Observer's {@link #onComplete} method exactly once or the Observer's
* {@link #onError} method exactly once.
*
* @see <a href="http://reactivex.io/documentation/observable.html">ReactiveX documentation: Observable</a>
* @param <T>
* the type of item the Observer expects to observe
*/
public interface Observer<T> {


/**
* Provides the Observer with the means of cancelling (disposing) the
* connection (channel) with the Observable in both
* synchronous (from within {@link #onNext(Object)}) and asynchronous manner.
* @param d the Disposable instance whose {@link Disposable#dispose()} can
* be called anytime to cancel the connection
* @since 2.0
*/
void onSubscribe(Disposable d);

/**
* Provides the Observer with a new item to observe.
* <p>
* The {@link Observable} may call this method 0 or more times.
* <p>
* The {@code Observable} will not call this method again after it calls either {@link #onComplete} or
* {@link #onError}.
*
* @param value
* the item emitted by the Observable
*/
void onNext(T value);

/**
* Notifies the Observer that the {@link Observable} has experienced an error condition.
* <p>
* If the {@link Observable} calls this method, it will not thereafter call {@link #onNext} or
* {@link #onComplete}.
*
* @param e
* the exception encountered by the Observable
*/
void onError(Throwable e);

/**
* Notifies the Observer that the {@link Observable} has finished sending push-based notifications.
* <p>
* The {@link Observable} will not call this method if it calls {@link #onError}.
*/
void onComplete();

}
8 changes: 5 additions & 3 deletions src/main/java/io/reactivex/Scheduler.java
Original file line number Diff line number Diff line change
Expand Up @@ -15,8 +15,9 @@

import java.util.concurrent.TimeUnit;

import io.reactivex.disposables.*;
import io.reactivex.disposables.Disposable;
import io.reactivex.exceptions.Exceptions;
import io.reactivex.internal.disposables.SequentialDisposable;
import io.reactivex.plugins.RxJavaPlugins;

public abstract class Scheduler {
Expand Down Expand Up @@ -106,9 +107,9 @@ public Disposable schedule(Runnable run) {
}

public Disposable schedulePeriodically(Runnable run, final long initialDelay, final long period, final TimeUnit unit) {
final SerialDisposable first = new SerialDisposable();
final SequentialDisposable first = new SequentialDisposable();

final SerialDisposable sd = new SerialDisposable(first);
final SequentialDisposable sd = new SequentialDisposable(first);

final Runnable decoratedRun = RxJavaPlugins.onSchedule(run);

Expand Down Expand Up @@ -182,6 +183,7 @@ public void run() {
try {
run.run();
} catch (Throwable ex) {
Exceptions.throwIfFatal(ex);
worker.dispose();
throw Exceptions.propagate(ex);
}
Expand Down
1 change: 1 addition & 0 deletions src/main/java/io/reactivex/Single.java
Original file line number Diff line number Diff line change
Expand Up @@ -2783,6 +2783,7 @@ public final <R> R to(Function<? super Single<T>, R> convert) {
try {
return convert.apply(this);
} catch (Throwable ex) {
Exceptions.throwIfFatal(ex);
throw Exceptions.propagate(ex);
}
}
Expand Down
39 changes: 39 additions & 0 deletions src/main/java/io/reactivex/SingleObserver.java
Original file line number Diff line number Diff line change
Expand Up @@ -15,11 +15,50 @@

import io.reactivex.disposables.Disposable;

/**
* Provides a mechanism for receiving push-based notifications.
* <p>
* After a SingleSubscriber calls a {@link Single}'s {@link Single#subscribe subscribe} method,
* first the Single calls {@link #onSubscribe(Disposable)} with a {@link Disposable} that allows
* cancelling the sequence at any time, then the
* {@code Single} calls only one of the SingleSubscriber's {@link #onSuccess} and {@link #onError} methods to provide
* notifications.
*
* @see <a href="http://reactivex.io/documentation/observable.html">ReactiveX documentation: Observable</a>
* @param <T>
* the type of item the SingleSubscriber expects to observe
* @since 2.0
*/
public interface SingleObserver<T> {

/**
* Provides the SingleObserver with the means of cancelling (disposing) the
* connection (channel) with the Single in both
* synchronous (from within {@link #onSubscribe(Disposable)} itself) and asynchronous manner.
* @param d the Disposable instance whose {@link Disposable#dispose()} can
* be called anytime to cancel the connection
* @since 2.0
*/
void onSubscribe(Disposable d);

/**
* Notifies the SingleSubscriber with a single item and that the {@link Single} has finished sending
* push-based notifications.
* <p>
* The {@link Single} will not call this method if it calls {@link #onError}.
*
* @param value
* the item emitted by the Single
*/
void onSuccess(T value);

/**
* Notifies the SingleSubscriber that the {@link Single} has experienced an error condition.
* <p>
* If the {@link Single} calls this method, it will not thereafter call {@link #onSuccess}.
*
* @param e
* the exception encountered by the Single
*/
void onError(Throwable e);
}
20 changes: 20 additions & 0 deletions src/main/java/io/reactivex/annotations/package-info.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
/**
* Copyright 2016 Netflix, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

/**
* Annotations for indicating experimental and beta operators, classes, methods, types or fields.
*/
package io.reactivex.annotations;
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ protected void onDisposed(Action value) {
try {
value.run();
} catch (Throwable ex) {
Exceptions.throwIfFatal(ex);
throw Exceptions.propagate(ex);
}
}
Expand Down
33 changes: 32 additions & 1 deletion src/main/java/io/reactivex/disposables/CompositeDisposable.java
Original file line number Diff line number Diff line change
Expand Up @@ -20,17 +20,25 @@
import io.reactivex.internal.util.OpenHashSet;

/**
* A disposable container that can hold onto multiple other disposables.
* A disposable container that can hold onto multiple other disposables and
* offers O(1) add and removal complexity.
*/
public final class CompositeDisposable implements Disposable, DisposableContainer {

OpenHashSet<Disposable> resources;

volatile boolean disposed;

/**
* Creates an empty CompositeDisposable.
*/
public CompositeDisposable() {
}

/**
* Creates a CompositeDisposables with the given array of initial elements.
* @param resources the array of Disposables to start with
*/
public CompositeDisposable(Disposable... resources) {
Objects.requireNonNull(resources, "resources is null");
this.resources = new OpenHashSet<Disposable>(resources.length + 1);
Expand All @@ -40,6 +48,10 @@ public CompositeDisposable(Disposable... resources) {
}
}

/**
* Creates a CompositeDisposables with the given Iterable sequence of initial elements.
* @param resources the Iterable sequence of Disposables to start with
*/
public CompositeDisposable(Iterable<? extends Disposable> resources) {
Objects.requireNonNull(resources, "resources is null");
for (Disposable d : resources) {
Expand Down Expand Up @@ -91,6 +103,12 @@ public boolean add(Disposable d) {
return false;
}

/**
* Atomically adds the givel array of Disposables to the container or
* disposes them all if the container has been disposed.
* @param ds the array of Disposables
* @return true if the operation was successful, false if the container has been disposed
*/
public boolean addAll(Disposable... ds) {
Objects.requireNonNull(ds, "ds is null");
if (!disposed) {
Expand Down Expand Up @@ -143,6 +161,9 @@ public boolean delete(Disposable d) {
return true;
}

/**
* Atomically clears the container, then disposes all the previously contained Disposables.
*/
public void clear() {
if (disposed) {
return;
Expand All @@ -160,6 +181,10 @@ public void clear() {
dispose(set);
}

/**
* Returns the number of currently held Disposables.
* @return the number of currently held Disposables
*/
public int size() {
if (disposed) {
return 0;
Expand All @@ -172,6 +197,11 @@ public int size() {
}
}

/**
* Dispose the contents of the OpenHashSet by suppressing non-fatal
* Throwables till the end.
* @param set the OpenHashSet to dispose elements of
*/
void dispose(OpenHashSet<Disposable> set) {
if (set == null) {
return;
Expand All @@ -183,6 +213,7 @@ void dispose(OpenHashSet<Disposable> set) {
try {
((Disposable) o).dispose();
} catch (Throwable ex) {
Exceptions.throwIfFatal(ex);
if (errors == null) {
errors = new ArrayList<Throwable>();
}
Expand Down
3 changes: 3 additions & 0 deletions src/main/java/io/reactivex/disposables/FutureDisposable.java
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,9 @@

import java.util.concurrent.Future;

/**
* A Disposable container that cancels a Future instance.
*/
final class FutureDisposable extends ReferenceDisposable<Future<?>> {
/** */
private static final long serialVersionUID = 6545242830671168775L;
Expand Down
17 changes: 17 additions & 0 deletions src/main/java/io/reactivex/disposables/RefCountDisposable.java
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,10 @@
import io.reactivex.internal.disposables.DisposableHelper;
import io.reactivex.internal.functions.Objects;

/**
* Keeps track of the sub-subscriptions and unsubscribes the underlying subscription once all sub-subscriptions
* have unsubscribed.
*/
public final class RefCountDisposable implements Disposable {

final AtomicReference<Disposable> resource = new AtomicReference<Disposable>();
Expand All @@ -26,6 +30,14 @@ public final class RefCountDisposable implements Disposable {

final AtomicBoolean once = new AtomicBoolean();

/**
* Creates a {@code RefCountDisposable} by wrapping the given non-null {@code Subscription}.
*
* @param resource
* the {@link Disposable} to wrap
* @throws NullPointerException
* if {@code s} is {@code null}
*/
public RefCountDisposable(Disposable resource) {
Objects.requireNonNull(resource, "resource is null");
this.resource.lazySet(resource);
Expand All @@ -39,6 +51,11 @@ public void dispose() {
}
}

/**
* Returns a new sub-Disposable
*
* @return a new sub-Disposable.
*/
public Disposable get() {
count.getAndIncrement();
return new InnerDisposable(this);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,12 @@

import io.reactivex.internal.functions.Objects;

/**
* Base class for Disposable containers that manage some other type that
* has to be run when the container is disposed.
*
* @param <T> the type contained
*/
abstract class ReferenceDisposable<T> extends AtomicReference<T> implements Disposable {
/** */
private static final long serialVersionUID = 6537757548749041217L;
Expand Down
Loading