/**
 * Copyright (c) 2016-present, RxJava Contributors.
 *
 * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in
 * compliance with the License. You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software distributed under the License is
 * distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See
 * the License for the specific language governing permissions and limitations under the License.
 */

package io.reactivex.internal.operators.observable;

import java.util.List;
import java.util.concurrent.*;

import io.reactivex.*;
import io.reactivex.functions.*;
import io.reactivex.internal.functions.*;
import io.reactivex.observables.ConnectableObservable;

/**
 * Helper utility class to support Observable with inner classes.
 * <p>
 * Each static factory below returns a small named adapter class instead of an
 * anonymous inner class.
 */
public final class ObservableInternalHelper {

    /** Utility class; construction always fails. */
    private ObservableInternalHelper() {
        throw new IllegalStateException("No instances!");
    }

    // ------------------------------------------------------------------
    // Generator adapters
    // ------------------------------------------------------------------

    /**
     * Adapts a {@link Consumer} of {@link Emitter} to a stateful generator
     * {@link BiFunction}; the state argument is handed back untouched.
     *
     * @param <T> the emitted value type
     * @param <S> the state type
     */
    static final class SimpleGenerator<T, S> implements BiFunction<S, Emitter<T>, S> {
        final Consumer<Emitter<T>> consumer;

        SimpleGenerator(Consumer<Emitter<T>> consumer) {
            this.consumer = consumer;
        }

        @Override
        public S apply(S state, Emitter<T> emitter) throws Exception {
            consumer.accept(emitter);
            return state;
        }
    }

    /**
     * Wraps the given emitter consumer into a generator function that ignores
     * (and returns unchanged) its state.
     *
     * @param <T> the emitted value type
     * @param <S> the state type
     * @param consumer the callback receiving the emitter
     * @return the new {@link SimpleGenerator} instance
     */
    public static <T, S> BiFunction<S, Emitter<T>, S> simpleGenerator(Consumer<Emitter<T>> consumer) {
        return new SimpleGenerator<T, S>(consumer);
    }

    /**
     * Adapts a {@link BiConsumer} of state and {@link Emitter} to a generator
     * {@link BiFunction} that returns the same state it was given.
     *
     * @param <T> the emitted value type
     * @param <S> the state type
     */
    static final class SimpleBiGenerator<T, S> implements BiFunction<S, Emitter<T>, S> {
        final BiConsumer<S, Emitter<T>> consumer;

        SimpleBiGenerator(BiConsumer<S, Emitter<T>> consumer) {
            this.consumer = consumer;
        }

        @Override
        public S apply(S state, Emitter<T> emitter) throws Exception {
            consumer.accept(state, emitter);
            return state;
        }
    }

    /**
     * Wraps the given state-and-emitter consumer into a generator function
     * that returns the incoming state unchanged.
     *
     * @param <T> the emitted value type
     * @param <S> the state type
     * @param consumer the callback receiving the state and the emitter
     * @return the new {@link SimpleBiGenerator} instance
     */
    public static <T, S> BiFunction<S, Emitter<T>, S> simpleBiGenerator(BiConsumer<S, Emitter<T>> consumer) {
        return new SimpleBiGenerator<T, S>(consumer);
    }

    // ------------------------------------------------------------------
    // Item delay
    // ------------------------------------------------------------------

    /**
     * For each item, asks the supplied function for a delay source, wraps it in
     * an {@link ObservableTake} limited to 1, maps whatever it emits to the
     * original item and falls back to the original item via
     * {@code defaultIfEmpty}.
     *
     * @param <T> the item type
     * @param <U> the element type of the delay source
     */
    static final class ItemDelayFunction<T, U> implements Function<T, ObservableSource<T>> {
        final Function<? super T, ? extends ObservableSource<U>> itemDelay;

        ItemDelayFunction(Function<? super T, ? extends ObservableSource<U>> itemDelay) {
            this.itemDelay = itemDelay;
        }

