From 9e6f965913611935268ea608e5c9d8578c976c48 Mon Sep 17 00:00:00 2001 From: Ivan Kozlovic Date: Wed, 6 Apr 2022 18:40:33 -0600 Subject: [PATCH 1/3] [ADDED] LeafNode `min_version` new option If set, a server configured to accept leafnode connections will reject a remote server whose version is below that value. Note that servers prior to v2.8.0 are not sending their version in the CONNECT protocol, which means that anything below 2.8.0 would be rejected. Configuration example: ``` leafnodes { port: 7422 min_version: 2.8.0 } ``` The option is a string and can have the "v" prefix: ``` min_version: "v2.9.1" ``` Note that although suffix such as `-beta` would be accepted, only the major, minor and update are used for the version comparison. Signed-off-by: Ivan Kozlovic --- server/client.go | 9 +++ server/config_check_test.go | 22 +++++++ server/errors.go | 3 + server/leafnode.go | 37 +++++++++++ server/leafnode_test.go | 128 ++++++++++++++++++++++++++++++++++++ server/monitor.go | 2 + server/opts.go | 13 ++++ test/leafnode_test.go | 1 + 8 files changed, 215 insertions(+) diff --git a/server/client.go b/server/client.go index bea7fea7..7d062382 100644 --- a/server/client.go +++ b/server/client.go @@ -202,6 +202,7 @@ const ( DuplicateRemoteLeafnodeConnection DuplicateClientID DuplicateServerName + MinimumVersionRequired ) // Some flags passed to processMsgResults @@ -1226,6 +1227,14 @@ func (c *client) readLoop(pre []byte) { // to process messages, etc. for i := 0; i < len(bufs); i++ { if err := c.parse(bufs[i]); err != nil { + if err == ErrMinimumVersionRequired { + // Special case here, currently onle for leaf node connections. + // When processing the connect, an error was printed and sent + // back to the remote, but the connection is closed after a + // certain delay (to avoid "rapid" remote reconnection). + // We don't need to do any of the things below, simply return. + return + } if dur := time.Since(start); dur >= readLoopReportThreshold { c.Warnf("Readloop processing time: %v", dur) } diff --git a/server/config_check_test.go b/server/config_check_test.go index edb808d4..2f454478 100644 --- a/server/config_check_test.go +++ b/server/config_check_test.go @@ -1238,6 +1238,28 @@ func TestConfigCheck(t *testing.T) { errorLine: 4, errorPos: 18, }, + { + name: "when leafnode min_version is wrong type", + config: ` + leafnodes { + port: -1 + min_version = 123 + }`, + err: errors.New(`interface conversion: interface {} is int64, not string`), + errorLine: 4, + errorPos: 6, + }, + { + name: "when leafnode min_version has parsing error", + config: ` + leafnodes { + port: -1 + min_version = bad.version + }`, + err: errors.New(`version parsing error: invalid semver`), + errorLine: 4, + errorPos: 6, + }, { name: "when setting latency tracking with a system account", config: ` diff --git a/server/errors.go b/server/errors.go index a5ba0ac9..56b5af3f 100644 --- a/server/errors.go +++ b/server/errors.go @@ -188,6 +188,9 @@ var ( // ErrDuplicateServerName is returned when processing a server remote connection and // the server reports that this server name is already used in the cluster. ErrDuplicateServerName = errors.New("duplicate server name") + + // ErrMinimumVersionRequired is returned when a connection is not at the minimum version required. + ErrMinimumVersionRequired = errors.New("minimum version required") ) // configErr is a configuration error. diff --git a/server/leafnode.go b/server/leafnode.go index 40f068e1..71b0fd2c 100644 --- a/server/leafnode.go +++ b/server/leafnode.go @@ -56,6 +56,10 @@ const leafNodeLoopDetectionSubjectPrefix = "$LDS." // LEAF connection as opposed to a CLIENT. const leafNodeWSPath = "/leafnode" +// This is the time the server will wait, when receiving a CONNECT, +// before closing the connection is the required minimum version is not met. +const leafNodeWaitBeforeClose = 5 * time.Second + type leaf struct { // We have any auth stuff here for solicited connections. remote *leafNodeCfg @@ -252,6 +256,18 @@ func validateLeafNode(o *Options) error { if o.LeafNode.Port == 0 { return nil } + + // If MinVersion is defined, check that it is valid. + if mv := o.LeafNode.MinVersion; mv != _EMPTY_ { + if _, _, _, err := versionComponents(mv); err != nil { + return fmt.Errorf("invalid min_version in leafnode: %v", err) + } + } + + // The checks below will be done only when detecting that we are configured + // with gateways. So if an option validation needs to be done regardless, + // it MUST be done before this point! + if o.Gateway.Name == "" && o.Gateway.Port == 0 { return nil } @@ -613,6 +629,7 @@ var credsRe = regexp.MustCompile(`\s*(?:(?:[-]{3,}[^\n]*[-]{3,}\n)(.+)(?:\n\s*[- func (c *client) sendLeafConnect(clusterName string, tlsRequired, headers bool) error { // We support basic user/pass and operator based user JWT with signatures. cinfo := leafConnectInfo{ + Version: VERSION, TLS: tlsRequired, ID: c.srv.info.ID, Domain: c.srv.info.Domain, @@ -1316,6 +1333,7 @@ func (s *Server) removeLeafNodeConnection(c *client) { // Connect information for solicited leafnodes. type leafConnectInfo struct { + Version string `json:"version,omitempty"` JWT string `json:"jwt,omitempty"` Sig string `json:"sig,omitempty"` User string `json:"user,omitempty"` @@ -1363,6 +1381,25 @@ func (c *client) processLeafNodeConnect(s *Server, arg []byte, lang string) erro return ErrWrongGateway } + if mv := s.getOpts().LeafNode.MinVersion; mv != _EMPTY_ { + major, minor, update, _ := versionComponents(mv) + if !versionAtLeast(proto.Version, major, minor, update) { + // We are going to send back an INFO because otherwise recent + // versions of the remote server would simply break the connection + // after 2 seconds without receiving that. Instead, we want the + // other side to just "stall" until we finish waiting for the holding + // period and close the connection below. + s.sendPermsAndAccountInfo(c) + c.sendErrAndErr(fmt.Sprintf("connection rejected since minimum version required is %q", mv)) + select { + case <-c.srv.quitCh: + case <-time.After(leafNodeWaitBeforeClose): + } + c.closeConnection(MinimumVersionRequired) + return ErrMinimumVersionRequired + } + } + // Check if this server supports headers. supportHeaders := c.srv.supportsHeaders() diff --git a/server/leafnode_test.go b/server/leafnode_test.go index 1e47d93c..b2dd4544 100644 --- a/server/leafnode_test.go +++ b/server/leafnode_test.go @@ -5280,3 +5280,131 @@ leafnodes:{ require_NoError(t, err) test(jsAA, jsLL) } + +type checkLeafMinVersionLogger struct { + DummyLogger + errCh chan string + connCh chan string +} + +func (l *checkLeafMinVersionLogger) Errorf(format string, args ...interface{}) { + msg := fmt.Sprintf(format, args...) + if strings.Contains(msg, "minimum version") { + select { + case l.errCh <- msg: + default: + } + } +} + +func (l *checkLeafMinVersionLogger) Noticef(format string, args ...interface{}) { + msg := fmt.Sprintf(format, args...) + if strings.Contains(msg, "Leafnode connection created") { + select { + case l.connCh <- msg: + default: + } + } +} + +func TestLeafNodeMinVersion(t *testing.T) { + conf := createConfFile(t, []byte(` + port: -1 + leafnodes { + port: -1 + min_version: 2.8.0 + } + `)) + defer removeFile(t, conf) + s, o := RunServerWithConfig(conf) + defer s.Shutdown() + + rconf := createConfFile(t, []byte(fmt.Sprintf(` + port: -1 + leafnodes { + remotes [ + {url: "nats://127.0.0.1:%d" } + ] + } + `, o.LeafNode.Port))) + defer removeFile(t, rconf) + ln, _ := RunServerWithConfig(rconf) + defer ln.Shutdown() + + checkLeafNodeConnected(t, s) + checkLeafNodeConnected(t, ln) + + ln.Shutdown() + s.Shutdown() + + // Now makes sure we validate options, not just config file. + o.Port = -1 + o.LeafNode.Port = -1 + o.LeafNode.MinVersion = "abc" + if s, err := NewServer(o); err == nil || !strings.Contains(err.Error(), "semver") { + if s != nil { + s.Shutdown() + } + t.Fatalf("Expected error about invalid version, got %v", err) + } + + // Ok, so now to verify that a server rejects a leafnode connection + // we will set the min_version above our current VERSION. So first + // decompose the version: + major, minor, _, err := versionComponents(VERSION) + if err != nil { + t.Fatalf("The current server version %q is not valid: %v", VERSION, err) + } + // Let's make our minimum server an minor version above + mv := fmt.Sprintf("%d.%d.0", major, minor+1) + conf = createConfFile(t, []byte(fmt.Sprintf(` + port: -1 + leafnodes { + port: -1 + min_version: "%s" + } + `, mv))) + defer removeFile(t, conf) + s, o = RunServerWithConfig(conf) + defer s.Shutdown() + + l := &checkLeafMinVersionLogger{errCh: make(chan string, 1), connCh: make(chan string, 1)} + s.SetLogger(l, false, false) + + rconf = createConfFile(t, []byte(fmt.Sprintf(` + port: -1 + leafnodes { + remotes [ + {url: "nats://127.0.0.1:%d" } + ] + } + `, o.LeafNode.Port))) + defer removeFile(t, rconf) + lo := LoadConfig(rconf) + lo.LeafNode.ReconnectInterval = 50 * time.Millisecond + ln = RunServer(lo) + defer ln.Shutdown() + + select { + case <-l.connCh: + case <-time.After(time.Second): + t.Fatal("Remote did not try to connect") + } + + select { + case <-l.errCh: + case <-time.After(time.Second): + t.Fatal("Did not get the minimum version required error") + } + + // Since we have a very small reconnect interval, if the connection was + // closed "right away", then we should have had a reconnect attempt with + // another failure. This should not be the case because the server will + // wait 5s before closing the connection. + select { + case <-l.connCh: + t.Fatal("Should not have tried to reconnect") + case <-time.After(250 * time.Millisecond): + // OK + } +} diff --git a/server/monitor.go b/server/monitor.go index 01918ab9..44c02f21 100644 --- a/server/monitor.go +++ b/server/monitor.go @@ -2230,6 +2230,8 @@ func (reason ClosedState) String() string { return "Duplicate Client ID" case DuplicateServerName: return "Duplicate Server Name" + case MinimumVersionRequired: + return "Minimum Version Required" } return "Unknown State" diff --git a/server/opts.go b/server/opts.go index cd790108..6dcc98a1 100644 --- a/server/opts.go +++ b/server/opts.go @@ -143,6 +143,11 @@ type LeafNodeOpts struct { // For solicited connections to other clusters/superclusters. Remotes []*RemoteLeafOpts `json:"remotes,omitempty"` + // The accepting side can set a minimum version for connected server. + // Note that since the server version in the CONNECT protocol is + // added started in v2.8.0, any version below that is invalid. + MinVersion string + // Not exported, for tests. resolver netResolver dialTimeout time.Duration @@ -1981,6 +1986,14 @@ func parseLeafNodes(v interface{}, opts *Options, errors *[]error, warnings *[]e case "no_advertise": opts.LeafNode.NoAdvertise = mv.(bool) trackExplicitVal(opts, &opts.inConfig, "LeafNode.NoAdvertise", opts.LeafNode.NoAdvertise) + case "min_version", "minimum_version": + version := mv.(string) + if _, _, _, err := versionComponents(version); err != nil { + err := &configErr{tk, fmt.Sprintf("version parsing error: %v", err)} + *errors = append(*errors, err) + continue + } + opts.LeafNode.MinVersion = version default: if !tk.IsUsedVariable() { err := &unknownConfigFieldErr{ diff --git a/test/leafnode_test.go b/test/leafnode_test.go index 89ff98ba..cb29db33 100644 --- a/test/leafnode_test.go +++ b/test/leafnode_test.go @@ -1684,6 +1684,7 @@ func TestLeafNodeOperatorAndPermissions(t *testing.T) { if err != nil { t.Fatalf("Error on subscribe: %v", err) } + leafnc.Flush() // Make sure the interest on "bar" from "sl" server makes it to the "s" server. checkSubInterest(t, s, acc.GetName(), "bar", time.Second) From 7fa267635381aa4bf27913bd4695d684ac785071 Mon Sep 17 00:00:00 2001 From: Ivan Kozlovic Date: Thu, 7 Apr 2022 09:22:51 -0600 Subject: [PATCH 2/3] Fixed comment typos and some rewording Signed-off-by: Ivan Kozlovic --- server/client.go | 9 +++++---- server/leafnode.go | 4 ++-- server/opts.go | 8 +++++--- 3 files changed, 12 insertions(+), 9 deletions(-) diff --git a/server/client.go b/server/client.go index 7d062382..f6e04a72 100644 --- a/server/client.go +++ b/server/client.go @@ -1228,10 +1228,11 @@ func (c *client) readLoop(pre []byte) { for i := 0; i < len(bufs); i++ { if err := c.parse(bufs[i]); err != nil { if err == ErrMinimumVersionRequired { - // Special case here, currently onle for leaf node connections. - // When processing the connect, an error was printed and sent - // back to the remote, but the connection is closed after a - // certain delay (to avoid "rapid" remote reconnection). + // Special case here, currently only for leaf node connections. + // When process the CONNECT protocol, if the minimum version + // required was not met, an error was printed and sent back to + // the remote, and connection was closed after a certain delay + // (to avoid "rapid" reconnection from the remote). // We don't need to do any of the things below, simply return. return } diff --git a/server/leafnode.go b/server/leafnode.go index 71b0fd2c..71203e30 100644 --- a/server/leafnode.go +++ b/server/leafnode.go @@ -57,7 +57,7 @@ const leafNodeLoopDetectionSubjectPrefix = "$LDS." const leafNodeWSPath = "/leafnode" // This is the time the server will wait, when receiving a CONNECT, -// before closing the connection is the required minimum version is not met. +// before closing the connection if the required minimum version is not met. const leafNodeWaitBeforeClose = 5 * time.Second type leaf struct { @@ -1386,7 +1386,7 @@ func (c *client) processLeafNodeConnect(s *Server, arg []byte, lang string) erro if !versionAtLeast(proto.Version, major, minor, update) { // We are going to send back an INFO because otherwise recent // versions of the remote server would simply break the connection - // after 2 seconds without receiving that. Instead, we want the + // after 2 seconds if not receiving it. Instead, we want the // other side to just "stall" until we finish waiting for the holding // period and close the connection below. s.sendPermsAndAccountInfo(c) diff --git a/server/opts.go b/server/opts.go index 6dcc98a1..846273a2 100644 --- a/server/opts.go +++ b/server/opts.go @@ -143,9 +143,11 @@ type LeafNodeOpts struct { // For solicited connections to other clusters/superclusters. Remotes []*RemoteLeafOpts `json:"remotes,omitempty"` - // The accepting side can set a minimum version for connected server. - // Note that since the server version in the CONNECT protocol is - // added started in v2.8.0, any version below that is invalid. + // This is the minimum version that is accepted for remote connections. + // Note that since the server version in the CONNECT protocol was added + // only starting at v2.8.0, any version below that would result in the + // connection being rejected (since empty version string in CONNECT would + // fail the "version at least" test). MinVersion string // Not exported, for tests. From b5c9583ee2934b566a48fe480de0c97577abc402 Mon Sep 17 00:00:00 2001 From: Ivan Kozlovic Date: Thu, 7 Apr 2022 12:49:34 -0600 Subject: [PATCH 3/3] Reject configuration with value below 2.8.0 Signed-off-by: Ivan Kozlovic --- server/config_check_test.go | 13 ++++++++++++- server/leafnode.go | 15 +++++++++++++-- server/leafnode_test.go | 27 +++++++++++++++++++-------- server/opts.go | 10 +++++----- server/util.go | 13 +++++++++---- 5 files changed, 58 insertions(+), 20 deletions(-) diff --git a/server/config_check_test.go b/server/config_check_test.go index 2f454478..dd476c63 100644 --- a/server/config_check_test.go +++ b/server/config_check_test.go @@ -1256,7 +1256,18 @@ func TestConfigCheck(t *testing.T) { port: -1 min_version = bad.version }`, - err: errors.New(`version parsing error: invalid semver`), + err: errors.New(`invalid leafnode's minimum version: invalid semver`), + errorLine: 4, + errorPos: 6, + }, + { + name: "when leafnode min_version is too low", + config: ` + leafnodes { + port: -1 + min_version = 2.7.9 + }`, + err: errors.New(`the minimum version should be at least 2.8.0`), errorLine: 4, errorPos: 6, }, diff --git a/server/leafnode.go b/server/leafnode.go index 71203e30..78adef19 100644 --- a/server/leafnode.go +++ b/server/leafnode.go @@ -259,8 +259,8 @@ func validateLeafNode(o *Options) error { // If MinVersion is defined, check that it is valid. if mv := o.LeafNode.MinVersion; mv != _EMPTY_ { - if _, _, _, err := versionComponents(mv); err != nil { - return fmt.Errorf("invalid min_version in leafnode: %v", err) + if err := checkLeafMinVersionConfig(mv); err != nil { + return err } } @@ -282,6 +282,17 @@ func validateLeafNode(o *Options) error { return nil } +func checkLeafMinVersionConfig(mv string) error { + if ok, err := versionAtLeastCheckError(mv, 2, 8, 0); !ok || err != nil { + if err != nil { + return fmt.Errorf("invalid leafnode's minimum version: %v", err) + } else { + return fmt.Errorf("the minimum version should be at least 2.8.0") + } + } + return nil +} + // Used to validate user names in LeafNode configuration. // - rejects mix of single and multiple users. // - rejects duplicate user names. diff --git a/server/leafnode_test.go b/server/leafnode_test.go index b2dd4544..257f7294 100644 --- a/server/leafnode_test.go +++ b/server/leafnode_test.go @@ -5338,14 +5338,25 @@ func TestLeafNodeMinVersion(t *testing.T) { s.Shutdown() // Now makes sure we validate options, not just config file. - o.Port = -1 - o.LeafNode.Port = -1 - o.LeafNode.MinVersion = "abc" - if s, err := NewServer(o); err == nil || !strings.Contains(err.Error(), "semver") { - if s != nil { - s.Shutdown() - } - t.Fatalf("Expected error about invalid version, got %v", err) + for _, test := range []struct { + name string + version string + err string + }{ + {"invalid version", "abc", "semver"}, + {"version too low", "2.7.9", "the minimum version should be at least 2.8.0"}, + } { + t.Run(test.name, func(t *testing.T) { + o.Port = -1 + o.LeafNode.Port = -1 + o.LeafNode.MinVersion = test.version + if s, err := NewServer(o); err == nil || !strings.Contains(err.Error(), test.err) { + if s != nil { + s.Shutdown() + } + t.Fatalf("Expected error to contain %q, got %v", test.err, err) + } + }) } // Ok, so now to verify that a server rejects a leafnode connection diff --git a/server/opts.go b/server/opts.go index 846273a2..62415f9b 100644 --- a/server/opts.go +++ b/server/opts.go @@ -145,9 +145,9 @@ type LeafNodeOpts struct { // This is the minimum version that is accepted for remote connections. // Note that since the server version in the CONNECT protocol was added - // only starting at v2.8.0, any version below that would result in the - // connection being rejected (since empty version string in CONNECT would - // fail the "version at least" test). + // only starting at v2.8.0, any version below that will be rejected + // (since empty version string in CONNECT would fail the "version at + // least" test). MinVersion string // Not exported, for tests. @@ -1990,8 +1990,8 @@ func parseLeafNodes(v interface{}, opts *Options, errors *[]error, warnings *[]e trackExplicitVal(opts, &opts.inConfig, "LeafNode.NoAdvertise", opts.LeafNode.NoAdvertise) case "min_version", "minimum_version": version := mv.(string) - if _, _, _, err := versionComponents(version); err != nil { - err := &configErr{tk, fmt.Sprintf("version parsing error: %v", err)} + if err := checkLeafMinVersionConfig(version); err != nil { + err = &configErr{tk, err.Error()} *errors = append(*errors, err) continue } diff --git a/server/util.go b/server/util.go index 24f4de9f..24cef1ac 100644 --- a/server/util.go +++ b/server/util.go @@ -59,17 +59,22 @@ func versionComponents(version string) (major, minor, patch int, err error) { return major, minor, patch, err } -func versionAtLeast(version string, emajor, eminor, epatch int) bool { +func versionAtLeastCheckError(version string, emajor, eminor, epatch int) (bool, error) { major, minor, patch, err := versionComponents(version) if err != nil { - return false + return false, err } if major > emajor || (major == emajor && minor > eminor) || (major == emajor && minor == eminor && patch >= epatch) { - return true + return true, nil } - return false + return false, err +} + +func versionAtLeast(version string, emajor, eminor, epatch int) bool { + res, _ := versionAtLeastCheckError(version, emajor, eminor, epatch) + return res } // parseSize expects decimal positive numbers. We