Scale down consumers before scaling down the stream (#3282)

monitorStream now waits to scale down the stream until every monitorConsumer
has scaled down its respective consumer.
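
In outline, the new ordering works as follows (a condensed sketch of the logic
in the diff below, not the verbatim server code; names such as newPeerSet,
san, and waitOnConsumerScaledown come straight from the patch):

    // monitorConsumer, on each migration tick: once the shrunken peer set
    // is caught up, propose the consumer assignment with only the new peers.
    cca := ca.copyGroup()
    cca.Group.Peers = newPeerSet
    cc.meta.ForwardProposal(encodeAddConsumerAssignment(cca))

    // monitorStream, on each migration tick: hold off on truncating the
    // stream's peers while any consumer is still pending, still has more
    // peers than its configured replica count, or still sits on an old peer.
    for cName, c := range san.consumers {
        if c.pending || len(c.Group.Peers) > c.Config.replicas(san.Config) {
            waitOnConsumerScaledown = true
            break
        }
    }
    if waitOnConsumerScaledown {
        continue // re-check on the next one-second tick
    }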

Also keep the consumer assignment current for later use in monitorConsumer,
and likewise the stream assignment in monitorStream.
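
Schematically, both monitor loops now re-read their cached assignment when
their update channel fires, so later migration ticks operate on the current
peer sets (fragments mirroring the diff below):

    // in monitorStream's select loop
    case <-uch:
        // keep stream assignment current
        sa = mset.streamAssignment()

    // in monitorConsumer's select loop
    case <-uch:
        // keep consumer assignment current
        ca = o.consumerAssignment()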

Signed-off-by: Matthias Hanel <mh@synadia.com>

Matthias Hanel, 2022-07-22 19:54:13 +02:00, committed by GitHub
parent 3676823e07
commit 5a720d4977

@@ -1346,12 +1346,13 @@ func (js *jetStream) truncateOldPeers(mset *stream, newPreferred string) {
 	cc, csa := js.cluster, sa.copyGroup()
 	csa.Group.Peers = csa.Group.Peers[len(csa.Group.Peers)-csa.Config.Replicas:]
-	// Now do consumers actually first here, followed by the owning stream.
+	// Now do consumers still needing truncating first here, followed by the owning stream.
 	for _, ca := range csa.consumers {
-		cca := ca.copyGroup()
-		r := cca.Config.replicas(csa.Config)
-		cca.Group.Peers = cca.Group.Peers[len(cca.Group.Peers)-r:]
-		cc.meta.ForwardProposal(encodeAddConsumerAssignment(cca))
+		if r := ca.Config.replicas(csa.Config); r != len(ca.Group.Peers) {
+			cca := ca.copyGroup()
+			cca.Group.Peers = cca.Group.Peers[len(cca.Group.Peers)-r:]
+			cc.meta.ForwardProposal(encodeAddConsumerAssignment(cca))
+		}
 	}
 	si, _ := js.srv.nodeToInfo.Load(newPreferred)
@@ -1634,6 +1635,34 @@ func (mset *stream) removeNode() {
 	}
 }
 
+// utility function to return the number of current peers in the specified set,
+// as well as the first current peer and if the leader (always current) is part of the set or not
+func currentPeerCount(ci *ClusterInfo, peerSet []string, leaderId string) (currentCount int, firstPeer string, foundLeader bool) {
+	for _, peer := range peerSet {
+		// selfId is leaderId
+		foundCurrent := peer == leaderId
+		if foundCurrent {
+			foundLeader = true
+		} else {
+			for _, p := range ci.Replicas {
+				if peer == string(getHash(p.Name)) {
+					if p.Current {
+						foundCurrent = true
+					}
+					break
+				}
+			}
+		}
+		if foundCurrent {
+			currentCount++
+			if firstPeer == _EMPTY_ {
+				firstPeer = peer
+			}
+		}
+	}
+	return
+}
+
 // Monitor our stream node for this stream.
 func (js *jetStream) monitorStream(mset *stream, sa *streamAssignment, sendSnapshot bool) {
 	s, cc := js.server(), js.cluster
@@ -1648,7 +1677,7 @@ func (js *jetStream) monitorStream(mset *stream, sa *streamAssignment, sendSnapshot bool) {
 		return
 	}
 
-	qch, lch, aq, uch := n.QuitC(), n.LeadChangeC(), n.ApplyQ(), mset.updateC()
+	qch, lch, aq, uch, selfId := n.QuitC(), n.LeadChangeC(), n.ApplyQ(), mset.updateC(), cc.meta.ID()
 
 	s.Debugf("Starting stream monitor for '%s > %s' [%s]", sa.Client.serviceAccount(), sa.Config.Name, n.Group())
 	defer s.Debugf("Exiting stream monitor for '%s > %s' [%s]", sa.Client.serviceAccount(), sa.Config.Name, n.Group())
@@ -1683,9 +1712,10 @@ func (js *jetStream) monitorStream(mset *stream, sa *streamAssignment, sendSnapshot bool) {
 	acc, err := s.LookupAccount(sa.Client.serviceAccount())
 	if err != nil {
-		s.Warnf("Could not retrieve account for stream '%s > %s", sa.Client.serviceAccount(), sa.Config.Name)
+		s.Warnf("Could not retrieve account for stream '%s > %s'", sa.Client.serviceAccount(), sa.Config.Name)
 		return
 	}
+	accName := acc.GetName()
 
 	var lastSnap []byte
@@ -1779,7 +1809,7 @@ func (js *jetStream) monitorStream(mset *stream, sa *streamAssignment, sendSnapshot bool) {
 					doSnapshot()
 				}
 			} else {
-				s.Warnf("Error applying entries to '%s > %s': %v", sa.Client.serviceAccount(), sa.Config.Name, err)
+				s.Warnf("Error applying entries to '%s > %s': %v", accName, sa.Config.Name, err)
 				if isClusterResetErr(err) {
 					if mset.isMirror() && mset.IsLeader() {
 						mset.retryMirrorConsumer()
@@ -1823,7 +1853,7 @@ func (js *jetStream) monitorStream(mset *stream, sa *streamAssignment, sendSnapshot bool) {
 			migrating := mset.isMigrating()
 
 			// Check for migrations here. We set the state on the stream assignment update below.
-			if isLeader && migrating && mmtc == nil {
+			if isLeader && migrating {
 				startMigrationMonitoring()
 			}
@@ -1871,18 +1901,19 @@ func (js *jetStream) monitorStream(mset *stream, sa *streamAssignment, sendSnapshot bool) {
 		case <-t.C:
 			doSnapshot()
 		case <-uch:
-			// We get this when we have a new stream assignment caused by an update. We want
-			// to know if we are migrating.
-			migrating := false
-			if cc != nil && cc.meta != nil {
-				migrating = mset.isMigrating()
-			}
+			// keep stream assignment current
+			sa = mset.streamAssignment()
+			// We get this when we have a new stream assignment caused by an update.
+			// We want to know if we are migrating.
+			migrating := mset.isMigrating()
 			// If we are migrating, monitor for the new peers to be caught up.
-			if !migrating {
+			if isLeader && migrating {
+				if mmtc == nil {
+					doSnapshot()
+					startMigrationMonitoring()
+				}
+			} else {
 				stopMigrationMonitoring()
-			} else if isLeader && mmtc == nil {
-				doSnapshot()
-				startMigrationMonitoring()
 			}
 		case <-mmtc:
 			if !isLeader {
@@ -1908,42 +1939,52 @@ func (js *jetStream) monitorStream(mset *stream, sa *streamAssignment, sendSnapshot bool) {
 				stopMigrationMonitoring()
 				continue
 			}
+			//
+			// Determine if process is finished
+			// First make sure all consumer are properly scaled down
 			toSkip := len(rg.Peers) - replicas
 			newPeerSet := rg.Peers[toSkip:]
-			currentCount := 0
-			firstPeer := _EMPTY_
-			foundSelf := false
-			selfId := cc.meta.ID()
+			newPeerTbl := map[string]struct{}{}
 			for _, peer := range newPeerSet {
-				foundCurrent := peer == mset.leader
-				if !foundCurrent {
-					for _, p := range ci.Replicas {
-						if peer == string(getHash(p.Name)) {
-							if p.Current {
-								foundCurrent = true
-							}
-							break
-						}
-					}
-				}
-				if foundCurrent {
-					currentCount++
-					if firstPeer == _EMPTY_ {
-						firstPeer = peer
-					}
-				}
-				if peer == selfId {
-					foundSelf = true
-				}
+				newPeerTbl[peer] = struct{}{}
 			}
+			waitOnConsumerScaledown := false
+			js.mu.RLock()
+			if san, ok := cc.streams[accName][sa.Config.Name]; ok {
+			FOR_CONSUMER_SCALEDOWN:
+				for cName, c := range san.consumers {
+					if c.pending || len(c.Group.Peers) > c.Config.replicas(san.Config) {
+						waitOnConsumerScaledown = true
+						s.Debugf("Scale down of '%s > %s' blocked by consumer '%s'",
+							accName, san.Config.Name, cName)
+						break
+					}
+					for _, peer := range c.Group.Peers {
+						if _, ok := newPeerTbl[peer]; !ok {
+							waitOnConsumerScaledown = true
+							s.Debugf("Scale down of '%s > %s' blocked by consumer '%s' with old peer %s",
+								accName, san.Config.Name, cName, peer)
+							break FOR_CONSUMER_SCALEDOWN
+						}
+					}
+				}
+			}
+			js.mu.RUnlock()
+			if waitOnConsumerScaledown {
+				continue
+			}
+			currentCount, firstPeer, foundLeader := currentPeerCount(ci, newPeerSet, selfId)
 			// If all are current we are good, or if we have some offline and we have a quorum.
 			if quorum := replicas/2 + 1; currentCount >= quorum {
 				// Remove the old peers or transfer leadership (after which new leader resumes with peer removal).
-				if !foundSelf {
+				// stopMigrationMonitoring is invoked on actual leadership change or
+				// on the next tick when migration completed.
+				// In case these operations fail, the next tick will retry
+				if !foundLeader {
 					n.StepDown(firstPeer)
 				} else {
 					js.truncateOldPeers(mset, selfId)
@@ -3543,6 +3584,15 @@ func (o *consumer) streamAndNode() (*stream, RaftNode) {
 	return o.mset, o.node
 }
 
+func (o *consumer) replica() int {
+	o.mu.RLock()
+	oCfg := o.cfg
+	mset := o.mset
+	o.mu.RUnlock()
+	sCfg := mset.config()
+	return oCfg.replicas(&sCfg)
+}
+
 func (o *consumer) raftGroup() *raftGroup {
 	if o == nil {
 		return nil
@@ -3574,15 +3624,14 @@ func (o *consumer) raftNode() RaftNode {
 }
 
 func (js *jetStream) monitorConsumer(o *consumer, ca *consumerAssignment) {
-	s, n := js.server(), o.raftNode()
+	s, n, cc := js.server(), o.raftNode(), js.cluster
 	defer s.grWG.Done()
 
 	if n == nil {
 		s.Warnf("No RAFT group for consumer")
 		return
 	}
 
-	qch, lch, aq, uch := n.QuitC(), n.LeadChangeC(), n.ApplyQ(), o.updateC()
+	qch, lch, aq, uch, selfId := n.QuitC(), n.LeadChangeC(), n.ApplyQ(), o.updateC(), cc.meta.ID()
 
 	s.Debugf("Starting consumer monitor for '%s > %s > %s' [%s]", o.acc.Name, ca.Stream, ca.Name, n.Group())
 	defer s.Debugf("Exiting consumer monitor for '%s > %s > %s' [%s]", o.acc.Name, ca.Stream, ca.Name, n.Group())
@@ -3625,6 +3674,25 @@ func (js *jetStream) monitorConsumer(o *consumer, ca *consumerAssignment) {
 		}
 	}
 
+	// For migration tracking.
+	var mmt *time.Ticker
+	var mmtc <-chan time.Time
+
+	startMigrationMonitoring := func() {
+		if mmt == nil {
+			mmt = time.NewTicker(1 * time.Second)
+			mmtc = mmt.C
+		}
+	}
+
+	stopMigrationMonitoring := func() {
+		if mmt != nil {
+			mmt.Stop()
+			mmt, mmtc = nil, nil
+		}
+	}
+	defer stopMigrationMonitoring()
+
 	// Track if we are leader.
 	var isLeader bool
 	recovering := true
@@ -3665,7 +3733,68 @@ func (js *jetStream) monitorConsumer(o *consumer, ca *consumerAssignment) {
 			if err := js.processConsumerLeaderChange(o, isLeader); err == nil && isLeader {
 				doSnapshot(true)
 			}
+
+			// We may receive a leader change after the consumer assignment which would cancel us
+			// monitoring for this closely. So re-assess our state here as well.
+			// Or the old leader is no longer part of the set and transferred leadership
+			// for this leader to resume with removal
+			rg := o.raftGroup()
+
+			// Check for migrations (peer count and replica count differ) here.
+			// We set the state on the stream assignment update below.
+			if isLeader && len(rg.Peers) != o.replica() {
+				startMigrationMonitoring()
+			} else {
+				stopMigrationMonitoring()
+			}
+		case <-uch:
+			// keep consumer assignment current
+			ca = o.consumerAssignment()
+			// We get this when we have a new consumer assignment caused by an update.
+			// We want to know if we are migrating.
+			rg := o.raftGroup()
+			// If we are migrating, monitor for the new peers to be caught up.
+			if isLeader && len(rg.Peers) != o.replica() {
+				startMigrationMonitoring()
+			} else {
+				stopMigrationMonitoring()
+			}
+		case <-mmtc:
+			if !isLeader {
+				// We are no longer leader, so not our job.
+				stopMigrationMonitoring()
+				continue
+			}
+			rg := o.raftGroup()
+			replicas := o.replica()
+			if len(rg.Peers) <= replicas {
+				// Migration no longer happening, so not our job anymore
+				stopMigrationMonitoring()
+				continue
+			}
+			toSkip := len(rg.Peers) - replicas
+			newPeerSet := rg.Peers[toSkip:]
+			ci := js.clusterInfo(rg)
+			currentCount, firstPeer, foundLeader := currentPeerCount(ci, newPeerSet, selfId)
+			// If all are current we are good, or if we have some offline and we have a quorum.
+			if quorum := replicas/2 + 1; currentCount >= quorum {
+				// Remove the old peers or transfer leadership (after which new leader resumes with peer removal).
+				// stopMigrationMonitoring is invoked on actual leadership change or
+				// on the next tick when migration completed.
+				// In case these operations fail, the next tick will retry
+				if !foundLeader {
+					n.StepDown(firstPeer)
+				} else {
+					// truncate this consumer
+					cca := ca.copyGroup()
+					cca.Group.Peers = newPeerSet
+					cc.meta.ForwardProposal(encodeAddConsumerAssignment(cca))
+				}
+			}
 		case <-t.C:
 			doSnapshot(false)
 		}