[ADDED] Support for route S2 compression

The new field `compression` in the `cluster{}` block allows to
specify which compression mode to use between servers.

It can be simply specified as a boolean or a string for the
simple modes, or as an object for the "s2_auto" mode where
a list of RTT thresholds can be specified.

By default, if no compression field is specified, the server
will use the s2_auto mode with default RTT thresholds of
10ms, 50ms and 100ms for the "uncompressed", "fast", "better"
and "best" modes.

```
cluster {
..
  # Possible values are "disabled", "off", "enabled", "on",
  # "accept", "s2_fast", "s2_better", "s2_best" or "s2_auto"
  compression: s2_fast
}
```

To specify a different list of thresholds for the s2_auto,
here is how it would look like:
```
cluster {
..
  compression: {
    mode: s2_auto
    # This means that for RTT up to 5ms (included), then
    # the compression level will be "uncompressed", then
    # from 5ms+ to 15ms, the mode will switch to "s2_fast",
    # then from 15ms+ to 50ms, the level will switch to
    # "s2_better", and anything above 50ms will result
    # in the "s2_best" compression mode.
    rtt_thresholds: [5ms, 15ms, 50ms]
  }
}
```

Note that the "accept" mode means that a server will accept
compression from a remote and switch to that same compression
mode, but will otherwise not initiate compression. That is,
if 2 servers are configured with "accept", then compression
will actually be "off". If one of the server had say s2_fast
then they would both use this mode.

If a server has compression mode set (other than "off") but
connects to an older server, there will be no compression between
those 2 routes.

Signed-off-by: Ivan Kozlovic <ivan@synadia.com>
This commit is contained in:
Ivan Kozlovic
2023-04-27 17:59:25 -06:00
parent d573b78aee
commit d6fe9d4c2d
21 changed files with 1604 additions and 150 deletions

View File

