From 6b65452bc70973d9bcf59857afa94ae46b04bdf4 Mon Sep 17 00:00:00 2001 From: Neil Twigg Date: Wed, 4 Oct 2023 12:15:05 +0100 Subject: [PATCH 1/3] Reduce allocations in WebSocket compression Signed-off-by: Neil Twigg --- server/client.go | 31 +++++++++++++++++-------------- server/websocket.go | 33 +++++++++++++++------------------ 2 files changed, 32 insertions(+), 32 deletions(-) diff --git a/server/client.go b/server/client.go index e3364c8a..1c472528 100644 --- a/server/client.go +++ b/server/client.go @@ -346,20 +346,23 @@ func nbPoolGet(sz int) []byte { } } -func nbPoolPut(b []byte) { - switch cap(b) { - case nbPoolSizeSmall: - b := (*[nbPoolSizeSmall]byte)(b[0:nbPoolSizeSmall]) - nbPoolSmall.Put(b) - case nbPoolSizeMedium: - b := (*[nbPoolSizeMedium]byte)(b[0:nbPoolSizeMedium]) - nbPoolMedium.Put(b) - case nbPoolSizeLarge: - b := (*[nbPoolSizeLarge]byte)(b[0:nbPoolSizeLarge]) - nbPoolLarge.Put(b) - default: - // Ignore frames that are the wrong size, this might happen - // with WebSocket/MQTT messages as they are framed +func nbPoolPut(in []byte) { + ca := cap(in) + for in = in[:ca]; ca >= nbPoolSizeSmall; ca = cap(in) { + switch { + case ca >= nbPoolSizeLarge: + b := (*[nbPoolSizeLarge]byte)(in[0:nbPoolSizeLarge:nbPoolSizeLarge]) + nbPoolLarge.Put(b) + in = in[nbPoolSizeLarge:] + case ca >= nbPoolSizeMedium: + b := (*[nbPoolSizeMedium]byte)(in[0:nbPoolSizeMedium:nbPoolSizeMedium]) + nbPoolMedium.Put(b) + in = in[nbPoolSizeMedium:] + case ca >= nbPoolSizeSmall: + b := (*[nbPoolSizeSmall]byte)(in[0:nbPoolSizeSmall:nbPoolSizeSmall]) + nbPoolSmall.Put(b) + in = in[nbPoolSizeSmall:] + } } } diff --git a/server/websocket.go b/server/websocket.go index ff8dc1ea..cd4b80ae 100644 --- a/server/websocket.go +++ b/server/websocket.go @@ -115,6 +115,7 @@ type websocket struct { nocompfrag bool // No fragment for compressed frames maskread bool maskwrite bool + compressor *flate.Writer cookieJwt string clientIP string } @@ -1295,7 +1296,13 @@ func (c *client) wsCollapsePtoNB() (net.Buffers, int64) { mfs = 0 } buf := bytes.NewBuffer(nbPoolGet(usz)) - cp, _ := flate.NewWriter(buf, flate.BestSpeed) + cp := c.ws.compressor + if cp == nil { + c.ws.compressor, _ = flate.NewWriter(buf, flate.BestSpeed) + cp = c.ws.compressor + } else { + cp.Reset(buf) + } var csz int for _, b := range nb { cp.Write(b) @@ -1323,9 +1330,7 @@ func (c *client) wsCollapsePtoNB() (net.Buffers, int64) { if mask { wsMaskBuf(key, p[:lp]) } - new := nbPoolGet(wsFrameSizeForBrowsers) - lp = copy(new[:wsFrameSizeForBrowsers], p[:lp]) - bufs = append(bufs, fh[:n], new[:lp]) + bufs = append(bufs, fh[:n], p[:lp]) csz += n + lp p = p[lp:] } @@ -1335,16 +1340,9 @@ func (c *client) wsCollapsePtoNB() (net.Buffers, int64) { if mask { wsMaskBuf(key, p) } - bufs = append(bufs, h) - for len(p) > 0 { - new := nbPoolGet(len(p)) - n := copy(new[:cap(new)], p) - bufs = append(bufs, new[:n]) - p = p[n:] - } + bufs = append(bufs, h, p) csz = len(h) + ol } - nbPoolPut(b) // No longer needed as we copied from above. // Add to pb the compressed data size (including headers), but // remove the original uncompressed data size that was added // during the queueing. @@ -1355,14 +1353,15 @@ func (c *client) wsCollapsePtoNB() (net.Buffers, int64) { if mfs > 0 { // We are limiting the frame size. startFrame := func() int { - bufs = append(bufs, nbPoolGet(wsMaxFrameHeaderSize)[:wsMaxFrameHeaderSize]) + bufs = append(bufs, nbPoolGet(wsMaxFrameHeaderSize)) return len(bufs) - 1 } endFrame := func(idx, size int) { + bufs[idx] = bufs[idx][:wsMaxFrameHeaderSize] n, key := wsFillFrameHeader(bufs[idx], mask, wsFirstFrame, wsFinalFrame, wsUncompressedFrame, wsBinaryMessage, size) + bufs[idx] = bufs[idx][:n] c.out.pb += int64(n) c.ws.fs += int64(n + size) - bufs[idx] = bufs[idx][:n] if mask { wsMaskBufs(key, bufs[idx+1:]) } @@ -1388,10 +1387,8 @@ func (c *client) wsCollapsePtoNB() (net.Buffers, int64) { if endStart { fhIdx = startFrame() } - new := nbPoolGet(total) - n := copy(new[:cap(new)], b[:total]) - bufs = append(bufs, new[:n]) - b = b[n:] + bufs = append(bufs, b[:total]) + b = b[total:] } } if total > 0 { From e20ca9043f84796dd038c7e0c0ef4a7b3d19a65a Mon Sep 17 00:00:00 2001 From: Neil Twigg Date: Wed, 4 Oct 2023 17:18:47 +0100 Subject: [PATCH 2/3] Don't append empty slices in the unfragmented path Signed-off-by: Neil Twigg --- server/websocket.go | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/server/websocket.go b/server/websocket.go index cd4b80ae..8163568d 100644 --- a/server/websocket.go +++ b/server/websocket.go @@ -1340,7 +1340,9 @@ func (c *client) wsCollapsePtoNB() (net.Buffers, int64) { if mask { wsMaskBuf(key, p) } - bufs = append(bufs, h, p) + if ol > 0 { + bufs = append(bufs, h, p) + } csz = len(h) + ol } // Add to pb the compressed data size (including headers), but From 7124dc7bdce475db1c56fcab2542c0423ca1a087 Mon Sep 17 00:00:00 2001 From: Neil Twigg Date: Wed, 4 Oct 2023 17:41:36 +0100 Subject: [PATCH 3/3] Revert changes to `nbPoolPut`, force compressor to forget byte buffer Signed-off-by: Neil Twigg --- server/client.go | 31 ++++++++++++++----------------- server/websocket.go | 5 +++++ 2 files changed, 19 insertions(+), 17 deletions(-) diff --git a/server/client.go b/server/client.go index 1c472528..e3364c8a 100644 --- a/server/client.go +++ b/server/client.go @@ -346,23 +346,20 @@ func nbPoolGet(sz int) []byte { } } -func nbPoolPut(in []byte) { - ca := cap(in) - for in = in[:ca]; ca >= nbPoolSizeSmall; ca = cap(in) { - switch { - case ca >= nbPoolSizeLarge: - b := (*[nbPoolSizeLarge]byte)(in[0:nbPoolSizeLarge:nbPoolSizeLarge]) - nbPoolLarge.Put(b) - in = in[nbPoolSizeLarge:] - case ca >= nbPoolSizeMedium: - b := (*[nbPoolSizeMedium]byte)(in[0:nbPoolSizeMedium:nbPoolSizeMedium]) - nbPoolMedium.Put(b) - in = in[nbPoolSizeMedium:] - case ca >= nbPoolSizeSmall: - b := (*[nbPoolSizeSmall]byte)(in[0:nbPoolSizeSmall:nbPoolSizeSmall]) - nbPoolSmall.Put(b) - in = in[nbPoolSizeSmall:] - } +func nbPoolPut(b []byte) { + switch cap(b) { + case nbPoolSizeSmall: + b := (*[nbPoolSizeSmall]byte)(b[0:nbPoolSizeSmall]) + nbPoolSmall.Put(b) + case nbPoolSizeMedium: + b := (*[nbPoolSizeMedium]byte)(b[0:nbPoolSizeMedium]) + nbPoolMedium.Put(b) + case nbPoolSizeLarge: + b := (*[nbPoolSizeLarge]byte)(b[0:nbPoolSizeLarge]) + nbPoolLarge.Put(b) + default: + // Ignore frames that are the wrong size, this might happen + // with WebSocket/MQTT messages as they are framed } } diff --git a/server/websocket.go b/server/websocket.go index 8163568d..014a1d72 100644 --- a/server/websocket.go +++ b/server/websocket.go @@ -1345,6 +1345,11 @@ func (c *client) wsCollapsePtoNB() (net.Buffers, int64) { } csz = len(h) + ol } + // Make sure that the compressor no longer holds a reference to + // the bytes.Buffer, so that the underlying memory gets cleaned + // up after flushOutbound/flushAndClose. For this to be safe, we + // always cp.Reset(...) before reusing the compressor again. + cp.Reset(nil) // Add to pb the compressed data size (including headers), but // remove the original uncompressed data size that was added // during the queueing.