/** * Copyright (c) 2016-present, RxJava Contributors. * * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in * compliance with the License. You may obtain a copy of the License at * * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software distributed under the License is * distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See * the License for the specific language governing permissions and limitations under the License. */ package io.reactivex.internal.operators.observable; import java.util.concurrent.atomic.*; import io.reactivex.*; import io.reactivex.disposables.Disposable; import io.reactivex.exceptions.Exceptions; import io.reactivex.functions.Function; import io.reactivex.internal.disposables.*; import io.reactivex.internal.functions.ObjectHelper; import io.reactivex.internal.queue.SpscLinkedArrayQueue; import io.reactivex.internal.util.AtomicThrowable; import io.reactivex.plugins.RxJavaPlugins; public final class ObservableCombineLatest<T, R> extends Observable<R> { final ObservableSource<? extends T>[] sources; final Iterable<? extends ObservableSource<? extends T>> sourcesIterable; final Function<? super Object[], ? extends R> combiner; final int bufferSize; final boolean delayError; public ObservableCombineLatest(ObservableSource<? extends T>[] sources, Iterable<? extends ObservableSource<? extends T>> sourcesIterable, Function<? super Object[], ? extends R> combiner, int bufferSize, boolean delayError) { this.sources = sources; this.sourcesIterable = sourcesIterable; this.combiner = combiner; this.bufferSize = bufferSize; this.delayError = delayError; } @Override @SuppressWarnings("unchecked") public void subscribeActual(Observer<? super R> observer) { ObservableSource<? extends T>[] sources = this.sources; int count = 0; if (sources == null) { sources = new Observable[8]; for (ObservableSource<? extends T> p : sourcesIterable) { if (count == sources.length) { ObservableSource<? extends T>[] b = new ObservableSource[count + (count >> 2)]; System.arraycopy(sources, 0, b, 0, count); sources = b; } sources[count++] = p; } } else { count = sources.length; } if (count == 0) { EmptyDisposable.complete(observer); return; } LatestCoordinator<T, R> lc = new LatestCoordinator<T, R>(observer, combiner, count, bufferSize, delayError); lc.subscribe(sources); } static final class LatestCoordinator<T, R> extends AtomicInteger implements Disposable { private static final long serialVersionUID = 8567835998786448817L; final Observer<? super R> downstream; final Function<? super Object[], ? extends R> combiner; final CombinerObserver<T, R>[] observers; Object[] latest; final SpscLinkedArrayQueue<Object[]> queue; final boolean delayError; volatile boolean cancelled; volatile boolean done; final AtomicThrowable errors = new AtomicThrowable(); int active; int complete; @SuppressWarnings("unchecked") LatestCoordinator(Observer<? super R> actual, Function<? super Object[], ? extends R> combiner, int count, int bufferSize, boolean delayError) { this.downstream = actual; this.combiner = combiner; this.delayError = delayError; this.latest = new Object[count]; CombinerObserver<T, R>[] as = new CombinerObserver[count]; for (int i = 0; i < count; i++) { as[i] = new CombinerObserver<T, R>(this, i); } this.observers = as; this.queue = new SpscLinkedArrayQueue<Object[]>(bufferSize); } public void subscribe(ObservableSource<? extends T>[] sources) { Observer<T>[] as = observers; int len = as.length; downstream.onSubscribe(this); for (int i = 0; i < len; i++) { if (done || cancelled) { return; } sources[i].subscribe(as[i]); } } @Override public void dispose() { if (!cancelled) { cancelled = true; cancelSources(); if (getAndIncrement() == 0) { clear(queue); } } } @Override public boolean isDisposed() { return cancelled; } void cancelSources() { for (CombinerObserver<T, R> observer : observers) { observer.dispose(); } } void clear(SpscLinkedArrayQueue<?> q) { synchronized (this) { latest = null; } q.clear(); } void drain() { if (getAndIncrement() != 0) { return; } final SpscLinkedArrayQueue<Object[]> q = queue; final Observer<? super R> a = downstream; final boolean delayError = this.delayError; int missed = 1; for (;;) { for (;;) { if (cancelled) { clear(q); return; } if (!delayError && errors.get() != null) { cancelSources(); clear(q); a.onError(errors.terminate()); return; } boolean d = done; Object[] s = q.poll(); boolean empty = s == null; if (d && empty) { clear(q); Throwable ex = errors.terminate(); if (ex == null) { a.onComplete(); } else { a.onError(ex); } return; } if (empty) { break; } R v; try { v = ObjectHelper.requireNonNull(combiner.apply(s), "The combiner returned a null value"); } catch (Throwable ex) { Exceptions.throwIfFatal(ex); errors.addThrowable(ex); cancelSources(); clear(q); ex = errors.terminate(); a.onError(ex); return; } a.onNext(v); } missed = addAndGet(-missed); if (missed == 0) { break; } } } void innerNext(int index, T item) { boolean shouldDrain = false; synchronized (this) { Object[] latest = this.latest; if (latest == null) { return; } Object o = latest[index]; int a = active; if (o == null) { active = ++a; } latest[index] = item; if (a == latest.length) { queue.offer(latest.clone()); shouldDrain = true; } } if (shouldDrain) { drain(); } } void innerError(int index, Throwable ex) { if (errors.addThrowable(ex)) { boolean cancelOthers = true; if (delayError) { synchronized (this) { Object[] latest = this.latest; if (latest == null) { return; } cancelOthers = latest[index] == null; if (cancelOthers || ++complete == latest.length) { done = true; } } } if (cancelOthers) { cancelSources(); } drain(); } else { RxJavaPlugins.onError(ex); } } void innerComplete(int index) { boolean cancelOthers = false; synchronized (this) { Object[] latest = this.latest; if (latest == null) { return; } cancelOthers = latest[index] == null; if (cancelOthers || ++complete == latest.length) { done = true; } } if (cancelOthers) { cancelSources(); } drain(); } } static final class CombinerObserver<T, R> extends AtomicReference<Disposable> implements Observer<T> { private static final long serialVersionUID = -4823716997131257941L; final LatestCoordinator<T, R> parent; final int index; CombinerObserver(LatestCoordinator<T, R> parent, int index) { this.parent = parent; this.index = index; } @Override public void onSubscribe(Disposable d) { DisposableHelper.setOnce(this, d); } @Override public void onNext(T t) { parent.innerNext(index, t); } @Override public void onError(Throwable t) { parent.innerError(index, t); } @Override public void onComplete() { parent.innerComplete(index); } public void dispose() { DisposableHelper.dispose(this); } } }