diff --git a/server/jetstream_cluster.go b/server/jetstream_cluster.go index 7f026618..896043cf 100644 --- a/server/jetstream_cluster.go +++ b/server/jetstream_cluster.go @@ -4050,14 +4050,12 @@ func (s *Server) jsClusteredConsumerRequest(ci *ClientInfo, acc *Account, subjec ca := &consumerAssignment{Group: rg, Stream: stream, Name: oname, Config: cfg, Subject: subject, Reply: reply, Client: ci, Created: time.Now().UTC()} eca := encodeAddConsumerAssignment(ca) - // Mark this as pending if a durable. - if isDurableConsumer(cfg) { - if sa.consumers == nil { - sa.consumers = make(map[string]*consumerAssignment) - } - ca.pending = true - sa.consumers[ca.Name] = ca + // Mark this as pending. + if sa.consumers == nil { + sa.consumers = make(map[string]*consumerAssignment) } + ca.pending = true + sa.consumers[ca.Name] = ca // Do formal proposal. cc.meta.Propose(eca) diff --git a/server/jetstream_cluster_test.go b/server/jetstream_cluster_test.go index e2e19377..9953e6d6 100644 --- a/server/jetstream_cluster_test.go +++ b/server/jetstream_cluster_test.go @@ -7640,6 +7640,59 @@ func TestJetStreamClusterMaxConsumers(t *testing.T) { } } +func TestJetStreamClusterMaxConsumersMultipleConcurrentRequests(t *testing.T) { + c := createJetStreamClusterExplicit(t, "JSC", 3) + defer c.shutdown() + + nc, js := jsClientConnect(t, c.randomServer()) + defer nc.Close() + + cfg := &nats.StreamConfig{ + Name: "MAXCC", + Storage: nats.MemoryStorage, + Subjects: []string{"in.maxcc.>"}, + MaxConsumers: 1, + Replicas: 3, + } + if _, err := js.AddStream(cfg); err != nil { + t.Fatalf("Unexpected error: %v", err) + } + si, err := js.StreamInfo("MAXCC") + if err != nil { + t.Fatalf("Unexpected error: %v", err) + } + if si.Config.MaxConsumers != 1 { + t.Fatalf("Expected max of 1, got %d", si.Config.MaxConsumers) + } + + startCh := make(chan bool) + var wg sync.WaitGroup + + for n := 0; n < 10; n++ { + wg.Add(1) + go func() { + defer wg.Done() + nc, js := jsClientConnect(t, c.randomServer()) + defer nc.Close() + <-startCh + js.SubscribeSync("in.maxcc.foo") + }() + } + // Wait for Go routines. + time.Sleep(250 * time.Millisecond) + + close(startCh) + wg.Wait() + + var names []string + for n := range js.ConsumerNames("MAXCC") { + names = append(names, n) + } + if nc := len(names); nc > 1 { + t.Fatalf("Expected only 1 consumer, got %d", nc) + } +} + // Support functions // Used to setup superclusters for tests.