diff --git a/server/events.go b/server/events.go index 9f86df76..e358ac6b 100644 --- a/server/events.go +++ b/server/events.go @@ -912,7 +912,7 @@ func (s *Server) remoteServerShutdown(sub *subscription, _ *client, subject, rep } // Additional processing here. node := string(getHash(si.Name)) - s.nodeToInfo.Store(node, nodeInfo{si.Name, si.Cluster, si.ID, true}) + s.nodeToInfo.Store(node, nodeInfo{si.Name, si.Cluster, si.ID, true, true}) } // remoteServerUpdate listens for statsz updates from other servers. @@ -930,7 +930,7 @@ func (s *Server) remoteServerUpdate(sub *subscription, _ *client, subject, reply s.sendStatsz(fmt.Sprintf(serverStatsSubj, s.info.ID)) s.mu.Unlock() } - s.nodeToInfo.Store(node, nodeInfo{si.Name, si.Cluster, si.ID, false}) + s.nodeToInfo.Store(node, nodeInfo{si.Name, si.Cluster, si.ID, false, si.JetStream}) } // updateRemoteServer is called when we have an update from a remote server. @@ -961,7 +961,7 @@ func (s *Server) processNewServer(ms *ServerInfo) { s.ensureGWsInterestOnlyForLeafNodes() // Add to our nodeToName node := string(getHash(ms.Name)) - s.nodeToInfo.Store(node, nodeInfo{ms.Name, ms.Cluster, ms.ID, false}) + s.nodeToInfo.Store(node, nodeInfo{ms.Name, ms.Cluster, ms.ID, false, ms.JetStream}) } // If GW is enabled on this server and there are any leaf node connections, diff --git a/server/jetstream_cluster.go b/server/jetstream_cluster.go index c9f9f21e..fcc3e228 100644 --- a/server/jetstream_cluster.go +++ b/server/jetstream_cluster.go @@ -740,6 +740,7 @@ func (js *jetStream) monitorCluster() { } isRecovering := true + beenLeader := false for { select { @@ -767,12 +768,55 @@ func (js *jetStream) monitorCluster() { } case isLeader = <-lch: js.processLeaderChange(isLeader) + if isLeader && !beenLeader { + beenLeader = true + if n.NeedSnapshot() { + if err := n.InstallSnapshot(js.metaSnapshot()); err != nil { + s.Warnf("Error snapshotting JetStream cluster state: %v", err) + } + } + js.checkClusterSize() + } case <-t.C: doSnapshot() } } } +// This is called on first leader transition to double check the peers and cluster set size. +func (js *jetStream) checkClusterSize() { + s, n := js.server(), js.getMetaGroup() + if n == nil || !n.Leader() { + return + } + // We will check that we have a correct cluster set size by checking for any non-js servers + // which can happen in mixed mode. + s.Debugf("Checking JetStream cluster size") + ps := n.(*raft).currentPeerState() + if len(ps.knownPeers) >= ps.clusterSize { + return + } + + // If we are here our known set as the leader is not the same as the cluster size. + // Check to see if we have a mixed mode setup. + var totalJS int + for _, p := range s.ActivePeers() { + if si, ok := s.nodeToInfo.Load(p); ok && si != nil { + if si.(nodeInfo).js { + totalJS++ + } + } + } + // If we have less then our cluster size adjust that here. Can not do individual peer removals since + // they will not be in the tracked peers. + if totalJS < ps.clusterSize { + s.Debugf("Adjusting JetStream cluster size from %d to %d", ps.clusterSize, totalJS) + if err := n.AdjustClusterSize(totalJS); err != nil { + s.Warnf("Error adjusting JetStream cluster size: %v", err) + } + } +} + // Represents our stable meta state that we can write out. type writeableStreamAssignment struct { Client *ClientInfo `json:"client,omitempty"` diff --git a/server/jetstream_cluster_test.go b/server/jetstream_cluster_test.go index 5dc425e8..df8d9686 100644 --- a/server/jetstream_cluster_test.go +++ b/server/jetstream_cluster_test.go @@ -5422,6 +5422,51 @@ func TestJetStreamClusterAckPendingWithMaxRedelivered(t *testing.T) { }) } +func TestJetStreamClusterMixedMode(t *testing.T) { + c := createMixedModeCluster(t, "MM5", 3, 2) + defer c.shutdown() + + // Client based API - Non-JS server. + nc, js := jsClientConnect(t, c.serverByName("S-5")) + defer nc.Close() + + _, err := js.AddStream(&nats.StreamConfig{ + Name: "TEST", + Subjects: []string{"foo", "bar"}, + Replicas: 3, + }) + if err != nil { + t.Fatalf("Unexpected error: %v", err) + } + + ml := c.leader() + if ml == nil { + t.Fatalf("No metaleader") + } + + // Make sure we are tracking only the JS peers. + checkFor(t, 5*time.Second, 100*time.Millisecond, func() error { + peers := ml.JetStreamClusterPeers() + if len(peers) == 3 { + return nil + } + return fmt.Errorf("Not correct number of peers, expected %d, got %d", 3, len(peers)) + }) + + // Grab the underlying raft structure and make sure the system adjusts its cluster set size. + meta := ml.getJetStream().getMetaGroup().(*raft) + checkFor(t, 5*time.Second, 100*time.Millisecond, func() error { + ps := meta.currentPeerState() + if len(ps.knownPeers) != 3 { + return fmt.Errorf("Expected known peers to be 3, but got %+v", ps.knownPeers) + } + if ps.clusterSize != 3 { + return fmt.Errorf("Expected cluster size to be 3, but got %+v", ps) + } + return nil + }) +} + // Support functions // Used to setup superclusters for tests. @@ -5695,6 +5740,47 @@ var jsClusterImportsTempl = ` } ` +func createMixedModeCluster(t *testing.T, clusterName string, numJsServers, numNonServers int) *cluster { + if clusterName == "" || numJsServers < 1 || numNonServers < 1 { + t.Fatalf("Bad params") + } + + numServers := numJsServers + numNonServers + const startClusterPort = 23232 + + // Build out the routes that will be shared with all configs. + var routes []string + for cp := startClusterPort; cp < startClusterPort+numServers; cp++ { + routes = append(routes, fmt.Sprintf("nats-route://127.0.0.1:%d", cp)) + } + routeConfig := strings.Join(routes, ",") + + // Go ahead and build configurations and start servers. + c := &cluster{servers: make([]*Server, 0, numServers), opts: make([]*Options, 0, numServers), name: clusterName} + + for cp := startClusterPort; cp < startClusterPort+numServers; cp++ { + storeDir := createDir(t, JetStreamStoreDir) + sn := fmt.Sprintf("S-%d", cp-startClusterPort+1) + conf := fmt.Sprintf(jsClusterTempl, sn, storeDir, clusterName, cp, routeConfig) + + // Disable JS by commmenting it out. + if cp-startClusterPort >= numJsServers { + conf = strings.Replace(conf, "jetstream: ", "#jetstream: ", 1) + } + + s, o := RunServerWithConfig(createConfFile(t, []byte(conf))) + c.servers = append(c.servers, s) + c.opts = append(c.opts, o) + } + c.t = t + + // Wait til we are formed and have a leader. + c.checkClusterFormed() + c.waitOnPeerCount(numJsServers) + + return c +} + // This will create a cluster that is explicitly configured for the routes, etc. // and also has a defined clustername. All configs for routes and cluster name will be the same. func createJetStreamClusterExplicit(t *testing.T, clusterName string, numServers int) *cluster { diff --git a/server/raft.go b/server/raft.go index 6a5a450e..54b6a59c 100644 --- a/server/raft.go +++ b/server/raft.go @@ -57,6 +57,7 @@ type RaftNode interface { Peers() []*Peer ProposeAddPeer(peer string) error ProposeRemovePeer(peer string) error + AdjustClusterSize(csz int) error ApplyC() <-chan *CommittedEntry PauseApply() ResumeApply() @@ -683,6 +684,28 @@ func (n *raft) ProposeRemovePeer(peer string) error { return nil } +// AdjustClusterSize will change the cluster set size. +// Must be the leader. +func (n *raft) AdjustClusterSize(csz int) error { + n.Lock() + + if n.state != Leader { + n.Unlock() + return errNotLeader + } + // Same floor as bootstrap. + if csz < 2 { + csz = 2 + } + // Adjust. + n.csz = csz + n.qn = n.csz/2 + 1 + n.Unlock() + + n.sendPeerState() + return nil +} + // PauseApply will allow us to pause processing of append entries onto our // external apply chan. func (n *raft) PauseApply() { diff --git a/server/route.go b/server/route.go index 9b2ee9d0..2dab025b 100644 --- a/server/route.go +++ b/server/route.go @@ -1418,7 +1418,7 @@ func (s *Server) addRoute(c *client, info *Info) (bool, bool) { if !exists { s.routes[c.cid] = c s.remotes[id] = c - s.nodeToInfo.Store(c.route.hash, nodeInfo{c.route.remoteName, s.info.Cluster, id, false}) + s.nodeToInfo.Store(c.route.hash, nodeInfo{c.route.remoteName, s.info.Cluster, id, false, info.JetStream}) c.mu.Lock() c.route.connectURLs = info.ClientConnectURLs c.route.wsConnURLs = info.WSConnectURLs diff --git a/server/server.go b/server/server.go index 23fa6b79..28c26870 100644 --- a/server/server.go +++ b/server/server.go @@ -253,6 +253,7 @@ type nodeInfo struct { cluster string id string offline bool + js bool } // Make sure all are 64bits for atomic use @@ -355,7 +356,7 @@ func NewServer(opts *Options) (*Server, error) { // Place ourselves in some lookup maps. ourNode := string(getHash(serverName)) - s.nodeToInfo.Store(ourNode, nodeInfo{serverName, opts.Cluster.Name, info.ID, false}) + s.nodeToInfo.Store(ourNode, nodeInfo{serverName, opts.Cluster.Name, info.ID, false, opts.JetStream}) s.routeResolver = opts.Cluster.resolver if s.routeResolver == nil { @@ -1584,6 +1585,12 @@ func (s *Server) Start() { } return true }) + + // We may be in a mixed mode here. + if !s.standAloneMode() && s.globalAccountOnly() { + s.checkJetStreamExports() + s.globalAccount().enableAllJetStreamServiceImports() + } } // Start monitoring if needed