mirror of
https://github.com/gogrlx/nats-server.git
synced 2026-04-17 03:24:40 -07:00
@@ -912,7 +912,7 @@ func (s *Server) remoteServerShutdown(sub *subscription, _ *client, subject, rep
|
||||
}
|
||||
// Additional processing here.
|
||||
node := string(getHash(si.Name))
|
||||
s.nodeToInfo.Store(node, nodeInfo{si.Name, si.Cluster, si.ID, true})
|
||||
s.nodeToInfo.Store(node, nodeInfo{si.Name, si.Cluster, si.ID, true, true})
|
||||
}
|
||||
|
||||
// remoteServerUpdate listens for statsz updates from other servers.
|
||||
@@ -930,7 +930,7 @@ func (s *Server) remoteServerUpdate(sub *subscription, _ *client, subject, reply
|
||||
s.sendStatsz(fmt.Sprintf(serverStatsSubj, s.info.ID))
|
||||
s.mu.Unlock()
|
||||
}
|
||||
s.nodeToInfo.Store(node, nodeInfo{si.Name, si.Cluster, si.ID, false})
|
||||
s.nodeToInfo.Store(node, nodeInfo{si.Name, si.Cluster, si.ID, false, si.JetStream})
|
||||
}
|
||||
|
||||
// updateRemoteServer is called when we have an update from a remote server.
|
||||
@@ -961,7 +961,7 @@ func (s *Server) processNewServer(ms *ServerInfo) {
|
||||
s.ensureGWsInterestOnlyForLeafNodes()
|
||||
// Add to our nodeToName
|
||||
node := string(getHash(ms.Name))
|
||||
s.nodeToInfo.Store(node, nodeInfo{ms.Name, ms.Cluster, ms.ID, false})
|
||||
s.nodeToInfo.Store(node, nodeInfo{ms.Name, ms.Cluster, ms.ID, false, ms.JetStream})
|
||||
}
|
||||
|
||||
// If GW is enabled on this server and there are any leaf node connections,
|
||||
|
||||
@@ -740,6 +740,7 @@ func (js *jetStream) monitorCluster() {
|
||||
}
|
||||
|
||||
isRecovering := true
|
||||
beenLeader := false
|
||||
|
||||
for {
|
||||
select {
|
||||
@@ -767,12 +768,55 @@ func (js *jetStream) monitorCluster() {
|
||||
}
|
||||
case isLeader = <-lch:
|
||||
js.processLeaderChange(isLeader)
|
||||
if isLeader && !beenLeader {
|
||||
beenLeader = true
|
||||
if n.NeedSnapshot() {
|
||||
if err := n.InstallSnapshot(js.metaSnapshot()); err != nil {
|
||||
s.Warnf("Error snapshotting JetStream cluster state: %v", err)
|
||||
}
|
||||
}
|
||||
js.checkClusterSize()
|
||||
}
|
||||
case <-t.C:
|
||||
doSnapshot()
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// This is called on first leader transition to double check the peers and cluster set size.
|
||||
func (js *jetStream) checkClusterSize() {
|
||||
s, n := js.server(), js.getMetaGroup()
|
||||
if n == nil {
|
||||
return
|
||||
}
|
||||
// We will check that we have a correct cluster set size by checking for any non-js servers
|
||||
// which can happen in mixed mode.
|
||||
s.Debugf("Checking JetStream cluster size")
|
||||
ps := n.(*raft).currentPeerState()
|
||||
if len(ps.knownPeers) >= ps.clusterSize {
|
||||
return
|
||||
}
|
||||
|
||||
// If we are here our known set as the leader is not the same as the cluster size.
|
||||
// Check to see if we have a mixed mode setup.
|
||||
var totalJS int
|
||||
for _, p := range s.ActivePeers() {
|
||||
if si, ok := s.nodeToInfo.Load(p); ok && si != nil {
|
||||
if si.(nodeInfo).js {
|
||||
totalJS++
|
||||
}
|
||||
}
|
||||
}
|
||||
// If we have less then our cluster size adjust that here. Can not do individual peer removals since
|
||||
// they will not be in the tracked peers.
|
||||
if totalJS < ps.clusterSize {
|
||||
s.Debugf("Adjusting JetStream cluster size from %d to %d", ps.clusterSize, totalJS)
|
||||
if err := n.AdjustClusterSize(totalJS); err != nil {
|
||||
s.Warnf("Error adjusting JetStream cluster size: %v", err)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// Represents our stable meta state that we can write out.
|
||||
type writeableStreamAssignment struct {
|
||||
Client *ClientInfo `json:"client,omitempty"`
|
||||
|
||||
@@ -57,6 +57,7 @@ type RaftNode interface {
|
||||
Peers() []*Peer
|
||||
ProposeAddPeer(peer string) error
|
||||
ProposeRemovePeer(peer string) error
|
||||
AdjustClusterSize(csz int) error
|
||||
ApplyC() <-chan *CommittedEntry
|
||||
PauseApply()
|
||||
ResumeApply()
|
||||
@@ -683,6 +684,28 @@ func (n *raft) ProposeRemovePeer(peer string) error {
|
||||
return nil
|
||||
}
|
||||
|
||||
// AdjustClusterSize will change the cluster set size.
|
||||
// Must be the leader.
|
||||
func (n *raft) AdjustClusterSize(csz int) error {
|
||||
n.Lock()
|
||||
|
||||
if n.state != Leader {
|
||||
n.Unlock()
|
||||
return errNotLeader
|
||||
}
|
||||
// Same floor as bootstrap.
|
||||
if csz < 2 {
|
||||
csz = 2
|
||||
}
|
||||
// Adjust.
|
||||
n.csz = csz
|
||||
n.qn = n.csz/2 + 1
|
||||
n.Unlock()
|
||||
|
||||
n.sendPeerState()
|
||||
return nil
|
||||
}
|
||||
|
||||
// PauseApply will allow us to pause processing of append entries onto our
|
||||
// external apply chan.
|
||||
func (n *raft) PauseApply() {
|
||||
|
||||
@@ -1418,7 +1418,7 @@ func (s *Server) addRoute(c *client, info *Info) (bool, bool) {
|
||||
if !exists {
|
||||
s.routes[c.cid] = c
|
||||
s.remotes[id] = c
|
||||
s.nodeToInfo.Store(c.route.hash, nodeInfo{c.route.remoteName, s.info.Cluster, id, false})
|
||||
s.nodeToInfo.Store(c.route.hash, nodeInfo{c.route.remoteName, s.info.Cluster, id, false, info.JetStream})
|
||||
c.mu.Lock()
|
||||
c.route.connectURLs = info.ClientConnectURLs
|
||||
c.route.wsConnURLs = info.WSConnectURLs
|
||||
|
||||
@@ -253,6 +253,7 @@ type nodeInfo struct {
|
||||
cluster string
|
||||
id string
|
||||
offline bool
|
||||
js bool
|
||||
}
|
||||
|
||||
// Make sure all are 64bits for atomic use
|
||||
@@ -355,7 +356,7 @@ func NewServer(opts *Options) (*Server, error) {
|
||||
|
||||
// Place ourselves in some lookup maps.
|
||||
ourNode := string(getHash(serverName))
|
||||
s.nodeToInfo.Store(ourNode, nodeInfo{serverName, opts.Cluster.Name, info.ID, false})
|
||||
s.nodeToInfo.Store(ourNode, nodeInfo{serverName, opts.Cluster.Name, info.ID, false, opts.JetStream})
|
||||
|
||||
s.routeResolver = opts.Cluster.resolver
|
||||
if s.routeResolver == nil {
|
||||
|
||||
Reference in New Issue
Block a user