[jetty-dev] Websocket buffers and async modes


I'd like to walk through websocket buffering and discuss the possibilities for implementing both async modes.

Specifically the websocket async modes are controlled by:


    /**
     * Indicate to the implementation that it is allowed to batch outgoing messages
     * before sending. Not all implementations support batching of outgoing messages.
     * The default mode for RemoteEndpoints is false. If the developer
     * has indicated that batching of outgoing
     * messages is permitted, then the developer must call flushBatch() in order to be
     * sure that all the messages passed into the send methods of this RemoteEndpoint
     * are sent.
     * When batching is allowed, the implementation's send operations are considered
     * to have completed if the message has been written to the local batch, in
     * the case when there is still room in the batch for the message, and are considered
     * to have completed if the batch has been sent to the peer and the remainder
     * written to the new batch, in the case when
     * writing the message causes the batch to need to be sent. The blocking
     * and asynchronous send methods use this notion of completion in order
     * to complete blocking calls, notify SendHandlers and complete Futures respectively.
     * When batching is allowed, if the developer has called send methods
     * on this RemoteEndpoint without calling flushBatch(), then the implementation
     * may not have sent all the messages the developer has asked to be sent. If
     * the parameter value is false and the implementation has a batch of unsent messages,
     * then the implementation must immediately send the batch of unsent messages.
     *
     * @param allowed whether the implementation is allowed to batch messages.
     * @throws IOException if batching is being disabled and there is a problem
     * sending the batch of unsent messages.
     */
    void setBatchingAllowed(boolean allowed) throws IOException;


In Jetty 7/8 we were essentially in setBatchingAllowed(true) mode always.  Jetty 9 is currently in false mode always.  I think that for the common write case we see from apps like CometD, the true mode is the most efficient, as a producer can send multiple frames before being slowed by any IO bottlenecks, and IO operations over slow networks are done with larger, more efficient buffers of data.
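For reference, this is roughly what batching mode looks like from the application side, using only the standard javax.websocket API (a minimal sketch; the burst loop and no-op handler are just illustration):

    import java.io.IOException;
    import javax.websocket.RemoteEndpoint;
    import javax.websocket.SendHandler;
    import javax.websocket.SendResult;
    import javax.websocket.Session;

    public class BatchingSendExample
    {
        public void sendBurst(Session session) throws IOException
        {
            RemoteEndpoint.Async remote = session.getAsyncRemote();
            remote.setBatchingAllowed(true);

            SendHandler handler = new SendHandler()
            {
                @Override
                public void onResult(SendResult result)
                {
                    // In batching mode this may fire as soon as the message
                    // has been written to the local batch, not the wire.
                }
            };

            for (int i = 0; i < 10; i++)
                remote.sendText("frame " + i, handler);

            // Nothing is guaranteed to be on the wire until the batch is flushed.
            remote.flushBatch();
        }
    }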

So, tracing how we currently work from a call on RemoteEndpoint.Async of
  sendText(String text, SendHandler handler) or sendBinary(ByteBuffer data, SendHandler handler):
  • We create a new WebSocketFrame instance, either a BinaryFrame or a TextFrame, and we set the payload on the frame. For text frames we just save a reference to the String at this point; for binary frames we take a slice of the buffer, so we have our own pos/limit pointers but share the underlying storage.
  • We call the native Jetty API with WebSocketRemoteEndpoint.uncheckedSendFrame(frame, new SendHandlerWriteCallback(handler)). This allocates a wrapping of the handler every time, so eventually it would be good to be able to use standard handlers internally and wrap our own API handlers only for legacy code (a minimal sketch of such a wrapper follows this list). Calling the unchecked send means we avoid doing duplicate checks on the validity of the frame.
  • send frame does:
    connection.getIOState().assertOutputOpen();
    outgoing.outgoingFrame(frame,callback);
    which looks like a bit of a race to me, as the output can close after the assert. Is the assert really needed? The outgoing is the chain of extensions. For almost all browsers we can assume that this is going to include a deflate extension, and we should optimise on this assumption.
  • The deflate frame extension gets the frame payload, so if it is a text frame then a byte array is obtained and wrapped as a buffer at this time (would be good if we could deflate the string directly?). It also does a toArray of the payload buffer, so if this were a huge memory-mapped file buffer, we would also allocate a large array. It also creates a temporary compressed buffer that is larger than the payload, and finally an outbuf buffer of exactly the compressed length. That is up to three data copies, so it may be better to compress the payload in sections directly into a new payload buffer rather than have all the temporary byte arrays. The result of the compression is put into one or more data frames (even for text frames), so it looks like the extension does support fragments. Either way, the frame is now sent downstream with a deflated payload that is within one of our own pooled buffers and is not vulnerable to being changed by the application, even after an async callback.
  • Another possible extension is the fragmentation extension. Currently this just loops over the created fragment frames and does nextOutgoingFrame(frag,null) on each until the last one, which does nextOutgoingFrame(frag,callback). This means that we break up a large frame into multiple small frames all at once. I'm thinking that this really should be done as an iterating callback, so that we can have a single fragment payload buffer that is reused over and over for each fragment (see the fragmenting sketch after this list). This could even be a pattern to help with the fragmentation done in the deflate extension.
  • Eventually the extension chain ends up calling     AbstractWebSocketConnection#outgoingFrame(Frame frame, WriteCallback callback) which is implemented with a direct call on FrameFlusher#enqueue(frame,callback)
  • In FrameFlusher#enqueue we are called with a frame that in almost all circumstances will contain a payload that is private to the implementation: it is a fragment, deflated, or a converted string. Only in the case of an undeflated, unfragmented binary message will we have a frame with a payload that is still visible to the application (and thus might require a copy in some circumstances). The current enqueue only implements non-buffering mode: it locks the queue, appends the frame (except pings, which are prepended to the queue) and then calls flush. The FrameFlusher has an iterating callback, so flush just calls iterate. In the most common case the iteratingCB will be idle, because a single producer is probably waiting for a callback before it sends the next frame. Only if we have multiple threads producing frames, or a producer that does not wait for a callback, will we already be flushing the queue here!
  • Each iteration of the iteratingCB takes frames from the queue, gets a frame header in its own buffer, gets the payload, and does a gather write of as many frames as possible, up to MAX_GATHERS (default 8). But as said, the most common case will be just a single frame, and the gather will be just the header and payload.
  • Only once the gather write has succeeded do we call back the callback associated with the passed frame.
  • lather, rinse and repeat!
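As an aside on the wrapping mentioned in the second step above, the SendHandlerWriteCallback amounts to something like the following sketch (the WriteCallback interface is declared inline here as a stand-in; treat the exact Jetty signatures as an assumption):

    import javax.websocket.SendHandler;
    import javax.websocket.SendResult;

    class SendHandlerWrapping
    {
        // Stand-in for Jetty's native write callback interface.
        interface WriteCallback
        {
            void writeSuccess();
            void writeFailed(Throwable failure);
        }

        // Adapt a JSR-356 SendHandler to the native callback; this is the
        // per-send allocation the step above would like to avoid.
        static WriteCallback wrap(final SendHandler handler)
        {
            return new WriteCallback()
            {
                public void writeSuccess()
                {
                    handler.onResult(new SendResult());
                }

                public void writeFailed(Throwable failure)
                {
                    handler.onResult(new SendResult(failure));
                }
            };
        }
    }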
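And this is the shape of the iterating idea from the fragmentation step: one fragment buffer is refilled and rewritten on each write completion, rather than materialising all the fragments up front. A toy sketch, not Jetty's IteratingCallback:

    import java.nio.ByteBuffer;

    abstract class FragmentingWriter
    {
        private final ByteBuffer fragment; // reused for every fragment
        private final ByteBuffer payload;  // the large source payload

        FragmentingWriter(ByteBuffer payload, int maxFragmentSize)
        {
            this.payload = payload;
            this.fragment = ByteBuffer.allocate(maxFragmentSize);
        }

        // Called once to start, then again by the transport each time the
        // previous fragment write completes.
        void iterate()
        {
            if (!payload.hasRemaining())
            {
                onAllFragmentsSent();
                return;
            }
            fragment.clear();
            int chunk = Math.min(fragment.remaining(), payload.remaining());
            ByteBuffer slice = payload.slice();
            slice.limit(chunk);
            fragment.put(slice);
            payload.position(payload.position() + chunk);
            fragment.flip();
            write(fragment, !payload.hasRemaining());
        }

        // Asynchronous write of one fragment; implementations call iterate()
        // again when the write has succeeded.
        abstract void write(ByteBuffer fragment, boolean isLast);

        abstract void onAllFragmentsSent();
    }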

So this is not the most efficient way to send frames in the most common case. We have a queue and iteratingCB mechanism that will only be used in special cases.  The most common case does a bit of extra handling that is not really required.  However, the most common case is probably better suited to the setBatchingAllowed(true) mode, so let's think about how we could support this.

The first thing that came to mind is that we could just call the frame callback as soon as the frame is put in the FrameFlusher queue.  But this does not work, as the flusher queue can grow infinitely and there is no backpressure to act as flow control if the producers are making frames faster than we can send them.

So I think the correct semantic is to call the callback as a frame is prepared for a write: either individually, as part of a gather write, or potentially as it is written into an aggregate buffer.  Let's call this process aggregation, regardless of the actual implementation.

So the first problem is that the current aggregation process (preparing gathered writes) is done by the iteratingCB.  This does not work for batching mode, as we need the aggregation to be done when a message is enqueued and not when the current aggregate is flushed.

So enqueue has to be smarter and look at the current state of the iteratingCB: if it is idle then enqueue can aggregate; if it is not idle then a prior aggregate is being flushed and enqueue will have to put the frame into the queue.  This has to be atomic with regard to the iteratingCB waking up, so I'm seeing a lock here, but maybe it can be done lock-free?  The good news is that a simple single-frame write can avoid the queue and go direct to aggregation.

The next issue is how we aggregate.  We could copy the frames to an aggregate buffer and, once that is full, auto-flush.  The first problem here is that the current frame.getHeaderBytes() creates a new array holding the header, so we would need to change this to frame.generateHeaderBytes(ByteBuffer buffer), so we can avoid the copy and generate directly into the aggregate buffer.
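The shape I have in mind is roughly this (a sketch of the RFC 6455 header rules only, ignoring RSV bits and client-side masking, and not the real Frame API):

    import java.nio.ByteBuffer;

    class HeaderGenerator
    {
        // Generate an RFC 6455 frame header directly into the caller's
        // buffer (e.g. the aggregate) instead of a freshly allocated array.
        static void generateHeaderBytes(byte opcode, boolean fin, long payloadLength, ByteBuffer buffer)
        {
            buffer.put((byte)((fin ? 0x80 : 0x00) | (opcode & 0x0F)));
            if (payloadLength <= 125)
            {
                buffer.put((byte)payloadLength);
            }
            else if (payloadLength <= 0xFFFF)
            {
                buffer.put((byte)126);
                buffer.putShort((short)payloadLength);
            }
            else
            {
                buffer.put((byte)127);
                buffer.putLong(payloadLength);
            }
        }
    }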

We could copy all the payloads into the aggregate buffer as well, and then we'd just end up with one big buffer holding lots of frames, which can be flushed.  Before flushing the aggregate we would call all the frames' callbacks, so while we are flushing, producers would be enqueuing the next set of frames.

Once the flush is complete we may have one or more frames in the queue; we would then aggregate those frames and call their callbacks (which might send more frames, which we would aggregate, calling their callbacks, etc.) until the aggregate was full again or somebody called flush.

Lather, rinse and repeat.
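Putting the enqueue rule and the flush cycle together, the state machine I have in mind looks roughly like this.  All the types are simplified placeholders (raw payload buffers instead of frames), and real code would have to deal with failures, close, and payloads larger than the aggregate:

    import java.nio.ByteBuffer;
    import java.util.ArrayDeque;
    import java.util.Queue;

    class BatchingFlusherSketch
    {
        interface Callback { void succeeded(); }

        private static class Entry
        {
            final ByteBuffer payload;
            final Callback callback;
            Entry(ByteBuffer payload, Callback callback) { this.payload = payload; this.callback = callback; }
        }

        private final Object lock = new Object();
        private final Queue<Entry> queue = new ArrayDeque<Entry>();
        private final ByteBuffer aggregate = ByteBuffer.allocate(32 * 1024);
        private boolean flushing;

        public void enqueue(ByteBuffer payload, Callback callback)
        {
            synchronized (lock)
            {
                if (flushing)
                {
                    // A prior aggregate is being flushed: park the frame.
                    queue.add(new Entry(payload, callback));
                    return;
                }
                // Idle: aggregate directly, bypassing the queue.
                aggregateFrame(payload, callback);
            }
        }

        // Assumes the payload fits in the aggregate; a real implementation
        // would flush first, or fragment, when it does not.
        private void aggregateFrame(ByteBuffer payload, Callback callback)
        {
            aggregate.put(payload.duplicate());
            callback.succeeded(); // "complete" means written to the batch
            if (!aggregate.hasRemaining())
                startFlush();     // auto-flush once the aggregate is full
        }

        private void startFlush()
        {
            flushing = true;
            // ... hand the aggregate to the endpoint for an async write;
            // the endpoint calls onFlushComplete() when the write succeeds.
        }

        public void onFlushComplete()
        {
            synchronized (lock)
            {
                aggregate.clear();
                flushing = false;
                // Drain whatever was parked while we were flushing; the
                // callbacks may enqueue more frames as we go.
                Entry entry;
                while (!flushing && (entry = queue.poll()) != null)
                    aggregateFrame(entry.payload, entry.callback);
            }
        }
    }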

However, I still have a concern that copying into the aggregate buffer may not be the most efficient way.  I think a very common case is that the payload is already in a private buffer that does not need to be copied, so we could potentially just keep a reference and still do a gathering write.  E.g. it is a string message, a deflated message or a fragmented message.

So we could still have an aggregate buffer, but perhaps all we put in it are the headers, with a buffer slice pointing at each one.  We would then create a gather write that consists of a sliced header in the aggregate buffer, a payload reference, a sliced header in the aggregate buffer, a payload reference, etc.  This could avoid lots of copies in a common case... however, we would still need to copy the payload (into a slice of the aggregate buffer) if it is an undeflated, unfragmented binary frame, as the application can alter the passed buffer as soon as we call the callback.  More importantly, if we alter the deflate and fragment extensions as I have suggested above, then payloads from those extensions could often not be held by reference, since those extensions may reuse the buffer for the next fragment when they get the callback for the prior fragment.
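Sketching that variant (reusing the toy HeaderGenerator above; FrameSketch is a placeholder, not Jetty's Frame):

    import java.nio.ByteBuffer;
    import java.util.ArrayList;
    import java.util.List;

    class GatherBuilder
    {
        static class FrameSketch
        {
            byte opcode;
            boolean fin;
            ByteBuffer payload; // assumed already private (deflated, fragmented, converted string)
        }

        // Generate all the headers back-to-back into one aggregate buffer,
        // then interleave header slices with payload references for a
        // gathering write: no payload copies.
        static ByteBuffer[] buildGather(ByteBuffer headerAggregate, List<FrameSketch> frames)
        {
            List<ByteBuffer> gather = new ArrayList<ByteBuffer>();
            for (FrameSketch frame : frames)
            {
                int start = headerAggregate.position();
                HeaderGenerator.generateHeaderBytes(frame.opcode, frame.fin, frame.payload.remaining(), headerAggregate);
                ByteBuffer header = headerAggregate.duplicate();
                header.limit(headerAggregate.position()); // slice covering just this header
                header.position(start);
                gather.add(header);
                gather.add(frame.payload); // reference only, no copy
            }
            return gather.toArray(new ByteBuffer[gather.size()]);
        }
    }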

Thus I think it is simplest to start without gathering writes, and just aggregate into a buffer, copying all the payloads into that buffer as well.  We could save a copy if we made the aggregate buffer a direct buffer (the frame header generation would have to be written to be aware of this and write ints or longs rather than bytes).  However, once we go to wss or SPDY, the aggregate buffer will be encrypted in user space, so we want an indirect buffer... we really need a way to ask our own IO endpoints whether it is worthwhile using a direct buffer or not.

Finally (for now), we have to work out whether one FrameFlusher should handle both modes, or whether we have two different FrameFlushers and switching modes switches the flusher.  I don't really know on this one; I need to experiment.

thoughts?

--
Greg Wilkins <gregw@xxxxxxxxxxx>
http://www.webtide.com
Developer advice and support from the Jetty & CometD experts.
Intalio, the modern way to build business applications.
