From 1eb08505d465d9c1a03aa32f427a0632ce571f31 Mon Sep 17 00:00:00 2001 From: Ivan Kozlovic Date: Thu, 28 Sep 2023 10:46:32 -0600 Subject: [PATCH] [FIXED] Routes: Pinned Accounts connect/reconnect in some cases The issue is with a server that has a route for a given account but connects to a server that does not support it. The creation of the route for this account will fail - as expected - and the server will stop trying to create the route for this account. But it needs to retry to create this route if it were to reconnect to that same URL in case the server (or its config) is updated to support a route for this account. There was also an issue even with 2.10.0 servers in some gossip situations. Namely, if server B is soliciting connections to A (but not vice-versa) and A would solicit connections to C (but not vice-versa). In this case, connections for pinned-accounts would not be created. Signed-off-by: Ivan Kozlovic --- server/jetstream_cluster_3_test.go | 4 +- server/route.go | 63 ++++++-- server/routes_test.go | 246 ++++++++++++++++++++++++++++- 3 files changed, 295 insertions(+), 18 deletions(-) diff --git a/server/jetstream_cluster_3_test.go b/server/jetstream_cluster_3_test.go index ad9b4978..095740e2 100644 --- a/server/jetstream_cluster_3_test.go +++ b/server/jetstream_cluster_3_test.go @@ -3470,9 +3470,7 @@ func TestJetStreamClusterNoR1AssetsDuringLameDuck(t *testing.T) { }() defer close(qch) - s.mu.RLock() - gacc := s.gacc - s.mu.RUnlock() + gacc := s.GlobalAccount() if gacc == nil { t.Fatalf("No global account") } diff --git a/server/route.go b/server/route.go index 2954b834..493f03e4 100644 --- a/server/route.go +++ b/server/route.go @@ -653,11 +653,14 @@ func (c *client) processRouteInfo(info *Info) { // We receive an INFO from a server that informs us about another server, // so the info.ID in the INFO protocol does not match the ID of this route. 
if remoteID != _EMPTY_ && remoteID != info.ID { + // We want to know if the existing route supports pooling/pinned-account + // or not when processing the implicit route. + noPool := c.route.noPool c.mu.Unlock() // Process this implicit route. We will check that it is not an explicit // route and/or that it has not been connected already. - s.processImplicitRoute(info) + s.processImplicitRoute(info, noPool) return } @@ -812,10 +815,14 @@ func (c *client) processRouteInfo(info *Info) { } } // For accounts that are configured to have their own route: - // If this is a solicit route, we already have c.route.accName set in createRoute. + // If this is a solicited route, we already have c.route.accName set in createRoute. // For non solicited route (the accept side), we will set the account name that // is present in the INFO protocol. - if !didSolicit { + if didSolicit && len(c.route.accName) > 0 { + // Set it in the info.RouteAccount so that addRoute can use that + // and we properly gossip that this is a route for an account. + info.RouteAccount = string(c.route.accName) + } else if !didSolicit && info.RouteAccount != _EMPTY_ { c.route.accName = []byte(info.RouteAccount) } accName := string(c.route.accName) @@ -1002,7 +1009,7 @@ func (s *Server) sendAsyncInfoToClients(regCli, wsCli bool) { // This will process implicit route information received from another server. // We will check to see if we have configured or are already connected, // and if so we will ignore. Otherwise we will attempt to connect. -func (s *Server) processImplicitRoute(info *Info) { +func (s *Server) processImplicitRoute(info *Info, routeNoPool bool) { remoteID := info.ID s.mu.Lock() @@ -1012,8 +1019,16 @@ func (s *Server) processImplicitRoute(info *Info) { if remoteID == s.info.ID { return } + + // Snapshot server options. + opts := s.getOpts() + // Check if this route already exists if accName := info.RouteAccount; accName != _EMPTY_ { + // If we don't support pooling/pinned account, bail. 
+ if opts.Cluster.PoolSize <= 0 { + return + } if remotes, ok := s.accRoutes[accName]; ok { if r := remotes[remoteID]; r != nil { return @@ -1034,13 +1049,22 @@ func (s *Server) processImplicitRoute(info *Info) { return } - // Snapshot server options. - opts := s.getOpts() - if info.AuthRequired { r.User = url.UserPassword(opts.Cluster.Username, opts.Cluster.Password) } s.startGoRoutine(func() { s.connectToRoute(r, false, true, info.RouteAccount) }) + // If we are processing an implicit route from a route that does not + // support pooling/pinned-accounts, we won't receive an INFO for each of + // the pinned-accounts that we would normally receive. In that case, just + // initiate routes for all our configured pinned accounts. + if routeNoPool && info.RouteAccount == _EMPTY_ && len(opts.Cluster.PinnedAccounts) > 0 { + // Copy since we are going to pass as closure to a go routine. + rURL := r + for _, an := range opts.Cluster.PinnedAccounts { + accName := an + s.startGoRoutine(func() { s.connectToRoute(rURL, false, true, accName) }) + } + } } // hasThisRouteConfigured returns true if info.Host:info.Port is present @@ -1071,7 +1095,10 @@ func (s *Server) forwardNewRouteInfoToKnownServers(info *Info) { s.forEachRemote(func(r *client) { r.mu.Lock() - if r.route.remoteID != info.ID { + // If this is a new route for a given account, do not send to a server + // that does not support pooling/pinned-accounts. + if r.route.remoteID != info.ID && + (info.RouteAccount == _EMPTY_ || (info.RouteAccount != _EMPTY_ && !r.route.noPool)) { r.enqueueProto(infoJSON) } r.mu.Unlock() @@ -1855,7 +1882,7 @@ func (s *Server) addRoute(c *client, didSolicit bool, info *Info, accName string // server and need to handle things differently. 
if info.RoutePoolSize <= 0 || opts.Cluster.PoolSize < 0 { if accName != _EMPTY_ { - invProtoErr = fmt.Sprintf("Not possible to have a dedicate route for account %q between those servers", accName) + invProtoErr = fmt.Sprintf("Not possible to have a dedicated route for account %q between those servers", accName) // In this case, make sure this route does not attempt to reconnect c.setNoReconnect() } else { @@ -2731,6 +2758,7 @@ func (s *Server) removeRoute(c *client) { opts = s.getOpts() rURL *url.URL noPool bool + didSolicit bool ) c.mu.Lock() cid := c.cid @@ -2749,6 +2777,7 @@ func (s *Server) removeRoute(c *client) { connectURLs = r.connectURLs wsConnectURLs = r.wsConnURLs rURL = r.url + didSolicit = r.didSolicit } c.mu.Unlock() if accName != _EMPTY_ { @@ -2807,10 +2836,18 @@ func (s *Server) removeRoute(c *client) { if lnURL != _EMPTY_ && s.removeLeafNodeURL(lnURL) { s.sendAsyncLeafNodeInfo() } - // If this server has pooling and the route for this remote - // was a "no pool" route, attempt to reconnect. - if s.routesPoolSize > 1 && noPool { - s.startGoRoutine(func() { s.connectToRoute(rURL, true, true, _EMPTY_) }) + // If this server has pooling/pinned accounts and the route for + // this remote was a "no pool" route, attempt to reconnect. + if noPool { + if s.routesPoolSize > 1 { + s.startGoRoutine(func() { s.connectToRoute(rURL, didSolicit, true, _EMPTY_) }) + } + if len(opts.Cluster.PinnedAccounts) > 0 { + for _, an := range opts.Cluster.PinnedAccounts { + accName := an + s.startGoRoutine(func() { s.connectToRoute(rURL, didSolicit, true, accName) }) + } + } } } // This is for gateway code. 
Remove this route from a map that uses diff --git a/server/routes_test.go b/server/routes_test.go index 7397c8e0..00fc4cc6 100644 --- a/server/routes_test.go +++ b/server/routes_test.go @@ -2529,6 +2529,243 @@ func TestRoutePerAccountConnectRace(t *testing.T) { } } +func TestRoutePerAccountGossipWorks(t *testing.T) { + tmplA := ` + port: -1 + server_name: "A" + accounts { + A { users: [{user: "A", password: "pwd"}] } + B { users: [{user: "B", password: "pwd"}] } + } + cluster { + port: %d + name: "local" + accounts: ["A"] + %s + } + ` + conf1 := createConfFile(t, []byte(fmt.Sprintf(tmplA, -1, _EMPTY_))) + s1, o1 := RunServerWithConfig(conf1) + defer s1.Shutdown() + + tmplBC := ` + port: -1 + server_name: "%s" + accounts { + A { users: [{user: "A", password: "pwd"}] } + B { users: [{user: "B", password: "pwd"}] } + } + cluster { + port: -1 + name: "local" + %s + accounts: ["A"] + } + ` + conf2 := createConfFile(t, []byte(fmt.Sprintf(tmplBC, "B", + fmt.Sprintf("routes: [\"nats://127.0.0.1:%d\"]", o1.Cluster.Port)))) + s2, _ := RunServerWithConfig(conf2) + defer s2.Shutdown() + + checkClusterFormed(t, s1, s2) + + // Now connect s3 to s1 and make sure that s2 connects properly to s3. + conf3 := createConfFile(t, []byte(fmt.Sprintf(tmplBC, "C", + fmt.Sprintf("routes: [\"nats://127.0.0.1:%d\"]", o1.Cluster.Port)))) + s3, _ := RunServerWithConfig(conf3) + defer s3.Shutdown() + + checkClusterFormed(t, s1, s2, s3) + + s3.Shutdown() + s2.Shutdown() + s1.Shutdown() + + // Slightly different version where s2 is connecting to s1, while s1 + // connects to s3 (and s3 does not solicit connections). 
+ + conf1 = createConfFile(t, []byte(fmt.Sprintf(tmplA, -1, _EMPTY_))) + s1, o1 = RunServerWithConfig(conf1) + defer s1.Shutdown() + + conf2 = createConfFile(t, []byte(fmt.Sprintf(tmplBC, "B", + fmt.Sprintf("routes: [\"nats://127.0.0.1:%d\"]", o1.Cluster.Port)))) + s2, _ = RunServerWithConfig(conf2) + defer s2.Shutdown() + + checkClusterFormed(t, s1, s2) + + // Start s3 first that will simply accept connections + conf3 = createConfFile(t, []byte(fmt.Sprintf(tmplBC, "C", _EMPTY_))) + s3, o3 := RunServerWithConfig(conf3) + defer s3.Shutdown() + + // Now config reload s1 so that it points to s3. + reloadUpdateConfig(t, s1, conf1, + fmt.Sprintf(tmplA, o1.Cluster.Port, fmt.Sprintf("routes: [\"nats://127.0.0.1:%d\"]", o3.Cluster.Port))) + + checkClusterFormed(t, s1, s2, s3) +} + +func TestRoutePerAccountGossipWorksWithOldServerNotSeed(t *testing.T) { + tmplA := ` + port: -1 + server_name: "A" + accounts { + A { users: [{user: "A", password: "pwd"}] } + B { users: [{user: "B", password: "pwd"}] } + } + cluster { + port: %d + name: "local" + accounts: ["A"] + } + ` + conf1 := createConfFile(t, []byte(fmt.Sprintf(tmplA, -1))) + s1, o1 := RunServerWithConfig(conf1) + defer s1.Shutdown() + + // Here, server "B" will have no pooling/accounts. + tmplB := ` + port: -1 + server_name: "B" + accounts { + A { users: [{user: "A", password: "pwd"}] } + B { users: [{user: "B", password: "pwd"}] } + } + cluster { + port: -1 + name: "local" + routes: ["nats://127.0.0.1:%d"] + pool_size: -1 + } + ` + conf2 := createConfFile(t, []byte(fmt.Sprintf(tmplB, o1.Cluster.Port))) + s2, _ := RunServerWithConfig(conf2) + defer s2.Shutdown() + + checkClusterFormed(t, s1, s2) + + l := &captureErrorLogger{errCh: make(chan string, 100)} + s2.SetLogger(l, false, false) + + // Now connect s3 to s1. Server s1 should not gossip to s2 information + // about pinned-account routes or extra routes from the pool. 
+ tmplC := ` + port: -1 + server_name: "C" + accounts { + A { users: [{user: "A", password: "pwd"}] } + B { users: [{user: "B", password: "pwd"}] } + } + cluster { + port: -1 + name: "local" + routes: ["nats://127.0.0.1:%d"] + accounts: ["A"] + } + ` + conf3 := createConfFile(t, []byte(fmt.Sprintf(tmplC, o1.Cluster.Port))) + s3, _ := RunServerWithConfig(conf3) + defer s3.Shutdown() + + checkClusterFormed(t, s1, s2, s3) + + // We should not have had s2 try to create dedicated routes for "A" or "$SYS" + tm := time.NewTimer(time.Second) + defer tm.Stop() + for { + select { + case err := <-l.errCh: + if strings.Contains(err, "dedicated route") { + t.Fatalf("Server s2 should not have tried to create a dedicated route: %s", err) + } + case <-tm.C: + return + } + } +} + +func TestRoutePerAccountGossipWorksWithOldServerSeed(t *testing.T) { + tmplA := ` + port: -1 + server_name: "A" + accounts { + A { users: [{user: "A", password: "pwd"}] } + B { users: [{user: "B", password: "pwd"}] } + } + cluster { + port: %d + name: "local" + pool_size: %d + %s + } + ` + // Start with s1 being an "old" server, which does not support pooling/pinned-accounts. + conf1 := createConfFile(t, []byte(fmt.Sprintf(tmplA, -1, -1, _EMPTY_))) + s1, o1 := RunServerWithConfig(conf1) + defer s1.Shutdown() + + tmplBC := ` + port: -1 + server_name: "%s" + accounts { + A { users: [{user: "A", password: "pwd"}] } + B { users: [{user: "B", password: "pwd"}] } + } + cluster { + port: -1 + name: "local" + pool_size: 3 + routes: ["nats://127.0.0.1:%d"] + accounts: ["A"] + } + ` + conf2 := createConfFile(t, []byte(fmt.Sprintf(tmplBC, "B", o1.Cluster.Port))) + s2, _ := RunServerWithConfig(conf2) + defer s2.Shutdown() + + checkClusterFormed(t, s1, s2) + + // Now connect s3 to s1 and make sure that s2 connects properly to s3. 
+ conf3 := createConfFile(t, []byte(fmt.Sprintf(tmplBC, "C", o1.Cluster.Port))) + s3, _ := RunServerWithConfig(conf3) + defer s3.Shutdown() + + checkClusterFormed(t, s1, s2, s3) + + checkRoutes := func(s *Server, expected int) { + t.Helper() + checkFor(t, 2*time.Second, 50*time.Millisecond, func() error { + if nr := s.NumRoutes(); nr != expected { + return fmt.Errorf("Server %q should have %v routes, got %v", s.Name(), expected, nr) + } + return nil + }) + } + // Since s1 has no pooling/pinned-accounts, there should be only 2 routes, + // one to s2 and one to s3. + checkRoutes(s1, 2) + // s2 and s3 should have 1 route to s1 and 3(pool)+"A"+"$SYS" == 6 + checkRoutes(s2, 6) + checkRoutes(s3, 6) + + s1.Shutdown() + + // The server s1 will now support pooling and accounts pinning. + // Restart the server s1 with the same cluster port otherwise + // s2/s3 would not be able to reconnect. + conf1 = createConfFile(t, []byte(fmt.Sprintf(tmplA, o1.Cluster.Port, 3, "accounts: [\"A\"]"))) + s1, _ = RunServerWithConfig(conf1) + defer s1.Shutdown() + // Make sure reconnect occurs. We should now have 5 routes. + checkClusterFormed(t, s1, s2, s3) + // Now all servers should have 3+2 to each other, so 10 total. + checkRoutes(s1, 10) + checkRoutes(s2, 10) + checkRoutes(s3, 10) +} + func TestRoutePoolPerAccountSubUnsubProtoParsing(t *testing.T) { for _, test := range []struct { name string @@ -2930,9 +3167,11 @@ func TestRoutePoolAndPerAccountOperatorMode(t *testing.T) { return nil }) } - // Route for account "apub" should be a dedicated route + // Route for accounts "apub" and "spub" should be a dedicated route checkRoute(s1, apub, true) checkRoute(s2, apub, true) + checkRoute(s1, spub, true) + checkRoute(s2, spub, true) // Route for account "bpub" should not checkRoute(s1, bpub, false) checkRoute(s2, bpub, false) @@ -2959,9 +3198,12 @@ func TestRoutePoolAndPerAccountOperatorMode(t *testing.T) { // before doing the config reload on s2. 
checkRoute(s1, bpub, true) + checkRoute(s2, bpub, true) - // Account "aoub" should still have its dedicated route + // Account "apub" should still have its dedicated route checkRoute(s1, apub, true) checkRoute(s2, apub, true) + // Same for the system account + checkRoute(s1, spub, true) + checkRoute(s2, spub, true) // Let's complete the config reload on srvb reloadUpdateConfig(t, s2, conf2, fmt.Sprintf(tmpl, "B", fmt.Sprintf("routes: [\"nats://127.0.0.1:%d\"]", o1.Cluster.Port), fmt.Sprintf(",\"%s\"", bpub)))