        @Override
        public ObservableSource<T> apply(T v) throws Exception {
            ObservableSource<U> delaySource = ObjectHelper.requireNonNull(
                    itemDelay.apply(v), "The itemDelay returned a null ObservableSource");

            Observable<U> firstSignal = new ObservableTake<U>(delaySource, 1);
            Function<U, T> toOriginalItem = Functions.justFunction(v);
            Observable<T> asOriginalItem = firstSignal.map(toOriginalItem);
            return asOriginalItem.defaultIfEmpty(v);
        }
    }

    /**
     * Creates the per-item delay mapper described by {@link ItemDelayFunction}.
     *
     * @param <T> the item type
     * @param <U> the element type of the delay source
     * @param itemDelay the function producing the delay source for an item
     * @return the new {@link ItemDelayFunction} instance
     */
    public static <T, U> Function<T, ObservableSource<T>> itemDelay(
            final Function<? super T, ? extends ObservableSource<U>> itemDelay) {
        return new ItemDelayFunction<T, U>(itemDelay);
    }

    // ------------------------------------------------------------------
    // Observer method adapters
    // ------------------------------------------------------------------

    /**
     * Forwards accepted values to {@link Observer#onNext(Object)}.
     *
     * @param <T> the value type
     */
    static final class ObserverOnNext<T> implements Consumer<T> {
        final Observer<T> observer;

        ObserverOnNext(Observer<T> observer) {
            this.observer = observer;
        }

        @Override
        public void accept(T value) throws Exception {
            observer.onNext(value);
        }
    }

    /**
     * Forwards accepted throwables to {@link Observer#onError(Throwable)}.
     *
     * @param <T> the observer's value type
     */
    static final class ObserverOnError<T> implements Consumer<Throwable> {
        final Observer<T> observer;

        ObserverOnError(Observer<T> observer) {
            this.observer = observer;
        }

        @Override
        public void accept(Throwable error) throws Exception {
            observer.onError(error);
        }
    }

    /**
     * Invokes {@link Observer#onComplete()} when run.
     *
     * @param <T> the observer's value type
     */
    static final class ObserverOnComplete<T> implements Action {
        final Observer<T> observer;

        ObserverOnComplete(Observer<T> observer) {
            this.observer = observer;
        }

        @Override
        public void run() throws Exception {
            observer.onComplete();
        }
    }

    /**
     * Exposes the observer's {@code onNext} as a {@link Consumer}.
     *
     * @param <T> the value type
     * @param observer the target observer
     * @return the new {@link ObserverOnNext} instance
     */
    public static <T> Consumer<T> observerOnNext(Observer<T> observer) {
        return new ObserverOnNext<T>(observer);
    }

    /**
     * Exposes the observer's {@code onError} as a {@link Consumer} of {@link Throwable}.
     *
     * @param <T> the observer's value type
     * @param observer the target observer
     * @return the new {@link ObserverOnError} instance
     */
    public static <T> Consumer<Throwable> observerOnError(Observer<T> observer) {
        return new ObserverOnError<T>(observer);
    }

    /**
     * Exposes the observer's {@code onComplete} as an {@link Action}.
     *
     * @param <T> the observer's value type
     * @param observer the target observer
     * @return the new {@link ObserverOnComplete} instance
     */
    public static <T> Action observerOnComplete(Observer<T> observer) {
        return new ObserverOnComplete<T>(observer);
    }

    // ------------------------------------------------------------------
    // flatMap helpers
    // ------------------------------------------------------------------

    /**
     * Applies the combiner to a fixed outer value and each incoming inner value.
     *
     * @param <U> the inner value type
     * @param <R> the result type
     * @param <T> the outer value type
     */
    static final class FlatMapWithCombinerInner<U, R, T> implements Function<U, R> {
        private final BiFunction<? super T, ? super U, ? extends R> resultSelector;
        private final T outerValue;

        FlatMapWithCombinerInner(BiFunction<? super T, ? super U, ? extends R> combiner, T t) {
            this.resultSelector = combiner;
            this.outerValue = t;
        }

