ByteBuffers/Strings with onNext(T item) and request(int n)? #270
Hi @gregw, the demand-element relation is explained here: https://github.com/reactive-streams/reactive-streams-jvm#subscriber-controlled-queue-bounds Sadly, Java (or the JDK rather) does not have a standard representation for an immutable chunk of bytes (just as String is a representation for an immutable chunk of unicode code points). An acceptable solution for your use case may be to send asReadOnlyBuffer() versions of ByteBuffer? |
Hi @viktorklang , Thanks for the response... firstly, is this the right place to discuss this kind of thing? Please point me to the appropriate forum if not. The link you gave makes it clear that the value passed in request(int n) will translate to the invocation count of onNext(), which is exactly the problem I don't know how to get around. If I have a Subscriber and do a request(1), then I do know that onNext() will be called once and only once, but I don't know if it will be called with a ByteBuffer that contains a single byte or one that contains 4GB! Considering that onNext has to fully consume the content passed, that is a big problem. What we are trying to do is to provide the Flow semantics directly from the Jetty server's asynchronous IO streams, so that developers don't have to deal with the complexities of the Servlet API's async callbacks. |
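To make the asymmetry concrete, here is a minimal sketch using the proposed JDK Flow API (class name hypothetical): request(1) bounds the number of onNext calls, but not the size of the item delivered.

    import java.nio.ByteBuffer;
    import java.util.concurrent.Flow;

    // Minimal sketch of the asymmetry: request(1) bounds the number of onNext
    // calls, but says nothing about how many bytes the single item carries.
    class ByteBufferSubscriber implements Flow.Subscriber<ByteBuffer> {
        private Flow.Subscription subscription;

        @Override
        public void onSubscribe(Flow.Subscription s) {
            subscription = s;
            s.request(1); // exactly one onNext will follow...
        }

        @Override
        public void onNext(ByteBuffer item) {
            // ...but item.remaining() may be one byte or a 4GB memory-mapped
            // file, and this call is expected to consume it before returning.
            consume(item);
            subscription.request(1);
        }

        private void consume(ByteBuffer item) { /* application logic */ }

        @Override public void onError(Throwable t) { t.printStackTrace(); }
        @Override public void onComplete() { }
    }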
I put together the https://github.com/ReactiveX/RxJavaString/ project to help with some of the issues with splitting and encoding across different chunks of UTF8 bytes. Not sure any of it would be directly useful for ByteBuffers. If you want, maybe that project would be a good place for ByteBuffer-specific operations; if not, then maybe it could be used as a template for your own custom operators. |
George, I can see that such concerns as splitting/encoding of byte streams could be handled by a class UTF8Processor implements Flow.Processor<ByteBuffer,CharSequence>, which could be used to convert byte collections to character sequences. However, looking at your code I can't see that you've addressed the issue. It is that question I need resolved so I can get over the initial hump. cheers |
You'll either have to have the Producer generate reasonably sized ByteBuffers or have an intermediate step that slices them into smaller parts. Might I also suggest you take a look at https://github.com/ReactiveX/RxNetty. |
Ah I've seen that this has been briefly discussed before in #47 and closed because it was too hard to solve simply! Sorry, but that seems like a bit of a cop out to me and will make the most simple of processors impossible to write asynchronously. For example, if I want to write a processor that does UTF8 conversion of ByteBuffer to CharSequence, then it is possible for onNext to be called with a massive memory mapped file as a ByteBuffer, which will never ever fit into memory. So the processor will have no choice but to block and slice off a buffer of characters at a time, feeding them to its downstream subscriber until all have been consumed. It is not even possible to write a non-blocking fragmenting Processor whose only role is to chop a big buffer into smaller ones that might avoid this problem downstream. I don't think this is a too-hard problem and I can see two basic approaches:
It would be a real pity if this semantic were put into the too-hard basket. The whole point of APIs like this is that async is hard and needs to be simplified. Having a single async Flow abstraction that can handle bytes, chars, frames, objects, more complex objects etc. while keeping the same semantic and backpressure mechanism would be awesome! |
Surely at one end of a Producer chain there might be a user who is not going to be constrained by reasonable sizes. They are just going to memory map the biggest file they can find as a ByteBuffer and feed it into a Flow and want it to work. You can write a Processor that slices large items into smaller items, but I can't see how it can be written asynchronously, because the large item will arrive in an onNext call which can't return until the entire item has been consumed. OK so maybe this can be done in a Producer, but to do so would require another asynchronous API, and isn't the whole point of this exercise to give a nice portable async reactive API that users can know, love and use? If we have to switch to another API when we hit uncontrolled user code, then it kind of defeats the point! No? Also, even if the initial Producer creates right-sized chunks, who is to say that they are right-sized all the way through a chain of Producers and Subscribers? If you are to use the Processor model then it needs to be able to asynchronously slice big items into small items. The current API does not allow this in a non-blocking way, so the model will fail for an arbitrary chain of Processors with differing size limitations within the chain. |
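To make the objection concrete, here is a sketch (names hypothetical) of the only obvious shape such a fragmenting stage can take if the input buffer may be recycled as soon as onNext returns: it must park the calling thread until downstream demand arrives.

    import java.nio.ByteBuffer;
    import java.util.concurrent.Flow;
    import java.util.concurrent.Semaphore;

    // Hypothetical sketch: a fragmenting stage written under the constraint
    // that the input buffer may be recycled as soon as onNext returns. The
    // only way to honour downstream demand is to park the thread, i.e. block.
    class BlockingFragmenter {
        private static final int MAX_CHUNK = 4096;
        private final Semaphore demand = new Semaphore(0);
        private final Flow.Subscriber<? super ByteBuffer> downstream;

        BlockingFragmenter(Flow.Subscriber<? super ByteBuffer> downstream) {
            this.downstream = downstream;
        }

        // called from the downstream Subscription's request(n)
        void addDemand(int n) { demand.release(n); }

        void onNext(ByteBuffer big) {
            while (big.hasRemaining()) {
                demand.acquireUninterruptibly(); // blocks: defeats the async model
                ByteBuffer chunk = big.slice();
                chunk.limit(Math.min(MAX_CHUNK, chunk.remaining()));
                big.position(big.position() + chunk.remaining());
                downstream.onNext(chunk);
            }
            // only once every slice has been emitted may the caller recycle 'big'
        }
    }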
Here are two examples of splitting potentially oversized byte[]/ByteBuffers into smaller ones:

import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

import rx.Observable;

public class ByteArraySplitting {
    public static void main(String[] args) {
        int maxSize = 300;

        // Split each 1024-byte array into chunks of at most maxSize bytes.
        Observable.just(new byte[1024]).repeat(2)
            .flatMap(a -> {
                int capacity = (a.length + maxSize - 1) / maxSize;
                List<byte[]> chunks = new ArrayList<>(capacity);
                int i = 0;
                int remaining = a.length;
                while (remaining != 0) {
                    int len = Math.min(maxSize, remaining);
                    byte[] chunk = Arrays.copyOfRange(a, i, i + len);
                    chunks.add(chunk);
                    i += len;
                    remaining -= len;
                }
                return Observable.from(chunks);
            })
            .subscribe(b -> System.out.println(b.length));

        // Same idea for a ByteBuffer, using zero-copy slices instead of copies.
        ByteBuffer bb = ByteBuffer.allocate(1024);
        bb.put(new byte[1024]);
        bb.flip();
        Observable.just(bb)
            .flatMap(a -> {
                int capacity = (a.limit() + maxSize - 1) / maxSize;
                List<ByteBuffer> chunks = new ArrayList<>(capacity);
                int i = 0;
                int remaining = a.limit() - a.position();
                while (remaining != 0) {
                    int len = Math.min(maxSize, remaining);
                    ByteBuffer chunk = a.slice();
                    chunk.position(i);
                    chunk.limit(i + len);
                    chunks.add(chunk);
                    i += len;
                    remaining -= len;
                }
                return Observable.from(chunks);
            })
            .subscribe(b -> System.out.println(b.position() + " .. " + b.limit()));
    }
}

Cutting down on the size is straightforward, but if you want to combine smaller chunks up to the limit and split larger chunks down to the limit, that is more complicated. |
I can't relate the examples you've given to an asynchronous implementation of a Flow.Processor<ByteBuffer,ByteBuffer>, where the big buffers will arrive in an onNext(ByteBuffer item) call and have to be fed to another Subscriber, who may only have done a request(1), and another request call may be some time in coming. Thus once the big buffer has been split, the processor can only call its subscriber's onNext(ByteBuffer) with a small buffer and then has to wait until another call to request(n) is made - hence it becomes blocking? |
In RxJava all of the non-blocking backpressure logic is in the last line. Here is the implementation of the from:
|
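The linked implementation is not reproduced above. As a rough illustration only — not RxJava's actual code — the pattern behind an iterator-backed producer looks something like this: demand is tracked in an AtomicLong, emission happens inside request(n), and no thread ever parks.

    import java.util.Iterator;
    import java.util.concurrent.Flow;
    import java.util.concurrent.atomic.AtomicLong;

    // Much-simplified sketch of iterator-backed emission with non-blocking
    // backpressure: items are emitted inside request(), only while
    // outstanding demand remains.
    class IteratorPublisher<T> implements Flow.Publisher<T> {
        private final Iterable<T> source;

        IteratorPublisher(Iterable<T> source) { this.source = source; }

        @Override
        public void subscribe(Flow.Subscriber<? super T> s) {
            Iterator<T> it = source.iterator();
            AtomicLong requested = new AtomicLong();
            s.onSubscribe(new Flow.Subscription() {
                @Override
                public void request(long n) {
                    // If demand was already positive, another request() call
                    // is draining; just record the extra demand and return.
                    if (requested.getAndAdd(n) != 0) return;
                    do {
                        long emit = n;
                        while (emit-- > 0) {
                            if (!it.hasNext()) { s.onComplete(); return; }
                            s.onNext(it.next());
                        }
                        // Release our claim; keep draining if more demand
                        // arrived concurrently while we were emitting.
                        n = requested.addAndGet(-n);
                    } while (n > 0);
                }
                @Override
                public void cancel() { /* omitted: real code must honour this */ }
            });
        }
    }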
@abersnaze OK I'm going to need to play with your code a bit more to see if I can understand how you are avoiding blocking. Is this code against:
|
@gregw You are better off with the latest:

<dependency>
    <groupId>io.reactivex</groupId>
    <artifactId>rxjava</artifactId>
    <version>1.0.11</version>
</dependency>

@abersnaze's example seems to be an older one because it has a concurrency bug in its fast-path. |
Thanks @akarnokd and @abersnaze, I've got the code running and understand it better (I'm still getting my head around that code style). I can see that the example can be kind of thought of as asynchronous, as it holds the chunks until a request(n) call comes in from a subscriber, in which case it hands out n chunks. If the subscriber was remote or truly asynchronous (which it is not in this example), then there would be no thread waiting to hand out the next chunk.

However, this is essentially just a producer that slices a big buffer into lots of smaller buffers and hands them out as requested. This is not the Flow.Processor case that I'm concerned with, where the big buffer has itself been delivered by an asynchronous callback. Firstly, pre-computing the chunks may not always be possible. OK for ByteBuffers that can be sliced, but if it was being converted to a CharSequence, then precomputing would triple the memory commitment, which is kind of against the whole purpose of slicing it up in the first place. Even if you solve this by taking a single slice as demanded, you still need to hold onto the source ByteBuffer while all the slices are taken. If that source ByteBuffer has been provided by an onNext call from a truly async source, then I don't see how this can work. It works in this example because the ByteBuffer is not recycled/reused for the duration of the test. But in a real usage, the ByteBuffer passed in an onNext call is likely to be reused immediately after the onNext returns - thus any slices taken subsequently will point to the wrong data.

Anyway, let me get my head a bit more into this style of code and I'll write a unit test that demonstrates the async semantic I'm trying to achieve... this could take me a few days. |
In Rx we like to use Observable.from because it hides some complicated logic, but it requires data to be precomputed. We've been experimenting with an AbstractOnSubscribe that handles the backpressure and delegates to a closure to produce just enough data when needed. |
@abersnaze It is not so much the use of Observable.from that is the issue for me. If the task is slicing a ByteBuffer into small slices, then it is fit for purpose. The problem is not preslice or slice as you go, the problem is the scope of the big buffer that you are slicing from. If that big buffer is passed in from an onNext call as part of a chain of processors, then that onNext call cannot return while slices are current (as it backs them) or there is more slicing to do. Thus that onNext call becomes blocking not asynchronous. |
You can apply |
Sorry for the late reply, I didn't have the cycles to read through the whole thread, so forgive me if this has already been said: There is absolutely no problem going from Collection<T> as element type to T, nor the other way around:

someSource.strictChunkedPublisher(size = N).subscribe(subscriber)

The reason for this is simple—since the Collection has already been created, it is not occupying variable subscriber buffer space. Perhaps you want to allow for "read as much as you have but don't send me more than N per chunk":

someSource.lenientChunkedPublisher(size = N).subscribe(subscriber)

The reasons for not messing around with specifying byte sizes include, but are not limited to:

A) Given a type T, what is its size? Given that we can create types that reflect intent (you could create a ByteBuf256 if you want to be explicit about capacity and optimization opportunities), given that we can transform chunks into their contents in a non-blocking and backpressured way (by peeling off elements and emitting their contents as requested), and given that we can take elements and chunk them (classic windowing)

===> what would be the benefits, if any, of changing the spec to include chunking?

(Worth noting is that currently the RS spec allows for implementations that push ~175 million elements a second (point-to-point).) |
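As a hedged illustration of the "peeling off elements" transform mentioned here (names hypothetical; signal serialization and error handling are elided, so this is a sketch of the shape, not a spec-compliant Processor):

    import java.util.Collections;
    import java.util.Iterator;
    import java.util.List;
    import java.util.concurrent.Flow;

    // At most one chunk is held at a time; the next List is requested
    // upstream only once the current one is drained, and downstream demand
    // is honoured element by element.
    class PeelingProcessor<T> implements Flow.Processor<List<T>, T> {
        private Flow.Subscription upstream;
        private Flow.Subscriber<? super T> downstream;
        private Iterator<T> current = Collections.emptyIterator();
        private long demand;
        private boolean chunkPending;
        private boolean done;

        @Override public synchronized void onSubscribe(Flow.Subscription s) {
            upstream = s;
            chunkPending = true;
            s.request(1); // hold at most one chunk at a time
        }

        @Override public void subscribe(Flow.Subscriber<? super T> s) {
            downstream = s;
            s.onSubscribe(new Flow.Subscription() {
                @Override public void request(long n) { addDemand(n); }
                @Override public void cancel() { upstream.cancel(); }
            });
        }

        @Override public synchronized void onNext(List<T> chunk) {
            chunkPending = false;
            current = chunk.iterator();
            drain();
        }

        private synchronized void addDemand(long n) { demand += n; drain(); }

        private void drain() {
            while (demand > 0 && current.hasNext()) {
                demand--;
                downstream.onNext(current.next());
            }
            if (!current.hasNext()) {
                if (done) downstream.onComplete();
                else if (demand > 0 && !chunkPending) {
                    chunkPending = true;
                    upstream.request(1); // chunk drained: fetch the next one
                }
            }
        }

        @Override public synchronized void onComplete() { done = true; drain(); }
        @Override public void onError(Throwable t) { downstream.onError(t); }
    }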
There was a previous discussion that might also be of interest: #47 |
FWIW we're dealing with this issue in the reactive-ipc project. [1] The problem really becomes one of translation. By that I mean: In short, I think there has to be additional information similar to what |
I understand that this is not an issue in a Publisher and it can fragment a large collection it already has in memory easily enough. The use-case I'm concerned with is if the memory limitation is in a Processor. Two examples come to mind:
To illustrate this problem I've written an example (against the reactive streams API rather than RxJava), that includes a long README about the problems I can't get my head around: https://github.com/jetty-project/jetty-reactive/tree/FragmentingProcessor Again, it is the Processors that I'm most concerned with. The origin publisher and ultimate subscriber have a lot more flexibility to deal with these kinds of issues. If an ecosystem of utility processors is to be developed (e.g. compressors, encryptors, encoders etc.) then they need to be able to operate asynchronously within the API provided. That is what I'm not currently seeing. |
@rkuhn yes, #47 is essentially the same issue.... the problem is that it was closed as being too hard to fix! I'm evaluating this API against a real asynchronous use-case that is in need of a better API (Servlet async IO) and I think it reasonable that an API proposed for Java 9 can handle such use-cases at least as well as callbacks. Hence I'd like to open the discussion again to see if there are techniques within the API as proposed that can help, or if there are some possible changes to the API. As without one or the other, reactive streams do not look suitable for these use-cases. See the README in https://github.com/jetty-project/jetty-reactive/tree/FragmentingProcessor for a more detailed description of the use-cases I'm concerned with. cheers |
@viktorklang ooops I didn't give my second example.... never mind, it is the same as the first one. |
The memory has already been allocated for the chunk, so I don't see how the Subscriber's buffer would be overrun? (The Subscriber's buffer is in Ts, no matter if T is composite or not) |
@viktorklang Surely you don't think the only significant measure of a subscriber's capacity is an often unmeasurable (at the least unknowable) impact on RAM usage? There are other kinds of constraints that have nothing to do with RAM usage. |
Hmm, how do you mean in this case? To illustrate my point, let's say we have a ByteBuf256 type which, unsurprisingly, holds up-to 256 bytes. |
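A sketch of such an intent-carrying type (ByteBuf256 is the hypothetical name used above, not an existing class): because the element type itself guarantees a size bound, request(n) also bounds the bytes in flight to at most n * 256.

    import java.nio.ByteBuffer;

    // The type carries the capacity guarantee, so subscribers can size their
    // buffers purely from the element count they request.
    final class ByteBuf256 {
        public static final int CAPACITY = 256;
        private final ByteBuffer data;

        private ByteBuf256(ByteBuffer data) { this.data = data; }

        static ByteBuf256 wrap(ByteBuffer src) {
            if (src.remaining() > CAPACITY)
                throw new IllegalArgumentException("at most " + CAPACITY + " bytes");
            return new ByteBuf256(src.asReadOnlyBuffer());
        }

        ByteBuffer content() { return data.duplicate(); }
    }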
Actually.... I think I'm about to answer my own question..... A Processor that receives a ByteBuffer in onNext that is larger than it can process without blocking (due to lack of backpressure) can just keep a reference to it and return from the onNext call... provided that:
Hmmm I'll give that a go in my repo tomorrow (1am here now) and report back if that approach solves my issues. |
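For the record, a minimal sketch of the approach described here (class abbreviated rather than a full Flow.Processor, and assuming — as the proviso above suggests — that upstream does not recycle the buffer until more demand is signalled):

    import java.nio.ByteBuffer;
    import java.util.concurrent.Flow;

    // onNext keeps a reference to the big buffer and returns immediately;
    // slices are emitted from request(n) callbacks, and upstream demand is
    // withheld until the held buffer is fully consumed.
    class FragmentingProcessor {
        private static final int MAX_CHUNK = 4096;
        private final Flow.Subscription upstream;
        private final Flow.Subscriber<? super ByteBuffer> downstream;
        private ByteBuffer held; // the retained big buffer
        private long demand;

        FragmentingProcessor(Flow.Subscription upstream,
                             Flow.Subscriber<? super ByteBuffer> downstream) {
            this.upstream = upstream;
            this.downstream = downstream;
        }

        synchronized void onNext(ByteBuffer big) {
            held = big; // retain and return - no blocking
            drain();
        }

        synchronized void downstreamRequested(long n) {
            demand += n;
            drain();
        }

        private void drain() {
            while (demand > 0 && held != null && held.hasRemaining()) {
                ByteBuffer chunk = held.slice();
                chunk.limit(Math.min(MAX_CHUNK, chunk.remaining()));
                held.position(held.position() + chunk.remaining());
                demand--;
                downstream.onNext(chunk);
            }
            if (held != null && !held.hasRemaining()) {
                held = null;
                upstream.request(1); // only now may upstream reuse the buffer
            }
        }
    }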
I'm opportunistically answering your answer instead of your previous comments (let me know if I should)
If a processor decides to issue more demand upstream than is signalled downstream, it (by design) needs to be willing to buffer that itself. This does not entail any blocking in the "parking a Thread"-sense of the word.
Exactly. Think of
No, this is faulty.
See previous answer.
What you'll need to do in this case is to control the pipeline:

YourPublisher -> CustomProcessor<ByteBuffer, ByteBuffer> -> YourSubscriber

so you can collect the ByteBuffers as they are passed through. Does that help? |
@viktorklang regarding your comment to @jbrisbin Its subscriber calls request(1) on the inflator, so it calls request(1) on its publisher, which responds by calling inflator.onNext(byte[]) with a 256 byte array. But it is an inflator, so that 256 byte array can expand to an almost arbitrarily large byte[]. There are several possibilities:
|
@gregw FWIW in Reactor we had to implement this "remainder" functionality for our |
@viktorklang replying to your message that crossed mine. Surely a Processor is a Subscriber, so it is free to withhold its subsequent request(n) calls until it has finished handling a previous onNext call. I understand that using a request(1) as an ack is not exactly correct, but I do not see how else a publisher can recycle a buffer it has previously passed to onNext. It's not acceptable to say that it can't recycle it, as servers will just not scale if they create that kind of garbage for each message. So without the request as ack, how else can a Publisher (or Processor) recycle a buffer? |
@viktorklang regarding your comment to @jbrisbin
I'll try to simplify the example:
Now, if the routine that possibly creates an unbounded number of Ts doesn't have a means to control the number of Ts it produces, nothing Reactive Streams can do will ever be able to fix that. It is equivalent to a Publisher that cannot be controlled -> you'll have to decide to buffer or drop.
Sure, but only if you control the implementation, i.e. you know more about the pipeline than the RS spec can know. I'm talking from the general perspective of what you can and can't assume w.r.t. arbitrary Publishers, Subscribers and Processors given the spec and TCK.
Safest strategy is to add a collector stage at the end that ferries the buffer back to the publisher. Does that make sense? |
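A hedged sketch of that collector-stage idea, assuming (as noted above) that you control both ends of the pipeline and that no intermediate stage retains the buffer; all names here are illustrative:

    import java.nio.ByteBuffer;
    import java.util.concurrent.BlockingQueue;
    import java.util.concurrent.Flow;
    import java.util.concurrent.LinkedBlockingQueue;

    // A known pool is shared between the controlled publisher and the final
    // subscriber, which ferries each buffer back once consumed.
    class BufferPool {
        private final BlockingQueue<ByteBuffer> free = new LinkedBlockingQueue<>();

        BufferPool(int buffers, int size) {
            for (int i = 0; i < buffers; i++) free.add(ByteBuffer.allocate(size));
        }
        ByteBuffer take() throws InterruptedException { return free.take(); }
        void release(ByteBuffer b) { b.clear(); free.add(b); }
    }

    class CollectingSubscriber implements Flow.Subscriber<ByteBuffer> {
        private final BufferPool pool;
        private Flow.Subscription subscription;

        CollectingSubscriber(BufferPool pool) { this.pool = pool; }

        @Override public void onSubscribe(Flow.Subscription s) {
            subscription = s;
            s.request(1);
        }
        @Override public void onNext(ByteBuffer item) {
            consume(item);
            pool.release(item); // ferry the buffer back to the publisher
            subscription.request(1);
        }
        private void consume(ByteBuffer item) { /* application logic */ }
        @Override public void onError(Throwable t) { t.printStackTrace(); }
        @Override public void onComplete() { }
    }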
@viktorklang However, it is probably a better solution than the request as ack idea as it at least allows for request(n>1) to be sent to the Publisher to avoid possible latency issues. I'll experiment further. |
You'll be able to hook in any processor you like as long as you control where the buffers are injected and where they are collected. Or? Let me know what your experimentation reveals, this is an interesting topic. |
OK I think I'm understanding RS better and will close this issue. The problem that I was having is that I wanted to implement a Processor that was async, bounded memory and acknowledging (so callers could recycle buffers). I've worked out that you can have only 2 of those 3 attributes. I've been able to implement an async & bounded-memory processor so long as I don't attempt to use the return of onNext or a subsequent request(n) call to indicate acknowledgement (so that buffers can be recycled or the demo exit after completion). You can see the results of this revelation here. While I think acknowledgement is an important attribute, it is not the subject of this issue and I'll move the discussion of it to the list. My demo achieves acknowledgement only by going outside the standard RS API.

Two other things I'll note of interest here: Firstly, the demo contains an abstract IteratingProcessor that is based on the Jetty IteratingCallback. This is a useful pattern to avoid arbitrarily deep recursion (onNext(item) calls request(n) calls onNext(item) calls request(n)....), which is a common problem with async frameworks. The other is that I've noticed that RS is similar to a scheduling strategy that we've implemented in Jetty called Eat What You Kill. EWYK achieves mechanical sympathy by implementing the ethos that a thread should not produce a task unless it is able to immediately consume it. This avoids queues and dispatch delays and allows tasks to be consumed with hot caches. RS style encourages similar mechanical sympathies by requiring publishers to not publish until there is demand that originates from the ultimate subscriber. This can allow a single thread to call through a chain of onNext calls, keeping its cache hot while avoiding queues and dispatch delays. |
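A minimal sketch of that iterating pattern (modelled loosely on the idea behind Jetty's IteratingCallback, not its actual code): reentrant calls just bump a counter, and only the first caller runs the loop, so onNext -> request -> onNext chains become iteration instead of unbounded recursion.

    import java.util.concurrent.atomic.AtomicInteger;

    abstract class IteratingWork {
        private final AtomicInteger pending = new AtomicInteger();

        /** Subclass does one bounded piece of work; returns false when idle. */
        protected abstract boolean process();

        void iterate() {
            if (pending.getAndIncrement() != 0) return; // already iterating
            int missed = 1;
            do {
                while (process()) { /* keep working on this thread */ }
                // Account for iterate() calls that arrived while we worked;
                // loop again rather than letting them recurse.
                missed = pending.addAndGet(-missed);
            } while (missed != 0);
        }
    }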
Just passing by and looking at your problem. What if you copied the 256-byte buffer, when you received it in your GzipInflatorProcessor, to a private buffer and then decompressed incrementally from the private copy when the downstream requests more? That way you don't retain a reference to the buffer passed to you. Surely this will mean more memory copying, but that's an alternative if you don't want to manage byte[] array ownership. You treat byte[] arrays as value types. Does that make any sense? |
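A sketch of that copy-on-receipt idea using java.util.zip.Inflater (names hypothetical; a real implementation must also check needsInput()/finished() before accepting more input):

    import java.util.Arrays;
    import java.util.zip.DataFormatException;
    import java.util.zip.Inflater;

    // Take a private copy of the (small) compressed input so the caller may
    // recycle its buffer immediately, then inflate incrementally as
    // downstream demand arrives.
    class CopyingInflater {
        private final Inflater inflater = new Inflater();

        // Called from onNext: copy, then return - no reference retained.
        void accept(byte[] compressed, int off, int len) {
            inflater.setInput(Arrays.copyOfRange(compressed, off, off + len));
        }

        // Called once per unit of downstream demand: one bounded chunk.
        byte[] nextChunk(int maxSize) throws DataFormatException {
            byte[] out = new byte[maxSize];
            int n = inflater.inflate(out); // may be 0 if more input is needed
            return n == maxSize ? out : Arrays.copyOf(out, n);
        }
    }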
Hi @gregw, I can't see any tests in the project—I'd strongly recommend applying the TCK to your processor, it will assist you in ironing out potential spec violations. Speaking of the spec, I'd love to get your feedback on it: are there things that need improvement, what was good, etc.? |
@plevart Sure, copying data gets around some of the issues and copying 256 bytes in that example is not a big deal. But copying data passed in that can be of arbitrary size is not a good general solution as it is hard to enforce memory bounds. cheers |
@viktorklang Thanks for the suggestion of applying the TCK. I think it is probably a bit early for our experiments, but we will certainly look at it soon. With regards to the spec, it is also a bit early for me to say anything too definitive... especially as I think I've just spent the best part of a week barking up the wrong tree with it :) Although I guess that might mean that some of the documentation and examples could be improved. But now that I've adjusted my head to think more in line with how the API is intended to be used, I can give you some initial feedback. It is definitely a lot simpler to work with than other async APIs; while I think there is a lot of good in the Servlet async IO API, it is a very tricky API to use and we are very interested in wrapping it as RS to make it easier to use. That is where our focus has shifted in our experimental repo. The other thing I like is the mechanical sympathy that results from RS, with a single thread able to call all the way through a chain of onNext calls with a hot CPU cache and no dispatch. The IteratingProcessor model we've implemented should be good at stopping any recursion problems and is much simpler to use than the IteratingCallback we use with our callback model. However, we still do have some concerns with the lack of acknowledgement: a publisher does not know if it can recycle an item, can't get exceptions in some circumstances, and doesn't know if an onCompleted() has progressed to completion all the way through a chain. But give us a few more days to get our heads more into it and also try out some more real use cases, perhaps with some other implementations. But so far it's pretty good! cheers |
I assume you saw these? https://github.com/reactive-streams/reactive-streams-jvm/tree/master/examples/src/main/java/org/reactivestreams/example/unicast You may want to inspect other implementations (implementing it is non-trivial just because the problem has inherent complexity), but the end-user will not be implementing it themselves but using an implementation. It may also be interesting for you to leverage an already existing implementation to prototype with. Looking forward to your feedback. |
What types and aggregations should be used when using reactive streams for byte or character IO?
So for example, if I wanted a Subscriber<ByteBuffer>, does request(n) refer to the number of byte buffers (of unknown size) or to the number of bytes in one or more byte buffers? What if my subscriber wanted to control both the number of ByteBuffers and their total size?
I really don't want to do Subscriber<Byte> unless there was a way to have an onNext(byte[] item) to receive bytes in bulk.
Ditto for String and characters, or arrays of anything.