mirror of
https://github.com/gogrlx/nats-server.git
synced 2026-04-02 03:38:42 -07:00
Reduce allocations in WebSockets (#4623)
This commit is contained in:
@@ -115,6 +115,7 @@ type websocket struct {
|
||||
nocompfrag bool // No fragment for compressed frames
|
||||
maskread bool
|
||||
maskwrite bool
|
||||
compressor *flate.Writer
|
||||
cookieJwt string
|
||||
clientIP string
|
||||
}
|
||||
@@ -1295,7 +1296,13 @@ func (c *client) wsCollapsePtoNB() (net.Buffers, int64) {
|
||||
mfs = 0
|
||||
}
|
||||
buf := bytes.NewBuffer(nbPoolGet(usz))
|
||||
cp, _ := flate.NewWriter(buf, flate.BestSpeed)
|
||||
cp := c.ws.compressor
|
||||
if cp == nil {
|
||||
c.ws.compressor, _ = flate.NewWriter(buf, flate.BestSpeed)
|
||||
cp = c.ws.compressor
|
||||
} else {
|
||||
cp.Reset(buf)
|
||||
}
|
||||
var csz int
|
||||
for _, b := range nb {
|
||||
cp.Write(b)
|
||||
@@ -1323,9 +1330,7 @@ func (c *client) wsCollapsePtoNB() (net.Buffers, int64) {
|
||||
if mask {
|
||||
wsMaskBuf(key, p[:lp])
|
||||
}
|
||||
new := nbPoolGet(wsFrameSizeForBrowsers)
|
||||
lp = copy(new[:wsFrameSizeForBrowsers], p[:lp])
|
||||
bufs = append(bufs, fh[:n], new[:lp])
|
||||
bufs = append(bufs, fh[:n], p[:lp])
|
||||
csz += n + lp
|
||||
p = p[lp:]
|
||||
}
|
||||
@@ -1335,16 +1340,16 @@ func (c *client) wsCollapsePtoNB() (net.Buffers, int64) {
|
||||
if mask {
|
||||
wsMaskBuf(key, p)
|
||||
}
|
||||
bufs = append(bufs, h)
|
||||
for len(p) > 0 {
|
||||
new := nbPoolGet(len(p))
|
||||
n := copy(new[:cap(new)], p)
|
||||
bufs = append(bufs, new[:n])
|
||||
p = p[n:]
|
||||
if ol > 0 {
|
||||
bufs = append(bufs, h, p)
|
||||
}
|
||||
csz = len(h) + ol
|
||||
}
|
||||
nbPoolPut(b) // No longer needed as we copied from above.
|
||||
// Make sure that the compressor no longer holds a reference to
|
||||
// the bytes.Buffer, so that the underlying memory gets cleaned
|
||||
// up after flushOutbound/flushAndClose. For this to be safe, we
|
||||
// always cp.Reset(...) before reusing the compressor again.
|
||||
cp.Reset(nil)
|
||||
// Add to pb the compressed data size (including headers), but
|
||||
// remove the original uncompressed data size that was added
|
||||
// during the queueing.
|
||||
@@ -1355,14 +1360,15 @@ func (c *client) wsCollapsePtoNB() (net.Buffers, int64) {
|
||||
if mfs > 0 {
|
||||
// We are limiting the frame size.
|
||||
startFrame := func() int {
|
||||
bufs = append(bufs, nbPoolGet(wsMaxFrameHeaderSize)[:wsMaxFrameHeaderSize])
|
||||
bufs = append(bufs, nbPoolGet(wsMaxFrameHeaderSize))
|
||||
return len(bufs) - 1
|
||||
}
|
||||
endFrame := func(idx, size int) {
|
||||
bufs[idx] = bufs[idx][:wsMaxFrameHeaderSize]
|
||||
n, key := wsFillFrameHeader(bufs[idx], mask, wsFirstFrame, wsFinalFrame, wsUncompressedFrame, wsBinaryMessage, size)
|
||||
bufs[idx] = bufs[idx][:n]
|
||||
c.out.pb += int64(n)
|
||||
c.ws.fs += int64(n + size)
|
||||
bufs[idx] = bufs[idx][:n]
|
||||
if mask {
|
||||
wsMaskBufs(key, bufs[idx+1:])
|
||||
}
|
||||
@@ -1388,10 +1394,8 @@ func (c *client) wsCollapsePtoNB() (net.Buffers, int64) {
|
||||
if endStart {
|
||||
fhIdx = startFrame()
|
||||
}
|
||||
new := nbPoolGet(total)
|
||||
n := copy(new[:cap(new)], b[:total])
|
||||
bufs = append(bufs, new[:n])
|
||||
b = b[n:]
|
||||
bufs = append(bufs, b[:total])
|
||||
b = b[total:]
|
||||
}
|
||||
}
|
||||
if total > 0 {
|
||||
|
||||
Reference in New Issue
Block a user