mirror of https://github.com/gogrlx/nats-server.git
synced 2026-04-17 11:24:44 -07:00
On cold start in mixed mode, if the js servers were not > the non-js servers, we could stall.
Signed-off-by: Derek Collison <derek@nats.io>
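
For context, the stall is simple quorum arithmetic: on a cold start the meta group is bootstrapped with the expected size of the whole cluster, but in mixed mode only the JetStream-enabled servers can vote, so a meta-leader can never be elected while they are not a majority. Below is a small standalone Go sketch of that arithmetic, not code from this change; it assumes only the quorum rule n.qn = n.csz/2 + 1 used by AdjustBootClusterSize in the diff, and reuses the unbalanced 3 JS / 4 non-JS split from the new test.

package main

import "fmt"

// quorum mirrors the n.qn = n.csz/2 + 1 rule used when sizing the meta group.
func quorum(csz int) int { return csz/2 + 1 }

func main() {
	// Mixed-mode cold start: 7 servers total, only 3 of them run JetStream
	// (the unbalanced split exercised by TestJetStreamClusterMixedModeColdStartPrune).
	jsServers, totalServers := 3, 7

	// Before: the boot size counts every server, but only JS servers can vote,
	// so the 4-vote quorum is unreachable and the meta group stalls.
	fmt.Printf("boot size %d needs %d votes, %d voters -> stalled: %v\n",
		totalServers, quorum(totalServers), jsServers, jsServers < quorum(totalServers))

	// After: AdjustBootClusterSize prunes the boot size down to the tracked
	// JS servers, so a 2-vote quorum is reachable and a meta-leader can be elected.
	fmt.Printf("boot size %d needs %d votes, %d voters -> stalled: %v\n",
		jsServers, quorum(jsServers), jsServers, jsServers < quorum(jsServers))
}

With the 3/4 split above this prints a stalled cold boot for size 7 and a healthy one for size 3, which is the situation the new lt.C check in monitorCluster detects and repairs.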
@@ -549,12 +549,11 @@ func (s *Server) eventsEnabled() bool {
 // from a system events perspective.
 func (s *Server) TrackedRemoteServers() int {
 	s.mu.Lock()
+	defer s.mu.Unlock()
 	if !s.running || !s.eventsEnabled() {
 		return -1
 	}
-	ns := len(s.sys.servers)
-	s.mu.Unlock()
-	return ns
+	return len(s.sys.servers)
 }

 // Check for orphan servers who may have gone away without notification.

@@ -672,12 +671,15 @@ func (s *Server) sendStatsz(subj string) {
 		jStat.Stats = js.usageStats()
 		if mg := js.getMetaGroup(); mg != nil {
 			if mg.Leader() {
-				jStat.Meta = s.raftNodeToClusterInfo(mg)
+				if ci := s.raftNodeToClusterInfo(mg); ci != nil {
+					jStat.Meta = &MetaClusterInfo{Name: ci.Name, Leader: ci.Leader, Replicas: ci.Replicas, Size: mg.ClusterSize()}
+				}
 			} else {
 				// non leader only include a shortened version without peers
-				jStat.Meta = &ClusterInfo{
-					Name:   s.ClusterName(),
+				jStat.Meta = &MetaClusterInfo{
+					Name:   mg.Group(),
 					Leader: s.serverNameForNode(mg.GroupLeader()),
+					Size:   mg.ClusterSize(),
 				}
 			}
 		}

@@ -1088,6 +1090,7 @@ func (s *Server) remoteServerUpdate(sub *subscription, _ *client, _ *Account, su
 	if !s.sameDomain(si.Domain) {
 		return
 	}
+
 	node := string(getHash(si.Name))
 	s.nodeToInfo.Store(node, nodeInfo{si.Name, si.Cluster, si.Domain, si.ID, false, si.JetStream})
 }

@@ -153,6 +153,24 @@ const (
 	defaultMetaFSBlkSize = 1024 * 1024
 )

+// Returns information useful in mixed mode.
+func (s *Server) trackedJetStreamServers() (js, total int) {
+	s.mu.Lock()
+	defer s.mu.Unlock()
+	if !s.running || !s.eventsEnabled() {
+		return -1, -1
+	}
+	s.nodeToInfo.Range(func(k, v interface{}) bool {
+		si := v.(nodeInfo)
+		if si.js {
+			js++
+		}
+		total++
+		return true
+	})
+	return js, total
+}
+
 func (s *Server) getJetStreamCluster() (*jetStream, *jetStreamCluster) {
 	s.mu.Lock()
 	shutdown := s.shutdown

@@ -726,6 +744,11 @@ func (js *jetStream) monitorCluster() {
 	t := time.NewTicker(compactInterval)
 	defer t.Stop()

+	// Used to check cold boot cluster when possibly in mixed mode.
+	const leaderCheckInterval = time.Second
+	lt := time.NewTicker(leaderCheckInterval)
+	defer lt.Stop()
+
 	var (
 		isLeader bool
 		lastSnap []byte

@@ -785,6 +808,20 @@ func (js *jetStream) monitorCluster() {
 			if n.Leader() {
 				js.checkClusterSize()
 			}
+		case <-lt.C:
+			s.Debugf("Checking JetStream cluster state")
+			// If we have a current leader or had one in the past we can cancel this here since the metaleader
+			// will be in charge of all peer state changes.
+			// For cold boot only.
+			if n.GroupLeader() != _EMPTY_ || n.HadPreviousLeader() {
+				lt.Stop()
+				continue
+			}
+			// If we are here we do not have a leader and we did not have a previous one, so cold start.
+			// Check to see if we can adjust our cluster size.
+			if js, total := s.trackedJetStreamServers(); js < total {
+				n.AdjustBootClusterSize(js)
+			}
 		}
 	}
 }

@@ -8765,6 +8765,30 @@ func TestJetStreamAccountLimitsAndRestart(t *testing.T) {
 	}
 }

+func TestJetStreamClusterMixedModeColdStartPrune(t *testing.T) {
+	// Purposely make this unbalanced. Without changes this will never form a quorum to elect the meta-leader.
+	c := createMixedModeCluster(t, jsMixedModeGlobalAccountTempl, "MMCS5", _EMPTY_, 3, 4, false)
+	defer c.shutdown()
+
+	// Make sure we report cluster size.
+	checkClusterSize := func(s *Server) {
+		t.Helper()
+		jsi, err := s.Jsz(nil)
+		if err != nil {
+			t.Fatalf("Unexpected error: %v", err)
+		}
+		if jsi.Meta == nil {
+			t.Fatalf("Expected a cluster info")
+		}
+		if jsi.Meta.Size != 3 {
+			t.Fatalf("Expected cluster size to be adjusted to %d, but got %d", 3, jsi.Meta.Size)
+		}
+	}
+
+	checkClusterSize(c.leader())
+	checkClusterSize(c.randomNonLeader())
+}
+
 // Support functions

 // Used to setup superclusters for tests.

@@ -1142,7 +1142,7 @@ type Varz struct {
 type JetStreamVarz struct {
 	Config *JetStreamConfig `json:"config,omitempty"`
 	Stats  *JetStreamStats  `json:"stats,omitempty"`
-	Meta   *ClusterInfo     `json:"meta,omitempty"`
+	Meta   *MetaClusterInfo `json:"meta,omitempty"`
 }

 // ClusterOptsVarz contains monitoring cluster information

@@ -1286,7 +1286,9 @@ func (s *Server) updateJszVarz(js *jetStream, v *JetStreamVarz, doConfig bool) {
 	}
 	v.Stats = js.usageStats()
 	if mg := js.getMetaGroup(); mg != nil {
-		v.Meta = s.raftNodeToClusterInfo(mg)
+		if ci := s.raftNodeToClusterInfo(mg); ci != nil {
+			v.Meta = &MetaClusterInfo{Name: ci.Name, Leader: ci.Leader, Replicas: ci.Replicas, Size: mg.ClusterSize()}
+		}
 	}
 }

@@ -2391,19 +2393,28 @@ type AccountDetail struct {
 	Streams   []StreamDetail  `json:"stream_detail,omitempty"`
 }

-// LeafInfo has detailed information on each remote leafnode connection.
+// MetaClusterInfo shows information about the meta group.
+type MetaClusterInfo struct {
+	Name     string      `json:"name,omitempty"`
+	Leader   string      `json:"leader,omitempty"`
+	Replicas []*PeerInfo `json:"replicas,omitempty"`
+	Size     int         `json:"cluster_size"`
+}
+
 // JSInfo has detailed information on JetStream.
 type JSInfo struct {
 	ID       string          `json:"server_id"`
 	Now      time.Time       `json:"now"`
 	Disabled bool            `json:"disabled,omitempty"`
 	Config   JetStreamConfig `json:"config,omitempty"`
 	JetStreamStats
-	APICalls  int64        `json:"current_api_calls"`
-	Streams   int          `json:"total_streams,omitempty"`
-	Consumers int          `json:"total_consumers,omitempty"`
-	Messages  uint64       `json:"total_messages,omitempty"`
-	Bytes     uint64       `json:"total_message_bytes,omitempty"`
-	Meta      *ClusterInfo `json:"meta_cluster,omitempty"`
+	APICalls  int64            `json:"current_api_calls"`
+	Streams   int              `json:"total_streams,omitempty"`
+	Consumers int              `json:"total_consumers,omitempty"`
+	Messages  uint64           `json:"total_messages,omitempty"`
+	Bytes     uint64           `json:"total_message_bytes,omitempty"`
+	Meta      *MetaClusterInfo `json:"meta_cluster,omitempty"`
+
+	// aggregate raft info
 	AccountDetails []*AccountDetail `json:"account_details,omitempty"`
 }

@@ -2554,7 +2565,11 @@ func (s *Server) Jsz(opts *JSzOptions) (*JSInfo, error) {
 	s.js.mu.RUnlock()
 	jsi.APICalls = atomic.LoadInt64(&s.js.apiCalls)

-	jsi.Meta = s.raftNodeToClusterInfo(s.js.getMetaGroup())
+	if mg := s.js.getMetaGroup(); mg != nil {
+		if ci := s.raftNodeToClusterInfo(mg); ci != nil {
+			jsi.Meta = &MetaClusterInfo{Name: ci.Name, Leader: ci.Leader, Replicas: ci.Replicas, Size: mg.ClusterSize()}
+		}
+	}
 	jsi.JetStreamStats = *s.js.usageStats()

 	filterIdx := -1

@@ -58,6 +58,8 @@ type RaftNode interface {
 	ProposeAddPeer(peer string) error
 	ProposeRemovePeer(peer string) error
 	AdjustClusterSize(csz int) error
+	AdjustBootClusterSize(csz int) error
+	ClusterSize() int
 	ApplyC() <-chan *CommittedEntry
 	PauseApply()
 	ResumeApply()

@@ -235,22 +237,23 @@ type RaftConfig struct {
 }

 var (
-	errProposalFailed   = errors.New("raft: proposal failed")
-	errNotLeader        = errors.New("raft: not leader")
-	errAlreadyLeader    = errors.New("raft: already leader")
-	errNilCfg           = errors.New("raft: no config given")
-	errCorruptPeers     = errors.New("raft: corrupt peer state")
-	errStepdownFailed   = errors.New("raft: stepdown failed")
-	errEntryLoadFailed  = errors.New("raft: could not load entry from WAL")
-	errEntryStoreFailed = errors.New("raft: could not storeentry to WAL")
-	errNodeClosed       = errors.New("raft: node is closed")
-	errBadSnapName      = errors.New("raft: snapshot name could not be parsed")
-	errNoSnapAvailable  = errors.New("raft: no snapshot available")
-	errCatchupsRunning  = errors.New("raft: snapshot can not be installed while catchups running")
-	errSnapshotCorrupt  = errors.New("raft: snapshot corrupt")
-	errTooManyPrefs     = errors.New("raft: stepdown requires at most one preferred new leader")
-	errStepdownNoPeer   = errors.New("raft: stepdown failed, could not match new leader")
-	errNoPeerState      = errors.New("raft: no peerstate")
+	errProposalFailed    = errors.New("raft: proposal failed")
+	errNotLeader         = errors.New("raft: not leader")
+	errAlreadyLeader     = errors.New("raft: already leader")
+	errNilCfg            = errors.New("raft: no config given")
+	errCorruptPeers      = errors.New("raft: corrupt peer state")
+	errStepdownFailed    = errors.New("raft: stepdown failed")
+	errEntryLoadFailed   = errors.New("raft: could not load entry from WAL")
+	errEntryStoreFailed  = errors.New("raft: could not storeentry to WAL")
+	errNodeClosed        = errors.New("raft: node is closed")
+	errBadSnapName       = errors.New("raft: snapshot name could not be parsed")
+	errNoSnapAvailable   = errors.New("raft: no snapshot available")
+	errCatchupsRunning   = errors.New("raft: snapshot can not be installed while catchups running")
+	errSnapshotCorrupt   = errors.New("raft: snapshot corrupt")
+	errTooManyPrefs      = errors.New("raft: stepdown requires at most one preferred new leader")
+	errStepdownNoPeer    = errors.New("raft: stepdown failed, could not match new leader")
+	errNoPeerState       = errors.New("raft: no peerstate")
+	errAdjustBootCluster = errors.New("raft: can not adjust boot peer size on established group")
 )

 // This will bootstrap a raftNode by writing its config into the store directory.

@@ -703,11 +706,39 @@ func (n *raft) ProposeRemovePeer(peer string) error {
 	return nil
 }

+// ClusterSize reports back the total cluster size.
+// This effects quorum etc.
+func (n *raft) ClusterSize() int {
+	n.Lock()
+	defer n.Unlock()
+	return n.csz
+}
+
+// AdjustBootClusterSize can be called to adjust the boot cluster size.
+// Will error if called on a group with a leader or a previous leader.
+// This can be helpful in mixed mode.
+func (n *raft) AdjustBootClusterSize(csz int) error {
+	n.Lock()
+	defer n.Unlock()
+
+	if n.leader != noLeader || n.pleader {
+		return errAdjustBootCluster
+	}
+	// Same floor as bootstrap.
+	if csz < 2 {
+		csz = 2
+	}
+	// Adjust.
+	n.csz = csz
+	n.qn = n.csz/2 + 1
+
+	return nil
+}
+
 // AdjustClusterSize will change the cluster set size.
 // Must be the leader.
 func (n *raft) AdjustClusterSize(csz int) error {
 	n.Lock()

 	if n.state != Leader {
 		n.Unlock()
 		return errNotLeader