mirror of
https://github.com/gogrlx/nats-server.git
synced 2026-04-02 03:38:42 -07:00
When checking cluster size we need to make sure we have heard from all peers before making adjustments.
Also check back periodically. Signed-off-by: Derek Collison <derek@nats.io>
This commit is contained in:
@@ -779,6 +779,10 @@ func (js *jetStream) monitorCluster() {
|
||||
}
|
||||
case <-t.C:
|
||||
doSnapshot()
|
||||
// Periodically check the cluster size.
|
||||
if n.Leader() {
|
||||
js.checkClusterSize()
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -791,16 +795,26 @@ func (js *jetStream) checkClusterSize() {
|
||||
}
|
||||
// We will check that we have a correct cluster set size by checking for any non-js servers
|
||||
// which can happen in mixed mode.
|
||||
s.Debugf("Checking JetStream cluster size")
|
||||
ps := n.(*raft).currentPeerState()
|
||||
if len(ps.knownPeers) >= ps.clusterSize {
|
||||
return
|
||||
}
|
||||
|
||||
// Grab our active peers.
|
||||
peers := s.ActivePeers()
|
||||
|
||||
// If we have not registered all of our peers yet we can't do
|
||||
// any adjustments based on a mixed mode. We will periodically check back.
|
||||
if len(peers) < ps.clusterSize {
|
||||
return
|
||||
}
|
||||
|
||||
s.Debugf("Checking JetStream cluster size")
|
||||
|
||||
// If we are here our known set as the leader is not the same as the cluster size.
|
||||
// Check to see if we have a mixed mode setup.
|
||||
var totalJS int
|
||||
for _, p := range s.ActivePeers() {
|
||||
for _, p := range peers {
|
||||
if si, ok := s.nodeToInfo.Load(p); ok && si != nil {
|
||||
if si.(nodeInfo).js {
|
||||
totalJS++
|
||||
|
||||
@@ -4005,6 +4005,9 @@ func TestJetStreamClusterSuperClusterCrossClusterConsumerInterest(t *testing.T)
|
||||
sc := createJetStreamSuperCluster(t, 3, 3)
|
||||
defer sc.shutdown()
|
||||
|
||||
// Since we need all of the peers accounted for to add the stream wait for all to be present.
|
||||
sc.waitOnPeerCount(9)
|
||||
|
||||
// Client based API - Connect to Cluster C1. Stream and consumer will live in C2.
|
||||
s := sc.clusterForName("C1").randomServer()
|
||||
nc, js := jsClientConnect(t, s)
|
||||
@@ -5626,6 +5629,21 @@ func (sc *supercluster) randomCluster() *cluster {
|
||||
return clusters[0]
|
||||
}
|
||||
|
||||
func (sc *supercluster) waitOnPeerCount(n int) {
|
||||
sc.t.Helper()
|
||||
sc.waitOnLeader()
|
||||
leader := sc.leader()
|
||||
expires := time.Now().Add(20 * time.Second)
|
||||
for time.Now().Before(expires) {
|
||||
peers := leader.JetStreamClusterPeers()
|
||||
if len(peers) == n {
|
||||
return
|
||||
}
|
||||
time.Sleep(100 * time.Millisecond)
|
||||
}
|
||||
sc.t.Fatalf("Expected a super cluster peer count of %d, got %d", n, len(leader.JetStreamClusterPeers()))
|
||||
}
|
||||
|
||||
var jsClusterMirrorSourceImportsTempl = `
|
||||
listen: 127.0.0.1:-1
|
||||
server_name: %s
|
||||
|
||||
Reference in New Issue
Block a user