diff --git a/server/websocket.go b/server/websocket.go index ff8dc1ea..014a1d72 100644 --- a/server/websocket.go +++ b/server/websocket.go @@ -115,6 +115,7 @@ type websocket struct { nocompfrag bool // No fragment for compressed frames maskread bool maskwrite bool + compressor *flate.Writer cookieJwt string clientIP string } @@ -1295,7 +1296,13 @@ func (c *client) wsCollapsePtoNB() (net.Buffers, int64) { mfs = 0 } buf := bytes.NewBuffer(nbPoolGet(usz)) - cp, _ := flate.NewWriter(buf, flate.BestSpeed) + cp := c.ws.compressor + if cp == nil { + c.ws.compressor, _ = flate.NewWriter(buf, flate.BestSpeed) + cp = c.ws.compressor + } else { + cp.Reset(buf) + } var csz int for _, b := range nb { cp.Write(b) @@ -1323,9 +1330,7 @@ func (c *client) wsCollapsePtoNB() (net.Buffers, int64) { if mask { wsMaskBuf(key, p[:lp]) } - new := nbPoolGet(wsFrameSizeForBrowsers) - lp = copy(new[:wsFrameSizeForBrowsers], p[:lp]) - bufs = append(bufs, fh[:n], new[:lp]) + bufs = append(bufs, fh[:n], p[:lp]) csz += n + lp p = p[lp:] } @@ -1335,16 +1340,16 @@ func (c *client) wsCollapsePtoNB() (net.Buffers, int64) { if mask { wsMaskBuf(key, p) } - bufs = append(bufs, h) - for len(p) > 0 { - new := nbPoolGet(len(p)) - n := copy(new[:cap(new)], p) - bufs = append(bufs, new[:n]) - p = p[n:] + if ol > 0 { + bufs = append(bufs, h, p) } csz = len(h) + ol } - nbPoolPut(b) // No longer needed as we copied from above. + // Make sure that the compressor no longer holds a reference to + // the bytes.Buffer, so that the underlying memory gets cleaned + // up after flushOutbound/flushAndClose. For this to be safe, we + // always cp.Reset(...) before reusing the compressor again. + cp.Reset(nil) // Add to pb the compressed data size (including headers), but // remove the original uncompressed data size that was added // during the queueing. @@ -1355,14 +1360,15 @@ func (c *client) wsCollapsePtoNB() (net.Buffers, int64) { if mfs > 0 { // We are limiting the frame size. startFrame := func() int { - bufs = append(bufs, nbPoolGet(wsMaxFrameHeaderSize)[:wsMaxFrameHeaderSize]) + bufs = append(bufs, nbPoolGet(wsMaxFrameHeaderSize)) return len(bufs) - 1 } endFrame := func(idx, size int) { + bufs[idx] = bufs[idx][:wsMaxFrameHeaderSize] n, key := wsFillFrameHeader(bufs[idx], mask, wsFirstFrame, wsFinalFrame, wsUncompressedFrame, wsBinaryMessage, size) + bufs[idx] = bufs[idx][:n] c.out.pb += int64(n) c.ws.fs += int64(n + size) - bufs[idx] = bufs[idx][:n] if mask { wsMaskBufs(key, bufs[idx+1:]) } @@ -1388,10 +1394,8 @@ func (c *client) wsCollapsePtoNB() (net.Buffers, int64) { if endStart { fhIdx = startFrame() } - new := nbPoolGet(total) - n := copy(new[:cap(new)], b[:total]) - bufs = append(bufs, new[:n]) - b = b[n:] + bufs = append(bufs, b[:total]) + b = b[total:] } } if total > 0 {