mirror of
https://github.com/gogrlx/nats-server.git
synced 2026-04-14 02:07:59 -07:00
Merge pull request #3043 from nats-io/move-improvements
Improvements to move semantics.
This commit is contained in:
@@ -1248,9 +1248,9 @@ func (js *jetStream) processRemovePeer(peer string) {
|
||||
}
|
||||
}
|
||||
|
||||
// Remove old peers after the new leader has caught up and taken over.
|
||||
// We are the stream leader.
|
||||
func (js *jetStream) removeOldPeers(mset *stream) {
|
||||
// Remove old peers after the new peers are caught up.
|
||||
// We are the old stream leader here.
|
||||
func (js *jetStream) removeOldPeers(mset *stream, newPreferred string) {
|
||||
// Make sure still valid.
|
||||
mset.mu.Lock()
|
||||
isValid := mset.qch != nil
|
||||
@@ -1275,11 +1275,8 @@ func (js *jetStream) removeOldPeers(mset *stream) {
|
||||
numExpandedPeers := len(csa.Group.Peers)
|
||||
csa.Group.Peers = csa.Group.Peers[:0]
|
||||
|
||||
if ci.Leader != _EMPTY_ {
|
||||
csa.Group.Peers = append(csa.Group.Peers, string(getHash(ci.Leader)))
|
||||
}
|
||||
for _, r := range ci.Replicas {
|
||||
if r.cluster == ci.Name {
|
||||
if r.cluster != ci.Name {
|
||||
csa.Group.Peers = append(csa.Group.Peers, r.peer)
|
||||
}
|
||||
}
|
||||
@@ -1290,12 +1287,14 @@ func (js *jetStream) removeOldPeers(mset *stream) {
|
||||
numPeers := len(cca.Group.Peers)
|
||||
if numPeers == numExpandedPeers {
|
||||
cca.Group.Peers = csa.Group.Peers
|
||||
cca.Group.Preferred = _EMPTY_
|
||||
} else {
|
||||
cca.Group.Peers = cca.Group.Peers[len(cca.Group.Peers)-1:]
|
||||
}
|
||||
cc.meta.ForwardProposal(encodeAddConsumerAssignment(cca))
|
||||
}
|
||||
|
||||
csa.Group.Preferred = newPreferred
|
||||
cc.meta.ForwardProposal(encodeUpdateStreamAssignment(csa))
|
||||
}
|
||||
|
||||
@@ -1738,8 +1737,6 @@ func (js *jetStream) monitorStream(mset *stream, sa *streamAssignment, sendSnaps
|
||||
startMigrationMonitoring()
|
||||
} else {
|
||||
stopMigrationMonitoring()
|
||||
// Delay before removing the old peers after assuming leadership.
|
||||
time.AfterFunc(2*time.Second, func() { js.removeOldPeers(mset) })
|
||||
}
|
||||
}
|
||||
case <-t.C:
|
||||
@@ -1767,21 +1764,19 @@ func (js *jetStream) monitorStream(mset *stream, sa *streamAssignment, sendSnaps
|
||||
}
|
||||
// Track the new peers and check the ones that are current.
|
||||
var newPeers []*PeerInfo
|
||||
quorum, offline := mset.cfg.Replicas/2+1, 0
|
||||
quorum := mset.cfg.Replicas/2 + 1
|
||||
for _, r := range ci.Replicas {
|
||||
if r.cluster != ci.Name {
|
||||
if r.Current {
|
||||
newPeers = append(newPeers, r)
|
||||
} else if r.Offline {
|
||||
offline++
|
||||
}
|
||||
}
|
||||
}
|
||||
// If all are current we are good, or if we have some offline and we have a quorum.
|
||||
if lnp := len(newPeers); lnp == mset.cfg.Replicas || (offline > 0 && lnp >= quorum) {
|
||||
if lnp := len(newPeers); lnp >= quorum {
|
||||
stopMigrationMonitoring()
|
||||
// Transfer leadership for stream.
|
||||
mset.raftNode().StepDown(newPeers[0].peer)
|
||||
// Remove the old peers and transfer leadership.
|
||||
time.AfterFunc(2*time.Second, func() { js.removeOldPeers(mset, newPeers[0].peer) })
|
||||
}
|
||||
case err := <-restoreDoneCh:
|
||||
// We have completed a restore from snapshot on this server. The stream assignment has
|
||||
@@ -2515,7 +2510,7 @@ func (js *jetStream) processUpdateStreamAssignment(sa *streamAssignment) {
|
||||
|
||||
if node := mset.raftNode(); node != nil {
|
||||
if node.Leader() {
|
||||
node.StepDown()
|
||||
node.StepDown(sa.Group.Preferred)
|
||||
}
|
||||
node.ProposeRemovePeer(ourID)
|
||||
}
|
||||
@@ -4641,6 +4636,9 @@ func (s *Server) jsClusteredStreamUpdateRequest(ci *ClientInfo, acc *Account, su
|
||||
consumers = append(consumers, cca)
|
||||
}
|
||||
}
|
||||
} else {
|
||||
// All other updates make sure no preferred is set.
|
||||
rg.Preferred = _EMPTY_
|
||||
}
|
||||
|
||||
sa := &streamAssignment{Group: rg, Sync: osa.Sync, Created: osa.Created, Config: newCfg, Subject: subject, Reply: reply, Client: ci}
|
||||
|
||||
@@ -1270,20 +1270,15 @@ func (n *raft) StepDown(preferred ...string) error {
|
||||
n.debug("Can not transfer to preferred peer %q", preferred[0])
|
||||
}
|
||||
|
||||
// If we have a new leader selected, let them start the vote process but do not actively stepdown.
|
||||
// If we have a new leader selected, transfer over to them.
|
||||
if maybeLeader != noLeader {
|
||||
n.debug("Selected %q for new leader", maybeLeader)
|
||||
n.sendAppendEntry([]*Entry{{EntryLeaderTransfer, []byte(maybeLeader)}})
|
||||
// If we chose also stepdown. If we were told let them take over.
|
||||
if len(preferred) == 0 {
|
||||
n.debug("Stepping down")
|
||||
stepdown.push(noLeader)
|
||||
}
|
||||
} else {
|
||||
// Force us to stepdown here.
|
||||
n.debug("Stepping down")
|
||||
stepdown.push(noLeader)
|
||||
}
|
||||
// Force us to stepdown here.
|
||||
n.debug("Stepping down")
|
||||
stepdown.push(noLeader)
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
@@ -1383,6 +1378,13 @@ func (n *raft) Peers() []*Peer {
|
||||
// Update our known set of peers.
|
||||
func (n *raft) UpdateKnownPeers(knownPeers []string) {
|
||||
n.Lock()
|
||||
// If this is a scale up, let the normal add peer logic take precedence.
|
||||
// Otherwise if the new peers are slow to start we stall ourselves.
|
||||
if len(knownPeers) > len(n.peers) {
|
||||
n.Unlock()
|
||||
return
|
||||
}
|
||||
// Process like peer state update.
|
||||
ps := &peerState{knownPeers, len(knownPeers), n.extSt}
|
||||
n.processPeerState(ps)
|
||||
isLeader := n.state == Leader
|
||||
|
||||
Reference in New Issue
Block a user