diff --git a/server/jetstream_cluster.go b/server/jetstream_cluster.go
index 4e27eef7..091e8b30 100644
--- a/server/jetstream_cluster.go
+++ b/server/jetstream_cluster.go
@@ -1248,9 +1248,9 @@ func (js *jetStream) processRemovePeer(peer string) {
 	}
 }
 
-// Remove old peers after the new leader has caught up and taken over.
-// We are the stream leader.
-func (js *jetStream) removeOldPeers(mset *stream) {
+// Remove old peers after the new peers are caught up.
+// We are the old stream leader here.
+func (js *jetStream) removeOldPeers(mset *stream, newPreferred string) {
 	// Make sure still valid.
 	mset.mu.Lock()
 	isValid := mset.qch != nil
@@ -1275,11 +1275,8 @@
 	numExpandedPeers := len(csa.Group.Peers)
 	csa.Group.Peers = csa.Group.Peers[:0]
 
-	if ci.Leader != _EMPTY_ {
-		csa.Group.Peers = append(csa.Group.Peers, string(getHash(ci.Leader)))
-	}
 	for _, r := range ci.Replicas {
-		if r.cluster == ci.Name {
+		if r.cluster != ci.Name {
 			csa.Group.Peers = append(csa.Group.Peers, r.peer)
 		}
 	}
@@ -1290,12 +1287,14 @@
 		numPeers := len(cca.Group.Peers)
 		if numPeers == numExpandedPeers {
 			cca.Group.Peers = csa.Group.Peers
+			cca.Group.Preferred = _EMPTY_
 		} else {
 			cca.Group.Peers = cca.Group.Peers[len(cca.Group.Peers)-1:]
 		}
 		cc.meta.ForwardProposal(encodeAddConsumerAssignment(cca))
 	}
 
+	csa.Group.Preferred = newPreferred
 	cc.meta.ForwardProposal(encodeUpdateStreamAssignment(csa))
 }
 
@@ -1738,8 +1737,6 @@ func (js *jetStream) monitorStream(mset *stream, sa *streamAssignment, sendSnaps
 				startMigrationMonitoring()
 			} else {
 				stopMigrationMonitoring()
-				// Delay before removing the old peers after assuming leadership.
-				time.AfterFunc(2*time.Second, func() { js.removeOldPeers(mset) })
 			}
 		}
 	case <-t.C:
@@ -1767,21 +1764,19 @@ func (js *jetStream) monitorStream(mset *stream, sa *streamAssignment, sendSnaps
 			}
 			// Track the new peers and check the ones that are current.
 			var newPeers []*PeerInfo
-			quorum, offline := mset.cfg.Replicas/2+1, 0
+			quorum := mset.cfg.Replicas/2 + 1
 			for _, r := range ci.Replicas {
 				if r.cluster != ci.Name {
 					if r.Current {
 						newPeers = append(newPeers, r)
-					} else if r.Offline {
-						offline++
 					}
 				}
 			}
 			// If all are current we are good, or if we have some offline and we have a quorum.
-			if lnp := len(newPeers); lnp == mset.cfg.Replicas || (offline > 0 && lnp >= quorum) {
+			if lnp := len(newPeers); lnp >= quorum {
 				stopMigrationMonitoring()
-				// Transfer leadership for stream.
-				mset.raftNode().StepDown(newPeers[0].peer)
+				// Remove the old peers and transfer leadership.
+				time.AfterFunc(2*time.Second, func() { js.removeOldPeers(mset, newPeers[0].peer) })
 			}
 		case err := <-restoreDoneCh:
 			// We have completed a restore from snapshot on this server. The stream assignment has
@@ -2515,7 +2510,7 @@ func (js *jetStream) processUpdateStreamAssignment(sa *streamAssignment) {
 
 	if node := mset.raftNode(); node != nil {
 		if node.Leader() {
-			node.StepDown()
+			node.StepDown(sa.Group.Preferred)
 		}
 		node.ProposeRemovePeer(ourID)
 	}
@@ -4641,6 +4636,9 @@ func (s *Server) jsClusteredStreamUpdateRequest(ci *ClientInfo, acc *Account, su
 				consumers = append(consumers, cca)
 			}
 		}
+	} else {
+		// All other updates make sure no preferred is set.
+		rg.Preferred = _EMPTY_
 	}
 
 	sa := &streamAssignment{Group: rg, Sync: osa.Sync, Created: osa.Created, Config: newCfg, Subject: subject, Reply: reply, Client: ci}
diff --git a/server/raft.go b/server/raft.go
index 5c3bef6d..77899c5b 100644
--- a/server/raft.go
+++ b/server/raft.go
@@ -1270,20 +1270,15 @@ func (n *raft) StepDown(preferred ...string) error {
 		n.debug("Can not transfer to preferred peer %q", preferred[0])
 	}
 
-	// If we have a new leader selected, let them start the vote process but do not actively stepdown.
+	// If we have a new leader selected, transfer over to them.
 	if maybeLeader != noLeader {
 		n.debug("Selected %q for new leader", maybeLeader)
 		n.sendAppendEntry([]*Entry{{EntryLeaderTransfer, []byte(maybeLeader)}})
-		// If we chose also stepdown. If we were told let them take over.
-		if len(preferred) == 0 {
-			n.debug("Stepping down")
-			stepdown.push(noLeader)
-		}
-	} else {
-		// Force us to stepdown here.
-		n.debug("Stepping down")
-		stepdown.push(noLeader)
 	}
+	// Force us to stepdown here.
+	n.debug("Stepping down")
+	stepdown.push(noLeader)
+
 	return nil
 }
 
@@ -1383,6 +1378,13 @@ func (n *raft) Peers() []*Peer {
 // Update our known set of peers.
 func (n *raft) UpdateKnownPeers(knownPeers []string) {
 	n.Lock()
+	// If this is a scale up, let the normal add peer logic take precedence.
+	// Otherwise if the new peers are slow to start we stall ourselves.
+	if len(knownPeers) > len(n.peers) {
+		n.Unlock()
+		return
+	}
+	// Process like peer state update.
 	ps := &peerState{knownPeers, len(knownPeers), n.extSt}
 	n.processPeerState(ps)
 	isLeader := n.state == Leader
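
Note on the migration hand-off in monitorStream: the old leader no longer waits for every new replica to be current (or for known-offline peers plus a quorum); it acts once a simple majority of the configured replicas outside the origin cluster report current, defers removeOldPeers by two seconds, and passes the first current new peer as the preferred leader. Below is a minimal, self-contained sketch of that gating logic. PeerInfo here is trimmed to the fields the diff reads, and readyForTransfer is a hypothetical name for illustration, not the server's API:

package main

import (
	"fmt"
	"time"
)

// PeerInfo carries only the fields the quorum check reads; the real
// server type has more state.
type PeerInfo struct {
	peer    string // peer id
	cluster string // cluster the peer lives in
	Current bool   // caught up with the leader
}

// readyForTransfer mirrors the new check: count peers outside the origin
// cluster that are current and proceed once a simple majority
// (replicas/2 + 1) of the configured replica count is reached. It returns
// the preferred new leader, like newPeers[0].peer in the diff.
func readyForTransfer(replicas []*PeerInfo, originCluster string, replicaCount int) (string, bool) {
	quorum := replicaCount/2 + 1
	var newPeers []*PeerInfo
	for _, r := range replicas {
		if r.cluster != originCluster && r.Current {
			newPeers = append(newPeers, r)
		}
	}
	if len(newPeers) >= quorum {
		return newPeers[0].peer, true
	}
	return "", false
}

func main() {
	replicas := []*PeerInfo{
		{peer: "n1", cluster: "new", Current: true},
		{peer: "n2", cluster: "new", Current: true},
		{peer: "n3", cluster: "new", Current: false}, // still catching up
		{peer: "o1", cluster: "old", Current: true},  // origin cluster, ignored
	}
	if preferred, ok := readyForTransfer(replicas, "old", 3); ok {
		// The diff defers the actual removal and transfer by two seconds.
		time.AfterFunc(2*time.Second, func() {
			fmt.Println("removing old peers, preferred leader:", preferred)
		})
		time.Sleep(3 * time.Second)
	}
}

Relaxing the bar from "all current" to "quorum current" is what lets a migration finish even when one new peer is slow, at the cost of transferring before full replication.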
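The raft.go side pairs with this: after sending EntryLeaderTransfer, StepDown now always pushes its own stepdown rather than lingering when a preferred peer was supplied, and processUpdateStreamAssignment passes sa.Group.Preferred into it. The UpdateKnownPeers guard can also be sketched in isolation; this assumes the node tracks its peer set in a map, which the diff does not show, so the node type and its fields here are illustrative only:

package main

import (
	"fmt"
	"sync"
)

// node stands in for the raft type; only the peer set matters here.
type node struct {
	sync.Mutex
	peers map[string]struct{}
}

// UpdateKnownPeers reproduces the new early return: if the incoming set
// is larger than what we already track, this is a scale up, so we skip
// the overwrite and let the normal add-peer path introduce the peers.
// Otherwise a slow-starting new peer in the known set could stall us.
func (n *node) UpdateKnownPeers(knownPeers []string) {
	n.Lock()
	defer n.Unlock()
	if len(knownPeers) > len(n.peers) {
		return // scale up: defer to add-peer logic
	}
	n.peers = make(map[string]struct{}, len(knownPeers))
	for _, p := range knownPeers {
		n.peers[p] = struct{}{}
	}
}

func main() {
	n := &node{peers: map[string]struct{}{"a": {}, "b": {}, "c": {}}}
	n.UpdateKnownPeers([]string{"a", "b", "c", "d", "e"}) // scale up: ignored
	fmt.Println(len(n.peers))                             // still 3
	n.UpdateKnownPeers([]string{"a", "b"})                // same size or smaller: applied
	fmt.Println(len(n.peers))                             // now 2
}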