
Re: [jetty-dev] Websocket buffers and async modes



On 10 January 2014 08:15, Joakim Erdfelt <joakim@xxxxxxxxxxx> wrote:
(inline)

ditto
 


On Wed, Jan 8, 2014 at 8:30 PM, Greg Wilkins <gregw@xxxxxxxxxxx> wrote:

Specifically the websocket async modes are controlled by:
    void setBatchingAllowed(boolean allowed) throws IOException;


Jetty 7/8 were essentially in setBatchingAllowed(true) mode always.  Jetty 9 is currently in false mode always.  I think for the common write case that we see from apps like cometd the true mode is most efficient, as a producer can send multiple frames before being slowed by any IO bottlenecks, and IO operations over slow networks are done with larger, more efficient buffers of data.


My reading of that is :
  •  default behavior, as mandated by JSR-356 is batching == false
  •  batching == true means that the user sort-of controls flush (implementation can flush if needed)
  •  batching == false means that the implementation controls its own flushing of messages/frames.
  •  jetty 9.1 is currently implemented for batching == false

+1
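
For reference, a minimal sketch of what that JSR-356 batching surface looks like from the application side (the message bodies and lambda callbacks are illustrative only):

    import java.io.IOException;
    import javax.websocket.Endpoint;
    import javax.websocket.EndpointConfig;
    import javax.websocket.RemoteEndpoint;
    import javax.websocket.Session;

    // Sketch: an endpoint that opts into batching, sends a couple of frames,
    // then explicitly flushes.  batching == false is the JSR-356 default.
    public class BatchingEndpoint extends Endpoint
    {
        @Override
        public void onOpen(Session session, EndpointConfig config)
        {
            RemoteEndpoint.Async remote = session.getAsyncRemote();
            try
            {
                remote.setBatchingAllowed(true);   // user sort-of controls flush from here on

                remote.sendText("frame 1", result -> {});
                remote.sendText("frame 2", result -> {});

                // With batching allowed, nothing is guaranteed to be on the wire
                // until the user (or the implementation, if it must) flushes.
                remote.flushBatch();
            }
            catch (IOException e)
            {
                // setBatchingAllowed()/flushBatch() may fail if the connection is gone
            }
        }
    }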

 

So tracing how we currently work from a call on RemoteEndpoint.Async of
  sendText(String text, SendHandler handler) or sendBinary(ByteBuffer data, SendHandler handler):

I think batching rules will apply to all sends.
  • Basic.getSendStream()
  • Basic.getSendWriter()
  • Basic.sendObject(Object)
  • Basic.sendText(String, boolean) // partial
  • Basic.sendBinary(ByteBuffer, boolean) // partial
  • RemoteEndpoint.sendPing(ByteBuffer)
  • RemoteEndpoint.sendPong(ByteBuffer)
This brings to mind a curious question.
What about streaming sends?
Should the stream.flush() result in a RemoteEndpoint.flushBatch() always?

Good question about streaming sends.   I think if streaming becomes a common use case then it can change a lot of assumptions I've made about what form the user data will come in.  I can see streaming being used as a target for JSON/XML generators etc.

I think that streaming makes a lot of sense with batching and agree that flush should carry through.

Because we don't really know the actual common usage pattern, I'm inclined to disregard some of my points made about "most common use cases will be in private buffers" and just accept that in batching mode we should copy the payload into the aggregate buffer.  This is fine if we have reasonable fragmentation, but could be an issue when handling really large frames (which might have to bypass aggregation - ah complexity!).
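
A sketch of the streaming case in question, assuming the reading above that a flush on the stream/writer carries through to flushBatch() (the JSON content is illustrative only):

    import java.io.IOException;
    import java.io.Writer;
    import javax.websocket.Session;

    // Sketch: a JSON generator writing through Basic.getSendWriter().
    // Under the proposed semantics, writer.flush() would also flush the batch
    // when batching is allowed.
    public class StreamingSendExample
    {
        public void streamJson(Session session) throws IOException
        {
            try (Writer writer = session.getBasicRemote().getSendWriter())
            {
                writer.write("{\"events\":[");
                writer.write("{\"id\":1}");
                writer.flush();              // proposed: carries through to flushBatch()
                writer.write(",{\"id\":2}");
                writer.write("]}");
            }                                // close() completes the (possibly fragmented) text message
        }
    }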


  • We create a new WebSocketFrame instance, either a BinaryFrame or a TextFrame, and we set the payload on the frame. For text frames, we just save a reference to the string at this point. For binary frames we take a slice off the buffer, so we have our own pos/limit pointers but share the underlying storage.

For TextFrames we immediately convert to a ByteBuffer via BufferUtil.toUTF8String().

I added a lazy conversion to TextFrames, which is worthwhile if we are not deflating as it means that text messages in the Q don't hold byte buffers as well.
 
Keep in mind that extensions don't care about Binary vs Text; it's just a DataFrame where everything is at the byte level, and an extension can arbitrarily split a unicode character into two separate frames.

Indeed.   But I think we have to handle well the case of being passed a very large string, and avoid creating a very large byte array and holding it while the frame is waiting to be sent.   So we may have to look at how we handle UTF-8 strings in the HTTP output stream to avoid significant memory usage and extra copies.

Of course it would be best if extensions do not have any specific text/binary handling, so we might need to look at adding some payload cursor abstraction to AbstractFrame so extensions can ask for a chunk and then another chunk of payload without the entire payload being copied to an array.
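
Purely hypothetical sketch of what such a payload cursor could look like (none of these names exist in Jetty today):

    import java.nio.ByteBuffer;

    // Hypothetical only: one possible shape for the "payload cursor" idea, so an
    // extension (or the flusher) can pull a large payload a chunk at a time - e.g.
    // incrementally UTF-8 encoding a huge String - instead of forcing the whole
    // payload into one byte array up front.
    public interface PayloadCursor
    {
        /**
         * Fill the given buffer with the next chunk of payload.
         * @return the number of bytes added, or -1 when the payload is exhausted
         */
        int next(ByteBuffer buffer);

        /** @return true if more payload chunks remain */
        boolean hasRemaining();
    }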

 
 
  • We call the native jetty API with WebSocketRemoteEndpoint.uncheckedSendFrame(frame, new SendHandlerWriteCallback(handler));   This adds a wrapping of the handler every time, so eventually it would be good to be able to use standard handlers internally and wrap our own API handlers for legacy code.  Calling the unchecked send means we avoid doing duplicate checks on the validity of the frame.
  • send frame does:
    connection.getIOState().assertOutputOpen();
    outgoing.outgoingFrame(frame,callback);
    which looks like a bit of a race to me, as the output can close after the assert?   Is the assert really needed?   The outgoing is the chain of extensions.  For almost all browsers we can assume that this is going to include a deflate extension, and we should optimise on this assumption.
The IOState.assertOutputOpen() check is just to fast-fail attempts to write.  The race is there, yes, but it's irrelevant to the overall functioning; anything that falls between the cracks here is eventually failed via the endpoint.write() attempts.

cool.

 
Currently deflate-frame is the most common, but it's deprecated.
permessage-deflate is arriving soon in Chrome (desktop and android), expect to see it in Firefox shortly.

Can you summarize the difference?   Is it just that permessage-deflate only resets the compression index at the end of the message?   Is it still able to change the fragmentation of an already fragmented message?

 

If you have an idea on how to get java's Deflater to live happily with ByteBuffers we can avoid most of these copies.

Indeed that is the problem.  Currently I think the only way to avoid the copies of huge payloads is to compress from and to smaller buffers, thus fragmenting large payloads.
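
Roughly what I mean (a sketch only: java.util.zip.Deflater works on byte[], so a fixed-size staging array is used and each flushed chunk becomes a fragment; the chunk size and raw-deflate flag are illustrative choices):

    import java.nio.ByteBuffer;
    import java.util.ArrayList;
    import java.util.Arrays;
    import java.util.List;
    import java.util.zip.Deflater;

    // Sketch: deflate a large payload through small fixed-size buffers, producing
    // a list of compressed fragments instead of one huge compressed array.
    public class FragmentingDeflate
    {
        private static final int CHUNK = 8 * 1024;

        public static List<ByteBuffer> deflate(ByteBuffer payload)
        {
            Deflater deflater = new Deflater(Deflater.DEFAULT_COMPRESSION, true); // raw deflate, no zlib wrapper
            byte[] input = new byte[CHUNK];
            byte[] output = new byte[CHUNK];
            List<ByteBuffer> fragments = new ArrayList<>();

            while (payload.hasRemaining())
            {
                int len = Math.min(CHUNK, payload.remaining());
                payload.get(input, 0, len);
                deflater.setInput(input, 0, len);

                int produced;
                do
                {
                    // SYNC_FLUSH so each chunk is decodable by the peer as it arrives
                    produced = deflater.deflate(output, 0, output.length, Deflater.SYNC_FLUSH);
                    if (produced > 0)
                        fragments.add(ByteBuffer.wrap(Arrays.copyOf(output, produced)));
                }
                while (produced == output.length); // a full buffer means more may be pending
            }
            deflater.end();
            return fragments;
        }
    }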
 
For compress flows, its easier to manage the buffers, but for the decompress flows you have no idea of the eventual size of the decompressed data, so you have to allocate based on assumptions and work from there.

Sure - input side is a totally different story.
 

I've been resisting implementing Deflate in pure java, but it seems like it's becoming more and more important. :-(

I've looked at that once or twice as well....   I think fragmentation is an OK thing to do, so that makes me lean towards not doing this... but worthwhile re-evaluating every so often.
 
 

Be aware that many of the extensions out there now have rules (written in each extension's spec, not the websocket protocol spec).
Some don't allow for changing of the fragmentation, so we need to be careful here.


+1
 
 
  • Eventually the extension chain ends up calling     AbstractWebSocketConnection#outgoingFrame(Frame frame, WriteCallback callback) which is implemented with a direct call on FrameFlusher#enqueue(frame,callback)
  • In FrameFlusher#enqueue we are called with a frame that in almost all circumstances will contain a payload that is private to the implementation - it is a fragment, deflated, or a converted string.   Only in the case of an undeflated, unfragmented binary message will we have a frame with a payload that is still visible to the application (and thus might require a copy in some circumstances).  The current enqueue only implements non-buffering mode - so it locks the queue, appends the frame (except pings, which are prepended to the queue) and then calls flush.  The FrameFlusher has an iterating callback, so flush just calls iterate.    In the most common case, the iteratingCB will be idle, because a single producer is probably waiting for a callback before it sends the next frame.  Only if we have multiple threads producing frames, or a producer that does not wait for a callback, will we already be flushing the queue here!
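
Roughly the shape of that current (non-batching) enqueue path, with names simplified - this is a sketch, not the actual Jetty source:

    import java.util.ArrayDeque;
    import java.util.Deque;

    // Sketch of the described enqueue: lock the queue, append the frame (pings
    // jump to the front), then poke the iterating callback via flush()/iterate().
    class FrameFlusherSketch
    {
        interface Frame { boolean isPing(); }
        interface WriteCallback { void writeSuccess(); void writeFailed(Throwable x); }

        static class FrameEntry
        {
            final Frame frame;
            final WriteCallback callback;
            FrameEntry(Frame frame, WriteCallback callback) { this.frame = frame; this.callback = callback; }
        }

        private final Object lock = new Object();
        private final Deque<FrameEntry> queue = new ArrayDeque<>();

        void enqueue(Frame frame, WriteCallback callback)
        {
            FrameEntry entry = new FrameEntry(frame, callback);
            synchronized (lock)
            {
                if (frame.isPing())
                    queue.addFirst(entry);   // pings are prepended
                else
                    queue.addLast(entry);
            }
            flush();
        }

        private void flush()
        {
            // In the real flusher this is IteratingCallback.iterate(): it wakes an
            // idle callback, and is effectively a no-op if a flush is already running.
        }
    }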

The copy of buffers here can get confusing, especially with the layers of extensions possibly doing it too.
Makes ByteBufferPool.release(ByteBuffer) a layer of releases after write, right?

So the more I think about it, I think that just having an aggregate buffer that we copy everything into is the safest thing to do.   If that buffer can be direct, then that is a win anyway.  If it can't be direct (because of ssl), then I still think the code simplicity that will result is worthwhile.

 
 
  • Each iteration of the iteratingCB takes frames from the queue, gets a frame header in its own buffer, gets the payload, and does a gather write of as many frames as possible up to MAX_GATHERS (default 8).  But as said, the most common case will just be a single frame and the gather will be just the header and payload.
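
At the channel level the gather write amounts to a single vectored write over interleaved header/payload buffers; a sketch using plain NIO (Jetty's EndPoint.write() plays the same role):

    import java.io.IOException;
    import java.nio.ByteBuffer;
    import java.nio.channels.GatheringByteChannel;

    // Sketch: one gathering write for up to MAX_GATHERS frames - each frame
    // contributes a small header buffer plus its (shared, uncopied) payload buffer.
    public class GatherWriteSketch
    {
        public static long writeFrames(GatheringByteChannel channel,
                                       ByteBuffer[] headers, ByteBuffer[] payloads) throws IOException
        {
            ByteBuffer[] gather = new ByteBuffer[headers.length * 2];
            for (int i = 0; i < headers.length; i++)
            {
                gather[2 * i] = headers[i];
                gather[2 * i + 1] = payloads[i];
            }
            return channel.write(gather); // single vectored write for all frames
        }
    }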

We should add some metrics here to verify this assumption.

Indeed it would be good to get this measured from any asynchronous uses already out there.  We can probably just ask the questions rather than measure it:
  • Do you have multiple threads/producers writing to the same websocket?
  • Does a producer wait for the callback before sending the next message?
I'm assuming the most common case will be No/Yes for those - but I would love to hear from users who think otherwise.
 
 
  • Only once the gather write has succeeded do we call back the callback associated with the passed frame.

And the various ByteBufferPool.release() calls! :-)

Well the releases could be called later if we assume the buffers are private and cannot be changed by the results of the callback.    In many ways it would be good to move to a model where the impl never releases buffers allocated by others, as only they know if they are going to be reused or not.     But yes, we have to evaluate how all these changes affect the release strategy.

Again I'm leaning towards just copying into aggregate buffer and letting the caller release/reuse the passed buffer as they see fit.


 
  • lather, rinse and repeat!

So this is not the most efficient way to send frames in the most common case. We have a Q and iteratingCB mechanism that will only be used in special cases.  The most common case does a bit of extra handling that is not really required.    However the most common case is probably better suited to the setBatchingAllowed(true) mode, so let's think about how we could support this.


Where is this "common case" idea coming from?

OK my common case is probably cometd.     It is that we have a single event stream being written to the websocket that uses the callbacks as flow control to ensure that it does not write faster than the websocket can handle.
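
A sketch of that pattern, just to make the flow control explicit (names are illustrative; the callback completing is what permits the next send):

    import java.util.Queue;
    import java.util.concurrent.ConcurrentLinkedQueue;
    import java.util.concurrent.atomic.AtomicBoolean;
    import javax.websocket.RemoteEndpoint;

    // Sketch: a single event stream where the send callback acts as flow control -
    // the next message is only written once the previous send has completed.
    public class CallbackFlowControl
    {
        private final Queue<String> events = new ConcurrentLinkedQueue<>();
        private final AtomicBoolean sending = new AtomicBoolean(false);
        private final RemoteEndpoint.Async remote;

        public CallbackFlowControl(RemoteEndpoint.Async remote)
        {
            this.remote = remote;
        }

        public void offer(String event)
        {
            events.add(event);
            sendNext();
        }

        private void sendNext()
        {
            if (!sending.compareAndSet(false, true))
                return; // a send is in flight; its callback will continue the loop
            String event = events.poll();
            if (event == null)
            {
                sending.set(false);
                if (!events.isEmpty())
                    sendNext(); // an event raced in while we were giving up
                return;
            }
            remote.sendText(event, result ->
            {
                sending.set(false);
                sendNext(); // only produce the next frame once this one has completed
            });
        }
    }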

 
It's more of the "simple case".  We have no metrics to prove this is the common case.
Based on just paying attention to stackoverflow questions, you have a healthy distribution of questions, with chat topics, realtime data feed topics, large content transfer topics, and streaming topics getting equal attention from the question askers.

We even see people complaining about the per-frame limits of a 63-bit number! (wha!?)

Asking the various open source consumers of our websocket library what's more common ...
  • small (under 100k) vs large
  • a trickle of messages vs frequent messages
I get the same answer from each of them, "there is no common use case, we see all mixes of the above equally"

When I ask about browser vs other, there is a surprise though, they see more library direct use (think server to server, embedded device to server, or app to server) than browser use.

Two ends of the spectrum:
  • B2C text chat application
  • Multi User VoIP (video + audio + screensharing) 

Good points.  I agree that we need to support all the cases well.     However I also don't mind giving some favour to the cometd case.


The first thing that came to mind is that we could just call the frame callback as soon as the frame is put in the FrameFlusher Q.  But this does not work, as the flusher Q can grow infinitely and there is no backpressure to act as flow control if the producers are making frames faster than we can send them.

So I think the correct semantic is to call the callback as a frame is prepared for a write - either individually, as a gather write, or potentially as it is written into an aggregate buffer.  Let's call this process aggregation - regardless of the actual implementation.


What if it is a large frame, that requires multiple writes to be successful?

Indeed, a tricky one.    We probably have to have a bypass for large frames that are too big to be aggregated.   We have the same issue in HttpOutput, where we only aggregate small buffers and large ones are passed directly to the new layer.   Isn't complexity fun!


 

So the first problem is that the current aggregation process (preparing gathered writes) is done by the iteratingCB.   This does not work for batching mode, as we need the aggregation to be done when a message is enqueued and not when the current aggregate is flushed.

So enqueue has to be smarter and look at the current state of the iteratingCB: if it is idle then enqueue can aggregate; if it is not idle then a prior aggregate is being flushed and enqueue will have to put the frame into the queue.   This has to be atomic with regard to the iteratingCB waking up... so I'm seeing a lock here, but maybe it can be done lock-free?   The good news here is that a simple single frame write can avoid the queue and go direct to aggregation.
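
Something like this (invented names, a sketch only - the header generation piece is sketched separately below):

    import java.nio.ByteBuffer;
    import java.util.ArrayDeque;
    import java.util.Deque;

    // Sketch of the batching enqueue: if the flusher is idle and the frame fits,
    // copy it into the aggregate buffer and complete its callback immediately;
    // otherwise queue it for the in-progress flush to pick up.  The lock makes the
    // idle check and the hand-off atomic.
    class BatchingEnqueueSketch
    {
        interface Frame { ByteBuffer getPayload(); }
        interface Callback { void succeeded(); }

        private static final int MAX_HEADER = 10; // unmasked server frame header

        private final Object lock = new Object();
        private final Deque<Frame> queue = new ArrayDeque<>();
        private final ByteBuffer aggregate = ByteBuffer.allocateDirect(32 * 1024);
        private boolean flushing;

        void enqueue(Frame frame, Callback callback)
        {
            boolean aggregated = false;
            synchronized (lock)
            {
                if (!flushing && aggregate.remaining() >= MAX_HEADER + frame.getPayload().remaining())
                {
                    // real code would generate the frame header here too (see below)
                    aggregate.put(frame.getPayload().slice());
                    aggregated = true;
                }
                else
                {
                    queue.addLast(frame); // aggregated (and called back) later by the iteratingCB
                }
            }
            if (aggregated)
                callback.succeeded(); // the payload was copied; the caller may reuse its buffer
        }
    }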

The next issue is how do we aggregate?   We could copy the frames to an aggregate buffer and, once that is full, auto flush.      The first problem here is that the current frame.getHeaderBytes() creates a new array holding the header, so we would need to change this to be frame.generateHeaderBytes(ByteBuffer buffer), so we can avoid the copy and generate directly into the aggregate buffer.
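
A sketch of what that generateHeaderBytes(ByteBuffer) shape could look like - just the RFC 6455 header layout written into the caller's buffer, with no intermediate array (masking and RSV bits omitted, since server frames are unmasked):

    import java.nio.ByteBuffer;

    // Sketch: generate a frame header directly into the supplied (aggregate) buffer.
    public class HeaderGenerator
    {
        public static void generateHeader(ByteBuffer buffer, byte opcode, boolean fin, long payloadLength)
        {
            buffer.put((byte)((fin ? 0x80 : 0x00) | (opcode & 0x0F)));

            if (payloadLength <= 125)
            {
                buffer.put((byte)payloadLength);
            }
            else if (payloadLength <= 0xFFFF)
            {
                buffer.put((byte)126);
                buffer.putShort((short)payloadLength);
            }
            else
            {
                buffer.put((byte)127);
                buffer.putLong(payloadLength);
            }
        }
    }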


I did this originally, even had a single large ByteBuffer that I just sliced a few times over to see different parts of the same underlying array, passing each sliced view of the same array into the gathering writes.  It was rather buggy at the time, but I liked that approach.  We can resurrect that if you think it would be worth it.
 

We could copy all payloads into the aggregate buffer as well and then we'd just end up with one big buffer holding lots of frames which can be flushed.   Before flushing the aggregate, we would call all the frames callbacks, so while we are flushing, producers would be enqueuing the next set of frames.


We would wind up with lots of ByteBuffers to BufferPool.release() or lots of copying this way, right?

I think we would end up with lots of copying, but the more I think about it the more I think we have to copy anyway.    The only way we can do a callback without copying the payload is if we REALLY know it is a private buffer and will not be reused/released by the caller.    This is true for TextFrames that have been passed directly from the app to the flusher, but I think such frames will be rare, as many are going to be deflated or fragmented.    If we go with a deflater that uses a fixed size buffer, then that buffer could be reused/released when a callback happens, so the payload still has to be copied.

So considering that there will be modes where most/all of the passed buffers are not private, I don't think it is worth the complexity to try to work out whether we have to copy or not - and, if we don't copy, whether we have to be responsible for release or not.

I think it is better to just copy and let the caller release their buffers/frames.

If we can copy into a direct buffer we get a win anyway.  If we can't we still save a lot in complexity and work well for the frequent case of deflating.


However, I still have a concern that copying into the aggregate buffer may not be the most efficient way.  I think a very common case is that the payload is already going to be in a private buffer that does not need to be copied, so we could potentially just keep a reference and still do a gathering write.  E.g. it is a string message, or a deflated message, or a fragmented message.


By the time the frame reaches the whole aggregate write level we can't be thinking in terms of String vs Deflated vs Fragmented.  It's just frames with headers and payloads.  All of the rest of the knowledge should be considered unavailable at that point.

+1.   Leaning towards the just copy always approach :)

 
Don't forget that whatever solution we come up with will also have to fix the WS/SPDY and WS/HTTP2 world too.
We'll have to think about this aggregation concern at a point before HTTP/1.1 EndPoint vs SPDY vs HTTP/2, since the same rules/logic will need to exist for those as well.  Thankfully, the split we already have between generating frame headers and payloads seems to have been a happy accidental alignment of the stars :)
 

true.
 

With the latest mux extension chatter being to drop it in favor of WS/SPDY and WS/HTTP2, it seems that we need to abstract out the header generation for the endpoint (eg: a websocket header becomes a SPDY header, not an array of bytes) anyway.  The WebSocketSPDYConnection object of the future would also need to handle these aggregation cases.

ah very interesting.    More to ponder here!

So still lots of uncertainty, but at this stage I have convinced myself that we do need to support batching, as it will greatly help cometd.  But I cannot convince myself that there will be a common enough case to allow us to not copy payloads and just keep references to the passed payload.  So that pushes us towards copying always.

The next question is do we copy as we generate - i.e. lay out the wire protocol of headers and payloads into a direct buffer ready to send - or do we copy at a higher level so that alternate framing like WS/HTTP2 can be used?  Needs more time with the code to really answer this.

So I think we need to continue with discussions, thought experiments and maybe testing assumptions before we jump into the coding of this.

cheers

--
Greg Wilkins <gregw@xxxxxxxxxxx>
http://www.webtide.com
Developer advice and support from the Jetty & CometD experts.
Intalio, the modern way to build business applications.
