Skip to main content

[Date Prev][Date Next][Thread Prev][Thread Next][Date Index][Thread Index] [List Home]
Re: [jetty-dev] Reactive Streams

Hi,

On Mon, Jun 1, 2015 at 2:48 PM, Greg Wilkins <gregw@xxxxxxxxxxx> wrote:
> Do you have any comments about the README and code in the
> FragmentingProcessor branch?

The implementation of FragmentingProcessor is IMHO wrong because it
does not modify the demand.

If you take a big ByteBuffer and you want to split it (say without any
processing, just split it), the algorithm would be something like:

onNext(buffer) {
  while (true) {
    if (demand <= 0) {
      save_buffer(buffer);
      return;
    }

    piece = create_piece();
    --demand;
    onNext(piece);
  }

Pardon the probable errors in the pseudo code above, but the key point
I want to show is that you must check for demand, and decrease it
inside the while loop.
You don't do any work if you don't have demand. That is what is
missing from your current implementation of FragmentingProcessor.

Your mutability point is more interesting, but I don't think is about
mutability per se, but more about reuse.
You could pass a mutable object without problems, provided you don't reuse it.
However, for reuse it is only matter of passing the right item:
instead of passing a ByteBuffer you can pass a pair (ByteBuffer,
Callback), with the contract that the callback must be invoked when
the buffer is consumed, and then you can know whether you can reuse or
not.
We have encountered this many times in the Jetty code exactly because
we want to pool/reuse buffers, and we ended up with similar solutions
(a pair that identifies buffer and callback) and here is no different.
Perhaps the callback is just a single method (say completed()) rather
than the two we have in Callback, but you get the point.

I'm not sure I follow your last point about close().
If you have stuff to send, then close() should first call onNext()
with what remains to send (and only if it has demand), and when it's
called back with a request(int) to signal more demand, it can call
onComplete().
Now, close() would have to be implemented in a blocking way because
that is its semantic.
I don't see close() as you report any different from a pair write() +
terminate(), where write() does the last write, and terminate() only
signals the completion.

So overall, I think that with the reactive streams API can cover all
your concerns.
We just need to use the right type T, depending on the case, and
implement correctly the checks on the demand in the publisher.

-- 
Simone Bordet
----
http://cometd.org
http://webtide.com
http://intalio.com
Developer advice, training, services and support
from the Jetty & CometD experts.
Intalio, the modern way to build business applications.


Back to the top