fix race by locking arround o.isLeader (#3291)

Signed-off-by: Matthias Hanel <mh@synadia.com>
This commit is contained in:
Matthias Hanel
2022-07-26 21:49:04 +02:00
committed by GitHub
parent fe370955c8
commit 6212087feb
2 changed files with 12 additions and 6 deletions

View File

@@ -795,7 +795,10 @@ func (mset *stream) addConsumerWithAssignment(config *ConsumerConfig, oname stri
}
// This is always true in single server mode.
if o.isLeader() {
o.mu.RLock()
isLdr := o.isLeader()
o.mu.RUnlock()
if isLdr {
// Send advisory.
var suppress bool
if !s.standAloneMode() && ca == nil {

View File

@@ -3477,7 +3477,10 @@ func (js *jetStream) processClusterCreateConsumer(ca *consumerAssignment, state
}
// Only send response if not recovering.
if !js.isMetaRecovering() {
if wasExisting && (o.isLeader() || (!didCreate && rg.node.GroupLeader() == _EMPTY_)) {
o.mu.RLock()
isLeader := o.isLeader()
o.mu.RUnlock()
if wasExisting && (isLeader || (!didCreate && rg.node.GroupLeader() == _EMPTY_)) {
// Process if existing as an update.
js.mu.RLock()
client, subject, reply := ca.Client, ca.Subject, ca.Reply
@@ -3898,22 +3901,22 @@ func (js *jetStream) applyConsumerEntries(o *consumer, ce *CommittedEntry, isLea
}
o.mu.Unlock()
case addPendingRequest:
o.mu.Lock()
if !o.isLeader() {
o.mu.Lock()
if o.prm == nil {
o.prm = make(map[string]struct{})
}
o.prm[string(buf[1:])] = struct{}{}
o.mu.Unlock()
}
o.mu.Unlock()
case removePendingRequest:
o.mu.Lock()
if !o.isLeader() {
o.mu.Lock()
if o.prm != nil {
delete(o.prm, string(buf[1:]))
}
o.mu.Unlock()
}
o.mu.Unlock()
default:
panic(fmt.Sprintf("JetStream Cluster Unknown group entry op type! %v", entryOp(buf[0])))
}