@@ -32,6 +32,7 @@ import (
"sync/atomic"
"time"
"github.com/klauspost/compress/s2"
"github.com/nats-io/jwt/v2"
)
@@ -136,6 +137,7 @@ const (
skipFlushOnClose // Marks that flushOutbound() should not be called on connection close.
expectConnect // Marks if this connection is expected to send a CONNECT
connectProcessFinished // Marks if this connection has finished the connect process.
compressionNegotiated // Marks if this connection has negotiated compression level with remote.
)
// set the flag (would be equivalent to set the boolean to true)
@@ -301,6 +303,7 @@ type outbound struct {
mp int64 // Snapshot of max pending for client.
lft time.Duration // Last flush time for Write.
stc chan struct{} // Stall chan we create to slow down producers on overrun, e.g. fan-in.
cw *s2.Writer
}
const nbPoolSizeSmall = 512 // Underlying array size of small buffer
@@ -408,8 +411,12 @@ const (
type readCacheFlag uint16
const (
hasMappings readCacheFlag = 1 << iota // For account subject mappings.
sysGroup = "_sys_"
hasMappings readCacheFlag = 1 << iota // For account subject mappings.
switchToCompression readCacheFlag = 1 << 1
)
const (
sysGroup = "_sys_"
)
// Used in readloop to cache hot subject lookups and group statistics.
@@ -1270,6 +1277,7 @@ func (c *client) readLoop(pre []byte) {
if ws {
masking = c.ws.maskread
}
checkCompress := c.kind == ROUTER
c.mu.Unlock()
defer func() {
@@ -1297,6 +1305,8 @@ func (c *client) readLoop(pre []byte) {
wsr.init()
}
var decompress *s2.Reader
for {
var n int
var err error
@@ -1307,7 +1317,11 @@ func (c *client) readLoop(pre []byte) {
n = len(pre)
pre = nil
} else {
n, err = nc.Read(b)
if decompress != nil {
n, err = decompress.Read(b)
} else {
n, err = nc.Read(b)
}
// If we have any data we will try to parse and exit at the end.
if n == 0 && err != nil {
c.closeConnection(closedStateForErr(err))
@@ -1374,6 +1388,13 @@ func (c *client) readLoop(pre []byte) {
}
}
// If we are a ROUTER and have processed an INFO, it is possible that
// we are asked to switch to compression now.
if checkCompress && c.in.flags.isSet(switchToCompression) {
c.in.flags.clear(switchToCompression)
decompress = s2.NewReader(nc)
}
// Updates stats for client and server that were collected
// from parsing through the buffer.
if c.in.msgs > 0 {
@@ -1419,7 +1440,12 @@ func (c *client) readLoop(pre []byte) {
// re-snapshot the account since it can change during reload, etc.
acc = c.acc
// Refresh nc because in some cases, we have upgraded c.nc to TLS.
nc = c.nc
if nc != c.nc {
nc = c.nc
if decompress != nil {
decompress.Reset(nc)
}
}
c.mu.Unlock()
// Connection was closed
@@ -1506,17 +1532,9 @@ func (c *client) flushOutbound() bool {
// previous write, and in the case of WebSockets, that data may already
// be framed, so we are careful not to re-frame "wnb" here. Instead we
// will just frame up "nb" and append it onto whatever is left on "wnb".
// "nb" will be reset back to its starting position so it can be modified
// safely by queueOutbound calls.
c.out.wnb = append(c.out.wnb, collapsed...)
var _orig [1024][]byte
orig := append(_orig[:0], c.out.wnb...)
c.out.nb = c.out.nb[:0]
// Since WriteTo is lopping things off the beginning, we need to remember
// the start position of the underlying array so that we can get back to it.
// Otherwise we'll always "slide forward" and that will result in reallocs.
startOfWnb := c.out.wnb[0:]
// "nb" will be set to nil so that we can manipulate "collapsed" outside
// of the client's lock, which is interesting in case of compression.
c.out.nb = nil
// In case it goes away after releasing the lock.
nc := c.nc
@@ -1524,9 +1542,51 @@ func (c *client) flushOutbound() bool {
// Capture this (we change the value in some tests)
wdl := c.out.wdl
// Check for compression
cw := c.out.cw
if cw != nil {
// We will have to adjust once we have compressed, so remove for now.
c.out.pb -= attempted
}
// Do NOT hold lock during actual IO.
c.mu.Unlock()
// Compress outside of the lock
if cw != nil {
var err error
bb := bytes.Buffer{}
cw.Reset(&bb)
for _, buf := range collapsed {
if _, err = cw.Write(buf); err != nil {
break
}
}
if err == nil {
err = cw.Close()
}
if err != nil {
c.Errorf("Error compressing data: %v", err)
c.markConnAsClosed(WriteError)
return false
}
collapsed = append(net.Buffers(nil), bb.Bytes())
attempted = int64(len(collapsed[0]))
}
// This is safe to do outside of the lock since "collapsed" is no longer
// referenced in c.out.nb (which can be modified in queueOutboud() while
// the lock is released).
c.out.wnb = append(c.out.wnb, collapsed...)
var _orig [1024][]byte
orig := append(_orig[:0], c.out.wnb...)
// Since WriteTo is lopping things off the beginning, we need to remember
// the start position of the underlying array so that we can get back to it.
// Otherwise we'll always "slide forward" and that will result in reallocs.
startOfWnb := c.out.wnb[0:]
// flush here
start := time.Now()
@@ -1543,6 +1603,14 @@ func (c *client) flushOutbound() bool {
// Re-acquire client lock.
c.mu.Lock()
// Keep track of what was actually sent
c.sentBytes += n
// Adjust if we were compressing.
if cw != nil {
c.out.pb += attempted
}
// At this point, "wnb" has been mutated by WriteTo and any consumed
// buffers have been lopped off the beginning, so in order to return
// them to the pool, we need to look at the difference between "orig"
@@ -2322,6 +2390,17 @@ func (c *client) processPong() {
c.rtt = computeRTT(c.rttStart)
srv := c.srv
reorderGWs := c.kind == GATEWAY && c.gw.outbound
// For routes, check if we have compression auto and if we should change
// the compression level. However, exclude the route if compression is
// "not supported", which indicates that this is a route to an older server.
if c.kind == ROUTER && c.route.compression != CompressionNotSupported {
if opts := srv.getOpts(); opts.Cluster.Compression.Mode == CompressionS2Auto {
if cm := selectS2AutoModeBasedOnRTT(c.rtt, opts.Cluster.Compression.RTTThresholds); cm != c.route.compression {
c.route.compression = cm
c.out.cw = s2.NewWriter(nil, s2WriterOptions(cm)...)
}
}
}
c.mu.Unlock()
if reorderGWs {
srv.gateway.orderOutboundConnections()
@@ -4629,9 +4708,7 @@ func (c *client) processPingTimer() {
var sendPing bool
pingInterval := c.srv.getOpts().PingInterval
if c.kind == GATEWAY {
pingInterval = adjustPingIntervalForGateway(pingInterval)
}
pingInterval = adjustPingInterval(c.kind, pingInterval)
now := time.Now()
needRTT := c.rtt == 0 || now.Sub(c.rttStart) > DEFAULT_RTT_MEASUREMENT_INTERVAL
@@ -4666,11 +4743,18 @@ func (c *client) processPingTimer() {
c.mu.Unlock()
}
// Returns the smallest value between the given `d` and `gatewayMaxPingInterval` durations.
// Invoked for connections known to be of GATEWAY type.
func adjustPingIntervalForGateway(d time.Duration) time.Duration {
if d > gatewayMaxPingInterval {
return gatewayMaxPingInterval
// Returns the smallest value between the given `d` and some max value
// based on the connection kind.
func adjustPingInterval(kind int, d time.Duration) time.Duration {
switch kind {
case ROUTER:
if d > routeMaxPingInterval {
return routeMaxPingInterval
}
case GATEWAY:
if d > gatewayMaxPingInterval {
return gatewayMaxPingInterval
}
}
return d
}
@@ -4681,9 +4765,7 @@ func (c *client) setPingTimer() {
return
}
d := c.srv.getOpts().PingInterval
if c.kind == GATEWAY {
d = adjustPingIntervalForGateway(d)
}
d = adjustPingInterval(c.kind, d)
c.ping.tmr = time.AfterFunc(d, c.processPingTimer)
}
@@ -5549,9 +5631,7 @@ func (c *client) setFirstPingTimer() {
if d > firstPingInterval {
d = firstPingInterval
}
if c.kind == GATEWAY {
d = adjustPingIntervalForGateway(d)
}
d = adjustPingInterval(c.kind, d)
} else if d > firstClientPingInterval {
d = firstClientPingInterval
}
@@ -5559,5 +5639,12 @@ func (c *client) setFirstPingTimer() {
// We randomize the first one by an offset up to 20%, e.g. 2m ~= max 24s.
addDelay := rand.Int63n(int64(d / 5))
d += time.Duration(addDelay)
// In the case of ROUTER and when compression is configured, it is possible
// that this timer was already set, but just to detect a stale connection
// since we have to delay the first PING after compression negotiation
// occurred.
if c.ping.tmr != nil {
c.ping.tmr.Stop()
}
c.ping.tmr = time.AfterFunc(d, c.processPingTimer)
}