
[jetty-users] Streaming proxy of content using jetty and jetty-client with end to end flow control

Hi jetty-users,

Still working on my http proxy :)
Now I'm trying to make sure I've got content proxying working correctly,
using both asynchronous servlets and NIO non-blocking writes.

Because both the server and the client push notifications at me, I believe
I need to implement some sort of transfer queue.  I was hoping I'd end up
in a push-pull situation, but ServletOutputStream calls you via a WriteListener
and Jetty-Client calls you via a Response.ContentListener.

Unfortunately, this leaves me in a position where I don't know how to get flow
control working.  The client pushes data at me and I don't have a way to push back.

My current pseudocoded approach:

Deque<Runnable> writes = new ConcurrentLinkedDeque<>(); // touched from both the server and client threads
volatile boolean complete = false;

asyncCtx = httpResponse.startAsync();
out = (HttpOutput) httpResponse.getOutputStream();

writeListener = new WriteListener() {
    public synchronized void onWritePossible() { // 'synchronized' avoids re-entrance
        while (out.isReady()) {
            Runnable pending = writes.pollFirst(); // pollFirst(): removeFirst() throws when empty
            if (pending != null) {
                pending.run();
            } else if (complete) { // queue drained and no more content coming
                asyncCtx.complete();
                return;
            } else {
                return; // queue drained; wait for the next onContent
            }
        }
    }

    public void onError(Throwable t) { // WriteListener requires this too
        asyncCtx.complete();
    }
};

out.setWriteListener(writeListener);

proxyClientRequest.send(new Response.ContentListener() {
    void onContent(ByteBuffer buf, Callback callback) { // renamed from 'complete', which shadowed the flag
        writes.addLast(() -> { // addLast + pollFirst keeps writes in FIFO order ('push' made it LIFO)
            out.write(buf);
            callback.succeeded();
        });
        writeListener.onWritePossible(); // see if we can write it immediately
    }

    void onComplete() {
        complete = true;
        writeListener.onWritePossible();
    }
});

This has the nice property that I don't copy buffers, just keep them in a queue
of pending writes -- but I worry that, with a slow reader, Jetty Client will
keep pushing buffers at me via onContent until I run out of memory.
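One direction I've been toying with (pure speculation on my part): if the Callback
handed to onContent actually gates delivery of the next chunk, then deferring
callback.succeeded() until the servlet side is writable would be exactly the
backpressure I'm after. Here's a self-contained model of that pattern -- stand-in
types of my own, not real Jetty classes -- where at most one chunk is ever parked:

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;

// Model of "defer the callback" backpressure: the source will not push the
// next chunk until succeeded() is called on the previous chunk's callback.
public class BackpressureSketch {
    interface Callback { void succeeded(); }

    static class Proxy {
        final List<String> written = new ArrayList<>();
        String pendingChunk;
        Callback pendingCallback;
        boolean writable;

        // Source side: one chunk per call; more content only after succeeded().
        void onContent(String chunk, Callback callback) {
            if (writable) {
                written.add(chunk);
                callback.succeeded();       // writer ready: acknowledge immediately
            } else {
                pendingChunk = chunk;       // park exactly one chunk...
                pendingCallback = callback; // ...and withhold the ack = backpressure
            }
        }

        // Server side: the writer became ready again (think onWritePossible()).
        void onWritePossible() {
            writable = true;
            if (pendingCallback != null) {
                written.add(pendingChunk);
                Callback cb = pendingCallback;
                pendingChunk = null;
                pendingCallback = null;
                cb.succeeded();             // only now may the source push more
            }
        }
    }

    // Stand-in for the content source: re-arms itself via the callback.
    static class Source {
        final Deque<String> chunks;
        final Proxy proxy;
        Source(List<String> data, Proxy proxy) {
            this.chunks = new ArrayDeque<>(data);
            this.proxy = proxy;
        }
        void push() {
            String chunk = chunks.pollFirst();
            if (chunk != null)
                proxy.onContent(chunk, this::push); // next push only after the ack
        }
    }

    public static void main(String[] args) {
        Proxy proxy = new Proxy();
        Source source = new Source(List.of("a", "b", "c"), proxy);
        source.push();                     // writer not ready: "a" is parked, nothing written
        proxy.onWritePossible();           // writer ready: the rest drains in order
        System.out.println(proxy.written); // [a, b, c]
    }
}
```

If the real async content listener works this way, the unbounded queue disappears
entirely -- but I haven't been able to confirm that from the docs.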

I could instead use the OutputStreamContentProvider and just read from it in the
WriteListener, which might fix the flow control, but that seems likely to involve
a number of byte[] copies that I was hoping to avoid.
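For contrast, the copy-based alternative looks roughly like this (generic java.io
streams of my own, not the actual Jetty provider): flow control comes for free
from blocking reads, but every chunk funnels through an intermediate byte[]:

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;

// Stream-based proxying: correct and naturally flow-controlled (read() blocks
// until data arrives), but each chunk is copied into an intermediate byte[].
public class StreamCopy {
    static long copy(InputStream in, OutputStream out) throws IOException {
        byte[] buffer = new byte[8192]; // the extra copy I'd like to avoid
        long total = 0;
        int n;
        while ((n = in.read(buffer)) != -1) {
            out.write(buffer, 0, n);
            total += n;
        }
        return total;
    }

    public static void main(String[] args) throws IOException {
        ByteArrayOutputStream out = new ByteArrayOutputStream();
        long n = copy(new ByteArrayInputStream("hello proxy".getBytes()), out);
        System.out.println(n + " bytes: " + out); // 11 bytes: hello proxy
    }
}
```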

What's the right approach to wire up Jetty server to Jetty client with full NIO + Async
content proxying, without introducing an intermediate transfer buffer that might grow
without bound?  Am I on the right path or am I missing something?  Thanks for any guidance!


