
Re: [jetty-dev] Websocket buffers and async modes

(inline)


On Wed, Jan 8, 2014 at 8:30 PM, Greg Wilkins <gregw@xxxxxxxxxxx> wrote:

I'd like to walk through websocket buffering and discuss the possibilities for implementing both async modes.

Specifically the websocket async modes are controlled by:


    /**
     * Indicate to the implementation that it is allowed to batch outgoing messages
     * before sending. Not all implementations support batching of outgoing messages.
     * The default mode for RemoteEndpoints is false. If the developer
     * has indicated that batching of outgoing
     * messages is permitted, then the developer must call flushBatch() in order to be
     * sure that all the messages passed into the send methods of this RemoteEndpoint
     * are sent.
     * When batching is allowed, the implementation's send operations are considered
     * to have completed if the message has been written to the local batch, in
     * the case when there is still room in the batch for the message, and are considered
     * to have completed if the batch has been sent to the peer and the remainder
     * written to the new batch, in the case when
     * writing the message causes the batch to need to be sent. The blocking
     * and asynchronous send methods use this notion of completion in order
     * to complete blocking calls, notify SendHandlers and complete Futures respectively.
     * When batching is allowed, if the developer has called send methods
     * on this RemoteEndpoint without calling flushBatch(), then the implementation
     * may not have sent all the messages the developer has asked to be sent. If
     * the parameter value is false and the implementation has a batch of unsent messages,
     * then the implementation must immediately send the batch of unsent messages.
     *
     * @param allowed whether the implementation is allowed to batch messages.
     * @throws IOException if batching is being disabled and there are unsent messages
     * this error may be thrown as the implementation sends the batch of unsent messages if
     * there is a problem.
     */
    void setBatchingAllowed(boolean allowed) throws IOException;
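
For illustration, here is a minimal sketch of that contract from the application's side; only the javax.websocket calls are from the spec, the class and method names are made up:

    import java.io.IOException;
    import javax.websocket.RemoteEndpoint;
    import javax.websocket.Session;

    class BatchingExample
    {
        static void sendBatched(Session session) throws IOException
        {
            RemoteEndpoint.Async remote = session.getAsyncRemote();
            remote.setBatchingAllowed(true);   // frames may now be held in a local batch
            remote.sendText("one", result -> { /* complete once written to the batch */ });
            remote.sendText("two", result -> { /* may still be sitting in the batch */ });
            remote.flushBatch();               // the application must flush to guarantee delivery
            remote.setBatchingAllowed(false);  // disabling batching also flushes anything pending
        }
    }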


In Jetty 7/8 we were essentially in setBatchingAllowed(true) mode always.  Jetty 9 is currently in false mode always.  I think for the common write case that we see from apps like cometd the true mode is most efficient, as a producer can send multiple frames before being slowed by any IO bottlenecks, and IO operations over slow networks are done with larger, more efficient buffers of data.


My reading of that is :
  •  default behavior, as mandated by JSR-356, is batching == false
  •  batching == true means that the user sort-of controls flush (implementation can flush if needed)
  •  batching == false means that the implementation controls its own flushing of messages/frames.
  •  jetty 9.1 is currently implemented for batching == false
 

So, tracing how we currently work from calls on RemoteEndpoint.Async of
  sendText(String text, SendHandler handler) or sendBinary(ByteBuffer data, SendHandler handler):

I think batching rules will apply to all sends.
  • Basic.getSendStream()
  • Basic.getSendWriter()
  • Basic.sendObject(Object)
  • Basic.sendText(String, boolean) // partial
  • Basic.sendBinary(ByteBuffer, boolean) // partial
  • RemoteEndpoint.sendPing(ByteBuffer)
  • RemoteEndpoint.sendPong(ByteBuffer)
This brings to mind a curious question.
What about streaming sends?
Should the stream.flush() result in a RemoteEndpoint.flushBatch() always?
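
If we did decide that a stream flush should imply a batch flush, a hypothetical way to wire it up would be a wrapping stream whose flush() also calls flushBatch(); this is only a sketch, not how Jetty currently behaves:

    import java.io.FilterOutputStream;
    import java.io.IOException;
    import java.io.OutputStream;
    import javax.websocket.RemoteEndpoint;

    // Hypothetical wrapper: a flush() of the message stream also flushes the frame batch.
    class BatchFlushingOutputStream extends FilterOutputStream
    {
        private final RemoteEndpoint remote;

        BatchFlushingOutputStream(OutputStream out, RemoteEndpoint remote)
        {
            super(out);
            this.remote = remote;
        }

        @Override
        public void flush() throws IOException
        {
            super.flush();        // flush the message stream itself first
            remote.flushBatch();  // then push any batched frames to the peer
        }
    }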
 
  • We create a new WebSocketFrame instance, either a BinaryFrame or a TextFrame and we set the payload on the frame. For text frames, we just save a reference to the string at this point. For binary frames we take a slice off the buffer, so we have our own pos/limit pointers, but share the underlying storage

For TextFrames we immediately convert to a ByteBuffer via BufferUtil.toUTF8String().
Keep in mind that extensions don't care about Binary vs Text; it's just a DataFrame where everything is at the byte level, and an extension can arbitrarily split a Unicode character across two separate frames.
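
To make the slicing point above concrete: the slice gives the frame its own position/limit, but mutations the application makes to its buffer after the send call are still visible through our slice. A small illustration:

    import java.nio.ByteBuffer;
    import java.nio.charset.StandardCharsets;

    // slice() gives the frame its own position/limit, but the storage stays shared.
    public class SliceDemo
    {
        public static void main(String[] args)
        {
            ByteBuffer app = ByteBuffer.wrap("payload".getBytes(StandardCharsets.UTF_8));
            ByteBuffer ours = app.slice();          // the frame's private view of the same bytes

            app.put(0, (byte)'X');                  // application changes its buffer after the send call...
            System.out.println((char)ours.get(0));  // ...prints 'X' - the slice sees the change
        }
    }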
 
  • We call the native Jetty API with WebSocketRemoteEndpoint.uncheckedSendFrame(frame, new SendHandlerWriteCallback(handler)); this allocates a wrapping of the handler every time, so eventually it would be good to be able to use standard handlers internally and wrap our own API handlers for legacy code.  Calling the unchecked send means we avoid doing duplicate checks on the validity of the frame.
  • send frame does:
    connection.getIOState().assertOutputOpen();
    outgoing.outgoingFrame(frame,callback);
    which looks like a bit of a race to me, as the output can close after the assert. Is the assert really needed?     The outgoing is the chain of extensions.  For almost all browsers we can assume that this is going to include a deflate extension, and we should optimise on this assumption
The IOState.assertOutputOpen() check is just to fast-fail attempts to write.  The race is there, yes, but it's irrelevant to the overall functioning; anything that falls between the cracks here is eventually failed via the endpoint.write() attempts.

Currently deflate-frame is the most common, but it's deprecated.
permessage-deflate is arriving soon in Chrome (desktop and Android); expect to see it in Firefox shortly.
 
  • The deflate frame extension gets the frame payload, so if it is a text frame then a byte array is obtained and wrapped as a buffer at this time (would be good if we could directly deflate the string?).   It also does a toArray of the payload buffer, so if this was a huge memory mapped file buffer, we would also allocate a large buffer.   It also creates a temporary compressed buffer that is larger than the payload, and finally an outbuf buffer of exactly the compressed length.   That is up to 3 data copies, so it may be better to compress the payload in sections directly into a new payload buffer rather than have all the temp byte arrays.   The result of the compression is put into one or more data frames (even for text frames), so it looks like the extension does support fragments.   Either way, the frame is now sent downstream with a deflated payload that is within one of our own allocated pool buffers and is not vulnerable to being changed by the application, even after an async callback.

If you have an idea on how to get java's Deflater to live happily with ByteBuffers we can avoid most of these copies.
For compress flows, its easier to manage the buffers, but for the decompress flows you have no idea of the eventual size of the decompressed data, so you have to allocate based on assumptions and work from there.

I've been resisting implementing Deflate in pure java, but it seems like its becoming more and more important. :-(
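
One partial answer, short of a ByteBuffer-aware Deflater API, is to bound the temporaries: feed the Deflater in small fixed-size chunks copied off the payload buffer and deflate straight into the target payload buffer. A sketch only, assuming the target has room:

    import java.nio.ByteBuffer;
    import java.util.zip.Deflater;

    // Sketch: deflate `payload` into `target` using bounded scratch arrays, instead of
    // copying the whole payload to a single byte[].  Assumes `target` has enough room.
    class DeflateSketch
    {
        static void deflateInto(Deflater deflater, ByteBuffer payload, ByteBuffer target)
        {
            byte[] in = new byte[4096];
            byte[] out = new byte[4096];
            while (payload.hasRemaining())
            {
                int n = Math.min(in.length, payload.remaining());
                payload.get(in, 0, n);       // one bounded copy out of the (possibly direct) buffer
                deflater.setInput(in, 0, n);
                int len;
                while ((len = deflater.deflate(out, 0, out.length, Deflater.SYNC_FLUSH)) > 0)
                    target.put(out, 0, len); // append compressed bytes to the frame payload buffer
            }
        }
    }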
 
  • Another possible extension is the fragmentation extension. Currently this just loops over the created fragment frames and does nextOutgoingFrame(frag,null); on each until the last one, which does a nextOutgoingFrame(frag,callback);    This means that we break up a large frame into multiple small frames all at once. I'm thinking that this really should be done as an iterating callback, so that we can have a single fragment payload buffer that is reused over and over for each fragment.  This could even be a pattern to help with the fragmentation done in the deflate extension.

Be aware that many of the extensions out there now have rules (written in each extension's spec, not the websocket protocol spec).
Some don't allow for changing of the fragmentation, so we need to be careful here.

Currently, extension negotiation isn't very well defined at the spec level. (I hate that.)
For example: permessage-deflate lists rules for other extensions to follow, which is a horrible way to operate a spec.  That will just lead to all extensions needing to know about all other extensions. *ugh*
But I've been tossing ideas at bugzilla for a standardized set of rules for extensions to allow the impl to intelligently negotiate extensions.

Some thoughts:
  • Extension Rule / Uses RSV1
  • Extension Rule / Uses RSV2
  • Extension Rule / Uses RSV3
  • Extension Rule / Mangled Text Frames (produces text frames that are not UTF8 compliant)
  • Extension Rule / Preserve-Fragments
  • Extension Rule / Can ReFragment
  • Extension Rule / Uses Extension Data
These are some of the rules I've culled together from various extension-specific specs.
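
For the sake of discussion, those rules could be modelled as a capability set that the negotiation code consults. The names below are illustrative only, not from any spec:

    import java.util.EnumSet;

    // Hypothetical capability flags that negotiation code could consult; the names are
    // illustrative only and are not taken from any extension spec.
    enum ExtensionRule
    {
        USES_RSV1, USES_RSV2, USES_RSV3,
        MANGLES_TEXT_FRAMES,    // may emit text frames that are not valid UTF-8
        PRESERVES_FRAGMENTS,    // its fragmentation must not be changed downstream
        CAN_REFRAGMENT,         // is allowed to split/merge frames itself
        USES_EXTENSION_DATA;

        // e.g. a permessage-deflate style extension might advertise:
        static final EnumSet<ExtensionRule> PERMESSAGE_DEFLATE_EXAMPLE =
            EnumSet.of(USES_RSV1, MANGLES_TEXT_FRAMES, CAN_REFRAGMENT);
    }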
 
  • Eventually the extension chain ends up calling     AbstractWebSocketConnection#outgoingFrame(Frame frame, WriteCallback callback) which is implemented with a direct call on FrameFlusher#enqueue(frame,callback)
  • In FrameFlusher#enqueue we are called with a frame that in almost all circumstances will contain a payload that is private to the implementation - it is a fragment, a deflated payload, or a converted string.   Only in the case of an undeflated, unfragmented binary message will we have a frame with a payload that is still visible to the application (and thus might require a copy in some circumstances).  The current enqueue only implements non-buffering mode - so it locks the queue and appends the frame (except pings, which are prepended to the queue) and then calls flush.  The FrameFlusher has an iterating callback, so flush just calls iterate.    In the most common case, the iteratingCB will be idle, because a single producer is probably waiting for a callback before it sends the next frame.  Only if we have multiple threads producing frames, or a producer that does not wait for a callback, will we already be flushing the queue here!

The copying of buffers here can get confusing, especially with the layers of extensions possibly doing it too.
That makes ByteBufferPool.release(ByteBuffer) a layer of releases after the write, right?
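
As a very rough sketch of that non-batching enqueue path (the types and names below are simplified stand-ins, not the real FrameFlusher):

    import java.nio.ByteBuffer;
    import java.util.ArrayDeque;
    import java.util.Deque;

    // Very rough sketch of the non-batching enqueue described above; Entry and flush()
    // stand in for the real FrameFlusher machinery and are not Jetty's actual types.
    class FlusherSketch
    {
        static class Entry
        {
            final ByteBuffer frame;
            final Runnable callback;
            Entry(ByteBuffer frame, Runnable callback) { this.frame = frame; this.callback = callback; }
        }

        private final Deque<Entry> queue = new ArrayDeque<>();

        void enqueue(ByteBuffer frame, Runnable callback, boolean isPing)
        {
            synchronized (queue)
            {
                Entry entry = new Entry(frame, callback);
                if (isPing)
                    queue.addFirst(entry);  // pings are prepended to the queue
                else
                    queue.addLast(entry);
            }
            flush();                        // just iterate(); a no-op if a flush is already in progress
        }

        private void flush() { /* iterating callback: drain the queue, gather-write, succeed callbacks */ }
    }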
 
  • Each iteration of the iteratingCB takes frames from the queue, gets a frame header in its own buffer, gets the payload, and does a gather write of as many frames as possible up to MAX_GATHERS (default 8).  But as said, the most common case will just be a single frame and the gather will be just the header and payload.

We should add some metrics here to verify this assumption.
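
For reference, the gather write itself is just the standard NIO mechanism: hand the channel an array of buffers, header then payload. A minimal example for the single-frame case (blocking channel assumed):

    import java.io.IOException;
    import java.nio.ByteBuffer;
    import java.nio.channels.GatheringByteChannel;

    // The gather write of a single frame: header buffer + payload buffer in one call.
    class GatherWriteExample
    {
        static void writeFrame(GatheringByteChannel channel, ByteBuffer header, ByteBuffer payload)
            throws IOException
        {
            ByteBuffer[] gather = { header, payload };
            while (header.hasRemaining() || payload.hasRemaining())
                channel.write(gather);   // blocking channel assumed; the loop handles short writes
        }
    }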
 
  • Only once the gather write has succeeded do we invoke the callback associated with the passed frame.

And the various ByteBufferPool.release() calls! :-)
 
  • lather, rinse and repeat!

So this is not the most efficient way to send frames in the most common case. We have a Q and iteratingCB mechanism that will only be used in special cases.  The most common case does a bit of extra handling that is not really required.    However, the most common case is probably better suited to the setBatchingAllowed(true) mode, so let's think about how we could support this.


Where is this "common case" idea coming from?
It's more of the "simple case".  We have no metrics to prove this is the common case.
Based on just paying attention to Stack Overflow questions, you have a healthy distribution of questions, with chat, realtime data feed, large content transfer, and streaming topics getting equal attention from the question askers.

We even see people complaining about the per-frame limits of a 63-bit number! (wha!?)

Asking the various open source consumers of our websocket library what's more common ...
  • small (under 100k) vs large
  • a trickle of messages vs frequent messages
I get the same answer from each of them, "there is no common use case, we see all mixes of the above equally"

When I ask about browser vs other, there is a surprise though: they see more direct library use (think server to server, embedded device to server, or app to server) than browser use.

Two ends of the spectrum:
  • B2C text chat application
  • Multi User VoIP (video + audio + screensharing) 

The first thing that came to mind is that we could just call the frame callback as soon as the frame is put in the FrameFlusher Q.  But this does not work, as the flusher Q can grow infinitely and there is no backpressure to act as flow control if the producers are making frames faster than we can send them.

So I think the correct semantic is call the callback as a frame is prepared for a write - either individually, as a gather write or potentially as it is written into an aggregate buffer.  Let's call this process aggregation - regardless of the actual implementation.


What if it is a large frame that requires multiple writes to be successful?
 

So the first problem is that the current aggregation process (preparing gathered writes) is done by the iteratingCB.   This does not work for batching mode, as we need the aggregation to be done when a message is enqueued and not when the current aggregate is flushed.

So enqueue has to be smarter and look at the current state of the iteratingCB: if it is idle then enqueue can aggregate; if it is not idle then a prior aggregate is being flushed and enqueue will have to put the frame into the queue.   This has to be atomic with regards to the iteratingCB waking up... so I'm seeing a lock here, but maybe it can be done lock-free?   The good news here is that a simple single frame write can avoid the queue and go direct to aggregation.
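
In rough Java, that batching-aware enqueue decision might look like the sketch below; the names are placeholders and the lock-free question is ignored:

    import java.nio.ByteBuffer;
    import java.util.ArrayDeque;
    import java.util.Deque;

    // Sketch of the batching-aware enqueue decision only; aggregate() and the flushing
    // flag are placeholders for the real FrameFlusher state, not actual Jetty code.
    class BatchingEnqueueSketch
    {
        private final Object lock = new Object();
        private final Deque<Runnable> pending = new ArrayDeque<>();
        private boolean flushing;

        void enqueue(ByteBuffer frame, Runnable callback)
        {
            synchronized (lock)
            {
                if (!flushing)
                {
                    aggregate(frame);   // copy header + payload into the aggregate buffer
                    callback.run();     // the frame is "written" as far as the producer is concerned
                    return;
                }
                // a prior aggregate is being flushed; park the work until the flusher wakes up
                pending.add(() -> { aggregate(frame); callback.run(); });
            }
        }

        private void aggregate(ByteBuffer frame) { /* append into the aggregate buffer */ }
    }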

The next issue is how do we aggregate?   We could copy the frames to an aggregate buffer and, once that is full, auto-flush.      The first problem here is that the current frame.getHeaderBytes() creates a new array holding the header, so we would need to change this to be frame.generateHeaderBytes(ByteBuffer buffer), so we can avoid the copy and generate directly into the aggregate buffer.


I did this originally; I even had a single large ByteBuffer that I just sliced a few times over to see different parts of the same underlying array, passing each sliced view of the same array into the gathering writes.  It was rather buggy at the time, but I liked that approach.  We can resurrect that if you think it would be worth it.
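
To make the generateHeaderBytes(ByteBuffer) idea concrete, here is a minimal sketch of writing an unmasked (server-to-client) header straight into a supplied buffer, per RFC 6455 framing; the method name follows the proposal above and is not existing Jetty API:

    import java.nio.ByteBuffer;

    // Sketch of the proposed frame.generateHeaderBytes(ByteBuffer): write an unmasked
    // (server-to-client) header straight into the target buffer. fin=true, RSV bits clear.
    class HeaderGeneration
    {
        static void generateHeaderBytes(int opcode, long payloadLength, ByteBuffer buffer)
        {
            buffer.put((byte)(0x80 | (opcode & 0x0F)));   // FIN + opcode
            if (payloadLength <= 125)
            {
                buffer.put((byte)payloadLength);          // 7-bit length
            }
            else if (payloadLength <= 0xFFFF)
            {
                buffer.put((byte)126);
                buffer.putShort((short)payloadLength);    // 16-bit extended length
            }
            else
            {
                buffer.put((byte)127);
                buffer.putLong(payloadLength);            // 64-bit extended length
            }
        }
    }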
 

We could copy all payloads into the aggregate buffer as well, and then we'd just end up with one big buffer holding lots of frames which can be flushed.   Before flushing the aggregate, we would call all the frames' callbacks, so while we are flushing, producers would be enqueuing the next set of frames.


We would wind up with lots of ByteBuffers to BufferPool.release() or lots of copying this way, right?
 

Once the flush is complete, we may have one or more frames in the queue; we would then aggregate those frames and call their callbacks, which might send more frames, which we would aggregate and call their callbacks, etc., until the aggregate was full again or somebody called flush.

Lather, rinse and repeat.
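
A rough sketch of that copy-everything aggregation, just to pin down its shape; the class, names and buffer size are placeholders:

    import java.io.IOException;
    import java.nio.ByteBuffer;
    import java.nio.channels.WritableByteChannel;
    import java.util.ArrayList;
    import java.util.List;

    // Sketch of the "copy everything into one aggregate buffer" approach: frames are
    // appended (header + payload), their callbacks succeed immediately, and the whole
    // aggregate goes out in a single write.  Names and sizes are placeholders.
    class AggregateSketch
    {
        private final ByteBuffer aggregate = ByteBuffer.allocate(32 * 1024);
        private final List<Runnable> callbacks = new ArrayList<>();

        boolean append(ByteBuffer header, ByteBuffer payload, Runnable callback)
        {
            if (aggregate.remaining() < header.remaining() + payload.remaining())
                return false;                 // full: caller should flush() and retry
            aggregate.put(header).put(payload);
            callbacks.add(callback);
            return true;
        }

        void flush(WritableByteChannel channel) throws IOException
        {
            aggregate.flip();
            callbacks.forEach(Runnable::run); // succeed callbacks so producers can enqueue the next frames
            callbacks.clear();
            while (aggregate.hasRemaining())
                channel.write(aggregate);     // blocking channel assumed
            aggregate.clear();
        }
    }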

However, I still have a concern that copying into the aggregate buffer may not be the most efficient way.  I think a very common case is that the payload is already going to be in a private buffer that does not need to be copied, so we could potentially just keep a reference and still do a gathering write, e.g. it is a string message, a deflated message, or a fragmented message.


By the time the frame reaches the whole aggregate write level we can't be thinking in terms of String vs Deflated vs Fragmented.  It's just frames with headers and payloads.  All of the rest of the knowledge should be considered unavailable at that point.

Don't forget that whatever solution we come up with will also have to fit the WS/SPDY and WS/HTTP2 world too.
We'll have to think about this aggregation concern at a point before HTTP/1.1 EndPoint vs SPDY vs HTTP/2, since the same rules/logic will need to exist for those as well.  Thankfully, the way we already split frame generation into headers vs payload seems to have been a happy accidental alignment of the stars :)
 

So we could still have an aggregate buffer, but perhaps all we put in that are multiple headers, with a buffer slice pointing to each one.  We would then create a gather write that consists of a sliced header in the aggregate buffer, a payload reference, a sliced header in the aggregate buffer, a payload reference, etc.     This could avoid lots of copies for a common case... however, we would still need to copy the payload (into a slice of the aggregate buffer) if it is an undeflated, unfragmented binary frame, as the application can alter the passed buffer as soon as we call the callback.    More importantly, if we alter the deflate and fragment extensions as I have suggested above, then payloads from those extensions could often not be held by reference, since those extensions may reuse the buffer for the next fragment when they get the callback for the prior fragment.

Thus I think it is simplest to start without gathering writes, and just aggregate into a buffer and copy all payloads passed into that buffer as well.     We could save a copy if we made the aggregate buffer a direct buffer (the frame header generation would have to be written to be aware and write  ints or longs rather than bytes).   However once we go to wss or SPDY, then the aggregate buffer will be encrypted in user space, so we want an indirect buffer.... we really need a way to ask our own IO endpoints if it is worthwhile using a direct buffer or not.


With the latest mux extension chatter being to drop it in favor of WS/SPDY and WS/HTTP2, it seems that we need to abstract out the header generation for the endpoint (eg: a websocket header becomes a SPDY header, not an array of bytes) anyway.  The WebSocketSPDYConnection object of the future would also need to handle these aggregation cases.
 

Finally (for now), we have to work out whether the one FrameFlusher should handle both modes, or whether we have two different FrameFlushers and switching modes switches the flusher.   I don't really know on this one; I need to experiment.

thoughts?


 
