diff --git a/server/jetstream_cluster.go b/server/jetstream_cluster.go
index d96350f5..88780704 100644
--- a/server/jetstream_cluster.go
+++ b/server/jetstream_cluster.go
@@ -1630,6 +1630,9 @@ func currentPeerCount(ci *ClusterInfo, peerSet []string, leaderId string) (curre
 	return
 }
 
+// Number of migration tracker ticks to wait before completing a scale down.
+const scaleDownDelayTicks = 2
+
 // Monitor our stream node for this stream.
 func (js *jetStream) monitorStream(mset *stream, sa *streamAssignment, sendSnapshot bool) {
 	s, cc := js.server(), js.cluster
@@ -1709,18 +1712,20 @@ func (js *jetStream) monitorStream(mset *stream, sa *streamAssignment, sendSnaps
 	// For migration tracking.
 	var mmt *time.Ticker
 	var mmtc <-chan time.Time
+	var mDelayTc int
 
 	startMigrationMonitoring := func() {
 		if mmt == nil {
 			mmt = time.NewTicker(1 * time.Second)
 			mmtc = mmt.C
+			mDelayTc = 0
 		}
 	}
 
 	stopMigrationMonitoring := func() {
 		if mmt != nil {
 			mmt.Stop()
-			mmt, mmtc = nil, nil
+			mmt, mmtc, mDelayTc = nil, nil, 0
 		}
 	}
 	defer stopMigrationMonitoring()
@@ -1927,7 +1932,6 @@ func (js *jetStream) monitorStream(mset *stream, sa *streamAssignment, sendSnaps
 				continue
 			}
 			// Determine if process is finished
-			// First make sure all consumer are properly scaled down
 			toSkip := len(rg.Peers) - replicas
 			newPeerSet := rg.Peers[toSkip:]
 			oldPeerSet := rg.Peers[:toSkip]
@@ -1936,6 +1940,16 @@ func (js *jetStream) monitorStream(mset *stream, sa *streamAssignment, sendSnaps
 				newPeerTbl[peer] = struct{}{}
 			}
 
+			currentCount, firstPeer, foundLeader := currentPeerCount(ci, newPeerSet, selfId)
+			// Delay the scale down to make sure that catchup has actually started.
+			if currentCount == replicas {
+				if mDelayTc < scaleDownDelayTicks {
+					mDelayTc++
+					continue
+				}
+			}
+
+			// First make sure all consumers are properly scaled down.
 			waitOnConsumerScaledown := false
 			js.mu.RLock()
 			if san, ok := cc.streams[accName][sa.Config.Name]; ok {
@@ -1963,27 +1977,26 @@ func (js *jetStream) monitorStream(mset *stream, sa *streamAssignment, sendSnaps
 				continue
 			}
 
-			currentCount, firstPeer, foundLeader := currentPeerCount(ci, newPeerSet, selfId)
-
-			// If all are current we are good, or if we have some offline and we have a quorum.
-			if quorum := replicas/2 + 1; currentCount >= quorum {
-				// Remove the old peers or transfer leadership (after which new leader resumes with peer removal).
-				// stopMigrationMonitoring is invoked on actual leadership change or
-				// on the next tick when migration completed.
-				// In case these operations fail, the next tick will retry
-				if !foundLeader {
-					n.StepDown(firstPeer)
-				} else {
-					for _, p := range oldPeerSet {
-						n.ProposeRemovePeer(p)
-					}
-					csa := sa.copyGroup()
-					csa.Group.Peers = newPeerSet
-					csa.Group.Cluster = s.ClusterName() // use cluster name of leader/self
-					csa.Group.Preferred = firstPeer
-					cc.meta.ForwardProposal(encodeUpdateStreamAssignment(csa))
+			// Remove the old peers or transfer leadership (after which new leader resumes with peer removal).
+			// stopMigrationMonitoring is invoked on actual leadership change or
+			// on the next tick when migration completed.
+			// In case these operations fail, the next tick will retry
+			if !foundLeader {
+				s.Debugf("Scale down of '%s' step down ('%s' preferred)", sa.Config.Name, firstPeer)
+				n.StepDown(firstPeer)
+			} else {
+				s.Noticef("Scale down of '%s' to %+v ('%s' preferred)",
+					sa.Config.Name, s.peerSetToNames(newPeerSet), firstPeer)
+				for _, p := range oldPeerSet {
+					n.ProposeRemovePeer(p)
 				}
+				csa := sa.copyGroup()
+				csa.Group.Peers = newPeerSet
+				csa.Group.Cluster = s.ClusterName() // use cluster name of leader/self
+				csa.Group.Preferred = firstPeer
+				cc.meta.ForwardProposal(encodeUpdateStreamAssignment(csa))
 			}
+			mDelayTc = 0
 		case err := <-restoreDoneCh:
 			// We have completed a restore from snapshot on this server. The stream assignment has
 			// already been assigned but the replicas will need to catch up out of band. Consumers
@@ -3745,18 +3758,20 @@ func (js *jetStream) monitorConsumer(o *consumer, ca *consumerAssignment) {
 	// For migration tracking.
 	var mmt *time.Ticker
 	var mmtc <-chan time.Time
+	var mDelayTc int
 
 	startMigrationMonitoring := func() {
 		if mmt == nil {
 			mmt = time.NewTicker(1 * time.Second)
 			mmtc = mmt.C
+			mDelayTc = 0
 		}
 	}
 
 	stopMigrationMonitoring := func() {
 		if mmt != nil {
 			mmt.Stop()
-			mmt, mmtc = nil, nil
+			mmt, mmtc, mDelayTc = nil, nil, 0
 		}
 	}
 	defer stopMigrationMonitoring()
@@ -3861,15 +3876,21 @@ func (js *jetStream) monitorConsumer(o *consumer, ca *consumerAssignment) {
 
 			currentCount, firstPeer, foundLeader := currentPeerCount(ci, newPeerSet, selfId)
 
-			// If all are current we are good, or if we have some offline and we have a quorum.
-			if quorum := replicas/2 + 1; currentCount >= quorum {
+			// If all are current, we are good.
+			if currentCount == replicas {
+				if mDelayTc < scaleDownDelayTicks {
+					mDelayTc++
+					continue
+				}
 				// Remove the old peers or transfer leadership (after which new leader resumes with peer removal).
 				// stopMigrationMonitoring is invoked on actual leadership change or
 				// on the next tick when migration completed.
 				// In case these operations fail, the next tick will retry
 				if !foundLeader {
+					s.Debugf("Scale down of '%s > %s' step down ('%s' preferred)", ca.Stream, ca.Name, firstPeer)
 					n.StepDown(firstPeer)
 				} else {
+					s.Noticef("Scale down of '%s > %s' to %+v", ca.Stream, ca.Name, s.peerSetToNames(newPeerSet))
 					// truncate this consumer
 					for _, p := range oldPeerSet {
 						n.ProposeRemovePeer(p)
@@ -3880,6 +3901,7 @@ func (js *jetStream) monitorConsumer(o *consumer, ca *consumerAssignment) {
 					cc.meta.ForwardProposal(encodeAddConsumerAssignment(cca))
 				}
 			}
+			mDelayTc = 0
 		case <-t.C:
 			doSnapshot(false)
 		}
@@ -7193,7 +7215,6 @@ func (mset *stream) runCatchup(sendSubject string, sreq *streamSyncRequest) {
 	// Setup sequences to walk through.
 	seq, last := sreq.FirstSeq, sreq.LastSeq
 	mset.setCatchupPeer(sreq.Peer, last-seq)
-	defer mset.clearCatchupPeer(sreq.Peer)
 
 	var spb int
 	sendNextBatchAndContinue := func(qch chan struct{}) bool {
@@ -7308,16 +7329,19 @@ func (mset *stream) runCatchup(sendSubject string, sreq *streamSyncRequest) {
 		case <-qch:
 			return
 		case <-remoteQuitCh:
+			mset.clearCatchupPeer(sreq.Peer)
 			return
 		case <-notActive.C:
 			s.Warnf("Catchup for stream '%s > %s' stalled", mset.account(), mset.name())
 			return
 		case <-nextBatchC:
 			if !sendNextBatchAndContinue(qch) {
+				mset.clearCatchupPeer(sreq.Peer)
 				return
 			}
 		case <-cbKick:
 			if !sendNextBatchAndContinue(qch) {
+				mset.clearCatchupPeer(sreq.Peer)
 				return
 			}
 		}
diff --git a/server/jetstream_super_cluster_test.go b/server/jetstream_super_cluster_test.go
index 89ddc218..6d8862e0 100644
--- a/server/jetstream_super_cluster_test.go
+++ b/server/jetstream_super_cluster_test.go
@@ -1812,7 +1812,7 @@ func TestJetStreamSuperClusterMovingStreamsAndConsumers(t *testing.T) {
 
 	// Should see the cluster designation and leader switch to C2.
 	// We should also shrink back down to original replica count.
-	checkFor(t, 10*time.Second, 100*time.Millisecond, func() error {
+	checkFor(t, 20*time.Second, 100*time.Millisecond, func() error {
 		si, err := js.StreamInfo("MOVE")
 		if err != nil {
 			return err
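
The heart of the change is the new `mDelayTc` counter that gates the scale down. Below is a minimal, self-contained sketch of that debounce for review context; the `counts` series, the 10ms ticker period, and the printed messages are made-up stand-ins (the real monitor ticks once per second and derives `currentCount` from `currentPeerCount(ci, newPeerSet, selfId)`):

```go
package main

import (
	"fmt"
	"time"
)

// Mirrors the constant added in the patch: how many consecutive
// migration-monitor ticks the new peer set must report fully current
// before the scale down proceeds.
const scaleDownDelayTicks = 2

func main() {
	const replicas = 3
	// Hypothetical per-tick "current" counts, standing in for what
	// currentPeerCount would derive from ClusterInfo on each tick.
	counts := []int{1, 2, 3, 3, 3}

	mmt := time.NewTicker(10 * time.Millisecond) // 1 * time.Second in the real monitor
	defer mmt.Stop()

	var mDelayTc int
	for _, currentCount := range counts {
		<-mmt.C
		if currentCount != replicas {
			// Not all new peers are current yet; keep waiting.
			mDelayTc = 0
			fmt.Printf("%d/%d current, waiting\n", currentCount, replicas)
			continue
		}
		// All new peers look current. Hold off for a couple of ticks so a
		// replica that reports current before catchup has really started
		// cannot trigger a premature peer removal.
		if mDelayTc < scaleDownDelayTicks {
			mDelayTc++
			fmt.Printf("all current, delaying (%d/%d ticks)\n", mDelayTc, scaleDownDelayTicks)
			continue
		}
		fmt.Println("proceeding with scale down (StepDown / ProposeRemovePeer)")
		return
	}
}
```

With `scaleDownDelayTicks = 2`, the peer set has to look fully current on three consecutive ticks before `StepDown`/`ProposeRemovePeer` fire. That extra settling time is presumably also why the test's `checkFor` timeout grows from 10s to 20s: every move now pays a couple of additional seconds before it can complete.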