From 6b65452bc70973d9bcf59857afa94ae46b04bdf4 Mon Sep 17 00:00:00 2001 From: Neil Twigg Date: Wed, 4 Oct 2023 12:15:05 +0100 Subject: [PATCH] Reduce allocations in WebSocket compression Signed-off-by: Neil Twigg --- server/client.go | 31 +++++++++++++++++-------------- server/websocket.go | 33 +++++++++++++++------------------ 2 files changed, 32 insertions(+), 32 deletions(-) diff --git a/server/client.go b/server/client.go index e3364c8a..1c472528 100644 --- a/server/client.go +++ b/server/client.go @@ -346,20 +346,23 @@ func nbPoolGet(sz int) []byte { } } -func nbPoolPut(b []byte) { - switch cap(b) { - case nbPoolSizeSmall: - b := (*[nbPoolSizeSmall]byte)(b[0:nbPoolSizeSmall]) - nbPoolSmall.Put(b) - case nbPoolSizeMedium: - b := (*[nbPoolSizeMedium]byte)(b[0:nbPoolSizeMedium]) - nbPoolMedium.Put(b) - case nbPoolSizeLarge: - b := (*[nbPoolSizeLarge]byte)(b[0:nbPoolSizeLarge]) - nbPoolLarge.Put(b) - default: - // Ignore frames that are the wrong size, this might happen - // with WebSocket/MQTT messages as they are framed +func nbPoolPut(in []byte) { + ca := cap(in) + for in = in[:ca]; ca >= nbPoolSizeSmall; ca = cap(in) { + switch { + case ca >= nbPoolSizeLarge: + b := (*[nbPoolSizeLarge]byte)(in[0:nbPoolSizeLarge:nbPoolSizeLarge]) + nbPoolLarge.Put(b) + in = in[nbPoolSizeLarge:] + case ca >= nbPoolSizeMedium: + b := (*[nbPoolSizeMedium]byte)(in[0:nbPoolSizeMedium:nbPoolSizeMedium]) + nbPoolMedium.Put(b) + in = in[nbPoolSizeMedium:] + case ca >= nbPoolSizeSmall: + b := (*[nbPoolSizeSmall]byte)(in[0:nbPoolSizeSmall:nbPoolSizeSmall]) + nbPoolSmall.Put(b) + in = in[nbPoolSizeSmall:] + } } } diff --git a/server/websocket.go b/server/websocket.go index ff8dc1ea..cd4b80ae 100644 --- a/server/websocket.go +++ b/server/websocket.go @@ -115,6 +115,7 @@ type websocket struct { nocompfrag bool // No fragment for compressed frames maskread bool maskwrite bool + compressor *flate.Writer cookieJwt string clientIP string } @@ -1295,7 +1296,13 @@ func (c *client) wsCollapsePtoNB() (net.Buffers, int64) { mfs = 0 } buf := bytes.NewBuffer(nbPoolGet(usz)) - cp, _ := flate.NewWriter(buf, flate.BestSpeed) + cp := c.ws.compressor + if cp == nil { + c.ws.compressor, _ = flate.NewWriter(buf, flate.BestSpeed) + cp = c.ws.compressor + } else { + cp.Reset(buf) + } var csz int for _, b := range nb { cp.Write(b) @@ -1323,9 +1330,7 @@ func (c *client) wsCollapsePtoNB() (net.Buffers, int64) { if mask { wsMaskBuf(key, p[:lp]) } - new := nbPoolGet(wsFrameSizeForBrowsers) - lp = copy(new[:wsFrameSizeForBrowsers], p[:lp]) - bufs = append(bufs, fh[:n], new[:lp]) + bufs = append(bufs, fh[:n], p[:lp]) csz += n + lp p = p[lp:] } @@ -1335,16 +1340,9 @@ func (c *client) wsCollapsePtoNB() (net.Buffers, int64) { if mask { wsMaskBuf(key, p) } - bufs = append(bufs, h) - for len(p) > 0 { - new := nbPoolGet(len(p)) - n := copy(new[:cap(new)], p) - bufs = append(bufs, new[:n]) - p = p[n:] - } + bufs = append(bufs, h, p) csz = len(h) + ol } - nbPoolPut(b) // No longer needed as we copied from above. // Add to pb the compressed data size (including headers), but // remove the original uncompressed data size that was added // during the queueing. @@ -1355,14 +1353,15 @@ func (c *client) wsCollapsePtoNB() (net.Buffers, int64) { if mfs > 0 { // We are limiting the frame size. startFrame := func() int { - bufs = append(bufs, nbPoolGet(wsMaxFrameHeaderSize)[:wsMaxFrameHeaderSize]) + bufs = append(bufs, nbPoolGet(wsMaxFrameHeaderSize)) return len(bufs) - 1 } endFrame := func(idx, size int) { + bufs[idx] = bufs[idx][:wsMaxFrameHeaderSize] n, key := wsFillFrameHeader(bufs[idx], mask, wsFirstFrame, wsFinalFrame, wsUncompressedFrame, wsBinaryMessage, size) + bufs[idx] = bufs[idx][:n] c.out.pb += int64(n) c.ws.fs += int64(n + size) - bufs[idx] = bufs[idx][:n] if mask { wsMaskBufs(key, bufs[idx+1:]) } @@ -1388,10 +1387,8 @@ func (c *client) wsCollapsePtoNB() (net.Buffers, int64) { if endStart { fhIdx = startFrame() } - new := nbPoolGet(total) - n := copy(new[:cap(new)], b[:total]) - bufs = append(bufs, new[:n]) - b = b[n:] + bufs = append(bufs, b[:total]) + b = b[total:] } } if total > 0 {