From 6bf50dbb77088b040496e164bb774e9a4129423d Mon Sep 17 00:00:00 2001
From: Matthias Hanel
Date: Thu, 18 Aug 2022 13:47:40 -0700
Subject: [PATCH] induce delay prior to scale down (#3381)

This is to avoid a narrow race between adding servers and them catching
up, where they can briefly register as current. Also wait for all peers
to be caught up.

This also avoids clearing the catchup marker once a catchup has
stalled. A stalled catchup would remove the marker, causing the peer to
register as current.

Signed-off-by: Matthias Hanel
---
 server/jetstream_cluster.go            | 74 +++++++++++++++++---------
 server/jetstream_super_cluster_test.go |  2 +-
 2 files changed, 50 insertions(+), 26 deletions(-)

diff --git a/server/jetstream_cluster.go b/server/jetstream_cluster.go
index d96350f5..88780704 100644
--- a/server/jetstream_cluster.go
+++ b/server/jetstream_cluster.go
@@ -1630,6 +1630,9 @@ func currentPeerCount(ci *ClusterInfo, peerSet []string, leaderId string) (curre
 	return
 }
 
+// how many migration tracker ticks of delay to induce
+const scaleDownDelayTicks = 2
+
 // Monitor our stream node for this stream.
 func (js *jetStream) monitorStream(mset *stream, sa *streamAssignment, sendSnapshot bool) {
 	s, cc := js.server(), js.cluster
@@ -1709,18 +1712,20 @@ func (js *jetStream) monitorStream(mset *stream, sa *streamAssignment, sendSnaps
 	// For migration tracking.
 	var mmt *time.Ticker
 	var mmtc <-chan time.Time
+	var mDelayTc int
 
 	startMigrationMonitoring := func() {
 		if mmt == nil {
 			mmt = time.NewTicker(1 * time.Second)
 			mmtc = mmt.C
+			mDelayTc = 0
 		}
 	}
 
 	stopMigrationMonitoring := func() {
 		if mmt != nil {
 			mmt.Stop()
-			mmt, mmtc = nil, nil
+			mmt, mmtc, mDelayTc = nil, nil, 0
 		}
 	}
 	defer stopMigrationMonitoring()
@@ -1927,7 +1932,6 @@ func (js *jetStream) monitorStream(mset *stream, sa *streamAssignment, sendSnaps
 				continue
 			}
 			// Determine if process is finished
-			// First make sure all consumers are properly scaled down
 			toSkip := len(rg.Peers) - replicas
 			newPeerSet := rg.Peers[toSkip:]
 			oldPeerSet := rg.Peers[:toSkip]
@@ -1936,6 +1940,16 @@ func (js *jetStream) monitorStream(mset *stream, sa *streamAssignment, sendSnaps
 				newPeerTbl[peer] = struct{}{}
 			}
 
+			currentCount, firstPeer, foundLeader := currentPeerCount(ci, newPeerSet, selfId)
+			// Wait to ensure that catchup has actually started.
+			if currentCount == replicas {
+				if mDelayTc < scaleDownDelayTicks {
+					mDelayTc++
+					continue
+				}
+			}
+
+			// First make sure all consumers are properly scaled down
 			waitOnConsumerScaledown := false
 			js.mu.RLock()
 			if san, ok := cc.streams[accName][sa.Config.Name]; ok {
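The hunks above carry the induced delay itself: once every peer in the new peer set reports current, the monitor sits out scaleDownDelayTicks further ticks of the 1-second migration ticker before proceeding, because a freshly added replica can briefly register as current before its catchup has actually started. A minimal standalone sketch of this tick-counting pattern follows; allPeersCurrent is a hypothetical stand-in for the patch's currentPeerCount check, not part of the change:

    package main

    import (
    	"fmt"
    	"time"
    )

    // Mirrors scaleDownDelayTicks: how many consecutive ticks the
    // "all current" condition must hold before we act on it.
    const delayTicks = 2

    // allPeersCurrent is a hypothetical stand-in for consulting
    // ClusterInfo via currentPeerCount in the real code.
    func allPeersCurrent() bool { return true }

    func main() {
    	ticker := time.NewTicker(1 * time.Second) // the migration ticker
    	defer ticker.Stop()

    	delay := 0 // plays the role of mDelayTc
    	for range ticker.C {
    		if !allPeersCurrent() {
    			continue // not all current yet; keep monitoring
    		}
    		if delay < delayTicks {
    			delay++ // induce the delay before trusting "current"
    			continue
    		}
    		fmt.Println("still current after the delay; safe to scale down")
    		return
    	}
    }

Note that the patch resets mDelayTc when migration monitoring starts or stops and after a completed scale down, so the delay is applied afresh for each migration.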
@@ -1963,27 +1977,26 @@ func (js *jetStream) monitorStream(mset *stream, sa *streamAssignment, sendSnaps
 				continue
 			}
 
-			currentCount, firstPeer, foundLeader := currentPeerCount(ci, newPeerSet, selfId)
-
-			// If all are current we are good, or if we have some offline and we have a quorum.
-			if quorum := replicas/2 + 1; currentCount >= quorum {
-				// Remove the old peers or transfer leadership (after which new leader resumes with peer removal).
-				// stopMigrationMonitoring is invoked on actual leadership change or
-				// on the next tick when migration completed.
-				// In case these operations fail, the next tick will retry
-				if !foundLeader {
-					n.StepDown(firstPeer)
-				} else {
-					for _, p := range oldPeerSet {
-						n.ProposeRemovePeer(p)
-					}
-					csa := sa.copyGroup()
-					csa.Group.Peers = newPeerSet
-					csa.Group.Cluster = s.ClusterName() // use cluster name of leader/self
-					csa.Group.Preferred = firstPeer
-					cc.meta.ForwardProposal(encodeUpdateStreamAssignment(csa))
-				}
+			// Remove the old peers or transfer leadership (after which new leader resumes with peer removal).
+			// stopMigrationMonitoring is invoked on actual leadership change or
+			// on the next tick when migration completed.
+			// In case these operations fail, the next tick will retry
+			if !foundLeader {
+				s.Debugf("Scale down of '%s' step down ('%s' preferred)", sa.Config.Name, firstPeer)
+				n.StepDown(firstPeer)
+			} else {
+				s.Noticef("Scale down of '%s' to %+v ('%s' preferred)",
+					sa.Config.Name, s.peerSetToNames(newPeerSet), firstPeer)
+				for _, p := range oldPeerSet {
+					n.ProposeRemovePeer(p)
+				}
+				csa := sa.copyGroup()
+				csa.Group.Peers = newPeerSet
+				csa.Group.Cluster = s.ClusterName() // use cluster name of leader/self
+				csa.Group.Preferred = firstPeer
+				cc.meta.ForwardProposal(encodeUpdateStreamAssignment(csa))
 			}
+			mDelayTc = 0
 		case err := <-restoreDoneCh:
 			// We have completed a restore from snapshot on this server. The stream assignment has
 			// already been assigned but the replicas will need to catch up out of band. Consumers
@@ -3745,18 +3758,20 @@ func (js *jetStream) monitorConsumer(o *consumer, ca *consumerAssignment) {
 	// For migration tracking.
 	var mmt *time.Ticker
 	var mmtc <-chan time.Time
+	var mDelayTc int
 
 	startMigrationMonitoring := func() {
 		if mmt == nil {
 			mmt = time.NewTicker(1 * time.Second)
 			mmtc = mmt.C
+			mDelayTc = 0
 		}
 	}
 
 	stopMigrationMonitoring := func() {
 		if mmt != nil {
 			mmt.Stop()
-			mmt, mmtc = nil, nil
+			mmt, mmtc, mDelayTc = nil, nil, 0
 		}
 	}
 	defer stopMigrationMonitoring()
@@ -3861,15 +3876,21 @@ func (js *jetStream) monitorConsumer(o *consumer, ca *consumerAssignment) {
 
 			currentCount, firstPeer, foundLeader := currentPeerCount(ci, newPeerSet, selfId)
 
-			// If all are current we are good, or if we have some offline and we have a quorum.
-			if quorum := replicas/2 + 1; currentCount >= quorum {
+			// If all are current we are good
+			if currentCount == replicas {
+				if mDelayTc < scaleDownDelayTicks {
+					mDelayTc++
+					continue
+				}
 				// Remove the old peers or transfer leadership (after which new leader resumes with peer removal).
 				// stopMigrationMonitoring is invoked on actual leadership change or
 				// on the next tick when migration completed.
 				// In case these operations fail, the next tick will retry
 				if !foundLeader {
+					s.Debugf("Scale down of '%s > %s' step down ('%s' preferred)", ca.Stream, ca.Name, firstPeer)
 					n.StepDown(firstPeer)
 				} else {
+					s.Noticef("Scale down of '%s > %s' to %+v", ca.Stream, ca.Name, s.peerSetToNames(newPeerSet))
 					// truncate this consumer
 					for _, p := range oldPeerSet {
 						n.ProposeRemovePeer(p)
@@ -3880,6 +3901,7 @@ func (js *jetStream) monitorConsumer(o *consumer, ca *consumerAssignment) {
 					cc.meta.ForwardProposal(encodeAddConsumerAssignment(cca))
 				}
 			}
+			mDelayTc = 0
 		case <-t.C:
 			doSnapshot(false)
 		}
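The runCatchup hunks below replace the unconditional defer mset.clearCatchupPeer(sreq.Peer) with explicit clears on the normal exit paths, deliberately leaving the marker in place when the catchup stalls: clearing it there would make a peer that never finished catching up look current. A small sketch of why the marker placement matters, using hypothetical names (the real tracking hangs off the stream and feeds the current-peer accounting):

    package main

    import "fmt"

    // catchupTracker sketches set/clearCatchupPeer: a peer with an
    // entry is still catching up and must not be counted as current.
    type catchupTracker struct {
    	pending map[string]uint64 // peer -> entries left to sync
    }

    func (c *catchupTracker) setCatchupPeer(peer string, n uint64) {
    	if c.pending == nil {
    		c.pending = make(map[string]uint64)
    	}
    	c.pending[peer] = n
    }

    func (c *catchupTracker) clearCatchupPeer(peer string) {
    	delete(c.pending, peer)
    }

    // isCurrent is a simplified stand-in: a peer mid-catchup is
    // never current, whatever its applied state looks like.
    func (c *catchupTracker) isCurrent(peer string) bool {
    	_, catchingUp := c.pending[peer]
    	return !catchingUp
    }

    func main() {
    	var c catchupTracker
    	c.setCatchupPeer("S3", 1000)

    	// With the old defer, a stalled catchup would clear the marker
    	// on return and S3 would wrongly report current. The patch
    	// keeps the marker set on the stalled path instead.
    	fmt.Println("S3 current after stall?", c.isCurrent("S3")) // false
    }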
@@ -7193,7 +7215,6 @@ func (mset *stream) runCatchup(sendSubject string, sreq *streamSyncRequest) {
 	// Setup sequences to walk through.
 	seq, last := sreq.FirstSeq, sreq.LastSeq
 	mset.setCatchupPeer(sreq.Peer, last-seq)
-	defer mset.clearCatchupPeer(sreq.Peer)
 
 	var spb int
 	sendNextBatchAndContinue := func(qch chan struct{}) bool {
@@ -7308,16 +7329,19 @@ func (mset *stream) runCatchup(sendSubject string, sreq *streamSyncRequest) {
 		case <-qch:
 			return
 		case <-remoteQuitCh:
+			mset.clearCatchupPeer(sreq.Peer)
 			return
 		case <-notActive.C:
 			s.Warnf("Catchup for stream '%s > %s' stalled", mset.account(), mset.name())
 			return
 		case <-nextBatchC:
 			if !sendNextBatchAndContinue(qch) {
+				mset.clearCatchupPeer(sreq.Peer)
 				return
 			}
 		case <-cbKick:
 			if !sendNextBatchAndContinue(qch) {
+				mset.clearCatchupPeer(sreq.Peer)
 				return
 			}
 		}
diff --git a/server/jetstream_super_cluster_test.go b/server/jetstream_super_cluster_test.go
index 89ddc218..6d8862e0 100644
--- a/server/jetstream_super_cluster_test.go
+++ b/server/jetstream_super_cluster_test.go
@@ -1812,7 +1812,7 @@ func TestJetStreamSuperClusterMovingStreamsAndConsumers(t *testing.T) {
 
 	// Should see the cluster designation and leader switch to C2.
 	// We should also shrink back down to original replica count.
-	checkFor(t, 10*time.Second, 100*time.Millisecond, func() error {
+	checkFor(t, 20*time.Second, 100*time.Millisecond, func() error {
 		si, err := js.StreamInfo("MOVE")
 		if err != nil {
 			return err
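The test change above accounts for the induced delay: each scale down now sits out extra ticker ticks, so the verification window is doubled from 10s to 20s. Only checkFor's call shape is visible here; a sketch of such a polling helper, assuming nothing beyond that shape (the actual helper lives in the server test support code):

    package server_test

    import (
    	"testing"
    	"time"
    )

    // checkFor retries fn every interval until it returns nil or the
    // total window elapses, matching calls like
    // checkFor(t, 20*time.Second, 100*time.Millisecond, fn).
    func checkFor(t *testing.T, total, interval time.Duration, fn func() error) {
    	t.Helper()
    	deadline := time.Now().Add(total)
    	var err error
    	for time.Now().Before(deadline) {
    		if err = fn(); err == nil {
    			return
    		}
    		time.Sleep(interval)
    	}
    	t.Fatalf("condition not met within %v: %v", total, err)
    }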