In Jetty 7/8 we were essentially in setBatchingAllowed(true) mode always; Jetty 9 is currently in false mode always. I think that for the common write case we see from apps like cometd, the true mode is the most efficient, as a producer can send multiple frames before being slowed by any IO bottlenecks, and IO operations over slow networks are done with larger, more efficient buffers of data.
So, tracing how we currently work from calls on RemoteEndpoint.Async of sendText(String text, SendHandler handler) or sendBinary(ByteBuffer data, SendHandler handler):
So this is not the most efficient way to send frames in the most common case. We have a queue and iteratingCB mechanism that will only be used in special cases, and the most common case does a bit of extra handling that is not really required. However, the most common case is probably better suited to the setBatchingAllowed(true) mode, so let's think about how we could support that.
The first thing that came to mind is that we could just call the frame callback as soon as the frame is put in the FrameFlusher queue. But this does not work, as the flusher queue can grow infinitely and there is no backpressure to act as flow control if the producers are making frames faster than we can send them.
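To illustrate the backpressure problem with a toy sketch (NaiveFlusher, enqueue and flushOne are made-up names here, not Jetty code): if the callback succeeds at enqueue time, a fast producer never waits for the slower IO, and the queue just grows.

```java
import java.util.ArrayDeque;
import java.util.Queue;

// Toy sketch: completing the callback at enqueue time removes all backpressure,
// so producers outrun the IO and the queue grows without bound.
class NaiveFlusher {
    final Queue<String> queue = new ArrayDeque<>();
    int sent = 0;

    // Callback succeeds immediately on enqueue: no flow control at all.
    void enqueue(String frame, Runnable callback) {
        queue.add(frame);
        callback.run(); // producer thinks the frame is "done" and keeps going
    }

    // The actual IO drains far more slowly than producers enqueue.
    void flushOne() {
        if (queue.poll() != null)
            sent++;
    }
}
```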
So I think the correct semantic is to call the callback as a frame is prepared for a write: either individually, as a gathered write, or potentially as it is written into an aggregate buffer. Let's call this process aggregation, regardless of the actual implementation.
So the first problem is that the current aggregation process (preparing gathered writes) is done by the iteratingCB. This does not work for batching mode, as we need the aggregation to be done when a message is enqueued and not when the current aggregate is flushed.
So enqueue has to be smarter and look at the current state of the iteratingCB: if it is idle, then enqueue can aggregate; if it is not idle, then a prior aggregate is being flushed and enqueue will have to put the frame into the queue. This has to be atomic with regard to the iteratingCB waking up... so I'm seeing a lock here, but maybe it can be done lock-free? The good news is that a simple single-frame write can avoid the queue and go direct to aggregation.
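A lock-based sketch of that smarter enqueue might look like this. BatchingFlusher, startFlush and queuedFrames are all illustrative names, not real Jetty API, and the flushing flag stands in for the iteratingCB's idle/busy state:

```java
import java.nio.ByteBuffer;
import java.util.ArrayDeque;
import java.util.Queue;

// Sketch: aggregate directly when the flusher is idle, queue when a prior
// aggregate is mid-flush. The lock makes the check atomic with regard to
// the flusher waking up.
class BatchingFlusher {
    private final Object lock = new Object();
    private boolean flushing;
    private final Queue<ByteBuffer> queue = new ArrayDeque<>();
    final ByteBuffer aggregate = ByteBuffer.allocate(4096);

    void enqueue(ByteBuffer frame, Runnable callback) {
        boolean aggregated;
        synchronized (lock) {
            aggregated = !flushing && aggregate.remaining() >= frame.remaining();
            if (aggregated)
                aggregate.put(frame); // idle: aggregate directly, skip the queue
            else
                queue.add(frame);     // a prior aggregate is mid-flush: park it
        }
        if (aggregated)
            callback.run();           // the frame now lives in the aggregate
    }

    void startFlush()  { synchronized (lock) { flushing = true; } }

    int queuedFrames() { synchronized (lock) { return queue.size(); } }
}
```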
Next issue: how do we aggregate? We could copy the frames to an aggregate buffer and, once that is full, auto-flush. The first problem here is that the current frame.getHeaderBytes() creates a new array holding the header, so we would need to change this to frame.generateHeaderBytes(ByteBuffer buffer), so we can avoid the copy and generate directly into the aggregate buffer.
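The proposed generateHeaderBytes(ByteBuffer) shape could look roughly like this, generating an RFC 6455 header straight into the caller's buffer (the class and method names are illustrative; this writes an unmasked, final frame header only):

```java
import java.nio.ByteBuffer;

// Sketch of generating the frame header directly into the aggregate buffer
// instead of returning a fresh byte[] per frame.
class HeaderGenerator {
    static void generateHeaderBytes(ByteBuffer buffer, byte opcode, long payloadLength) {
        buffer.put((byte) (0x80 | (opcode & 0x0F)));  // FIN bit + opcode
        if (payloadLength <= 125) {
            buffer.put((byte) payloadLength);         // 7-bit length
        } else if (payloadLength <= 0xFFFF) {
            buffer.put((byte) 126);                   // 16-bit extended length
            buffer.putShort((short) payloadLength);
        } else {
            buffer.put((byte) 127);                   // 64-bit extended length
            buffer.putLong(payloadLength);
        }
    }
}
```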
We could copy all payloads into the aggregate buffer as well, and then we'd just end up with one big buffer holding lots of frames which can be flushed. Before flushing the aggregate, we would call all the frames' callbacks, so while we are flushing, producers would be enqueuing the next set of frames.
Once the flush is complete we may have one or more frames in the queue; we would then aggregate those frames and call their callbacks, which might send more frames, which we would aggregate and call their callbacks, etc., until the aggregate was full again or somebody called flush.
Lather, rinse and repeat.
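The cycle above could be sketched like this (BatchCycle, Entry and the Consumer standing in for the endpoint write are all made-up names, and frames are assumed to fit in the aggregate):

```java
import java.nio.ByteBuffer;
import java.util.ArrayDeque;
import java.util.Queue;
import java.util.function.Consumer;

// Sketch of the batch cycle: drain the queue into the aggregate, complete the
// callbacks (which may enqueue more frames), write the whole batch, repeat.
class BatchCycle {
    static class Entry {
        final ByteBuffer frame; final Runnable callback;
        Entry(ByteBuffer f, Runnable c) { frame = f; callback = c; }
    }

    final Queue<Entry> queue = new ArrayDeque<>();
    final ByteBuffer aggregate = ByteBuffer.allocate(4096);
    final Consumer<ByteBuffer> endpoint; // stands in for the real endpoint write
    int flushes = 0;

    BatchCycle(Consumer<ByteBuffer> endpoint) { this.endpoint = endpoint; }

    void flush() {
        // 1. Copy queued frames into the aggregate, completing each callback so
        //    producers can prepare the next batch while we write this one.
        Entry e;
        while ((e = queue.peek()) != null && aggregate.remaining() >= e.frame.remaining()) {
            queue.poll();
            aggregate.put(e.frame);
            e.callback.run();
        }
        // 2. Write the whole batch in one operation, then recycle the buffer.
        aggregate.flip();
        endpoint.accept(aggregate);
        aggregate.clear();
        flushes++;
        // 3. Callbacks may have overfilled the aggregate: lather, rinse, repeat.
        if (!queue.isEmpty())
            flush();
    }
}
```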
However, I still have a concern that copying into the aggregate buffer may not be the most efficient way. I think a very common case is that the payload is already in a private buffer that does not need to be copied, so we could potentially just keep a reference and still do a gathering write. E.g. it is a string message, a deflated message, or a fragmented message.
So we could still have an aggregate buffer, but perhaps all we put in it are multiple headers, with a buffer slice pointing to each one. We would then create a gathering write that consists of a sliced header in the aggregate buffer, a payload reference, a sliced header in the aggregate buffer, a payload reference, etc. This could avoid lots of copies in a common case... however, we would still need to copy the payload (into a slice of the aggregate buffer) if it is an undeflated, unfragmented binary frame, as the application can alter the passed buffer as soon as we call the callback. More importantly, if we alter the deflate and fragment extensions as I have suggested above, then payloads from those extensions could often not be held by reference, since those extensions may reuse the buffer for the next fragment when they get the callback for the prior fragment.
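The header-slice idea could look roughly like this (GatherBatch and append are illustrative names; for brevity this only handles short payload lengths, and in real Jetty the ByteBuffer[] would go to a gathering write on the EndPoint):

```java
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.List;

// Sketch: only headers go into the shared aggregate buffer; the gathering
// write interleaves header slices with payload references, avoiding copies.
class GatherBatch {
    private final ByteBuffer headers = ByteBuffer.allocate(1024);
    private final List<ByteBuffer> gather = new ArrayList<>();

    // Appends one frame: header generated into a slice of the shared buffer,
    // payload kept by reference (safe only if the producer won't reuse it).
    void append(byte opcode, ByteBuffer payload) {
        int start = headers.position();
        headers.put((byte) (0x80 | (opcode & 0x0F)));
        headers.put((byte) payload.remaining()); // short (<= 125 byte) payloads only
        ByteBuffer headerSlice = headers.duplicate();
        headerSlice.position(start).limit(headers.position());
        gather.add(headerSlice);
        gather.add(payload);                     // reference, no copy
    }

    ByteBuffer[] toGatheringWrite() { return gather.toArray(new ByteBuffer[0]); }
}
```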
Thus I think it is simplest to start without gathering writes: just aggregate into a buffer and copy all payloads into that buffer as well. We could save a copy if we made the aggregate buffer a direct buffer (the frame header generation would have to be aware of this and write ints or longs rather than bytes). However, once we go to wss or SPDY, the aggregate buffer will be encrypted in user space, so we want an indirect buffer... we really need a way to ask our own IO endpoints whether it is worthwhile using a direct buffer or not.
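That missing query could be as small as an advisory method on the endpoint, something like this (entirely hypothetical API, the names BufferAdvice and preferDirectBuffers do not exist anywhere):

```java
import java.nio.ByteBuffer;

// Hypothetical sketch: let the transport say whether a direct aggregate
// buffer is worthwhile for its write path.
interface BufferAdvice {
    // true for a plain TCP endpoint; false when user-space encryption
    // (wss/SPDY) would have to pull the bytes back into the heap anyway.
    boolean preferDirectBuffers();
}

class AggregateAllocator {
    static ByteBuffer allocateFor(BufferAdvice endpoint, int capacity) {
        return endpoint.preferDirectBuffers()
            ? ByteBuffer.allocateDirect(capacity) // kernel writes without an extra copy
            : ByteBuffer.allocate(capacity);      // heap buffer for user-space encryption
    }
}
```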
Finally (for now), we have to work out whether one FrameFlusher should handle both modes, or whether we have two different FrameFlushers and switching modes switches the flusher. I don't really know on this one; I need to experiment.
thoughts?