
Re: [jetty-dev] Reactive Streams

Simone,

I've started an issue to discuss this with RS.  I think we need to separate out the error handling from the ack handling so I started on errors first: https://github.com/reactive-streams/reactive-streams-jvm/issues/271

I've also pushed a few changes to my IteratingProcessor:
  • I want it to be able to pass through request(n>1) so that good flow can be achieved in remote situations. This means that I've needed to add an internal queue and a bit more complexity.   IT NEEDS REVIEW!!!!
  • I've changed the item consumption to use another abstract method.  This allows immutable items, which are always consumed, to be handled (the method just returns true).
  • I've made the exception and error handling more compliant with the spec.
  • I've started adding a TCK based test, but ran out of time to make it work.

cheers





On 4 June 2015 at 01:05, Simone Bordet <sbordet@xxxxxxxxxxx> wrote:
Hi,

On Wed, Jun 3, 2015 at 9:24 AM, Greg Wilkins <gregw@xxxxxxxxxxx> wrote:
> Simone,
>
> my point is that you can't get recycling within just the standard RS APIs.
> i.e. you can't go get some 3rd party ByteBuffer compressing/encoding/encrypting
> processor and expect it to be able to call your complete() method (or to
> take your PooledBuffer for that matter).
>
> If you go outside the API or have control of the implementation of the
> Processors, then there are lots and lots of ways that you can recycle.
>
> But the idea of the standard RS APIs is that you'll be able to plug into 3rd
> party libraries with them... so they really need some standard mechanism for
> acknowledgement or recycling or something.

For the record, you are suggesting that Subscription is augmented with
an additional method:

interface Subscription {
    ...
    void ack(T item, Throwable failure);
}

In this way, a Subscriber can not only invoke request(long) for more
data, but also acknowledge that it has consumed the item, thereby
allowing the Publisher to clean up after that item.

The use case: there is a reactive stream library that accepts
ByteBuffers and does amazing things with them (say, for example,
Photoshop-like transformations of pictures).
It exposes Subscribers or Processors that take a ByteBuffer as parameter.

If we want to use that library we need to feed it with ByteBuffers.
But, as a Publisher, we want to know when the library is done with the
ByteBuffers, and request(long) does not convey that semantic.

If the library has a way to tell the Publisher (via the Subscription
object) when it is done with the item, then it would be possible to
implement, for example, buffer recycling, and possibly better error
reporting (such as "cannot transform image").

The library would do:

void onNext(ByteBuffer item)
{
    try
    {
        ByteBuffer result = transform(item);
        subscription.ack(item, null); // consumed ok.
        // do something with result
        subscription.request(1); // ready to process more.
    }
    catch (Exception x)
    {
        subscription.ack(item, x); // failed to consume, report failure.
        subscription.cancel(); // I'm wrecked, don't send me more.
    }
}

The ack(...) method would also signal to the Publisher that the
subscriber failed, and why.
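
To make the publisher side of this concrete, here is a minimal sketch of how ack(...) could drive buffer recycling. Everything in it is an assumption for illustration: AckSubscription, RecyclingSubscription and AckDemo are hypothetical names, and ack(...) is not part of the actual Reactive Streams Subscription interface.

```java
import java.nio.ByteBuffer;
import java.util.ArrayDeque;
import java.util.Deque;

// Hypothetical: the standard Subscription methods plus the proposed ack().
// This is NOT the real org.reactivestreams.Subscription.
interface AckSubscription<T> {
    void request(long n);
    void cancel();
    void ack(T item, Throwable failure);
}

// Hypothetical publisher-side subscription that recycles acknowledged buffers.
class RecyclingSubscription implements AckSubscription<ByteBuffer> {
    private final Deque<ByteBuffer> pool = new ArrayDeque<>();
    private long demand;
    private boolean cancelled;
    private Throwable failure;

    // Used by the publisher to obtain a buffer, reusing a pooled one if it is large enough.
    ByteBuffer acquire(int capacity) {
        ByteBuffer buffer = pool.poll();
        if (buffer == null || buffer.capacity() < capacity) {
            return ByteBuffer.allocate(capacity);
        }
        buffer.clear();
        return buffer;
    }

    @Override
    public void request(long n) {
        demand += n; // the subscriber is ready for n more items
    }

    @Override
    public void cancel() {
        cancelled = true; // the subscriber wants no more items
    }

    @Override
    public void ack(ByteBuffer item, Throwable failure) {
        // The subscriber is done with the item either way, so it can go back to the pool.
        pool.offer(item);
        if (failure != null) {
            this.failure = failure; // remember why the subscriber failed
        }
    }

    long demand() { return demand; }
    boolean isCancelled() { return cancelled; }
    Throwable failure() { return failure; }
}

class AckDemo {
    public static void main(String[] args) {
        RecyclingSubscription subscription = new RecyclingSubscription();
        ByteBuffer first = subscription.acquire(16);
        subscription.ack(first, null);   // consumed ok
        subscription.request(1);         // ready to process more
        ByteBuffer second = subscription.acquire(16);
        System.out.println("recycled: " + (first == second));
    }
}
```

The sketch is single-threaded and ignores the signalling rules of the spec; it only shows that request(long) carries demand while ack(...) carries "done with this item", which is the piece the publisher needs in order to recycle.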

--
Simone Bordet
----
http://cometd.org
http://webtide.com
http://intalio.com
Developer advice, training, services and support
from the Jetty & CometD experts.
Intalio, the modern way to build business applications.



--
Greg Wilkins <gregw@xxxxxxxxxxx>  - an Intalio.com subsidiary
http://eclipse.org/jetty HTTP, SPDY, Websocket server and client that scales
http://www.webtide.com  advice and support for jetty and cometd.
