Merge pull request #3013 from nats-io/ln_min_version

[ADDED] LeafNode `min_version` new option
This commit is contained in:
Ivan Kozlovic
2022-04-07 13:57:29 -06:00
committed by GitHub
9 changed files with 260 additions and 4 deletions

View File

@@ -202,6 +202,7 @@ const (
DuplicateRemoteLeafnodeConnection
DuplicateClientID
DuplicateServerName
MinimumVersionRequired
)
// Some flags passed to processMsgResults
@@ -1226,6 +1227,15 @@ func (c *client) readLoop(pre []byte) {
// to process messages, etc.
for i := 0; i < len(bufs); i++ {
if err := c.parse(bufs[i]); err != nil {
if err == ErrMinimumVersionRequired {
// Special case here, currently only for leaf node connections.
// When process the CONNECT protocol, if the minimum version
// required was not met, an error was printed and sent back to
// the remote, and connection was closed after a certain delay
// (to avoid "rapid" reconnection from the remote).
// We don't need to do any of the things below, simply return.
return
}
if dur := time.Since(start); dur >= readLoopReportThreshold {
c.Warnf("Readloop processing time: %v", dur)
}

View File

@@ -1238,6 +1238,39 @@ func TestConfigCheck(t *testing.T) {
errorLine: 4,
errorPos: 18,
},
{
name: "when leafnode min_version is wrong type",
config: `
leafnodes {
port: -1
min_version = 123
}`,
err: errors.New(`interface conversion: interface {} is int64, not string`),
errorLine: 4,
errorPos: 6,
},
{
name: "when leafnode min_version has parsing error",
config: `
leafnodes {
port: -1
min_version = bad.version
}`,
err: errors.New(`invalid leafnode's minimum version: invalid semver`),
errorLine: 4,
errorPos: 6,
},
{
name: "when leafnode min_version is too low",
config: `
leafnodes {
port: -1
min_version = 2.7.9
}`,
err: errors.New(`the minimum version should be at least 2.8.0`),
errorLine: 4,
errorPos: 6,
},
{
name: "when setting latency tracking with a system account",
config: `

View File

@@ -188,6 +188,9 @@ var (
// ErrDuplicateServerName is returned when processing a server remote connection and
// the server reports that this server name is already used in the cluster.
ErrDuplicateServerName = errors.New("duplicate server name")
// ErrMinimumVersionRequired is returned when a connection is not at the minimum version required.
ErrMinimumVersionRequired = errors.New("minimum version required")
)
// configErr is a configuration error.

View File

@@ -56,6 +56,10 @@ const leafNodeLoopDetectionSubjectPrefix = "$LDS."
// LEAF connection as opposed to a CLIENT.
const leafNodeWSPath = "/leafnode"
// This is the time the server will wait, when receiving a CONNECT,
// before closing the connection if the required minimum version is not met.
const leafNodeWaitBeforeClose = 5 * time.Second
type leaf struct {
// We have any auth stuff here for solicited connections.
remote *leafNodeCfg
@@ -252,6 +256,18 @@ func validateLeafNode(o *Options) error {
if o.LeafNode.Port == 0 {
return nil
}
// If MinVersion is defined, check that it is valid.
if mv := o.LeafNode.MinVersion; mv != _EMPTY_ {
if err := checkLeafMinVersionConfig(mv); err != nil {
return err
}
}
// The checks below will be done only when detecting that we are configured
// with gateways. So if an option validation needs to be done regardless,
// it MUST be done before this point!
if o.Gateway.Name == "" && o.Gateway.Port == 0 {
return nil
}
@@ -266,6 +282,17 @@ func validateLeafNode(o *Options) error {
return nil
}
func checkLeafMinVersionConfig(mv string) error {
if ok, err := versionAtLeastCheckError(mv, 2, 8, 0); !ok || err != nil {
if err != nil {
return fmt.Errorf("invalid leafnode's minimum version: %v", err)
} else {
return fmt.Errorf("the minimum version should be at least 2.8.0")
}
}
return nil
}
// Used to validate user names in LeafNode configuration.
// - rejects mix of single and multiple users.
// - rejects duplicate user names.
@@ -613,6 +640,7 @@ var credsRe = regexp.MustCompile(`\s*(?:(?:[-]{3,}[^\n]*[-]{3,}\n)(.+)(?:\n\s*[-
func (c *client) sendLeafConnect(clusterName string, tlsRequired, headers bool) error {
// We support basic user/pass and operator based user JWT with signatures.
cinfo := leafConnectInfo{
Version: VERSION,
TLS: tlsRequired,
ID: c.srv.info.ID,
Domain: c.srv.info.Domain,
@@ -1316,6 +1344,7 @@ func (s *Server) removeLeafNodeConnection(c *client) {
// Connect information for solicited leafnodes.
type leafConnectInfo struct {
Version string `json:"version,omitempty"`
JWT string `json:"jwt,omitempty"`
Sig string `json:"sig,omitempty"`
User string `json:"user,omitempty"`
@@ -1363,6 +1392,25 @@ func (c *client) processLeafNodeConnect(s *Server, arg []byte, lang string) erro
return ErrWrongGateway
}
if mv := s.getOpts().LeafNode.MinVersion; mv != _EMPTY_ {
major, minor, update, _ := versionComponents(mv)
if !versionAtLeast(proto.Version, major, minor, update) {
// We are going to send back an INFO because otherwise recent
// versions of the remote server would simply break the connection
// after 2 seconds if not receiving it. Instead, we want the
// other side to just "stall" until we finish waiting for the holding
// period and close the connection below.
s.sendPermsAndAccountInfo(c)
c.sendErrAndErr(fmt.Sprintf("connection rejected since minimum version required is %q", mv))
select {
case <-c.srv.quitCh:
case <-time.After(leafNodeWaitBeforeClose):
}
c.closeConnection(MinimumVersionRequired)
return ErrMinimumVersionRequired
}
}
// Check if this server supports headers.
supportHeaders := c.srv.supportsHeaders()

View File

@@ -5280,3 +5280,142 @@ leafnodes:{
require_NoError(t, err)
test(jsAA, jsLL)
}
type checkLeafMinVersionLogger struct {
DummyLogger
errCh chan string
connCh chan string
}
func (l *checkLeafMinVersionLogger) Errorf(format string, args ...interface{}) {
msg := fmt.Sprintf(format, args...)
if strings.Contains(msg, "minimum version") {
select {
case l.errCh <- msg:
default:
}
}
}
func (l *checkLeafMinVersionLogger) Noticef(format string, args ...interface{}) {
msg := fmt.Sprintf(format, args...)
if strings.Contains(msg, "Leafnode connection created") {
select {
case l.connCh <- msg:
default:
}
}
}
func TestLeafNodeMinVersion(t *testing.T) {
conf := createConfFile(t, []byte(`
port: -1
leafnodes {
port: -1
min_version: 2.8.0
}
`))
defer removeFile(t, conf)
s, o := RunServerWithConfig(conf)
defer s.Shutdown()
rconf := createConfFile(t, []byte(fmt.Sprintf(`
port: -1
leafnodes {
remotes [
{url: "nats://127.0.0.1:%d" }
]
}
`, o.LeafNode.Port)))
defer removeFile(t, rconf)
ln, _ := RunServerWithConfig(rconf)
defer ln.Shutdown()
checkLeafNodeConnected(t, s)
checkLeafNodeConnected(t, ln)
ln.Shutdown()
s.Shutdown()
// Now makes sure we validate options, not just config file.
for _, test := range []struct {
name string
version string
err string
}{
{"invalid version", "abc", "semver"},
{"version too low", "2.7.9", "the minimum version should be at least 2.8.0"},
} {
t.Run(test.name, func(t *testing.T) {
o.Port = -1
o.LeafNode.Port = -1
o.LeafNode.MinVersion = test.version
if s, err := NewServer(o); err == nil || !strings.Contains(err.Error(), test.err) {
if s != nil {
s.Shutdown()
}
t.Fatalf("Expected error to contain %q, got %v", test.err, err)
}
})
}
// Ok, so now to verify that a server rejects a leafnode connection
// we will set the min_version above our current VERSION. So first
// decompose the version:
major, minor, _, err := versionComponents(VERSION)
if err != nil {
t.Fatalf("The current server version %q is not valid: %v", VERSION, err)
}
// Let's make our minimum server an minor version above
mv := fmt.Sprintf("%d.%d.0", major, minor+1)
conf = createConfFile(t, []byte(fmt.Sprintf(`
port: -1
leafnodes {
port: -1
min_version: "%s"
}
`, mv)))
defer removeFile(t, conf)
s, o = RunServerWithConfig(conf)
defer s.Shutdown()
l := &checkLeafMinVersionLogger{errCh: make(chan string, 1), connCh: make(chan string, 1)}
s.SetLogger(l, false, false)
rconf = createConfFile(t, []byte(fmt.Sprintf(`
port: -1
leafnodes {
remotes [
{url: "nats://127.0.0.1:%d" }
]
}
`, o.LeafNode.Port)))
defer removeFile(t, rconf)
lo := LoadConfig(rconf)
lo.LeafNode.ReconnectInterval = 50 * time.Millisecond
ln = RunServer(lo)
defer ln.Shutdown()
select {
case <-l.connCh:
case <-time.After(time.Second):
t.Fatal("Remote did not try to connect")
}
select {
case <-l.errCh:
case <-time.After(time.Second):
t.Fatal("Did not get the minimum version required error")
}
// Since we have a very small reconnect interval, if the connection was
// closed "right away", then we should have had a reconnect attempt with
// another failure. This should not be the case because the server will
// wait 5s before closing the connection.
select {
case <-l.connCh:
t.Fatal("Should not have tried to reconnect")
case <-time.After(250 * time.Millisecond):
// OK
}
}

View File

@@ -2230,6 +2230,8 @@ func (reason ClosedState) String() string {
return "Duplicate Client ID"
case DuplicateServerName:
return "Duplicate Server Name"
case MinimumVersionRequired:
return "Minimum Version Required"
}
return "Unknown State"

View File

@@ -143,6 +143,13 @@ type LeafNodeOpts struct {
// For solicited connections to other clusters/superclusters.
Remotes []*RemoteLeafOpts `json:"remotes,omitempty"`
// This is the minimum version that is accepted for remote connections.
// Note that since the server version in the CONNECT protocol was added
// only starting at v2.8.0, any version below that will be rejected
// (since empty version string in CONNECT would fail the "version at
// least" test).
MinVersion string
// Not exported, for tests.
resolver netResolver
dialTimeout time.Duration
@@ -1981,6 +1988,14 @@ func parseLeafNodes(v interface{}, opts *Options, errors *[]error, warnings *[]e
case "no_advertise":
opts.LeafNode.NoAdvertise = mv.(bool)
trackExplicitVal(opts, &opts.inConfig, "LeafNode.NoAdvertise", opts.LeafNode.NoAdvertise)
case "min_version", "minimum_version":
version := mv.(string)
if err := checkLeafMinVersionConfig(version); err != nil {
err = &configErr{tk, err.Error()}
*errors = append(*errors, err)
continue
}
opts.LeafNode.MinVersion = version
default:
if !tk.IsUsedVariable() {
err := &unknownConfigFieldErr{

View File

@@ -59,17 +59,22 @@ func versionComponents(version string) (major, minor, patch int, err error) {
return major, minor, patch, err
}
func versionAtLeast(version string, emajor, eminor, epatch int) bool {
func versionAtLeastCheckError(version string, emajor, eminor, epatch int) (bool, error) {
major, minor, patch, err := versionComponents(version)
if err != nil {
return false
return false, err
}
if major > emajor ||
(major == emajor && minor > eminor) ||
(major == emajor && minor == eminor && patch >= epatch) {
return true
return true, nil
}
return false
return false, err
}
func versionAtLeast(version string, emajor, eminor, epatch int) bool {
res, _ := versionAtLeastCheckError(version, emajor, eminor, epatch)
return res
}
// parseSize expects decimal positive numbers. We

View File

@@ -1684,6 +1684,7 @@ func TestLeafNodeOperatorAndPermissions(t *testing.T) {
if err != nil {
t.Fatalf("Error on subscribe: %v", err)
}
leafnc.Flush()
// Make sure the interest on "bar" from "sl" server makes it to the "s" server.
checkSubInterest(t, s, acc.GetName(), "bar", time.Second)