Buffer re-use in WebSocket code, fix race conditions

Signed-off-by: Neil Twigg <neil@nats.io>
This commit is contained in:
Neil Twigg
2023-04-21 15:15:09 +01:00
parent bf286744dd
commit 2ece00b08f
2 changed files with 52 additions and 30 deletions

View File

@@ -452,7 +452,9 @@ func (c *client) wsHandleControlFrame(r *wsReadInfo, frameType wsOpCode, nc io.R
}
}
}
c.wsEnqueueControlMessage(wsCloseMessage, wsCreateCloseMessage(status, body))
clm := wsCreateCloseMessage(status, body)
c.wsEnqueueControlMessage(wsCloseMessage, clm)
nbPoolPut(clm) // wsEnqueueControlMessage has taken a copy.
// Return io.EOF so that readLoop will close the connection as ClientClosed
// after processing pending buffers.
return pos, io.EOF
@@ -502,7 +504,7 @@ func wsIsControlFrame(frameType wsOpCode) bool {
// Create the frame header.
// Encodes the frame type and optional compression flag, and the size of the payload.
func wsCreateFrameHeader(useMasking, compressed bool, frameType wsOpCode, l int) ([]byte, []byte) {
fh := make([]byte, wsMaxFrameHeaderSize)
fh := nbPoolGet(wsMaxFrameHeaderSize)[:wsMaxFrameHeaderSize]
n, key := wsFillFrameHeader(fh, useMasking, wsFirstFrame, wsFinalFrame, compressed, frameType, l)
return fh[:n], key
}
@@ -596,11 +598,13 @@ func (c *client) wsEnqueueControlMessageLocked(controlMsg wsOpCode, payload []by
if useMasking {
sz += 4
}
cm := make([]byte, sz+len(payload))
cm := nbPoolGet(sz + len(payload))
cm = cm[:cap(cm)]
n, key := wsFillFrameHeader(cm, useMasking, wsFirstFrame, wsFinalFrame, wsUncompressedFrame, controlMsg, len(payload))
cm = cm[:n]
// Note that payload is optional.
if len(payload) > 0 {
copy(cm[n:], payload)
cm = append(cm, payload...)
if useMasking {
wsMaskBuf(key, cm[n:])
}
@@ -646,6 +650,7 @@ func (c *client) wsEnqueueCloseMessage(reason ClosedState) {
}
body := wsCreateCloseMessage(status, reason.String())
c.wsEnqueueControlMessageLocked(wsCloseMessage, body)
nbPoolPut(body) // wsEnqueueControlMessageLocked has taken a copy.
}
// Create and then enqueue a close message with a protocol error and the
@@ -655,6 +660,7 @@ func (c *client) wsEnqueueCloseMessage(reason ClosedState) {
func (c *client) wsHandleProtocolError(message string) error {
buf := wsCreateCloseMessage(wsCloseStatusProtocolError, message)
c.wsEnqueueControlMessage(wsCloseMessage, buf)
nbPoolPut(buf) // wsEnqueueControlMessage has taken a copy.
return fmt.Errorf(message)
}
@@ -671,7 +677,7 @@ func wsCreateCloseMessage(status int, body string) []byte {
body = body[:wsMaxControlPayloadSize-5]
body += "..."
}
buf := make([]byte, 2+len(body))
buf := nbPoolGet(2 + len(body))[:2+len(body)]
// We need to have a 2 byte unsigned int that represents the error status code
// https://tools.ietf.org/html/rfc6455#section-5.5.1
binary.BigEndian.PutUint16(buf[:2], uint16(status))
@@ -1298,6 +1304,7 @@ func (c *client) wsCollapsePtoNB() (net.Buffers, int64) {
var csz int
for _, b := range nb {
cp.Write(b)
nbPoolPut(b) // No longer needed as contents written to compressor.
}
if err := cp.Flush(); err != nil {
c.Errorf("Error during compression: %v", err)
@@ -1314,24 +1321,33 @@ func (c *client) wsCollapsePtoNB() (net.Buffers, int64) {
} else {
final = true
}
fh := make([]byte, wsMaxFrameHeaderSize)
// Only the first frame should be marked as compressed, so pass
// `first` for the compressed boolean.
fh := nbPoolGet(wsMaxFrameHeaderSize)[:wsMaxFrameHeaderSize]
n, key := wsFillFrameHeader(fh, mask, first, final, first, wsBinaryMessage, lp)
if mask {
wsMaskBuf(key, p[:lp])
}
bufs = append(bufs, fh[:n], p[:lp])
new := nbPoolGet(wsFrameSizeForBrowsers)
lp = copy(new[:wsFrameSizeForBrowsers], p[:lp])
bufs = append(bufs, fh[:n], new[:lp])
csz += n + lp
p = p[lp:]
}
} else {
h, key := wsCreateFrameHeader(mask, true, wsBinaryMessage, len(p))
ol := len(p)
h, key := wsCreateFrameHeader(mask, true, wsBinaryMessage, ol)
if mask {
wsMaskBuf(key, p)
}
bufs = append(bufs, h, p)
csz = len(h) + len(p)
bufs = append(bufs, h)
for len(p) > 0 {
new := nbPoolGet(len(p))
n := copy(new[:cap(new)], p)
bufs = append(bufs, new[:n])
p = p[n:]
}
csz = len(h) + ol
}
// Add to pb the compressed data size (including headers), but
// remove the original uncompressed data size that was added
@@ -1343,7 +1359,7 @@ func (c *client) wsCollapsePtoNB() (net.Buffers, int64) {
if mfs > 0 {
// We are limiting the frame size.
startFrame := func() int {
bufs = append(bufs, make([]byte, wsMaxFrameHeaderSize))
bufs = append(bufs, nbPoolGet(wsMaxFrameHeaderSize)[:wsMaxFrameHeaderSize])
return len(bufs) - 1
}
endFrame := func(idx, size int) {
@@ -1376,8 +1392,10 @@ func (c *client) wsCollapsePtoNB() (net.Buffers, int64) {
if endStart {
fhIdx = startFrame()
}
bufs = append(bufs, b[:total])
b = b[total:]
new := nbPoolGet(total)
n := copy(new[:cap(new)], b[:total])
bufs = append(bufs, new[:n])
b = b[n:]
}
}
if total > 0 {