        @Override
        public R apply(U innerValue) throws Exception {
            return resultSelector.apply(outerValue, innerValue);
        }
    }

    /**
     * Maps an outer value to its inner source and wraps that source in an
     * {@link ObservableMap} which pairs every inner value with the outer value
     * through the combiner.
     *
     * @param <T> the outer value type
     * @param <R> the result type
     * @param <U> the inner value type
     */
    static final class FlatMapWithCombinerOuter<T, R, U> implements Function<T, ObservableSource<R>> {
        private final BiFunction<? super T, ? super U, ? extends R> resultSelector;
        private final Function<? super T, ? extends ObservableSource<? extends U>> innerSourceMapper;

        FlatMapWithCombinerOuter(BiFunction<? super T, ? super U, ? extends R> combiner,
                Function<? super T, ? extends ObservableSource<? extends U>> mapper) {
            this.resultSelector = combiner;
            this.innerSourceMapper = mapper;
        }

        @Override
        public ObservableSource<R> apply(T t) throws Exception {
            @SuppressWarnings("unchecked")
            ObservableSource<U> innerSource = (ObservableSource<U>)ObjectHelper.requireNonNull(
                    innerSourceMapper.apply(t), "The mapper returned a null ObservableSource");

            Function<U, R> combineWithOuter = new FlatMapWithCombinerInner<U, R, T>(resultSelector, t);
            return new ObservableMap<U, R>(innerSource, combineWithOuter);
        }
    }

    /**
     * Builds a mapper that turns each outer value into an inner source whose
     * values are combined with that outer value.
     *
     * @param <T> the outer value type
     * @param <U> the inner value type
     * @param <R> the result type
     * @param mapper the function producing the inner source for an outer value
     * @param combiner the function combining an outer value with an inner value
     * @return the new {@link FlatMapWithCombinerOuter} instance
     */
    public static <T, U, R> Function<T, ObservableSource<R>> flatMapWithCombiner(
            final Function<? super T, ? extends ObservableSource<? extends U>> mapper,
            final BiFunction<? super T, ? super U, ? extends R> combiner) {
        return new FlatMapWithCombinerOuter<T, R, U>(combiner, mapper);
    }

    /**
     * Maps a value to an {@link Iterable} and wraps it in an
     * {@link ObservableFromIterable}; a {@code null} Iterable is rejected.
     *
     * @param <T> the input value type
     * @param <U> the element type of the Iterable
     */
    static final class FlatMapIntoIterable<T, U> implements Function<T, ObservableSource<U>> {
        private final Function<? super T, ? extends Iterable<? extends U>> iterableMapper;

        FlatMapIntoIterable(Function<? super T, ? extends Iterable<? extends U>> mapper) {
            this.iterableMapper = mapper;
        }

        @Override
        public ObservableSource<U> apply(T t) throws Exception {
            Iterable<? extends U> items = ObjectHelper.requireNonNull(
                    iterableMapper.apply(t), "The mapper returned a null Iterable");
            return new ObservableFromIterable<U>(items);
        }
    }

    /**
     * Builds a mapper that turns each value into an ObservableSource over the
     * Iterable returned by the given function.
     *
     * @param <T> the input value type
     * @param <U> the element type of the Iterable
     * @param mapper the function producing the Iterable for a value
     * @return the new {@link FlatMapIntoIterable} instance
     */
    public static <T, U> Function<T, ObservableSource<U>> flatMapIntoIterable(
            final Function<? super T, ? extends Iterable<? extends U>> mapper) {
        return new FlatMapIntoIterable<T, U>(mapper);
    }

    /** Singleton function that maps any input to the (boxed) integer zero. */
    enum MapToInt implements Function<Object, Object> {
        INSTANCE;

        @Override
        public Object apply(Object ignored) throws Exception {
            return 0;
        }
    }

    // ------------------------------------------------------------------
    // replay factories
    // ------------------------------------------------------------------

