fix peer tracking by removing peers before scaledown (#3289)

In doRemovePeerAsLeader, the leader now also records the removed peer in the removed set, mirroring what applyCommit already does when a remove-peer entry is applied.

Signed-off-by: Matthias Hanel <mh@synadia.com>
Author: Matthias Hanel
Date: 2022-07-26 22:01:03 +02:00 (committed by GitHub)
Parent: 6212087feb
Commit: 04ffed48b0
3 changed files with 26 additions and 50 deletions
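
The core of the change is ordering: during a scaledown the leader now proposes removal of the old peers before publishing the shrunken peer set, and doRemovePeerAsLeader records each removed peer in the removed set up front. A minimal sketch of that bookkeeping, with simplified stand-in types (the field names follow the diff below; the surrounding struct and main are illustrative only):

package main

import "sync"

// node is a simplified stand-in for the raft node in the diff; only the
// fields relevant to peer tracking are shown.
type node struct {
	sync.Mutex
	peers   map[string]struct{} // currently tracked peers (simplified)
	removed map[string]struct{} // peers already proposed for removal
	csz     int                 // tracked cluster size
}

// doRemovePeerAsLeader records the peer as removed first, then stops
// tracking it and shrinks the cluster size, as in the diff.
func (n *node) doRemovePeerAsLeader(peer string) {
	n.Lock()
	defer n.Unlock()
	if n.removed == nil {
		n.removed = map[string]struct{}{}
	}
	n.removed[peer] = struct{}{}
	if _, ok := n.peers[peer]; ok {
		delete(n.peers, peer)
		n.csz-- // the peer is assumed already gone
	}
}

func main() {
	n := &node{peers: map[string]struct{}{"a": {}, "b": {}, "c": {}}, csz: 3}
	n.doRemovePeerAsLeader("c")
	_ = n.csz // csz is now 2 and "c" sits in n.removed
}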

File: server/jetstream_cluster.go

@@ -1322,46 +1322,6 @@ func (js *jetStream) processRemovePeer(peer string) {
}
}
// Remove old peers after the new peers are caught up.
// We are the stream leader here
func (js *jetStream) truncateOldPeers(mset *stream, newPreferred string) {
// Make sure still valid.
mset.mu.Lock()
isValid := mset.qch != nil
mset.mu.Unlock()
if !isValid {
return
}
sa := mset.streamAssignment()
js.mu.Lock()
defer js.mu.Unlock()
// Make sure still valid.
if js.srv == nil || !js.srv.isRunning() {
return
}
cc, csa := js.cluster, sa.copyGroup()
csa.Group.Peers = csa.Group.Peers[len(csa.Group.Peers)-csa.Config.Replicas:]
// Handle consumers that still need truncating first, followed by the owning stream.
for _, ca := range csa.consumers {
if r := ca.Config.replicas(csa.Config); r != len(ca.Group.Peers) {
cca := ca.copyGroup()
cca.Group.Peers = cca.Group.Peers[len(cca.Group.Peers)-r:]
cc.meta.ForwardProposal(encodeAddConsumerAssignment(cca))
}
}
si, _ := js.srv.nodeToInfo.Load(newPreferred)
csa.Group.Cluster = si.(nodeInfo).cluster
csa.Group.Preferred = newPreferred
cc.meta.ForwardProposal(encodeUpdateStreamAssignment(csa))
}
// Assumes all checks have already been done.
func (js *jetStream) removePeerFromStream(sa *streamAssignment, peer string) bool {
js.mu.Lock()
@@ -1958,6 +1918,7 @@ func (js *jetStream) monitorStream(mset *stream, sa *streamAssignment, sendSnaps
// First make sure all consumers are properly scaled down
toSkip := len(rg.Peers) - replicas
newPeerSet := rg.Peers[toSkip:]
oldPeerSet := rg.Peers[:toSkip]
newPeerTbl := map[string]struct{}{}
for _, peer := range newPeerSet {
newPeerTbl[peer] = struct{}{}
@@ -1987,7 +1948,6 @@ func (js *jetStream) monitorStream(mset *stream, sa *streamAssignment, sendSnaps
js.mu.RUnlock()
if waitOnConsumerScaledown {
continue
}
@@ -2002,7 +1962,14 @@ func (js *jetStream) monitorStream(mset *stream, sa *streamAssignment, sendSnaps
if !foundLeader {
n.StepDown(firstPeer)
} else {
js.truncateOldPeers(mset, selfId)
for _, p := range oldPeerSet {
n.ProposeRemovePeer(p)
}
csa := sa.copyGroup()
csa.Group.Peers = newPeerSet
csa.Group.Cluster = s.ClusterName() // use cluster name of leader/self
csa.Group.Preferred = firstPeer
cc.meta.ForwardProposal(encodeUpdateStreamAssignment(csa))
}
}
case err := <-restoreDoneCh:
@@ -3794,6 +3761,7 @@ func (js *jetStream) monitorConsumer(o *consumer, ca *consumerAssignment) {
}
toSkip := len(rg.Peers) - replicas
newPeerSet := rg.Peers[toSkip:]
oldPeerSet := rg.Peers[:toSkip]
ci := js.clusterInfo(rg)
@@ -3809,7 +3777,11 @@ func (js *jetStream) monitorConsumer(o *consumer, ca *consumerAssignment) {
n.StepDown(firstPeer)
} else {
// truncate this consumer
for _, p := range oldPeerSet {
n.ProposeRemovePeer(p)
}
cca := ca.copyGroup()
cca.Group.Cluster = s.ClusterName() // use cluster name of leader/self
cca.Group.Peers = newPeerSet
cc.meta.ForwardProposal(encodeAddConsumerAssignment(cca))
}
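
Both scaledown paths above split the replica group positionally: the tail of rg.Peers holds the surviving replicas, so the head (oldPeerSet) is proposed for removal before the updated assignment carrying newPeerSet is forwarded. A runnable illustration of the split, with made-up peer names:

package main

import "fmt"

func main() {
	// Convention the diff relies on: peers added for the move are
	// appended, so the last `replicas` entries are the survivors.
	peers := []string{"old-1", "old-2", "new-1", "new-2", "new-3"}
	replicas := 3

	toSkip := len(peers) - replicas
	oldPeerSet := peers[:toSkip] // proposed for removal first
	newPeerSet := peers[toSkip:] // carried into the updated assignment

	fmt.Println(oldPeerSet) // [old-1 old-2]
	fmt.Println(newPeerSet) // [new-1 new-2 new-3]
}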

File: server/jetstream_jwt_test.go

@@ -358,11 +358,11 @@ func TestJetStreamJWTMove(t *testing.T) {
require_NoError(t, err)
require_Equal(t, ci.Cluster.Name, "C1")
sc.waitOnStreamLeader(aExpPub, "MOVE-ME")
sc.clusterForName("C2").waitOnStreamLeader(aExpPub, "MOVE-ME")
checkFor(t, 20*time.Second, 250*time.Millisecond, func() error {
checkFor(t, 30*time.Second, 250*time.Millisecond, func() error {
if si, err := js.StreamInfo("MOVE-ME"); err != nil {
return err
return fmt.Errorf("stream: %v", err)
} else if si.Cluster.Name != "C2" {
return fmt.Errorf("Wrong cluster: %q", si.Cluster.Name)
} else if !strings.HasPrefix(si.Cluster.Leader, "C2-") {
@@ -374,7 +374,7 @@ func TestJetStreamJWTMove(t *testing.T) {
}
// Now make sure the consumer has a leader, etc.
if ci, err := js.ConsumerInfo("MOVE-ME", "dur"); err != nil {
return err
return fmt.Errorf("stream: %v", err)
} else if ci.Cluster.Name != "C2" {
return fmt.Errorf("Wrong cluster: %q", ci.Cluster.Name)
} else if ci.Cluster.Leader == _EMPTY_ {
@@ -404,6 +404,7 @@ func TestJetStreamJWTMove(t *testing.T) {
test(t, 1, accClaim)
})
})
t.Run("non-tiered", func(t *testing.T) {
accClaim := jwt.NewAccountClaims(aExpPub)
accClaim.Limits.JetStreamLimits = jwt.JetStreamLimits{
@@ -416,7 +417,6 @@ func TestJetStreamJWTMove(t *testing.T) {
test(t, 1, accClaim)
})
})
}
func TestJetStreamJWTClusteredTiers(t *testing.T) {
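
The test changes above only lengthen the checkFor timeout and wrap returned errors with context. For readers outside the repo, this is roughly how such a polling helper behaves; a sketch only, not the test suite's actual implementation:

package example

import (
	"testing"
	"time"
)

// checkFor polls check at the given interval until it returns nil or the
// timeout expires, then fails the test with the last error seen.
func checkFor(t *testing.T, timeout, interval time.Duration, check func() error) {
	t.Helper()
	deadline := time.Now().Add(timeout)
	var err error
	for time.Now().Before(deadline) {
		if err = check(); err == nil {
			return
		}
		time.Sleep(interval)
	}
	t.Fatalf("check did not pass within %v: %v", timeout, err)
}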

File: server/raft.go

@@ -137,7 +137,7 @@ type raft struct {
csz int
qn int
peers map[string]*lps
removed map[string]string
removed map[string]struct{}
acks map[uint64]map[string]struct{}
pae map[uint64]*appendEntry
elect *time.Timer
@@ -694,6 +694,10 @@ func (n *raft) ProposeAddPeer(peer string) error {
// As a leader, if we are proposing to remove a peer, assume it's already gone.
func (n *raft) doRemovePeerAsLeader(peer string) {
n.Lock()
if n.removed == nil {
n.removed = map[string]struct{}{}
}
n.removed[peer] = struct{}{}
if _, ok := n.peers[peer]; ok {
delete(n.peers, peer)
// We should decrease our cluster size since we are tracking this peer and the peer is most likely already gone.
@@ -2382,9 +2386,9 @@ func (n *raft) applyCommit(index uint64) error {
// Make sure we have our removed map.
if n.removed == nil {
n.removed = make(map[string]string)
n.removed = make(map[string]struct{})
}
n.removed[peer] = peer
n.removed[peer] = struct{}{}
if _, ok := n.peers[peer]; ok {
delete(n.peers, peer)
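
Finally, the raft.go hunks change the removed map's value type from string to struct{}. The map is only ever consulted for membership, and empty-struct values carry no data, so this is the idiomatic Go set. A small illustration of the lookup pattern:

package main

import "fmt"

func main() {
	// map[string]struct{} as a set: only membership matters.
	removed := make(map[string]struct{})
	removed["peer-1"] = struct{}{}

	if _, ok := removed["peer-1"]; ok {
		fmt.Println("peer-1 was removed; ignore it")
	}
}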