Skip to main content

[Date Prev][Date Next][Thread Prev][Thread Next][Date Index][Thread Index] [List Home]
Re: [jetty-dev] Reactive Streams

Hi,

On Wed, Jun 3, 2015 at 9:24 AM, Greg Wilkins <gregw@xxxxxxxxxxx> wrote:
> Simone,
>
> my point is that you can't get recycling within just the standard RS APIs.
> Ie you can't go get some 3rd party ByteBuffer compressing/encoding/encypting
> processor and expect it to be able to call your complete() method (or to
> take your PooledBuffer for that matter).
>
> If you go outside the API or have control of the implementation of the
> Processors, then there are lots and lots of ways that you can recycle.
>
> But the idea of the standard RS APIs is that you'll be able to plug into 3rd
> party libraries with them... so they really need some standard mechanism for
> acknowledgement or recycling or something.

For the record, you are suggesting that Subscription is augmented with
an additional method:

interface Subscription {
    ...
    void ack(T item, Throwable failure)
}

In this way, a Subscriber can not only invoke request(long) for more
data, but also acknowledge that it has consumed the item, thereby
allowing the Publisher to clean up after that item.

The use case: there is a reactive stream library that accepts
ByteBuffers and does amazing things with them (say, for example,
photoshop like transformations of pictures).
It exposes Subscribers or Processors that take a ByteBuffer as parameter.

If we want to use that library we need to feed it with ByteBuffers.
But, as a Publisher, we want to know when the library is done with the
ByteBuffers, and request(long) does not convey that semantic.

If the library has a way to tell the Publisher (via the Subscription
object) when it is done with the item, then it would be possible to
implement, for example, buffer recycling, and possibly better error
reporting (such as "cannot transform image").

The library would do:

void onNext(ByteBuffer item)
{
    try
    {
        ByteBuffer result = transform(item);
        subscription.ack(item, null); // consumed ok.
        // do something with result
       subscription.request(1); // ready to process more.
    }
    catch (Exception x)
    {
        subscription.ack(item, x); // failed to consume, report failure.
        subscription.cancel(); // I'm wrecked, don't send me more.
    }
}

The ack(...) method would also signal to the Publisher that the
subscriber failed, and why.

-- 
Simone Bordet
----
http://cometd.org
http://webtide.com
http://intalio.com
Developer advice, training, services and support
from the Jetty & CometD experts.
Intalio, the modern way to build business applications.


Back to the top