Update to not pop directly, just bail when we detect leadership change

Signed-off-by: Derek Collison <derek@nats.io>
This commit is contained in:
Derek Collison
2023-09-18 13:27:27 -07:00
parent ea775a80e8
commit 71b8a33456
2 changed files with 9 additions and 19 deletions

View File

@@ -1122,16 +1122,6 @@ func (o *consumer) isLeader() bool {
return true
}
func (o *consumer) clearLoopAndForward() {
o.mu.Lock()
defer o.mu.Unlock()
if o.qch != nil {
close(o.qch)
// Note can not close pch here.
o.qch, o.pch = nil, nil
}
}
func (o *consumer) setLeader(isLeader bool) {
o.mu.RLock()
mset := o.mset
@@ -2011,15 +2001,15 @@ func (o *consumer) loopAndForwardProposals(qch chan struct{}) {
return
}
forwardProposals := func() {
forwardProposals := func() error {
o.mu.Lock()
node, pch = o.node, o.pch
if o.node != node || node.State() != Leader {
o.mu.Unlock()
return errors.New("no longer leader")
}
proposal := o.phead
o.phead, o.ptail = nil, nil
o.mu.Unlock()
if node == nil || pch == nil || node.State() != Leader {
return
}
// 256k max for now per batch.
const maxBatch = 256 * 1024
var entries []*Entry
@@ -2038,6 +2028,7 @@ func (o *consumer) loopAndForwardProposals(qch chan struct{}) {
if len(entries) > 0 {
node.ProposeDirect(entries)
}
return nil
}
// In case we have anything pending on entry.
@@ -2049,7 +2040,9 @@ func (o *consumer) loopAndForwardProposals(qch chan struct{}) {
forwardProposals()
return
case <-pch:
forwardProposals()
if err := forwardProposals(); err != nil {
return
}
}
}
}

View File

@@ -4264,9 +4264,6 @@ func (js *jetStream) processClusterCreateConsumer(ca *consumerAssignment, state
} else {
// Check for scale down to 1..
if rg.node != nil && len(rg.Peers) == 1 {
// Need to pop loopAndForward by closing qch and nil out both qch and pch
// to avoid leaving a closed raft node forwarding proposals.
o.clearLoopAndForward()
o.clearNode()
o.setLeader(true)
// Need to clear from rg too.