Re: [jetty-dev] Reactive Streams

Simone,

I think a lightbulb might be turning on in my thick skull.... see this post:

https://github.com/reactive-streams/reactive-streams-jvm/issues/270#issuecomment-106257522

will try that tomorrow....

Hmmm, got a response to that already... Not sure I like the "request is not an ack" meme, as it means you can never recycle a buffer passed to onNext (unless you reference count, or get the subscriber to recycle it for you, hmmm).

On 3 June 2015 at 00:22, Greg Wilkins <gregw@xxxxxxxxxxx> wrote:


On 2 June 2015 at 05:09, Simone Bordet <sbordet@xxxxxxxxxxx> wrote:
Hi,

On Mon, Jun 1, 2015 at 2:48 PM, Greg Wilkins <gregw@xxxxxxxxxxx> wrote:
> Do you have any comments about the README and code in the
> FragmentingProcessor branch?

The implementation of FragmentingProcessor is IMHO wrong because it
does not modify the demand.

If you take a big ByteBuffer and you want to split it (say without any
processing, just split it), the algorithm would be something like:

onNext(buffer) {
  while (buffer.hasRemaining()) {
    if (demand <= 0) {
      save_buffer(buffer);  // keep the remainder until request(n) arrives
      return;
    }

    piece = create_piece(buffer);
    --demand;
    subscriber.onNext(piece);
  }
}

Pardon the probable errors in the pseudo code above, but the key point
I want to show is that you must check for demand, and decrease it
inside the while loop.
You don't do any work if you don't have demand. That is what is
missing from your current implementation of FragmentingProcessor.
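
A minimal Java sketch of the loop described above, under some assumptions: the class name DemandFragmenter, the fixed fragment size, and the use of a plain Consumer in place of a real Subscriber are all hypothetical and are not taken from the FragmentingProcessor branch. The remainder is saved by reference (no copy), and the sketch assumes upstream does not deliver a new buffer while one is still pending.

```java
import java.nio.ByteBuffer;
import java.util.function.Consumer;

/** Hypothetical non-blocking fragmenter: emits slices only while demand is available. */
final class DemandFragmenter {
    private final int fragmentSize;
    private final Consumer<ByteBuffer> downstream; // stands in for Subscriber.onNext
    private long demand;                           // outstanding requests from downstream
    private ByteBuffer saved;                      // unsent remainder, held by reference

    DemandFragmenter(int fragmentSize, Consumer<ByteBuffer> downstream) {
        this.fragmentSize = fragmentSize;
        this.downstream = downstream;
    }

    /** Upstream delivers a buffer; emit as many pieces as demand allows. */
    synchronized void onNext(ByteBuffer buffer) {
        saved = buffer; // assumption: no earlier buffer is still pending
        drain();
    }

    /** Downstream signals more demand; resume with any saved remainder. */
    synchronized void request(long n) {
        demand += n;
        drain();
    }

    private void drain() {
        // Check demand, and decrease it, inside the loop.
        while (saved != null && saved.hasRemaining() && demand > 0) {
            int length = Math.min(fragmentSize, saved.remaining());
            ByteBuffer piece = saved.slice();
            piece.limit(length);
            saved.position(saved.position() + length);
            --demand;
            downstream.accept(piece);
        }
        if (saved != null && !saved.hasRemaining()) {
            saved = null; // fully consumed
        }
    }
}
```

With a fragment size of 4, a 10-byte buffer, and an initial request(2), two 4-byte pieces are emitted and the last 2 bytes wait until the next request.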



But my fragmenter does decrement demand.  Demand in this case is held by the counting semaphore, and it is acquired in the loop.  So in my case, if there is no demand the fragmenter blocks.  I'll rename the semaphore to demand!

You are suggesting it saves the buffer, but that is not a general solution, as it may have been passed a 4GB memory-mapped buffer and thus cannot save it to memory!
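
For comparison, a rough sketch of the semaphore approach, again with assumptions: BlockingFragmenter, the fragment size parameter, and the Consumer standing in for the Subscriber are hypothetical names, not the code in the branch. Each permit represents one unit of demand, and the calling thread blocks in onNext when no permit is available, so nothing needs to be saved.

```java
import java.nio.ByteBuffer;
import java.util.concurrent.Semaphore;
import java.util.function.Consumer;

/** Hypothetical blocking fragmenter: demand is held by a counting semaphore. */
final class BlockingFragmenter {
    private final int fragmentSize;
    private final Consumer<ByteBuffer> downstream; // stands in for Subscriber.onNext
    private final Semaphore demand = new Semaphore(0);

    BlockingFragmenter(int fragmentSize, Consumer<ByteBuffer> downstream) {
        this.fragmentSize = fragmentSize;
        this.downstream = downstream;
    }

    /** Downstream signals demand: each permit allows one downstream onNext. */
    void request(int n) {
        demand.release(n);
    }

    /** Blocks the calling thread whenever no demand is available. */
    void onNext(ByteBuffer buffer) {
        while (buffer.hasRemaining()) {
            demand.acquireUninterruptibly(); // decrement demand, or wait for it
            int length = Math.min(fragmentSize, buffer.remaining());
            ByteBuffer piece = buffer.slice();
            piece.limit(length);
            buffer.position(buffer.position() + length);
            downstream.accept(piece);
        }
    }
}
```

The trade-off is that the upstream thread is parked while waiting, which is what makes this variant safe for a buffer too large to copy.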


 
I'm not sure I follow your last point about close().
If you have stuff to send, then close() should first call onNext()
with what remains to send (and only if it has demand), and when it's
called back with a request(int) to signal more demand, it can call
onComplete().

Exactly! So if you are a Processor and onComplete is called on you, you have to wait for demand so that you can flush your buffers. onComplete will thus be a blocking call!
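
A small sketch of what a blocking onComplete could look like, under stated assumptions: FlushingProcessor is a hypothetical aggregating processor over strings, the Consumer and Runnable stand in for the downstream onNext and onComplete, and a single upstream thread is assumed. It signals completion directly after the flush rather than waiting for a further request, which is a simplification.

```java
import java.util.concurrent.Semaphore;
import java.util.function.Consumer;

/** Hypothetical aggregating processor whose onComplete waits for demand before flushing. */
final class FlushingProcessor {
    private final Semaphore demand = new Semaphore(0);
    private final StringBuilder pending = new StringBuilder(); // buffered, unsent data
    private final Consumer<String> downstreamNext;             // stands in for onNext
    private final Runnable downstreamComplete;                 // stands in for onComplete

    FlushingProcessor(Consumer<String> downstreamNext, Runnable downstreamComplete) {
        this.downstreamNext = downstreamNext;
        this.downstreamComplete = downstreamComplete;
    }

    /** Downstream signals demand. */
    void request(int n) {
        demand.release(n);
    }

    /** Aggregates items instead of forwarding them immediately. */
    void onNext(String item) {
        pending.append(item);
    }

    /** Blocks until there is demand for the buffered data, then flushes and completes. */
    void onComplete() {
        if (pending.length() > 0) {
            demand.acquireUninterruptibly(); // wait here if downstream has not requested
            downstreamNext.accept(pending.toString());
            pending.setLength(0);
        }
        downstreamComplete.run();
    }
}
```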

I'll write up that example in the README

cheers


--
Greg Wilkins <gregw@xxxxxxxxxxx>  - an Intalio.com subsidiary
http://eclipse.org/jetty HTTP, SPDY, Websocket server and client that scales
http://www.webtide.com  advice and support for jetty and cometd.



