Take Doesn't Reduce Large RequestN #5077
If the downstream is requesting more than N, the operator requests unbounded so the upstream can go on a fast path when it emits. take will then stop the upstream at N and there is no overflow. If the downstream requested M < N, take will also just request M. Try this:

    AtomicLong requests = new AtomicLong();           // tallies the amounts seen by the source
    TestSubscriber<Long> ts = new TestSubscriber<>(); // requests Long.MAX_VALUE by default

    Flowable.interval(100, TimeUnit.MILLISECONDS)
        .doOnRequest(new LongConsumer() {
            @Override
            public void accept(long n) {
                System.out.println(n);
                requests.addAndGet(n);
            }
        })
        .take(2)
        .rebatchRequests(1) // <-----------------------------------------
        .subscribe(ts);
That doesn't make sense to me and differs from the v1 behavior. If the upstream is across a network boundary, that means someone has to know that take doesn't actually reduce the requested amount and therefore always needs to use both take and rebatchRequests. I don't buy the argument that this should be done when the downstream requests more, as it is normal for the downstream subscriber to use the default value and for take to be used to constrain the output. If unconstrained output was wanted I'd use an Observable, but I'm using Flowable, so I expect it to be constrained.
New major versions have the opportunity to change behavior on some level.
Is this for the ReactiveSocket library? As an IO boundary provider, you should never trust the downstream's request amount.
No, it's not for ReactiveSocket, but it is for Reactive Streams APIs that involve IO. I understand a major version can change behavior, but I don't understand why this would change. Please explain why the very small perf improvement in the source is worth changing this behavior. The point of Flowable is to constrain emission; if I wanted unconstrained I'd use Observable. Why should the source not be restricted to the maximum that take will accept, for real-world use cases and not just fabricated micro-benchmarks?
It is completely legal for an operator to go unbounded as long as it ensures the rest of the rules towards its downstream consumer. Running unbounded means less overhead from operators, which means more time to process the actual items, especially those that just came over an async boundary (observeOn).
You are still thinking about requests as an end-to-end way of communicating with some server, sending it a message requesting the next item based on consumption at the very end. Reactive Streams backpressure is different: it is a negotiation between subsequent operators. Some may not interfere with it, some might change the timing or amount, some might go unbounded. It is about honoring the downstream's amount and working out an amount for the upstream. For example:

    source.observeOn(Schedulers.computation()).take(5).subscribe(...)

Will the source see a request for 5? No.
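A minimal sketch of that negotiation, assuming RxJava 2 defaults (observeOn prefetches 128; the class and variable names here are illustrative):

    import io.reactivex.Flowable;
    import io.reactivex.schedulers.Schedulers;

    public class PrefetchDemo {
        public static void main(String[] args) throws InterruptedException {
            Flowable.range(1, 1000)
                // prints the amount the source actually receives
                .doOnRequest(n -> System.out.println("source sees request(" + n + ")"))
                .observeOn(Schedulers.computation()) // async boundary, default prefetch of 128
                .take(5)
                .subscribe(v -> System.out.println("got " + v));

            Thread.sleep(500); // crude wait for the async pipeline in this demo
        }
    }

The source prints request(128): what travels upstream is the boundary's prefetch amount, not the take(5) amount.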
Yes I know that. I was involved in arguing for inclusion of that rule in the Reactive Streams spec and know the benefits.
No, I'm not. Remember, I was part of the founding group for Reactive Streams, so I understand the semantics. Early in the discussions it was assumed every operator hop was async, but we argued that hops could be synchronous and would therefore benefit from optimizations in the spec, such as rules 3.17 and 2.2.
I understand that, and am not questioning it. I'm questioning a simple implementation choice, not a contractual or semantic problem. But I won't bother attempting discussion any further; it's not important enough to me.
Thanks for raising this @benjchristensen, I wasn't aware of it. So I'm assuming then that if I've got some synchronous network-requesting source (one that can't effectively emit partially, e.g. a web service request that returns a list of items) and I don't want that source to transfer an unnecessarily large list in reaction to a large request, then I should use source.rebatchRequests(2).take(2). This is certainly an uncommon use case for me, but I did like that in RxJava 1.x upstream requests were generally constrained to what was required by downstream, with the exception of a few well-known operators.
@akarnokd or @benjchristensen can you reopen this issue? I think it needs more discussion.
The use case is fairly common for me. Consider this library method:

    public Flowable<Data> getDataFromNetwork(Input args) {
        return SomeFlowable(...)
            // ensure over the network we do no more than request(128)
            .rebatchRequests(128);
    }

Then a consumer that I don't control:

    getDataFromNetwork(...).take(10).subscribe(...)

In that case I would prefer to only request 10 items over the network, but with the current take behavior 128 will be requested.
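A hedged illustration of the complaint, with Flowable.range standing in for the network call (names are illustrative, assuming RxJava 2 defaults):

    import java.util.concurrent.atomic.AtomicLong;
    import io.reactivex.Flowable;

    public class OverFetchDemo {
        public static void main(String[] args) {
            AtomicLong fetched = new AtomicLong();
            Flowable.range(1, 1000)              // stands in for the network call
                .doOnRequest(n -> {
                    fetched.addAndGet(n);
                    System.out.println("network asked for " + n);
                })
                .rebatchRequests(128)            // the library's batching boundary
                .take(10)                        // consumer only wants 10 items
                .subscribe(v -> { });
            // prints "network asked for 128" although only 10 items are consumed
            System.out.println("total requested: " + fetched.get());
        }
    }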
Yep, I'm not a fan of that behaviour and I'm sure I'll encounter it tying together service calls as our organisation moves more logic behind services.

There's a broader consideration here, I suppose, which is how much users of the RxJava API should know or expect of the request behaviour of operators. We don't really have a guiding principle in place that helps us here. For me, I'd like to see operators only request what they need, and where for performance considerations it makes sense to request in batches (across async boundaries, for instance), then it should be configurable.

I've had a quick look through the list of operators and identified the ones I'd prefer only requested what they need; I'd like constrained requesting behaviour on those operators as well as the ability to configure the batching. They are the only operators I saw that have a definite upper bound yet still request Long.MAX_VALUE.
Anything that has the
@akarnokd is right; the way we solve the same behavior with our network components (at least the Kafka, Redis and Netty ones) is to address the network IO by prefetching at the generator level. Basically, backpressure certainly applies around boundaries and you shouldn't bind yourself to a request behavior downstream. In the end the unbounded information is still valuable, as you influence the rate of replenishing upstream. We talked about this with @akarnokd last year already, when we were experimenting with reactive-streams-commons, and we agreed then that for the 3 libraries (including Reactor) it seemed the right thing to do without failing the specification. So far we have one contention point with the RS spec, which is to signal an error if a request is invalid/negative; we ignore that until a special operator is used. It's the same question with aggregating operators, for instance, which are going to be unbounded given the implicit contract that no callback is blocking. Of course a callback could block if told otherwise by an explicit thread jump like observeOn, which then falls into the category discussed above for the generator: a prefetched, fixed-size queue.
This makes total sense in that it must define the max, but when it is known that the downstream is requesting less, it would be better if that information was passed upwards so requests are constrained all the way up the chain. Regarding the specific implementation of take: in practice, the cost of over-producing elements, and of over-sending elements across async boundaries (network, threads, queues), is far higher than any optimization gained by requesting Long.MAX_VALUE. Thus, I suggest take request no more than its limit from upstream.
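A hedged sketch of the request accounting such a take would need (illustrative, not RxJava's actual implementation): cap the cumulative amount forwarded upstream at the limit.

    import java.util.concurrent.atomic.AtomicLong;

    // Sketch: decide how much of a downstream request may be forwarded upstream
    // so that the total forwarded never exceeds `limit`.
    public final class CappedRequests {
        private final long limit;
        private final AtomicLong forwarded = new AtomicLong(); // total sent upstream so far

        public CappedRequests(long limit) {
            this.limit = limit;
        }

        /** Returns the amount to pass to upstream.request(...); 0 means "don't request". */
        public long forward(long downstreamRequest) {
            for (;;) {
                long sent = forwarded.get();
                long remaining = limit - sent;
                if (remaining == 0) {
                    return 0; // cap reached; upstream gets cancelled after N items anyway
                }
                long toSend = Math.min(downstreamRequest, remaining);
                if (forwarded.compareAndSet(sent, sent + toSend)) {
                    return toSend;
                }
            }
        }
    }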
I agree with this. The potentially bounded request from a downstream take should be passed upstream. It might be worth noting that an overload of rebatchRequests taking a minimum, a maximum and a flag to let the first request through unchanged could make this efficient:
    public Flowable<Data> getDataFromNetwork() {
        return SomeFlowable(...)
            // ensure over the network we do no more than request(128);
            // the first request is allowed through even if less than 10;
            // later requests smaller than 10 will be rebatched to 10
            .rebatchRequests(10, 128, true);
    }

Use cases like these below would then all be quite efficient in terms of the network fetch:

    // will fetch 1
    getDataFromNetwork().first().subscribe();
    // will fetch in batches of 128
    getDataFromNetwork().take(1000000).subscribe();
    // will fetch 1, then 10, 10, 10, ...
    getDataFromNetwork().take(100).rebatchRequests(1).subscribe();

This approach is helpful when calls to the network source may happen frequently overall, from many client processes (distributed or not), but the individual subscriptions to getDataFromNetwork() typically consume only a few items.
Let's assume:

    networkSource().take(1_000_000)
        .subscribe(v -> { }, e -> { }, () -> { }, s -> s.request(1_000_000));

In RxJava, whenever there is a boundary, the operator adjusts its input request amounts to remain reasonable; observeOn, for example, requests in batches of 128 regardless of what its downstream asked for.
Yep, I agree, but we use a different rebatching operator internally, as the built-in one is too limiting for us. I've implemented some new request-limiting operators in rxjava2-extras. If there's viability to these new operators then review would be nice later (the code is only a day old and I'll check it some more in the next week or two). Critical to the argument is that I want support for sources that under the covers don't easily support streaming. This is a common situation, especially when integrating legacy distributed services using RxJava.
Like @davidmoten I am doing rebatching myself before sending over the network as the built-in one is too limiting. I for example want:
@benjchristensen As you demonstrated publicly, you consider keeping behavioral compatibility very important and defended it to ensure continued interoperability. I have the same priority regarding changes to established behavior in RxJava 2, where I believe there was plenty of time before release for anyone to bring up this problem. At that point, and even now, I see the following compatible changes as possible:
In case you plan to implement any of the options, you may want to prepare for the following case setup: if you want the 150 to be the upper bound of the sum of total requests sent to the source.
Sort of. That enormous rewrite was never going to get a thorough review; people didn't have the time, unfortunately. I'm also not sure this issue was ever raised specifically and discussed in terms that would have sparked a reaction. I went looking for it in old issues and PRs and couldn't find it (but it might be there, of course). That's water under the bridge now. @akarnokd are you open to a breaking change on this one so that the default behaviour is not unbounded? I frankly doubt it would affect anyone at all; the only people affected would be those who had read and absorbed the backpressure annotations on the operators.
Yeah, thanks for that. I did handle request accounting in the operators in rxjava2-extras as you describe when I wrote them weeks ago. If you (or anyone else) can review them, terrific. I opened an issue at davidmoten/rxjava2-extras#4.
Not really; adding a new operator would be preferable.
I'd prefer an overload on take.
That doesn't really help the primary use case for me, since the consumer doesn't know what the producer behavior is. They shouldn't have to know to do this or not; the producer should just receive the smallest possible request(n) value, and take is a clear indicator that no more than the take(n) value ever needs to be produced. The point of Flowable is flow control across async boundaries, preventing over-production and buffering. If optimizing for synchronous execution was what I was looking for, I'd use Iterable/Stream. If flow control was not needed, I'd use Observable.
Closing via #5655. |
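Assuming #5655 is the change that introduced Flowable.limit() (present in later 2.x releases), a hedged sketch of what consumers gain; getDataFromNetwork is carried over from the earlier example:

    // limit(10) emits at most 10 items AND requests no more than 10 upstream,
    // unlike the take(10) behavior discussed above
    getDataFromNetwork().limit(10).subscribe(data -> System.out.println(data));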
When a child subscriber submits a large requestN value, such as Long.MAX_VALUE, the 'take' operator does not reduce it as expected.
For example, in the following, where a default subscribe happens and requests Long.MAX_VALUE upwards, it is expected that take(10) would adjust the requested amount to the maximum that take will permit through. Here is a unit test:
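A minimal sketch of such a test, assuming RxJava 2's TestSubscriber and JUnit 4 (the class name, the range(1, 1000) source and the tallying are illustrative):

    import static org.junit.Assert.assertEquals;

    import java.util.concurrent.atomic.AtomicLong;
    import org.junit.Test;
    import io.reactivex.Flowable;
    import io.reactivex.subscribers.TestSubscriber;

    public class TakeRequestTest {
        @Test
        public void takeShouldCapUpstreamRequests() {
            AtomicLong requested = new AtomicLong();
            TestSubscriber<Integer> ts = Flowable.range(1, 1000)
                    .doOnRequest(requested::addAndGet) // tally what the source sees
                    .take(10)
                    .test(); // test() subscribes and requests Long.MAX_VALUE

            ts.assertValueCount(10);
            ts.assertComplete();
            // expected: the source saw request(10);
            // actual in RxJava 2: Long.MAX_VALUE, so this assertion fails
            assertEquals(10L, requested.get());
        }
    }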
This errors with an assertion failure: the upstream saw a request of Long.MAX_VALUE rather than 10.
Is there a reason that take in RxJava 2 does not behave this way and reduce the requestN value to the limit?