From 67498af2dce063052cffb1785c8b36d22af6fcfb Mon Sep 17 00:00:00 2001 From: Ivan Kozlovic Date: Mon, 15 May 2023 17:42:39 -0600 Subject: [PATCH] [ADDED] LeafNode: Support for s2 compression This is similar to PR #4115 but for LeafNodes. Compression mode can be set on both sides (the accept side and in remotes). ``` leafnodes { port: 7422 compression: s2_best remotes [ { url: "nats://host2:7422" compression: s2_better } ] } ``` Possible modes are similar to those for routes (described in PR #4115), except that when not defined we default to `s2_auto`. Signed-off-by: Ivan Kozlovic --- server/client.go | 74 ++- server/config_check_test.go | 132 +++++ server/leafnode.go | 343 +++++++++---- server/leafnode_test.go | 949 +++++++++++++++++++++++++++++++++++- server/monitor.go | 48 +- server/opts.go | 42 +- server/reload.go | 105 +++- server/reload_test.go | 320 +++++++++++- server/route.go | 32 +- server/routes_test.go | 16 +- server/server.go | 43 +- server/sublist_test.go | 1 + test/leafnode_test.go | 2 + test/ocsp_test.go | 4 + test/test.go | 1 + 15 files changed, 1904 insertions(+), 208 deletions(-) diff --git a/server/client.go b/server/client.go index 65dea201..ba5328d1 100644 --- a/server/client.go +++ b/server/client.go @@ -1275,7 +1275,7 @@ func (c *client) readLoop(pre []byte) { if ws { masking = c.ws.maskread } - checkCompress := c.kind == ROUTER + checkCompress := c.kind == ROUTER || c.kind == LEAF c.mu.Unlock() defer func() { @@ -1303,7 +1303,9 @@ func (c *client) readLoop(pre []byte) { wsr.init() } - var decompress *s2.Reader + var decompress bool + var reader io.Reader + reader = nc for { var n int @@ -1315,11 +1317,7 @@ func (c *client) readLoop(pre []byte) { n = len(pre) pre = nil } else { - if decompress != nil { - n, err = decompress.Read(b) - } else { - n, err = nc.Read(b) - } + n, err = reader.Read(b) // If we have any data we will try to parse and exit at the end. 
if n == 0 && err != nil { c.closeConnection(closedStateForErr(err)) @@ -1327,7 +1325,7 @@ func (c *client) readLoop(pre []byte) { } } if ws { - bufs, err = c.wsRead(wsr, nc, b[:n]) + bufs, err = c.wsRead(wsr, reader, b[:n]) if bufs == nil && err != nil { if err != io.EOF { c.Errorf("read error: %v", err) @@ -1386,11 +1384,13 @@ func (c *client) readLoop(pre []byte) { } } - // If we are a ROUTER and have processed an INFO, it is possible that + // If we are a ROUTER/LEAF and have processed an INFO, it is possible that // we are asked to switch to compression now. if checkCompress && c.in.flags.isSet(switchToCompression) { c.in.flags.clear(switchToCompression) - decompress = s2.NewReader(nc) + // For now we support only s2 compression... + reader = s2.NewReader(nc) + decompress = true } // Updates stats for client and server that were collected @@ -1440,8 +1440,11 @@ func (c *client) readLoop(pre []byte) { // Refresh nc because in some cases, we have upgraded c.nc to TLS. if nc != c.nc { nc = c.nc - if decompress != nil { - decompress.Reset(nc) + if decompress && nc != nil { + // For now we support only s2 compression... + reader.(*s2.Reader).Reset(nc) + } else if !decompress { + reader = nc } } c.mu.Unlock() @@ -1545,6 +1548,9 @@ func (c *client) flushOutbound() bool { if cw != nil { // We will have to adjust once we have compressed, so remove for now. c.out.pb -= attempted + if c.isWebsocket() { + c.ws.fs -= attempted + } } // Do NOT hold lock during actual IO. @@ -1604,6 +1610,9 @@ func (c *client) flushOutbound() bool { // Adjust if we were compressing. 
if cw != nil { c.out.pb += attempted + if c.isWebsocket() { + c.ws.fs += attempted + } } // At this point, "wnb" has been mutated by WriteTo and any consumed @@ -2389,16 +2398,19 @@ func (c *client) processPong() { c.rtt = computeRTT(c.rttStart) srv := c.srv reorderGWs := c.kind == GATEWAY && c.gw.outbound - // If compression is currently active for a route connection, if the + // If compression is currently active for a route/leaf connection, if the // compression configuration is s2_auto, check if we should change // the compression level. if c.kind == ROUTER && needsCompression(c.route.compression) { - if co := &(srv.getOpts().Cluster.Compression); co.Mode == CompressionS2Auto { - if cm := selectS2AutoModeBasedOnRTT(c.rtt, co.RTTThresholds); cm != c.route.compression { - c.route.compression = cm - c.out.cw = s2.NewWriter(nil, s2WriterOptions(cm)...) - } + c.updateS2AutoCompressionLevel(&srv.getOpts().Cluster.Compression, &c.route.compression) + } else if c.kind == LEAF && needsCompression(c.leaf.compression) { + var co *CompressionOpts + if r := c.leaf.remote; r != nil { + co = &r.Compression + } else { + co = &srv.getOpts().LeafNode.Compression } + c.updateS2AutoCompressionLevel(co, &c.leaf.compression) } c.mu.Unlock() if reorderGWs { @@ -2406,6 +2418,20 @@ func (c *client) processPong() { } } +// Select the s2 compression level based on the client's current RTT and the configured +// RTT thresholds slice. If current level is different than selected one, save the +// new compression level string and create a new s2 writer. +// Lock held on entry. +func (c *client) updateS2AutoCompressionLevel(co *CompressionOpts, compression *string) { + if co.Mode != CompressionS2Auto { + return + } + if cm := selectS2AutoModeBasedOnRTT(c.rtt, co.RTTThresholds); cm != *compression { + *compression = cm + c.out.cw = s2.NewWriter(nil, s2WriterOptions(cm)...) + } +} + // Will return the parts from the raw wire msg. 
func (c *client) msgParts(data []byte) (hdr []byte, msg []byte) { if c != nil && c.pa.hdr > 0 { @@ -4711,8 +4737,8 @@ func (c *client) processPingTimer() { now := time.Now() needRTT := c.rtt == 0 || now.Sub(c.rttStart) > DEFAULT_RTT_MEASUREMENT_INTERVAL - // Do not delay PINGs for ROUTER, GATEWAY or spoke LEAF connections. - if c.kind == ROUTER || c.kind == GATEWAY || c.isSpokeLeafNode() { + // Do not delay PINGs for ROUTER, GATEWAY or LEAF connections. + if c.kind == ROUTER || c.kind == GATEWAY || c.kind == LEAF { sendPing = true } else { // If we received client data or a ping from the other side within the PingInterval, @@ -4746,9 +4772,9 @@ func (c *client) processPingTimer() { // based on the connection kind. func adjustPingInterval(kind int, d time.Duration) time.Duration { switch kind { - case ROUTER: - if d > routeMaxPingInterval { - return routeMaxPingInterval + case ROUTER, LEAF: + if d > connWithCompressionMaxPingInterval { + return connWithCompressionMaxPingInterval } case GATEWAY: if d > gatewayMaxPingInterval { @@ -5646,7 +5672,7 @@ func (c *client) setFirstPingTimer() { // We randomize the first one by an offset up to 20%, e.g. 2m ~= max 24s. addDelay := rand.Int63n(int64(d / 5)) d += time.Duration(addDelay) - // In the case of ROUTER and when compression is configured, it is possible + // In the case of ROUTER/LEAF and when compression is configured, it is possible // that this timer was already set, but just to detect a stale connection // since we have to delay the first PING after compression negotiation // occurred. 
diff --git a/server/config_check_test.go b/server/config_check_test.go index f891ee0d..f1718b35 100644 --- a/server/config_check_test.go +++ b/server/config_check_test.go @@ -1659,6 +1659,138 @@ func TestConfigCheck(t *testing.T) { errorLine: 6, errorPos: 7, }, + { + name: "wrong type for leafnodes compression", + config: ` + leafnodes { + port: -1 + compression: 123 + } + `, + err: fmt.Errorf("field %q should be a boolean or a structure, got int64", "compression"), + errorLine: 4, + errorPos: 6, + }, + { + name: "wrong type for leafnodes compression mode", + config: ` + leafnodes { + port: -1 + compression: { + mode: 123 + } + } + `, + err: fmt.Errorf("interface conversion: interface {} is int64, not string"), + errorLine: 5, + errorPos: 7, + }, + { + name: "wrong type for leafnodes compression rtt thresholds", + config: ` + leafnodes { + port: -1 + compression: { + mode: "s2_auto" + rtt_thresholds: 123 + } + } + `, + err: fmt.Errorf("interface conversion: interface {} is int64, not []interface {}"), + errorLine: 6, + errorPos: 7, + }, + { + name: "invalid durations for leafnodes compression rtt thresholds", + config: ` + leafnodes { + port: -1 + compression: { + mode: "s2_auto" + rtt_thresholds: [abc] + } + } + `, + err: fmt.Errorf("time: invalid duration %q", "abc"), + errorLine: 6, + errorPos: 7, + }, + { + name: "wrong type for remote leafnodes compression", + config: ` + leafnodes { + port: -1 + remotes [ + { + url: "nats://127.0.0.1:123" + compression: 123 + } + ] + } + `, + err: fmt.Errorf("field %q should be a boolean or a structure, got int64", "compression"), + errorLine: 7, + errorPos: 8, + }, + { + name: "wrong type for remote leafnodes compression mode", + config: ` + leafnodes { + port: -1 + remotes [ + { + url: "nats://127.0.0.1:123" + compression: { + mode: 123 + } + } + ] + } + `, + err: fmt.Errorf("interface conversion: interface {} is int64, not string"), + errorLine: 8, + errorPos: 9, + }, + { + name: "wrong type for remote leafnodes 
compression rtt thresholds", + config: ` + leafnodes { + port: -1 + remotes [ + { + url: "nats://127.0.0.1:123" + compression: { + mode: "s2_auto" + rtt_thresholds: 123 + } + } + ] + } + `, + err: fmt.Errorf("interface conversion: interface {} is int64, not []interface {}"), + errorLine: 9, + errorPos: 9, + }, + { + name: "invalid durations for remote leafnodes compression rtt thresholds", + config: ` + leafnodes { + port: -1 + remotes [ + { + url: "nats://127.0.0.1:123" + compression: { + mode: "s2_auto" + rtt_thresholds: [abc] + } + } + ] + } + `, + err: fmt.Errorf("time: invalid duration %q", "abc"), + errorLine: 9, + errorPos: 9, + }, } checkConfig := func(config string) error { diff --git a/server/leafnode.go b/server/leafnode.go index 6c4e0a0f..7f3abf2a 100644 --- a/server/leafnode.go +++ b/server/leafnode.go @@ -34,6 +34,7 @@ import ( "sync/atomic" "time" + "github.com/klauspost/compress/s2" "github.com/nats-io/jwt/v2" "github.com/nats-io/nkeys" "github.com/nats-io/nuid" @@ -90,6 +91,8 @@ type leaf struct { // we would add it a second time in the smap causing later unsub to suppress the LS-. tsub map[*subscription]struct{} tsubt *time.Timer + // Selected compression mode, which may be different from the server configured mode. + compression string } // Used for remote (solicited) leafnodes. @@ -241,6 +244,13 @@ func validateLeafNode(o *Options) error { } } + // Validate compression settings + if o.LeafNode.Compression.Mode != _EMPTY_ { + if err := validateAndNormalizeCompressionOption(&o.LeafNode.Compression, CompressionS2Auto); err != nil { + return err + } + } + // If a remote has a websocket scheme, all need to have it. 
for _, rcfg := range o.LeafNode.Remotes { if len(rcfg.URLs) >= 2 { @@ -256,6 +266,12 @@ func validateLeafNode(o *Options) error { return fmt.Errorf("remote leaf node configuration cannot have a mix of websocket and non-websocket urls: %q", redactURLList(rcfg.URLs)) } } + // Validate compression settings + if rcfg.Compression.Mode != _EMPTY_ { + if err := validateAndNormalizeCompressionOption(&rcfg.Compression, CompressionS2Auto); err != nil { + return err + } + } } if o.LeafNode.Port == 0 { @@ -689,6 +705,11 @@ func (s *Server) startLeafNodeAcceptLoop() { Proto: 1, // Fixed for now. InfoOnConnect: true, } + // For tests that want to simulate old servers, do not set the compression + // on the INFO protocol if configured with CompressionNotSupported. + if cm := opts.LeafNode.Compression.Mode; cm != CompressionNotSupported { + info.Compression = cm + } // If we have selected a random port... if port == 0 { // Write resolved port back to options. @@ -736,19 +757,19 @@ var credsRe = regexp.MustCompile(`\s*(?:(?:[-]{3,}[^\n]*[-]{3,}\n)(.+)(?:\n\s*[- // clusterName is provided as argument to avoid lock ordering issues with the locked client c // Lock should be held entering here. -func (c *client) sendLeafConnect(clusterName string, tlsRequired, headers bool) error { +func (c *client) sendLeafConnect(clusterName string, headers bool) error { // We support basic user/pass and operator based user JWT with signatures. 
cinfo := leafConnectInfo{ - Version: VERSION, - TLS: tlsRequired, - ID: c.srv.info.ID, - Domain: c.srv.info.Domain, - Name: c.srv.info.Name, - Hub: c.leaf.remote.Hub, - Cluster: clusterName, - Headers: headers, - JetStream: c.acc.jetStreamConfigured(), - DenyPub: c.leaf.remote.DenyImports, + Version: VERSION, + ID: c.srv.info.ID, + Domain: c.srv.info.Domain, + Name: c.srv.info.Name, + Hub: c.leaf.remote.Hub, + Cluster: clusterName, + Headers: headers, + JetStream: c.acc.jetStreamConfigured(), + DenyPub: c.leaf.remote.DenyImports, + Compression: c.leaf.compression, } // If a signature callback is specified, this takes precedence over anything else. @@ -967,6 +988,7 @@ func (s *Server) createLeafNode(conn net.Conn, rURL *url.URL, remote *leafNodeCf // Grab server variables s.mu.Lock() info = s.copyLeafNodeInfo() + info.Compression = opts.LeafNode.Compression.Mode s.generateNonce(nonce[:]) s.mu.Unlock() } @@ -995,26 +1017,9 @@ func (s *Server) createLeafNode(conn net.Conn, rURL *url.URL, remote *leafNodeCf } else { // If configured to do TLS handshake first if tlsFirst { - // Still check if there is really need for TLS in case user set - // this boolean but nothing else... - tlsRequired, tlsConfig, tlsName, tlsTimeout := c.leafNodeGetTLSConfigForSolicit(remote, true) - - // If TLS required, peform handshake. - if tlsRequired { - // Get the URL that was used to connect to the remote server. - rURL := remote.getCurrentURL() - - // Perform the client-side TLS handshake. - if resetTLSName, err := c.doTLSClientHandshake("leafnode", rURL, tlsConfig, tlsName, tlsTimeout, opts.LeafNode.TLSPinnedCerts); err != nil { - // Check if we need to reset the remote's TLS name. - if resetTLSName { - remote.Lock() - remote.tlsName = _EMPTY_ - remote.Unlock() - } - c.mu.Unlock() - return nil - } + if _, err := c.leafClientHandshakeIfNeeded(remote, opts); err != nil { + c.mu.Unlock() + return nil } } // We need to wait for the info, but not for too long. 
@@ -1068,7 +1073,20 @@ func (s *Server) createLeafNode(conn net.Conn, rURL *url.URL, remote *leafNodeCf // Leaf nodes will always require a CONNECT to let us know // when we are properly bound to an account. - c.setAuthTimer(secondsToDuration(opts.LeafNode.AuthTimeout)) + // + // If compression is configured, we can't set the authTimer here because + // it would cause the parser to fail any incoming protocol that is not a + // CONNECT (and we need to exchange INFO protocols for compression + // negotiation). So instead, use the ping timer until we are done with + // negotiation and can set the auth timer. + timeout := secondsToDuration(opts.LeafNode.AuthTimeout) + if needsCompression(opts.LeafNode.Compression.Mode) { + c.ping.tmr = time.AfterFunc(timeout, func() { + c.authTimeout() + }) + } else { + c.setAuthTimer(timeout) + } } // Keep track in case server is shutdown before we can successfully register. @@ -1093,22 +1111,112 @@ func (s *Server) createLeafNode(conn net.Conn, rURL *url.URL, remote *leafNodeCf return c } -func (c *client) processLeafnodeInfo(info *Info) { - s := c.srv +// Will perform the client-side TLS handshake if needed. Assumes that this +// is called by the solicit side (remote will be non nil). Returns `true` +// if TLS is required, `false` otherwise. +// Lock held on entry. +func (c *client) leafClientHandshakeIfNeeded(remote *leafNodeCfg, opts *Options) (bool, error) { + // Check if TLS is required and gather TLS config variables. + tlsRequired, tlsConfig, tlsName, tlsTimeout := c.leafNodeGetTLSConfigForSolicit(remote) + if !tlsRequired { + return false, nil + } + // TLS is required at this point, so perform the handshake. + // Get the URL that was used to connect to the remote server. + rURL := remote.getCurrentURL() + + // Perform the client-side TLS handshake. + if resetTLSName, err := c.doTLSClientHandshake("leafnode", rURL, tlsConfig, tlsName, tlsTimeout, opts.LeafNode.TLSPinnedCerts); err != nil { + // Check if we need to reset the remote's TLS name. 
+ if resetTLSName { + remote.Lock() + remote.tlsName = _EMPTY_ + remote.Unlock() + } + return false, err + } + return true, nil +} + +func (c *client) processLeafnodeInfo(info *Info) { c.mu.Lock() if c.leaf == nil || c.isClosed() { c.mu.Unlock() return } + s := c.srv + opts := s.getOpts() + remote := c.leaf.remote + didSolicit := remote != nil + firstINFO := !c.flags.isSet(infoReceived) - var firstINFO bool + // In case of websocket, the TLS handshake has been already done. + // So check only for non websocket connections and for configurations + // where the TLS Handshake was not done first. + if didSolicit && !c.flags.isSet(handshakeComplete) && !c.isWebsocket() && !remote.TLSHandshakeFirst { + if _, err := c.leafClientHandshakeIfNeeded(remote, opts); err != nil { + c.mu.Unlock() + return + } + } + + // Check for compression, unless already done. + if firstINFO && !c.flags.isSet(compressionNegotiated) { + // Prevent from getting back here. + c.flags.set(compressionNegotiated) + + var co *CompressionOpts + if !didSolicit { + co = &opts.LeafNode.Compression + } else { + co = &remote.Compression + } + if needsCompression(co.Mode) { + // Release client lock since following function will need server lock. + c.mu.Unlock() + compress, err := s.negotiateLeafCompression(c, didSolicit, info.Compression, co) + if err != nil { + c.sendErrAndErr(err.Error()) + c.closeConnection(ProtocolViolation) + return + } + if compress { + // Done for now, will get back another INFO protocol... + return + } + // No compression because one side does not want/can't, so proceed. + c.mu.Lock() + // Check that the connection did not close if the lock was released. + if c.isClosed() { + c.mu.Unlock() + return + } + } else { + // Coming from an old server, the Compression field would be the empty + // string. For servers that are configured with CompressionNotSupported, + // this makes them behave as old servers. 
+ if info.Compression == _EMPTY_ || co.Mode == CompressionNotSupported { + c.leaf.compression = CompressionNotSupported + } else { + c.leaf.compression = CompressionOff + } + } + // Fall through and process the INFO protocol as usual. + } else if firstINFO && !didSolicit && needsCompression(opts.LeafNode.Compression.Mode) { + // We used the ping timer instead of auth timer when accepting a remote + // connection so that we can exchange INFO protocols and not have the + // parser return a protocol violation. Now that the negotiation is over + // stop the ping timer and set the auth timer. + clearTimer(&c.ping.tmr) + c.setAuthTimer(secondsToDuration(opts.LeafNode.AuthTimeout)) + } - // Mark that the INFO protocol has been received. // Note: For now, only the initial INFO has a nonce. We // will probably do auto key rotation at some point. - if c.flags.setIfNotSet(infoReceived) { - firstINFO = true + if firstINFO { + // Mark that the INFO protocol has been received. + c.flags.set(infoReceived) // Prevent connecting to non leafnode port. Need to do this only for // the first INFO, not for async INFO updates... // @@ -1128,7 +1236,7 @@ func (c *client) processLeafnodeInfo(info *Info) { // As seen from above, a solicited LeafNode connection should receive // from the remote server an INFO with CID and LeafNodeURLs. Anything // else should be considered an attempt to connect to a wrong port. - if c.leaf.remote != nil && (info.CID == 0 || info.LeafNodeURLs == nil) { + if didSolicit && (info.CID == 0 || info.LeafNodeURLs == nil) { c.mu.Unlock() c.Errorf(ErrConnectedToWrongPort.Error()) c.closeConnection(WrongPort) @@ -1136,8 +1244,8 @@ func (c *client) processLeafnodeInfo(info *Info) { } // Capture a nonce here. 
c.nonce = []byte(info.Nonce) - if info.TLSRequired && c.leaf.remote != nil { - c.leaf.remote.TLS = true + if info.TLSRequired && didSolicit { + remote.TLS = true } supportsHeaders := c.srv.supportsHeaders() c.headers = supportsHeaders && info.Headers @@ -1157,7 +1265,7 @@ func (c *client) processLeafnodeInfo(info *Info) { // For both initial INFO and async INFO protocols, Possibly // update our list of remote leafnode URLs we can connect to. - if c.leaf.remote != nil && (len(info.LeafNodeURLs) > 0 || len(info.WSConnectURLs) > 0) { + if didSolicit && (len(info.LeafNodeURLs) > 0 || len(info.WSConnectURLs) > 0) { // Consider the incoming array as the most up-to-date // representation of the remote cluster's list of URLs. c.updateLeafNodeURLs(info) @@ -1191,7 +1299,7 @@ func (c *client) processLeafnodeInfo(info *Info) { // If this is a remote connection and this is the first INFO protocol, // then we need to finish the connect process by sending CONNECT, etc.. - if firstINFO && c.leaf.remote != nil { + if firstINFO && didSolicit { // Clear deadline that was set in createLeafNode while waiting for the INFO. c.nc.SetDeadline(time.Time{}) resumeConnect = true @@ -1215,6 +1323,67 @@ func (c *client) processLeafnodeInfo(info *Info) { } } +func (s *Server) negotiateLeafCompression(c *client, didSolicit bool, infoCompression string, co *CompressionOpts) (bool, error) { + // Negotiate the appropriate compression mode (or no compression) + cm, err := selectCompressionMode(co.Mode, infoCompression) + if err != nil { + return false, err + } + c.mu.Lock() + // For "auto" mode, set the initial compression mode based on RTT + if cm == CompressionS2Auto { + if c.rttStart.IsZero() { + c.rtt = computeRTT(c.start) + } + cm = selectS2AutoModeBasedOnRTT(c.rtt, co.RTTThresholds) + } + // Keep track of the negotiated compression mode. 
+ c.leaf.compression = cm + cid := c.cid + var nonce string + if !didSolicit { + nonce = string(c.nonce) + } + c.mu.Unlock() + + if !needsCompression(cm) { + return false, nil + } + + // If we end up doing compression... + + // Generate an INFO with the chosen compression mode. + s.mu.Lock() + info := s.copyLeafNodeInfo() + info.Compression, info.CID, info.Nonce = compressionModeForInfoProtocol(co, cm), cid, nonce + infoProto := generateInfoJSON(info) + s.mu.Unlock() + + // If we solicited, then send this INFO protocol BEFORE switching + // to compression writer. However, if we did not, we send it after. + c.mu.Lock() + if didSolicit { + c.enqueueProto(infoProto) + // Make sure it is completely flushed (the pending bytes go to + // 0) before proceeding. + for c.out.pb > 0 && !c.isClosed() { + c.flushOutbound() + } + } + // This is to notify the readLoop that it should switch to a + // (de)compression reader. + c.in.flags.set(switchToCompression) + // Create the compress writer before queueing the INFO protocol for + // a leafnode connection that did not solicit. It will make sure that the + // proto is sent with compression on. + c.out.cw = s2.NewWriter(nil, s2WriterOptions(cm)...) + if !didSolicit { + c.enqueueProto(infoProto) + } + c.mu.Unlock() + return true, nil +} + // When getting a leaf node INFO protocol, use the provided // array of urls to update the list of possible endpoints. 
func (c *client) updateLeafNodeURLs(info *Info) { @@ -1500,8 +1669,6 @@ type leafConnectInfo struct { Sig string `json:"sig,omitempty"` User string `json:"user,omitempty"` Pass string `json:"pass,omitempty"` - TLS bool `json:"tls_required"` - Comp bool `json:"compression,omitempty"` ID string `json:"server_id,omitempty"` Domain string `json:"domain,omitempty"` Name string `json:"name,omitempty"` @@ -1511,6 +1678,12 @@ type leafConnectInfo struct { JetStream bool `json:"jetstream,omitempty"` DenyPub []string `json:"deny_pub,omitempty"` + // There was an existing field called: + // >> Comp bool `json:"compression,omitempty"` + // that has never been used. With support for compression, we now need + // a field that is a string. So we use a different json tag: + Compression string `json:"compress_mode,omitempty"` + // Just used to detect wrong connection attempts. Gateway string `json:"gateway,omitempty"` } @@ -1581,6 +1754,16 @@ func (c *client) processLeafNodeConnect(s *Server, arg []byte, lang string) erro // support headers and the remote has sent in the CONNECT protocol that it does // support headers too. c.headers = supportHeaders && proto.Headers + // If the compression level is still not set, set it based on what has been + // given to us in the CONNECT protocol. + if c.leaf.compression == _EMPTY_ { + // But if proto.Compression is _EMPTY_, set it to CompressionNotSupported + if proto.Compression == _EMPTY_ { + c.leaf.compression = CompressionNotSupported + } else { + c.leaf.compression = proto.Compression + } + } // Remember the remote server. c.leaf.remoteServer = proto.Name @@ -2509,16 +2692,16 @@ func (c *client) setLeafConnectDelayIfSoliciting(delay time.Duration) (string, t // if TLS is required, and if so, will return a clone of the TLS Config // (since some fields will be changed during handshake), the TLS server // name that is remembered, and the TLS timeout. 
-func (c *client) leafNodeGetTLSConfigForSolicit(remote *leafNodeCfg, needsLock bool) (bool, *tls.Config, string, float64) { +func (c *client) leafNodeGetTLSConfigForSolicit(remote *leafNodeCfg) (bool, *tls.Config, string, float64) { var ( tlsConfig *tls.Config tlsName string tlsTimeout float64 ) - if needsLock { - remote.RLock() - } + remote.RLock() + defer remote.RUnlock() + tlsRequired := remote.TLS || remote.TLSConfig != nil if tlsRequired { if remote.TLSConfig != nil { @@ -2532,9 +2715,6 @@ func (c *client) leafNodeGetTLSConfigForSolicit(remote *leafNodeCfg, needsLock b tlsTimeout = float64(TLS_TIMEOUT / time.Second) } } - if needsLock { - remote.RUnlock() - } return tlsRequired, tlsConfig, tlsName, tlsTimeout } @@ -2555,21 +2735,12 @@ func (c *client) leafNodeSolicitWSConnection(opts *Options, rURL *url.URL, remot compress := remote.Websocket.Compression // By default the server will mask outbound frames, but it can be disabled with this option. noMasking := remote.Websocket.NoMasking - tlsRequired, tlsConfig, tlsName, tlsTimeout := c.leafNodeGetTLSConfigForSolicit(remote, false) remote.RUnlock() - // Do TLS here as needed. - if tlsRequired { - // Perform the client-side TLS handshake. - if resetTLSName, err := c.doTLSClientHandshake("leafnode", rURL, tlsConfig, tlsName, tlsTimeout, opts.LeafNode.TLSPinnedCerts); err != nil { - // Check if we need to reset the remote's TLS name. - if resetTLSName { - remote.Lock() - remote.tlsName = _EMPTY_ - remote.Unlock() - } - // 0 will indicate that the connection was already closed - return nil, 0, err - } + // Will do the client-side TLS handshake if needed. + tlsRequired, err := c.leafClientHandshakeIfNeeded(remote, opts) + if err != nil { + // 0 will indicate that the connection was already closed + return nil, 0, err } // For http request, we need the passed URL to contain either http or https scheme. 
@@ -2671,7 +2842,7 @@ func (c *client) leafNodeSolicitWSConnection(opts *Options, rURL *url.URL, remot const connectProcessTimeout = 2 * time.Second // This is invoked for remote LEAF remote connections after processing the INFO -// protocol. This will do the TLS handshake (if need be) +// protocol. func (s *Server) leafNodeResumeConnectProcess(c *client) { clusterName := s.ClusterName() @@ -2680,40 +2851,7 @@ func (s *Server) leafNodeResumeConnectProcess(c *client) { c.mu.Unlock() return } - remote := c.leaf.remote - - var tlsRequired bool - - // In case of websocket, the TLS handshake has been already done. - // So check only for non websocket connections and for configurations - // where the TLS Handshake was not done first. - if !c.isWebsocket() && !remote.TLSHandshakeFirst { - var tlsConfig *tls.Config - var tlsName string - var tlsTimeout float64 - - // Check if TLS is required and gather TLS config variables. - tlsRequired, tlsConfig, tlsName, tlsTimeout = c.leafNodeGetTLSConfigForSolicit(remote, true) - - // If TLS required, peform handshake. - if tlsRequired { - // Get the URL that was used to connect to the remote server. - rURL := remote.getCurrentURL() - - // Perform the client-side TLS handshake. - if resetTLSName, err := c.doTLSClientHandshake("leafnode", rURL, tlsConfig, tlsName, tlsTimeout, c.srv.getOpts().LeafNode.TLSPinnedCerts); err != nil { - // Check if we need to reset the remote's TLS name. - if resetTLSName { - remote.Lock() - remote.tlsName = _EMPTY_ - remote.Unlock() - } - c.mu.Unlock() - return - } - } - } - if err := c.sendLeafConnect(clusterName, tlsRequired, c.headers); err != nil { + if err := c.sendLeafConnect(clusterName, c.headers); err != nil { c.mu.Unlock() c.closeConnection(WriteError) return @@ -2722,6 +2860,9 @@ func (s *Server) leafNodeResumeConnectProcess(c *client) { // Spin up the write loop. 
s.startGoRoutine(func() { c.writeLoop() }) + // In case there was compression negotiation, the timer could have been + // already created. Destroy and recreate with different callback. + clearTimer(&c.ping.tmr) // timeout leafNodeFinishConnectProcess c.ping.tmr = time.AfterFunc(connectProcessTimeout, func() { c.mu.Lock() diff --git a/server/leafnode_test.go b/server/leafnode_test.go index ad3f52f5..437f3e34 100644 --- a/server/leafnode_test.go +++ b/server/leafnode_test.go @@ -22,6 +22,7 @@ import ( "math/rand" "net" "net/url" + "reflect" "strings" "sync" "sync/atomic" @@ -30,6 +31,7 @@ import ( "github.com/nats-io/nkeys" + "github.com/klauspost/compress/s2" jwt "github.com/nats-io/jwt/v2" "github.com/nats-io/nats.go" @@ -986,14 +988,10 @@ func TestLeafCloseTLSConnection(t *testing.T) { if err := tlsConn.Handshake(); err != nil { t.Fatalf("Unexpected error during handshake: %v", err) } - connectOp := []byte("CONNECT {\"name\":\"leaf\",\"verbose\":false,\"pedantic\":false,\"tls_required\":true}\r\n") + connectOp := []byte("CONNECT {\"name\":\"leaf\",\"verbose\":false,\"pedantic\":false}\r\n") if _, err := tlsConn.Write(connectOp); err != nil { t.Fatalf("Unexpected error writing CONNECT: %v", err) } - infoOp := []byte("INFO {\"server_id\":\"leaf\",\"tls_required\":true}\r\n") - if _, err := tlsConn.Write(infoOp); err != nil { - t.Fatalf("Unexpected error writing CONNECT: %v", err) - } if _, err := tlsConn.Write([]byte("PING\r\n")); err != nil { t.Fatalf("Unexpected error writing PING: %v", err) } @@ -3640,6 +3638,10 @@ func TestLeafNodeNoPingBeforeConnect(t *testing.T) { o := DefaultOptions() o.LeafNode.Port = -1 o.LeafNode.AuthTimeout = 0.5 + // For this test we need to disable compression, because we do use + // the ping timer instead of the auth timer before the negotiation + // is complete. 
+ o.LeafNode.Compression.Mode = CompressionOff s := RunServer(o) defer s.Shutdown() @@ -4980,3 +4982,940 @@ func TestLeafNodeTLSHandshakeFirst(t *testing.T) { reloadUpdateConfig(t, s1, confHub, fmt.Sprintf(tmpl1, "true")) checkLeafNodeConnected(t, s2) } + +func TestLeafNodeCompressionOptions(t *testing.T) { + org := testDefaultLeafNodeCompression + testDefaultLeafNodeCompression = _EMPTY_ + defer func() { testDefaultLeafNodeCompression = org }() + + tmpl := ` + port: -1 + leafnodes { + port: -1 + compression: %s + } + ` + for _, test := range []struct { + name string + mode string + rttVals []int + expected string + rtts []time.Duration + }{ + {"boolean enabled", "true", nil, CompressionS2Auto, defaultCompressionS2AutoRTTThresholds}, + {"string enabled", "enabled", nil, CompressionS2Auto, defaultCompressionS2AutoRTTThresholds}, + {"string EnaBled", "EnaBled", nil, CompressionS2Auto, defaultCompressionS2AutoRTTThresholds}, + {"string on", "on", nil, CompressionS2Auto, defaultCompressionS2AutoRTTThresholds}, + {"string ON", "ON", nil, CompressionS2Auto, defaultCompressionS2AutoRTTThresholds}, + {"string fast", "fast", nil, CompressionS2Fast, nil}, + {"string Fast", "Fast", nil, CompressionS2Fast, nil}, + {"string s2_fast", "s2_fast", nil, CompressionS2Fast, nil}, + {"string s2_Fast", "s2_Fast", nil, CompressionS2Fast, nil}, + {"boolean disabled", "false", nil, CompressionOff, nil}, + {"string disabled", "disabled", nil, CompressionOff, nil}, + {"string DisableD", "DisableD", nil, CompressionOff, nil}, + {"string off", "off", nil, CompressionOff, nil}, + {"string OFF", "OFF", nil, CompressionOff, nil}, + {"better", "better", nil, CompressionS2Better, nil}, + {"Better", "Better", nil, CompressionS2Better, nil}, + {"s2_better", "s2_better", nil, CompressionS2Better, nil}, + {"S2_BETTER", "S2_BETTER", nil, CompressionS2Better, nil}, + {"best", "best", nil, CompressionS2Best, nil}, + {"BEST", "BEST", nil, CompressionS2Best, nil}, + {"s2_best", "s2_best", nil, 
CompressionS2Best, nil}, + {"S2_BEST", "S2_BEST", nil, CompressionS2Best, nil}, + {"auto no rtts", "auto", nil, CompressionS2Auto, defaultCompressionS2AutoRTTThresholds}, + {"s2_auto no rtts", "s2_auto", nil, CompressionS2Auto, defaultCompressionS2AutoRTTThresholds}, + {"auto", "{mode: auto, rtt_thresholds: [%s]}", []int{1}, CompressionS2Auto, []time.Duration{time.Millisecond}}, + {"Auto", "{Mode: Auto, thresholds: [%s]}", []int{1, 2}, CompressionS2Auto, []time.Duration{time.Millisecond, 2 * time.Millisecond}}, + {"s2_auto", "{mode: s2_auto, thresholds: [%s]}", []int{1, 2, 3}, CompressionS2Auto, []time.Duration{time.Millisecond, 2 * time.Millisecond, 3 * time.Millisecond}}, + {"s2_AUTO", "{mode: s2_AUTO, thresholds: [%s]}", []int{1, 2, 3, 4}, CompressionS2Auto, []time.Duration{time.Millisecond, 2 * time.Millisecond, 3 * time.Millisecond, 4 * time.Millisecond}}, + {"s2_auto:-10,5,10", "{mode: s2_auto, thresholds: [%s]}", []int{-10, 5, 10}, CompressionS2Auto, []time.Duration{0, 5 * time.Millisecond, 10 * time.Millisecond}}, + {"s2_auto:5,10,15", "{mode: s2_auto, thresholds: [%s]}", []int{5, 10, 15}, CompressionS2Auto, []time.Duration{5 * time.Millisecond, 10 * time.Millisecond, 15 * time.Millisecond}}, + {"s2_auto:0,5,10", "{mode: s2_auto, thresholds: [%s]}", []int{0, 5, 10}, CompressionS2Auto, []time.Duration{0, 5 * time.Millisecond, 10 * time.Millisecond}}, + {"s2_auto:5,10,0,20", "{mode: s2_auto, thresholds: [%s]}", []int{5, 10, 0, 20}, CompressionS2Auto, []time.Duration{5 * time.Millisecond, 10 * time.Millisecond, 0, 20 * time.Millisecond}}, + {"s2_auto:0,10,0,20", "{mode: s2_auto, thresholds: [%s]}", []int{0, 10, 0, 20}, CompressionS2Auto, []time.Duration{0, 10 * time.Millisecond, 0, 20 * time.Millisecond}}, + {"s2_auto:0,0,0,20", "{mode: s2_auto, thresholds: [%s]}", []int{0, 0, 0, 20}, CompressionS2Auto, []time.Duration{0, 0, 0, 20 * time.Millisecond}}, + {"s2_auto:0,10,0,0", "{mode: s2_auto, rtt_thresholds: [%s]}", []int{0, 10, 0, 0}, CompressionS2Auto, 
[]time.Duration{0, 10 * time.Millisecond}}, + } { + t.Run(test.name, func(t *testing.T) { + var val string + if len(test.rttVals) > 0 { + var rtts string + for i, v := range test.rttVals { + if i > 0 { + rtts += ", " + } + rtts += fmt.Sprintf("%dms", v) + } + val = fmt.Sprintf(test.mode, rtts) + } else { + val = test.mode + } + conf := createConfFile(t, []byte(fmt.Sprintf(tmpl, val))) + s, o := RunServerWithConfig(conf) + defer s.Shutdown() + + if cm := o.LeafNode.Compression.Mode; cm != test.expected { + t.Fatalf("Expected compression value to be %q, got %q", test.expected, cm) + } + if !reflect.DeepEqual(test.rtts, o.LeafNode.Compression.RTTThresholds) { + t.Fatalf("Expected RTT tresholds to be %+v, got %+v", test.rtts, o.LeafNode.Compression.RTTThresholds) + } + s.Shutdown() + + o.LeafNode.Port = -1 + o.LeafNode.Compression.Mode = test.mode + if len(test.rttVals) > 0 { + o.LeafNode.Compression.Mode = CompressionS2Auto + o.LeafNode.Compression.RTTThresholds = o.LeafNode.Compression.RTTThresholds[:0] + for _, v := range test.rttVals { + o.LeafNode.Compression.RTTThresholds = append(o.LeafNode.Compression.RTTThresholds, time.Duration(v)*time.Millisecond) + } + } + s = RunServer(o) + defer s.Shutdown() + if cm := o.LeafNode.Compression.Mode; cm != test.expected { + t.Fatalf("Expected compression value to be %q, got %q", test.expected, cm) + } + if !reflect.DeepEqual(test.rtts, o.LeafNode.Compression.RTTThresholds) { + t.Fatalf("Expected RTT tresholds to be %+v, got %+v", test.rtts, o.LeafNode.Compression.RTTThresholds) + } + }) + } + + // Same, but with remotes + tmpl = ` + port: -1 + leafnodes { + port: -1 + remotes [ + { + url: "nats://127.0.0.1:1234" + compression: %s + } + ] + } + ` + for _, test := range []struct { + name string + mode string + rttVals []int + expected string + rtts []time.Duration + }{ + {"boolean enabled", "true", nil, CompressionS2Auto, defaultCompressionS2AutoRTTThresholds}, + {"string enabled", "enabled", nil, CompressionS2Auto, 
defaultCompressionS2AutoRTTThresholds}, + {"string EnaBled", "EnaBled", nil, CompressionS2Auto, defaultCompressionS2AutoRTTThresholds}, + {"string on", "on", nil, CompressionS2Auto, defaultCompressionS2AutoRTTThresholds}, + {"string ON", "ON", nil, CompressionS2Auto, defaultCompressionS2AutoRTTThresholds}, + {"string fast", "fast", nil, CompressionS2Fast, nil}, + {"string Fast", "Fast", nil, CompressionS2Fast, nil}, + {"string s2_fast", "s2_fast", nil, CompressionS2Fast, nil}, + {"string s2_Fast", "s2_Fast", nil, CompressionS2Fast, nil}, + {"boolean disabled", "false", nil, CompressionOff, nil}, + {"string disabled", "disabled", nil, CompressionOff, nil}, + {"string DisableD", "DisableD", nil, CompressionOff, nil}, + {"string off", "off", nil, CompressionOff, nil}, + {"string OFF", "OFF", nil, CompressionOff, nil}, + {"better", "better", nil, CompressionS2Better, nil}, + {"Better", "Better", nil, CompressionS2Better, nil}, + {"s2_better", "s2_better", nil, CompressionS2Better, nil}, + {"S2_BETTER", "S2_BETTER", nil, CompressionS2Better, nil}, + {"best", "best", nil, CompressionS2Best, nil}, + {"BEST", "BEST", nil, CompressionS2Best, nil}, + {"s2_best", "s2_best", nil, CompressionS2Best, nil}, + {"S2_BEST", "S2_BEST", nil, CompressionS2Best, nil}, + {"auto no rtts", "auto", nil, CompressionS2Auto, defaultCompressionS2AutoRTTThresholds}, + {"s2_auto no rtts", "s2_auto", nil, CompressionS2Auto, defaultCompressionS2AutoRTTThresholds}, + {"auto", "{mode: auto, rtt_thresholds: [%s]}", []int{1}, CompressionS2Auto, []time.Duration{time.Millisecond}}, + {"Auto", "{Mode: Auto, thresholds: [%s]}", []int{1, 2}, CompressionS2Auto, []time.Duration{time.Millisecond, 2 * time.Millisecond}}, + {"s2_auto", "{mode: s2_auto, thresholds: [%s]}", []int{1, 2, 3}, CompressionS2Auto, []time.Duration{time.Millisecond, 2 * time.Millisecond, 3 * time.Millisecond}}, + {"s2_AUTO", "{mode: s2_AUTO, thresholds: [%s]}", []int{1, 2, 3, 4}, CompressionS2Auto, []time.Duration{time.Millisecond, 2 * 
time.Millisecond, 3 * time.Millisecond, 4 * time.Millisecond}}, + {"s2_auto:-10,5,10", "{mode: s2_auto, thresholds: [%s]}", []int{-10, 5, 10}, CompressionS2Auto, []time.Duration{0, 5 * time.Millisecond, 10 * time.Millisecond}}, + {"s2_auto:5,10,15", "{mode: s2_auto, thresholds: [%s]}", []int{5, 10, 15}, CompressionS2Auto, []time.Duration{5 * time.Millisecond, 10 * time.Millisecond, 15 * time.Millisecond}}, + {"s2_auto:0,5,10", "{mode: s2_auto, thresholds: [%s]}", []int{0, 5, 10}, CompressionS2Auto, []time.Duration{0, 5 * time.Millisecond, 10 * time.Millisecond}}, + {"s2_auto:5,10,0,20", "{mode: s2_auto, thresholds: [%s]}", []int{5, 10, 0, 20}, CompressionS2Auto, []time.Duration{5 * time.Millisecond, 10 * time.Millisecond, 0, 20 * time.Millisecond}}, + {"s2_auto:0,10,0,20", "{mode: s2_auto, thresholds: [%s]}", []int{0, 10, 0, 20}, CompressionS2Auto, []time.Duration{0, 10 * time.Millisecond, 0, 20 * time.Millisecond}}, + {"s2_auto:0,0,0,20", "{mode: s2_auto, thresholds: [%s]}", []int{0, 0, 0, 20}, CompressionS2Auto, []time.Duration{0, 0, 0, 20 * time.Millisecond}}, + {"s2_auto:0,10,0,0", "{mode: s2_auto, rtt_thresholds: [%s]}", []int{0, 10, 0, 0}, CompressionS2Auto, []time.Duration{0, 10 * time.Millisecond}}, + } { + t.Run("remote leaf "+test.name, func(t *testing.T) { + var val string + if len(test.rttVals) > 0 { + var rtts string + for i, v := range test.rttVals { + if i > 0 { + rtts += ", " + } + rtts += fmt.Sprintf("%dms", v) + } + val = fmt.Sprintf(test.mode, rtts) + } else { + val = test.mode + } + conf := createConfFile(t, []byte(fmt.Sprintf(tmpl, val))) + s, o := RunServerWithConfig(conf) + defer s.Shutdown() + + r := o.LeafNode.Remotes[0] + + if cm := r.Compression.Mode; cm != test.expected { + t.Fatalf("Expected compression value to be %q, got %q", test.expected, cm) + } + if !reflect.DeepEqual(test.rtts, r.Compression.RTTThresholds) { + t.Fatalf("Expected RTT tresholds to be %+v, got %+v", test.rtts, r.Compression.RTTThresholds) + } + s.Shutdown() + + 
o.LeafNode.Port = -1 + o.LeafNode.Remotes[0].Compression.Mode = test.mode + if len(test.rttVals) > 0 { + o.LeafNode.Remotes[0].Compression.Mode = CompressionS2Auto + o.LeafNode.Remotes[0].Compression.RTTThresholds = o.LeafNode.Remotes[0].Compression.RTTThresholds[:0] + for _, v := range test.rttVals { + o.LeafNode.Remotes[0].Compression.RTTThresholds = append(o.LeafNode.Remotes[0].Compression.RTTThresholds, time.Duration(v)*time.Millisecond) + } + } + s = RunServer(o) + defer s.Shutdown() + if cm := o.LeafNode.Remotes[0].Compression.Mode; cm != test.expected { + t.Fatalf("Expected compression value to be %q, got %q", test.expected, cm) + } + if !reflect.DeepEqual(test.rtts, o.LeafNode.Remotes[0].Compression.RTTThresholds) { + t.Fatalf("Expected RTT tresholds to be %+v, got %+v", test.rtts, o.LeafNode.Remotes[0].Compression.RTTThresholds) + } + }) + } + + // Test that with no compression specified, we default to "s2_auto" + conf := createConfFile(t, []byte(` + port: -1 + leafnodes { + port: -1 + } + `)) + s, o := RunServerWithConfig(conf) + defer s.Shutdown() + if o.LeafNode.Compression.Mode != CompressionS2Auto { + t.Fatalf("Expected compression value to be %q, got %q", CompressionS2Auto, o.LeafNode.Compression.Mode) + } + if !reflect.DeepEqual(defaultCompressionS2AutoRTTThresholds, o.LeafNode.Compression.RTTThresholds) { + t.Fatalf("Expected RTT tresholds to be %+v, got %+v", defaultCompressionS2AutoRTTThresholds, o.LeafNode.Compression.RTTThresholds) + } + // Same for remotes + conf = createConfFile(t, []byte(` + port: -1 + leafnodes { + port: -1 + remotes [ { url: "nats://127.0.0.1:1234" } ] + } + `)) + s, o = RunServerWithConfig(conf) + defer s.Shutdown() + if cm := o.LeafNode.Remotes[0].Compression.Mode; cm != CompressionS2Auto { + t.Fatalf("Expected compression value to be %q, got %q", CompressionS2Auto, cm) + } + if !reflect.DeepEqual(defaultCompressionS2AutoRTTThresholds, o.LeafNode.Remotes[0].Compression.RTTThresholds) { + t.Fatalf("Expected RTT tresholds 
to be %+v, got %+v", defaultCompressionS2AutoRTTThresholds, o.LeafNode.Remotes[0].Compression.RTTThresholds) + } + for _, test := range []struct { + name string + mode string + rtts []time.Duration + err string + }{ + {"unsupported mode", "gzip", nil, "Unsupported"}, + {"not ascending order", "s2_auto", []time.Duration{ + 5 * time.Millisecond, + 10 * time.Millisecond, + 2 * time.Millisecond, + }, "ascending"}, + {"too many thresholds", "s2_auto", []time.Duration{ + 5 * time.Millisecond, + 10 * time.Millisecond, + 20 * time.Millisecond, + 40 * time.Millisecond, + 60 * time.Millisecond, + }, "more than 4"}, + {"all 0", "s2_auto", []time.Duration{0, 0, 0, 0}, "at least one"}, + {"single 0", "s2_auto", []time.Duration{0}, "at least one"}, + } { + t.Run(test.name, func(t *testing.T) { + o := DefaultOptions() + o.LeafNode.Port = -1 + o.LeafNode.Compression = CompressionOpts{test.mode, test.rtts} + if _, err := NewServer(o); err == nil || !strings.Contains(err.Error(), test.err) { + t.Fatalf("Unexpected error: %v", err) + } + // Same with remotes + o.LeafNode.Compression = CompressionOpts{} + o.LeafNode.Remotes = []*RemoteLeafOpts{{Compression: CompressionOpts{test.mode, test.rtts}}} + if _, err := NewServer(o); err == nil || !strings.Contains(err.Error(), test.err) { + t.Fatalf("Unexpected error: %v", err) + } + }) + } +} + +func TestLeafNodeCompression(t *testing.T) { + conf1 := createConfFile(t, []byte(` + port: -1 + server_name: "Hub" + accounts { + A { users: [{user: a, password: pwd}] } + B { users: [{user: b, password: pwd}] } + C { users: [{user: c, password: pwd}] } + } + leafnodes { + port: -1 + compression: s2_fast + } + `)) + s1, o1 := RunServerWithConfig(conf1) + defer s1.Shutdown() + + port := o1.LeafNode.Port + conf2 := createConfFile(t, []byte(fmt.Sprintf(` + port: -1 + server_name: "Spoke" + accounts { + A { users: [{user: a, password: pwd}] } + B { users: [{user: b, password: pwd}] } + C { users: [{user: c, password: pwd}] } + } + leafnodes { + remotes [ 
+ { url: "nats://a:pwd@127.0.0.1:%d", account: "A", compression: s2_better } + { url: "nats://b:pwd@127.0.0.1:%d", account: "B", compression: s2_best } + { url: "nats://c:pwd@127.0.0.1:%d", account: "C", compression: off } + ] + } + `, port, port, port))) + s2, _ := RunServerWithConfig(conf2) + defer s2.Shutdown() + + checkLeafNodeConnectedCount(t, s1, 3) + checkLeafNodeConnectedCount(t, s2, 3) + + s1.mu.RLock() + for _, l := range s1.leafs { + l.mu.Lock() + l.nc = &testConnSentBytes{Conn: l.nc} + l.mu.Unlock() + } + s1.mu.RUnlock() + + var payloads [][]byte + totalPayloadSize := 0 + count := 26 + for i := 0; i < count; i++ { + n := rand.Intn(2048) + 1 + p := make([]byte, n) + for j := 0; j < n; j++ { + p[j] = byte(i) + 'A' + } + totalPayloadSize += len(p) + payloads = append(payloads, p) + } + + check := func(acc, user, subj string) { + t.Helper() + nc2 := natsConnect(t, s2.ClientURL(), nats.UserInfo(user, "pwd")) + defer nc2.Close() + sub := natsSubSync(t, nc2, subj) + natsFlush(t, nc2) + checkSubInterest(t, s1, acc, subj, time.Second) + + nc1 := natsConnect(t, s1.ClientURL(), nats.UserInfo(user, "pwd")) + defer nc1.Close() + + for i := 0; i < count; i++ { + natsPub(t, nc1, subj, payloads[i]) + } + for i := 0; i < count; i++ { + m := natsNexMsg(t, sub, time.Second) + if !bytes.Equal(m.Data, payloads[i]) { + t.Fatalf("Expected payload %q - got %q", payloads[i], m.Data) + } + } + + // Also check that the leafnode stats shows that compression likely occurred + var out int + s1.mu.RLock() + for _, l := range s1.leafs { + l.mu.Lock() + if l.acc.Name == acc && l.nc != nil { + nc := l.nc.(*testConnSentBytes) + nc.Lock() + out = nc.sent + nc.sent = 0 + nc.Unlock() + } + l.mu.Unlock() + } + s1.mu.RUnlock() + // Except for account "C", where compression should be off, + // "out" should at least be smaller than totalPayloadSize, use 20%. 
+ if acc == "C" { + if int(out) < totalPayloadSize { + t.Fatalf("Expected s1's sent bytes to be at least payload size (%v), got %v", totalPayloadSize, out) + } + } else { + limit := totalPayloadSize * 80 / 100 + if int(out) > limit { + t.Fatalf("Expected s1's sent bytes to be less than %v, got %v (total payload was %v)", limit, out, totalPayloadSize) + } + } + } + check("A", "a", "foo") + check("B", "b", "bar") + check("C", "c", "baz") + + // Check compression settings. S1 should always be s2_fast, except for account "C" + // since "C" wanted compression "off" + l, err := s1.Leafz(nil) + require_NoError(t, err) + for _, r := range l.Leafs { + switch r.Account { + case "C": + if r.Compression != CompressionOff { + t.Fatalf("Expected compression of remote for C account to be %q, got %q", CompressionOff, r.Compression) + } + default: + if r.Compression != CompressionS2Fast { + t.Fatalf("Expected compression of remote for %s account to be %q, got %q", r.Account, CompressionS2Fast, r.Compression) + } + } + } + + l, err = s2.Leafz(nil) + require_NoError(t, err) + for _, r := range l.Leafs { + switch r.Account { + case "A": + if r.Compression != CompressionS2Better { + t.Fatalf("Expected compression for A account to be %q, got %q", CompressionS2Better, r.Compression) + } + case "B": + if r.Compression != CompressionS2Best { + t.Fatalf("Expected compression for B account to be %q, got %q", CompressionS2Best, r.Compression) + } + case "C": + if r.Compression != CompressionOff { + t.Fatalf("Expected compression for C account to be %q, got %q", CompressionOff, r.Compression) + } + } + } +} + +func TestLeafNodeCompressionMatrixModes(t *testing.T) { + for _, test := range []struct { + name string + s1 string + s2 string + s1Expected string + s2Expected string + }{ + {"off off", "off", "off", CompressionOff, CompressionOff}, + {"off accept", "off", "accept", CompressionOff, CompressionOff}, + {"off on", "off", "on", CompressionOff, CompressionOff}, + {"off better", "off", 
"better", CompressionOff, CompressionOff}, + {"off best", "off", "best", CompressionOff, CompressionOff}, + + {"accept off", "accept", "off", CompressionOff, CompressionOff}, + {"accept accept", "accept", "accept", CompressionOff, CompressionOff}, + // Note: "on", means s2_auto, which will mean uncompressed since RTT is low. + {"accept on", "accept", "on", CompressionS2Fast, CompressionS2Uncompressed}, + {"accept better", "accept", "better", CompressionS2Better, CompressionS2Better}, + {"accept best", "accept", "best", CompressionS2Best, CompressionS2Best}, + + {"on off", "on", "off", CompressionOff, CompressionOff}, + {"on accept", "on", "accept", CompressionS2Uncompressed, CompressionS2Fast}, + {"on on", "on", "on", CompressionS2Uncompressed, CompressionS2Uncompressed}, + {"on better", "on", "better", CompressionS2Uncompressed, CompressionS2Better}, + {"on best", "on", "best", CompressionS2Uncompressed, CompressionS2Best}, + + {"better off", "better", "off", CompressionOff, CompressionOff}, + {"better accept", "better", "accept", CompressionS2Better, CompressionS2Better}, + {"better on", "better", "on", CompressionS2Better, CompressionS2Uncompressed}, + {"better better", "better", "better", CompressionS2Better, CompressionS2Better}, + {"better best", "better", "best", CompressionS2Better, CompressionS2Best}, + + {"best off", "best", "off", CompressionOff, CompressionOff}, + {"best accept", "best", "accept", CompressionS2Best, CompressionS2Best}, + {"best on", "best", "on", CompressionS2Best, CompressionS2Uncompressed}, + {"best better", "best", "better", CompressionS2Best, CompressionS2Better}, + {"best best", "best", "best", CompressionS2Best, CompressionS2Best}, + } { + t.Run(test.name, func(t *testing.T) { + conf1 := createConfFile(t, []byte(fmt.Sprintf(` + port: -1 + server_name: "A" + leafnodes { + port: -1 + compression: %s + } + `, test.s1))) + s1, o1 := RunServerWithConfig(conf1) + defer s1.Shutdown() + + conf2 := createConfFile(t, []byte(fmt.Sprintf(` + 
port: -1 + server_name: "B" + leafnodes { + remotes: [ + {url: "nats://127.0.0.1:%d", compression: %s} + ] + } + `, o1.LeafNode.Port, test.s2))) + s2, _ := RunServerWithConfig(conf2) + defer s2.Shutdown() + + checkLeafNodeConnected(t, s2) + + nc1 := natsConnect(t, s1.ClientURL()) + defer nc1.Close() + + nc2 := natsConnect(t, s2.ClientURL()) + defer nc2.Close() + + payload := make([]byte, 128) + check := func(ncp, ncs *nats.Conn, subj string, s *Server) { + t.Helper() + sub := natsSubSync(t, ncs, subj) + checkSubInterest(t, s, globalAccountName, subj, time.Second) + natsPub(t, ncp, subj, payload) + natsNexMsg(t, sub, time.Second) + + for _, srv := range []*Server{s1, s2} { + lz, err := srv.Leafz(nil) + require_NoError(t, err) + var expected string + if srv == s1 { + expected = test.s1Expected + } else { + expected = test.s2Expected + } + if cm := lz.Leafs[0].Compression; cm != expected { + t.Fatalf("Server %s - expected compression %q, got %q", srv, expected, cm) + } + } + } + check(nc1, nc2, "foo", s1) + check(nc2, nc1, "bar", s2) + }) + } +} + +func TestLeafNodeCompressionWithOlderServer(t *testing.T) { + tmpl1 := ` + port: -1 + server_name: "A" + leafnodes { + port: -1 + compression: "%s" + } + ` + conf1 := createConfFile(t, []byte(fmt.Sprintf(tmpl1, CompressionS2Fast))) + s1, o1 := RunServerWithConfig(conf1) + defer s1.Shutdown() + + tmpl2 := ` + port: -1 + server_name: "B" + leafnodes { + remotes [ + {url: "nats://127.0.0.1:%d", compression: "%s"} + ] + } + ` + conf2 := createConfFile(t, []byte(fmt.Sprintf(tmpl2, o1.LeafNode.Port, CompressionNotSupported))) + s2, _ := RunServerWithConfig(conf2) + defer s2.Shutdown() + + checkLeafNodeConnected(t, s2) + + getLeafCompMode := func(s *Server) string { + var cm string + s.mu.RLock() + defer s.mu.RUnlock() + for _, l := range s.leafs { + l.mu.Lock() + cm = l.leaf.compression + l.mu.Unlock() + return cm + } + return _EMPTY_ + } + for _, s := range []*Server{s1, s2} { + if cm := getLeafCompMode(s); cm != 
CompressionNotSupported { + t.Fatalf("Expected compression not supported, got %q", cm) + } + } + + s2.Shutdown() + s1.Shutdown() + + conf1 = createConfFile(t, []byte(fmt.Sprintf(tmpl1, CompressionNotSupported))) + s1, o1 = RunServerWithConfig(conf1) + defer s1.Shutdown() + + conf2 = createConfFile(t, []byte(fmt.Sprintf(tmpl2, o1.LeafNode.Port, CompressionS2Fast))) + s2, _ = RunServerWithConfig(conf2) + defer s2.Shutdown() + + checkLeafNodeConnected(t, s2) + for _, s := range []*Server{s1, s2} { + if cm := getLeafCompMode(s); cm != CompressionNotSupported { + t.Fatalf("Expected compression not supported, got %q", cm) + } + } +} + +func TestLeafNodeCompressionAuto(t *testing.T) { + for _, test := range []struct { + name string + s1Ping string + s1Compression string + s2Ping string + s2Compression string + checkS1 bool + }{ + {"remote side", "10s", CompressionS2Fast, "100ms", "{mode: s2_auto, rtt_thresholds: [10ms, 20ms, 30ms]}", false}, + {"accept side", "100ms", "{mode: s2_auto, rtt_thresholds: [10ms, 20ms, 30ms]}", "10s", CompressionS2Fast, true}, + } { + t.Run(test.name, func(t *testing.T) { + conf1 := createConfFile(t, []byte(fmt.Sprintf(` + port: -1 + server_name: "A" + ping_interval: "%s" + leafnodes { + port: -1 + compression: %s + } + `, test.s1Ping, test.s1Compression))) + s1, o1 := RunServerWithConfig(conf1) + defer s1.Shutdown() + + // Start with 0ms RTT + np := createNetProxy(0, 1024*1024*1024, 1024*1024*1024, fmt.Sprintf("nats://127.0.0.1:%d", o1.LeafNode.Port), true) + + conf2 := createConfFile(t, []byte(fmt.Sprintf(` + port: -1 + server_name: "B" + ping_interval: "%s" + leafnodes { + remotes [ + {url: %s, compression %s} + ] + } + `, test.s2Ping, np.routeURL(), test.s2Compression))) + s2, _ := RunServerWithConfig(conf2) + defer s2.Shutdown() + defer np.stop() + + checkLeafNodeConnected(t, s2) + + checkComp := func(expected string) { + t.Helper() + var s *Server + if test.checkS1 { + s = s1 + } else { + s = s2 + } + checkFor(t, 2*time.Second, 
15*time.Millisecond, func() error { + s.mu.RLock() + defer s.mu.RUnlock() + for _, l := range s.leafs { + l.mu.Lock() + cm := l.leaf.compression + l.mu.Unlock() + if cm != expected { + return fmt.Errorf("Leaf %v compression mode expected to be %q, got %q", l, expected, cm) + } + } + return nil + }) + } + checkComp(CompressionS2Uncompressed) + + // Change the proxy RTT and we should get compression "fast" + np.updateRTT(15 * time.Millisecond) + checkComp(CompressionS2Fast) + + // Now 25ms, and get "better" + np.updateRTT(25 * time.Millisecond) + checkComp(CompressionS2Better) + + // Above 35 and we should get "best" + np.updateRTT(35 * time.Millisecond) + checkComp(CompressionS2Best) + + // Down to 1ms and again should get "uncompressed" + np.updateRTT(1 * time.Millisecond) + checkComp(CompressionS2Uncompressed) + }) + } + + // Make sure that if compression is off on one side, the update of RTT does + // not trigger a compression change. + conf1 := createConfFile(t, []byte(` + port: -1 + server_name: "A" + leafnodes { + port: -1 + compression: off + } + `)) + s1, o1 := RunServerWithConfig(conf1) + defer s1.Shutdown() + + // Start with 0ms RTT + np := createNetProxy(0, 1024*1024*1024, 1024*1024*1024, fmt.Sprintf("nats://127.0.0.1:%d", o1.LeafNode.Port), true) + + conf2 := createConfFile(t, []byte(fmt.Sprintf(` + port: -1 + server_name: "B" + ping_interval: "50ms" + leafnodes { + remotes [ + {url: %s, compression s2_auto} + ] + } + `, np.routeURL()))) + s2, _ := RunServerWithConfig(conf2) + defer s2.Shutdown() + defer np.stop() + + checkLeafNodeConnected(t, s2) + + // Even with a bug of updating compression level while it should have been + // off, the check done below would almost always pass because after + // reconnecting, there could be a chance to get at first compression set + // to "off". So we will double check that the leaf node CID did not change + // at the end of the test. 
+ getCID := func() uint64 { + s2.mu.RLock() + defer s2.mu.RUnlock() + for _, l := range s2.leafs { + l.mu.Lock() + cid := l.cid + l.mu.Unlock() + return cid + } + return 0 + } + oldCID := getCID() + + checkCompOff := func() { + t.Helper() + checkFor(t, 2*time.Second, 15*time.Millisecond, func() error { + s2.mu.RLock() + defer s2.mu.RUnlock() + if len(s2.leafs) != 1 { + return fmt.Errorf("Leaf not currently connected") + } + for _, l := range s2.leafs { + l.mu.Lock() + cm := l.leaf.compression + l.mu.Unlock() + if cm != CompressionOff { + return fmt.Errorf("Leaf %v compression mode expected to be %q, got %q", l, CompressionOff, cm) + } + } + return nil + }) + } + checkCompOff() + + // Now change RTT and again, make sure that it is still off + np.updateRTT(20 * time.Millisecond) + time.Sleep(100 * time.Millisecond) + checkCompOff() + if cid := getCID(); cid != oldCID { + t.Fatalf("Leafnode has reconnected, cid was %v, now %v", oldCID, cid) + } +} + +func TestLeafNodeCompressionWithWSCompression(t *testing.T) { + conf1 := createConfFile(t, []byte(` + port: -1 + server_name: "A" + websocket { + port: -1 + no_tls: true + compression: true + } + leafnodes { + port: -1 + compression: s2_fast + } + `)) + s1, o1 := RunServerWithConfig(conf1) + defer s1.Shutdown() + + conf2 := createConfFile(t, []byte(fmt.Sprintf(` + port: -1 + server_name: "B" + leafnodes { + remotes [ + { + url: "ws://127.0.0.1:%d" + ws_compression: true + compression: s2_fast + } + ] + } + `, o1.Websocket.Port))) + s2, _ := RunServerWithConfig(conf2) + defer s2.Shutdown() + + checkLeafNodeConnected(t, s2) + + nc1 := natsConnect(t, s1.ClientURL()) + defer nc1.Close() + + sub := natsSubSync(t, nc1, "foo") + checkSubInterest(t, s2, globalAccountName, "foo", time.Second) + + nc2 := natsConnect(t, s2.ClientURL()) + defer nc2.Close() + + payload := make([]byte, 1024) + for i := 0; i < len(payload); i++ { + payload[i] = 'A' + } + natsPub(t, nc2, "foo", payload) + msg := natsNexMsg(t, sub, time.Second) + 
require_True(t, len(msg.Data) == 1024) + for i := 0; i < len(msg.Data); i++ { + if msg.Data[i] != 'A' { + t.Fatalf("Invalid msg: %s", msg.Data) + } + } +} + +func TestLeafNodeCompressionWithWSGetNeedsData(t *testing.T) { + conf1 := createConfFile(t, []byte(` + port: -1 + server_name: "A" + websocket { + port: -1 + no_tls: true + } + leafnodes { + port: -1 + compression: s2_fast + } + `)) + srv1, o1 := RunServerWithConfig(conf1) + defer srv1.Shutdown() + + conf2 := createConfFile(t, []byte(fmt.Sprintf(` + port: -1 + server_name: "B" + leafnodes { + remotes [ + { + url: "ws://127.0.0.1:%d" + ws_no_masking: true + compression: s2_fast + } + ] + } + `, o1.Websocket.Port))) + srv2, _ := RunServerWithConfig(conf2) + defer srv2.Shutdown() + + checkLeafNodeConnected(t, srv2) + + nc1 := natsConnect(t, srv1.ClientURL()) + defer nc1.Close() + + sub := natsSubSync(t, nc1, "foo") + checkSubInterest(t, srv2, globalAccountName, "foo", time.Second) + + // We want to have the payload more than 126 bytes so that the websocket + // code need to read 2 bytes for the length. See below. + payload := "ABCDEFGHIJKLMNOPQRSTUVWXYZABCDEFGHIJKLMNOPQRSTUVWXYZABCDEFGHIJKLMNOPQRSTUVWXYZABCDEFGHIJKLMNOPQRSTUVWXYZABCDEFGHIJKLMNOPQRSTUVWXYZABCDEFGHIJKLMNOPQRSTUVWXYZ" + sentBytes := []byte("LMSG foo 156\r\n" + payload + "\r\n") + h, _ := wsCreateFrameHeader(false, false, wsBinaryMessage, len(sentBytes)) + combined := &bytes.Buffer{} + combined.Write(h) + combined.Write(sentBytes) + toSend := combined.Bytes() + + // We will make a compressed block that cuts the websocket header that + // makes the reader want to read bytes directly from the connection. + // We want to make sure that we are not going to get compressed data + // without going through the (de)compress library. So for that, compress + // the first 3 bytes. 
+ b := &bytes.Buffer{} + w := s2.NewWriter(b) + w.Write(toSend[:3]) + w.Close() + + var nc net.Conn + srv2.mu.RLock() + for _, l := range srv2.leafs { + l.mu.Lock() + nc = l.nc + l.mu.Unlock() + } + srv2.mu.RUnlock() + + nc.Write(b.Bytes()) + + // Pause to make sure other side just gets a partial of the whole WS frame. + time.Sleep(100 * time.Millisecond) + + b.Reset() + w.Reset(b) + w.Write(toSend[3:]) + w.Close() + + nc.Write(b.Bytes()) + + msg := natsNexMsg(t, sub, time.Second) + require_True(t, len(msg.Data) == 156) + require_Equal(t, string(msg.Data), payload) +} + +func TestLeafNodePings(t *testing.T) { + connWithCompressionMaxPingInterval = 50 * time.Millisecond + defer func() { connWithCompressionMaxPingInterval = defaultConnWithCompressionMaxPingInterval }() + + o1 := DefaultOptions() + o1.LeafNode.Port = -1 + s1 := RunServer(o1) + defer s1.Shutdown() + + ua, _ := url.Parse(fmt.Sprintf("nats://127.0.0.1:%d", o1.LeafNode.Port)) + o2 := DefaultOptions() + o2.Cluster.Name = "spoke" + o2.LeafNode.Remotes = []*RemoteLeafOpts{{URLs: []*url.URL{ua}}} + s2 := RunServer(o2) + defer s2.Shutdown() + + checkLeafNodeConnected(t, s2) + + ch := make(chan struct{}, 1) + s1.mu.RLock() + for _, l := range s1.leafs { + l.mu.Lock() + l.nc = &capturePingConn{l.nc, ch} + l.mu.Unlock() + break + } + s1.mu.RUnlock() + + for i := 0; i < 5; i++ { + select { + case <-ch: + case <-time.After(250 * time.Millisecond): + t.Fatalf("Did not send PING") + } + } +} diff --git a/server/monitor.go b/server/monitor.go index a4151c47..6799db8d 100644 --- a/server/monitor.go +++ b/server/monitor.go @@ -2102,18 +2102,19 @@ type LeafzOptions struct { // LeafInfo has detailed information on each remote leafnode connection. 
type LeafInfo struct { - Name string `json:"name"` - IsSpoke bool `json:"is_spoke"` - Account string `json:"account"` - IP string `json:"ip"` - Port int `json:"port"` - RTT string `json:"rtt,omitempty"` - InMsgs int64 `json:"in_msgs"` - OutMsgs int64 `json:"out_msgs"` - InBytes int64 `json:"in_bytes"` - OutBytes int64 `json:"out_bytes"` - NumSubs uint32 `json:"subscriptions"` - Subs []string `json:"subscriptions_list,omitempty"` + Name string `json:"name"` + IsSpoke bool `json:"is_spoke"` + Account string `json:"account"` + IP string `json:"ip"` + Port int `json:"port"` + RTT string `json:"rtt,omitempty"` + InMsgs int64 `json:"in_msgs"` + OutMsgs int64 `json:"out_msgs"` + InBytes int64 `json:"in_bytes"` + OutBytes int64 `json:"out_bytes"` + NumSubs uint32 `json:"subscriptions"` + Subs []string `json:"subscriptions_list,omitempty"` + Compression string `json:"compression,omitempty"` } // Leafz returns a Leafz structure containing information about leafnodes. @@ -2143,17 +2144,18 @@ func (s *Server) Leafz(opts *LeafzOptions) (*Leafz, error) { for _, ln := range lconns { ln.mu.Lock() lni := &LeafInfo{ - Name: ln.leaf.remoteServer, - IsSpoke: ln.isSpokeLeafNode(), - Account: ln.acc.Name, - IP: ln.host, - Port: int(ln.port), - RTT: ln.getRTT().String(), - InMsgs: atomic.LoadInt64(&ln.inMsgs), - OutMsgs: ln.outMsgs, - InBytes: atomic.LoadInt64(&ln.inBytes), - OutBytes: ln.outBytes, - NumSubs: uint32(len(ln.subs)), + Name: ln.leaf.remoteServer, + IsSpoke: ln.isSpokeLeafNode(), + Account: ln.acc.Name, + IP: ln.host, + Port: int(ln.port), + RTT: ln.getRTT().String(), + InMsgs: atomic.LoadInt64(&ln.inMsgs), + OutMsgs: ln.outMsgs, + InBytes: atomic.LoadInt64(&ln.inBytes), + OutBytes: ln.outBytes, + NumSubs: uint32(len(ln.subs)), + Compression: ln.leaf.compression, } if opts != nil && opts.Subscriptions { lni.Subs = make([]string, 0, len(ln.subs)) diff --git a/server/opts.go b/server/opts.go index b445a22e..2b69413c 100644 --- a/server/opts.go +++ b/server/opts.go @@ -158,6 
+158,9 @@ type LeafNodeOpts struct { NoAdvertise bool `json:"-"` ReconnectInterval time.Duration `json:"-"` + // Compression options + Compression CompressionOpts `json:"-"` + // For solicited connections to other clusters/superclusters. Remotes []*RemoteLeafOpts `json:"remotes,omitempty"` @@ -197,6 +200,10 @@ type RemoteLeafOpts struct { DenyImports []string `json:"-"` DenyExports []string `json:"-"` + // Compression options for this remote. Each remote could have a different + // setting and also be different from the LeafNode options. + Compression CompressionOpts `json:"-"` + // When an URL has the "ws" (or "wss") scheme, then the server will initiate the // connection as a websocket connection. By default, the websocket frames will be // masked (as if this server was a websocket client to the remote server). The @@ -1642,7 +1649,7 @@ func parseCluster(v interface{}, opts *Options, errors *[]error, warnings *[]err case "accounts": opts.Cluster.PinnedAccounts, _ = parseStringArray("accounts", tk, <, mv, errors, warnings) case "compression": - if err := parseCompression(&opts.Cluster.Compression, tk, mk, mv); err != nil { + if err := parseCompression(&opts.Cluster.Compression, CompressionS2Fast, tk, mk, mv); err != nil { *errors = append(*errors, err) continue } @@ -1662,7 +1669,10 @@ func parseCluster(v interface{}, opts *Options, errors *[]error, warnings *[]err return nil } -func parseCompression(c *CompressionOpts, tk token, mk string, mv interface{}) (retErr error) { +// The parameter `chosenModeForOn` indicates which compression mode to use +// when the user selects "on" (or enabled, true, etc..). This is because +// we may have different defaults depending on where the compression is used. 
+func parseCompression(c *CompressionOpts, chosenModeForOn string, tk token, mk string, mv interface{}) (retErr error) { var lt token defer convertPanicToError(<, &retErr) @@ -1672,7 +1682,7 @@ func parseCompression(c *CompressionOpts, tk token, mk string, mv interface{}) ( c.Mode = mv case bool: if mv { - c.Mode = CompressionS2Fast + c.Mode = chosenModeForOn } else { c.Mode = CompressionOff } @@ -2191,6 +2201,11 @@ func parseLeafNodes(v interface{}, opts *Options, errors *[]error, warnings *[]e continue } opts.LeafNode.MinVersion = version + case "compression": + if err := parseCompression(&opts.LeafNode.Compression, CompressionS2Auto, tk, mk, mv); err != nil { + *errors = append(*errors, err) + continue + } default: if !tk.IsUsedVariable() { err := &unknownConfigFieldErr{ @@ -2416,6 +2431,11 @@ func parseRemoteLeafNodes(v interface{}, errors *[]error, warnings *[]error) ([] remote.Websocket.NoMasking = v.(bool) case "jetstream_cluster_migrate", "js_cluster_migrate": remote.JetStreamClusterMigrate = true + case "compression": + if err := parseCompression(&remote.Compression, CompressionS2Auto, tk, k, v); err != nil { + *errors = append(*errors, err) + continue + } default: if !tk.IsUsedVariable() { err := &unknownConfigFieldErr{ @@ -4814,6 +4834,14 @@ func setBaselineOptions(opts *Options) { if opts.LeafNode.AuthTimeout == 0 { opts.LeafNode.AuthTimeout = getDefaultAuthTimeout(opts.LeafNode.TLSConfig, opts.LeafNode.TLSTimeout) } + // Default to compression "s2_auto". + if c := &opts.LeafNode.Compression; c.Mode == _EMPTY_ { + if testDefaultLeafNodeCompression != _EMPTY_ { + c.Mode = testDefaultLeafNodeCompression + } else { + c.Mode = CompressionS2Auto + } + } } // Set baseline connect port for remotes. for _, r := range opts.LeafNode.Remotes { @@ -4823,6 +4851,14 @@ func setBaselineOptions(opts *Options) { u.Host = net.JoinHostPort(u.Host, strconv.Itoa(DEFAULT_LEAFNODE_PORT)) } } + // Default to compression "s2_auto". 
+ if c := &r.Compression; c.Mode == _EMPTY_ { + if testDefaultLeafNodeCompression != _EMPTY_ { + c.Mode = testDefaultLeafNodeCompression + } else { + c.Mode = CompressionS2Auto + } + } } } diff --git a/server/reload.go b/server/reload.go index ba3a7c7e..c7c7cbe7 100644 --- a/server/reload.go +++ b/server/reload.go @@ -383,7 +383,8 @@ func (c *clusterOption) Apply(s *Server) { s.setRouteInfoHostPortAndIP() var routes []*client if c.compressChanged { - newMode := s.getOpts().Cluster.Compression.Mode + co := &s.getOpts().Cluster.Compression + newMode := co.Mode s.forEachRoute(func(r *client) { r.mu.Lock() // Skip routes that are "not supported" (because they will never do @@ -398,6 +399,11 @@ func (c *clusterOption) Apply(s *Server) { // these require negotiation. if r.route.compression == CompressionOff || newMode == CompressionOff || newMode == CompressionAccept { routes = append(routes, r) + } else if newMode == CompressionS2Auto { + // If the mode is "s2_auto", we need to check if there is really + // need to change, and at any rate, we want to save the actual + // compression level here, not s2_auto. + r.updateS2AutoCompressionLevel(co, &r.route.compression) } else { // Simply change the compression writer r.out.cw = s2.NewWriter(nil, s2WriterOptions(newMode)...) 
@@ -791,13 +797,79 @@ func (o *mqttInactiveThresholdReload) Apply(s *Server) { type leafNodeOption struct { noopOption + tlsFirstChanged bool + compressionChanged bool } func (l *leafNodeOption) Apply(s *Server) { opts := s.getOpts() - s.Noticef("Reloaded: LeafNode TLS HandshakeFirst value is: %v", opts.LeafNode.TLSHandshakeFirst) - for _, r := range opts.LeafNode.Remotes { - s.Noticef("Reloaded: LeafNode Remote to %v TLS HandshakeFirst value is: %v", r.URLs, r.TLSHandshakeFirst) + if l.tlsFirstChanged { + s.Noticef("Reloaded: LeafNode TLS HandshakeFirst value is: %v", opts.LeafNode.TLSHandshakeFirst) + for _, r := range opts.LeafNode.Remotes { + s.Noticef("Reloaded: LeafNode Remote to %v TLS HandshakeFirst value is: %v", r.URLs, r.TLSHandshakeFirst) + } + } + if l.compressionChanged { + var leafs []*client + acceptSideCompOpts := &opts.LeafNode.Compression + + s.mu.RLock() + // First, update our internal leaf remote configurations with the new + // compress options. + // Since changing the remotes (as in adding/removing) is currently not + // supported, we know that we should have the same number in Options + // than in leafRemoteCfgs, but to be sure, use the max size. + max := len(opts.LeafNode.Remotes) + if l := len(s.leafRemoteCfgs); l < max { + max = l + } + for i := 0; i < max; i++ { + lr := s.leafRemoteCfgs[i] + lr.Lock() + lr.Compression = opts.LeafNode.Remotes[i].Compression + lr.Unlock() + } + + for _, l := range s.leafs { + var co *CompressionOpts + + l.mu.Lock() + if r := l.leaf.remote; r != nil { + co = &r.Compression + } else { + co = acceptSideCompOpts + } + newMode := co.Mode + // Skip leaf connections that are "not supported" (because they + // will never do compression) or the ones that have already the + // new compression mode. 
+ if l.leaf.compression == CompressionNotSupported || l.leaf.compression == newMode { + l.mu.Unlock() + continue + } + // We need to close the connections if it had compression "off" or the new + // mode is compression "off", or if the new mode is "accept", because + // these require negotiation. + if l.leaf.compression == CompressionOff || newMode == CompressionOff || newMode == CompressionAccept { + leafs = append(leafs, l) + } else if newMode == CompressionS2Auto { + // If the mode is "s2_auto", we need to check if there is really + // need to change, and at any rate, we want to save the actual + // compression level here, not s2_auto. + l.updateS2AutoCompressionLevel(co, &l.leaf.compression) + } else { + // Simply change the compression writer + l.out.cw = s2.NewWriter(nil, s2WriterOptions(newMode)...) + l.leaf.compression = newMode + } + l.mu.Unlock() + } + s.mu.RUnlock() + // Close the connections for which negotiation is required. + for _, l := range leafs { + l.closeConnection(ClientClosed) + } + s.Noticef("Reloaded: LeafNode compression settings") } } @@ -1262,6 +1334,20 @@ func (s *Server) diffOptions(newOpts *Options) ([]option, error) { } } } + // We also support config reload for compression. Check if it changed before + // blanking them out for the deep-equal check at the end. + compressionChanged := !reflect.DeepEqual(tmpOld.Compression, tmpNew.Compression) + if compressionChanged { + tmpOld.Compression, tmpNew.Compression = CompressionOpts{}, CompressionOpts{} + } else if len(tmpOld.Remotes) == len(tmpNew.Remotes) { + // Same that for tls first check, do the remotes now. + for i := 0; i < len(tmpOld.Remotes); i++ { + if !reflect.DeepEqual(tmpOld.Remotes[i].Compression, tmpNew.Remotes[i].Compression) { + compressionChanged = true + break + } + } + } // Need to do the same for remote leafnodes' TLS configs. 
// But we can't just set remotes' TLSConfig to nil otherwise this @@ -1351,11 +1437,10 @@ func (s *Server) diffOptions(newOpts *Options) ([]option, error) { field.Name, oldValue, newValue) } - // If we detected a change in TLSHandshakeFirst, then let's add to - // the diffOpts so that we can print that we change that. - if handshakeFirstChanged { - diffOpts = append(diffOpts, &leafNodeOption{}) - } + diffOpts = append(diffOpts, &leafNodeOption{ + tlsFirstChanged: handshakeFirstChanged, + compressionChanged: compressionChanged, + }) case "jetstream": new := newValue.(bool) old := oldValue.(bool) @@ -1549,6 +1634,8 @@ func copyRemoteLNConfigForReloadCompare(current []*RemoteLeafOpts) []*RemoteLeaf // For now, remove DenyImports/Exports since those get modified at runtime // to add JS APIs. cp.DenyImports, cp.DenyExports = nil, nil + // Remove compression mode + cp.Compression = CompressionOpts{} rlns = append(rlns, &cp) } return rlns diff --git a/server/reload_test.go b/server/reload_test.go index 7df0db96..66875088 100644 --- a/server/reload_test.go +++ b/server/reload_test.go @@ -20,6 +20,7 @@ import ( "encoding/json" "flag" "fmt" + "io" "log" "net" "net/http" @@ -5411,7 +5412,6 @@ func TestConfigReloadRouteCompression(t *testing.T) { } s1RouteIDs := collect(s1) s2RouteIDs := collect(s2) - s3RouteIDs := collect(s3) s3ID := s3.ID() servers := []*Server{s1, s2} @@ -5437,12 +5437,14 @@ func TestConfigReloadRouteCompression(t *testing.T) { var m map[uint64]struct{} if r.route.remoteID == s3ID { exp = CompressionNotSupported - m = s3RouteIDs } else if s == s1 { exp = s1Expected - m = s1RouteIDs } else { exp = s2Expected + } + if s == s1 { + m = s1RouteIDs + } else { m = s2RouteIDs } _, present := m[r.cid] @@ -5469,7 +5471,6 @@ func TestConfigReloadRouteCompression(t *testing.T) { } s1RouteIDs = collect(s1) s2RouteIDs = collect(s2) - s3RouteIDs = collect(s3) return nil }) } @@ -5478,7 +5479,7 @@ func TestConfigReloadRouteCompression(t *testing.T) { // but not initiate 
compression, so they should both be "off" checkCompMode(CompressionOff, CompressionOff, false) - // Now relead s1 with "on" (s2_fast), since s2 is *configured* with "accept", + // Now reload s1 with "on" (s2_fast), since s2 is *configured* with "accept", // they should both be CompressionS2Fast, even before we reload s2. reloadUpdateConfig(t, s1, conf1, fmt.Sprintf(tmpl, "A", _EMPTY_, "compression: on")) checkCompMode(CompressionS2Fast, CompressionS2Fast, true) @@ -5534,3 +5535,312 @@ func TestConfigReloadRouteCompression(t *testing.T) { reloadUpdateConfig(t, s2, conf2, fmt.Sprintf(tmpl, "B", routes, _EMPTY_)) checkCompMode(CompressionOff, CompressionOff, true) } + +func TestConfigReloadRouteCompressionS2Auto(t *testing.T) { + // This test checks s2_auto specific behavior. It makes sure that we update + // only if the rtt_thresholds and current RTT value warrants a change and + // also that we actually save in c.route.compression the actual compression + // level (not s2_auto). + tmpl1 := ` + port: -1 + server_name: "A" + cluster { + port: -1 + name: "local" + pool_size: -1 + compression: {mode: s2_auto, rtt_thresholds: [%s]} + } + ` + conf1 := createConfFile(t, []byte(fmt.Sprintf(tmpl1, "50ms, 100ms, 150ms"))) + s1, o1 := RunServerWithConfig(conf1) + defer s1.Shutdown() + + conf2 := createConfFile(t, []byte(fmt.Sprintf(` + port: -1 + server_name: "B" + cluster { + port: -1 + name: "local" + pool_size: -1 + compression: s2_fast + routes: ["nats://127.0.0.1:%d"] + } + `, o1.Cluster.Port))) + s2, _ := RunServerWithConfig(conf2) + defer s2.Shutdown() + + checkClusterFormed(t, s1, s2) + + getCompInfo := func() (string, io.Writer) { + var cm string + var cw io.Writer + s1.mu.RLock() + // There should be only 1 route... 
+ s1.forEachRemote(func(r *client) { + r.mu.Lock() + cm = r.route.compression + cw = r.out.cw + r.mu.Unlock() + }) + s1.mu.RUnlock() + return cm, cw + } + // Capture the s2 writer from s1 to s2 + cm, cw := getCompInfo() + + // We do a reload but really the mode is still s2_auto (even if the current + // compression level may be "uncompressed", "better", etc.. so we don't + // expect the writer to have changed. + reloadUpdateConfig(t, s1, conf1, fmt.Sprintf(tmpl1, "100ms, 200ms, 300ms")) + if ncm, ncw := getCompInfo(); ncm != cm || ncw != cw { + t.Fatalf("Expected compression info to have stayed the same, was %q - %p, got %q - %p", cm, cw, ncm, ncw) + } +} + +func TestConfigReloadLeafNodeCompression(t *testing.T) { + org := testDefaultLeafNodeCompression + testDefaultLeafNodeCompression = _EMPTY_ + defer func() { testDefaultLeafNodeCompression = org }() + + tmpl1 := ` + port: -1 + server_name: "A" + leafnodes { + port: -1 + %s + } + ` + conf1 := createConfFile(t, []byte(fmt.Sprintf(tmpl1, "compression: accept"))) + s1, o1 := RunServerWithConfig(conf1) + defer s1.Shutdown() + + port := o1.LeafNode.Port + + tmpl2 := ` + port: -1 + server_name: "%s" + leafnodes { + remotes [ + { + url: "nats://127.0.0.1:%d" + %s + } + ] + } + ` + conf2 := createConfFile(t, []byte(fmt.Sprintf(tmpl2, "B", port, "compression: accept"))) + s2, _ := RunServerWithConfig(conf2) + defer s2.Shutdown() + + // Run a 3rd server but make it as if it was an old server. We want to + // make sure that reload of s1 and s2 will not affect leafnodes from s3 to + // s1/s2 because these do not support compression. 
+ conf3 := createConfFile(t, []byte(fmt.Sprintf(tmpl2, "C", port, "compression: \"not supported\""))) + s3, _ := RunServerWithConfig(conf3) + defer s3.Shutdown() + + checkLeafNodeConnected(t, s2) + checkLeafNodeConnected(t, s3) + checkLeafNodeConnectedCount(t, s1, 2) + + // Collect leafnodes' cid from servers so we can check if connections are + // recreated when they should and are not when they should not. + collect := func(s *Server) map[uint64]struct{} { + m := make(map[uint64]struct{}) + s.mu.RLock() + defer s.mu.RUnlock() + for _, l := range s.leafs { + l.mu.Lock() + m[l.cid] = struct{}{} + l.mu.Unlock() + } + return m + } + s1LeafNodeIDs := collect(s1) + s2LeafNodeIDs := collect(s2) + + servers := []*Server{s1, s2} + checkCompMode := func(s1Expected, s2Expected string, shouldBeNew bool) { + t.Helper() + // We wait a bit to make sure that we have leaf connections closed + // before checking that they are properly reconnected. + time.Sleep(100 * time.Millisecond) + checkLeafNodeConnected(t, s2) + checkLeafNodeConnected(t, s3) + checkLeafNodeConnectedCount(t, s1, 2) + // Check that all leafnodes are with the expected mode. We need to + // possibly wait a bit since there is negotiation going on. 
+ checkFor(t, 2*time.Second, 50*time.Millisecond, func() error { + for _, s := range servers { + var err error + s.mu.RLock() + for _, l := range s.leafs { + l.mu.Lock() + var exp string + var m map[uint64]struct{} + if l.leaf.remoteServer == "C" { + exp = CompressionNotSupported + } else if s == s1 { + exp = s1Expected + } else { + exp = s2Expected + } + if s == s1 { + m = s1LeafNodeIDs + } else { + m = s2LeafNodeIDs + } + _, present := m[l.cid] + cm := l.leaf.compression + l.mu.Unlock() + if cm != exp { + err = fmt.Errorf("Expected leaf %v for server %s to have compression mode %q, got %q", l, s, exp, cm) + } + sbn := shouldBeNew + if exp == CompressionNotSupported { + // Override for routes to s3 + sbn = false + } + if sbn && present { + err = fmt.Errorf("Expected leaf %v for server %s to be a new leaf, but it was already present", l, s) + } else if !sbn && !present { + err = fmt.Errorf("Expected leaf %v for server %s to not be new", l, s) + } + if err != nil { + break + } + } + s.mu.RUnlock() + if err != nil { + return err + } + } + s1LeafNodeIDs = collect(s1) + s2LeafNodeIDs = collect(s2) + return nil + }) + } + // Since both started with compression "accept", they should both be set to "off" + checkCompMode(CompressionOff, CompressionOff, false) + + // Now reload s1 with "on" (s2_auto), since s2 is *configured* with "accept", + // s1 should be "uncompressed" (due to low RTT), and s2 is in that case set + // to s2_fast. 
+ reloadUpdateConfig(t, s1, conf1, fmt.Sprintf(tmpl1, "compression: on")) + checkCompMode(CompressionS2Uncompressed, CompressionS2Fast, true) + // Now reload s2 + reloadUpdateConfig(t, s2, conf2, fmt.Sprintf(tmpl2, "B", port, "compression: on")) + checkCompMode(CompressionS2Uncompressed, CompressionS2Uncompressed, false) + + // Move on with "better" + reloadUpdateConfig(t, s1, conf1, fmt.Sprintf(tmpl1, "compression: s2_better")) + // s1 should be at "better", but s2 still at "uncompressed" + checkCompMode(CompressionS2Better, CompressionS2Uncompressed, false) + // Now reload s2 + reloadUpdateConfig(t, s2, conf2, fmt.Sprintf(tmpl2, "B", port, "compression: s2_better")) + checkCompMode(CompressionS2Better, CompressionS2Better, false) + + // Move to "best" + reloadUpdateConfig(t, s1, conf1, fmt.Sprintf(tmpl1, "compression: s2_best")) + checkCompMode(CompressionS2Best, CompressionS2Better, false) + // Now reload s2 + reloadUpdateConfig(t, s2, conf2, fmt.Sprintf(tmpl2, "B", port, "compression: s2_best")) + checkCompMode(CompressionS2Best, CompressionS2Best, false) + + // Now turn off + reloadUpdateConfig(t, s1, conf1, fmt.Sprintf(tmpl1, "compression: off")) + checkCompMode(CompressionOff, CompressionOff, true) + // Now reload s2 + reloadUpdateConfig(t, s2, conf2, fmt.Sprintf(tmpl2, "B", port, "compression: off")) + checkCompMode(CompressionOff, CompressionOff, false) + + // When "off" (and not "accept"), enabling 1 is not enough, the reload + // has to be done on both to take effect. 
+ reloadUpdateConfig(t, s1, conf1, fmt.Sprintf(tmpl1, "compression: s2_better")) + checkCompMode(CompressionOff, CompressionOff, true) + // Now reload s2 + reloadUpdateConfig(t, s2, conf2, fmt.Sprintf(tmpl2, "B", port, "compression: s2_better")) + checkCompMode(CompressionS2Better, CompressionS2Better, true) + + // Try now to have different ones + reloadUpdateConfig(t, s1, conf1, fmt.Sprintf(tmpl1, "compression: s2_best")) + // S1 should be "best" but S2 should have stayed at "better" + checkCompMode(CompressionS2Best, CompressionS2Better, false) + + // Change the setting to "accept", which in that case we want to have a + // negotiation and use the remote's compression level. So connections + // should be re-created. + reloadUpdateConfig(t, s1, conf1, fmt.Sprintf(tmpl1, "compression: accept")) + checkCompMode(CompressionS2Better, CompressionS2Better, true) + + // To avoid flapping, add a little sleep here to make sure we have things + // settled before reloading s2. + time.Sleep(100 * time.Millisecond) + // And if we do the same with s2, then we will end-up with no compression. + reloadUpdateConfig(t, s2, conf2, fmt.Sprintf(tmpl2, "B", port, "compression: accept")) + checkCompMode(CompressionOff, CompressionOff, true) + + // Now remove completely and we should default to s2_auto, which means that + // s1 should be at "uncompressed" and s2 to "fast". + reloadUpdateConfig(t, s1, conf1, fmt.Sprintf(tmpl1, _EMPTY_)) + checkCompMode(CompressionS2Uncompressed, CompressionS2Fast, true) + + // Now with s2, both will be "uncompressed" + reloadUpdateConfig(t, s2, conf2, fmt.Sprintf(tmpl2, "B", port, _EMPTY_)) + checkCompMode(CompressionS2Uncompressed, CompressionS2Uncompressed, false) +} + +func TestConfigReloadLeafNodeCompressionS2Auto(t *testing.T) { + // This test checks s2_auto specific behavior. 
It makes sure that we update + // only if the rtt_thresholds and current RTT value warrants a change and + // also that we actually save in c.leaf.compression the actual compression + // level (not s2_auto). + tmpl1 := ` + port: -1 + server_name: "A" + leafnodes { + port: -1 + compression: {mode: s2_auto, rtt_thresholds: [%s]} + } + ` + conf1 := createConfFile(t, []byte(fmt.Sprintf(tmpl1, "50ms, 100ms, 150ms"))) + s1, o1 := RunServerWithConfig(conf1) + defer s1.Shutdown() + + conf2 := createConfFile(t, []byte(fmt.Sprintf(` + port: -1 + server_name: "B" + leafnodes { + remotes [{ url: "nats://127.0.0.1:%d", compression: s2_fast}] + } + `, o1.LeafNode.Port))) + s2, _ := RunServerWithConfig(conf2) + defer s2.Shutdown() + + checkLeafNodeConnected(t, s2) + + getCompInfo := func() (string, io.Writer) { + var cm string + var cw io.Writer + s1.mu.RLock() + // There should be only 1 leaf... + for _, l := range s1.leafs { + l.mu.Lock() + cm = l.leaf.compression + cw = l.out.cw + l.mu.Unlock() + } + s1.mu.RUnlock() + return cm, cw + } + // Capture the s2 writer from s1 to s2 + cm, cw := getCompInfo() + + // We do a reload but really the mode is still s2_auto (even if the current + // compression level may be "uncompressed", "better", etc.. so we don't + // expect the writer to have changed. + reloadUpdateConfig(t, s1, conf1, fmt.Sprintf(tmpl1, "100ms, 200ms, 300ms")) + if ncm, ncw := getCompInfo(); ncm != cm || ncw != cw { + t.Fatalf("Expected compression info to have stayed the same, was %q - %p, got %q - %p", cm, cw, ncm, ncw) + } +} diff --git a/server/route.go b/server/route.go index ad24fefa..d7b3938e 100644 --- a/server/route.go +++ b/server/route.go @@ -129,15 +129,6 @@ const ( // Can be changed for tests var routeConnectDelay = DEFAULT_ROUTE_CONNECT -// The default ping interval is set to 2 minutes, which is fine for client -// connections, etc.. 
but since for route compression, the CompressionS2Auto -// mode uses RTT measurements (ping/pong) to decide which compression level -// to use, we want the interval to not be that high. -const defaultRouteMaxPingInterval = 30 * time.Second - -// Can be changed for tests -var routeMaxPingInterval = defaultRouteMaxPingInterval - // removeReplySub is called when we trip the max on remoteReply subs. func (c *client) removeReplySub(sub *subscription) { if sub == nil { @@ -709,7 +700,7 @@ func (c *client) processRouteInfo(info *Info) { c.flags.set(compressionNegotiated) // Release client lock since following function will need server lock. c.mu.Unlock() - compress, err := s.negotiateRouteCompression(c, didSolicit, accName, info, opts) + compress, err := s.negotiateRouteCompression(c, didSolicit, accName, info.Compression, opts) if err != nil { c.sendErrAndErr(err.Error()) c.closeConnection(ProtocolViolation) @@ -727,7 +718,7 @@ func (c *client) processRouteInfo(info *Info) { // where it was not already sent. c.setFirstPingTimer() if !routeShouldDelayInfo(accName, opts) { - cm := routeCompressionModeForInfoProtocol(opts, c.route.compression) + cm := compressionModeForInfoProtocol(&opts.Cluster.Compression, c.route.compression) // Need to release and then reacquire... 
c.mu.Unlock() s.sendDelayedRouteInfo(c, accName, cm) @@ -836,9 +827,9 @@ func (c *client) processRouteInfo(info *Info) { } } -func (s *Server) negotiateRouteCompression(c *client, didSolicit bool, accName string, info *Info, opts *Options) (bool, error) { +func (s *Server) negotiateRouteCompression(c *client, didSolicit bool, accName, infoCompression string, opts *Options) (bool, error) { // Negotiate the appropriate compression mode (or no compression) - cm, err := selectCompressionMode(opts.Cluster.Compression.Mode, info.Compression) + cm, err := selectCompressionMode(opts.Cluster.Compression.Mode, infoCompression) if err != nil { return false, err } @@ -894,21 +885,12 @@ func (s *Server) negotiateRouteCompression(c *client, didSolicit bool, accName s // If this is a solicited route, we need to send the INFO if it was not // done during createRoute() and will not be done in addRoute(). if didSolicit && !routeShouldDelayInfo(accName, opts) { - cm = routeCompressionModeForInfoProtocol(opts, cm) + cm = compressionModeForInfoProtocol(&opts.Cluster.Compression, cm) s.sendDelayedRouteInfo(c, accName, cm) } return false, nil } -// If the configured compression mode is "auto" then will return that, -// otherwise will return the given `cm` compression mode. -func routeCompressionModeForInfoProtocol(opts *Options, cm string) string { - if opts.Cluster.Compression.Mode == CompressionS2Auto { - return CompressionS2Auto - } - return cm -} - func (s *Server) sendDelayedRouteInfo(c *client, accName, cm string) { s.mu.Lock() infoProto := s.generateRouteInitialInfoJSON(accName, cm, 0) @@ -2358,8 +2340,8 @@ func (s *Server) startRouteAcceptLoop() { } // For tests that want to simulate old servers, do not set the compression // on the INFO protocol if configured with CompressionNotSupported. 
- if opts.Cluster.Compression.Mode != CompressionNotSupported { - info.Compression = opts.Cluster.Compression.Mode + if cm := opts.Cluster.Compression.Mode; cm != CompressionNotSupported { + info.Compression = cm } if ps := opts.Cluster.PoolSize; ps > 0 { info.RoutePoolSize = ps diff --git a/server/routes_test.go b/server/routes_test.go index 04ddda68..31cbbca0 100644 --- a/server/routes_test.go +++ b/server/routes_test.go @@ -3209,8 +3209,8 @@ func TestRouteCompressionOptions(t *testing.T) { s, o := RunServerWithConfig(conf) defer s.Shutdown() - if o.Cluster.Compression.Mode != test.expected { - t.Fatalf("Expected compression value to be %q, got %q", test.expected, o.Cluster.Compression) + if cm := o.Cluster.Compression.Mode; cm != test.expected { + t.Fatalf("Expected compression value to be %q, got %q", test.expected, cm) } if !reflect.DeepEqual(test.rtts, o.Cluster.Compression.RTTThresholds) { t.Fatalf("Expected RTT tresholds to be %+v, got %+v", test.rtts, o.Cluster.Compression.RTTThresholds) @@ -3228,8 +3228,8 @@ func TestRouteCompressionOptions(t *testing.T) { } s = RunServer(o) defer s.Shutdown() - if o.Cluster.Compression.Mode != test.expected { - t.Fatalf("Expected compression value to be %q, got %q", test.expected, o.Cluster.Compression) + if cm := o.Cluster.Compression.Mode; cm != test.expected { + t.Fatalf("Expected compression value to be %q, got %q", test.expected, cm) } if !reflect.DeepEqual(test.rtts, o.Cluster.Compression.RTTThresholds) { t.Fatalf("Expected RTT tresholds to be %+v, got %+v", test.rtts, o.Cluster.Compression.RTTThresholds) @@ -3245,8 +3245,8 @@ func TestRouteCompressionOptions(t *testing.T) { `)) s, o := RunServerWithConfig(conf) defer s.Shutdown() - if o.Cluster.Compression.Mode != CompressionAccept { - t.Fatalf("Expected compression value to be %q, got %q", CompressionAccept, o.Cluster.Compression.Mode) + if cm := o.Cluster.Compression.Mode; cm != CompressionAccept { + t.Fatalf("Expected compression value to be %q, got %q", 
CompressionAccept, cm) } for _, test := range []struct { name string @@ -3712,8 +3712,8 @@ func TestRouteCompressionAuto(t *testing.T) { } func TestRoutePings(t *testing.T) { - routeMaxPingInterval = 50 * time.Millisecond - defer func() { routeMaxPingInterval = defaultRouteMaxPingInterval }() + connWithCompressionMaxPingInterval = 50 * time.Millisecond + defer func() { connWithCompressionMaxPingInterval = defaultConnWithCompressionMaxPingInterval }() o1 := DefaultOptions() s1 := RunServer(o1) diff --git a/server/server.go b/server/server.go index 97ec2e21..b868b14d 100644 --- a/server/server.go +++ b/server/server.go @@ -330,10 +330,22 @@ type stats struct { slowConsumers int64 } +// The default ping interval is set to 2 minutes, which is fine for client +// connections, etc.. but for route or leaf compression, the CompressionS2Auto +// mode uses RTT measurements (ping/pong) to decide which compression level +// to use, we want the interval to not be that high. +const defaultConnWithCompressionMaxPingInterval = 30 * time.Second + +// Can be changed for tests +var connWithCompressionMaxPingInterval = defaultConnWithCompressionMaxPingInterval + // This is used by tests so we can run all server tests with a default route -// compression mode. For instance: +// or leafnode compression mode. For instance: // go test -race -v ./server -cluster_compression=fast -var testDefaultClusterCompression string +var ( + testDefaultClusterCompression string + testDefaultLeafNodeCompression string +) // Compression modes. const ( @@ -362,11 +374,23 @@ var defaultCompressionS2AutoRTTThresholds = []time.Duration{ // For a given user provided string, matches to one of the compression mode // constant and updates the provided string to that constant. Returns an // error if the provided compression mode is not known. 
-func validateAndNormalizeCompressionOption(c *CompressionOpts) error { +// The parameter `chosenModeForOn` indicates which compression mode to use +// when the user selects "on" (or enabled, true, etc..). This is because +// we may have different defaults depending on where the compression is used. +func validateAndNormalizeCompressionOption(c *CompressionOpts, chosenModeForOn string) error { if c == nil { return nil } cmtl := strings.ToLower(c.Mode) + // First, check for the "on" case so that we set to the default compression + // mode for that. The other switch/case will finish setup if needed (for + // instance if the default mode is s2Auto). + switch cmtl { + case "on", "enabled", "true": + cmtl = chosenModeForOn + default: + } + // Check (again) with the proper mode. switch cmtl { case "not supported", "not_supported": c.Mode = CompressionNotSupported @@ -420,7 +444,7 @@ func validateAndNormalizeCompressionOption(c *CompressionOpts) error { } c.Mode = CompressionS2Auto c.RTTThresholds = rtts - case "on", "enabled", "true", "fast", "s2_fast": + case "fast", "s2_fast": c.Mode = CompressionS2Fast case "better", "s2_better": c.Mode = CompressionS2Better @@ -484,6 +508,15 @@ func selectCompressionMode(scm, rcm string) (mode string, err error) { } } +// If the configured compression mode is "auto" then will return that, +// otherwise will return the given `cm` compression mode. +func compressionModeForInfoProtocol(co *CompressionOpts, cm string) string { + if co.Mode == CompressionS2Auto { + return CompressionS2Auto + } + return cm +} + // Given a connection RTT and a list of thresholds durations, this // function will return an S2 compression level such as "uncompressed", // "fast", "better" or "best". 
For instance, with the following slice: @@ -906,7 +939,7 @@ func (s *Server) ClientURL() string { func validateCluster(o *Options) error { if o.Cluster.Compression.Mode != _EMPTY_ { - if err := validateAndNormalizeCompressionOption(&o.Cluster.Compression); err != nil { + if err := validateAndNormalizeCompressionOption(&o.Cluster.Compression, CompressionS2Fast); err != nil { return err } } diff --git a/server/sublist_test.go b/server/sublist_test.go index 70d99ec1..319e6633 100644 --- a/server/sublist_test.go +++ b/server/sublist_test.go @@ -1583,6 +1583,7 @@ var benchSublistSl = NewSublistWithCache() // https://github.com/golang/go/issues/31859 func TestMain(m *testing.M) { flag.StringVar(&testDefaultClusterCompression, "cluster_compression", _EMPTY_, "Test with this compression level as the default") + flag.StringVar(&testDefaultLeafNodeCompression, "leafnode_compression", _EMPTY_, "Test with this compression level as the default") flag.Parse() initSublist := false flag.Visit(func(f *flag.Flag) { diff --git a/test/leafnode_test.go b/test/leafnode_test.go index ae3d4e97..6226d399 100644 --- a/test/leafnode_test.go +++ b/test/leafnode_test.go @@ -1187,7 +1187,9 @@ func runTLSSolicitLeafServer(lso *server.Options) (*server.Server, *server.Optio host, _, _ := net.SplitHostPort(lso.LeafNode.Host) remote.TLSConfig.ServerName = host remote.TLSConfig.InsecureSkipVerify = true + remote.Compression.Mode = server.CompressionOff o.LeafNode.Remotes = []*server.RemoteLeafOpts{remote} + o.LeafNode.Compression.Mode = server.CompressionOff return RunServer(&o), &o } diff --git a/test/ocsp_test.go b/test/ocsp_test.go index bd2b071b..6da46908 100644 --- a/test/ocsp_test.go +++ b/test/ocsp_test.go @@ -1560,6 +1560,9 @@ func TestOCSPLeafNoVerify(t *testing.T) { host: "127.0.0.1" port: -1 advertise: "127.0.0.1" + # for this test, explicitly disable compression because we do it + # in RunServer but here we do a config reload... 
+ compression: off tls { cert_file: "configs/certs/ocsp/server-status-request-url-02-cert.pem" @@ -1743,6 +1746,7 @@ func TestOCSPLeafNoVerify(t *testing.T) { host: "127.0.0.1" port: -1 advertise: "127.0.0.1" + compression: off tls { cert_file: "configs/certs/ocsp/server-status-request-url-08-cert.pem" diff --git a/test/test.go b/test/test.go index 3b2264d5..623763b4 100644 --- a/test/test.go +++ b/test/test.go @@ -83,6 +83,7 @@ func RunServerCallback(opts *server.Options, callback func(*server.Server)) *ser opts.Cluster.PoolSize = -1 // Also disable compression for "test" package. opts.Cluster.Compression.Mode = server.CompressionOff + opts.LeafNode.Compression.Mode = server.CompressionOff s, err := server.NewServer(opts) if err != nil || s == nil {