From d6fe9d4c2d749f7032eef8bac0cf0bc4a959bc2e Mon Sep 17 00:00:00 2001 From: Ivan Kozlovic Date: Thu, 27 Apr 2023 17:59:25 -0600 Subject: [PATCH] [ADDED] Support for route S2 compression The new field `compression` in the `cluster{}` block allows to specify which compression mode to use between servers. It can be simply specified as a boolean or a string for the simple modes, or as an object for the "s2_auto" mode where a list of RTT thresholds can be specified. By default, if no compression field is specified, the server will use the s2_auto mode with default RTT thresholds of 10ms, 50ms and 100ms for the "uncompressed", "fast", "better" and "best" modes. ``` cluster { .. # Possible values are "disabled", "off", "enabled", "on", # "accept", "s2_fast", "s2_better", "s2_best" or "s2_auto" compression: s2_fast } ``` To specify a different list of thresholds for the s2_auto, here is how it would look like: ``` cluster { .. compression: { mode: s2_auto # This means that for RTT up to 5ms (included), then # the compression level will be "uncompressed", then # from 5ms+ to 15ms, the mode will switch to "s2_fast", # then from 15ms+ to 50ms, the level will switch to # "s2_better", and anything above 50ms will result # in the "s2_best" compression mode. rtt_thresholds: [5ms, 15ms, 50ms] } } ``` Note that the "accept" mode means that a server will accept compression from a remote and switch to that same compression mode, but will otherwise not initiate compression. That is, if 2 servers are configured with "accept", then compression will actually be "off". If one of the server had say s2_fast then they would both use this mode. If a server has compression mode set (other than "off") but connects to an older server, there will be no compression between those 2 routes. 
Signed-off-by: Ivan Kozlovic --- server/client.go | 145 ++++++-- server/client_test.go | 65 ---- server/config_check_test.go | 56 +++ server/const.go | 2 +- server/events.go | 12 +- server/jetstream_helpers_test.go | 20 +- server/monitor.go | 54 +-- server/opts.go | 71 ++++ server/reload.go | 35 +- server/reload_test.go | 195 ++++++++++- server/route.go | 206 ++++++++++- server/routes_test.go | 562 +++++++++++++++++++++++++++++++ server/server.go | 221 ++++++++++++ server/sublist_test.go | 1 + test/bench_test.go | 100 ++++++ test/cluster_test.go | 2 +- test/cluster_tls_test.go | 2 + test/norace_test.go | 1 + test/ocsp_test.go | 1 + test/operator_test.go | 1 + test/test.go | 2 + 21 files changed, 1604 insertions(+), 150 deletions(-) diff --git a/server/client.go b/server/client.go index 2f8891e8..2cf30ce6 100644 --- a/server/client.go +++ b/server/client.go @@ -32,6 +32,7 @@ import ( "sync/atomic" "time" + "github.com/klauspost/compress/s2" "github.com/nats-io/jwt/v2" ) @@ -136,6 +137,7 @@ const ( skipFlushOnClose // Marks that flushOutbound() should not be called on connection close. expectConnect // Marks if this connection is expected to send a CONNECT connectProcessFinished // Marks if this connection has finished the connect process. + compressionNegotiated // Marks if this connection has negotiated compression level with remote. ) // set the flag (would be equivalent to set the boolean to true) @@ -301,6 +303,7 @@ type outbound struct { mp int64 // Snapshot of max pending for client. lft time.Duration // Last flush time for Write. stc chan struct{} // Stall chan we create to slow down producers on overrun, e.g. fan-in. + cw *s2.Writer } const nbPoolSizeSmall = 512 // Underlying array size of small buffer @@ -408,8 +411,12 @@ const ( type readCacheFlag uint16 const ( - hasMappings readCacheFlag = 1 << iota // For account subject mappings. - sysGroup = "_sys_" + hasMappings readCacheFlag = 1 << iota // For account subject mappings. 
+ switchToCompression readCacheFlag = 1 << 1 +) + +const ( + sysGroup = "_sys_" ) // Used in readloop to cache hot subject lookups and group statistics. @@ -1270,6 +1277,7 @@ func (c *client) readLoop(pre []byte) { if ws { masking = c.ws.maskread } + checkCompress := c.kind == ROUTER c.mu.Unlock() defer func() { @@ -1297,6 +1305,8 @@ func (c *client) readLoop(pre []byte) { wsr.init() } + var decompress *s2.Reader + for { var n int var err error @@ -1307,7 +1317,11 @@ func (c *client) readLoop(pre []byte) { n = len(pre) pre = nil } else { - n, err = nc.Read(b) + if decompress != nil { + n, err = decompress.Read(b) + } else { + n, err = nc.Read(b) + } // If we have any data we will try to parse and exit at the end. if n == 0 && err != nil { c.closeConnection(closedStateForErr(err)) @@ -1374,6 +1388,13 @@ func (c *client) readLoop(pre []byte) { } } + // If we are a ROUTER and have processed an INFO, it is possible that + // we are asked to switch to compression now. + if checkCompress && c.in.flags.isSet(switchToCompression) { + c.in.flags.clear(switchToCompression) + decompress = s2.NewReader(nc) + } + // Updates stats for client and server that were collected // from parsing through the buffer. if c.in.msgs > 0 { @@ -1419,7 +1440,12 @@ func (c *client) readLoop(pre []byte) { // re-snapshot the account since it can change during reload, etc. acc = c.acc // Refresh nc because in some cases, we have upgraded c.nc to TLS. - nc = c.nc + if nc != c.nc { + nc = c.nc + if decompress != nil { + decompress.Reset(nc) + } + } c.mu.Unlock() // Connection was closed @@ -1506,17 +1532,9 @@ func (c *client) flushOutbound() bool { // previous write, and in the case of WebSockets, that data may already // be framed, so we are careful not to re-frame "wnb" here. Instead we // will just frame up "nb" and append it onto whatever is left on "wnb". - // "nb" will be reset back to its starting position so it can be modified - // safely by queueOutbound calls. 
- c.out.wnb = append(c.out.wnb, collapsed...) - var _orig [1024][]byte - orig := append(_orig[:0], c.out.wnb...) - c.out.nb = c.out.nb[:0] - - // Since WriteTo is lopping things off the beginning, we need to remember - // the start position of the underlying array so that we can get back to it. - // Otherwise we'll always "slide forward" and that will result in reallocs. - startOfWnb := c.out.wnb[0:] + // "nb" will be set to nil so that we can manipulate "collapsed" outside + // of the client's lock, which is interesting in case of compression. + c.out.nb = nil // In case it goes away after releasing the lock. nc := c.nc @@ -1524,9 +1542,51 @@ func (c *client) flushOutbound() bool { // Capture this (we change the value in some tests) wdl := c.out.wdl + // Check for compression + cw := c.out.cw + if cw != nil { + // We will have to adjust once we have compressed, so remove for now. + c.out.pb -= attempted + } + // Do NOT hold lock during actual IO. c.mu.Unlock() + // Compress outside of the lock + if cw != nil { + var err error + bb := bytes.Buffer{} + + cw.Reset(&bb) + for _, buf := range collapsed { + if _, err = cw.Write(buf); err != nil { + break + } + } + if err == nil { + err = cw.Close() + } + if err != nil { + c.Errorf("Error compressing data: %v", err) + c.markConnAsClosed(WriteError) + return false + } + collapsed = append(net.Buffers(nil), bb.Bytes()) + attempted = int64(len(collapsed[0])) + } + + // This is safe to do outside of the lock since "collapsed" is no longer + // referenced in c.out.nb (which can be modified in queueOutboud() while + // the lock is released). + c.out.wnb = append(c.out.wnb, collapsed...) + var _orig [1024][]byte + orig := append(_orig[:0], c.out.wnb...) + + // Since WriteTo is lopping things off the beginning, we need to remember + // the start position of the underlying array so that we can get back to it. + // Otherwise we'll always "slide forward" and that will result in reallocs. 
+ startOfWnb := c.out.wnb[0:] + // flush here start := time.Now() @@ -1543,6 +1603,14 @@ func (c *client) flushOutbound() bool { // Re-acquire client lock. c.mu.Lock() + // Keep track of what was actually sent + c.sentBytes += n + + // Adjust if we were compressing. + if cw != nil { + c.out.pb += attempted + } + // At this point, "wnb" has been mutated by WriteTo and any consumed // buffers have been lopped off the beginning, so in order to return // them to the pool, we need to look at the difference between "orig" @@ -2322,6 +2390,17 @@ func (c *client) processPong() { c.rtt = computeRTT(c.rttStart) srv := c.srv reorderGWs := c.kind == GATEWAY && c.gw.outbound + // For routes, check if we have compression auto and if we should change + // the compression level. However, exclude the route if compression is + // "not supported", which indicates that this is a route to an older server. + if c.kind == ROUTER && c.route.compression != CompressionNotSupported { + if opts := srv.getOpts(); opts.Cluster.Compression.Mode == CompressionS2Auto { + if cm := selectS2AutoModeBasedOnRTT(c.rtt, opts.Cluster.Compression.RTTThresholds); cm != c.route.compression { + c.route.compression = cm + c.out.cw = s2.NewWriter(nil, s2WriterOptions(cm)...) + } + } + } c.mu.Unlock() if reorderGWs { srv.gateway.orderOutboundConnections() @@ -4629,9 +4708,7 @@ func (c *client) processPingTimer() { var sendPing bool pingInterval := c.srv.getOpts().PingInterval - if c.kind == GATEWAY { - pingInterval = adjustPingIntervalForGateway(pingInterval) - } + pingInterval = adjustPingInterval(c.kind, pingInterval) now := time.Now() needRTT := c.rtt == 0 || now.Sub(c.rttStart) > DEFAULT_RTT_MEASUREMENT_INTERVAL @@ -4666,11 +4743,18 @@ func (c *client) processPingTimer() { c.mu.Unlock() } -// Returns the smallest value between the given `d` and `gatewayMaxPingInterval` durations. -// Invoked for connections known to be of GATEWAY type. 
-func adjustPingIntervalForGateway(d time.Duration) time.Duration { - if d > gatewayMaxPingInterval { - return gatewayMaxPingInterval +// Returns the smallest value between the given `d` and some max value +// based on the connection kind. +func adjustPingInterval(kind int, d time.Duration) time.Duration { + switch kind { + case ROUTER: + if d > routeMaxPingInterval { + return routeMaxPingInterval + } + case GATEWAY: + if d > gatewayMaxPingInterval { + return gatewayMaxPingInterval + } } return d } @@ -4681,9 +4765,7 @@ func (c *client) setPingTimer() { return } d := c.srv.getOpts().PingInterval - if c.kind == GATEWAY { - d = adjustPingIntervalForGateway(d) - } + d = adjustPingInterval(c.kind, d) c.ping.tmr = time.AfterFunc(d, c.processPingTimer) } @@ -5549,9 +5631,7 @@ func (c *client) setFirstPingTimer() { if d > firstPingInterval { d = firstPingInterval } - if c.kind == GATEWAY { - d = adjustPingIntervalForGateway(d) - } + d = adjustPingInterval(c.kind, d) } else if d > firstClientPingInterval { d = firstClientPingInterval } @@ -5559,5 +5639,12 @@ func (c *client) setFirstPingTimer() { // We randomize the first one by an offset up to 20%, e.g. 2m ~= max 24s. addDelay := rand.Int63n(int64(d / 5)) d += time.Duration(addDelay) + // In the case of ROUTER and when compression is configured, it is possible + // that this timer was already set, but just to detect a stale connection + // since we have to delay the first PING after compression negotiation + // occurred. + if c.ping.tmr != nil { + c.ping.tmr.Stop() + } c.ping.tmr = time.AfterFunc(d, c.processPingTimer) } diff --git a/server/client_test.go b/server/client_test.go index c12e4c51..3f488346 100644 --- a/server/client_test.go +++ b/server/client_test.go @@ -1543,71 +1543,6 @@ func TestClientOutboundQueueCoalesce(t *testing.T) { } } -// This test ensures that outbound queues don't cause a run on -// memory when sending something to lots of clients. 
-func TestClientOutboundQueueMemory(t *testing.T) { - opts := DefaultOptions() - s := RunServer(opts) - defer s.Shutdown() - - var before runtime.MemStats - var after runtime.MemStats - - var err error - clients := make([]*nats.Conn, 50000) - wait := &sync.WaitGroup{} - wait.Add(len(clients)) - - for i := 0; i < len(clients); i++ { - clients[i], err = nats.Connect(fmt.Sprintf("nats://%s:%d", opts.Host, opts.Port), nats.InProcessServer(s)) - if err != nil { - t.Fatalf("Error on connect: %v", err) - } - defer clients[i].Close() - - clients[i].Subscribe("test", func(m *nats.Msg) { - wait.Done() - }) - } - - runtime.GC() - runtime.ReadMemStats(&before) - - nc, err := nats.Connect(fmt.Sprintf("nats://%s:%d", opts.Host, opts.Port), nats.InProcessServer(s)) - if err != nil { - t.Fatalf("Error on connect: %v", err) - } - defer nc.Close() - - var m [48000]byte - if err = nc.Publish("test", m[:]); err != nil { - t.Fatal(err) - } - - wait.Wait() - - runtime.GC() - runtime.ReadMemStats(&after) - - hb, ha := float64(before.HeapAlloc), float64(after.HeapAlloc) - ms := float64(len(m)) - diff := float64(ha) - float64(hb) - inc := (diff / float64(hb)) * 100 - - fmt.Printf("Message size: %.1fKB\n", ms/1024) - fmt.Printf("Subscribed clients: %d\n", len(clients)) - fmt.Printf("Heap allocs before: %.1fMB\n", hb/1024/1024) - fmt.Printf("Heap allocs after: %.1fMB\n", ha/1024/1024) - fmt.Printf("Heap allocs delta: %.1f%%\n", inc) - - // TODO: What threshold makes sense here for a failure? 
- /* - if inc > 10 { - t.Fatalf("memory increase was %.1f%% (should be <= 10%%)", inc) - } - */ -} - func TestClientTraceRace(t *testing.T) { opts := DefaultOptions() s := RunServer(opts) diff --git a/server/config_check_test.go b/server/config_check_test.go index f1d9ec39..f891ee0d 100644 --- a/server/config_check_test.go +++ b/server/config_check_test.go @@ -1603,6 +1603,62 @@ func TestConfigCheck(t *testing.T) { errorLine: 4, errorPos: 6, }, + { + name: "wrong type for cluter compression", + config: ` + cluster { + port: -1 + compression: 123 + } + `, + err: fmt.Errorf("field %q should be a boolean or a structure, got int64", "compression"), + errorLine: 4, + errorPos: 6, + }, + { + name: "wrong type for cluter compression mode", + config: ` + cluster { + port: -1 + compression: { + mode: 123 + } + } + `, + err: fmt.Errorf("interface conversion: interface {} is int64, not string"), + errorLine: 5, + errorPos: 7, + }, + { + name: "wrong type for cluter compression rtt thresholds", + config: ` + cluster { + port: -1 + compression: { + mode: "s2_auto" + rtt_thresholds: 123 + } + } + `, + err: fmt.Errorf("interface conversion: interface {} is int64, not []interface {}"), + errorLine: 6, + errorPos: 7, + }, + { + name: "invalid durations for cluter compression rtt thresholds", + config: ` + cluster { + port: -1 + compression: { + mode: "s2_auto" + rtt_thresholds: [abc] + } + } + `, + err: fmt.Errorf("time: invalid duration %q", "abc"), + errorLine: 6, + errorPos: 7, + }, } checkConfig := func(config string) error { diff --git a/server/const.go b/server/const.go index c8bc0273..4ea01acc 100644 --- a/server/const.go +++ b/server/const.go @@ -85,7 +85,7 @@ const ( // AUTH_TIMEOUT is the authorization wait time. AUTH_TIMEOUT = 2 * time.Second - // DEFAULT_PING_INTERVAL is how often pings are sent to clients and routes. + // DEFAULT_PING_INTERVAL is how often pings are sent to clients, etc... 
DEFAULT_PING_INTERVAL = 2 * time.Minute // DEFAULT_PING_MAX_OUT is maximum allowed pings outstanding before disconnect. diff --git a/server/events.go b/server/events.go index 3e416a9a..371efee1 100644 --- a/server/events.go +++ b/server/events.go @@ -686,11 +686,13 @@ func routeStat(r *client) *RouteStat { return nil } r.mu.Lock() + // Note: *client.out[Msgs|Bytes] are not set using atomics, + // unlike in[Msgs|Bytes]. rs := &RouteStat{ ID: r.cid, Sent: DataStats{ - Msgs: atomic.LoadInt64(&r.outMsgs), - Bytes: atomic.LoadInt64(&r.outBytes), + Msgs: r.outMsgs, + Bytes: r.outBytes, }, Received: DataStats{ Msgs: atomic.LoadInt64(&r.inMsgs), @@ -732,9 +734,11 @@ func (s *Server) sendStatsz(subj string) { gs := &GatewayStat{Name: name} c.mu.Lock() gs.ID = c.cid + // Note that *client.out[Msgs|Bytes] are not set using atomic, + // unlike the in[Msgs|bytes]. gs.Sent = DataStats{ - Msgs: atomic.LoadInt64(&c.outMsgs), - Bytes: atomic.LoadInt64(&c.outBytes), + Msgs: c.outMsgs, + Bytes: c.outBytes, } c.mu.Unlock() // Gather matching inbound connections diff --git a/server/jetstream_helpers_test.go b/server/jetstream_helpers_test.go index 2dd0612e..1b4096aa 100644 --- a/server/jetstream_helpers_test.go +++ b/server/jetstream_helpers_test.go @@ -25,6 +25,7 @@ import ( "net/url" "os" "strings" + "sync" "testing" "time" @@ -1581,6 +1582,7 @@ func (o *consumer) setInActiveDeleteThreshold(dthresh time.Duration) error { // Net Proxy - For introducing RTT and BW constraints. 
type netProxy struct { + sync.RWMutex listener net.Listener conns []net.Conn rtt time.Duration @@ -1633,8 +1635,8 @@ func (np *netProxy) start() { continue } np.conns = append(np.conns, client, server) - go np.loop(np.rtt, np.up, client, server) - go np.loop(np.rtt, np.down, server, client) + go np.loop(np.up, client, server) + go np.loop(np.down, server, client) } }() } @@ -1647,8 +1649,7 @@ func (np *netProxy) routeURL() string { return strings.Replace(np.url, "nats", "nats-route", 1) } -func (np *netProxy) loop(rtt time.Duration, tbw int, r, w net.Conn) { - delay := rtt / 2 +func (np *netProxy) loop(tbw int, r, w net.Conn) { const rbl = 8192 var buf [rbl]byte ctx := context.Background() @@ -1658,9 +1659,13 @@ func (np *netProxy) loop(rtt time.Duration, tbw int, r, w net.Conn) { for { n, err := r.Read(buf[:]) if err != nil { + w.Close() return } // RTT delays + np.RLock() + delay := np.rtt / 2 + np.RUnlock() if delay > 0 { time.Sleep(delay) } @@ -1668,11 +1673,18 @@ func (np *netProxy) loop(rtt time.Duration, tbw int, r, w net.Conn) { return } if _, err = w.Write(buf[:n]); err != nil { + r.Close() return } } } +func (np *netProxy) updateRTT(rtt time.Duration) { + np.Lock() + np.rtt = rtt + np.Unlock() +} + func (np *netProxy) stop() { if np.listener != nil { np.listener.Close() diff --git a/server/monitor.go b/server/monitor.go index 6573fd28..36c71b0f 100644 --- a/server/monitor.go +++ b/server/monitor.go @@ -123,6 +123,7 @@ type ConnInfo struct { OutMsgs int64 `json:"out_msgs"` InBytes int64 `json:"in_bytes"` OutBytes int64 `json:"out_bytes"` + SentBytes int64 `json:"sent_bytes"` NumSubs uint32 `json:"subscriptions"` Name string `json:"name,omitempty"` Lang string `json:"lang,omitempty"` @@ -542,6 +543,7 @@ func (ci *ConnInfo) fill(client *client, nc net.Conn, now time.Time, auth bool) ci.RTT = client.getRTT().String() ci.OutMsgs = client.outMsgs ci.OutBytes = client.outBytes + ci.SentBytes = client.sentBytes ci.NumSubs = uint32(len(client.subs)) ci.Pending = 
int(client.out.pb) ci.Name = client.opts.Name @@ -777,10 +779,12 @@ type RouteInfo struct { OutMsgs int64 `json:"out_msgs"` InBytes int64 `json:"in_bytes"` OutBytes int64 `json:"out_bytes"` + SentBytes int64 `json:"sent_bytes"` NumSubs uint32 `json:"subscriptions"` Subs []string `json:"subscriptions_list,omitempty"` SubsDetail []SubDetail `json:"subscriptions_list_detail,omitempty"` Account string `json:"account,omitempty"` + Compression string `json:"compression,omitempty"` } // Routez returns a Routez struct containing information about routes. @@ -817,6 +821,7 @@ func (s *Server) Routez(routezOpts *RoutezOptions) (*Routez, error) { OutMsgs: r.outMsgs, InBytes: atomic.LoadInt64(&r.inBytes), OutBytes: r.outBytes, + SentBytes: r.sentBytes, NumSubs: uint32(len(r.subs)), Import: r.opts.Import, Export: r.opts.Export, @@ -826,6 +831,7 @@ func (s *Server) Routez(routezOpts *RoutezOptions) (*Routez, error) { Uptime: myUptime(rs.Now.Sub(r.start)), Idle: myUptime(rs.Now.Sub(r.last)), Account: string(r.route.accName), + Compression: r.route.compression, } if len(r.subs) > 0 { @@ -2095,18 +2101,19 @@ type LeafzOptions struct { // LeafInfo has detailed information on each remote leafnode connection. 
type LeafInfo struct { - Name string `json:"name"` - IsSpoke bool `json:"is_spoke"` - Account string `json:"account"` - IP string `json:"ip"` - Port int `json:"port"` - RTT string `json:"rtt,omitempty"` - InMsgs int64 `json:"in_msgs"` - OutMsgs int64 `json:"out_msgs"` - InBytes int64 `json:"in_bytes"` - OutBytes int64 `json:"out_bytes"` - NumSubs uint32 `json:"subscriptions"` - Subs []string `json:"subscriptions_list,omitempty"` + Name string `json:"name"` + IsSpoke bool `json:"is_spoke"` + Account string `json:"account"` + IP string `json:"ip"` + Port int `json:"port"` + RTT string `json:"rtt,omitempty"` + InMsgs int64 `json:"in_msgs"` + OutMsgs int64 `json:"out_msgs"` + InBytes int64 `json:"in_bytes"` + OutBytes int64 `json:"out_bytes"` + SentBytes int64 `json:"sent_bytes"` + NumSubs uint32 `json:"subscriptions"` + Subs []string `json:"subscriptions_list,omitempty"` } // Leafz returns a Leafz structure containing information about leafnodes. @@ -2136,17 +2143,18 @@ func (s *Server) Leafz(opts *LeafzOptions) (*Leafz, error) { for _, ln := range lconns { ln.mu.Lock() lni := &LeafInfo{ - Name: ln.leaf.remoteServer, - IsSpoke: ln.isSpokeLeafNode(), - Account: ln.acc.Name, - IP: ln.host, - Port: int(ln.port), - RTT: ln.getRTT().String(), - InMsgs: atomic.LoadInt64(&ln.inMsgs), - OutMsgs: ln.outMsgs, - InBytes: atomic.LoadInt64(&ln.inBytes), - OutBytes: ln.outBytes, - NumSubs: uint32(len(ln.subs)), + Name: ln.leaf.remoteServer, + IsSpoke: ln.isSpokeLeafNode(), + Account: ln.acc.Name, + IP: ln.host, + Port: int(ln.port), + RTT: ln.getRTT().String(), + InMsgs: atomic.LoadInt64(&ln.inMsgs), + OutMsgs: ln.outMsgs, + InBytes: atomic.LoadInt64(&ln.inBytes), + OutBytes: ln.outBytes, + SentBytes: ln.sentBytes, + NumSubs: uint32(len(ln.subs)), } if opts != nil && opts.Subscriptions { lni.Subs = make([]string, 0, len(ln.subs)) diff --git a/server/opts.go b/server/opts.go index eb26359a..bb4c1026 100644 --- a/server/opts.go +++ b/server/opts.go @@ -78,6 +78,7 @@ type ClusterOpts 
struct { ConnectRetries int `json:"-"` PoolSize int `json:"-"` PinnedAccounts []string `json:"-"` + Compression CompressionOpts `json:"-"` // Not exported (used in tests) resolver netResolver @@ -85,6 +86,21 @@ type ClusterOpts struct { tlsConfigOpts *TLSConfigOpts } +// CompressionOpts defines the compression mode and optional configuration. +type CompressionOpts struct { + Mode string + // If `Mode` is set to CompressionS2Auto, RTTThresholds provides the + // thresholds at which the compression level will go from + // CompressionS2Uncompressed to CompressionS2Fast, CompressionS2Better + // or CompressionS2Best. If a given level is not desired, specify 0 + // for this slot. For instance, the slice []{0, 10ms, 20ms} means that + // for any RTT up to 10ms included the compression level will be + // CompressionS2Fast, then from ]10ms..20ms], the level will be selected + // as CompressionS2Better. Anything above 20ms will result in picking + // the CompressionS2Best compression level. + RTTThresholds []time.Duration +} + // GatewayOpts are options for gateways. // NOTE: This structure is no longer used for monitoring endpoints // and json tags are deprecated and may be removed in the future. 
@@ -1622,6 +1638,11 @@ func parseCluster(v interface{}, opts *Options, errors *[]error, warnings *[]err opts.Cluster.PoolSize = int(mv.(int64)) case "accounts": opts.Cluster.PinnedAccounts, _ = parseStringArray("accounts", tk, <, mv, errors, warnings) + case "compression": + if err := parseCompression(&opts.Cluster.Compression, tk, mk, mv); err != nil { + *errors = append(*errors, err) + continue + } default: if !tk.IsUsedVariable() { err := &unknownConfigFieldErr{ @@ -1638,6 +1659,47 @@ func parseCluster(v interface{}, opts *Options, errors *[]error, warnings *[]err return nil } +func parseCompression(c *CompressionOpts, tk token, mk string, mv interface{}) (retErr error) { + var lt token + defer convertPanicToError(<, &retErr) + + switch mv := mv.(type) { + case string: + // Do not validate here, it will be done in NewServer. + c.Mode = mv + case bool: + if mv { + c.Mode = CompressionS2Auto + } else { + c.Mode = CompressionOff + } + case map[string]interface{}: + for mk, mv := range mv { + tk, mv = unwrapValue(mv, <) + switch strings.ToLower(mk) { + case "mode": + c.Mode = mv.(string) + case "rtt_thresholds", "thresholds", "rtts", "rtt": + for _, iv := range mv.([]interface{}) { + _, mv := unwrapValue(iv, <) + dur, err := time.ParseDuration(mv.(string)) + if err != nil { + return &configErr{tk, err.Error()} + } + c.RTTThresholds = append(c.RTTThresholds, dur) + } + default: + if !tk.IsUsedVariable() { + return &configErr{tk, fmt.Sprintf("unknown field %q", mk)} + } + } + } + default: + return &configErr{tk, fmt.Sprintf("field %q should be a boolean or a structure, got %T", mk, mv)} + } + return nil +} + func parseURLs(a []interface{}, typ string, warnings *[]error) (urls []*url.URL, errors []error) { urls = make([]*url.URL, 0, len(a)) var lt token @@ -4724,6 +4786,15 @@ func setBaselineOptions(opts *Options) { } } } + // Default to compression "auto", unless we are running tests that specify + // what the default compression mode should be. 
+ if c := &opts.Cluster.Compression; c.Mode == _EMPTY_ { + if testDefaultClusterCompression != _EMPTY_ { + c.Mode = testDefaultClusterCompression + } else { + c.Mode = CompressionS2Auto + } + } } if opts.LeafNode.Port != 0 { if opts.LeafNode.Host == _EMPTY_ { diff --git a/server/reload.go b/server/reload.go index d2430532..16878c9d 100644 --- a/server/reload.go +++ b/server/reload.go @@ -24,6 +24,7 @@ import ( "sync/atomic" "time" + "github.com/klauspost/compress/s2" "github.com/nats-io/jwt/v2" "github.com/nats-io/nuid" ) @@ -361,6 +362,7 @@ type clusterOption struct { accsAdded []string accsRemoved []string poolSizeChanged bool + compChanged bool } // Apply the cluster change. @@ -379,10 +381,38 @@ func (c *clusterOption) Apply(s *Server) { s.routeInfo.WSConnectURLs = s.websocket.connectURLs } s.setRouteInfoHostPortAndIP() + var routes []*client + if c.compChanged { + newMode := s.getOpts().Cluster.Compression.Mode + s.forEachRoute(func(r *client) { + r.mu.Lock() + // Skip routes that are "not supported" (because they will never do + // compression) or the routes that have already the new compression + // mode. + if r.route.compression == CompressionNotSupported || r.route.compression == newMode { + r.mu.Unlock() + return + } + // We need to close the route if it had compression "off" or the new + // mode is compression "off", or if the new mode is "accept", because + // these require negotiation. + if r.route.compression == CompressionOff || newMode == CompressionOff || newMode == CompressionAccept { + routes = append(routes, r) + } else { + // Simply change the compression writer + r.out.cw = s2.NewWriter(nil, s2WriterOptions(newMode)...) 
+ r.route.compression = newMode + } + r.mu.Unlock() + }) + } s.mu.Unlock() if c.newValue.Name != "" && c.newValue.Name != s.ClusterName() { s.setClusterName(c.newValue.Name) } + for _, r := range routes { + r.closeConnection(ClientClosed) + } s.Noticef("Reloaded: cluster") if tlsRequired && c.newValue.TLSConfig.InsecureSkipVerify { s.Warnf(clusterTLSInsecureWarning) @@ -1111,6 +1141,7 @@ func (s *Server) diffOptions(newOpts *Options) ([]option, error) { co := &clusterOption{ newValue: newClusterOpts, permsChanged: !reflect.DeepEqual(newClusterOpts.Permissions, oldClusterOpts.Permissions), + compChanged: !reflect.DeepEqual(oldClusterOpts.Compression, newClusterOpts.Compression), } co.diffPoolAndAccounts(&oldClusterOpts) // If there are added accounts, first make sure that we can look them up. @@ -2191,8 +2222,8 @@ func (s *Server) reloadClusterPoolAndAccounts(co *clusterOption, opts *Options) s.mu.Unlock() } -// validateClusterOpts ensures the new ClusterOpts does not change host or -// port, which do not support reload. +// validateClusterOpts ensures the new ClusterOpts does not change some of the +// fields that do not support reload. 
func validateClusterOpts(old, new ClusterOpts) error { if old.Host != new.Host { return fmt.Errorf("config reload not supported for cluster host: old=%s, new=%s", diff --git a/server/reload_test.go b/server/reload_test.go index d3a0c386..55c925d8 100644 --- a/server/reload_test.go +++ b/server/reload_test.go @@ -146,9 +146,10 @@ func TestConfigReloadUnsupported(t *testing.T) { MaxPingsOut: 2, WriteDeadline: 10 * time.Second, Cluster: ClusterOpts{ - Name: "abc", - Host: "127.0.0.1", - Port: -1, + Name: "abc", + Host: "127.0.0.1", + Port: -1, + Compression: CompressionOpts{Mode: CompressionS2Auto, RTTThresholds: defaultCompressionS2AutoRTTThresholds}, }, NoSigs: true, } @@ -218,9 +219,10 @@ func TestConfigReloadInvalidConfig(t *testing.T) { MaxPingsOut: 2, WriteDeadline: 10 * time.Second, Cluster: ClusterOpts{ - Name: "abc", - Host: "127.0.0.1", - Port: -1, + Name: "abc", + Host: "127.0.0.1", + Port: -1, + Compression: CompressionOpts{Mode: CompressionS2Auto, RTTThresholds: defaultCompressionS2AutoRTTThresholds}, }, NoSigs: true, } @@ -281,9 +283,10 @@ func TestConfigReload(t *testing.T) { MaxPingsOut: 2, WriteDeadline: 10 * time.Second, Cluster: ClusterOpts{ - Name: "abc", - Host: "127.0.0.1", - Port: server.ClusterAddr().Port, + Name: "abc", + Host: "127.0.0.1", + Port: server.ClusterAddr().Port, + Compression: CompressionOpts{Mode: CompressionS2Auto, RTTThresholds: defaultCompressionS2AutoRTTThresholds}, }, NoSigs: true, } @@ -5361,3 +5364,177 @@ func TestConfigReloadGlobalAccountWithMappingAndJetStream(t *testing.T) { _, err = js.StreamInfo("TEST") require_NoError(t, err) } + +func TestConfigReloadRouteCompression(t *testing.T) { + org := testDefaultClusterCompression + testDefaultClusterCompression = _EMPTY_ + defer func() { testDefaultClusterCompression = org }() + + tmpl := ` + port: -1 + server_name: "%s" + cluster { + port: -1 + name: "local" + %s + %s + } + ` + conf1 := createConfFile(t, []byte(fmt.Sprintf(tmpl, "A", _EMPTY_, "compression: accept"))) + s1, 
o1 := RunServerWithConfig(conf1) + defer s1.Shutdown() + + routes := fmt.Sprintf("routes: [\"nats://127.0.0.1:%d\"]", o1.Cluster.Port) + conf2 := createConfFile(t, []byte(fmt.Sprintf(tmpl, "B", routes, "compression: accept"))) + s2, _ := RunServerWithConfig(conf2) + defer s2.Shutdown() + + // Run a 3rd server but make it as if it was an old server. We want to + // make sure that reload of s1 and s2 will not affect routes from s3 to + // s1/s2 because these do not support compression. + conf3 := createConfFile(t, []byte(fmt.Sprintf(tmpl, "C", routes, "compression: \"not supported\""))) + s3, _ := RunServerWithConfig(conf3) + defer s3.Shutdown() + + checkClusterFormed(t, s1, s2, s3) + + // Collect routes' cid from servers so we can check if routes are + // recreated when they should and are not when they should not. + collect := func(s *Server) map[uint64]struct{} { + m := make(map[uint64]struct{}) + s.mu.RLock() + defer s.mu.RUnlock() + s.forEachRoute(func(r *client) { + r.mu.Lock() + m[r.cid] = struct{}{} + r.mu.Unlock() + }) + return m + } + s1RouteIDs := collect(s1) + s2RouteIDs := collect(s2) + s3RouteIDs := collect(s3) + s3ID := s3.ID() + + servers := []*Server{s1, s2} + checkCompMode := func(s1Expected, s2Expected string, shouldBeNew bool) { + t.Helper() + // We wait a bit to make sure that we have routes closed before + // checking that the cluster has (re)formed. + time.Sleep(100 * time.Millisecond) + // First, make sure that the cluster is formed + checkClusterFormed(t, s1, s2, s3) + // Then check that all routes are with the expected mode. We need to + // possibly wait a bit since there is negotiation going on. 
+ checkFor(t, 2*time.Second, 50*time.Millisecond, func() error { + for _, s := range servers { + var err error + s.mu.RLock() + s.forEachRoute(func(r *client) { + if err != nil { + return + } + r.mu.Lock() + var exp string + var m map[uint64]struct{} + if r.route.remoteID == s3ID { + exp = CompressionNotSupported + m = s3RouteIDs + } else if s == s1 { + exp = s1Expected + m = s1RouteIDs + } else { + exp = s2Expected + m = s2RouteIDs + } + _, present := m[r.cid] + cm := r.route.compression + r.mu.Unlock() + if cm != exp { + err = fmt.Errorf("Expected route %v for server %s to have compression mode %q, got %q", r, s, exp, cm) + } + sbn := shouldBeNew + if exp == CompressionNotSupported { + // Override for routes to s3 + sbn = false + } + if sbn && present { + err = fmt.Errorf("Expected route %v for server %s to be a new route, but it was already present", r, s) + } else if !sbn && !present { + err = fmt.Errorf("Expected route %v for server %s to not be new", r, s) + } + }) + s.mu.RUnlock() + if err != nil { + return err + } + } + s1RouteIDs = collect(s1) + s2RouteIDs = collect(s2) + s3RouteIDs = collect(s3) + return nil + }) + } + // Since both started with "accept", which means that a server can + // accept/switch to compression but not initiate compression, they + // should both be "off" + checkCompMode(CompressionOff, CompressionOff, false) + + // Now reload s1 with "on" ("auto"), since s2 is *configured* with "accept", + // it won't use "auto" but instead fall back to "fast". S1 should be set + // to "uncompressed" because the default RTT threshold is 10ms and we should + // below that... 
+ reloadUpdateConfig(t, s1, conf1, fmt.Sprintf(tmpl, "A", _EMPTY_, "compression: on")) + checkCompMode(CompressionS2Uncompressed, CompressionS2Fast, true) + // Now reload s2 + reloadUpdateConfig(t, s2, conf2, fmt.Sprintf(tmpl, "B", routes, "compression: on")) + checkCompMode(CompressionS2Uncompressed, CompressionS2Uncompressed, false) + + // Move on with "better" + reloadUpdateConfig(t, s1, conf1, fmt.Sprintf(tmpl, "A", _EMPTY_, "compression: s2_better")) + // s1 should be at "better", but s2 still at "uncompressed" + checkCompMode(CompressionS2Better, CompressionS2Uncompressed, false) + // Now reload s2 + reloadUpdateConfig(t, s2, conf2, fmt.Sprintf(tmpl, "B", routes, "compression: s2_better")) + checkCompMode(CompressionS2Better, CompressionS2Better, false) + + // Move to "best" + reloadUpdateConfig(t, s1, conf1, fmt.Sprintf(tmpl, "A", _EMPTY_, "compression: s2_best")) + checkCompMode(CompressionS2Best, CompressionS2Better, false) + // Now reload s2 + reloadUpdateConfig(t, s2, conf2, fmt.Sprintf(tmpl, "B", routes, "compression: s2_best")) + checkCompMode(CompressionS2Best, CompressionS2Best, false) + + // Now turn off + reloadUpdateConfig(t, s1, conf1, fmt.Sprintf(tmpl, "A", _EMPTY_, "compression: off")) + checkCompMode(CompressionOff, CompressionOff, true) + // Now reload s2 + reloadUpdateConfig(t, s2, conf2, fmt.Sprintf(tmpl, "B", routes, "compression: off")) + checkCompMode(CompressionOff, CompressionOff, false) + + // When "off" (and not "accept"), enabling 1 is not enough, the reload + // has to be done on both to take effect. 
+ reloadUpdateConfig(t, s1, conf1, fmt.Sprintf(tmpl, "A", _EMPTY_, "compression: s2_better")) + checkCompMode(CompressionOff, CompressionOff, true) + // Now reload s2 + reloadUpdateConfig(t, s2, conf2, fmt.Sprintf(tmpl, "B", routes, "compression: s2_better")) + checkCompMode(CompressionS2Better, CompressionS2Better, true) + + // Try now to have different ones + reloadUpdateConfig(t, s1, conf1, fmt.Sprintf(tmpl, "A", _EMPTY_, "compression: s2_best")) + // S1 should be "best" but S2 should have stayed at "better" + checkCompMode(CompressionS2Best, CompressionS2Better, false) + + // Change compression setting back to "accept", which in that case we want + // to have a negotiation and use the remote's compression level. So + // connections should be re-created. + reloadUpdateConfig(t, s1, conf1, fmt.Sprintf(tmpl, "A", _EMPTY_, "compression: accept")) + checkCompMode(CompressionS2Better, CompressionS2Better, true) + + // To avoid flapping, add a little sleep here to make sure we have things + // settled before reloading s2. + time.Sleep(100 * time.Millisecond) + // And if we do the same with s2, then we will end-up with no compression. + reloadUpdateConfig(t, s2, conf2, fmt.Sprintf(tmpl, "B", routes, "compression: accept")) + checkCompMode(CompressionOff, CompressionOff, true) +} diff --git a/server/route.go b/server/route.go index 96fd5038..9d7281e9 100644 --- a/server/route.go +++ b/server/route.go @@ -27,6 +27,8 @@ import ( "strings" "sync/atomic" "time" + + "github.com/klauspost/compress/s2" ) // RouteType designates the router type @@ -93,6 +95,9 @@ type route struct { // This is set to true if this is a route connection to an old // server or a server that has pooling completely disabled. noPool bool + // Selected compression mode, which may be different from the + // server configured mode. 
+ compression string } type connectInfo struct { @@ -124,6 +129,15 @@ const ( // Can be changed for tests var routeConnectDelay = DEFAULT_ROUTE_CONNECT +// The default ping interval is set to 2 minutes, which is fine for client +// connections, etc.. but since for route compression, the CompressionS2Auto +// mode uses RTT measurements (ping/pong) to decide which compression level +// to use, we want the interval to not be that high. +const defaultRouteMaxPingInterval = 30 * time.Second + +// Can be changed for tests +var routeMaxPingInterval = defaultRouteMaxPingInterval + // removeReplySub is called when we trip the max on remoteReply subs. func (c *client) removeReplySub(sub *subscription) { if sub == nil { @@ -544,6 +558,8 @@ func (c *client) processRouteInfo(info *Info) { opts := s.getOpts() + didSolicit := c.route.didSolicit + // If this is an async INFO from an existing route... if c.flags.isSet(infoReceived) { remoteID := c.route.remoteID @@ -675,7 +691,7 @@ func (c *client) processRouteInfo(info *Info) { } // Check if remote has same server name than this server. - if !c.route.didSolicit && info.Name == srvName { + if !didSolicit && info.Name == srvName { c.mu.Unlock() // This is now an error and we close the connection. We need unique names for JetStream clustering. c.Errorf("Remote server has a duplicate name: %q", info.Name) @@ -683,6 +699,57 @@ func (c *client) processRouteInfo(info *Info) { return } + // First INFO, check if this server is configured for compression because + // if that is the case, we need to negotiate it with the remote server. + if needsCompression(opts.Cluster.Compression.Mode) { + accName := string(c.route.accName) + // If we did not yet negotiate... + if !c.flags.isSet(compressionNegotiated) { + // Prevent from getting back here. + c.flags.set(compressionNegotiated) + // Release client lock since following function will need server lock. 
+ c.mu.Unlock() + compress, err := s.negotiateRouteCompression(c, didSolicit, accName, info, opts) + if err != nil { + c.sendErrAndErr(err.Error()) + c.closeConnection(ProtocolViolation) + return + } + if compress { + // Done for now, will get back another INFO protocol... + return + } + // No compression because one side does not want/can't, so proceed. + c.mu.Lock() + } else if didSolicit { + // The other side has switched to compression, so we can now set + // the first ping timer and send the delayed INFO for situations + // where it was not already sent. + c.setFirstPingTimer() + if !routeShouldDelayInfo(accName, opts) { + cm := routeCompressionModeForInfoProtocol(opts, c.route.compression) + // Need to release and then reacquire... + c.mu.Unlock() + s.sendDelayedRouteInfo(c, accName, cm) + c.mu.Lock() + } + } + // Check that the connection did not close if the lock was released. + if c.isClosed() { + c.mu.Unlock() + return + } + } else { + // Coming from an old server, the Compression field would be the empty + // string. For servers that are configured with CompressionNotSupported, + // this makes them behave as old servers. + if info.Compression == _EMPTY_ || opts.Cluster.Compression.Mode == CompressionNotSupported { + c.route.compression = CompressionNotSupported + } else { + c.route.compression = CompressionOff + } + } + // Mark that the INFO protocol has been received, so we can detect updates. c.flags.set(infoReceived) @@ -748,7 +815,6 @@ func (c *client) processRouteInfo(info *Info) { // If this is a solicit route, we already have c.route.accName set in createRoute. // For non solicited route (the accept side), we will set the account name that // is present in the INFO protocol. 
- didSolicit := c.route.didSolicit if !didSolicit { c.route.accName = []byte(info.RouteAccount) } @@ -770,6 +836,89 @@ func (c *client) processRouteInfo(info *Info) { } } +func (s *Server) negotiateRouteCompression(c *client, didSolicit bool, accName string, info *Info, opts *Options) (bool, error) { + // Negotiate the appropriate compression mode (or no compression) + cm, err := selectCompressionMode(opts.Cluster.Compression.Mode, info.Compression) + if err != nil { + return false, err + } + c.mu.Lock() + // For "auto" mode, set the initial compression mode based on RTT + if cm == CompressionS2Auto { + if c.rttStart.IsZero() { + c.rtt = computeRTT(c.start) + } + cm = selectS2AutoModeBasedOnRTT(c.rtt, opts.Cluster.Compression.RTTThresholds) + } + // Keep track of the negotiated compression mode. + c.route.compression = cm + c.mu.Unlock() + + // If we end-up doing compression... + if needsCompression(cm) { + // Generate an INFO with the chosen compression mode. + s.mu.Lock() + infoProto := s.generateRouteInitialInfoJSON(accName, cm, 0) + s.mu.Unlock() + + // If we solicited, then send this INFO protocol BEFORE switching + // to compression writer. However, if we did not, we send it after. + c.mu.Lock() + if didSolicit { + c.enqueueProto(infoProto) + // Make sure it is completely flushed (the pending bytes goes to + // 0) before proceeding. + for c.out.pb > 0 && !c.isClosed() { + c.flushOutbound() + } + } + // This is to notify the readLoop that it should switch to a + // (de)compression reader. + c.in.flags.set(switchToCompression) + // Create the compress writer before queueing the INFO protocol for + // a route that did not solicit. It will make sure that that proto + // is sent with compression on. + c.out.cw = s2.NewWriter(nil, s2WriterOptions(cm)...) + if !didSolicit { + c.enqueueProto(infoProto) + } + // We can now set the ping timer. + c.setFirstPingTimer() + c.mu.Unlock() + return true, nil + } + // We are not using compression, set the ping timer. 
+ c.mu.Lock() + c.setFirstPingTimer() + c.mu.Unlock() + // If this is a solicited route, we need to send the INFO if it was not + // done during createRoute() and will not be done in addRoute(). + if didSolicit && !routeShouldDelayInfo(accName, opts) { + cm = routeCompressionModeForInfoProtocol(opts, cm) + s.sendDelayedRouteInfo(c, accName, cm) + } + return false, nil +} + +// If the configured compression mode is "auto" then will return that, +// otherwise will return the given `cm` compression mode. +func routeCompressionModeForInfoProtocol(opts *Options, cm string) string { + if opts.Cluster.Compression.Mode == CompressionS2Auto { + return CompressionS2Auto + } + return cm +} + +func (s *Server) sendDelayedRouteInfo(c *client, accName, cm string) { + s.mu.Lock() + infoProto := s.generateRouteInitialInfoJSON(accName, cm, 0) + s.mu.Unlock() + + c.mu.Lock() + c.enqueueProto(infoProto) + c.mu.Unlock() +} + // Possibly sends local subscriptions interest to this route // based on changes in the remote's Export permissions. func (s *Server) updateRemoteRoutePerms(c *client, info *Info) { @@ -1515,15 +1664,20 @@ func (s *Server) createRoute(conn net.Conn, rURL *url.URL, accName string) *clie c := &client{srv: s, nc: conn, opts: ClientOpts{}, kind: ROUTER, msubs: -1, mpay: -1, route: r, start: time.Now()} + // Is the server configured for compression? + compressionConfigured := needsCompression(opts.Cluster.Compression.Mode) + var infoJSON []byte - // Grab server variables + // Grab server variables and generates route INFO Json. Note that we set + // and reset some of s.routeInfo fields when that happens, so we need + // the server write lock. s.mu.Lock() // If we are creating a pooled connection and this is the server soliciting // the connection, we will delay sending the INFO after we have processed - // the incoming INFO from the remote. - delayInfo := didSolicit && accName == _EMPTY_ && opts.Cluster.PoolSize >= 1 + // the incoming INFO from the remote. 
Also delay if configured for compression. + delayInfo := didSolicit && (compressionConfigured || routeShouldDelayInfo(accName, opts)) if !delayInfo { - infoJSON = s.generateRouteInitialInfoJSON(accName, 0) + infoJSON = s.generateRouteInitialInfoJSON(accName, opts.Cluster.Compression.Mode, 0) } authRequired := s.routeInfo.AuthRequired tlsRequired := s.routeInfo.TLSRequired @@ -1575,8 +1729,23 @@ func (s *Server) createRoute(conn net.Conn, rURL *url.URL, accName string) *clie c.setRoutePermissions(opts.Cluster.Permissions) } - // Set the Ping timer - c.setFirstPingTimer() + // We can't safely send the pings until we have negotiated compression + // with the remote, but we want to protect against a connection that + // does not perform the handshake. We will start a timer that will close + // the connection as stale based on the ping interval and max out values, + // but without actually sending pings. + if compressionConfigured { + c.ping.tmr = time.AfterFunc(opts.PingInterval*time.Duration(opts.MaxPingsOut+1), func() { + c.mu.Lock() + c.Debugf("Stale Client Connection - Closing") + c.enqueueProto([]byte(fmt.Sprintf(errProto, "Stale Connection"))) + c.mu.Unlock() + c.closeConnection(StaleConnection) + }) + } else { + // Set the Ping timer + c.setFirstPingTimer() + } // For routes, the "client" is added to s.routes only when processing // the INFO protocol, that is much later. @@ -1631,21 +1800,29 @@ func (s *Server) createRoute(conn net.Conn, rURL *url.URL, accName string) *clie return c } +func routeShouldDelayInfo(accName string, opts *Options) bool { + return accName == _EMPTY_ && opts.Cluster.PoolSize >= 1 +} + // Generates a nonce and set some route info's fields before marshal'ing into JSON. // To be used only when a route is created (to send the initial INFO protocol). // // Server lock held on entry. 
-func (s *Server) generateRouteInitialInfoJSON(accName string, poolIdx int) []byte { +func (s *Server) generateRouteInitialInfoJSON(accName, compression string, poolIdx int) []byte { // New proto wants a nonce (although not used in routes, that is, not signed in CONNECT) var raw [nonceLen]byte nonce := raw[:] s.generateNonce(nonce) ri := &s.routeInfo - ri.Nonce, ri.RouteAccount, ri.RoutePoolIdx = string(nonce), accName, poolIdx + // Override compression with s2_auto instead of actual compression level. + if s.getOpts().Cluster.Compression.Mode == CompressionS2Auto { + compression = CompressionS2Auto + } + ri.Nonce, ri.RouteAccount, ri.RoutePoolIdx, ri.Compression = string(nonce), accName, poolIdx, compression infoJSON := generateInfoJSON(&s.routeInfo) // Clear now that it has been serialized. Will prevent nonce to be included in async INFO that we may send. // Same for some other fields. - ri.Nonce, ri.RouteAccount, ri.RoutePoolIdx = _EMPTY_, _EMPTY_, 0 + ri.Nonce, ri.RouteAccount, ri.RoutePoolIdx, ri.Compression = _EMPTY_, _EMPTY_, 0, _EMPTY_ return infoJSON } @@ -1806,7 +1983,7 @@ func (s *Server) addRoute(c *client, didSolicit bool, info *Info, accName string url := c.route.url // For solicited routes, we need now to send the INFO protocol if didSolicit { - c.enqueueProto(s.generateRouteInitialInfoJSON(_EMPTY_, idx)) + c.enqueueProto(s.generateRouteInitialInfoJSON(_EMPTY_, c.route.compression, idx)) } c.mu.Unlock() @@ -2179,6 +2356,11 @@ func (s *Server) startRouteAcceptLoop() { Dynamic: s.isClusterNameDynamic(), LNOC: true, } + // For tests that want to simulate old servers, do not set the compression + // on the INFO protocol if configured with CompressionNotSupported. 
+ if opts.Cluster.Compression.Mode != CompressionNotSupported { + info.Compression = opts.Cluster.Compression.Mode + } if ps := opts.Cluster.PoolSize; ps > 0 { info.RoutePoolSize = ps } diff --git a/server/routes_test.go b/server/routes_test.go index 9050702a..78774074 100644 --- a/server/routes_test.go +++ b/server/routes_test.go @@ -18,6 +18,7 @@ import ( "context" "crypto/tls" "fmt" + "math/rand" "net" "net/http" "net/http/httptest" @@ -3136,3 +3137,564 @@ func TestRoutePoolWithOlderServerConnectAndReconnect(t *testing.T) { // And again, make sure there is no repeat-connect checkRepeatConnect() } + +func TestRouteCompressionOptions(t *testing.T) { + org := testDefaultClusterCompression + testDefaultClusterCompression = _EMPTY_ + defer func() { testDefaultClusterCompression = org }() + + tmpl := ` + port: -1 + cluster { + port: -1 + compression: %s + } + ` + for _, test := range []struct { + name string + mode string + rttVals []int + expected string + rtts []time.Duration + }{ + {"boolean enabled", "true", nil, CompressionS2Auto, defaultCompressionS2AutoRTTThresholds}, + {"string enabled", "enabled", nil, CompressionS2Auto, defaultCompressionS2AutoRTTThresholds}, + {"string EnaBled", "EnaBled", nil, CompressionS2Auto, defaultCompressionS2AutoRTTThresholds}, + {"string on", "on", nil, CompressionS2Auto, defaultCompressionS2AutoRTTThresholds}, + {"string ON", "ON", nil, CompressionS2Auto, defaultCompressionS2AutoRTTThresholds}, + {"string fast", "fast", nil, CompressionS2Fast, nil}, + {"string Fast", "Fast", nil, CompressionS2Fast, nil}, + {"string s2_fast", "s2_fast", nil, CompressionS2Fast, nil}, + {"string s2_Fast", "s2_Fast", nil, CompressionS2Fast, nil}, + {"boolean disabled", "false", nil, CompressionOff, nil}, + {"string disabled", "disabled", nil, CompressionOff, nil}, + {"string DisableD", "DisableD", nil, CompressionOff, nil}, + {"string off", "off", nil, CompressionOff, nil}, + {"string OFF", "OFF", nil, CompressionOff, nil}, + {"better", "better", nil, 
CompressionS2Better, nil}, + {"Better", "Better", nil, CompressionS2Better, nil}, + {"s2_better", "s2_better", nil, CompressionS2Better, nil}, + {"S2_BETTER", "S2_BETTER", nil, CompressionS2Better, nil}, + {"best", "best", nil, CompressionS2Best, nil}, + {"BEST", "BEST", nil, CompressionS2Best, nil}, + {"s2_best", "s2_best", nil, CompressionS2Best, nil}, + {"S2_BEST", "S2_BEST", nil, CompressionS2Best, nil}, + {"auto no rtts", "auto", nil, CompressionS2Auto, defaultCompressionS2AutoRTTThresholds}, + {"s2_auto no rtts", "s2_auto", nil, CompressionS2Auto, defaultCompressionS2AutoRTTThresholds}, + {"auto", "{mode: auto, rtt_thresholds: [%s]}", []int{1}, CompressionS2Auto, []time.Duration{time.Millisecond}}, + {"Auto", "{Mode: Auto, thresholds: [%s]}", []int{1, 2}, CompressionS2Auto, []time.Duration{time.Millisecond, 2 * time.Millisecond}}, + {"s2_auto", "{mode: s2_auto, thresholds: [%s]}", []int{1, 2, 3}, CompressionS2Auto, []time.Duration{time.Millisecond, 2 * time.Millisecond, 3 * time.Millisecond}}, + {"s2_AUTO", "{mode: s2_AUTO, thresholds: [%s]}", []int{1, 2, 3, 4}, CompressionS2Auto, []time.Duration{time.Millisecond, 2 * time.Millisecond, 3 * time.Millisecond, 4 * time.Millisecond}}, + {"s2_auto:-10,5,10", "{mode: s2_auto, thresholds: [%s]}", []int{-10, 5, 10}, CompressionS2Auto, []time.Duration{0, 5 * time.Millisecond, 10 * time.Millisecond}}, + {"s2_auto:5,10,15", "{mode: s2_auto, thresholds: [%s]}", []int{5, 10, 15}, CompressionS2Auto, []time.Duration{5 * time.Millisecond, 10 * time.Millisecond, 15 * time.Millisecond}}, + {"s2_auto:0,5,10", "{mode: s2_auto, thresholds: [%s]}", []int{0, 5, 10}, CompressionS2Auto, []time.Duration{0, 5 * time.Millisecond, 10 * time.Millisecond}}, + {"s2_auto:5,10,0,20", "{mode: s2_auto, thresholds: [%s]}", []int{5, 10, 0, 20}, CompressionS2Auto, []time.Duration{5 * time.Millisecond, 10 * time.Millisecond, 0, 20 * time.Millisecond}}, + {"s2_auto:0,10,0,20", "{mode: s2_auto, thresholds: [%s]}", []int{0, 10, 0, 20}, 
CompressionS2Auto, []time.Duration{0, 10 * time.Millisecond, 0, 20 * time.Millisecond}}, + {"s2_auto:0,0,0,20", "{mode: s2_auto, thresholds: [%s]}", []int{0, 0, 0, 20}, CompressionS2Auto, []time.Duration{0, 0, 0, 20 * time.Millisecond}}, + {"s2_auto:0,10,0,0", "{mode: s2_auto, rtt_thresholds: [%s]}", []int{0, 10, 0, 0}, CompressionS2Auto, []time.Duration{0, 10 * time.Millisecond}}, + } { + t.Run(test.name, func(t *testing.T) { + var val string + if len(test.rttVals) > 0 { + var rtts string + for i, v := range test.rttVals { + if i > 0 { + rtts += ", " + } + rtts += fmt.Sprintf("%dms", v) + } + val = fmt.Sprintf(test.mode, rtts) + } else { + val = test.mode + } + conf := createConfFile(t, []byte(fmt.Sprintf(tmpl, val))) + s, o := RunServerWithConfig(conf) + defer s.Shutdown() + + if o.Cluster.Compression.Mode != test.expected { + t.Fatalf("Expected compression value to be %q, got %q", test.expected, o.Cluster.Compression) + } + if !reflect.DeepEqual(test.rtts, o.Cluster.Compression.RTTThresholds) { + t.Fatalf("Expected RTT tresholds to be %+v, got %+v", test.rtts, o.Cluster.Compression.RTTThresholds) + } + s.Shutdown() + + o.Cluster.Port = -1 + o.Cluster.Compression.Mode = test.mode + if len(test.rttVals) > 0 { + o.Cluster.Compression.Mode = CompressionS2Auto + o.Cluster.Compression.RTTThresholds = o.Cluster.Compression.RTTThresholds[:0] + for _, v := range test.rttVals { + o.Cluster.Compression.RTTThresholds = append(o.Cluster.Compression.RTTThresholds, time.Duration(v)*time.Millisecond) + } + } + s = RunServer(o) + defer s.Shutdown() + if o.Cluster.Compression.Mode != test.expected { + t.Fatalf("Expected compression value to be %q, got %q", test.expected, o.Cluster.Compression) + } + if !reflect.DeepEqual(test.rtts, o.Cluster.Compression.RTTThresholds) { + t.Fatalf("Expected RTT tresholds to be %+v, got %+v", test.rtts, o.Cluster.Compression.RTTThresholds) + } + }) + } + // Test that with no compression specified, we default to "auto" + conf := createConfFile(t, 
[]byte(` + port: -1 + cluster { + port: -1 + } + `)) + s, o := RunServerWithConfig(conf) + defer s.Shutdown() + if o.Cluster.Compression.Mode != CompressionS2Auto { + t.Fatalf("Expected compression value to be %q, got %q", CompressionS2Auto, o.Cluster.Compression.Mode) + } + for _, test := range []struct { + name string + mode string + rtts []time.Duration + err string + }{ + {"unsupported mode", "gzip", nil, "Unsupported"}, + {"not ascending order", "s2_auto", []time.Duration{ + 5 * time.Millisecond, + 10 * time.Millisecond, + 2 * time.Millisecond, + }, "ascending"}, + {"too many thresholds", "s2_auto", []time.Duration{ + 5 * time.Millisecond, + 10 * time.Millisecond, + 20 * time.Millisecond, + 40 * time.Millisecond, + 60 * time.Millisecond, + }, "more than 4"}, + {"all 0", "s2_auto", []time.Duration{0, 0, 0, 0}, "at least one"}, + {"single 0", "s2_auto", []time.Duration{0}, "at least one"}, + } { + t.Run(test.name, func(t *testing.T) { + o := DefaultOptions() + o.Cluster.Port = -1 + o.Cluster.Compression = CompressionOpts{test.mode, test.rtts} + if _, err := NewServer(o); err == nil || !strings.Contains(err.Error(), test.err) { + t.Fatalf("Unexpected error: %v", err) + } + }) + } +} + +func TestRouteCompression(t *testing.T) { + tmpl := ` + port: -1 + server_name: "%s" + accounts { + A { users: [{user: "a", pass: "pwd"}] } + } + cluster { + name: "local" + port: -1 + compression: s2_fast + pool_size: %d + %s + %s + } + ` + for _, test := range []struct { + name string + poolSize int + accounts string + }{ + {"no pooling", -1, _EMPTY_}, + {"pooling", 3, _EMPTY_}, + {"per account", 1, "accounts: [\"A\"]"}, + } { + t.Run(test.name, func(t *testing.T) { + conf1 := createConfFile(t, []byte(fmt.Sprintf(tmpl, "S1", test.poolSize, test.accounts, _EMPTY_))) + s1, o1 := RunServerWithConfig(conf1) + defer s1.Shutdown() + + conf2 := createConfFile(t, []byte(fmt.Sprintf(tmpl, "S2", test.poolSize, test.accounts, + fmt.Sprintf("routes: [\"nats://127.0.0.1:%d\"]", 
o1.Cluster.Port)))) + s2, _ := RunServerWithConfig(conf2) + defer s2.Shutdown() + + checkClusterFormed(t, s1, s2) + + nc2 := natsConnect(t, s2.ClientURL(), nats.UserInfo("a", "pwd")) + defer nc2.Close() + sub := natsSubSync(t, nc2, "foo") + natsFlush(t, nc2) + checkSubInterest(t, s1, "A", "foo", time.Second) + + nc1 := natsConnect(t, s1.ClientURL(), nats.UserInfo("a", "pwd")) + defer nc1.Close() + + var payloads [][]byte + count := 26 + for i := 0; i < count; i++ { + n := rand.Intn(2048) + 1 + p := make([]byte, n) + for j := 0; j < n; j++ { + p[j] = byte(i) + 'A' + } + payloads = append(payloads, p) + natsPub(t, nc1, "foo", p) + } + + totalPayloadSize := 0 + for i := 0; i < count; i++ { + m := natsNexMsg(t, sub, time.Second) + if !bytes.Equal(m.Data, payloads[i]) { + t.Fatalf("Expected payload %q - got %q", payloads[i], m.Data) + } + totalPayloadSize += len(m.Data) + } + + // Also check that the route stats shows that compression likely occurred + var out int64 + s1.mu.RLock() + if len(test.accounts) > 0 { + rems := s1.accRoutes["A"] + if rems == nil { + t.Fatal("Did not find route for account") + } + for _, r := range rems { + r.mu.Lock() + out = r.sentBytes + r.mu.Unlock() + break + } + } else { + ai, _ := s1.accounts.Load("A") + acc := ai.(*Account) + acc.mu.RLock() + pi := acc.routePoolIdx + acc.mu.RUnlock() + s1.forEachRouteIdx(pi, func(r *client) bool { + r.mu.Lock() + out = r.sentBytes + r.mu.Unlock() + return false + }) + } + s1.mu.RUnlock() + // Should at least be smaller than totalPayloadSize, use 20%. 
+ limit := totalPayloadSize * 80 / 100 + if int(out) > limit { + t.Fatalf("Expected s1's outBytes to be less than %v, got %v", limit, out) + } + }) + } +} + +func TestRouteCompressionMatrixModes(t *testing.T) { + tmpl := ` + port: -1 + server_name: "%s" + cluster { + name: "local" + port: -1 + compression: %s + pool_size: -1 + %s + } + ` + for _, test := range []struct { + name string + s1 string + s2 string + s1Expected string + s2Expected string + }{ + {"off off", "off", "off", CompressionOff, CompressionOff}, + {"off accept", "off", "accept", CompressionOff, CompressionOff}, + {"off on", "off", "on", CompressionOff, CompressionOff}, + {"off better", "off", "better", CompressionOff, CompressionOff}, + {"off best", "off", "best", CompressionOff, CompressionOff}, + + {"accept off", "accept", "off", CompressionOff, CompressionOff}, + {"accept accept", "accept", "accept", CompressionOff, CompressionOff}, + {"accept on", "accept", "on", CompressionS2Fast, CompressionS2Uncompressed}, + {"accept better", "accept", "better", CompressionS2Better, CompressionS2Better}, + {"accept best", "accept", "best", CompressionS2Best, CompressionS2Best}, + + {"on off", "on", "off", CompressionOff, CompressionOff}, + {"on accept", "on", "accept", CompressionS2Uncompressed, CompressionS2Fast}, + {"on on", "on", "on", CompressionS2Uncompressed, CompressionS2Uncompressed}, + {"on better", "on", "better", CompressionS2Uncompressed, CompressionS2Better}, + {"on best", "on", "best", CompressionS2Uncompressed, CompressionS2Best}, + + {"better off", "better", "off", CompressionOff, CompressionOff}, + {"better accept", "better", "accept", CompressionS2Better, CompressionS2Better}, + {"better on", "better", "on", CompressionS2Better, CompressionS2Uncompressed}, + {"better better", "better", "better", CompressionS2Better, CompressionS2Better}, + {"better best", "better", "best", CompressionS2Better, CompressionS2Best}, + + {"best off", "best", "off", CompressionOff, CompressionOff}, + {"best 
accept", "best", "accept", CompressionS2Best, CompressionS2Best}, + {"best on", "best", "on", CompressionS2Best, CompressionS2Uncompressed}, + {"best better", "best", "better", CompressionS2Best, CompressionS2Better}, + {"best best", "best", "best", CompressionS2Best, CompressionS2Best}, + } { + t.Run(test.name, func(t *testing.T) { + conf1 := createConfFile(t, []byte(fmt.Sprintf(tmpl, "A", test.s1, _EMPTY_))) + s1, o1 := RunServerWithConfig(conf1) + defer s1.Shutdown() + + conf2 := createConfFile(t, []byte(fmt.Sprintf(tmpl, "B", test.s2, fmt.Sprintf("routes: [\"nats://127.0.0.1:%d\"]", o1.Cluster.Port)))) + s2, _ := RunServerWithConfig(conf2) + defer s2.Shutdown() + + checkClusterFormed(t, s1, s2) + + nc1 := natsConnect(t, s1.ClientURL()) + defer nc1.Close() + + nc2 := natsConnect(t, s2.ClientURL()) + defer nc2.Close() + + payload := make([]byte, 128) + check := func(ncp, ncs *nats.Conn, subj string, s *Server) { + t.Helper() + sub := natsSubSync(t, ncs, subj) + checkSubInterest(t, s, globalAccountName, subj, time.Second) + natsPub(t, ncp, subj, payload) + natsNexMsg(t, sub, time.Second) + + for _, srv := range []*Server{s1, s2} { + rz, err := srv.Routez(nil) + require_NoError(t, err) + var expected string + if srv == s1 { + expected = test.s1Expected + } else { + expected = test.s2Expected + } + if cm := rz.Routes[0].Compression; cm != expected { + t.Fatalf("Server %s - expected compression %q, got %q", srv, expected, cm) + } + } + } + check(nc1, nc2, "foo", s1) + check(nc2, nc1, "bar", s2) + }) + } +} + +func TestRouteCompressionWithOlderServer(t *testing.T) { + tmpl := ` + port: -1 + server_name: "%s" + cluster { + port: -1 + name: "local" + %s + %s + } + ` + conf1 := createConfFile(t, []byte(fmt.Sprintf(tmpl, "A", _EMPTY_, "compression: \"on\""))) + s1, o1 := RunServerWithConfig(conf1) + defer s1.Shutdown() + + routes := fmt.Sprintf("routes: [\"nats://127.0.0.1:%d\"]", o1.Cluster.Port) + conf2 := createConfFile(t, []byte(fmt.Sprintf(tmpl, "B", routes, 
"compression: \"not supported\""))) + s2, _ := RunServerWithConfig(conf2) + defer s2.Shutdown() + + checkClusterFormed(t, s1, s2) + + // Make sure that s1 route's compression is "off" + s1.mu.RLock() + s1.forEachRoute(func(r *client) { + r.mu.Lock() + cm := r.route.compression + r.mu.Unlock() + if cm != CompressionNotSupported { + s1.mu.RUnlock() + t.Fatalf("Compression should be %q, got %q", CompressionNotSupported, cm) + } + }) + s1.mu.RUnlock() +} + +func TestRouteCompressionImplicitRoute(t *testing.T) { + tmpl := ` + port: -1 + server_name: "%s" + cluster { + port: -1 + name: "local" + %s + %s + } + ` + conf1 := createConfFile(t, []byte(fmt.Sprintf(tmpl, "A", _EMPTY_, _EMPTY_))) + s1, o1 := RunServerWithConfig(conf1) + defer s1.Shutdown() + + routes := fmt.Sprintf("routes: [\"nats://127.0.0.1:%d\"]", o1.Cluster.Port) + conf2 := createConfFile(t, []byte(fmt.Sprintf(tmpl, "B", routes, "compression: \"fast\""))) + s2, _ := RunServerWithConfig(conf2) + defer s2.Shutdown() + + conf3 := createConfFile(t, []byte(fmt.Sprintf(tmpl, "C", routes, "compression: \"best\""))) + s3, _ := RunServerWithConfig(conf3) + defer s3.Shutdown() + + checkClusterFormed(t, s1, s2, s3) + + checkComp := func(s *Server, remoteID, expected string) { + t.Helper() + s.mu.RLock() + defer s.mu.RUnlock() + var err error + s.forEachRoute(func(r *client) { + if err != nil { + return + } + var cm string + ok := true + r.mu.Lock() + if r.route.remoteID == remoteID { + cm = r.route.compression + ok = cm == expected + } + r.mu.Unlock() + if !ok { + err = fmt.Errorf("Server %q - expected route to %q to use compression %q, got %q", + s, remoteID, expected, cm) + } + }) + } + checkComp(s1, s2.ID(), CompressionS2Fast) + checkComp(s1, s3.ID(), CompressionS2Best) + checkComp(s2, s1.ID(), CompressionS2Fast) + checkComp(s2, s3.ID(), CompressionS2Best) + checkComp(s3, s1.ID(), CompressionS2Best) + checkComp(s3, s2.ID(), CompressionS2Best) +} + +func TestRouteCompressionAuto(t *testing.T) { + tmpl := ` + port: 
-1 + server_name: "%s" + ping_interval: "%s" + cluster { + port: -1 + name: "local" + compression: %s + %s + } + ` + conf1 := createConfFile(t, []byte(fmt.Sprintf(tmpl, "A", "10s", "s2_fast", _EMPTY_))) + s1, o1 := RunServerWithConfig(conf1) + defer s1.Shutdown() + + // Start with 0ms RTT + np := createNetProxy(0, 1024*1024*1024, 1024*1024*1024, fmt.Sprintf("nats://127.0.0.1:%d", o1.Cluster.Port), true) + routes := fmt.Sprintf("routes: [\"%s\"]", np.routeURL()) + + rtts := "{mode: s2_auto, rtt_thresholds: [10ms, 20ms, 30ms]}" + conf2 := createConfFile(t, []byte(fmt.Sprintf(tmpl, "B", "100ms", rtts, routes))) + s2, _ := RunServerWithConfig(conf2) + defer s2.Shutdown() + defer np.stop() + + checkClusterFormed(t, s1, s2) + + checkComp := func(expected string) { + t.Helper() + checkFor(t, 2*time.Second, 15*time.Millisecond, func() error { + s2.mu.RLock() + defer s2.mu.RUnlock() + var err error + s2.forEachRoute(func(r *client) { + if err != nil { + return + } + r.mu.Lock() + cm := r.route.compression + r.mu.Unlock() + if cm != expected { + err = fmt.Errorf("Route %v compression mode expected to be %q, got %q", r, expected, cm) + } + }) + return err + }) + } + checkComp(CompressionS2Uncompressed) + + // Change the proxy RTT and we should get compression "fast" + np.updateRTT(15 * time.Millisecond) + checkComp(CompressionS2Fast) + + // Now 25ms, and get "better" + np.updateRTT(25 * time.Millisecond) + checkComp(CompressionS2Better) + + // Above 35 and we should get "best" + np.updateRTT(35 * time.Millisecond) + checkComp(CompressionS2Best) + + // Down to 1ms and again should get "uncompressed" + np.updateRTT(1 * time.Millisecond) + checkComp(CompressionS2Uncompressed) + + // Do a config reload with disabling uncompressed + reloadUpdateConfig(t, s2, conf2, fmt.Sprintf(tmpl, "B", "100ms", "{mode: s2_auto, rtt_thresholds: [0ms, 10ms, 0ms, 30ms]}", routes)) + // Change the RTT back down to 1ms, but we should not go uncompressed, + // we should have "fast" compression. 
+ np.updateRTT(1 * time.Millisecond) + checkComp(CompressionS2Fast) + // Now bump to 15ms and we should be using "best", not the "better" mode + np.updateRTT(15 * time.Millisecond) + checkComp(CompressionS2Best) + // Try 40ms and we should still be using "best" + np.updateRTT(40 * time.Millisecond) + checkComp(CompressionS2Best) + + // Try other variations + reloadUpdateConfig(t, s2, conf2, fmt.Sprintf(tmpl, "B", "100ms", "{mode: s2_auto, rtt_thresholds: [5ms, 15ms, 0ms, 0ms]}", routes)) + np.updateRTT(1 * time.Millisecond) + checkComp(CompressionS2Uncompressed) + np.updateRTT(10 * time.Millisecond) + checkComp(CompressionS2Fast) + // Since we expect the same compression level, just wait before doing + // the update and the next check. + time.Sleep(100 * time.Millisecond) + np.updateRTT(25 * time.Millisecond) + checkComp(CompressionS2Fast) +} + +func TestRoutePings(t *testing.T) { + routeMaxPingInterval = 50 * time.Millisecond + defer func() { routeMaxPingInterval = defaultRouteMaxPingInterval }() + + o1 := DefaultOptions() + s1 := RunServer(o1) + defer s1.Shutdown() + + o2 := DefaultOptions() + o2.Routes = RoutesFromStr(fmt.Sprintf("nats://127.0.0.1:%d", o1.Cluster.Port)) + s2 := RunServer(o2) + defer s2.Shutdown() + + checkClusterFormed(t, s1, s2) + + ch := make(chan struct{}, 1) + s1.mu.RLock() + s1.forEachRemote(func(r *client) { + r.mu.Lock() + r.nc = &capturePingConn{r.nc, ch} + r.mu.Unlock() + }) + s1.mu.RUnlock() + + for i := 0; i < 5; i++ { + select { + case <-ch: + case <-time.After(250 * time.Millisecond): + t.Fatalf("Did not send PING") + } + } +} diff --git a/server/server.go b/server/server.go index 428ee579..f642a96f 100644 --- a/server/server.go +++ b/server/server.go @@ -41,6 +41,7 @@ import ( "sync/atomic" "time" + "github.com/klauspost/compress/s2" "github.com/nats-io/jwt/v2" "github.com/nats-io/nkeys" "github.com/nats-io/nuid" @@ -84,6 +85,7 @@ type Info struct { ClientConnectURLs []string `json:"connect_urls,omitempty"` // Contains URLs a 
client can connect to. WSConnectURLs []string `json:"ws_connect_urls,omitempty"` // Contains URLs a ws client can connect to. LameDuckMode bool `json:"ldm,omitempty"` + Compression string `json:"compression,omitempty"` // Route Specific Import *SubjectPermission `json:"import,omitempty"` @@ -325,9 +327,223 @@ type stats struct { outMsgs int64 inBytes int64 outBytes int64 + sentBytes int64 // Total of what is sent out, include all protocols slowConsumers int64 } +// This is used by tests so we can run all server tests with a default route +// compression mode. For instance: +// go test -race -v ./server -cluster_compression=fast +var testDefaultClusterCompression string + +// Compression modes. +const ( + CompressionNotSupported = "not supported" + CompressionOff = "off" + CompressionAccept = "accept" + CompressionS2Auto = "s2_auto" + CompressionS2Uncompressed = "s2_uncompressed" + CompressionS2Fast = "s2_fast" + CompressionS2Better = "s2_better" + CompressionS2Best = "s2_best" +) + +// defaultCompressionS2AutoRTTThresholds is the default of RTT thresholds for +// the CompressionS2Auto mode. +var defaultCompressionS2AutoRTTThresholds = []time.Duration{ + // [0..10ms] -> CompressionS2Uncompressed + 10 * time.Millisecond, + // ]10ms..50ms] -> CompressionS2Fast + 50 * time.Millisecond, + // ]50ms..100ms] -> CompressionS2Better + 100 * time.Millisecond, + // ]100ms..] -> CompressionS2Best +} + +// For a given user provided string, matches to one of the compression mode +// constant and updates the provided string to that constant. Returns an +// error if the provided compression mode is not known. 
+func validateAndNormalizeCompressionOption(c *CompressionOpts) error { + if c == nil { + return nil + } + cmtl := strings.ToLower(c.Mode) + switch cmtl { + case "not supported", "not_supported": + c.Mode = CompressionNotSupported + case "disabled", "off", "false": + c.Mode = CompressionOff + case "accept": + c.Mode = CompressionAccept + case "on", "enabled", "true", "auto", "s2_auto": + var rtts []time.Duration + if len(c.RTTThresholds) == 0 { + rtts = defaultCompressionS2AutoRTTThresholds + } else { + for _, n := range c.RTTThresholds { + // Do not error on negative, but simply set to 0 + if n < 0 { + n = 0 + } + // Make sure they are properly ordered. However, it is possible + // to have a "0" anywhere in the list to indicate that this + // compression level should not be used. + if l := len(rtts); l > 0 && n != 0 { + for _, v := range rtts { + if n < v { + return fmt.Errorf("RTT threshold values %v should be in ascending order", c.RTTThresholds) + } + } + } + rtts = append(rtts, n) + } + if len(rtts) > 0 { + // Trim 0 that are at the end. + stop := -1 + for i := len(rtts) - 1; i >= 0; i-- { + if rtts[i] != 0 { + stop = i + break + } + } + rtts = rtts[:stop+1] + } + if len(rtts) > 4 { + // There should be at most values for "uncompressed", "fast", + // "better" and "best" (when some 0 are present). + return fmt.Errorf("The compression mode %q should have no more than 4 RTT thresholds: %v", c.Mode, c.RTTThresholds) + } else if len(rtts) == 0 { + // But there should be at least 1 if the user provided the slice. + // We would be here only if it was provided by say with values + // being a single or all zeros. 
+ return fmt.Errorf("The compression mode %q requires at least one RTT threshold", c.Mode) + } + } + c.Mode = CompressionS2Auto + c.RTTThresholds = rtts + case "fast", "s2_fast": + c.Mode = CompressionS2Fast + case "better", "s2_better": + c.Mode = CompressionS2Better + case "best", "s2_best": + c.Mode = CompressionS2Best + default: + return fmt.Errorf("Unsupported compression mode %q", c.Mode) + } + return nil +} + +// Returns `true` if the compression mode `m` indicates that the server +// will negotiate compression with the remote server, `false` otherwise. +// Note that the provided compression mode is assumed to have been +// normalized and validated. +func needsCompression(m string) bool { + return m != _EMPTY_ && m != CompressionOff && m != CompressionNotSupported +} + +// Compression is asymmetric, meaning that one side can have a different +// compression level than the other. However, we need to check for cases +// when this server `scm` or the remote `rcm` do not support compression +// (say older server, or test to make it behave as it is not), or have +// the compression off. +// Note that `scm` is assumed to not be "off" or "not supported". +func selectCompressionMode(scm, rcm string) (mode string, err error) { + if rcm == CompressionNotSupported || rcm == _EMPTY_ { + return CompressionNotSupported, nil + } + switch rcm { + case CompressionOff: + // If the remote explicitly disables compression, then we won't + // use compression. + return CompressionOff, nil + case CompressionAccept: + // If the remote is ok with compression (but is not initiating it), + // and if we too are in this mode, then it means no compression. + if scm == CompressionAccept { + return CompressionOff, nil + } + // Otherwise use our compression mode. 
+ return scm, nil + case CompressionS2Auto, CompressionS2Uncompressed, CompressionS2Fast, CompressionS2Better, CompressionS2Best: + // This case is here to make sure that if we don't recognize a + // compression setting, we error out. + if scm == CompressionAccept { + // If our compression mode is "accept", then we will use the remote + // compression mode, except if it is "auto", in which case we will + // default to "fast". This is not a configuration (auto in one + // side and accept in the other) that would be recommended. + if rcm == CompressionS2Auto { + return CompressionS2Fast, nil + } + // Use their compression mode. + return rcm, nil + } + // Otherwise use our compression mode. + return scm, nil + default: + return _EMPTY_, fmt.Errorf("Unsupported route compression mode %q", rcm) + } +} + +// Given a connection RTT and a list of thresholds durations, this +// function will return an S2 compression level such as "uncompressed", +// "fast", "better" or "best". For instance, with the following slice: +// [5ms, 10ms, 15ms, 20ms], a RTT of up to 5ms will result +// in the compression level "uncompressed", ]5ms..10ms] will result in +// "fast" compression, etc.. +// However, the 0 value allows for disabling of some compression levels. +// For instance, the following slice: [0, 0, 20, 30] means that a RTT of +// [0..20ms] would result in the "better" compression - effectively disabling +// the use of "uncompressed" and "fast", then anything above 20ms would +// result in the use of "best" level (the 30 in the list has no effect +// and the list could have been simplified to [0, 0, 20]). +func selectS2AutoModeBasedOnRTT(rtt time.Duration, rttThresholds []time.Duration) string { + var idx int + var found bool + for i, d := range rttThresholds { + if rtt <= d { + idx = i + found = true + break + } + } + if !found { + // If we did not find but we have all levels, then use "best", + // otherwise use the last one in array. 
+ if l := len(rttThresholds); l >= 3 { + idx = 3 + } else { + idx = l - 1 + } + } + switch idx { + case 0: + return CompressionS2Uncompressed + case 1: + return CompressionS2Fast + case 2: + return CompressionS2Better + } + return CompressionS2Best +} + +// Returns an array of s2 WriterOption based on the route compression mode. +// So far we return a single option, but this way we can call s2.NewWriter() +// with a nil []s2.WriterOption, but not with a nil s2.WriterOption, so +// this is more versatile. +func s2WriterOptions(cm string) []s2.WriterOption { + switch cm { + case CompressionS2Uncompressed: + return []s2.WriterOption{s2.WriterUncompressed()} + case CompressionS2Best: + return []s2.WriterOption{s2.WriterBestCompression()} + case CompressionS2Better: + return []s2.WriterOption{s2.WriterBetterCompression()} + default: + return nil + } +} + // New will setup a new server struct after parsing the options. // DEPRECATED: Use NewServer(opts) func New(opts *Options) *Server { @@ -690,6 +906,11 @@ func (s *Server) ClientURL() string { } func validateCluster(o *Options) error { + if o.Cluster.Compression.Mode != _EMPTY_ { + if err := validateAndNormalizeCompressionOption(&o.Cluster.Compression); err != nil { + return err + } + } if err := validatePinnedCerts(o.Cluster.TLSPinnedCerts); err != nil { return fmt.Errorf("cluster: %v", err) } diff --git a/server/sublist_test.go b/server/sublist_test.go index b9e12a26..2bf33b63 100644 --- a/server/sublist_test.go +++ b/server/sublist_test.go @@ -1558,6 +1558,7 @@ var benchSublistSl = NewSublistWithCache() // https://github.com/golang/go/issues/31859 func TestMain(m *testing.M) { + flag.StringVar(&testDefaultClusterCompression, "cluster_compression", _EMPTY_, "Test with this compression level as the default") flag.Parse() initSublist := false flag.Visit(func(f *flag.Flag) { diff --git a/test/bench_test.go b/test/bench_test.go index 17916721..0cc8ebf6 100644 --- a/test/bench_test.go +++ b/test/bench_test.go @@ -29,6 
+29,7 @@ import ( "time" "github.com/nats-io/nats-server/v2/server" + "github.com/nats-io/nats.go" ) const PERF_PORT = 8422 @@ -735,6 +736,105 @@ func Benchmark___Routed16QueueSub(b *testing.B) { routeQueue(b, 16, 2) } +func doS2CompressBench(b *testing.B, compress string) { + b.StopTimer() + conf1 := createConfFile(b, []byte(fmt.Sprintf(` + port: -1 + cluster { + name: "local" + port: -1 + pool_size: -1 + compression: %s + } + `, compress))) + s1, o1 := RunServerWithConfig(conf1) + defer s1.Shutdown() + + conf2 := createConfFile(b, []byte(fmt.Sprintf(` + port: -1 + cluster { + name: "local" + port: -1 + pool_size: -1 + compression: %s + routes: ["nats://127.0.0.1:%d"] + } + `, compress, o1.Cluster.Port))) + s2, _ := RunServerWithConfig(conf2) + defer s2.Shutdown() + + checkClusterFormed(b, s1, s2) + + nc2, err := nats.Connect(s2.ClientURL()) + if err != nil { + b.Fatalf("Error on connect: %v", err) + } + defer nc2.Close() + + ch := make(chan struct{}, 1) + var count int + nc2.Subscribe("foo", func(_ *nats.Msg) { + if count++; count == b.N { + select { + case ch <- struct{}{}: + default: + } + } + }) + + checkSubInterest(b, s1, "$G", "foo", time.Second) + + nc1, err := nats.Connect(s1.ClientURL()) + if err != nil { + b.Fatalf("Error on connect: %v", err) + } + defer nc1.Close() + + // This one is easily compressible. + payload1 := make([]byte, 128) + // Make it random so that compression code has more to do. 
+	payload2 := make([]byte, 256)
+	for i := 0; i < len(payload2); i++ {
+		payload2[i] = byte(rand.Intn(26) + 'A')
+	}
+	b.StartTimer()
+
+	for i := 0; i < b.N; i++ {
+		if i%2 == 0 {
+			nc1.Publish("foo", payload1)
+		} else {
+			nc1.Publish("foo", payload2)
+		}
+	}
+
+	select {
+	case <-ch:
+		return
+	case <-time.After(10 * time.Second):
+		b.Fatal("Timeout waiting to receive all messages")
+	}
+}
+
+func Benchmark____________RouteCompressOff(b *testing.B) {
+	doS2CompressBench(b, server.CompressionOff)
+}
+
+func Benchmark_RouteCompressS2Uncompressed(b *testing.B) {
+	doS2CompressBench(b, server.CompressionS2Uncompressed)
+}
+
+func Benchmark_________RouteCompressS2Fast(b *testing.B) {
+	doS2CompressBench(b, server.CompressionS2Fast)
+}
+
+func Benchmark_______RouteCompressS2Better(b *testing.B) {
+	doS2CompressBench(b, server.CompressionS2Better)
+}
+
+func Benchmark_________RouteCompressS2Best(b *testing.B) {
+	doS2CompressBench(b, server.CompressionS2Best)
+}
+
 func doFanout(b *testing.B, numServers, numConnections, subsPerConnection int, subject, payload string) {
 	var s1, s2 *server.Server
 	var o1, o2 *server.Options
diff --git a/test/cluster_test.go b/test/cluster_test.go
index f8354d46..477ec4db 100644
--- a/test/cluster_test.go
+++ b/test/cluster_test.go
@@ -74,7 +74,7 @@ func checkExpectedSubs(expected int, servers ...*server.Server) error {
 	return nil
 }
 
-func checkSubInterest(t *testing.T, s *server.Server, accName, subject string, timeout time.Duration) {
+func checkSubInterest(t testing.TB, s *server.Server, accName, subject string, timeout time.Duration) {
 	t.Helper()
 	checkFor(t, timeout, 15*time.Millisecond, func() error {
 		acc, err := s.LookupAccount(accName)
diff --git a/test/cluster_tls_test.go b/test/cluster_tls_test.go
index c968d50a..3238424c 100644
--- a/test/cluster_tls_test.go
+++ b/test/cluster_tls_test.go
@@ -104,6 +104,7 @@ func TestClusterTLSInsecure(t *testing.T) {
 		name: "xyz"
 		listen: "127.0.0.1:-1"
 		pool_size: -1
+		compression: "disabled"
 		tls {
cert_file: "./configs/certs/server-noip.pem" key_file: "./configs/certs/server-key-noip.pem" @@ -124,6 +125,7 @@ func TestClusterTLSInsecure(t *testing.T) { name: "xyz" listen: "127.0.0.1:-1" pool_size: -1 + compression: "disabled" tls { cert_file: "./configs/certs/server-noip.pem" key_file: "./configs/certs/server-key-noip.pem" diff --git a/test/norace_test.go b/test/norace_test.go index ffaf45c1..260ad5f8 100644 --- a/test/norace_test.go +++ b/test/norace_test.go @@ -47,6 +47,7 @@ func TestNoRaceRouteSendSubs(t *testing.T) { cluster { port: -1 pool_size: -1 + compression: disabled %s } no_sys_acc: true diff --git a/test/ocsp_test.go b/test/ocsp_test.go index 0205342a..bd2b071b 100644 --- a/test/ocsp_test.go +++ b/test/ocsp_test.go @@ -1206,6 +1206,7 @@ func TestOCSPCluster(t *testing.T) { cluster { port: -1 pool_size: -1 + compression: "disabled" name: AB host: "127.0.0.1" advertise: 127.0.0.1 diff --git a/test/operator_test.go b/test/operator_test.go index 0afca441..590ec74b 100644 --- a/test/operator_test.go +++ b/test/operator_test.go @@ -342,6 +342,7 @@ func TestReloadDoesNotWipeAccountsWithOperatorMode(t *testing.T) { name: "A" listen: 127.0.0.1:-1 pool_size: -1 + compression: "disabled" authorization { timeout: 2.2 } %s diff --git a/test/test.go b/test/test.go index 0a1eea6a..3b2264d5 100644 --- a/test/test.go +++ b/test/test.go @@ -81,6 +81,8 @@ func RunServerCallback(opts *server.Options, callback func(*server.Server)) *ser opts.Debug = doDebug // For all tests in the "test" package, we will disable route pooling. opts.Cluster.PoolSize = -1 + // Also disable compression for "test" package. + opts.Cluster.Compression.Mode = server.CompressionOff s, err := server.NewServer(opts) if err != nil || s == nil {