Mirror of https://github.com/gogrlx/nats-server.git (synced 2026-04-17 03:24:40 -07:00)
Parallel consumer creation could drop responses (create and info) and could also run monitorConsumer twice.
Signed-off-by: Derek Collison <derek@nats.io>
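For orientation, below is a minimal standalone sketch of the single-monitor guard this change introduces. The consumer type, the inMonitor flag, and the checkInMonitor/clearMonitorRunning helpers mirror the diff that follows; runMonitor and main are illustrative stand-ins and are not the server's monitorConsumer code.

package main

import (
	"fmt"
	"sync"
)

// consumer is a stand-in for the server's consumer type; only the
// pieces needed to show the single-monitor guard are included.
type consumer struct {
	mu        sync.Mutex
	inMonitor bool
}

// checkInMonitor reports whether a monitor routine is already running
// and, if not, marks this consumer as having one.
func (o *consumer) checkInMonitor() bool {
	o.mu.Lock()
	defer o.mu.Unlock()
	if o.inMonitor {
		return true
	}
	o.inMonitor = true
	return false
}

// clearMonitorRunning clears the flag when the monitor routine exits.
func (o *consumer) clearMonitorRunning() {
	o.mu.Lock()
	defer o.mu.Unlock()
	o.inMonitor = false
}

// runMonitor is a placeholder for the real monitor loop. The guard at
// the top makes concurrent starts collapse to a single running routine.
func (o *consumer) runMonitor(id int) {
	if o.checkInMonitor() {
		fmt.Printf("goroutine %d: monitor already running, bailing\n", id)
		return
	}
	defer o.clearMonitorRunning()
	fmt.Printf("goroutine %d: running the monitor\n", id)
}

func main() {
	o := &consumer{}
	var wg sync.WaitGroup
	for i := 0; i < 5; i++ {
		wg.Add(1)
		go func(id int) {
			defer wg.Done()
			o.runMonitor(id)
		}(i)
	}
	wg.Wait()
}

Whichever goroutine wins the flag runs the monitor and the others return immediately, which is how the fix keeps parallel consumer creation from starting monitorConsumer twice.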
@@ -54,7 +54,7 @@ type ConsumerInfo struct {
 }

 type ConsumerConfig struct {
-	// Durable is deprecated. All consumers will have names. picked by clients.
+	// Durable is deprecated. All consumers should have names, picked by clients.
 	Durable     string `json:"durable_name,omitempty"`
 	Name        string `json:"name,omitempty"`
 	Description string `json:"description,omitempty"`
@@ -298,6 +298,7 @@ type consumer struct {
 	prOk      bool
 	uch       chan struct{}
 	retention RetentionPolicy
+	inMonitor bool

 	// R>1 proposals
 	pch chan struct{}
@@ -959,6 +960,13 @@ func (o *consumer) clearNode() {
 	}
 }

+// IsLeader will return if we are the current leader.
+func (o *consumer) IsLeader() bool {
+	o.mu.RLock()
+	defer o.mu.RUnlock()
+	return o.isLeader()
+}
+
 // Lock should be held.
 func (o *consumer) isLeader() bool {
 	if o.node != nil {
@@ -1604,6 +1612,8 @@ func (o *consumer) updateConfig(cfg *ConsumerConfig) error {
 	if o.cfg.FilterSubject != cfg.FilterSubject {
 		if cfg.FilterSubject != _EMPTY_ {
 			o.filterWC = subjectHasWildcard(cfg.FilterSubject)
+		} else {
+			o.filterWC = false
 		}
 		// Make sure we have correct signaling setup.
 		// Consumer lock can not be held.
@@ -4444,3 +4454,29 @@ func (o *consumer) processStreamSignal(_ *subscription, _ *client, _ *Account, s
 		o.signalNewMessages()
 	}
 }
+
+// Will check if we are running in the monitor already and if not set the appropriate flag.
+func (o *consumer) checkInMonitor() bool {
+	o.mu.Lock()
+	defer o.mu.Unlock()
+
+	if o.inMonitor {
+		return true
+	}
+	o.inMonitor = true
+	return false
+}
+
+// Clear us being in the monitor routine.
+func (o *consumer) clearMonitorRunning() {
+	o.mu.Lock()
+	defer o.mu.Unlock()
+	o.inMonitor = false
+}
+
+// Test whether we are in the monitor routine.
+func (o *consumer) isMonitorRunning() bool {
+	o.mu.Lock()
+	defer o.mu.Unlock()
+	return o.inMonitor
+}
@@ -3823,6 +3823,7 @@ func (s *Server) jsConsumerCreateRequest(sub *subscription, c *client, a *Accoun
 	if isClustered && !req.Config.Direct {
 		// If we are inline with client, we still may need to do a callout for consumer info
 		// during this call, so place in Go routine to not block client.
+		// Router and Gateway API calls already in separate context.
 		if c.kind != ROUTER && c.kind != GATEWAY {
 			go s.jsClusteredConsumerRequest(ci, acc, subject, reply, rmsg, req.Stream, &req.Config)
 		} else {
@@ -4165,20 +4166,27 @@ func (s *Server) jsConsumerInfoRequest(sub *subscription, c *client, _ *Account,
 	}
 	// We have a consumer assignment.
 	js.mu.RLock()
+
 	var node RaftNode
 	var leaderNotPartOfGroup bool
-	if rg := ca.Group; rg != nil && rg.node != nil && rg.isMember(ourID) {
-		node = rg.node
-		if gl := node.GroupLeader(); gl != _EMPTY_ && !rg.isMember(gl) {
-			leaderNotPartOfGroup = true
+	var isMember bool
+
+	rg := ca.Group
+	if rg != nil && rg.isMember(ourID) {
+		isMember = true
+		if rg.node != nil {
+			node = rg.node
+			if gl := node.GroupLeader(); gl != _EMPTY_ && !rg.isMember(gl) {
+				leaderNotPartOfGroup = true
+			}
 		}
 	}
 	js.mu.RUnlock()
 	// Check if we should ignore all together.
 	if node == nil {
-		// We have been assigned and are pending.
-		if ca.pending {
-			// Send our config and defaults for state and no cluster info.
+		// We have been assigned but have not created a node yet. If we are a member return
+		// our config and defaults for state and no cluster info.
+		if isMember {
 			resp.ConsumerInfo = &ConsumerInfo{
 				Stream: ca.Stream,
 				Name: ca.Name,
@@ -4190,7 +4198,7 @@ func (s *Server) jsConsumerInfoRequest(sub *subscription, c *client, _ *Account,
 		return
 	}
 	// If we are a member and we have a group leader or we had a previous leader consider bailing out.
-	if node != nil && (node.GroupLeader() != _EMPTY_ || node.HadPreviousLeader()) {
+	if node.GroupLeader() != _EMPTY_ || node.HadPreviousLeader() {
 		if leaderNotPartOfGroup {
 			resp.Error = NewJSConsumerOfflineError()
 			s.sendDelayedAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp), nil)
@@ -1,4 +1,4 @@
-// Copyright 2020-2022 The NATS Authors
+// Copyright 2020-2023 The NATS Authors
 // Licensed under the Apache License, Version 2.0 (the "License");
 // you may not use this file except in compliance with the License.
 // You may obtain a copy of the License at
@@ -146,7 +146,6 @@ type consumerAssignment struct {
 	// Internal
 	responded bool
 	deleted   bool
-	pending   bool
 	err       error
 }

@@ -3585,18 +3584,19 @@ func (js *jetStream) processClusterCreateConsumer(ca *consumerAssignment, state
 	}
 	js.mu.RLock()
 	s := js.srv
-	acc, err := s.LookupAccount(ca.Client.serviceAccount())
-	if err != nil {
-		s.Warnf("JetStream cluster failed to lookup account %q: %v", ca.Client.serviceAccount(), err)
-		js.mu.RUnlock()
-		return
-	}
 	rg := ca.Group
-	alreadyRunning := rg.node != nil
+	alreadyRunning := rg != nil && rg.node != nil
+	accName, stream, consumer := ca.Client.serviceAccount(), ca.Stream, ca.Name
 	js.mu.RUnlock()

+	acc, err := s.LookupAccount(accName)
+	if err != nil {
+		s.Warnf("JetStream cluster failed to lookup axccount %q: %v", accName, err)
+		return
+	}
+
 	// Go ahead and create or update the consumer.
-	mset, err := acc.lookupStream(ca.Stream)
+	mset, err := acc.lookupStream(stream)
 	if err != nil {
 		js.mu.Lock()
 		s.Debugf("Consumer create failed, could not locate stream '%s > %s'", ca.Client.serviceAccount(), ca.Stream)
@@ -3614,7 +3614,7 @@ func (js *jetStream) processClusterCreateConsumer(ca *consumerAssignment, state
 	}

 	// Check if we already have this consumer running.
-	o := mset.lookupConsumer(ca.Name)
+	o := mset.lookupConsumer(consumer)

 	if !alreadyRunning {
 		// Process the raft group and make sure its running if needed.
@@ -3622,7 +3622,7 @@ func (js *jetStream) processClusterCreateConsumer(ca *consumerAssignment, state
 		if ca.Config.MemoryStorage {
 			storage = MemoryStorage
 		}
-		js.createRaftGroup(acc.GetName(), rg, storage)
+		js.createRaftGroup(accName, rg, storage)
 	} else {
 		// If we are clustered update the known peers.
 		js.mu.RLock()
@@ -3633,51 +3633,56 @@ func (js *jetStream) processClusterCreateConsumer(ca *consumerAssignment, state
 	}

 	// Check if we already have this consumer running.
-	var didCreate, isConfigUpdate bool
+	var didCreate, isConfigUpdate, needsLocalResponse bool
 	if o == nil {
 		// Add in the consumer if needed.
-		o, err = mset.addConsumerWithAssignment(ca.Config, ca.Name, ca, false)
-		didCreate = true
-	} else {
-		if err := o.updateConfig(ca.Config); err != nil {
-			// This is essentially an update that has failed.
-			js.mu.Lock()
-			result := &consumerAssignmentResult{
-				Account: ca.Client.serviceAccount(),
-				Stream: ca.Stream,
-				Consumer: ca.Name,
-				Response: &JSApiConsumerCreateResponse{ApiResponse: ApiResponse{Type: JSApiConsumerCreateResponseType}},
-			}
-			result.Response.Error = NewJSConsumerNameExistError()
-			s.sendInternalMsgLocked(consumerAssignmentSubj, _EMPTY_, nil, result)
-			js.mu.Unlock()
-			return
+		if o, err = mset.addConsumerWithAssignment(ca.Config, ca.Name, ca, false); err == nil {
+			didCreate = true
+		}
+	} else {
+		// This consumer exists.
+		// Only update if config is really different.
+		cfg := o.config()
+		if !reflect.DeepEqual(&cfg, ca.Config) {
+			if err := o.updateConfig(ca.Config); err != nil {
+				// This is essentially an update that has failed.
+				js.mu.RLock()
+				result := &consumerAssignmentResult{
+					Account: accName,
+					Stream: stream,
+					Consumer: consumer,
+					Response: &JSApiConsumerCreateResponse{ApiResponse: ApiResponse{Type: JSApiConsumerCreateResponseType}},
+				}
+				result.Response.Error = NewJSConsumerNameExistError()
+				s.sendInternalMsgLocked(consumerAssignmentSubj, _EMPTY_, nil, result)
+				js.mu.RUnlock()
+				return
+			}
 		}
-		// Check if we already had a consumer assignment and its still pending.
-		cca, oca := ca, o.consumerAssignment()
-		o.mu.RLock()
-		leader := o.isLeader()
-		o.mu.RUnlock()

 		var sendState bool
-		js.mu.Lock()
+		js.mu.RLock()
+		n := rg.node
+		// Check if we already had a consumer assignment and its still pending.
+		cca, oca := ca, o.consumerAssignment()
 		if oca != nil {
 			if !oca.responded {
 				// We can't override info for replying here otherwise leader once elected can not respond.
-				// So just update Config, leave off client and reply to the originals.
-				cac := *oca
-				cac.Config = ca.Config
+				// So copy over original client and the reply from the old ca.
+				cac := *ca
+				cac.Client = oca.Client
+				cac.Reply = oca.Reply
 				cca = &cac
+				needsLocalResponse = true
 			}
 			// If we look like we are scaling up, let's send our current state to the group.
-			sendState = len(ca.Group.Peers) > len(oca.Group.Peers) && leader
+			sendState = len(ca.Group.Peers) > len(oca.Group.Peers) && o.IsLeader() && n != nil
 			// Signal that this is an update
 			isConfigUpdate = true
 		}
-		n := rg.node
-		js.mu.Unlock()
+		js.mu.RUnlock()

-		if sendState && n != nil {
+		if sendState {
 			if snap, err := o.store.EncodedState(); err == nil {
 				n.SendSnapshot(snap)
 			}
@@ -3694,6 +3699,7 @@ func (js *jetStream) processClusterCreateConsumer(ca *consumerAssignment, state
 		err = o.setStoreState(state)
 		o.mu.Unlock()
 	}
+
 	if err != nil {
 		if IsNatsErr(err, JSConsumerStoreFailedErrF) {
 			s.Warnf("Consumer create failed for '%s > %s > %s': %v", ca.Client.serviceAccount(), ca.Stream, ca.Name, err)
@@ -3755,7 +3761,6 @@ func (js *jetStream) processClusterCreateConsumer(ca *consumerAssignment, state
 		}
 	}

-	// Start our monitoring routine.
 	if rg.node == nil {
 		// Single replica consumer, process manually here.
 		js.mu.Lock()
@@ -3766,15 +3771,14 @@ func (js *jetStream) processClusterCreateConsumer(ca *consumerAssignment, state
 		js.mu.Unlock()
 		js.processConsumerLeaderChange(o, true)
 	} else {
-		if !alreadyRunning {
+		// Clustered consumer.
+		// Start our monitoring routine if needed.
+		if !alreadyRunning && !o.isMonitorRunning() {
 			s.startGoRoutine(func() { js.monitorConsumer(o, ca) })
 		}
-		// Only send response if not recovering.
-		if !js.isMetaRecovering() {
-			o.mu.RLock()
-			isLeader := o.isLeader()
-			o.mu.RUnlock()
-			if wasExisting && (isLeader || (!didCreate && rg.node.GroupLeader() == _EMPTY_)) {
+		// For existing consumer, only send response if not recovering.
+		if wasExisting && !js.isMetaRecovering() {
+			if o.IsLeader() || (!didCreate && needsLocalResponse) {
 				// Process if existing as an update.
 				js.mu.RLock()
 				client, subject, reply := ca.Client, ca.Subject, ca.Reply
@@ -3970,6 +3974,12 @@ func (js *jetStream) monitorConsumer(o *consumer, ca *consumerAssignment) {
 		return
 	}

+	// Make sure only one is running.
+	if o.checkInMonitor() {
+		return
+	}
+	defer o.clearMonitorRunning()
+
 	qch, lch, aq, uch, ourPeerId := n.QuitC(), n.LeadChangeC(), n.ApplyQ(), o.updateC(), cc.meta.ID()

 	s.Debugf("Starting consumer monitor for '%s > %s > %s' [%s]", o.acc.Name, ca.Stream, ca.Name, n.Group())
@@ -4373,8 +4383,7 @@ func (js *jetStream) processConsumerLeaderChange(o *consumer, isLeader bool) err
 	ca.responded = true
 	js.mu.Unlock()

-	streamName := o.streamName()
-	consumerName := o.String()
+	streamName, consumerName := o.streamName(), o.String()
 	acc, _ := s.LookupAccount(account)
 	if acc == nil {
 		return stepDownIfLeader()
@@ -6493,7 +6502,6 @@ func (s *Server) jsClusteredConsumerRequest(ci *ClientInfo, acc *Account, subjec
 	if sa.consumers == nil {
 		sa.consumers = make(map[string]*consumerAssignment)
 	}
-	ca.pending = true
 	sa.consumers[ca.Name] = ca

 	// Do formal proposal.
@@ -1486,13 +1486,21 @@ func TestJetStreamParallelConsumerCreation(t *testing.T) {
 	})
 	require_NoError(t, err)

-	np := 20
+	np := 50
+
 	startCh := make(chan bool)
 	errCh := make(chan error, np)
+
+	cfg := &nats.ConsumerConfig{
+		Durable:  "dlc",
+		Replicas: 3,
+	}
+
 	wg := sync.WaitGroup{}
 	swg := sync.WaitGroup{}
 	wg.Add(np)
 	swg.Add(np)
+
 	for i := 0; i < np; i++ {
 		go func() {
 			defer wg.Done()
@@ -1501,21 +1509,20 @@ func TestJetStreamParallelConsumerCreation(t *testing.T) {
 			nc, js := jsClientConnect(t, c.randomServer())
 			defer nc.Close()

 			swg.Done()
+
 			// Make them all fire at once.
 			<-startCh

-			var err error
-			_, err = js.AddConsumer("TEST", &nats.ConsumerConfig{
-				Durable:  "dlc",
-				Replicas: 3,
-			})
-			if err != nil {
+			if _, err := js.AddConsumer("TEST", cfg); err != nil {
 				errCh <- err
 			}
 		}()
 	}
+
 	swg.Wait()
 	close(startCh)
+
 	wg.Wait()
+
 	if len(errCh) > 0 {
@@ -541,11 +541,7 @@ func (a *Account) addStreamWithAssignment(config *StreamConfig, fsConfig *FileSt
 	}

 	// This is always true in single server mode.
-	mset.mu.RLock()
-	isLeader := mset.isLeader()
-	mset.mu.RUnlock()
-
-	if isLeader {
+	if mset.IsLeader() {
 		// Send advisory.
 		var suppress bool
 		if !s.standAloneMode() && sa == nil {