From 55556414235ebbcedd12bc45a8cb706e57f5a19f Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?D=C3=A1vid=20Karnok?= Date: Mon, 20 Jun 2016 21:58:25 +0200 Subject: [PATCH] 2.x: Single operators factored out, headers added --- .../io/reactivex/CompletableSubscriber.java | 13 + src/main/java/io/reactivex/Single.java | 1055 +---------------- .../java/io/reactivex/SingleSubscriber.java | 14 +- .../disposables/DisposableHelper.java | 13 + .../completable/CompletableAmbIterable.java | 13 + .../completable/CompletableAwait.java | 13 + .../completable/CompletableDefer.java | 13 + .../completable/CompletableDelay.java | 13 + .../completable/CompletableError.java | 13 + .../completable/CompletableErrorSupplier.java | 13 + .../completable/CompletableFromCallable.java | 13 + .../completable/CompletableFromFlowable.java | 13 + .../CompletableFromObservable.java | 13 + .../completable/CompletableFromRunnable.java | 13 + .../completable/CompletableFromSingle.java | 13 + .../completable/CompletableLift.java | 13 + .../completable/CompletableObserveOn.java | 13 + .../CompletableOnErrorComplete.java | 13 + .../completable/CompletablePeek.java | 13 + .../completable/CompletableResumeNext.java | 13 + .../completable/CompletableSubscribeOn.java | 13 + .../completable/CompletableTimer.java | 13 + .../completable/CompletableToFlowable.java | 13 + .../completable/CompletableToObservable.java | 13 + .../completable/CompletableToSingle.java | 13 + .../completable/CompletableUnsubscribeOn.java | 13 + .../completable/CompletableUsing.java | 13 + .../completable/CompletableWrapper.java | 13 + .../operators/flowable/FlowableWrapper.java | 13 + .../observable/ObservableWrapper.java | 13 + .../operators/single/SingleAmbArray.java | 80 ++ .../operators/single/SingleAmbIterable.java | 125 ++ .../operators/single/SingleAwait.java | 61 + .../operators/single/SingleCache.java | 109 ++ .../operators/single/SingleContains.java | 57 + .../operators/single/SingleDefer.java | 49 + .../operators/single/SingleDelay.java | 65 + .../operators/single/SingleDoOnCancel.java | 54 + .../operators/single/SingleDoOnError.java | 59 + .../operators/single/SingleDoOnSubscribe.java | 73 ++ .../operators/single/SingleDoOnSuccess.java | 59 + .../operators/single/SingleEquals.java | 82 ++ .../operators/single/SingleError.java | 46 + ...peratorFlatMap.java => SingleFlatMap.java} | 4 +- .../operators/single/SingleFromCallable.java | 45 + .../operators/single/SingleFromPublisher.java | 67 ++ .../internal/operators/single/SingleHide.java | 67 ++ .../internal/operators/single/SingleJust.java | 33 + .../internal/operators/single/SingleLift.java | 50 + ...{SingleOperatorMap.java => SingleMap.java} | 16 +- .../operators/single/SingleNever.java | 26 + .../operators/single/SingleObserveOn.java | 66 ++ .../operators/single/SingleOnErrorReturn.java | 79 ++ .../operators/single/SingleResumeNext.java | 91 ++ .../operators/single/SingleSubscribeOn.java | 41 + .../operators/single/SingleTimeout.java | 114 ++ .../operators/single/SingleTimer.java | 47 + .../operators/single/SingleUsing.java | 134 +++ .../operators/single/SingleWrapper.java | 13 + .../CallbackCompletableSubscriber.java | 13 + .../EmptyCompletableSubscriber.java | 13 + .../ObserverCompletableSubscriber.java | 13 + .../SubscriberCompletableSubscriber.java | 13 + .../single/BiConsumerSingleSubscriber.java | 54 + .../single/ConsumerSingleSubscriber.java | 58 + 65 files changed, 2387 insertions(+), 1022 deletions(-) create mode 100644 src/main/java/io/reactivex/internal/operators/single/SingleAmbArray.java create mode 100644 src/main/java/io/reactivex/internal/operators/single/SingleAmbIterable.java create mode 100644 src/main/java/io/reactivex/internal/operators/single/SingleAwait.java create mode 100644 src/main/java/io/reactivex/internal/operators/single/SingleCache.java create mode 100644 src/main/java/io/reactivex/internal/operators/single/SingleContains.java create mode 100644 src/main/java/io/reactivex/internal/operators/single/SingleDefer.java create mode 100644 src/main/java/io/reactivex/internal/operators/single/SingleDelay.java create mode 100644 src/main/java/io/reactivex/internal/operators/single/SingleDoOnCancel.java create mode 100644 src/main/java/io/reactivex/internal/operators/single/SingleDoOnError.java create mode 100644 src/main/java/io/reactivex/internal/operators/single/SingleDoOnSubscribe.java create mode 100644 src/main/java/io/reactivex/internal/operators/single/SingleDoOnSuccess.java create mode 100644 src/main/java/io/reactivex/internal/operators/single/SingleEquals.java create mode 100644 src/main/java/io/reactivex/internal/operators/single/SingleError.java rename src/main/java/io/reactivex/internal/operators/single/{SingleOperatorFlatMap.java => SingleFlatMap.java} (93%) create mode 100644 src/main/java/io/reactivex/internal/operators/single/SingleFromCallable.java create mode 100644 src/main/java/io/reactivex/internal/operators/single/SingleFromPublisher.java create mode 100644 src/main/java/io/reactivex/internal/operators/single/SingleHide.java create mode 100644 src/main/java/io/reactivex/internal/operators/single/SingleJust.java create mode 100644 src/main/java/io/reactivex/internal/operators/single/SingleLift.java rename src/main/java/io/reactivex/internal/operators/single/{SingleOperatorMap.java => SingleMap.java} (77%) create mode 100644 src/main/java/io/reactivex/internal/operators/single/SingleNever.java create mode 100644 src/main/java/io/reactivex/internal/operators/single/SingleObserveOn.java create mode 100644 src/main/java/io/reactivex/internal/operators/single/SingleOnErrorReturn.java create mode 100644 src/main/java/io/reactivex/internal/operators/single/SingleResumeNext.java create mode 100644 src/main/java/io/reactivex/internal/operators/single/SingleSubscribeOn.java create mode 100644 src/main/java/io/reactivex/internal/operators/single/SingleTimeout.java create mode 100644 src/main/java/io/reactivex/internal/operators/single/SingleTimer.java create mode 100644 src/main/java/io/reactivex/internal/operators/single/SingleUsing.java create mode 100644 src/main/java/io/reactivex/internal/subscribers/single/BiConsumerSingleSubscriber.java create mode 100644 src/main/java/io/reactivex/internal/subscribers/single/ConsumerSingleSubscriber.java diff --git a/src/main/java/io/reactivex/CompletableSubscriber.java b/src/main/java/io/reactivex/CompletableSubscriber.java index 7ec4774d61..18c662ee90 100644 --- a/src/main/java/io/reactivex/CompletableSubscriber.java +++ b/src/main/java/io/reactivex/CompletableSubscriber.java @@ -1,3 +1,16 @@ +/** + * Copyright 2016 Netflix, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in + * compliance with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License is + * distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See + * the License for the specific language governing permissions and limitations under the License. + */ + package io.reactivex; import io.reactivex.disposables.Disposable; diff --git a/src/main/java/io/reactivex/Single.java b/src/main/java/io/reactivex/Single.java index 1223c1c466..b3ec304b07 100644 --- a/src/main/java/io/reactivex/Single.java +++ b/src/main/java/io/reactivex/Single.java @@ -15,18 +15,15 @@ import java.util.*; import java.util.concurrent.*; -import java.util.concurrent.atomic.*; import org.reactivestreams.*; -import io.reactivex.disposables.*; -import io.reactivex.exceptions.CompositeException; +import io.reactivex.disposables.Disposable; import io.reactivex.functions.*; -import io.reactivex.internal.disposables.EmptyDisposable; import io.reactivex.internal.functions.Functions; import io.reactivex.internal.functions.Objects; import io.reactivex.internal.operators.single.*; -import io.reactivex.internal.util.*; +import io.reactivex.internal.subscribers.single.*; import io.reactivex.plugins.RxJavaPlugins; import io.reactivex.schedulers.Schedulers; @@ -55,101 +52,7 @@ static Single wrap(SingleConsumable source) { public static Single amb(final Iterable> sources) { Objects.requireNonNull(sources, "sources is null"); - return create(new SingleConsumable() { - @Override - public void subscribe(final SingleSubscriber s) { - final AtomicBoolean once = new AtomicBoolean(); - final CompositeDisposable set = new CompositeDisposable(); - s.onSubscribe(set); - - int c = 0; - Iterator> iterator; - - try { - iterator = sources.iterator(); - } catch (Throwable e) { - s.onError(e); - return; - } - - if (iterator == null) { - s.onError(new NullPointerException("The iterator returned is null")); - return; - } - for (;;) { - if (once.get()) { - return; - } - - boolean b; - - try { - b = iterator.hasNext(); - } catch (Throwable e) { - s.onError(e); - return; - } - - if (once.get()) { - return; - } - - if (!b) { - break; - } - - SingleConsumable s1; - - if (once.get()) { - return; - } - - try { - s1 = iterator.next(); - } catch (Throwable e) { - set.dispose(); - s.onError(e); - return; - } - - if (s1 == null) { - set.dispose(); - s.onError(new NullPointerException("The single source returned by the iterator is null")); - return; - } - - s1.subscribe(new SingleSubscriber() { - - @Override - public void onSubscribe(Disposable d) { - set.add(d); - } - - @Override - public void onSuccess(T value) { - if (once.compareAndSet(false, true)) { - s.onSuccess(value); - } - } - - @Override - public void onError(Throwable e) { - if (once.compareAndSet(false, true)) { - s.onError(e); - } else { - RxJavaPlugins.onError(e); - } - } - - }); - c++; - } - - if (c == 0 && !set.isDisposed()) { - s.onError(new NoSuchElementException()); - } - } - }); + return new SingleAmbIterable(sources); } @SuppressWarnings("unchecked") @@ -165,63 +68,14 @@ public Throwable get() { if (sources.length == 1) { return wrap((SingleConsumable)sources[0]); } - return create(new SingleConsumable() { - @Override - public void subscribe(final SingleSubscriber s) { - final AtomicBoolean once = new AtomicBoolean(); - final CompositeDisposable set = new CompositeDisposable(); - s.onSubscribe(set); - - for (SingleConsumable s1 : sources) { - if (once.get()) { - return; - } - - if (s1 == null) { - set.dispose(); - Throwable e = new NullPointerException("One of the sources is null"); - if (once.compareAndSet(false, true)) { - s.onError(e); - } else { - RxJavaPlugins.onError(e); - } - return; - } - - s1.subscribe(new SingleSubscriber() { - - @Override - public void onSubscribe(Disposable d) { - set.add(d); - } - - @Override - public void onSuccess(T value) { - if (once.compareAndSet(false, true)) { - s.onSuccess(value); - } - } - - @Override - public void onError(Throwable e) { - if (once.compareAndSet(false, true)) { - s.onError(e); - } else { - RxJavaPlugins.onError(e); - } - } - - }); - } - } - }); + return new SingleAmbArray(sources); } public static Flowable concat(Iterable> sources) { return concat(Flowable.fromIterable(sources)); } - public static Flowable concat(Flowable> sources) { + public static Flowable concat(Flowable> sources) { // FIXME Publisher return sources.concatMap(new Function, Publisher>() { @Override public Publisher apply(SingleConsumable v){ @@ -354,51 +208,12 @@ public static Single create(SingleConsumable onSubscribe) { public static Single defer(final Supplier> singleSupplier) { Objects.requireNonNull(singleSupplier, "singleSupplier is null"); - return create(new SingleConsumable() { - @Override - public void subscribe(SingleSubscriber s) { - SingleConsumable next; - - try { - next = singleSupplier.get(); - } catch (Throwable e) { - s.onSubscribe(EmptyDisposable.INSTANCE); - s.onError(e); - return; - } - - if (next == null) { - s.onSubscribe(EmptyDisposable.INSTANCE); - s.onError(new NullPointerException("The Single supplied was null")); - return; - } - - next.subscribe(s); - } - }); + return new SingleDefer(singleSupplier); } public static Single error(final Supplier errorSupplier) { Objects.requireNonNull(errorSupplier, "errorSupplier is null"); - return create(new SingleConsumable() { - @Override - public void subscribe(SingleSubscriber s) { - Throwable error; - - try { - error = errorSupplier.get(); - } catch (Throwable e) { - error = e; - } - - if (error == null) { - error = new NullPointerException(); - } - - s.onSubscribe(EmptyDisposable.INSTANCE); - s.onError(error); - } - }); + return new SingleError(errorSupplier); } public static Single error(final Throwable error) { @@ -413,22 +228,7 @@ public Throwable get() { public static Single fromCallable(final Callable callable) { Objects.requireNonNull(callable, "callable is null"); - return create(new SingleConsumable() { - @Override - public void subscribe(SingleSubscriber s) { - s.onSubscribe(EmptyDisposable.INSTANCE); - try { - T v = callable.call(); - if (v != null) { - s.onSuccess(v); - } else { - s.onError(new NullPointerException()); - } - } catch (Throwable e) { - s.onError(e); - } - } - }); + return new SingleFromCallable(callable); } public static Single fromFuture(Future future) { @@ -449,60 +249,19 @@ public static Single fromFuture(Future future, Scheduler sch public static Single fromPublisher(final Publisher publisher) { Objects.requireNonNull(publisher, "publisher is null"); - return create(new SingleConsumable() { - @Override - public void subscribe(final SingleSubscriber s) { - publisher.subscribe(new Subscriber() { - T value; - @Override - public void onComplete() { - T v = value; - value = null; - if (v != null) { - s.onSuccess(v); - } else { - s.onError(new NoSuchElementException()); - } - } - - @Override - public void onError(Throwable t) { - value = null; - s.onError(t); - } - - @Override - public void onNext(T t) { - value = t; - } - - @Override - public void onSubscribe(Subscription inner) { - s.onSubscribe(Disposables.from(inner)); - inner.request(Long.MAX_VALUE); - } - - }); - } - }); + return new SingleFromPublisher(publisher); } public static Single just(final T value) { Objects.requireNonNull(value, "value is null"); - return create(new SingleConsumable() { - @Override - public void subscribe(SingleSubscriber s) { - s.onSubscribe(EmptyDisposable.INSTANCE); - s.onSuccess(value); - } - }); + return new SingleJust(value); } public static Flowable merge(Iterable> sources) { return merge(Flowable.fromIterable(sources)); } - public static Flowable merge(Flowable> sources) { + public static Flowable merge(Flowable> sources) { // FIXME Publisher return sources.flatMap(new Function, Publisher>() { @Override public Publisher apply(SingleConsumable v){ @@ -514,7 +273,7 @@ public Publisher apply(SingleConsumable v){ @SuppressWarnings({ "unchecked", "rawtypes" }) public static Single merge(SingleConsumable> source) { Objects.requireNonNull(source, "source is null"); - return new SingleOperatorFlatMap, T>(source, (Function)Functions.identity()); + return new SingleFlatMap, T>(source, (Function)Functions.identity()); } @SuppressWarnings("unchecked") @@ -633,12 +392,7 @@ public static Flowable merge( return merge(Flowable.fromArray(s1, s2, s3, s4, s5, s6, s7, s8, s9)); } - static final Single NEVER = create(new SingleConsumable() { - @Override - public void subscribe(SingleSubscriber s) { - s.onSubscribe(EmptyDisposable.INSTANCE); - } - }); + static final Single NEVER = new SingleNever(); @SuppressWarnings("unchecked") public static Single never() { @@ -652,75 +406,13 @@ public static Single timer(long delay, TimeUnit unit) { public static Single timer(final long delay, final TimeUnit unit, final Scheduler scheduler) { Objects.requireNonNull(unit, "unit is null"); Objects.requireNonNull(scheduler, "scheduler is null"); - return create(new SingleConsumable() { - @Override - public void subscribe(final SingleSubscriber s) { - MultipleAssignmentDisposable mad = new MultipleAssignmentDisposable(); - - s.onSubscribe(mad); - - mad.set(scheduler.scheduleDirect(new Runnable() { - @Override - public void run() { - s.onSuccess(0L); - } - }, delay, unit)); - } - }); + return new SingleTimer(delay, unit, scheduler); } public static Single equals(final SingleConsumable first, final SingleConsumable second) { Objects.requireNonNull(first, "first is null"); Objects.requireNonNull(second, "second is null"); - return create(new SingleConsumable() { - @Override - public void subscribe(final SingleSubscriber s) { - final AtomicInteger count = new AtomicInteger(); - final Object[] values = { null, null }; - - final CompositeDisposable set = new CompositeDisposable(); - s.onSubscribe(set); - - class InnerSubscriber implements SingleSubscriber { - final int index; - public InnerSubscriber(int index) { - this.index = index; - } - @Override - public void onSubscribe(Disposable d) { - set.add(d); - } - - @Override - public void onSuccess(T value) { - values[index] = value; - - if (count.incrementAndGet() == 2) { - s.onSuccess(Objects.equals(values[0], values[1])); - } - } - - @Override - public void onError(Throwable e) { - for (;;) { - int state = count.get(); - if (state >= 2) { - RxJavaPlugins.onError(e); - return; - } - if (count.compareAndSet(state, 2)) { - s.onError(e); - return; - } - } - } - - } - - first.subscribe(new InnerSubscriber(0)); - second.subscribe(new InnerSubscriber(1)); - } - }); + return new SingleEquals(first, second); } public static Single using(Supplier resourceSupplier, @@ -737,99 +429,7 @@ public static Single using( Objects.requireNonNull(singleFunction, "singleFunction is null"); Objects.requireNonNull(disposer, "disposer is null"); - return create(new SingleConsumable() { - @Override - public void subscribe(final SingleSubscriber s) { - final U resource; - - try { - resource = resourceSupplier.get(); - } catch (Throwable ex) { - s.onSubscribe(EmptyDisposable.INSTANCE); - s.onError(ex); - return; - } - - SingleConsumable s1; - - try { - s1 = singleFunction.apply(resource); - } catch (Throwable ex) { - s.onSubscribe(EmptyDisposable.INSTANCE); - s.onError(ex); - return; - } - - if (s1 == null) { - s.onSubscribe(EmptyDisposable.INSTANCE); - s.onError(new NullPointerException("The Single supplied by the function was null")); - return; - } - - s1.subscribe(new SingleSubscriber() { - - @Override - public void onSubscribe(Disposable d) { - if (eager) { - CompositeDisposable set = new CompositeDisposable(); - set.add(d); - set.add(new Disposable() { - @Override - public void dispose() { - try { - disposer.accept(resource); - } catch (Throwable e) { - RxJavaPlugins.onError(e); - } - } - }); - } else { - s.onSubscribe(d); - } - } - - @Override - public void onSuccess(T value) { - if (eager) { - try { - disposer.accept(resource); - } catch (Throwable e) { - s.onError(e); - return; - } - } - s.onSuccess(value); - if (!eager) { - try { - disposer.accept(resource); - } catch (Throwable e) { - RxJavaPlugins.onError(e); - } - } - } - - @Override - public void onError(Throwable e) { - if (eager) { - try { - disposer.accept(resource); - } catch (Throwable ex) { - e = new CompositeException(ex, e); - } - } - s.onError(e); - if (!eager) { - try { - disposer.accept(resource); - } catch (Throwable ex) { - RxJavaPlugins.onError(ex); - } - } - } - - }); - } - }); + return new SingleUsing(resourceSupplier, singleFunction, disposer, eager); } public static Single zip(final Iterable> sources, Function zipper) { @@ -1005,12 +605,7 @@ public final Single ambWith(SingleConsumable other) { } public final Single asSingle() { - return create(new SingleConsumable() { - @Override - public void subscribe(SingleSubscriber s) { - subscribe(s); - } - }); + return new SingleHide(this); } public final Single compose(Function, ? extends SingleConsumable> convert) { @@ -1018,112 +613,15 @@ public final Single compose(Function, ? extends SingleC } public final Single cache() { - final AtomicInteger wip = new AtomicInteger(); - final AtomicReference notification = new AtomicReference(); - final List> subscribers = new ArrayList>(); - - return create(new SingleConsumable() { - @Override - public void subscribe(SingleSubscriber s) { - Object o = notification.get(); - if (o != null) { - s.onSubscribe(EmptyDisposable.INSTANCE); - if (NotificationLite.isError(o)) { - s.onError(NotificationLite.getError(o)); - } else { - s.onSuccess(NotificationLite.getValue(o)); - } - return; - } - - synchronized (subscribers) { - o = notification.get(); - if (o == null) { - subscribers.add(s); - } - } - if (o != null) { - s.onSubscribe(EmptyDisposable.INSTANCE); - if (NotificationLite.isError(o)) { - s.onError(NotificationLite.getError(o)); - } else { - s.onSuccess(NotificationLite.getValue(o)); - } - return; - } - - if (wip.getAndIncrement() != 0) { - return; - } - - subscribe(new SingleSubscriber() { - - @Override - public void onSubscribe(Disposable d) { - - } - - @Override - public void onSuccess(T value) { - notification.set(NotificationLite.next(value)); - List> list; - synchronized (subscribers) { - list = new ArrayList>(subscribers); - subscribers.clear(); - } - for (SingleSubscriber s1 : list) { - s1.onSuccess(value); - } - } - - @Override - public void onError(Throwable e) { - notification.set(NotificationLite.error(e)); - List> list; - synchronized (subscribers) { - list = new ArrayList>(subscribers); - subscribers.clear(); - } - for (SingleSubscriber s1 : list) { - s1.onError(e); - } - } - - }); - } - }); + return new SingleCache(this); } public final Single cast(final Class clazz) { Objects.requireNonNull(clazz, "clazz is null"); - return create(new SingleConsumable() { + return map(new Function() { @Override - public void subscribe(final SingleSubscriber s) { - Single.this.subscribe(new SingleSubscriber() { - - @Override - public void onSubscribe(Disposable d) { - s.onSubscribe(d); - } - - @Override - public void onSuccess(T value) { - U v; - try { - v = clazz.cast(value); - } catch (ClassCastException ex) { - s.onError(ex); - return; - } - s.onSuccess(v); - } - - @Override - public void onError(Throwable e) { - s.onError(e); - } - - }); + public U apply(T v) { + return clazz.cast(v); } }); } @@ -1139,176 +637,32 @@ public final Single delay(long time, TimeUnit unit) { public final Single delay(final long time, final TimeUnit unit, final Scheduler scheduler) { Objects.requireNonNull(unit, "unit is null"); Objects.requireNonNull(scheduler, "scheduler is null"); - return create(new SingleConsumable() { - @Override - public void subscribe(final SingleSubscriber s) { - final MultipleAssignmentDisposable mad = new MultipleAssignmentDisposable(); - s.onSubscribe(mad); - subscribe(new SingleSubscriber() { - @Override - public void onSubscribe(Disposable d) { - mad.set(d); - } - - @Override - public void onSuccess(final T value) { - mad.set(scheduler.scheduleDirect(new Runnable() { - @Override - public void run() { - s.onSuccess(value); - } - }, time, unit)); - } - - @Override - public void onError(Throwable e) { - s.onError(e); - } - - }); - } - }); + return new SingleDelay(this, time, unit, scheduler); } public final Single doOnSubscribe(final Consumer onSubscribe) { Objects.requireNonNull(onSubscribe, "onSubscribe is null"); - return create(new SingleConsumable() { - @Override - public void subscribe(final SingleSubscriber s) { - subscribe(new SingleSubscriber() { - boolean done; - @Override - public void onSubscribe(Disposable d) { - try { - onSubscribe.accept(d); - } catch (Throwable ex) { - done = true; - d.dispose(); - s.onSubscribe(EmptyDisposable.INSTANCE); - s.onError(ex); - return; - } - - s.onSubscribe(d); - } - - @Override - public void onSuccess(T value) { - if (done) { - return; - } - s.onSuccess(value); - } - - @Override - public void onError(Throwable e) { - if (done) { - RxJavaPlugins.onError(e); - return; - } - s.onError(e); - } - - }); - } - }); + return new SingleDoOnSubscribe(this, onSubscribe); } public final Single doOnSuccess(final Consumer onSuccess) { Objects.requireNonNull(onSuccess, "onSuccess is null"); - return create(new SingleConsumable() { - @Override - public void subscribe(final SingleSubscriber s) { - subscribe(new SingleSubscriber() { - @Override - public void onSubscribe(Disposable d) { - s.onSubscribe(d); - } - - @Override - public void onSuccess(T value) { - try { - onSuccess.accept(value); - } catch (Throwable ex) { - s.onError(ex); - return; - } - s.onSuccess(value); - } - - @Override - public void onError(Throwable e) { - s.onError(e); - } - - }); - } - }); + return new SingleDoOnSuccess(this, onSuccess); } public final Single doOnError(final Consumer onError) { Objects.requireNonNull(onError, "onError is null"); - return create(new SingleConsumable() { - @Override - public void subscribe(final SingleSubscriber s) { - subscribe(new SingleSubscriber() { - @Override - public void onSubscribe(Disposable d) { - s.onSubscribe(d); - } - - @Override - public void onSuccess(T value) { - s.onSuccess(value); - } - - @Override - public void onError(Throwable e) { - try { - onError.accept(e); - } catch (Throwable ex) { - e = new CompositeException(ex, e); - } - s.onError(e); - } - - }); - } - }); + return new SingleDoOnError(this, onError); } public final Single doOnCancel(final Runnable onCancel) { Objects.requireNonNull(onCancel, "onCancel is null"); - return create(new SingleConsumable() { - @Override - public void subscribe(final SingleSubscriber s) { - subscribe(new SingleSubscriber() { - @Override - public void onSubscribe(Disposable d) { - CompositeDisposable set = new CompositeDisposable(); - set.add(Disposables.from(onCancel)); - set.add(d); - s.onSubscribe(set); - } - - @Override - public void onSuccess(T value) { - s.onSuccess(value); - } - - @Override - public void onError(Throwable e) { - s.onError(e); - } - - }); - } - }); + return new SingleDoOnCancel(this, onCancel); } public final Single flatMap(Function> mapper) { Objects.requireNonNull(mapper, "mapper is null"); - return new SingleOperatorFlatMap(this, mapper); + return new SingleFlatMap(this, mapper); } public final Flowable flatMapPublisher(Function> mapper) { @@ -1316,68 +670,16 @@ public final Flowable flatMapPublisher(Function valueRef = new AtomicReference(); - final AtomicReference errorRef = new AtomicReference(); - final CountDownLatch cdl = new CountDownLatch(1); - - subscribe(new SingleSubscriber() { - @Override - public void onError(Throwable e) { - errorRef.lazySet(e); - cdl.countDown(); - } - - @Override - public void onSubscribe(Disposable d) { - } - @Override - public void onSuccess(T value) { - valueRef.lazySet(value); - cdl.countDown(); - } - }); - - if (cdl.getCount() != 0L) { - try { - cdl.await(); - } catch (InterruptedException ex) { - throw new RuntimeException(ex); - } - } - Throwable e = errorRef.get(); - if (e != null) { - throw Exceptions.propagate(e); - } - return valueRef.get(); + return SingleAwait.get(this); } public final Single lift(final SingleOperator onLift) { Objects.requireNonNull(onLift, "onLift is null"); - return create(new SingleConsumable() { - @Override - public void subscribe(SingleSubscriber s) { - try { - SingleSubscriber sr = onLift.apply(s); - - if (sr == null) { - throw new NullPointerException("The onLift returned a null subscriber"); - } - // TODO plugin wrapper - Single.this.subscribe(sr); - } catch (NullPointerException ex) { - throw ex; - } catch (Throwable ex) { - RxJavaPlugins.onError(ex); - NullPointerException npe = new NullPointerException("Not really but can't throw other than NPE"); - npe.initCause(ex); - throw npe; - } - } - }); + return new SingleLift(this, onLift); } public final Single map(Function mapper) { - return lift(new SingleOperatorMap(mapper)); + return new SingleMap(this, mapper); } public final Single contains(Object value) { @@ -1387,29 +689,7 @@ public final Single contains(Object value) { public final Single contains(final Object value, final BiPredicate comparer) { Objects.requireNonNull(value, "value is null"); Objects.requireNonNull(comparer, "comparer is null"); - return create(new SingleConsumable() { - @Override - public void subscribe(final SingleSubscriber s) { - Single.this.subscribe(new SingleSubscriber() { - - @Override - public void onSubscribe(Disposable d) { - s.onSubscribe(d); - } - - @Override - public void onSuccess(T v) { - s.onSuccess(comparer.test(v, value)); - } - - @Override - public void onError(Throwable e) { - s.onError(e); - } - - }); - } - }); + return new SingleContains(this, value, comparer); } public final Flowable mergeWith(SingleConsumable other) { @@ -1422,159 +702,23 @@ public final Single> nest() { public final Single observeOn(final Scheduler scheduler) { Objects.requireNonNull(scheduler, "scheduler is null"); - return create(new SingleConsumable() { - @Override - public void subscribe(final SingleSubscriber s) { - final CompositeDisposable mad = new CompositeDisposable(); - s.onSubscribe(mad); - - Single.this.subscribe(new SingleSubscriber() { - - @Override - public void onError(final Throwable e) { - mad.add(scheduler.scheduleDirect(new Runnable() { - @Override - public void run() { - s.onError(e); - } - })); - } - - @Override - public void onSubscribe(Disposable d) { - mad.add(d); - } - - @Override - public void onSuccess(final T value) { - mad.add(scheduler.scheduleDirect(new Runnable() { - @Override - public void run() { - s.onSuccess(value); - } - })); - } - - }); - } - }); + return new SingleObserveOn(this, scheduler); } public final Single onErrorReturn(final Supplier valueSupplier) { Objects.requireNonNull(valueSupplier, "valueSupplier is null"); - return create(new SingleConsumable() { - @Override - public void subscribe(final SingleSubscriber s) { - Single.this.subscribe(new SingleSubscriber() { - - @Override - public void onError(Throwable e) { - T v; - - try { - v = valueSupplier.get(); - } catch (Throwable ex) { - s.onError(new CompositeException(ex, e)); - return; - } - - if (v == null) { - NullPointerException npe = new NullPointerException("Value supplied was null"); - npe.initCause(e); - s.onError(npe); - return; - } - - s.onSuccess(v); - } - - @Override - public void onSubscribe(Disposable d) { - s.onSubscribe(d); - } - - @Override - public void onSuccess(T value) { - s.onSuccess(value); - } - - }); - } - }); + return new SingleOnErrorReturn(this, valueSupplier, null); } public final Single onErrorReturn(final T value) { Objects.requireNonNull(value, "value is null"); - return onErrorReturn(new Supplier() { - @Override - public T get() { - return value; - } - }); + return new SingleOnErrorReturn(this, null, value); } public final Single onErrorResumeNext( final Function> nextFunction) { Objects.requireNonNull(nextFunction, "nextFunction is null"); - return create(new SingleConsumable() { - @Override - public void subscribe(final SingleSubscriber s) { - final MultipleAssignmentDisposable mad = new MultipleAssignmentDisposable(); - s.onSubscribe(mad); - - Single.this.subscribe(new SingleSubscriber() { - - @Override - public void onSubscribe(Disposable d) { - mad.set(d); - } - - @Override - public void onSuccess(T value) { - s.onSuccess(value); - } - - @Override - public void onError(Throwable e) { - SingleConsumable next; - - try { - next = nextFunction.apply(e); - } catch (Throwable ex) { - s.onError(new CompositeException(ex, e)); - return; - } - - if (next == null) { - NullPointerException npe = new NullPointerException("The next Single supplied was null"); - npe.initCause(e); - s.onError(npe); - return; - } - - next.subscribe(new SingleSubscriber() { - - @Override - public void onSubscribe(Disposable d) { - mad.set(d); - } - - @Override - public void onSuccess(T value) { - s.onSuccess(value); - } - - @Override - public void onError(Throwable e) { - s.onError(e); - } - - }); - } - - }); - } - }); + return new SingleResumeNext(this, nextFunction); } public final Flowable repeat() { @@ -1624,26 +768,9 @@ public final Disposable subscribe() { public final Disposable subscribe(final BiConsumer onCallback) { Objects.requireNonNull(onCallback, "onCallback is null"); - final MultipleAssignmentDisposable mad = new MultipleAssignmentDisposable(); - - subscribe(new SingleSubscriber() { - @Override - public void onError(Throwable e) { - onCallback.accept(null, e); - } - - @Override - public void onSubscribe(Disposable d) { - mad.set(d); - } - - @Override - public void onSuccess(T value) { - onCallback.accept(value, null); - } - }); - - return mad; + BiConsumerSingleSubscriber s = new BiConsumerSingleSubscriber(onCallback); + subscribe(s); + return s; } public final Disposable subscribe(Consumer onSuccess) { @@ -1654,26 +781,9 @@ public final Disposable subscribe(final Consumer onSuccess, final Con Objects.requireNonNull(onSuccess, "onSuccess is null"); Objects.requireNonNull(onError, "onError is null"); - final MultipleAssignmentDisposable mad = new MultipleAssignmentDisposable(); - - subscribe(new SingleSubscriber() { - @Override - public void onError(Throwable e) { - onError.accept(e); - } - - @Override - public void onSubscribe(Disposable d) { - mad.set(d); - } - - @Override - public void onSuccess(T value) { - onSuccess.accept(value); - } - }); - - return mad; + ConsumerSingleSubscriber s = new ConsumerSingleSubscriber(onSuccess, onError); + subscribe(s); + return s; } @Override @@ -1691,17 +801,7 @@ public final void subscribe(Subscriber s) { public final Single subscribeOn(final Scheduler scheduler) { Objects.requireNonNull(scheduler, "scheduler is null"); - return create(new SingleConsumable() { - @Override - public void subscribe(final SingleSubscriber s) { - scheduler.scheduleDirect(new Runnable() { - @Override - public void run() { - Single.this.subscribe(s); - } - }); - } - }); + return new SingleSubscribeOn(this, scheduler); } public final Single timeout(long timeout, TimeUnit unit) { @@ -1725,76 +825,7 @@ public final Single timeout(long timeout, TimeUnit unit, SingleConsumable timeout0(final long timeout, final TimeUnit unit, final Scheduler scheduler, final SingleConsumable other) { Objects.requireNonNull(unit, "unit is null"); Objects.requireNonNull(scheduler, "scheduler is null"); - return create(new SingleConsumable() { - @Override - public void subscribe(final SingleSubscriber s) { - final CompositeDisposable set = new CompositeDisposable(); - s.onSubscribe(set); - - final AtomicBoolean once = new AtomicBoolean(); - - Disposable timer = scheduler.scheduleDirect(new Runnable() { - @Override - public void run() { - if (once.compareAndSet(false, true)) { - if (other != null) { - set.clear(); - other.subscribe(new SingleSubscriber() { - - @Override - public void onError(Throwable e) { - set.dispose(); - s.onError(e); - } - - @Override - public void onSubscribe(Disposable d) { - set.add(d); - } - - @Override - public void onSuccess(T value) { - set.dispose(); - s.onSuccess(value); - } - - }); - } else { - set.dispose(); - s.onError(new TimeoutException()); - } - } - } - }, timeout, unit); - - set.add(timer); - - Single.this.subscribe(new SingleSubscriber() { - - @Override - public void onError(Throwable e) { - if (once.compareAndSet(false, true)) { - set.dispose(); - s.onError(e); - } - } - - @Override - public void onSubscribe(Disposable d) { - set.add(d); - } - - @Override - public void onSuccess(T value) { - if (once.compareAndSet(false, true)) { - set.dispose(); - s.onSuccess(value); - } - } - - }); - } - }); + return new SingleTimeout(this, timeout, unit, scheduler, other); } public final R to(Function, R> convert) { diff --git a/src/main/java/io/reactivex/SingleSubscriber.java b/src/main/java/io/reactivex/SingleSubscriber.java index 894bc3b82d..64564af861 100644 --- a/src/main/java/io/reactivex/SingleSubscriber.java +++ b/src/main/java/io/reactivex/SingleSubscriber.java @@ -1,10 +1,22 @@ +/** + * Copyright 2016 Netflix, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in + * compliance with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License is + * distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See + * the License for the specific language governing permissions and limitations under the License. + */ + package io.reactivex; import io.reactivex.disposables.Disposable; public interface SingleSubscriber { - void onSubscribe(Disposable d); void onSuccess(T value); diff --git a/src/main/java/io/reactivex/internal/disposables/DisposableHelper.java b/src/main/java/io/reactivex/internal/disposables/DisposableHelper.java index d312a15696..991674b528 100644 --- a/src/main/java/io/reactivex/internal/disposables/DisposableHelper.java +++ b/src/main/java/io/reactivex/internal/disposables/DisposableHelper.java @@ -1,3 +1,16 @@ +/** + * Copyright 2016 Netflix, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in + * compliance with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License is + * distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See + * the License for the specific language governing permissions and limitations under the License. + */ + package io.reactivex.internal.disposables; import java.util.concurrent.atomic.AtomicReference; diff --git a/src/main/java/io/reactivex/internal/operators/completable/CompletableAmbIterable.java b/src/main/java/io/reactivex/internal/operators/completable/CompletableAmbIterable.java index b3ba2e5c7a..3e6de49d73 100644 --- a/src/main/java/io/reactivex/internal/operators/completable/CompletableAmbIterable.java +++ b/src/main/java/io/reactivex/internal/operators/completable/CompletableAmbIterable.java @@ -1,3 +1,16 @@ +/** + * Copyright 2016 Netflix, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in + * compliance with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License is + * distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See + * the License for the specific language governing permissions and limitations under the License. + */ + package io.reactivex.internal.operators.completable; import java.util.Iterator; diff --git a/src/main/java/io/reactivex/internal/operators/completable/CompletableAwait.java b/src/main/java/io/reactivex/internal/operators/completable/CompletableAwait.java index e759b2aaca..0fa1721300 100644 --- a/src/main/java/io/reactivex/internal/operators/completable/CompletableAwait.java +++ b/src/main/java/io/reactivex/internal/operators/completable/CompletableAwait.java @@ -1,3 +1,16 @@ +/** + * Copyright 2016 Netflix, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in + * compliance with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License is + * distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See + * the License for the specific language governing permissions and limitations under the License. + */ + package io.reactivex.internal.operators.completable; import java.util.concurrent.*; diff --git a/src/main/java/io/reactivex/internal/operators/completable/CompletableDefer.java b/src/main/java/io/reactivex/internal/operators/completable/CompletableDefer.java index e6fcd7515c..176a3fedff 100644 --- a/src/main/java/io/reactivex/internal/operators/completable/CompletableDefer.java +++ b/src/main/java/io/reactivex/internal/operators/completable/CompletableDefer.java @@ -1,3 +1,16 @@ +/** + * Copyright 2016 Netflix, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in + * compliance with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License is + * distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See + * the License for the specific language governing permissions and limitations under the License. + */ + package io.reactivex.internal.operators.completable; import io.reactivex.*; diff --git a/src/main/java/io/reactivex/internal/operators/completable/CompletableDelay.java b/src/main/java/io/reactivex/internal/operators/completable/CompletableDelay.java index 0192f8ab25..37330c28fe 100644 --- a/src/main/java/io/reactivex/internal/operators/completable/CompletableDelay.java +++ b/src/main/java/io/reactivex/internal/operators/completable/CompletableDelay.java @@ -1,3 +1,16 @@ +/** + * Copyright 2016 Netflix, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in + * compliance with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License is + * distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See + * the License for the specific language governing permissions and limitations under the License. + */ + package io.reactivex.internal.operators.completable; import java.util.concurrent.TimeUnit; diff --git a/src/main/java/io/reactivex/internal/operators/completable/CompletableError.java b/src/main/java/io/reactivex/internal/operators/completable/CompletableError.java index 81350b9560..2a94cf8d82 100644 --- a/src/main/java/io/reactivex/internal/operators/completable/CompletableError.java +++ b/src/main/java/io/reactivex/internal/operators/completable/CompletableError.java @@ -1,3 +1,16 @@ +/** + * Copyright 2016 Netflix, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in + * compliance with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License is + * distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See + * the License for the specific language governing permissions and limitations under the License. + */ + package io.reactivex.internal.operators.completable; import io.reactivex.*; diff --git a/src/main/java/io/reactivex/internal/operators/completable/CompletableErrorSupplier.java b/src/main/java/io/reactivex/internal/operators/completable/CompletableErrorSupplier.java index 85b938f801..2c6dc9d02a 100644 --- a/src/main/java/io/reactivex/internal/operators/completable/CompletableErrorSupplier.java +++ b/src/main/java/io/reactivex/internal/operators/completable/CompletableErrorSupplier.java @@ -1,3 +1,16 @@ +/** + * Copyright 2016 Netflix, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in + * compliance with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License is + * distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See + * the License for the specific language governing permissions and limitations under the License. + */ + package io.reactivex.internal.operators.completable; import io.reactivex.*; diff --git a/src/main/java/io/reactivex/internal/operators/completable/CompletableFromCallable.java b/src/main/java/io/reactivex/internal/operators/completable/CompletableFromCallable.java index fee16d439f..a82d8f2cdf 100644 --- a/src/main/java/io/reactivex/internal/operators/completable/CompletableFromCallable.java +++ b/src/main/java/io/reactivex/internal/operators/completable/CompletableFromCallable.java @@ -1,3 +1,16 @@ +/** + * Copyright 2016 Netflix, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in + * compliance with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License is + * distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See + * the License for the specific language governing permissions and limitations under the License. + */ + package io.reactivex.internal.operators.completable; import java.util.concurrent.Callable; diff --git a/src/main/java/io/reactivex/internal/operators/completable/CompletableFromFlowable.java b/src/main/java/io/reactivex/internal/operators/completable/CompletableFromFlowable.java index ae07680cf7..6a79e9f577 100644 --- a/src/main/java/io/reactivex/internal/operators/completable/CompletableFromFlowable.java +++ b/src/main/java/io/reactivex/internal/operators/completable/CompletableFromFlowable.java @@ -1,3 +1,16 @@ +/** + * Copyright 2016 Netflix, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in + * compliance with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License is + * distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See + * the License for the specific language governing permissions and limitations under the License. + */ + package io.reactivex.internal.operators.completable; import org.reactivestreams.*; diff --git a/src/main/java/io/reactivex/internal/operators/completable/CompletableFromObservable.java b/src/main/java/io/reactivex/internal/operators/completable/CompletableFromObservable.java index b0090f897e..d4135cc85f 100644 --- a/src/main/java/io/reactivex/internal/operators/completable/CompletableFromObservable.java +++ b/src/main/java/io/reactivex/internal/operators/completable/CompletableFromObservable.java @@ -1,3 +1,16 @@ +/** + * Copyright 2016 Netflix, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in + * compliance with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License is + * distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See + * the License for the specific language governing permissions and limitations under the License. + */ + package io.reactivex.internal.operators.completable; import io.reactivex.*; diff --git a/src/main/java/io/reactivex/internal/operators/completable/CompletableFromRunnable.java b/src/main/java/io/reactivex/internal/operators/completable/CompletableFromRunnable.java index 04bbfc7b26..a829ce836a 100644 --- a/src/main/java/io/reactivex/internal/operators/completable/CompletableFromRunnable.java +++ b/src/main/java/io/reactivex/internal/operators/completable/CompletableFromRunnable.java @@ -1,3 +1,16 @@ +/** + * Copyright 2016 Netflix, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in + * compliance with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License is + * distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See + * the License for the specific language governing permissions and limitations under the License. + */ + package io.reactivex.internal.operators.completable; import io.reactivex.*; diff --git a/src/main/java/io/reactivex/internal/operators/completable/CompletableFromSingle.java b/src/main/java/io/reactivex/internal/operators/completable/CompletableFromSingle.java index 058481c2b7..5f11a0debb 100644 --- a/src/main/java/io/reactivex/internal/operators/completable/CompletableFromSingle.java +++ b/src/main/java/io/reactivex/internal/operators/completable/CompletableFromSingle.java @@ -1,3 +1,16 @@ +/** + * Copyright 2016 Netflix, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in + * compliance with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License is + * distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See + * the License for the specific language governing permissions and limitations under the License. + */ + package io.reactivex.internal.operators.completable; import io.reactivex.*; diff --git a/src/main/java/io/reactivex/internal/operators/completable/CompletableLift.java b/src/main/java/io/reactivex/internal/operators/completable/CompletableLift.java index 1ea997161c..46fbbac7f0 100644 --- a/src/main/java/io/reactivex/internal/operators/completable/CompletableLift.java +++ b/src/main/java/io/reactivex/internal/operators/completable/CompletableLift.java @@ -1,3 +1,16 @@ +/** + * Copyright 2016 Netflix, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in + * compliance with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License is + * distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See + * the License for the specific language governing permissions and limitations under the License. + */ + package io.reactivex.internal.operators.completable; import io.reactivex.*; diff --git a/src/main/java/io/reactivex/internal/operators/completable/CompletableObserveOn.java b/src/main/java/io/reactivex/internal/operators/completable/CompletableObserveOn.java index 1e08c3056b..cf03595848 100644 --- a/src/main/java/io/reactivex/internal/operators/completable/CompletableObserveOn.java +++ b/src/main/java/io/reactivex/internal/operators/completable/CompletableObserveOn.java @@ -1,3 +1,16 @@ +/** + * Copyright 2016 Netflix, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in + * compliance with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License is + * distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See + * the License for the specific language governing permissions and limitations under the License. + */ + package io.reactivex.internal.operators.completable; import io.reactivex.*; diff --git a/src/main/java/io/reactivex/internal/operators/completable/CompletableOnErrorComplete.java b/src/main/java/io/reactivex/internal/operators/completable/CompletableOnErrorComplete.java index ab4fd72fd4..1f47f27eaa 100644 --- a/src/main/java/io/reactivex/internal/operators/completable/CompletableOnErrorComplete.java +++ b/src/main/java/io/reactivex/internal/operators/completable/CompletableOnErrorComplete.java @@ -1,3 +1,16 @@ +/** + * Copyright 2016 Netflix, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in + * compliance with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License is + * distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See + * the License for the specific language governing permissions and limitations under the License. + */ + package io.reactivex.internal.operators.completable; import io.reactivex.*; diff --git a/src/main/java/io/reactivex/internal/operators/completable/CompletablePeek.java b/src/main/java/io/reactivex/internal/operators/completable/CompletablePeek.java index d4f68dee62..0d019c307b 100644 --- a/src/main/java/io/reactivex/internal/operators/completable/CompletablePeek.java +++ b/src/main/java/io/reactivex/internal/operators/completable/CompletablePeek.java @@ -1,3 +1,16 @@ +/** + * Copyright 2016 Netflix, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in + * compliance with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License is + * distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See + * the License for the specific language governing permissions and limitations under the License. + */ + package io.reactivex.internal.operators.completable; import io.reactivex.*; diff --git a/src/main/java/io/reactivex/internal/operators/completable/CompletableResumeNext.java b/src/main/java/io/reactivex/internal/operators/completable/CompletableResumeNext.java index 501b3a45dc..1fa7433c64 100644 --- a/src/main/java/io/reactivex/internal/operators/completable/CompletableResumeNext.java +++ b/src/main/java/io/reactivex/internal/operators/completable/CompletableResumeNext.java @@ -1,3 +1,16 @@ +/** + * Copyright 2016 Netflix, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in + * compliance with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License is + * distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See + * the License for the specific language governing permissions and limitations under the License. + */ + package io.reactivex.internal.operators.completable; import io.reactivex.*; diff --git a/src/main/java/io/reactivex/internal/operators/completable/CompletableSubscribeOn.java b/src/main/java/io/reactivex/internal/operators/completable/CompletableSubscribeOn.java index 497ee72878..f6a6f64aca 100644 --- a/src/main/java/io/reactivex/internal/operators/completable/CompletableSubscribeOn.java +++ b/src/main/java/io/reactivex/internal/operators/completable/CompletableSubscribeOn.java @@ -1,3 +1,16 @@ +/** + * Copyright 2016 Netflix, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in + * compliance with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License is + * distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See + * the License for the specific language governing permissions and limitations under the License. + */ + package io.reactivex.internal.operators.completable; import io.reactivex.*; diff --git a/src/main/java/io/reactivex/internal/operators/completable/CompletableTimer.java b/src/main/java/io/reactivex/internal/operators/completable/CompletableTimer.java index a6355434bf..a9534354be 100644 --- a/src/main/java/io/reactivex/internal/operators/completable/CompletableTimer.java +++ b/src/main/java/io/reactivex/internal/operators/completable/CompletableTimer.java @@ -1,3 +1,16 @@ +/** + * Copyright 2016 Netflix, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in + * compliance with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License is + * distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See + * the License for the specific language governing permissions and limitations under the License. + */ + package io.reactivex.internal.operators.completable; import java.util.concurrent.TimeUnit; diff --git a/src/main/java/io/reactivex/internal/operators/completable/CompletableToFlowable.java b/src/main/java/io/reactivex/internal/operators/completable/CompletableToFlowable.java index 64a1eecd88..eeb9f325df 100644 --- a/src/main/java/io/reactivex/internal/operators/completable/CompletableToFlowable.java +++ b/src/main/java/io/reactivex/internal/operators/completable/CompletableToFlowable.java @@ -1,3 +1,16 @@ +/** + * Copyright 2016 Netflix, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in + * compliance with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License is + * distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See + * the License for the specific language governing permissions and limitations under the License. + */ + package io.reactivex.internal.operators.completable; import org.reactivestreams.Subscriber; diff --git a/src/main/java/io/reactivex/internal/operators/completable/CompletableToObservable.java b/src/main/java/io/reactivex/internal/operators/completable/CompletableToObservable.java index a139d92200..6ade06a060 100644 --- a/src/main/java/io/reactivex/internal/operators/completable/CompletableToObservable.java +++ b/src/main/java/io/reactivex/internal/operators/completable/CompletableToObservable.java @@ -1,3 +1,16 @@ +/** + * Copyright 2016 Netflix, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in + * compliance with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License is + * distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See + * the License for the specific language governing permissions and limitations under the License. + */ + package io.reactivex.internal.operators.completable; import io.reactivex.*; diff --git a/src/main/java/io/reactivex/internal/operators/completable/CompletableToSingle.java b/src/main/java/io/reactivex/internal/operators/completable/CompletableToSingle.java index 3caa756bd5..399de32dfb 100644 --- a/src/main/java/io/reactivex/internal/operators/completable/CompletableToSingle.java +++ b/src/main/java/io/reactivex/internal/operators/completable/CompletableToSingle.java @@ -1,3 +1,16 @@ +/** + * Copyright 2016 Netflix, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in + * compliance with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License is + * distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See + * the License for the specific language governing permissions and limitations under the License. + */ + package io.reactivex.internal.operators.completable; import io.reactivex.*; diff --git a/src/main/java/io/reactivex/internal/operators/completable/CompletableUnsubscribeOn.java b/src/main/java/io/reactivex/internal/operators/completable/CompletableUnsubscribeOn.java index fd8747c69e..3df50c7671 100644 --- a/src/main/java/io/reactivex/internal/operators/completable/CompletableUnsubscribeOn.java +++ b/src/main/java/io/reactivex/internal/operators/completable/CompletableUnsubscribeOn.java @@ -1,3 +1,16 @@ +/** + * Copyright 2016 Netflix, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in + * compliance with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License is + * distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See + * the License for the specific language governing permissions and limitations under the License. + */ + package io.reactivex.internal.operators.completable; import io.reactivex.*; diff --git a/src/main/java/io/reactivex/internal/operators/completable/CompletableUsing.java b/src/main/java/io/reactivex/internal/operators/completable/CompletableUsing.java index a47dc15b78..796cf36e26 100644 --- a/src/main/java/io/reactivex/internal/operators/completable/CompletableUsing.java +++ b/src/main/java/io/reactivex/internal/operators/completable/CompletableUsing.java @@ -1,3 +1,16 @@ +/** + * Copyright 2016 Netflix, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in + * compliance with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License is + * distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See + * the License for the specific language governing permissions and limitations under the License. + */ + package io.reactivex.internal.operators.completable; import java.util.concurrent.atomic.AtomicBoolean; diff --git a/src/main/java/io/reactivex/internal/operators/completable/CompletableWrapper.java b/src/main/java/io/reactivex/internal/operators/completable/CompletableWrapper.java index 1597870e00..191e2aba7f 100644 --- a/src/main/java/io/reactivex/internal/operators/completable/CompletableWrapper.java +++ b/src/main/java/io/reactivex/internal/operators/completable/CompletableWrapper.java @@ -1,3 +1,16 @@ +/** + * Copyright 2016 Netflix, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in + * compliance with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License is + * distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See + * the License for the specific language governing permissions and limitations under the License. + */ + package io.reactivex.internal.operators.completable; import io.reactivex.*; diff --git a/src/main/java/io/reactivex/internal/operators/flowable/FlowableWrapper.java b/src/main/java/io/reactivex/internal/operators/flowable/FlowableWrapper.java index 71b0a24ea2..afb472b22c 100644 --- a/src/main/java/io/reactivex/internal/operators/flowable/FlowableWrapper.java +++ b/src/main/java/io/reactivex/internal/operators/flowable/FlowableWrapper.java @@ -1,3 +1,16 @@ +/** + * Copyright 2016 Netflix, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in + * compliance with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License is + * distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See + * the License for the specific language governing permissions and limitations under the License. + */ + package io.reactivex.internal.operators.flowable; import org.reactivestreams.*; diff --git a/src/main/java/io/reactivex/internal/operators/observable/ObservableWrapper.java b/src/main/java/io/reactivex/internal/operators/observable/ObservableWrapper.java index 7835c264a8..aa86f2224e 100644 --- a/src/main/java/io/reactivex/internal/operators/observable/ObservableWrapper.java +++ b/src/main/java/io/reactivex/internal/operators/observable/ObservableWrapper.java @@ -1,3 +1,16 @@ +/** + * Copyright 2016 Netflix, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in + * compliance with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License is + * distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See + * the License for the specific language governing permissions and limitations under the License. + */ + package io.reactivex.internal.operators.observable; import io.reactivex.*; diff --git a/src/main/java/io/reactivex/internal/operators/single/SingleAmbArray.java b/src/main/java/io/reactivex/internal/operators/single/SingleAmbArray.java new file mode 100644 index 0000000000..923bf1620f --- /dev/null +++ b/src/main/java/io/reactivex/internal/operators/single/SingleAmbArray.java @@ -0,0 +1,80 @@ +/** + * Copyright 2016 Netflix, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in + * compliance with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License is + * distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See + * the License for the specific language governing permissions and limitations under the License. + */ + +package io.reactivex.internal.operators.single; + +import java.util.concurrent.atomic.AtomicBoolean; + +import io.reactivex.*; +import io.reactivex.disposables.*; +import io.reactivex.plugins.RxJavaPlugins; + +public final class SingleAmbArray extends Single { + + final SingleConsumable[] sources; + + public SingleAmbArray(SingleConsumable[] sources) { + this.sources = sources; + } + + @Override + protected void subscribeActual(final SingleSubscriber s) { + + final AtomicBoolean once = new AtomicBoolean(); + final CompositeDisposable set = new CompositeDisposable(); + s.onSubscribe(set); + + for (SingleConsumable s1 : sources) { + if (once.get()) { + return; + } + + if (s1 == null) { + set.dispose(); + Throwable e = new NullPointerException("One of the sources is null"); + if (once.compareAndSet(false, true)) { + s.onError(e); + } else { + RxJavaPlugins.onError(e); + } + return; + } + + s1.subscribe(new SingleSubscriber() { + + @Override + public void onSubscribe(Disposable d) { + set.add(d); + } + + @Override + public void onSuccess(T value) { + if (once.compareAndSet(false, true)) { + s.onSuccess(value); + } + } + + @Override + public void onError(Throwable e) { + if (once.compareAndSet(false, true)) { + s.onError(e); + } else { + RxJavaPlugins.onError(e); + } + } + + }); + } + } + +} diff --git a/src/main/java/io/reactivex/internal/operators/single/SingleAmbIterable.java b/src/main/java/io/reactivex/internal/operators/single/SingleAmbIterable.java new file mode 100644 index 0000000000..6310365141 --- /dev/null +++ b/src/main/java/io/reactivex/internal/operators/single/SingleAmbIterable.java @@ -0,0 +1,125 @@ +/** + * Copyright 2016 Netflix, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in + * compliance with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License is + * distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See + * the License for the specific language governing permissions and limitations under the License. + */ + +package io.reactivex.internal.operators.single; + +import java.util.*; +import java.util.concurrent.atomic.AtomicBoolean; + +import io.reactivex.*; +import io.reactivex.disposables.*; +import io.reactivex.plugins.RxJavaPlugins; + +public final class SingleAmbIterable extends Single { + + final Iterable> sources; + + public SingleAmbIterable(Iterable> sources) { + this.sources = sources; + } + + @Override + protected void subscribeActual(final SingleSubscriber s) { + final AtomicBoolean once = new AtomicBoolean(); + final CompositeDisposable set = new CompositeDisposable(); + s.onSubscribe(set); + + int c = 0; + Iterator> iterator; + + try { + iterator = sources.iterator(); + } catch (Throwable e) { + s.onError(e); + return; + } + + if (iterator == null) { + s.onError(new NullPointerException("The iterator returned is null")); + return; + } + for (;;) { + if (once.get()) { + return; + } + + boolean b; + + try { + b = iterator.hasNext(); + } catch (Throwable e) { + s.onError(e); + return; + } + + if (once.get()) { + return; + } + + if (!b) { + break; + } + + SingleConsumable s1; + + if (once.get()) { + return; + } + + try { + s1 = iterator.next(); + } catch (Throwable e) { + set.dispose(); + s.onError(e); + return; + } + + if (s1 == null) { + set.dispose(); + s.onError(new NullPointerException("The single source returned by the iterator is null")); + return; + } + + s1.subscribe(new SingleSubscriber() { + + @Override + public void onSubscribe(Disposable d) { + set.add(d); + } + + @Override + public void onSuccess(T value) { + if (once.compareAndSet(false, true)) { + s.onSuccess(value); + } + } + + @Override + public void onError(Throwable e) { + if (once.compareAndSet(false, true)) { + s.onError(e); + } else { + RxJavaPlugins.onError(e); + } + } + + }); + c++; + } + + if (c == 0 && !set.isDisposed()) { + s.onError(new NoSuchElementException()); + } + } + +} diff --git a/src/main/java/io/reactivex/internal/operators/single/SingleAwait.java b/src/main/java/io/reactivex/internal/operators/single/SingleAwait.java new file mode 100644 index 0000000000..fed8cb7c28 --- /dev/null +++ b/src/main/java/io/reactivex/internal/operators/single/SingleAwait.java @@ -0,0 +1,61 @@ +/** + * Copyright 2016 Netflix, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in + * compliance with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License is + * distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See + * the License for the specific language governing permissions and limitations under the License. + */ + +package io.reactivex.internal.operators.single; + +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.atomic.AtomicReference; + +import io.reactivex.*; +import io.reactivex.disposables.Disposable; +import io.reactivex.internal.util.Exceptions; + +public enum SingleAwait { + ; + + public static T get(SingleConsumable source) { + final AtomicReference valueRef = new AtomicReference(); + final AtomicReference errorRef = new AtomicReference(); + final CountDownLatch cdl = new CountDownLatch(1); + + source.subscribe(new SingleSubscriber() { + @Override + public void onError(Throwable e) { + errorRef.lazySet(e); + cdl.countDown(); + } + + @Override + public void onSubscribe(Disposable d) { + } + @Override + public void onSuccess(T value) { + valueRef.lazySet(value); + cdl.countDown(); + } + }); + + if (cdl.getCount() != 0L) { + try { + cdl.await(); + } catch (InterruptedException ex) { + throw new RuntimeException(ex); + } + } + Throwable e = errorRef.get(); + if (e != null) { + throw Exceptions.propagate(e); + } + return valueRef.get(); + } +} diff --git a/src/main/java/io/reactivex/internal/operators/single/SingleCache.java b/src/main/java/io/reactivex/internal/operators/single/SingleCache.java new file mode 100644 index 0000000000..61d76d8dbf --- /dev/null +++ b/src/main/java/io/reactivex/internal/operators/single/SingleCache.java @@ -0,0 +1,109 @@ +/** + * Copyright 2016 Netflix, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in + * compliance with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License is + * distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See + * the License for the specific language governing permissions and limitations under the License. + */ + +package io.reactivex.internal.operators.single; + +import java.util.*; +import java.util.concurrent.atomic.*; + +import io.reactivex.*; +import io.reactivex.disposables.Disposable; +import io.reactivex.internal.disposables.EmptyDisposable; +import io.reactivex.internal.util.NotificationLite; + +public final class SingleCache extends Single { + + final SingleConsumable source; + + final AtomicInteger wip; + final AtomicReference notification; + final List> subscribers; + + public SingleCache(SingleConsumable source) { + this.source = source; + this.wip = new AtomicInteger(); + this.notification = new AtomicReference(); + this.subscribers = new ArrayList>(); + } + + @Override + protected void subscribeActual(SingleSubscriber s) { + + Object o = notification.get(); + if (o != null) { + s.onSubscribe(EmptyDisposable.INSTANCE); + if (NotificationLite.isError(o)) { + s.onError(NotificationLite.getError(o)); + } else { + s.onSuccess(NotificationLite.getValue(o)); + } + return; + } + + synchronized (subscribers) { + o = notification.get(); + if (o == null) { + subscribers.add(s); + } + } + if (o != null) { + s.onSubscribe(EmptyDisposable.INSTANCE); + if (NotificationLite.isError(o)) { + s.onError(NotificationLite.getError(o)); + } else { + s.onSuccess(NotificationLite.getValue(o)); + } + return; + } + + if (wip.getAndIncrement() != 0) { + return; + } + + source.subscribe(new SingleSubscriber() { + + @Override + public void onSubscribe(Disposable d) { + + } + + @Override + public void onSuccess(T value) { + notification.set(NotificationLite.next(value)); + List> list; + synchronized (subscribers) { + list = new ArrayList>(subscribers); + subscribers.clear(); + } + for (SingleSubscriber s1 : list) { + s1.onSuccess(value); + } + } + + @Override + public void onError(Throwable e) { + notification.set(NotificationLite.error(e)); + List> list; + synchronized (subscribers) { + list = new ArrayList>(subscribers); + subscribers.clear(); + } + for (SingleSubscriber s1 : list) { + s1.onError(e); + } + } + + }); + } + +} diff --git a/src/main/java/io/reactivex/internal/operators/single/SingleContains.java b/src/main/java/io/reactivex/internal/operators/single/SingleContains.java new file mode 100644 index 0000000000..1ea8f1fdfe --- /dev/null +++ b/src/main/java/io/reactivex/internal/operators/single/SingleContains.java @@ -0,0 +1,57 @@ +/** + * Copyright 2016 Netflix, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in + * compliance with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License is + * distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See + * the License for the specific language governing permissions and limitations under the License. + */ + +package io.reactivex.internal.operators.single; + +import io.reactivex.*; +import io.reactivex.disposables.Disposable; +import io.reactivex.functions.BiPredicate; + +public final class SingleContains extends Single { + + final SingleConsumable source; + + final Object value; + + final BiPredicate comparer; + + public SingleContains(SingleConsumable source, Object value, BiPredicate comparer) { + this.source = source; + this.value = value; + this.comparer = comparer; + } + + @Override + protected void subscribeActual(final SingleSubscriber s) { + + source.subscribe(new SingleSubscriber() { + + @Override + public void onSubscribe(Disposable d) { + s.onSubscribe(d); + } + + @Override + public void onSuccess(T v) { + s.onSuccess(comparer.test(v, value)); + } + + @Override + public void onError(Throwable e) { + s.onError(e); + } + + }); + } + +} diff --git a/src/main/java/io/reactivex/internal/operators/single/SingleDefer.java b/src/main/java/io/reactivex/internal/operators/single/SingleDefer.java new file mode 100644 index 0000000000..4da3d7a69e --- /dev/null +++ b/src/main/java/io/reactivex/internal/operators/single/SingleDefer.java @@ -0,0 +1,49 @@ +/** + * Copyright 2016 Netflix, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in + * compliance with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License is + * distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See + * the License for the specific language governing permissions and limitations under the License. + */ + +package io.reactivex.internal.operators.single; + +import io.reactivex.*; +import io.reactivex.functions.Supplier; +import io.reactivex.internal.disposables.EmptyDisposable; + +public final class SingleDefer extends Single { + + final Supplier> singleSupplier; + + public SingleDefer(Supplier> singleSupplier) { + this.singleSupplier = singleSupplier; + } + + @Override + protected void subscribeActual(SingleSubscriber s) { + SingleConsumable next; + + try { + next = singleSupplier.get(); + } catch (Throwable e) { + s.onSubscribe(EmptyDisposable.INSTANCE); + s.onError(e); + return; + } + + if (next == null) { + s.onSubscribe(EmptyDisposable.INSTANCE); + s.onError(new NullPointerException("The Single supplied was null")); + return; + } + + next.subscribe(s); + } + +} diff --git a/src/main/java/io/reactivex/internal/operators/single/SingleDelay.java b/src/main/java/io/reactivex/internal/operators/single/SingleDelay.java new file mode 100644 index 0000000000..c3a24afc39 --- /dev/null +++ b/src/main/java/io/reactivex/internal/operators/single/SingleDelay.java @@ -0,0 +1,65 @@ +/** + * Copyright 2016 Netflix, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in + * compliance with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License is + * distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See + * the License for the specific language governing permissions and limitations under the License. + */ + +package io.reactivex.internal.operators.single; + +import java.util.concurrent.TimeUnit; + +import io.reactivex.*; +import io.reactivex.disposables.*; + +public final class SingleDelay extends Single { + + + final SingleConsumable source; + final long time; + final TimeUnit unit; + final Scheduler scheduler; + + public SingleDelay(SingleConsumable source, long time, TimeUnit unit, Scheduler scheduler) { + this.source = source; + this.time = time; + this.unit = unit; + this.scheduler = scheduler; + } + + @Override + protected void subscribeActual(final SingleSubscriber s) { + + final MultipleAssignmentDisposable mad = new MultipleAssignmentDisposable(); + s.onSubscribe(mad); + subscribe(new SingleSubscriber() { + @Override + public void onSubscribe(Disposable d) { + mad.set(d); + } + + @Override + public void onSuccess(final T value) { + mad.set(scheduler.scheduleDirect(new Runnable() { + @Override + public void run() { + s.onSuccess(value); + } + }, time, unit)); + } + + @Override + public void onError(Throwable e) { + s.onError(e); + } + + }); + } + +} diff --git a/src/main/java/io/reactivex/internal/operators/single/SingleDoOnCancel.java b/src/main/java/io/reactivex/internal/operators/single/SingleDoOnCancel.java new file mode 100644 index 0000000000..a5d4fcc5b0 --- /dev/null +++ b/src/main/java/io/reactivex/internal/operators/single/SingleDoOnCancel.java @@ -0,0 +1,54 @@ +/** + * Copyright 2016 Netflix, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in + * compliance with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License is + * distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See + * the License for the specific language governing permissions and limitations under the License. + */ + +package io.reactivex.internal.operators.single; + +import io.reactivex.*; +import io.reactivex.disposables.*; + +public final class SingleDoOnCancel extends Single { + final SingleConsumable source; + + final Runnable onCancel; + + public SingleDoOnCancel(SingleConsumable source, Runnable onCancel) { + this.source = source; + this.onCancel = onCancel; + } + + @Override + protected void subscribeActual(final SingleSubscriber s) { + + source.subscribe(new SingleSubscriber() { + @Override + public void onSubscribe(Disposable d) { + CompositeDisposable set = new CompositeDisposable(); + set.add(Disposables.from(onCancel)); + set.add(d); + s.onSubscribe(set); + } + + @Override + public void onSuccess(T value) { + s.onSuccess(value); + } + + @Override + public void onError(Throwable e) { + s.onError(e); + } + + }); + } + +} diff --git a/src/main/java/io/reactivex/internal/operators/single/SingleDoOnError.java b/src/main/java/io/reactivex/internal/operators/single/SingleDoOnError.java new file mode 100644 index 0000000000..e4b5260b2c --- /dev/null +++ b/src/main/java/io/reactivex/internal/operators/single/SingleDoOnError.java @@ -0,0 +1,59 @@ +/** + * Copyright 2016 Netflix, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in + * compliance with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License is + * distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See + * the License for the specific language governing permissions and limitations under the License. + */ + +package io.reactivex.internal.operators.single; + +import io.reactivex.*; +import io.reactivex.disposables.Disposable; +import io.reactivex.exceptions.CompositeException; +import io.reactivex.functions.Consumer; + +public final class SingleDoOnError extends Single { + + final SingleConsumable source; + + final Consumer onError; + + public SingleDoOnError(SingleConsumable source, Consumer onError) { + this.source = source; + this.onError = onError; + } + + @Override + protected void subscribeActual(final SingleSubscriber s) { + + source.subscribe(new SingleSubscriber() { + @Override + public void onSubscribe(Disposable d) { + s.onSubscribe(d); + } + + @Override + public void onSuccess(T value) { + s.onSuccess(value); + } + + @Override + public void onError(Throwable e) { + try { + onError.accept(e); + } catch (Throwable ex) { + e = new CompositeException(ex, e); + } + s.onError(e); + } + + }); + } + +} diff --git a/src/main/java/io/reactivex/internal/operators/single/SingleDoOnSubscribe.java b/src/main/java/io/reactivex/internal/operators/single/SingleDoOnSubscribe.java new file mode 100644 index 0000000000..a9be68c4fc --- /dev/null +++ b/src/main/java/io/reactivex/internal/operators/single/SingleDoOnSubscribe.java @@ -0,0 +1,73 @@ +/** + * Copyright 2016 Netflix, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in + * compliance with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License is + * distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See + * the License for the specific language governing permissions and limitations under the License. + */ + +package io.reactivex.internal.operators.single; + +import io.reactivex.*; +import io.reactivex.disposables.Disposable; +import io.reactivex.functions.Consumer; +import io.reactivex.internal.disposables.EmptyDisposable; +import io.reactivex.plugins.RxJavaPlugins; + +public final class SingleDoOnSubscribe extends Single { + + final SingleConsumable source; + + final Consumer onSubscribe; + + public SingleDoOnSubscribe(SingleConsumable source, Consumer onSubscribe) { + this.source = source; + this.onSubscribe = onSubscribe; + } + + @Override + protected void subscribeActual(final SingleSubscriber s) { + + source.subscribe(new SingleSubscriber() { + boolean done; + @Override + public void onSubscribe(Disposable d) { + try { + onSubscribe.accept(d); + } catch (Throwable ex) { + done = true; + d.dispose(); + s.onSubscribe(EmptyDisposable.INSTANCE); + s.onError(ex); + return; + } + + s.onSubscribe(d); + } + + @Override + public void onSuccess(T value) { + if (done) { + return; + } + s.onSuccess(value); + } + + @Override + public void onError(Throwable e) { + if (done) { + RxJavaPlugins.onError(e); + return; + } + s.onError(e); + } + + }); + } + +} diff --git a/src/main/java/io/reactivex/internal/operators/single/SingleDoOnSuccess.java b/src/main/java/io/reactivex/internal/operators/single/SingleDoOnSuccess.java new file mode 100644 index 0000000000..1ae82b7ff2 --- /dev/null +++ b/src/main/java/io/reactivex/internal/operators/single/SingleDoOnSuccess.java @@ -0,0 +1,59 @@ +/** + * Copyright 2016 Netflix, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in + * compliance with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License is + * distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See + * the License for the specific language governing permissions and limitations under the License. + */ + +package io.reactivex.internal.operators.single; + +import io.reactivex.*; +import io.reactivex.disposables.Disposable; +import io.reactivex.functions.Consumer; + +public class SingleDoOnSuccess extends Single { + + final SingleConsumable source; + + final Consumer onSuccess; + + public SingleDoOnSuccess(SingleConsumable source, Consumer onSuccess) { + this.source = source; + this.onSuccess = onSuccess; + } + + @Override + protected void subscribeActual(final SingleSubscriber s) { + + source.subscribe(new SingleSubscriber() { + @Override + public void onSubscribe(Disposable d) { + s.onSubscribe(d); + } + + @Override + public void onSuccess(T value) { + try { + onSuccess.accept(value); + } catch (Throwable ex) { + s.onError(ex); + return; + } + s.onSuccess(value); + } + + @Override + public void onError(Throwable e) { + s.onError(e); + } + + }); + } + +} diff --git a/src/main/java/io/reactivex/internal/operators/single/SingleEquals.java b/src/main/java/io/reactivex/internal/operators/single/SingleEquals.java new file mode 100644 index 0000000000..2dc0bb5163 --- /dev/null +++ b/src/main/java/io/reactivex/internal/operators/single/SingleEquals.java @@ -0,0 +1,82 @@ +/** + * Copyright 2016 Netflix, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in + * compliance with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License is + * distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See + * the License for the specific language governing permissions and limitations under the License. + */ + +package io.reactivex.internal.operators.single; + +import java.util.concurrent.atomic.AtomicInteger; + +import io.reactivex.*; +import io.reactivex.disposables.*; +import io.reactivex.internal.functions.Objects; +import io.reactivex.plugins.RxJavaPlugins; + +public final class SingleEquals extends Single { + + final SingleConsumable first; + final SingleConsumable second; + + public SingleEquals(SingleConsumable first, SingleConsumable second) { + this.first = first; + this.second = second; + } + + @Override + protected void subscribeActual(final SingleSubscriber s) { + + final AtomicInteger count = new AtomicInteger(); + final Object[] values = { null, null }; + + final CompositeDisposable set = new CompositeDisposable(); + s.onSubscribe(set); + + class InnerSubscriber implements SingleSubscriber { + final int index; + public InnerSubscriber(int index) { + this.index = index; + } + @Override + public void onSubscribe(Disposable d) { + set.add(d); + } + + @Override + public void onSuccess(T value) { + values[index] = value; + + if (count.incrementAndGet() == 2) { + s.onSuccess(Objects.equals(values[0], values[1])); + } + } + + @Override + public void onError(Throwable e) { + for (;;) { + int state = count.get(); + if (state >= 2) { + RxJavaPlugins.onError(e); + return; + } + if (count.compareAndSet(state, 2)) { + s.onError(e); + return; + } + } + } + + } + + first.subscribe(new InnerSubscriber(0)); + second.subscribe(new InnerSubscriber(1)); + } + +} diff --git a/src/main/java/io/reactivex/internal/operators/single/SingleError.java b/src/main/java/io/reactivex/internal/operators/single/SingleError.java new file mode 100644 index 0000000000..e85ca2f700 --- /dev/null +++ b/src/main/java/io/reactivex/internal/operators/single/SingleError.java @@ -0,0 +1,46 @@ +/** + * Copyright 2016 Netflix, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in + * compliance with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License is + * distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See + * the License for the specific language governing permissions and limitations under the License. + */ + +package io.reactivex.internal.operators.single; + +import io.reactivex.*; +import io.reactivex.functions.Supplier; +import io.reactivex.internal.disposables.EmptyDisposable; + +public final class SingleError extends Single { + + final Supplier errorSupplier; + + public SingleError(Supplier errorSupplier) { + this.errorSupplier = errorSupplier; + } + + @Override + protected void subscribeActual(SingleSubscriber s) { + Throwable error; + + try { + error = errorSupplier.get(); + } catch (Throwable e) { + error = e; + } + + if (error == null) { + error = new NullPointerException(); + } + + s.onSubscribe(EmptyDisposable.INSTANCE); + s.onError(error); + } + +} diff --git a/src/main/java/io/reactivex/internal/operators/single/SingleOperatorFlatMap.java b/src/main/java/io/reactivex/internal/operators/single/SingleFlatMap.java similarity index 93% rename from src/main/java/io/reactivex/internal/operators/single/SingleOperatorFlatMap.java rename to src/main/java/io/reactivex/internal/operators/single/SingleFlatMap.java index c594b67200..83eeab515f 100644 --- a/src/main/java/io/reactivex/internal/operators/single/SingleOperatorFlatMap.java +++ b/src/main/java/io/reactivex/internal/operators/single/SingleFlatMap.java @@ -17,12 +17,12 @@ import io.reactivex.disposables.*; import io.reactivex.functions.Function; -public final class SingleOperatorFlatMap extends Single { +public final class SingleFlatMap extends Single { final SingleConsumable source; final Function> mapper; - public SingleOperatorFlatMap(SingleConsumable source, Function> mapper) { + public SingleFlatMap(SingleConsumable source, Function> mapper) { this.mapper = mapper; this.source = source; } diff --git a/src/main/java/io/reactivex/internal/operators/single/SingleFromCallable.java b/src/main/java/io/reactivex/internal/operators/single/SingleFromCallable.java new file mode 100644 index 0000000000..c9b164944d --- /dev/null +++ b/src/main/java/io/reactivex/internal/operators/single/SingleFromCallable.java @@ -0,0 +1,45 @@ +/** + * Copyright 2016 Netflix, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in + * compliance with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License is + * distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See + * the License for the specific language governing permissions and limitations under the License. + */ + +package io.reactivex.internal.operators.single; + +import java.util.concurrent.Callable; + +import io.reactivex.*; +import io.reactivex.internal.disposables.EmptyDisposable; + +public final class SingleFromCallable extends Single { + + final Callable callable; + + public SingleFromCallable(Callable callable) { + this.callable = callable; + } + + @Override + protected void subscribeActual(SingleSubscriber s) { + + s.onSubscribe(EmptyDisposable.INSTANCE); + try { + T v = callable.call(); + if (v != null) { + s.onSuccess(v); + } else { + s.onError(new NullPointerException()); + } + } catch (Throwable e) { + s.onError(e); + } + } + +} diff --git a/src/main/java/io/reactivex/internal/operators/single/SingleFromPublisher.java b/src/main/java/io/reactivex/internal/operators/single/SingleFromPublisher.java new file mode 100644 index 0000000000..6e889653e6 --- /dev/null +++ b/src/main/java/io/reactivex/internal/operators/single/SingleFromPublisher.java @@ -0,0 +1,67 @@ +/** + * Copyright 2016 Netflix, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in + * compliance with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License is + * distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See + * the License for the specific language governing permissions and limitations under the License. + */ + +package io.reactivex.internal.operators.single; + +import java.util.NoSuchElementException; + +import org.reactivestreams.*; + +import io.reactivex.*; +import io.reactivex.disposables.Disposables; + +public final class SingleFromPublisher extends Single { + + final Publisher publisher; + + public SingleFromPublisher(Publisher publisher) { + this.publisher = publisher; + } + + @Override + protected void subscribeActual(final SingleSubscriber s) { + + publisher.subscribe(new Subscriber() { + T value; + @Override + public void onComplete() { + T v = value; + value = null; + if (v != null) { + s.onSuccess(v); + } else { + s.onError(new NoSuchElementException()); + } + } + + @Override + public void onError(Throwable t) { + value = null; + s.onError(t); + } + + @Override + public void onNext(T t) { + value = t; + } + + @Override + public void onSubscribe(Subscription inner) { + s.onSubscribe(Disposables.from(inner)); + inner.request(Long.MAX_VALUE); + } + + }); + } + +} diff --git a/src/main/java/io/reactivex/internal/operators/single/SingleHide.java b/src/main/java/io/reactivex/internal/operators/single/SingleHide.java new file mode 100644 index 0000000000..049ceaf322 --- /dev/null +++ b/src/main/java/io/reactivex/internal/operators/single/SingleHide.java @@ -0,0 +1,67 @@ +/** + * Copyright 2016 Netflix, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in + * compliance with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License is + * distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See + * the License for the specific language governing permissions and limitations under the License. + */ + +package io.reactivex.internal.operators.single; + +import io.reactivex.*; +import io.reactivex.disposables.Disposable; +import io.reactivex.internal.disposables.DisposableHelper; + +public final class SingleHide extends Single { + + final SingleConsumable source; + + public SingleHide(SingleConsumable source) { + this.source = source; + } + + @Override + protected void subscribeActual(SingleSubscriber subscriber) { + source.subscribe(new HideSingleSubscriber(subscriber)); + } + + static final class HideSingleSubscriber implements SingleSubscriber, Disposable { + + final SingleSubscriber actual; + + Disposable d; + + public HideSingleSubscriber(SingleSubscriber actual) { + this.actual = actual; + } + + @Override + public void dispose() { + d.dispose(); + } + + @Override + public void onSubscribe(Disposable d) { + if (DisposableHelper.validate(this.d, d)) { + this.d = d; + actual.onSubscribe(this); + } + } + + @Override + public void onSuccess(T value) { + actual.onSuccess(value); + } + + @Override + public void onError(Throwable e) { + actual.onError(e); + } + } + +} diff --git a/src/main/java/io/reactivex/internal/operators/single/SingleJust.java b/src/main/java/io/reactivex/internal/operators/single/SingleJust.java new file mode 100644 index 0000000000..2e378b660b --- /dev/null +++ b/src/main/java/io/reactivex/internal/operators/single/SingleJust.java @@ -0,0 +1,33 @@ +/** + * Copyright 2016 Netflix, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in + * compliance with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License is + * distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See + * the License for the specific language governing permissions and limitations under the License. + */ + +package io.reactivex.internal.operators.single; + +import io.reactivex.*; +import io.reactivex.internal.disposables.EmptyDisposable; + +public final class SingleJust extends Single { + + final T value; + + public SingleJust(T value) { + this.value = value; + } + + @Override + protected void subscribeActual(SingleSubscriber s) { + s.onSubscribe(EmptyDisposable.INSTANCE); + s.onSuccess(value); + } + +} diff --git a/src/main/java/io/reactivex/internal/operators/single/SingleLift.java b/src/main/java/io/reactivex/internal/operators/single/SingleLift.java new file mode 100644 index 0000000000..47eaa0686f --- /dev/null +++ b/src/main/java/io/reactivex/internal/operators/single/SingleLift.java @@ -0,0 +1,50 @@ +/** + * Copyright 2016 Netflix, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in + * compliance with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License is + * distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See + * the License for the specific language governing permissions and limitations under the License. + */ + +package io.reactivex.internal.operators.single; + +import io.reactivex.*; +import io.reactivex.plugins.RxJavaPlugins; + +public final class SingleLift extends Single { + + final SingleConsumable source; + + final SingleOperator onLift; + + public SingleLift(SingleConsumable source, SingleOperator onLift) { + this.source = source; + this.onLift = onLift; + } + + @Override + protected void subscribeActual(SingleSubscriber s) { + try { + SingleSubscriber sr = onLift.apply(s); + + if (sr == null) { + throw new NullPointerException("The onLift returned a null subscriber"); + } + // TODO plugin wrapper + source.subscribe(sr); + } catch (NullPointerException ex) { + throw ex; + } catch (Throwable ex) { + RxJavaPlugins.onError(ex); + NullPointerException npe = new NullPointerException("Not really but can't throw other than NPE"); + npe.initCause(ex); + throw npe; + } + } + +} diff --git a/src/main/java/io/reactivex/internal/operators/single/SingleOperatorMap.java b/src/main/java/io/reactivex/internal/operators/single/SingleMap.java similarity index 77% rename from src/main/java/io/reactivex/internal/operators/single/SingleOperatorMap.java rename to src/main/java/io/reactivex/internal/operators/single/SingleMap.java index bcd36f3233..8ad21a234d 100644 --- a/src/main/java/io/reactivex/internal/operators/single/SingleOperatorMap.java +++ b/src/main/java/io/reactivex/internal/operators/single/SingleMap.java @@ -13,21 +13,23 @@ package io.reactivex.internal.operators.single; -import io.reactivex.Single.*; -import io.reactivex.SingleSubscriber; +import io.reactivex.*; import io.reactivex.disposables.Disposable; import io.reactivex.functions.Function; -public final class SingleOperatorMap implements SingleOperator { +public final class SingleMap extends Single { + final SingleConsumable source; + final Function mapper; - public SingleOperatorMap(Function mapper) { + public SingleMap(SingleConsumable source, Function mapper) { + this.source = source; this.mapper = mapper; } @Override - public SingleSubscriber apply(final SingleSubscriber t) { - return new SingleSubscriber() { + protected void subscribeActual(final SingleSubscriber t) { + source.subscribe(new SingleSubscriber() { @Override public void onSubscribe(Disposable d) { t.onSubscribe(d); @@ -50,6 +52,6 @@ public void onSuccess(T value) { public void onError(Throwable e) { t.onError(e); } - }; + }); } } diff --git a/src/main/java/io/reactivex/internal/operators/single/SingleNever.java b/src/main/java/io/reactivex/internal/operators/single/SingleNever.java new file mode 100644 index 0000000000..bcf2f6b990 --- /dev/null +++ b/src/main/java/io/reactivex/internal/operators/single/SingleNever.java @@ -0,0 +1,26 @@ +/** + * Copyright 2016 Netflix, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in + * compliance with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License is + * distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See + * the License for the specific language governing permissions and limitations under the License. + */ + +package io.reactivex.internal.operators.single; + +import io.reactivex.*; +import io.reactivex.internal.disposables.EmptyDisposable; + +public final class SingleNever extends Single { + + @Override + protected void subscribeActual(SingleSubscriber s) { + s.onSubscribe(EmptyDisposable.INSTANCE); + } + +} diff --git a/src/main/java/io/reactivex/internal/operators/single/SingleObserveOn.java b/src/main/java/io/reactivex/internal/operators/single/SingleObserveOn.java new file mode 100644 index 0000000000..a18124f4bb --- /dev/null +++ b/src/main/java/io/reactivex/internal/operators/single/SingleObserveOn.java @@ -0,0 +1,66 @@ +/** + * Copyright 2016 Netflix, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in + * compliance with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License is + * distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See + * the License for the specific language governing permissions and limitations under the License. + */ + +package io.reactivex.internal.operators.single; + +import io.reactivex.*; +import io.reactivex.disposables.*; + +public final class SingleObserveOn extends Single { + + final SingleConsumable source; + + final Scheduler scheduler; + + public SingleObserveOn(SingleConsumable source, Scheduler scheduler) { + this.source = source; + this.scheduler = scheduler; + } + + @Override + protected void subscribeActual(final SingleSubscriber s) { + + final CompositeDisposable mad = new CompositeDisposable(); + s.onSubscribe(mad); + + source.subscribe(new SingleSubscriber() { + + @Override + public void onError(final Throwable e) { + mad.add(scheduler.scheduleDirect(new Runnable() { + @Override + public void run() { + s.onError(e); + } + })); + } + + @Override + public void onSubscribe(Disposable d) { + mad.add(d); + } + + @Override + public void onSuccess(final T value) { + mad.add(scheduler.scheduleDirect(new Runnable() { + @Override + public void run() { + s.onSuccess(value); + } + })); + } + + }); + } + +} diff --git a/src/main/java/io/reactivex/internal/operators/single/SingleOnErrorReturn.java b/src/main/java/io/reactivex/internal/operators/single/SingleOnErrorReturn.java new file mode 100644 index 0000000000..8fe24be881 --- /dev/null +++ b/src/main/java/io/reactivex/internal/operators/single/SingleOnErrorReturn.java @@ -0,0 +1,79 @@ +/** + * Copyright 2016 Netflix, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in + * compliance with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License is + * distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See + * the License for the specific language governing permissions and limitations under the License. + */ + +package io.reactivex.internal.operators.single; + +import io.reactivex.*; +import io.reactivex.disposables.Disposable; +import io.reactivex.exceptions.CompositeException; +import io.reactivex.functions.Supplier; + +public final class SingleOnErrorReturn extends Single { + final SingleConsumable source; + + final Supplier valueSupplier; + + final T value; + + public SingleOnErrorReturn(SingleConsumable source, Supplier valueSupplier, T value) { + this.source = source; + this.valueSupplier = valueSupplier; + this.value = value; + } + + + + @Override + protected void subscribeActual(final SingleSubscriber s) { + + source.subscribe(new SingleSubscriber() { + + @Override + public void onError(Throwable e) { + T v; + + if (valueSupplier != null) { + try { + v = valueSupplier.get(); + } catch (Throwable ex) { + s.onError(new CompositeException(ex, e)); + return; + } + } else { + v = value; + } + + if (v == null) { + NullPointerException npe = new NullPointerException("Value supplied was null"); + npe.initCause(e); + s.onError(npe); + return; + } + + s.onSuccess(v); + } + + @Override + public void onSubscribe(Disposable d) { + s.onSubscribe(d); + } + + @Override + public void onSuccess(T value) { + s.onSuccess(value); + } + + }); + } + +} diff --git a/src/main/java/io/reactivex/internal/operators/single/SingleResumeNext.java b/src/main/java/io/reactivex/internal/operators/single/SingleResumeNext.java new file mode 100644 index 0000000000..86ce2da060 --- /dev/null +++ b/src/main/java/io/reactivex/internal/operators/single/SingleResumeNext.java @@ -0,0 +1,91 @@ +/** + * Copyright 2016 Netflix, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in + * compliance with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License is + * distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See + * the License for the specific language governing permissions and limitations under the License. + */ + +package io.reactivex.internal.operators.single; + +import io.reactivex.*; +import io.reactivex.disposables.*; +import io.reactivex.exceptions.CompositeException; +import io.reactivex.functions.Function; + +public final class SingleResumeNext extends Single { + final SingleConsumable source; + + final Function> nextFunction; + + public SingleResumeNext(SingleConsumable source, + Function> nextFunction) { + this.source = source; + this.nextFunction = nextFunction; + } + + @Override + protected void subscribeActual(final SingleSubscriber s) { + + final MultipleAssignmentDisposable mad = new MultipleAssignmentDisposable(); + s.onSubscribe(mad); + + source.subscribe(new SingleSubscriber() { + + @Override + public void onSubscribe(Disposable d) { + mad.set(d); + } + + @Override + public void onSuccess(T value) { + s.onSuccess(value); + } + + @Override + public void onError(Throwable e) { + SingleConsumable next; + + try { + next = nextFunction.apply(e); + } catch (Throwable ex) { + s.onError(new CompositeException(ex, e)); + return; + } + + if (next == null) { + NullPointerException npe = new NullPointerException("The next Single supplied was null"); + npe.initCause(e); + s.onError(npe); + return; + } + + next.subscribe(new SingleSubscriber() { + + @Override + public void onSubscribe(Disposable d) { + mad.set(d); + } + + @Override + public void onSuccess(T value) { + s.onSuccess(value); + } + + @Override + public void onError(Throwable e) { + s.onError(e); + } + + }); + } + + }); + } + +} diff --git a/src/main/java/io/reactivex/internal/operators/single/SingleSubscribeOn.java b/src/main/java/io/reactivex/internal/operators/single/SingleSubscribeOn.java new file mode 100644 index 0000000000..16af6df68a --- /dev/null +++ b/src/main/java/io/reactivex/internal/operators/single/SingleSubscribeOn.java @@ -0,0 +1,41 @@ +/** + * Copyright 2016 Netflix, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in + * compliance with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License is + * distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See + * the License for the specific language governing permissions and limitations under the License. + */ + +package io.reactivex.internal.operators.single; + +import io.reactivex.*; + +public final class SingleSubscribeOn extends Single { + final SingleConsumable source; + + final Scheduler scheduler; + + public SingleSubscribeOn(SingleConsumable source, Scheduler scheduler) { + this.source = source; + this.scheduler = scheduler; + } + + @Override + protected void subscribeActual(final SingleSubscriber s) { + + // FIXME cancel schedule + scheduler.scheduleDirect(new Runnable() { + @Override + public void run() { + source.subscribe(s); + } + }); + + } + +} diff --git a/src/main/java/io/reactivex/internal/operators/single/SingleTimeout.java b/src/main/java/io/reactivex/internal/operators/single/SingleTimeout.java new file mode 100644 index 0000000000..974099562c --- /dev/null +++ b/src/main/java/io/reactivex/internal/operators/single/SingleTimeout.java @@ -0,0 +1,114 @@ +/** + * Copyright 2016 Netflix, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in + * compliance with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License is + * distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See + * the License for the specific language governing permissions and limitations under the License. + */ + +package io.reactivex.internal.operators.single; + +import java.util.concurrent.*; +import java.util.concurrent.atomic.AtomicBoolean; + +import io.reactivex.*; +import io.reactivex.disposables.*; + +public final class SingleTimeout extends Single { + + final SingleConsumable source; + + final long timeout; + + final TimeUnit unit; + + final Scheduler scheduler; + + final SingleConsumable other; + + public SingleTimeout(SingleConsumable source, long timeout, TimeUnit unit, Scheduler scheduler, + SingleConsumable other) { + this.source = source; + this.timeout = timeout; + this.unit = unit; + this.scheduler = scheduler; + this.other = other; + } + + @Override + protected void subscribeActual(final SingleSubscriber s) { + + final CompositeDisposable set = new CompositeDisposable(); + s.onSubscribe(set); + + final AtomicBoolean once = new AtomicBoolean(); + + Disposable timer = scheduler.scheduleDirect(new Runnable() { + @Override + public void run() { + if (once.compareAndSet(false, true)) { + if (other != null) { + set.clear(); + other.subscribe(new SingleSubscriber() { + + @Override + public void onError(Throwable e) { + set.dispose(); + s.onError(e); + } + + @Override + public void onSubscribe(Disposable d) { + set.add(d); + } + + @Override + public void onSuccess(T value) { + set.dispose(); + s.onSuccess(value); + } + + }); + } else { + set.dispose(); + s.onError(new TimeoutException()); + } + } + } + }, timeout, unit); + + set.add(timer); + + source.subscribe(new SingleSubscriber() { + + @Override + public void onError(Throwable e) { + if (once.compareAndSet(false, true)) { + set.dispose(); + s.onError(e); + } + } + + @Override + public void onSubscribe(Disposable d) { + set.add(d); + } + + @Override + public void onSuccess(T value) { + if (once.compareAndSet(false, true)) { + set.dispose(); + s.onSuccess(value); + } + } + + }); + + } + +} diff --git a/src/main/java/io/reactivex/internal/operators/single/SingleTimer.java b/src/main/java/io/reactivex/internal/operators/single/SingleTimer.java new file mode 100644 index 0000000000..7b3b30d1f2 --- /dev/null +++ b/src/main/java/io/reactivex/internal/operators/single/SingleTimer.java @@ -0,0 +1,47 @@ +/** + * Copyright 2016 Netflix, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in + * compliance with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License is + * distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See + * the License for the specific language governing permissions and limitations under the License. + */ + +package io.reactivex.internal.operators.single; + +import java.util.concurrent.TimeUnit; + +import io.reactivex.*; +import io.reactivex.disposables.MultipleAssignmentDisposable; + +public final class SingleTimer extends Single { + + final long delay; + final TimeUnit unit; + final Scheduler scheduler; + + public SingleTimer(long delay, TimeUnit unit, Scheduler scheduler) { + this.delay = delay; + this.unit = unit; + this.scheduler = scheduler; + } + + @Override + protected void subscribeActual(final SingleSubscriber s) { + MultipleAssignmentDisposable mad = new MultipleAssignmentDisposable(); + + s.onSubscribe(mad); + + mad.set(scheduler.scheduleDirect(new Runnable() { + @Override + public void run() { + s.onSuccess(0L); + } + }, delay, unit)); + } + +} diff --git a/src/main/java/io/reactivex/internal/operators/single/SingleUsing.java b/src/main/java/io/reactivex/internal/operators/single/SingleUsing.java new file mode 100644 index 0000000000..0ad1aca6c5 --- /dev/null +++ b/src/main/java/io/reactivex/internal/operators/single/SingleUsing.java @@ -0,0 +1,134 @@ +/** + * Copyright 2016 Netflix, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in + * compliance with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License is + * distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See + * the License for the specific language governing permissions and limitations under the License. + */ + +package io.reactivex.internal.operators.single; + +import io.reactivex.*; +import io.reactivex.disposables.*; +import io.reactivex.exceptions.CompositeException; +import io.reactivex.functions.*; +import io.reactivex.internal.disposables.EmptyDisposable; +import io.reactivex.plugins.RxJavaPlugins; + +public final class SingleUsing extends Single { + + final Supplier resourceSupplier; + final Function> singleFunction; + final Consumer disposer; + final boolean eager; + + public SingleUsing(Supplier resourceSupplier, + Function> singleFunction, Consumer disposer, + boolean eager) { + this.resourceSupplier = resourceSupplier; + this.singleFunction = singleFunction; + this.disposer = disposer; + this.eager = eager; + } + + + + @Override + protected void subscribeActual(final SingleSubscriber s) { + + final U resource; + + try { + resource = resourceSupplier.get(); + } catch (Throwable ex) { + s.onSubscribe(EmptyDisposable.INSTANCE); + s.onError(ex); + return; + } + + SingleConsumable s1; + + try { + s1 = singleFunction.apply(resource); + } catch (Throwable ex) { + s.onSubscribe(EmptyDisposable.INSTANCE); + s.onError(ex); + return; + } + + if (s1 == null) { + s.onSubscribe(EmptyDisposable.INSTANCE); + s.onError(new NullPointerException("The Single supplied by the function was null")); + return; + } + + s1.subscribe(new SingleSubscriber() { + + @Override + public void onSubscribe(Disposable d) { + if (eager) { + CompositeDisposable set = new CompositeDisposable(); + set.add(d); + set.add(new Disposable() { + @Override + public void dispose() { + try { + disposer.accept(resource); + } catch (Throwable e) { + RxJavaPlugins.onError(e); + } + } + }); + } else { + s.onSubscribe(d); + } + } + + @Override + public void onSuccess(T value) { + if (eager) { + try { + disposer.accept(resource); + } catch (Throwable e) { + s.onError(e); + return; + } + } + s.onSuccess(value); + if (!eager) { + try { + disposer.accept(resource); + } catch (Throwable e) { + RxJavaPlugins.onError(e); + } + } + } + + @Override + public void onError(Throwable e) { + if (eager) { + try { + disposer.accept(resource); + } catch (Throwable ex) { + e = new CompositeException(ex, e); + } + } + s.onError(e); + if (!eager) { + try { + disposer.accept(resource); + } catch (Throwable ex) { + RxJavaPlugins.onError(ex); + } + } + } + + }); + } + +} diff --git a/src/main/java/io/reactivex/internal/operators/single/SingleWrapper.java b/src/main/java/io/reactivex/internal/operators/single/SingleWrapper.java index 03ae0d16c7..d723178449 100644 --- a/src/main/java/io/reactivex/internal/operators/single/SingleWrapper.java +++ b/src/main/java/io/reactivex/internal/operators/single/SingleWrapper.java @@ -1,3 +1,16 @@ +/** + * Copyright 2016 Netflix, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in + * compliance with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License is + * distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See + * the License for the specific language governing permissions and limitations under the License. + */ + package io.reactivex.internal.operators.single; import io.reactivex.*; diff --git a/src/main/java/io/reactivex/internal/subscribers/completable/CallbackCompletableSubscriber.java b/src/main/java/io/reactivex/internal/subscribers/completable/CallbackCompletableSubscriber.java index 332a6cedf7..4c13fd4151 100644 --- a/src/main/java/io/reactivex/internal/subscribers/completable/CallbackCompletableSubscriber.java +++ b/src/main/java/io/reactivex/internal/subscribers/completable/CallbackCompletableSubscriber.java @@ -1,3 +1,16 @@ +/** + * Copyright 2016 Netflix, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in + * compliance with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License is + * distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See + * the License for the specific language governing permissions and limitations under the License. + */ + package io.reactivex.internal.subscribers.completable; import java.util.concurrent.atomic.AtomicReference; diff --git a/src/main/java/io/reactivex/internal/subscribers/completable/EmptyCompletableSubscriber.java b/src/main/java/io/reactivex/internal/subscribers/completable/EmptyCompletableSubscriber.java index 9ba60ad897..4ab8521454 100644 --- a/src/main/java/io/reactivex/internal/subscribers/completable/EmptyCompletableSubscriber.java +++ b/src/main/java/io/reactivex/internal/subscribers/completable/EmptyCompletableSubscriber.java @@ -1,3 +1,16 @@ +/** + * Copyright 2016 Netflix, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in + * compliance with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License is + * distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See + * the License for the specific language governing permissions and limitations under the License. + */ + package io.reactivex.internal.subscribers.completable; import java.util.concurrent.atomic.AtomicReference; diff --git a/src/main/java/io/reactivex/internal/subscribers/completable/ObserverCompletableSubscriber.java b/src/main/java/io/reactivex/internal/subscribers/completable/ObserverCompletableSubscriber.java index 7723f4a868..02d6fb40c8 100644 --- a/src/main/java/io/reactivex/internal/subscribers/completable/ObserverCompletableSubscriber.java +++ b/src/main/java/io/reactivex/internal/subscribers/completable/ObserverCompletableSubscriber.java @@ -1,3 +1,16 @@ +/** + * Copyright 2016 Netflix, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in + * compliance with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License is + * distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See + * the License for the specific language governing permissions and limitations under the License. + */ + package io.reactivex.internal.subscribers.completable; import io.reactivex.*; diff --git a/src/main/java/io/reactivex/internal/subscribers/completable/SubscriberCompletableSubscriber.java b/src/main/java/io/reactivex/internal/subscribers/completable/SubscriberCompletableSubscriber.java index ad8ca07344..ace994dbdc 100644 --- a/src/main/java/io/reactivex/internal/subscribers/completable/SubscriberCompletableSubscriber.java +++ b/src/main/java/io/reactivex/internal/subscribers/completable/SubscriberCompletableSubscriber.java @@ -1,3 +1,16 @@ +/** + * Copyright 2016 Netflix, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in + * compliance with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License is + * distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See + * the License for the specific language governing permissions and limitations under the License. + */ + package io.reactivex.internal.subscribers.completable; import org.reactivestreams.*; diff --git a/src/main/java/io/reactivex/internal/subscribers/single/BiConsumerSingleSubscriber.java b/src/main/java/io/reactivex/internal/subscribers/single/BiConsumerSingleSubscriber.java new file mode 100644 index 0000000000..b1acc3a24b --- /dev/null +++ b/src/main/java/io/reactivex/internal/subscribers/single/BiConsumerSingleSubscriber.java @@ -0,0 +1,54 @@ +/** + * Copyright 2016 Netflix, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in + * compliance with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License is + * distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See + * the License for the specific language governing permissions and limitations under the License. + */ + +package io.reactivex.internal.subscribers.single; + +import java.util.concurrent.atomic.AtomicReference; + +import io.reactivex.SingleSubscriber; +import io.reactivex.disposables.Disposable; +import io.reactivex.functions.BiConsumer; +import io.reactivex.internal.disposables.DisposableHelper; + +public final class BiConsumerSingleSubscriber +extends AtomicReference +implements SingleSubscriber, Disposable { + + /** */ + private static final long serialVersionUID = 4943102778943297569L; + final BiConsumer onCallback; + + public BiConsumerSingleSubscriber(BiConsumer onCallback) { + this.onCallback = onCallback; + } + + @Override + public void onError(Throwable e) { + onCallback.accept(null, e); + } + + @Override + public void onSubscribe(Disposable d) { + DisposableHelper.setOnce(this, d); + } + + @Override + public void onSuccess(T value) { + onCallback.accept(value, null); + } + + @Override + public void dispose() { + DisposableHelper.dispose(this); + } +} diff --git a/src/main/java/io/reactivex/internal/subscribers/single/ConsumerSingleSubscriber.java b/src/main/java/io/reactivex/internal/subscribers/single/ConsumerSingleSubscriber.java new file mode 100644 index 0000000000..c1718fb5d0 --- /dev/null +++ b/src/main/java/io/reactivex/internal/subscribers/single/ConsumerSingleSubscriber.java @@ -0,0 +1,58 @@ +/** + * Copyright 2016 Netflix, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in + * compliance with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License is + * distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See + * the License for the specific language governing permissions and limitations under the License. + */ + +package io.reactivex.internal.subscribers.single; + +import java.util.concurrent.atomic.AtomicReference; + +import io.reactivex.SingleSubscriber; +import io.reactivex.disposables.Disposable; +import io.reactivex.functions.Consumer; +import io.reactivex.internal.disposables.DisposableHelper; + +public final class ConsumerSingleSubscriber +extends AtomicReference +implements SingleSubscriber, Disposable { + + /** */ + private static final long serialVersionUID = -7012088219455310787L; + + final Consumer onSuccess; + + final Consumer onError; + + public ConsumerSingleSubscriber(Consumer onSuccess, Consumer onError) { + this.onSuccess = onSuccess; + this.onError = onError; + } + + @Override + public void onError(Throwable e) { + onError.accept(e); + } + + @Override + public void onSubscribe(Disposable d) { + DisposableHelper.setOnce(this, d); + } + + @Override + public void onSuccess(T value) { + onSuccess.accept(value); + } + + @Override + public void dispose() { + DisposableHelper.dispose(this); + } +}