diff --git a/server/jetstream_cluster.go b/server/jetstream_cluster.go index c5484b2c..e118a689 100644 --- a/server/jetstream_cluster.go +++ b/server/jetstream_cluster.go @@ -779,6 +779,10 @@ func (js *jetStream) monitorCluster() { } case <-t.C: doSnapshot() + // Periodically check the cluster size. + if n.Leader() { + js.checkClusterSize() + } } } } @@ -791,16 +795,26 @@ func (js *jetStream) checkClusterSize() { } // We will check that we have a correct cluster set size by checking for any non-js servers // which can happen in mixed mode. - s.Debugf("Checking JetStream cluster size") ps := n.(*raft).currentPeerState() if len(ps.knownPeers) >= ps.clusterSize { return } + // Grab our active peers. + peers := s.ActivePeers() + + // If we have not registered all of our peers yet we can't do + // any adjustments based on a mixed mode. We will periodically check back. + if len(peers) < ps.clusterSize { + return + } + + s.Debugf("Checking JetStream cluster size") + // If we are here our known set as the leader is not the same as the cluster size. // Check to see if we have a mixed mode setup. var totalJS int - for _, p := range s.ActivePeers() { + for _, p := range peers { if si, ok := s.nodeToInfo.Load(p); ok && si != nil { if si.(nodeInfo).js { totalJS++ diff --git a/server/jetstream_cluster_test.go b/server/jetstream_cluster_test.go index fad1ee02..a7a89f8b 100644 --- a/server/jetstream_cluster_test.go +++ b/server/jetstream_cluster_test.go @@ -4005,6 +4005,9 @@ func TestJetStreamClusterSuperClusterCrossClusterConsumerInterest(t *testing.T) sc := createJetStreamSuperCluster(t, 3, 3) defer sc.shutdown() + // Since we need all of the peers accounted for to add the stream wait for all to be present. + sc.waitOnPeerCount(9) + // Client based API - Connect to Cluster C1. Stream and consumer will live in C2. s := sc.clusterForName("C1").randomServer() nc, js := jsClientConnect(t, s) @@ -5626,6 +5629,21 @@ func (sc *supercluster) randomCluster() *cluster { return clusters[0] } +func (sc *supercluster) waitOnPeerCount(n int) { + sc.t.Helper() + sc.waitOnLeader() + leader := sc.leader() + expires := time.Now().Add(20 * time.Second) + for time.Now().Before(expires) { + peers := leader.JetStreamClusterPeers() + if len(peers) == n { + return + } + time.Sleep(100 * time.Millisecond) + } + sc.t.Fatalf("Expected a super cluster peer count of %d, got %d", n, len(leader.JetStreamClusterPeers())) +} + var jsClusterMirrorSourceImportsTempl = ` listen: 127.0.0.1:-1 server_name: %s