diff --git a/server/jetstream_cluster.go b/server/jetstream_cluster.go index 683b3940..13322e3e 100644 --- a/server/jetstream_cluster.go +++ b/server/jetstream_cluster.go @@ -2376,7 +2376,7 @@ func (js *jetStream) monitorStream(mset *stream, sa *streamAssignment, sendSnaps ci := js.clusterInfo(rg) mset.checkClusterInfo(ci) - newPeers, _, newPeerSet, oldPeerSet := genPeerInfo(rg.Peers, len(rg.Peers)-replicas) + newPeers, oldPeers, newPeerSet, oldPeerSet := genPeerInfo(rg.Peers, len(rg.Peers)-replicas) // If we are part of the new peerset and we have been passed the baton. // We will handle scale down. @@ -2402,6 +2402,11 @@ func (js *jetStream) monitorStream(mset *stream, sa *streamAssignment, sendSnaps continue } + // We are good to go, can scale down here. + for _, p := range oldPeers { + n.ProposeRemovePeer(p) + } + csa := sa.copyGroup() csa.Group.Peers = newPeers csa.Group.Preferred = ourPeerId @@ -2425,6 +2430,7 @@ func (js *jetStream) monitorStream(mset *stream, sa *streamAssignment, sendSnaps // Check if we have a quorom. if current >= neededCurrent { s.Noticef("Transfer of stream leader for '%s > %s' to '%s'", accName, sa.Config.Name, newLeader) + n.UpdateKnownPeers(newPeers) n.StepDown(newLeaderPeer) } } diff --git a/server/raft.go b/server/raft.go index f0d8abc6..9ce0b1a4 100644 --- a/server/raft.go +++ b/server/raft.go @@ -1364,7 +1364,15 @@ func (n *raft) StepDown(preferred ...string) error { if maybeLeader != noLeader { n.debug("Selected %q for new leader", maybeLeader) prop.push(newEntry(EntryLeaderTransfer, []byte(maybeLeader))) - time.AfterFunc(250*time.Millisecond, func() { stepdown.push(noLeader) }) + time.AfterFunc(250*time.Millisecond, func() { + n.RLock() + stillLeader := n.state == Leader + n.RUnlock() + // If we are still the leader force a stepdown. + if stillLeader { + stepdown.push(noLeader) + } + }) } else { // Force us to stepdown here. n.debug("Stepping down") @@ -1402,6 +1410,7 @@ func (n *raft) campaign() error { func (n *raft) xferCampaign() error { n.debug("Starting transfer campaign") if n.state == Leader { + n.lxfer = false return errAlreadyLeader } n.resetElect(10 * time.Millisecond)