    /**
     * Returns a Callable that calls {@code parent.replay()} on each invocation.
     *
     * @param <T> the value type
     * @param parent the Observable to replay
     * @return the new {@link ReplayCallable} instance
     */
    public static <T> Callable<ConnectableObservable<T>> replayCallable(final Observable<T> parent) {
        return new ReplayCallable<T>(parent);
    }

    /**
     * Returns a Callable that calls {@code parent.replay(bufferSize)} on each invocation.
     *
     * @param <T> the value type
     * @param parent the Observable to replay
     * @param bufferSize the value passed through to {@code replay}
     * @return the new {@link BufferedReplayCallable} instance
     */
    public static <T> Callable<ConnectableObservable<T>> replayCallable(final Observable<T> parent,
            final int bufferSize) {
        return new BufferedReplayCallable<T>(parent, bufferSize);
    }

    /**
     * Returns a Callable that calls
     * {@code parent.replay(bufferSize, time, unit, scheduler)} on each invocation.
     *
     * @param <T> the value type
     * @param parent the Observable to replay
     * @param bufferSize the value passed through to {@code replay}
     * @param time the value passed through to {@code replay}
     * @param unit the unit passed through to {@code replay}
     * @param scheduler the scheduler passed through to {@code replay}
     * @return the new {@link BufferedTimedReplayCallable} instance
     */
    public static <T> Callable<ConnectableObservable<T>> replayCallable(final Observable<T> parent,
            final int bufferSize, final long time, final TimeUnit unit, final Scheduler scheduler) {
        return new BufferedTimedReplayCallable<T>(parent, bufferSize, time, unit, scheduler);
    }

    /**
     * Returns a Callable that calls {@code parent.replay(time, unit, scheduler)}
     * on each invocation.
     *
     * @param <T> the value type
     * @param parent the Observable to replay
     * @param time the value passed through to {@code replay}
     * @param unit the unit passed through to {@code replay}
     * @param scheduler the scheduler passed through to {@code replay}
     * @return the new {@link TimedReplayCallable} instance
     */
    public static <T> Callable<ConnectableObservable<T>> replayCallable(final Observable<T> parent,
            final long time, final TimeUnit unit, final Scheduler scheduler) {
        return new TimedReplayCallable<T>(parent, time, unit, scheduler);
    }

    /**
     * Returns a function that applies the selector to its input Observable and
     * observes the (non-null) result on the given scheduler.
     *
     * @param <T> the input value type
     * @param <R> the result value type
     * @param selector the function transforming the input Observable
     * @param scheduler the scheduler handed to {@code observeOn}
     * @return the new {@link ReplayFunction} instance
     */
    public static <T, R> Function<Observable<T>, ObservableSource<R>> replayFunction(
            final Function<? super Observable<T>, ? extends ObservableSource<R>> selector,
            final Scheduler scheduler) {
        return new ReplayFunction<T, R>(selector, scheduler);
    }

    // ------------------------------------------------------------------
    // zip
    // ------------------------------------------------------------------

    /**
     * Zips a list of sources through {@code Observable.zipIterable} with
     * {@code delayError} set to {@code false} and the default buffer size.
     *
     * @param <T> the common base type of the sources
     * @param <R> the result type
     */
    static final class ZipIterableFunction<T, R>
            implements Function<List<ObservableSource<? extends T>>, ObservableSource<? extends R>> {
        private final Function<? super Object[], ? extends R> combiner;

        ZipIterableFunction(Function<? super Object[], ? extends R> zipper) {
            this.combiner = zipper;
        }

        @Override
        public ObservableSource<? extends R> apply(List<ObservableSource<? extends T>> list) {
            int prefetch = Observable.bufferSize();
            return Observable.zipIterable(list, combiner, false, prefetch);
        }
    }

