From e438d2f5face8fbf9b418d236e0b6420022b10ed Mon Sep 17 00:00:00 2001 From: Derek Collison Date: Fri, 9 Apr 2021 14:20:55 -0700 Subject: [PATCH] Mixed mode improvements. 1. When in mixed mode and only running the global account we now will check the account for JS. 2. Added code to decrease the cluster set size if we guessed wrong in mixed mode setup. Signed-off-by: Derek Collison --- server/events.go | 6 +-- server/jetstream_cluster.go | 44 ++++++++++++++++ server/jetstream_cluster_test.go | 86 ++++++++++++++++++++++++++++++++ server/raft.go | 23 +++++++++ server/route.go | 2 +- server/server.go | 9 +++- 6 files changed, 165 insertions(+), 5 deletions(-) diff --git a/server/events.go b/server/events.go index 9f86df76..e358ac6b 100644 --- a/server/events.go +++ b/server/events.go @@ -912,7 +912,7 @@ func (s *Server) remoteServerShutdown(sub *subscription, _ *client, subject, rep } // Additional processing here. node := string(getHash(si.Name)) - s.nodeToInfo.Store(node, nodeInfo{si.Name, si.Cluster, si.ID, true}) + s.nodeToInfo.Store(node, nodeInfo{si.Name, si.Cluster, si.ID, true, true}) } // remoteServerUpdate listens for statsz updates from other servers. @@ -930,7 +930,7 @@ func (s *Server) remoteServerUpdate(sub *subscription, _ *client, subject, reply s.sendStatsz(fmt.Sprintf(serverStatsSubj, s.info.ID)) s.mu.Unlock() } - s.nodeToInfo.Store(node, nodeInfo{si.Name, si.Cluster, si.ID, false}) + s.nodeToInfo.Store(node, nodeInfo{si.Name, si.Cluster, si.ID, false, si.JetStream}) } // updateRemoteServer is called when we have an update from a remote server. @@ -961,7 +961,7 @@ func (s *Server) processNewServer(ms *ServerInfo) { s.ensureGWsInterestOnlyForLeafNodes() // Add to our nodeToName node := string(getHash(ms.Name)) - s.nodeToInfo.Store(node, nodeInfo{ms.Name, ms.Cluster, ms.ID, false}) + s.nodeToInfo.Store(node, nodeInfo{ms.Name, ms.Cluster, ms.ID, false, ms.JetStream}) } // If GW is enabled on this server and there are any leaf node connections, diff --git a/server/jetstream_cluster.go b/server/jetstream_cluster.go index c9f9f21e..fcc3e228 100644 --- a/server/jetstream_cluster.go +++ b/server/jetstream_cluster.go @@ -740,6 +740,7 @@ func (js *jetStream) monitorCluster() { } isRecovering := true + beenLeader := false for { select { @@ -767,12 +768,55 @@ func (js *jetStream) monitorCluster() { } case isLeader = <-lch: js.processLeaderChange(isLeader) + if isLeader && !beenLeader { + beenLeader = true + if n.NeedSnapshot() { + if err := n.InstallSnapshot(js.metaSnapshot()); err != nil { + s.Warnf("Error snapshotting JetStream cluster state: %v", err) + } + } + js.checkClusterSize() + } case <-t.C: doSnapshot() } } } +// This is called on first leader transition to double check the peers and cluster set size. +func (js *jetStream) checkClusterSize() { + s, n := js.server(), js.getMetaGroup() + if n == nil || !n.Leader() { + return + } + // We will check that we have a correct cluster set size by checking for any non-js servers + // which can happen in mixed mode. + s.Debugf("Checking JetStream cluster size") + ps := n.(*raft).currentPeerState() + if len(ps.knownPeers) >= ps.clusterSize { + return + } + + // If we are here our known set as the leader is not the same as the cluster size. + // Check to see if we have a mixed mode setup. + var totalJS int + for _, p := range s.ActivePeers() { + if si, ok := s.nodeToInfo.Load(p); ok && si != nil { + if si.(nodeInfo).js { + totalJS++ + } + } + } + // If we have less then our cluster size adjust that here. Can not do individual peer removals since + // they will not be in the tracked peers. + if totalJS < ps.clusterSize { + s.Debugf("Adjusting JetStream cluster size from %d to %d", ps.clusterSize, totalJS) + if err := n.AdjustClusterSize(totalJS); err != nil { + s.Warnf("Error adjusting JetStream cluster size: %v", err) + } + } +} + // Represents our stable meta state that we can write out. type writeableStreamAssignment struct { Client *ClientInfo `json:"client,omitempty"` diff --git a/server/jetstream_cluster_test.go b/server/jetstream_cluster_test.go index 5dc425e8..df8d9686 100644 --- a/server/jetstream_cluster_test.go +++ b/server/jetstream_cluster_test.go @@ -5422,6 +5422,51 @@ func TestJetStreamClusterAckPendingWithMaxRedelivered(t *testing.T) { }) } +func TestJetStreamClusterMixedMode(t *testing.T) { + c := createMixedModeCluster(t, "MM5", 3, 2) + defer c.shutdown() + + // Client based API - Non-JS server. + nc, js := jsClientConnect(t, c.serverByName("S-5")) + defer nc.Close() + + _, err := js.AddStream(&nats.StreamConfig{ + Name: "TEST", + Subjects: []string{"foo", "bar"}, + Replicas: 3, + }) + if err != nil { + t.Fatalf("Unexpected error: %v", err) + } + + ml := c.leader() + if ml == nil { + t.Fatalf("No metaleader") + } + + // Make sure we are tracking only the JS peers. + checkFor(t, 5*time.Second, 100*time.Millisecond, func() error { + peers := ml.JetStreamClusterPeers() + if len(peers) == 3 { + return nil + } + return fmt.Errorf("Not correct number of peers, expected %d, got %d", 3, len(peers)) + }) + + // Grab the underlying raft structure and make sure the system adjusts its cluster set size. + meta := ml.getJetStream().getMetaGroup().(*raft) + checkFor(t, 5*time.Second, 100*time.Millisecond, func() error { + ps := meta.currentPeerState() + if len(ps.knownPeers) != 3 { + return fmt.Errorf("Expected known peers to be 3, but got %+v", ps.knownPeers) + } + if ps.clusterSize != 3 { + return fmt.Errorf("Expected cluster size to be 3, but got %+v", ps) + } + return nil + }) +} + // Support functions // Used to setup superclusters for tests. @@ -5695,6 +5740,47 @@ var jsClusterImportsTempl = ` } ` +func createMixedModeCluster(t *testing.T, clusterName string, numJsServers, numNonServers int) *cluster { + if clusterName == "" || numJsServers < 1 || numNonServers < 1 { + t.Fatalf("Bad params") + } + + numServers := numJsServers + numNonServers + const startClusterPort = 23232 + + // Build out the routes that will be shared with all configs. + var routes []string + for cp := startClusterPort; cp < startClusterPort+numServers; cp++ { + routes = append(routes, fmt.Sprintf("nats-route://127.0.0.1:%d", cp)) + } + routeConfig := strings.Join(routes, ",") + + // Go ahead and build configurations and start servers. + c := &cluster{servers: make([]*Server, 0, numServers), opts: make([]*Options, 0, numServers), name: clusterName} + + for cp := startClusterPort; cp < startClusterPort+numServers; cp++ { + storeDir := createDir(t, JetStreamStoreDir) + sn := fmt.Sprintf("S-%d", cp-startClusterPort+1) + conf := fmt.Sprintf(jsClusterTempl, sn, storeDir, clusterName, cp, routeConfig) + + // Disable JS by commmenting it out. + if cp-startClusterPort >= numJsServers { + conf = strings.Replace(conf, "jetstream: ", "#jetstream: ", 1) + } + + s, o := RunServerWithConfig(createConfFile(t, []byte(conf))) + c.servers = append(c.servers, s) + c.opts = append(c.opts, o) + } + c.t = t + + // Wait til we are formed and have a leader. + c.checkClusterFormed() + c.waitOnPeerCount(numJsServers) + + return c +} + // This will create a cluster that is explicitly configured for the routes, etc. // and also has a defined clustername. All configs for routes and cluster name will be the same. func createJetStreamClusterExplicit(t *testing.T, clusterName string, numServers int) *cluster { diff --git a/server/raft.go b/server/raft.go index 6a5a450e..54b6a59c 100644 --- a/server/raft.go +++ b/server/raft.go @@ -57,6 +57,7 @@ type RaftNode interface { Peers() []*Peer ProposeAddPeer(peer string) error ProposeRemovePeer(peer string) error + AdjustClusterSize(csz int) error ApplyC() <-chan *CommittedEntry PauseApply() ResumeApply() @@ -683,6 +684,28 @@ func (n *raft) ProposeRemovePeer(peer string) error { return nil } +// AdjustClusterSize will change the cluster set size. +// Must be the leader. +func (n *raft) AdjustClusterSize(csz int) error { + n.Lock() + + if n.state != Leader { + n.Unlock() + return errNotLeader + } + // Same floor as bootstrap. + if csz < 2 { + csz = 2 + } + // Adjust. + n.csz = csz + n.qn = n.csz/2 + 1 + n.Unlock() + + n.sendPeerState() + return nil +} + // PauseApply will allow us to pause processing of append entries onto our // external apply chan. func (n *raft) PauseApply() { diff --git a/server/route.go b/server/route.go index 9b2ee9d0..2dab025b 100644 --- a/server/route.go +++ b/server/route.go @@ -1418,7 +1418,7 @@ func (s *Server) addRoute(c *client, info *Info) (bool, bool) { if !exists { s.routes[c.cid] = c s.remotes[id] = c - s.nodeToInfo.Store(c.route.hash, nodeInfo{c.route.remoteName, s.info.Cluster, id, false}) + s.nodeToInfo.Store(c.route.hash, nodeInfo{c.route.remoteName, s.info.Cluster, id, false, info.JetStream}) c.mu.Lock() c.route.connectURLs = info.ClientConnectURLs c.route.wsConnURLs = info.WSConnectURLs diff --git a/server/server.go b/server/server.go index 23fa6b79..28c26870 100644 --- a/server/server.go +++ b/server/server.go @@ -253,6 +253,7 @@ type nodeInfo struct { cluster string id string offline bool + js bool } // Make sure all are 64bits for atomic use @@ -355,7 +356,7 @@ func NewServer(opts *Options) (*Server, error) { // Place ourselves in some lookup maps. ourNode := string(getHash(serverName)) - s.nodeToInfo.Store(ourNode, nodeInfo{serverName, opts.Cluster.Name, info.ID, false}) + s.nodeToInfo.Store(ourNode, nodeInfo{serverName, opts.Cluster.Name, info.ID, false, opts.JetStream}) s.routeResolver = opts.Cluster.resolver if s.routeResolver == nil { @@ -1584,6 +1585,12 @@ func (s *Server) Start() { } return true }) + + // We may be in a mixed mode here. + if !s.standAloneMode() && s.globalAccountOnly() { + s.checkJetStreamExports() + s.globalAccount().enableAllJetStreamServiceImports() + } } // Start monitoring if needed