2.x: add limit() to limit both item count and request amount #5655

Merged 3 commits on Oct 18, 2017
47 changes: 47 additions & 0 deletions src/main/java/io/reactivex/Flowable.java
@@ -9676,6 +9676,53 @@ public final <R> Flowable<R> lift(FlowableOperator<? extends R, ? super T> lifte
        return RxJavaPlugins.onAssembly(new FlowableLift<R, T>(this, lifter));
    }

    /**
     * Limits both the number of upstream items (after which the sequence completes)
     * and the total downstream request amount requested from the upstream to
     * possibly prevent the creation of excess items by the upstream.
     * <p>
     * The operator requests at most the given {@code count} of items from upstream even
     * if the downstream requests more than that. For example, given a {@code limit(5)},
     * if the downstream requests 1, a request of 1 is submitted to the upstream
     * and the operator remembers that only 4 items can be requested from now on. A request
     * of 5 at this point will request 4 from the upstream and any subsequent requests will
     * be ignored.
     * <p>
     * Note that requests are negotiated on an operator boundary and {@code limit}'s amount
     * may not be preserved further upstream. For example,
     * {@code source.observeOn(Schedulers.computation()).limit(5)} will still request the
     * default (128) elements from the given {@code source}.
     * <p>
     * The main use of this operator is with sources that are async boundaries that
     * don't interfere with request amounts, such as certain {@code Flowable}-based
     * network endpoints that relay downstream request amounts unchanged and are, therefore,
     * prone to trigger excessive item creation/transmission over the network.
     * <dl>
     * <dt><b>Backpressure:</b></dt>
     * <dd>The operator requests a total of the given {@code count} items from the upstream.</dd>
     * <dt><b>Scheduler:</b></dt>
     * <dd>{@code limit} does not operate by default on a particular {@link Scheduler}.</dd>
     * </dl>
     *
     * @param count the maximum number of items and the total request amount, non-negative.
     * Zero will immediately cancel the upstream on subscription and complete
     * the downstream.
     * @return the new Flowable instance
     * @see #take(long)
     * @see #rebatchRequests(int)
     * @since 2.1.6 - experimental
     */
    @Experimental
    @BackpressureSupport(BackpressureKind.SPECIAL)
    @SchedulerSupport(SchedulerSupport.NONE)
    @CheckReturnValue
    public final Flowable<T> limit(long count) {
Contributor

I like this strict version of take(), but thinking about API design I'm afraid that this additional operator could add unnecessary confusion for the users.

Maybe an overload of take() with boolean flag that controls "strictness" of the upstream requests would work? (or something better if you have it on your mind)

Member Author

Possibly.

        if (count < 0) {
            throw new IllegalArgumentException("count >= 0 required but it was " + count);
        }
        return RxJavaPlugins.onAssembly(new FlowableLimit<T>(this, count));
    }
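
The request capping described in the Javadoc of `limit` can be traced with a small, JDK-only sketch. This is not the RxJava implementation: `LimitRequestDemo`, `RecordingSubscription`, and `CappedSubscription` are hypothetical names, and `java.util.concurrent.Flow.Subscription` (Java 9+) is assumed as a stand-in for the Reactive Streams `Subscription`. With a limit of 5, downstream requests of 1, 5, and 10 should reach the upstream as 1 and 4, and the last request should be dropped.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Flow;
import java.util.concurrent.atomic.AtomicLong;

// Hypothetical, JDK-only illustration of the request side of limit(); not RxJava code.
final class LimitRequestDemo {

    /** Fake upstream that records every request amount it receives. */
    static final class RecordingSubscription implements Flow.Subscription {
        final List<Long> requests = new ArrayList<>();
        boolean cancelled;

        @Override
        public void request(long n) {
            requests.add(n);
        }

        @Override
        public void cancel() {
            cancelled = true;
        }
    }

    /** Forwards downstream requests, but never more than the given total. */
    static final class CappedSubscription implements Flow.Subscription {
        final Flow.Subscription upstream;
        final AtomicLong remaining;

        CappedSubscription(Flow.Subscription upstream, long limit) {
            this.upstream = upstream;
            this.remaining = new AtomicLong(limit);
        }

        @Override
        public void request(long n) {
            if (n <= 0L) {
                return; // real operators report this as a protocol violation
            }
            for (;;) {
                long r = remaining.get();
                if (r == 0L) {
                    return; // budget exhausted: ignore the request
                }
                long toRequest = Math.min(r, n);
                if (remaining.compareAndSet(r, r - toRequest)) {
                    upstream.request(toRequest);
                    return;
                }
            }
        }

        @Override
        public void cancel() {
            upstream.cancel();
        }
    }

    /** Replays the Javadoc scenario and returns what the upstream saw. */
    static List<Long> run() {
        RecordingSubscription upstream = new RecordingSubscription();
        CappedSubscription limited = new CappedSubscription(upstream, 5);
        limited.request(1);  // forwarded as 1, 4 remain
        limited.request(5);  // forwarded as 4, 0 remain
        limited.request(10); // ignored
        return upstream.requests;
    }

    public static void main(String[] args) {
        System.out.println(run()); // prints [1, 4]
    }
}
```

The sketch covers only request accounting; item counting and completion are handled separately in the operator's `onNext`.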

    /**
     * Returns a Flowable that applies a specified function to each item emitted by the source Publisher and
     * emits the results of these function applications.
@@ -0,0 +1,137 @@
/**
 * Copyright (c) 2016-present, RxJava Contributors.
 *
 * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in
 * compliance with the License. You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software distributed under the License is
 * distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See
 * the License for the specific language governing permissions and limitations under the License.
 */

package io.reactivex.internal.operators.flowable;

import java.util.concurrent.atomic.AtomicLong;

import org.reactivestreams.*;

import io.reactivex.*;
import io.reactivex.annotations.Experimental;
import io.reactivex.internal.subscriptions.*;
import io.reactivex.plugins.RxJavaPlugins;

/**
 * Limits both the total request amount and items received from the upstream.
 *
 * @param <T> the source and output value type
 * @since 2.1.6 - experimental
 */
@Experimental
public final class FlowableLimit<T> extends AbstractFlowableWithUpstream<T, T> {

    final long n;

    public FlowableLimit(Flowable<T> source, long n) {
        super(source);
        this.n = n;
    }

    @Override
    protected void subscribeActual(Subscriber<? super T> s) {
        source.subscribe(new LimitSubscriber<T>(s, n));
    }

    static final class LimitSubscriber<T>
    extends AtomicLong
    implements FlowableSubscriber<T>, Subscription {

        private static final long serialVersionUID = 2288246011222124525L;

        final Subscriber<? super T> actual;

        long remaining;

        Subscription upstream;

        LimitSubscriber(Subscriber<? super T> actual, long remaining) {
            this.actual = actual;
            this.remaining = remaining;
            lazySet(remaining);
        }

        @Override
        public void onSubscribe(Subscription s) {
            if (SubscriptionHelper.validate(this.upstream, s)) {
                if (remaining == 0L) {
                    s.cancel();
                    EmptySubscription.complete(actual);
                } else {
                    this.upstream = s;
                    actual.onSubscribe(this);
                }
            }
        }

        @Override
        public void onNext(T t) {
            long r = remaining;
Contributor

what's the point of local r? It looks like you can just do everything directly on remaining here

Member Author

I avoid re-reading fields.

Contributor

Hm, it's not volatile, so it should normally sit in a CPU register or cache, but yes, a local variable is slightly more likely to be there.

Although a local variable adds indirection between the reads and writes, so overall I think it's neither a win nor a loss in performance :)

There is an interesting question on SO, but the answers seem to compare only the speed of accessing an already initialized field, without accounting for the overall overhead of the additional variable: https://stackoverflow.com/questions/21613098/java-local-vs-instance-variable-access-speed

Member Author

The JIT may load it into a register but not everything runs on HotSpot.

            if (r > 0L) {
                remaining = --r;
                actual.onNext(t);
                if (r == 0L) {
                    upstream.cancel();
Contributor

What about this:

if (r > 0L) {
  remaining = --r;
  final boolean completed = r == 0L;
  
  if (completed) {
    upstream.cancel();
  }

  actual.onNext(t);

  if (completed) {
    actual.onComplete();
  }
}

This way we cancel the upstream before delivering the onNext notification. That could matter if onNext triggers a long-running chain of computations, which in the current version delays cancellation of the upstream.

Contributor

Though the current implementation of take() does the same, so this might be an unwanted behavior change. Up to you; we can move this change to 3.x.

Member Author

Or cancelling first could interrupt the current thread and make code in the downstream fail unexpectedly.

Contributor

Good point!

But doesn't it mean that actual.onComplete() should happen before upstream.cancel() then? :trollface:

Member Author

It's way less likely to happen. There were issues around onNext before.

Contributor

OK, although I think the probability is the same here: upstream.cancel() has to be called with either ordering and can cause problems either way. But with the current order we guarantee delivery of onNext, which is probably good :)
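
The ordering concern in this thread can be reproduced with a JDK-only sketch. It makes two loud assumptions that are not taken from the PR: the upstream's `cancel()` interrupts the calling thread, and the downstream's `onNext` performs a blocking call. `CancelOrderDemo`, `interruptingCancel`, `blockingOnNext`, and `lastItem` are hypothetical names. Under those assumptions, cancelling before delivery makes the final `onNext` fail, while delivering first lets it finish.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical illustration of the two call orderings discussed above; not RxJava code.
final class CancelOrderDemo {

    /** Assumed upstream behavior: cancel() interrupts the calling thread. */
    static void interruptingCancel(List<String> log) {
        log.add("cancel");
        Thread.currentThread().interrupt();
    }

    /** Assumed downstream behavior: onNext performs a blocking call. */
    static void blockingOnNext(List<String> log) {
        try {
            Thread.sleep(1); // throws at once if the interrupt flag is already set
            log.add("onNext ok");
        } catch (InterruptedException e) {
            log.add("onNext interrupted");
        }
    }

    /** Handles the last allowed item with either ordering and returns the event log. */
    static List<String> lastItem(boolean cancelFirst) {
        Thread.interrupted(); // start with a clean interrupt flag
        List<String> log = new ArrayList<>();
        if (cancelFirst) {
            interruptingCancel(log);
            blockingOnNext(log);
        } else {
            blockingOnNext(log);
            interruptingCancel(log);
        }
        log.add("onComplete");
        Thread.interrupted(); // do not leak the flag to the caller
        return log;
    }

    public static void main(String[] args) {
        System.out.println(lastItem(false)); // prints [onNext ok, cancel, onComplete]
        System.out.println(lastItem(true));  // prints [cancel, onNext interrupted, onComplete]
    }
}
```

Whether a real upstream interrupts on cancel depends on the source, so this only shows why delivering the item first is the more conservative order.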

                    actual.onComplete();
                }
            }
        }

        @Override
        public void onError(Throwable t) {
            if (remaining > 0L) {
                remaining = 0L;
                actual.onError(t);
            } else {
                RxJavaPlugins.onError(t);
            }
        }

        @Override
        public void onComplete() {
            if (remaining > 0L) {
                remaining = 0L;
                actual.onComplete();
            }
        }

        @Override
        public void request(long n) {
            if (SubscriptionHelper.validate(n)) {
                for (;;) {
                    long r = get();
                    if (r == 0L) {
                        break;
                    }
                    long toRequest;
                    if (r <= n) {
                        toRequest = r;
                    } else {
                        toRequest = n;
                    }
                    long u = r - toRequest;
                    if (compareAndSet(r, u)) {
                        upstream.request(toRequest);
                        break;
                    }
                }
            }
        }

        @Override
        public void cancel() {
            upstream.cancel();
        }

    }
}
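
The `request` method above uses a compare-and-set loop so that concurrent downstream requests can never hand out more than the remaining budget. A minimal sketch of that accounting in isolation follows, using only the JDK. `RequestBudgetDemo`, `RequestBudget`, `claim`, and `totalGranted` are hypothetical names, and the thread and request counts are arbitrary.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicLong;

// Hypothetical illustration of CAS-based request budgeting; not RxJava code.
final class RequestBudgetDemo {

    static final class RequestBudget {
        private final AtomicLong remaining;

        RequestBudget(long total) {
            remaining = new AtomicLong(total);
        }

        /** Returns how much of n may be forwarded upstream, possibly 0. */
        long claim(long n) {
            for (;;) {
                long r = remaining.get();
                if (r == 0L) {
                    return 0L;
                }
                long granted = Math.min(r, n);
                // Retry if another thread changed the budget in the meantime.
                if (remaining.compareAndSet(r, r - granted)) {
                    return granted;
                }
            }
        }
    }

    /** Lets several threads claim 3 units at a time and sums what was granted. */
    static long totalGranted(long limit, int threads, int requestsPerThread) {
        RequestBudget budget = new RequestBudget(limit);
        AtomicLong granted = new AtomicLong();
        CountDownLatch start = new CountDownLatch(1);
        Thread[] workers = new Thread[threads];
        for (int i = 0; i < threads; i++) {
            workers[i] = new Thread(() -> {
                try {
                    start.await();
                } catch (InterruptedException e) {
                    return;
                }
                for (int j = 0; j < requestsPerThread; j++) {
                    granted.addAndGet(budget.claim(3));
                }
            });
            workers[i].start();
        }
        start.countDown();
        try {
            for (Thread t : workers) {
                t.join();
            }
        } catch (InterruptedException e) {
            throw new IllegalStateException(e);
        }
        return granted.get();
    }

    public static void main(String[] args) {
        // 4 threads x 500 claims x 3 units = 6000 demanded against a budget of 1000.
        System.out.println(totalGranted(1000L, 4, 500)); // prints 1000
    }
}
```

When total demand exceeds the budget, the granted sum equals the budget exactly; when demand is lower, everything demanded is granted.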
3 changes: 2 additions & 1 deletion src/test/java/io/reactivex/ParamValidationCheckerTest.java
@@ -182,8 +182,9 @@ public void checkParallelFlowable() {
         addOverride(new ParamOverride(Flowable.class, 0, ParamMode.ANY, "take", Long.TYPE, TimeUnit.class));
         addOverride(new ParamOverride(Flowable.class, 0, ParamMode.ANY, "take", Long.TYPE, TimeUnit.class, Scheduler.class));

-        // zero retry is allowed
+        // zero take/limit is allowed
         addOverride(new ParamOverride(Flowable.class, 0, ParamMode.NON_NEGATIVE, "take", Long.TYPE));
+        addOverride(new ParamOverride(Flowable.class, 0, ParamMode.NON_NEGATIVE, "limit", Long.TYPE));

         // negative time is considered as zero time
         addOverride(new ParamOverride(Flowable.class, 0, ParamMode.ANY, "sample", Long.TYPE, TimeUnit.class));