diff --git a/server/jetstream_cluster_3_test.go b/server/jetstream_cluster_3_test.go index ad9b4978..095740e2 100644 --- a/server/jetstream_cluster_3_test.go +++ b/server/jetstream_cluster_3_test.go @@ -3470,9 +3470,7 @@ func TestJetStreamClusterNoR1AssetsDuringLameDuck(t *testing.T) { }() defer close(qch) - s.mu.RLock() - gacc := s.gacc - s.mu.RUnlock() + gacc := s.GlobalAccount() if gacc == nil { t.Fatalf("No global account") } diff --git a/server/route.go b/server/route.go index 2954b834..493f03e4 100644 --- a/server/route.go +++ b/server/route.go @@ -653,11 +653,14 @@ func (c *client) processRouteInfo(info *Info) { // We receive an INFO from a server that informs us about another server, // so the info.ID in the INFO protocol does not match the ID of this route. if remoteID != _EMPTY_ && remoteID != info.ID { + // We want to know if the existing route supports pooling/pinned-account + // or not when processing the implicit route. + noPool := c.route.noPool c.mu.Unlock() // Process this implicit route. We will check that it is not an explicit // route and/or that it has not been connected already. - s.processImplicitRoute(info) + s.processImplicitRoute(info, noPool) return } @@ -812,10 +815,14 @@ func (c *client) processRouteInfo(info *Info) { } } // For accounts that are configured to have their own route: - // If this is a solicit route, we already have c.route.accName set in createRoute. + // If this is a solicited route, we already have c.route.accName set in createRoute. // For non solicited route (the accept side), we will set the account name that // is present in the INFO protocol. - if !didSolicit { + if didSolicit && len(c.route.accName) > 0 { + // Set it in the info.RouteAccount so that addRoute can use that + // and we properly gossip that this is a route for an account. + info.RouteAccount = string(c.route.accName) + } else if !didSolicit && info.RouteAccount != _EMPTY_ { c.route.accName = []byte(info.RouteAccount) } accName := string(c.route.accName) @@ -1002,7 +1009,7 @@ func (s *Server) sendAsyncInfoToClients(regCli, wsCli bool) { // This will process implicit route information received from another server. // We will check to see if we have configured or are already connected, // and if so we will ignore. Otherwise we will attempt to connect. -func (s *Server) processImplicitRoute(info *Info) { +func (s *Server) processImplicitRoute(info *Info, routeNoPool bool) { remoteID := info.ID s.mu.Lock() @@ -1012,8 +1019,16 @@ func (s *Server) processImplicitRoute(info *Info) { if remoteID == s.info.ID { return } + + // Snapshot server options. + opts := s.getOpts() + // Check if this route already exists if accName := info.RouteAccount; accName != _EMPTY_ { + // If we don't support pooling/pinned account, bail. + if opts.Cluster.PoolSize <= 0 { + return + } if remotes, ok := s.accRoutes[accName]; ok { if r := remotes[remoteID]; r != nil { return @@ -1034,13 +1049,22 @@ func (s *Server) processImplicitRoute(info *Info) { return } - // Snapshot server options. - opts := s.getOpts() - if info.AuthRequired { r.User = url.UserPassword(opts.Cluster.Username, opts.Cluster.Password) } s.startGoRoutine(func() { s.connectToRoute(r, false, true, info.RouteAccount) }) + // If we are processing an implicit route from a route that does not + // support pooling/pinned-accounts, we won't receive an INFO for each of + // the pinned-accounts that we would normally receive. In that case, just + // initiate routes for all our configured pinned accounts. + if routeNoPool && info.RouteAccount == _EMPTY_ && len(opts.Cluster.PinnedAccounts) > 0 { + // Copy since we are going to pass as closure to a go routine. + rURL := r + for _, an := range opts.Cluster.PinnedAccounts { + accName := an + s.startGoRoutine(func() { s.connectToRoute(rURL, false, true, accName) }) + } + } } // hasThisRouteConfigured returns true if info.Host:info.Port is present @@ -1071,7 +1095,10 @@ func (s *Server) forwardNewRouteInfoToKnownServers(info *Info) { s.forEachRemote(func(r *client) { r.mu.Lock() - if r.route.remoteID != info.ID { + // If this is a new route for a given account, do not send to a server + // that does not support pooling/pinned-accounts. + if r.route.remoteID != info.ID && + (info.RouteAccount == _EMPTY_ || (info.RouteAccount != _EMPTY_ && !r.route.noPool)) { r.enqueueProto(infoJSON) } r.mu.Unlock() @@ -1855,7 +1882,7 @@ func (s *Server) addRoute(c *client, didSolicit bool, info *Info, accName string // server and need to handle things differently. if info.RoutePoolSize <= 0 || opts.Cluster.PoolSize < 0 { if accName != _EMPTY_ { - invProtoErr = fmt.Sprintf("Not possible to have a dedicate route for account %q between those servers", accName) + invProtoErr = fmt.Sprintf("Not possible to have a dedicated route for account %q between those servers", accName) // In this case, make sure this route does not attempt to reconnect c.setNoReconnect() } else { @@ -2731,6 +2758,7 @@ func (s *Server) removeRoute(c *client) { opts = s.getOpts() rURL *url.URL noPool bool + didSolicit bool ) c.mu.Lock() cid := c.cid @@ -2749,6 +2777,7 @@ func (s *Server) removeRoute(c *client) { connectURLs = r.connectURLs wsConnectURLs = r.wsConnURLs rURL = r.url + didSolicit = r.didSolicit } c.mu.Unlock() if accName != _EMPTY_ { @@ -2807,10 +2836,18 @@ func (s *Server) removeRoute(c *client) { if lnURL != _EMPTY_ && s.removeLeafNodeURL(lnURL) { s.sendAsyncLeafNodeInfo() } - // If this server has pooling and the route for this remote - // was a "no pool" route, attempt to reconnect. - if s.routesPoolSize > 1 && noPool { - s.startGoRoutine(func() { s.connectToRoute(rURL, true, true, _EMPTY_) }) + // If this server has pooling/pinned accounts and the route for + // this remote was a "no pool" route, attempt to reconnect. + if noPool { + if s.routesPoolSize > 1 { + s.startGoRoutine(func() { s.connectToRoute(rURL, didSolicit, true, _EMPTY_) }) + } + if len(opts.Cluster.PinnedAccounts) > 0 { + for _, an := range opts.Cluster.PinnedAccounts { + accName := an + s.startGoRoutine(func() { s.connectToRoute(rURL, didSolicit, true, accName) }) + } + } } } // This is for gateway code. Remove this route from a map that uses diff --git a/server/routes_test.go b/server/routes_test.go index 7397c8e0..00fc4cc6 100644 --- a/server/routes_test.go +++ b/server/routes_test.go @@ -2529,6 +2529,243 @@ func TestRoutePerAccountConnectRace(t *testing.T) { } } +func TestRoutePerAccountGossipWorks(t *testing.T) { + tmplA := ` + port: -1 + server_name: "A" + accounts { + A { users: [{user: "A", password: "pwd"}] } + B { users: [{user: "B", password: "pwd"}] } + } + cluster { + port: %d + name: "local" + accounts: ["A"] + %s + } + ` + conf1 := createConfFile(t, []byte(fmt.Sprintf(tmplA, -1, _EMPTY_))) + s1, o1 := RunServerWithConfig(conf1) + defer s1.Shutdown() + + tmplBC := ` + port: -1 + server_name: "%s" + accounts { + A { users: [{user: "A", password: "pwd"}] } + B { users: [{user: "B", password: "pwd"}] } + } + cluster { + port: -1 + name: "local" + %s + accounts: ["A"] + } + ` + conf2 := createConfFile(t, []byte(fmt.Sprintf(tmplBC, "B", + fmt.Sprintf("routes: [\"nats://127.0.0.1:%d\"]", o1.Cluster.Port)))) + s2, _ := RunServerWithConfig(conf2) + defer s2.Shutdown() + + checkClusterFormed(t, s1, s2) + + // Now connect s3 to s1 and make sure that s2 connects properly to s3. + conf3 := createConfFile(t, []byte(fmt.Sprintf(tmplBC, "C", + fmt.Sprintf("routes: [\"nats://127.0.0.1:%d\"]", o1.Cluster.Port)))) + s3, _ := RunServerWithConfig(conf3) + defer s3.Shutdown() + + checkClusterFormed(t, s1, s2, s3) + + s3.Shutdown() + s2.Shutdown() + s1.Shutdown() + + // Slightly different version where s2 is connecting to s1, while s1 + // connects to s3 (and s3 does not solicit connections). + + conf1 = createConfFile(t, []byte(fmt.Sprintf(tmplA, -1, _EMPTY_))) + s1, o1 = RunServerWithConfig(conf1) + defer s1.Shutdown() + + conf2 = createConfFile(t, []byte(fmt.Sprintf(tmplBC, "B", + fmt.Sprintf("routes: [\"nats://127.0.0.1:%d\"]", o1.Cluster.Port)))) + s2, _ = RunServerWithConfig(conf2) + defer s2.Shutdown() + + checkClusterFormed(t, s1, s2) + + // Start s3 first that will simply accept connections + conf3 = createConfFile(t, []byte(fmt.Sprintf(tmplBC, "C", _EMPTY_))) + s3, o3 := RunServerWithConfig(conf3) + defer s3.Shutdown() + + // Now config reload s1 so that it points to s3. + reloadUpdateConfig(t, s1, conf1, + fmt.Sprintf(tmplA, o1.Cluster.Port, fmt.Sprintf("routes: [\"nats://127.0.0.1:%d\"]", o3.Cluster.Port))) + + checkClusterFormed(t, s1, s2, s3) +} + +func TestRoutePerAccountGossipWorksWithOldServerNotSeed(t *testing.T) { + tmplA := ` + port: -1 + server_name: "A" + accounts { + A { users: [{user: "A", password: "pwd"}] } + B { users: [{user: "B", password: "pwd"}] } + } + cluster { + port: %d + name: "local" + accounts: ["A"] + } + ` + conf1 := createConfFile(t, []byte(fmt.Sprintf(tmplA, -1))) + s1, o1 := RunServerWithConfig(conf1) + defer s1.Shutdown() + + // Here, server "B" will have no pooling/accounts. + tmplB := ` + port: -1 + server_name: "B" + accounts { + A { users: [{user: "A", password: "pwd"}] } + B { users: [{user: "B", password: "pwd"}] } + } + cluster { + port: -1 + name: "local" + routes: ["nats://127.0.0.1:%d"] + pool_size: -1 + } + ` + conf2 := createConfFile(t, []byte(fmt.Sprintf(tmplB, o1.Cluster.Port))) + s2, _ := RunServerWithConfig(conf2) + defer s2.Shutdown() + + checkClusterFormed(t, s1, s2) + + l := &captureErrorLogger{errCh: make(chan string, 100)} + s2.SetLogger(l, false, false) + + // Now connect s3 to s1. Server s1 should not gossip to s2 information + // about pinned-account routes or extra routes from the pool. + tmplC := ` + port: -1 + server_name: "C" + accounts { + A { users: [{user: "A", password: "pwd"}] } + B { users: [{user: "B", password: "pwd"}] } + } + cluster { + port: -1 + name: "local" + routes: ["nats://127.0.0.1:%d"] + accounts: ["A"] + } + ` + conf3 := createConfFile(t, []byte(fmt.Sprintf(tmplC, o1.Cluster.Port))) + s3, _ := RunServerWithConfig(conf3) + defer s3.Shutdown() + + checkClusterFormed(t, s1, s2, s3) + + // We should not have had s2 try to create dedicated routes for "A" or "$SYS" + tm := time.NewTimer(time.Second) + defer tm.Stop() + for { + select { + case err := <-l.errCh: + if strings.Contains(err, "dedicated route") { + t.Fatalf("Server s2 should not have tried to create a dedicated route: %s", err) + } + case <-tm.C: + return + } + } +} + +func TestRoutePerAccountGossipWorksWithOldServerSeed(t *testing.T) { + tmplA := ` + port: -1 + server_name: "A" + accounts { + A { users: [{user: "A", password: "pwd"}] } + B { users: [{user: "B", password: "pwd"}] } + } + cluster { + port: %d + name: "local" + pool_size: %d + %s + } + ` + // Start with s1 being an "old" server, which does not support pooling/pinned-accounts. + conf1 := createConfFile(t, []byte(fmt.Sprintf(tmplA, -1, -1, _EMPTY_))) + s1, o1 := RunServerWithConfig(conf1) + defer s1.Shutdown() + + tmplBC := ` + port: -1 + server_name: "%s" + accounts { + A { users: [{user: "A", password: "pwd"}] } + B { users: [{user: "B", password: "pwd"}] } + } + cluster { + port: -1 + name: "local" + pool_size: 3 + routes: ["nats://127.0.0.1:%d"] + accounts: ["A"] + } + ` + conf2 := createConfFile(t, []byte(fmt.Sprintf(tmplBC, "B", o1.Cluster.Port))) + s2, _ := RunServerWithConfig(conf2) + defer s2.Shutdown() + + checkClusterFormed(t, s1, s2) + + // Now connect s3 to s1 and make sure that s2 connects properly to s3. + conf3 := createConfFile(t, []byte(fmt.Sprintf(tmplBC, "C", o1.Cluster.Port))) + s3, _ := RunServerWithConfig(conf3) + defer s3.Shutdown() + + checkClusterFormed(t, s1, s2, s3) + + checkRoutes := func(s *Server, expected int) { + t.Helper() + checkFor(t, 2*time.Second, 50*time.Millisecond, func() error { + if nr := s.NumRoutes(); nr != expected { + return fmt.Errorf("Server %q should have %v routes, got %v", s.Name(), expected, nr) + } + return nil + }) + } + // Since s1 has no pooling/pinned-accounts, there should be only 2 routes, + // one to s2 and one to s3. + checkRoutes(s1, 2) + // s2 and s3 should have 1 route to s1 and 3(pool)+"A"+"$SYS" == 6 + checkRoutes(s2, 6) + checkRoutes(s3, 6) + + s1.Shutdown() + + // The server s1 will now support pooling and accounts pinning. + // Restart the server s1 with the same cluster port otherwise + // s2/s3 would not be able to reconnect. + conf1 = createConfFile(t, []byte(fmt.Sprintf(tmplA, o1.Cluster.Port, 3, "accounts: [\"A\"]"))) + s1, _ = RunServerWithConfig(conf1) + defer s1.Shutdown() + // Make sure reconnect occurs. We should now have 5 routes. + checkClusterFormed(t, s1, s2, s3) + // Now all servers should have 3+2 to each other, so 10 total. + checkRoutes(s1, 10) + checkRoutes(s2, 10) + checkRoutes(s3, 10) +} + func TestRoutePoolPerAccountSubUnsubProtoParsing(t *testing.T) { for _, test := range []struct { name string @@ -2930,9 +3167,11 @@ func TestRoutePoolAndPerAccountOperatorMode(t *testing.T) { return nil }) } - // Route for account "apub" should be a dedicated route + // Route for accounts "apub" and "spub" should be a dedicated route checkRoute(s1, apub, true) checkRoute(s2, apub, true) + checkRoute(s1, spub, true) + checkRoute(s2, spub, true) // Route for account "bpub" should not checkRoute(s1, bpub, false) checkRoute(s2, bpub, false) @@ -2959,9 +3198,12 @@ func TestRoutePoolAndPerAccountOperatorMode(t *testing.T) { // before doing the config reload on s2. checkRoute(s1, bpub, true) checkRoute(s2, bpub, true) - // Account "aoub" should still have its dedicated route + // Account "apub" should still have its dedicated route checkRoute(s1, apub, true) checkRoute(s2, apub, true) + // So the system account + checkRoute(s1, spub, true) + checkRoute(s2, spub, true) // Let's complete the config reload on srvb reloadUpdateConfig(t, s2, conf2, fmt.Sprintf(tmpl, "B", fmt.Sprintf("routes: [\"nats://127.0.0.1:%d\"]", o1.Cluster.Port), fmt.Sprintf(",\"%s\"", bpub)))