mirror of
https://github.com/gogrlx/nats-server.git
synced 2026-04-02 11:48:43 -07:00
Fix for multiple concurrent ephemeral consumer requests in clustered mode with max consumers set.
Signed-off-by: Derek Collison <derek@nats.io>
This commit is contained in:
@@ -4050,14 +4050,12 @@ func (s *Server) jsClusteredConsumerRequest(ci *ClientInfo, acc *Account, subjec
|
||||
ca := &consumerAssignment{Group: rg, Stream: stream, Name: oname, Config: cfg, Subject: subject, Reply: reply, Client: ci, Created: time.Now().UTC()}
|
||||
eca := encodeAddConsumerAssignment(ca)
|
||||
|
||||
// Mark this as pending if a durable.
|
||||
if isDurableConsumer(cfg) {
|
||||
if sa.consumers == nil {
|
||||
sa.consumers = make(map[string]*consumerAssignment)
|
||||
}
|
||||
ca.pending = true
|
||||
sa.consumers[ca.Name] = ca
|
||||
// Mark this as pending.
|
||||
if sa.consumers == nil {
|
||||
sa.consumers = make(map[string]*consumerAssignment)
|
||||
}
|
||||
ca.pending = true
|
||||
sa.consumers[ca.Name] = ca
|
||||
|
||||
// Do formal proposal.
|
||||
cc.meta.Propose(eca)
|
||||
|
||||
@@ -7640,6 +7640,59 @@ func TestJetStreamClusterMaxConsumers(t *testing.T) {
|
||||
}
|
||||
}
|
||||
|
||||
func TestJetStreamClusterMaxConsumersMultipleConcurrentRequests(t *testing.T) {
|
||||
c := createJetStreamClusterExplicit(t, "JSC", 3)
|
||||
defer c.shutdown()
|
||||
|
||||
nc, js := jsClientConnect(t, c.randomServer())
|
||||
defer nc.Close()
|
||||
|
||||
cfg := &nats.StreamConfig{
|
||||
Name: "MAXCC",
|
||||
Storage: nats.MemoryStorage,
|
||||
Subjects: []string{"in.maxcc.>"},
|
||||
MaxConsumers: 1,
|
||||
Replicas: 3,
|
||||
}
|
||||
if _, err := js.AddStream(cfg); err != nil {
|
||||
t.Fatalf("Unexpected error: %v", err)
|
||||
}
|
||||
si, err := js.StreamInfo("MAXCC")
|
||||
if err != nil {
|
||||
t.Fatalf("Unexpected error: %v", err)
|
||||
}
|
||||
if si.Config.MaxConsumers != 1 {
|
||||
t.Fatalf("Expected max of 1, got %d", si.Config.MaxConsumers)
|
||||
}
|
||||
|
||||
startCh := make(chan bool)
|
||||
var wg sync.WaitGroup
|
||||
|
||||
for n := 0; n < 10; n++ {
|
||||
wg.Add(1)
|
||||
go func() {
|
||||
defer wg.Done()
|
||||
nc, js := jsClientConnect(t, c.randomServer())
|
||||
defer nc.Close()
|
||||
<-startCh
|
||||
js.SubscribeSync("in.maxcc.foo")
|
||||
}()
|
||||
}
|
||||
// Wait for Go routines.
|
||||
time.Sleep(250 * time.Millisecond)
|
||||
|
||||
close(startCh)
|
||||
wg.Wait()
|
||||
|
||||
var names []string
|
||||
for n := range js.ConsumerNames("MAXCC") {
|
||||
names = append(names, n)
|
||||
}
|
||||
if nc := len(names); nc > 1 {
|
||||
t.Fatalf("Expected only 1 consumer, got %d", nc)
|
||||
}
|
||||
}
|
||||
|
||||
// Support functions
|
||||
|
||||
// Used to setup superclusters for tests.
|
||||
|
||||
Reference in New Issue
Block a user