diff --git a/src/main/java/io/reactivex/Flowable.java b/src/main/java/io/reactivex/Flowable.java index a6a8624a6d..56908d2050 100644 --- a/src/main/java/io/reactivex/Flowable.java +++ b/src/main/java/io/reactivex/Flowable.java @@ -9676,6 +9676,53 @@ public final Flowable lift(FlowableOperator lifte return RxJavaPlugins.onAssembly(new FlowableLift(this, lifter)); } + /** + * Limits both the number of upstream items (after which the sequence completes) + * and the total downstream request amount requested from the upstream to + * possibly prevent the creation of excess items by the upstream. + *

+ * The operator requests at most the given {@code count} of items from upstream even + * if the downstream requests more than that. For example, given a {@code limit(5)}, + * if the downstream requests 1, a request of 1 is submitted to the upstream + * and the operator remembers that only 4 items can be requested now on. A request + * of 5 at this point will request 4 from the upstream and any subsequent requests will + * be ignored. + *

+ * Note that requests are negotiated on an operator boundary and {@code limit}'s amount + * may not be preserved further upstream. For example, + * {@code source.observeOn(Schedulers.computation()).limit(5)} will still request the + * default (128) elements from the given {@code source}. + *

+ * The main use of this operator is with sources that are async boundaries that + * don't interfere with request amounts, such as certain {@code Flowable}-based + * network endpoints that relay downstream request amounts unchanged and are, therefore, + * prone to trigger excessive item creation/transmission over the network. + *

+ *
Backpressure:
+ *
The operator requests a total of the given {@code count} items from the upstream.
+ *
Scheduler:
+ *
{@code limit} does not operate by default on a particular {@link Scheduler}.
+ *
+ + * @param count the maximum number of items and the total request amount, non-negative. + * Zero will immediately cancel the upstream on subscription and complete + * the downstream. + * @return the new Flowable instance + * @see #take(long) + * @see #rebatchRequests(int) + * @since 2.1.6 - experimental + */ + @Experimental + @BackpressureSupport(BackpressureKind.SPECIAL) + @SchedulerSupport(SchedulerSupport.NONE) + @CheckReturnValue + public final Flowable limit(long count) { + if (count < 0) { + throw new IllegalArgumentException("count >= 0 required but it was " + count); + } + return RxJavaPlugins.onAssembly(new FlowableLimit(this, count)); + } + /** * Returns a Flowable that applies a specified function to each item emitted by the source Publisher and * emits the results of these function applications. diff --git a/src/main/java/io/reactivex/internal/operators/flowable/FlowableLimit.java b/src/main/java/io/reactivex/internal/operators/flowable/FlowableLimit.java new file mode 100644 index 0000000000..01b1f24f76 --- /dev/null +++ b/src/main/java/io/reactivex/internal/operators/flowable/FlowableLimit.java @@ -0,0 +1,137 @@ +/** + * Copyright (c) 2016-present, RxJava Contributors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in + * compliance with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License is + * distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See + * the License for the specific language governing permissions and limitations under the License. + */ + +package io.reactivex.internal.operators.flowable; + +import java.util.concurrent.atomic.AtomicLong; + +import org.reactivestreams.*; + +import io.reactivex.*; +import io.reactivex.annotations.Experimental; +import io.reactivex.internal.subscriptions.*; +import io.reactivex.plugins.RxJavaPlugins; + +/** + * Limits both the total request amount and items received from the upstream. + * + * @param the source and output value type + * @since 2.1.6 - experimental + */ +@Experimental +public final class FlowableLimit extends AbstractFlowableWithUpstream { + + final long n; + + public FlowableLimit(Flowable source, long n) { + super(source); + this.n = n; + } + + @Override + protected void subscribeActual(Subscriber s) { + source.subscribe(new LimitSubscriber(s, n)); + } + + static final class LimitSubscriber + extends AtomicLong + implements FlowableSubscriber, Subscription { + + private static final long serialVersionUID = 2288246011222124525L; + + final Subscriber actual; + + long remaining; + + Subscription upstream; + + LimitSubscriber(Subscriber actual, long remaining) { + this.actual = actual; + this.remaining = remaining; + lazySet(remaining); + } + + @Override + public void onSubscribe(Subscription s) { + if (SubscriptionHelper.validate(this.upstream, s)) { + if (remaining == 0L) { + s.cancel(); + EmptySubscription.complete(actual); + } else { + this.upstream = s; + actual.onSubscribe(this); + } + } + } + + @Override + public void onNext(T t) { + long r = remaining; + if (r > 0L) { + remaining = --r; + actual.onNext(t); + if (r == 0L) { + upstream.cancel(); + actual.onComplete(); + } + } + } + + @Override + public void onError(Throwable t) { + if (remaining > 0L) { + remaining = 0L; + actual.onError(t); + } else { + RxJavaPlugins.onError(t); + } + } + + @Override + public void onComplete() { + if (remaining > 0L) { + remaining = 0L; + actual.onComplete(); + } + } + + @Override + public void request(long n) { + if (SubscriptionHelper.validate(n)) { + for (;;) { + long r = get(); + if (r == 0L) { + break; + } + long toRequest; + if (r <= n) { + toRequest = r; + } else { + toRequest = n; + } + long u = r - toRequest; + if (compareAndSet(r, u)) { + upstream.request(toRequest); + break; + } + } + } + } + + @Override + public void cancel() { + upstream.cancel(); + } + + } +} diff --git a/src/test/java/io/reactivex/ParamValidationCheckerTest.java b/src/test/java/io/reactivex/ParamValidationCheckerTest.java index 92ff38d58c..6fef3b0273 100644 --- a/src/test/java/io/reactivex/ParamValidationCheckerTest.java +++ b/src/test/java/io/reactivex/ParamValidationCheckerTest.java @@ -182,8 +182,9 @@ public void checkParallelFlowable() { addOverride(new ParamOverride(Flowable.class, 0, ParamMode.ANY, "take", Long.TYPE, TimeUnit.class)); addOverride(new ParamOverride(Flowable.class, 0, ParamMode.ANY, "take", Long.TYPE, TimeUnit.class, Scheduler.class)); - // zero retry is allowed + // zero take/limit is allowed addOverride(new ParamOverride(Flowable.class, 0, ParamMode.NON_NEGATIVE, "take", Long.TYPE)); + addOverride(new ParamOverride(Flowable.class, 0, ParamMode.NON_NEGATIVE, "limit", Long.TYPE)); // negative time is considered as zero time addOverride(new ParamOverride(Flowable.class, 0, ParamMode.ANY, "sample", Long.TYPE, TimeUnit.class)); diff --git a/src/test/java/io/reactivex/internal/operators/flowable/FlowableLimitTest.java b/src/test/java/io/reactivex/internal/operators/flowable/FlowableLimitTest.java new file mode 100644 index 0000000000..553085ace2 --- /dev/null +++ b/src/test/java/io/reactivex/internal/operators/flowable/FlowableLimitTest.java @@ -0,0 +1,210 @@ +/** + * Copyright (c) 2016-present, RxJava Contributors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in + * compliance with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License is + * distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See + * the License for the specific language governing permissions and limitations under the License. + */ + +package io.reactivex.internal.operators.flowable; + +import static org.junit.Assert.*; + +import java.util.*; + +import org.junit.Test; +import org.reactivestreams.Subscriber; + +import io.reactivex.*; +import io.reactivex.exceptions.*; +import io.reactivex.functions.*; +import io.reactivex.internal.subscriptions.BooleanSubscription; +import io.reactivex.plugins.RxJavaPlugins; +import io.reactivex.processors.PublishProcessor; +import io.reactivex.subscribers.TestSubscriber; + +public class FlowableLimitTest implements LongConsumer, Action { + + final List requests = new ArrayList(); + + static final Long CANCELLED = -100L; + + @Override + public void accept(long t) throws Exception { + requests.add(t); + } + + @Override + public void run() throws Exception { + requests.add(CANCELLED); + } + + @Test + public void shorterSequence() { + Flowable.range(1, 5) + .doOnRequest(this) + .limit(6) + .test() + .assertResult(1, 2, 3, 4, 5); + + assertEquals(6, requests.get(0).intValue()); + } + + @Test + public void exactSequence() { + Flowable.range(1, 5) + .doOnRequest(this) + .doOnCancel(this) + .limit(5) + .test() + .assertResult(1, 2, 3, 4, 5); + + assertEquals(2, requests.size()); + assertEquals(5, requests.get(0).intValue()); + assertEquals(CANCELLED, requests.get(1)); + } + + @Test + public void longerSequence() { + Flowable.range(1, 6) + .doOnRequest(this) + .limit(5) + .test() + .assertResult(1, 2, 3, 4, 5); + + assertEquals(5, requests.get(0).intValue()); + } + + @Test + public void error() { + Flowable.error(new TestException()) + .limit(5) + .test() + .assertFailure(TestException.class); + } + + @Test + public void limitZero() { + Flowable.range(1, 5) + .doOnCancel(this) + .doOnRequest(this) + .limit(0) + .test() + .assertResult(); + + assertEquals(1, requests.size()); + assertEquals(CANCELLED, requests.get(0)); + } + + @Test + public void limitStep() { + TestSubscriber ts = Flowable.range(1, 6) + .doOnRequest(this) + .limit(5) + .test(0L); + + assertEquals(0, requests.size()); + + ts.request(1); + ts.assertValue(1); + + ts.request(2); + ts.assertValues(1, 2, 3); + + ts.request(3); + ts.assertResult(1, 2, 3, 4, 5); + + assertEquals(Arrays.asList(1L, 2L, 2L), requests); + } + + @Test + public void limitAndTake() { + Flowable.range(1, 5) + .doOnCancel(this) + .doOnRequest(this) + .limit(6) + .take(5) + .test() + .assertResult(1, 2, 3, 4, 5); + + assertEquals(Arrays.asList(6L, CANCELLED), requests); + } + + @Test + public void noOverrequest() { + PublishProcessor pp = PublishProcessor.create(); + + TestSubscriber ts = pp + .doOnRequest(this) + .limit(5) + .test(0L); + + ts.request(5); + ts.request(10); + + assertTrue(pp.offer(1)); + pp.onComplete(); + + ts.assertResult(1); + } + + @Test + public void cancelIgnored() { + List errors = TestHelper.trackPluginErrors(); + try { + new Flowable() { + @Override + protected void subscribeActual(Subscriber s) { + BooleanSubscription bs = new BooleanSubscription(); + s.onSubscribe(bs); + + assertTrue(bs.isCancelled()); + + s.onNext(1); + s.onComplete(); + s.onError(new TestException()); + + s.onSubscribe(null); + } + } + .limit(0) + .test() + .assertResult(); + + TestHelper.assertUndeliverable(errors, 0, TestException.class); + TestHelper.assertError(errors, 1, NullPointerException.class); + } finally { + RxJavaPlugins.reset(); + } + } + + @Test + public void badRequest() { + TestHelper.assertBadRequestReported(Flowable.range(1, 5).limit(3)); + } + + @Test + public void requestRace() { + for (int i = 0; i < 1000; i++) { + final TestSubscriber ts = Flowable.range(1, 10) + .limit(5) + .test(0L); + + Runnable r = new Runnable() { + @Override + public void run() { + ts.request(3); + } + }; + + TestHelper.race(r, r); + + ts.assertResult(1, 2, 3, 4, 5); + } + } +}