diff --git a/server/jetstream_cluster.go b/server/jetstream_cluster.go index c2d76470..4e3d1c1e 100644 --- a/server/jetstream_cluster.go +++ b/server/jetstream_cluster.go @@ -1322,46 +1322,6 @@ func (js *jetStream) processRemovePeer(peer string) { } } -// Remove old peers after the new peers are caught up. -// We are the stream leader here -func (js *jetStream) truncateOldPeers(mset *stream, newPreferred string) { - // Make sure still valid. - mset.mu.Lock() - isValid := mset.qch != nil - mset.mu.Unlock() - - if !isValid { - return - } - - sa := mset.streamAssignment() - - js.mu.Lock() - defer js.mu.Unlock() - - // Make sure still valid. - if js.srv == nil || !js.srv.isRunning() { - return - } - - cc, csa := js.cluster, sa.copyGroup() - csa.Group.Peers = csa.Group.Peers[len(csa.Group.Peers)-csa.Config.Replicas:] - // Now do consumers still needing truncating first here, followed by the owning stream. - for _, ca := range csa.consumers { - if r := ca.Config.replicas(csa.Config); r != len(ca.Group.Peers) { - cca := ca.copyGroup() - cca.Group.Peers = cca.Group.Peers[len(cca.Group.Peers)-r:] - cc.meta.ForwardProposal(encodeAddConsumerAssignment(cca)) - } - } - - si, _ := js.srv.nodeToInfo.Load(newPreferred) - csa.Group.Cluster = si.(nodeInfo).cluster - - csa.Group.Preferred = newPreferred - cc.meta.ForwardProposal(encodeUpdateStreamAssignment(csa)) -} - // Assumes all checks have already been done. 
func (js *jetStream) removePeerFromStream(sa *streamAssignment, peer string) bool { js.mu.Lock() @@ -1958,6 +1918,7 @@ func (js *jetStream) monitorStream(mset *stream, sa *streamAssignment, sendSnaps // First make sure all consumer are properly scaled down toSkip := len(rg.Peers) - replicas newPeerSet := rg.Peers[toSkip:] + oldPeerSet := rg.Peers[:toSkip] newPeerTbl := map[string]struct{}{} for _, peer := range newPeerSet { newPeerTbl[peer] = struct{}{} @@ -1987,7 +1948,6 @@ func (js *jetStream) monitorStream(mset *stream, sa *streamAssignment, sendSnaps js.mu.RUnlock() if waitOnConsumerScaledown { - continue } @@ -2002,7 +1962,14 @@ func (js *jetStream) monitorStream(mset *stream, sa *streamAssignment, sendSnaps if !foundLeader { n.StepDown(firstPeer) } else { - js.truncateOldPeers(mset, selfId) + for _, p := range oldPeerSet { + n.ProposeRemovePeer(p) + } + csa := sa.copyGroup() + csa.Group.Peers = newPeerSet + csa.Group.Cluster = s.ClusterName() // use cluster name of leader/self + csa.Group.Preferred = firstPeer + cc.meta.ForwardProposal(encodeUpdateStreamAssignment(csa)) } } case err := <-restoreDoneCh: @@ -3794,6 +3761,7 @@ func (js *jetStream) monitorConsumer(o *consumer, ca *consumerAssignment) { } toSkip := len(rg.Peers) - replicas newPeerSet := rg.Peers[toSkip:] + oldPeerSet := rg.Peers[:toSkip] ci := js.clusterInfo(rg) @@ -3809,7 +3777,11 @@ func (js *jetStream) monitorConsumer(o *consumer, ca *consumerAssignment) { n.StepDown(firstPeer) } else { // truncate this consumer + for _, p := range oldPeerSet { + n.ProposeRemovePeer(p) + } cca := ca.copyGroup() + cca.Group.Cluster = s.ClusterName() // use cluster name of leader/self cca.Group.Peers = newPeerSet cc.meta.ForwardProposal(encodeAddConsumerAssignment(cca)) } diff --git a/server/jetstream_jwt_test.go b/server/jetstream_jwt_test.go index ca19a9a5..82bca608 100644 --- a/server/jetstream_jwt_test.go +++ b/server/jetstream_jwt_test.go @@ -358,11 +358,11 @@ func TestJetStreamJWTMove(t *testing.T) { 
require_NoError(t, err) require_Equal(t, ci.Cluster.Name, "C1") - sc.waitOnStreamLeader(aExpPub, "MOVE-ME") + sc.clusterForName("C2").waitOnStreamLeader(aExpPub, "MOVE-ME") - checkFor(t, 20*time.Second, 250*time.Millisecond, func() error { + checkFor(t, 30*time.Second, 250*time.Millisecond, func() error { if si, err := js.StreamInfo("MOVE-ME"); err != nil { - return err + return fmt.Errorf("stream: %v", err) } else if si.Cluster.Name != "C2" { return fmt.Errorf("Wrong cluster: %q", si.Cluster.Name) } else if !strings.HasPrefix(si.Cluster.Leader, "C2-") { @@ -374,7 +374,7 @@ func TestJetStreamJWTMove(t *testing.T) { } // Now make sure consumer has leader etc.. if ci, err := js.ConsumerInfo("MOVE-ME", "dur"); err != nil { - return err + return fmt.Errorf("consumer: %v", err) } else if ci.Cluster.Name != "C2" { return fmt.Errorf("Wrong cluster: %q", ci.Cluster.Name) } else if ci.Cluster.Leader == _EMPTY_ { @@ -404,6 +404,7 @@ func TestJetStreamJWTMove(t *testing.T) { test(t, 1, accClaim) }) }) + t.Run("non-tiered", func(t *testing.T) { accClaim := jwt.NewAccountClaims(aExpPub) accClaim.Limits.JetStreamLimits = jwt.JetStreamLimits{ @@ -416,7 +417,6 @@ func TestJetStreamJWTMove(t *testing.T) { test(t, 1, accClaim) }) }) - } func TestJetStreamJWTClusteredTiers(t *testing.T) { diff --git a/server/raft.go b/server/raft.go index 3a4502a5..7cc3fca8 100644 --- a/server/raft.go +++ b/server/raft.go @@ -137,7 +137,7 @@ type raft struct { csz int qn int peers map[string]*lps - removed map[string]string + removed map[string]struct{} acks map[uint64]map[string]struct{} pae map[uint64]*appendEntry elect *time.Timer @@ -694,6 +694,10 @@ func (n *raft) ProposeAddPeer(peer string) error { // As a leader if we are proposing to remove a peer assume its already gone.
func (n *raft) doRemovePeerAsLeader(peer string) { n.Lock() + if n.removed == nil { + n.removed = map[string]struct{}{} + } + n.removed[peer] = struct{}{} if _, ok := n.peers[peer]; ok { delete(n.peers, peer) // We should decrease our cluster size since we are tracking this peer and the peer is most likely already gone. @@ -2382,9 +2386,9 @@ func (n *raft) applyCommit(index uint64) error { // Make sure we have our removed map. if n.removed == nil { - n.removed = make(map[string]string) + n.removed = make(map[string]struct{}) } - n.removed[peer] = peer + n.removed[peer] = struct{}{} if _, ok := n.peers[peer]; ok { delete(n.peers, peer)