Re: [jetty-dev] Reactive Streams

Hi,

On Mon, Jun 1, 2015 at 6:56 AM, Greg Wilkins <gregw@xxxxxxxxxxx> wrote:
> Simone,
>
> to express my concerns in terms of your examples...  consider your
> FormProcessor which is a Processor<ByteBuffer,Forms> that essentially is a
> N:1 mapping between buffers of data and a collection of fields.
>
> However, consider if instead of Form data, you wanted to write a
> MultipartMimeProcessor, where each field could be a form field or an
> uploaded file of potentially huge size. In that use-case, you would
> not want to map to a Fields collection as it could be too large to fit in
> memory. Perhaps you could have a processor that put the large parts out
> to disk?

Yes, large files are an orthogonal problem.

> But even with disk storage, there could be latency and/or size issues that
> could only be addressed by processing field by field rather than all fields
> at once. Thus it would be reasonable to try to stream individual fields to
> the application. So instead of an N:1 map from ByteBuffer to Fields, you
> would have an N:M map of ByteBuffer to Field.
>
> How could such a Processor be written non-blocking?

So the model is that you want to map N ByteBuffers to M Parts (to
reuse the Servlet terminology).

The MultipartMimeProcessor will have inside an async parser that is
able to parse the multipart mime format, and produce Part objects.

It's always the last element in the publisher/subscriber chain that
drives the demand, so it will ask for 1 Part by calling request(1).
This demand arrives at the MultipartMimeProcessor, which has nothing
to give and will in turn demand a ByteBuffer, so it calls request(1)
as well.
This second demand arrives at the servlet Publisher, which will trigger
an async read (exactly like I do in the FormProcessor).
Let's say it reads 1 Part and a half worth of bytes into a single ByteBuffer.
The Publisher passes the ByteBuffer to the MultipartMimeProcessor by
calling onNext(ByteBuffer) and the parser will start parsing.
It will produce one Part object and call onNext(Part), thus stopping
the parsing (because the call is synchronous with the parsing).
Now the application has to consume this Part, and once it has, it
calls request(1) again to demand another Part.
Eventually, onNext(Part) returns, and the processor looks at whether
it has more demand for Parts.
If it does not, it stops parsing immediately, even if it has more bytes
to parse; it saves its state and returns *without* calling request(int)
for further ByteBuffers.
Control then returns to the Publisher, which sees there is no demand
for ByteBuffers and exits the async read loop.
At this point, we are in a threadless wait for the application to
consume the Part.
Let's assume that a different thread signals that the Part has now
been consumed.
Then that Subscriber will call request(1) to demand one more Part.
This demand arrives at the MultipartMimeProcessor, which will look
at its state, see there is more to parse in the previous ByteBuffer,
and parse it.
Unfortunately, what's left in that buffer is not enough, so the
ByteBuffer will be consumed, and at that point the
MultipartMimeProcessor will call request(1) to ask for more
ByteBuffers.
This request arrives at the Publisher, which will look at its state
and know whether it can read or not.
Alternatively, it may attempt a read anyway and have isReady() return
false, which schedules an onDataAvailable() call for later.
Again, we are in a threadless wait, this time for I/O.
And so forth.
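The demand-driven loop above can be sketched in miniature. This is a hypothetical model, not Jetty code: "ByteBuffers" are plain strings pulled from a queue standing in for the upstream Publisher, "Parts" are substrings delimited by ';', and `PartProcessor` plays the role of the MultipartMimeProcessor, saving leftover unparsed data and stopping as soon as downstream demand is exhausted.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;

// Hypothetical sketch of the N:M demand-driven mapping described above.
// Incoming chunks (the "ByteBuffers") may contain a fraction of a Part,
// exactly one Part, or one Part and a half.
class PartProcessor {
    private final StringBuilder leftover = new StringBuilder(); // saved parse state
    private final Deque<String> upstream;   // stands in for the ByteBuffer Publisher
    private final List<String> emitted = new ArrayList<>();
    private long demand;                    // downstream demand for Parts

    PartProcessor(Deque<String> upstream) { this.upstream = upstream; }

    // Downstream calls this: request(1) per consumed Part.
    void request(long n) {
        demand += n;
        parse();
    }

    private void parse() {
        while (demand > 0) {
            int sep = leftover.indexOf(";");
            if (sep >= 0) {
                // Enough buffered data for one complete Part: emit it.
                emitted.add(leftover.substring(0, sep));
                leftover.delete(0, sep + 1);
                demand--;
            } else if (!upstream.isEmpty()) {
                // Leftover is incomplete: demand one more chunk
                // (models calling request(1) upstream, then onNext(ByteBuffer)).
                leftover.append(upstream.poll());
            } else {
                return; // threadless wait: nothing blocked, nothing requested
            }
        }
        // demand == 0: stop parsing immediately, even with bytes left over.
    }

    List<String> parts() { return emitted; }
    String saved()       { return leftover.toString(); }
}
```

With chunks "a;b" and "c;", the first request(1) emits Part "a" and saves the half-parsed "b"; only the second request(1) resumes parsing, consumes the next chunk, and emits "bc".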

Basically, the demand is per-stage, not per the whole chain.
Each Subscriber (stage) only calls request(1) when the item is fully consumed.
N:M mappings are handled by async parsers (or similar) that store
data aside until there is more demand from downstream.
Even if the parser has more data available, it won't continue parsing
until downstream has demanded more.

There is no requirement for onNext(T) to consume the T before returning.
The contract for consumption is to call request(int): that is the only
way you signal to the upstream stages that you want more.
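That contract can be illustrated with a minimal sketch (class and method names here are illustrative, not from Jetty): onNext(T) merely stores the item and returns, and the upstream only learns the item has been consumed when request(1) is called later, possibly from a different thread.

```java
// Hypothetical sketch: returning from onNext(T) signals nothing;
// calling request(1) is the only consumption signal upstream sees.
class AsyncPartSubscriber {
    private final Runnable requestOne; // stands in for subscription.request(1)
    private String pending;            // the Part held while being consumed

    AsyncPartSubscriber(Runnable requestOne) { this.requestOne = requestOne; }

    void onNext(String part) {
        // Just store the item; no demand is signaled here.
        pending = part;
    }

    // Invoked later, possibly by a different thread, once processing is done.
    void consumed() {
        pending = null;
        requestOne.run(); // only now does upstream learn we want more
    }

    boolean busy() { return pending != null; }
}
```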

This imposes a burden on implementers of Subscribers to call
request(int) carefully, at the right time, especially if the item
can be consumed asynchronously.
However, as in my implementation, there are "utility" subscribers
that take care of this detail and expose only simpler utility
methods (in my implementation, these are the send() methods).

Makes sense?

-- 
Simone Bordet
----
http://cometd.org
http://webtide.com
http://intalio.com
Developer advice, training, services and support
from the Jetty & CometD experts.
Intalio, the modern way to build business applications.

