Re: [jetty-dev] Reactive Streams



On 2 June 2015 at 05:09, Simone Bordet <sbordet@xxxxxxxxxxx> wrote:
Hi,

On Mon, Jun 1, 2015 at 2:48 PM, Greg Wilkins <gregw@xxxxxxxxxxx> wrote:
> Do you have any comments about the README and code in the
> FragmentingProcessor branch?

The implementation of FragmentingProcessor is IMHO wrong because it
does not modify the demand.

If you take a big ByteBuffer and you want to split it (say without any
processing, just split it), the algorithm would be something like:

onNext(buffer) {
  while (buffer.hasRemaining()) {
    if (demand <= 0) {
      save_buffer(buffer);
      return;
    }

    piece = create_piece(buffer);
    --demand;
    subscriber.onNext(piece);
  }
}

Pardon the probable errors in the pseudo code above, but the key point
I want to show is that you must check for demand, and decrease it
inside the while loop.
You don't do any work if you don't have demand. That is what is
missing from your current implementation of FragmentingProcessor.
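
For illustration, the loop above could be sketched in Java roughly as follows. This is only a minimal sketch under assumptions: the class name DemandFragmenter, the pieceSize parameter and the Consumer standing in for the downstream Subscriber are all hypothetical, and none of it is taken from the FragmentingProcessor branch.

```java
import java.nio.ByteBuffer;
import java.util.function.Consumer;

/** Hypothetical non-blocking fragmenter: emits pieces only while demand is available. */
class DemandFragmenter {
    private final int pieceSize;
    private final Consumer<ByteBuffer> downstream; // stands in for Subscriber.onNext
    private long demand;       // outstanding demand signalled by downstream
    private ByteBuffer saved;  // unsent remainder, kept when demand runs out

    DemandFragmenter(int pieceSize, Consumer<ByteBuffer> downstream) {
        this.pieceSize = pieceSize;
        this.downstream = downstream;
    }

    /** Upstream delivers a buffer; assumes any previous buffer was fully sent. */
    synchronized void onNext(ByteBuffer buffer) {
        saved = buffer;
        drain();
    }

    /** Downstream signals more demand; resume with any saved remainder. */
    synchronized void request(long n) {
        demand += n;
        drain();
    }

    /** Check demand and decrease it inside the loop; do no work without demand. */
    private void drain() {
        while (saved != null && saved.hasRemaining() && demand > 0) {
            int length = Math.min(pieceSize, saved.remaining());
            ByteBuffer piece = saved.slice(); // a view, not a copy
            piece.limit(length);
            saved.position(saved.position() + length);
            --demand;
            downstream.accept(piece);
        }
        if (saved != null && !saved.hasRemaining()) {
            saved = null; // everything sent, drop the reference
        }
    }
}
```

Here "saving" the buffer only keeps a reference to it until request() is called again; nothing is copied.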



But my fragmenter does decrement demand.  Demand in this case is held by the counting semaphore, and it is acquired in the loop.  So in my case, if there is no demand, the fragmenter blocks.  I'll rename the semaphore to demand!

You are suggesting that it saves the buffer, but that is not a general solution, as it may have been passed a 4GB memory-mapped buffer and thus cannot save it to memory!
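
A rough sketch of the blocking approach, for illustration only: demand is held as permits in a counting semaphore and one permit is acquired per piece inside the loop. The names BlockingFragmenter and pieceSize, and the Consumer standing in for the downstream Subscriber, are assumptions made for this sketch and are not the code in the branch. The request count is an int here only because Semaphore.release takes an int.

```java
import java.nio.ByteBuffer;
import java.util.concurrent.Semaphore;
import java.util.function.Consumer;

/** Hypothetical blocking fragmenter: demand is held as semaphore permits. */
class BlockingFragmenter {
    private final Semaphore demand = new Semaphore(0);
    private final int pieceSize;
    private final Consumer<ByteBuffer> downstream; // stands in for Subscriber.onNext

    BlockingFragmenter(int pieceSize, Consumer<ByteBuffer> downstream) {
        this.pieceSize = pieceSize;
        this.downstream = downstream;
    }

    /** Downstream signals demand by releasing permits. */
    void request(int n) {
        demand.release(n);
    }

    /** Splits the buffer in place; blocks whenever there is no demand. */
    void onNext(ByteBuffer buffer) throws InterruptedException {
        while (buffer.hasRemaining()) {
            demand.acquire(); // decrement demand, waiting if none is available
            int length = Math.min(pieceSize, buffer.remaining());
            ByteBuffer piece = buffer.slice(); // a view, so nothing is copied
            piece.limit(length);
            buffer.position(buffer.position() + length);
            downstream.accept(piece);
        }
    }
}
```

Because the calling thread stays inside onNext() until the whole buffer has been emitted, the buffer never has to be stored by the fragmenter; the cost is that the upstream thread is blocked while demand is zero.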


 
I'm not sure I follow your last point about close().
If you have stuff to send, then close() should first call onNext()
with what remains to send (and only if it has demand), and when it's
called back with a request(int) to signal more demand, it can call
onComplete().

Exactly!   So if you are a Processor and onComplete is called on you, you have to wait for demand so that you can flush your buffers.  onComplete will thus be a blocking call!
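
To make that concrete, a minimal sketch of a stage that still holds data when onComplete arrives, assuming the same semaphore-as-demand idea. FlushingStage, the String payload and the two callbacks standing in for the downstream onNext/onComplete are hypothetical and only illustrate the blocking behaviour.

```java
import java.util.concurrent.Semaphore;
import java.util.function.Consumer;

/** Hypothetical stage that aggregates items and flushes them on completion. */
class FlushingStage {
    private final Semaphore demand = new Semaphore(0);
    private final StringBuilder pending = new StringBuilder();
    private final Consumer<String> downstream;   // stands in for Subscriber.onNext
    private final Runnable downstreamComplete;   // stands in for Subscriber.onComplete

    FlushingStage(Consumer<String> downstream, Runnable downstreamComplete) {
        this.downstream = downstream;
        this.downstreamComplete = downstreamComplete;
    }

    /** Downstream signals demand by releasing permits. */
    void request(int n) {
        demand.release(n);
    }

    /** Items are only aggregated here, so no demand is consumed yet. */
    void onNext(String item) {
        pending.append(item);
    }

    /** Blocks until there is demand for the remaining data, then completes. */
    void onComplete() throws InterruptedException {
        if (pending.length() > 0) {
            demand.acquire(); // wait for demand before flushing
            downstream.accept(pending.toString());
            pending.setLength(0);
        }
        downstreamComplete.run();
    }
}
```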

I'll write up that example in the README

cheers


--
Greg Wilkins <gregw@xxxxxxxxxxx>  - an Intalio.com subsidiary
http://eclipse.org/jetty HTTP, SPDY, Websocket server and client that scales
http://www.webtide.com  advice and support for jetty and cometd.