    /**
     * Builds a function that zips a list of sources with the given zipper.
     *
     * @param <T> the common base type of the sources
     * @param <R> the result type
     * @param zipper the function receiving one value from each source as an array
     * @return the new {@link ZipIterableFunction} instance
     */
    public static <T, R> Function<List<ObservableSource<? extends T>>, ObservableSource<? extends R>> zipIterable(
            final Function<? super Object[], ? extends R> zipper) {
        return new ZipIterableFunction<T, R>(zipper);
    }

    // ------------------------------------------------------------------
    // replay adapter classes
    // ------------------------------------------------------------------

    /**
     * Callable delegating to {@code Observable.replay()}.
     *
     * @param <T> the value type
     */
    static final class ReplayCallable<T> implements Callable<ConnectableObservable<T>> {
        private final Observable<T> upstream;

        ReplayCallable(Observable<T> parent) {
            this.upstream = parent;
        }

        @Override
        public ConnectableObservable<T> call() {
            return upstream.replay();
        }
    }

    /**
     * Callable delegating to {@code Observable.replay(int)}.
     *
     * @param <T> the value type
     */
    static final class BufferedReplayCallable<T> implements Callable<ConnectableObservable<T>> {
        private final Observable<T> upstream;
        private final int capacity;

        BufferedReplayCallable(Observable<T> parent, int bufferSize) {
            this.upstream = parent;
            this.capacity = bufferSize;
        }

        @Override
        public ConnectableObservable<T> call() {
            return upstream.replay(capacity);
        }
    }

    /**
     * Callable delegating to {@code Observable.replay(int, long, TimeUnit, Scheduler)}.
     *
     * @param <T> the value type
     */
    static final class BufferedTimedReplayCallable<T> implements Callable<ConnectableObservable<T>> {
        private final Observable<T> upstream;
        private final int capacity;
        private final long window;
        private final TimeUnit windowUnit;
        private final Scheduler timeSource;

        BufferedTimedReplayCallable(Observable<T> parent, int bufferSize, long time, TimeUnit unit,
                Scheduler scheduler) {
            this.upstream = parent;
            this.capacity = bufferSize;
            this.window = time;
            this.windowUnit = unit;
            this.timeSource = scheduler;
        }

        @Override
        public ConnectableObservable<T> call() {
            return upstream.replay(capacity, window, windowUnit, timeSource);
        }
    }

    /**
     * Callable delegating to {@code Observable.replay(long, TimeUnit, Scheduler)}.
     *
     * @param <T> the value type
     */
    static final class TimedReplayCallable<T> implements Callable<ConnectableObservable<T>> {
        private final Observable<T> upstream;
        private final long window;
        private final TimeUnit windowUnit;
        private final Scheduler timeSource;

        TimedReplayCallable(Observable<T> parent, long time, TimeUnit unit, Scheduler scheduler) {
            this.upstream = parent;
            this.window = time;
            this.windowUnit = unit;
            this.timeSource = scheduler;
        }

        @Override
        public ConnectableObservable<T> call() {
            return upstream.replay(window, windowUnit, timeSource);
        }
    }

    /**
     * Applies a selector to the incoming Observable, rejects a {@code null}
     * result, then wraps the result and observes it on the stored scheduler.
     *
     * @param <T> the input value type
     * @param <R> the result value type
     */
    static final class ReplayFunction<T, R> implements Function<Observable<T>, ObservableSource<R>> {
        private final Function<? super Observable<T>, ? extends ObservableSource<R>> transformer;
        private final Scheduler observeScheduler;

        ReplayFunction(Function<? super Observable<T>, ? extends ObservableSource<R>> selector,
                Scheduler scheduler) {
            this.transformer = selector;
            this.observeScheduler = scheduler;
        }

        @Override
        public ObservableSource<R> apply(Observable<T> t) throws Exception {
            ObservableSource<R> selected = ObjectHelper.requireNonNull(
                    transformer.apply(t), "The selector returned a null ObservableSource");
            Observable<R> wrapped = Observable.wrap(selected);
            return wrapped.observeOn(observeScheduler);
        }
    }
}