Merge pull request #2354 from nats-io/maxcc

Fix for multiple concurrent ephemeral consumer requests with max consumers set.
This commit is contained in:
Derek Collison
2021-07-08 08:53:51 -07:00
committed by GitHub
2 changed files with 60 additions and 8 deletions

View File

@@ -4050,14 +4050,12 @@ func (s *Server) jsClusteredConsumerRequest(ci *ClientInfo, acc *Account, subjec
ca := &consumerAssignment{Group: rg, Stream: stream, Name: oname, Config: cfg, Subject: subject, Reply: reply, Client: ci, Created: time.Now().UTC()}
eca := encodeAddConsumerAssignment(ca)
// Mark this as pending if a durable.
if isDurableConsumer(cfg) {
if sa.consumers == nil {
sa.consumers = make(map[string]*consumerAssignment)
}
ca.pending = true
sa.consumers[ca.Name] = ca
// Mark this as pending.
if sa.consumers == nil {
sa.consumers = make(map[string]*consumerAssignment)
}
ca.pending = true
sa.consumers[ca.Name] = ca
// Do formal proposal.
cc.meta.Propose(eca)

View File

@@ -7340,7 +7340,7 @@ func TestJetStreamClusterSourceAndMirrorConsumersLeaderChange(t *testing.T) {
// Now make sure we only have a single direct consumer on our origin streams.
// Pick one at random.
name := fmt.Sprintf("O%d", rand.Intn(numStreams))
name := fmt.Sprintf("O%d", rand.Intn(numStreams-1)+1)
c1.waitOnStreamLeader("$G", name)
s := c1.streamLeader("$G", name)
a, err := s.lookupAccount("$G")
@@ -7351,6 +7351,7 @@ func TestJetStreamClusterSourceAndMirrorConsumersLeaderChange(t *testing.T) {
if err != nil {
t.Fatalf("Unexpected error: %v", err)
}
checkFor(t, 10*time.Second, 250*time.Millisecond, func() error {
if ndc := mset.numDirectConsumers(); ndc != 1 {
return fmt.Errorf("Stream %q wanted 1 direct consumer, got %d", name, ndc)
@@ -7640,6 +7641,59 @@ func TestJetStreamClusterMaxConsumers(t *testing.T) {
}
}
func TestJetStreamClusterMaxConsumersMultipleConcurrentRequests(t *testing.T) {
c := createJetStreamClusterExplicit(t, "JSC", 3)
defer c.shutdown()
nc, js := jsClientConnect(t, c.randomServer())
defer nc.Close()
cfg := &nats.StreamConfig{
Name: "MAXCC",
Storage: nats.MemoryStorage,
Subjects: []string{"in.maxcc.>"},
MaxConsumers: 1,
Replicas: 3,
}
if _, err := js.AddStream(cfg); err != nil {
t.Fatalf("Unexpected error: %v", err)
}
si, err := js.StreamInfo("MAXCC")
if err != nil {
t.Fatalf("Unexpected error: %v", err)
}
if si.Config.MaxConsumers != 1 {
t.Fatalf("Expected max of 1, got %d", si.Config.MaxConsumers)
}
startCh := make(chan bool)
var wg sync.WaitGroup
for n := 0; n < 10; n++ {
wg.Add(1)
go func() {
defer wg.Done()
nc, js := jsClientConnect(t, c.randomServer())
defer nc.Close()
<-startCh
js.SubscribeSync("in.maxcc.foo")
}()
}
// Wait for Go routines.
time.Sleep(250 * time.Millisecond)
close(startCh)
wg.Wait()
var names []string
for n := range js.ConsumerNames("MAXCC") {
names = append(names, n)
}
if nc := len(names); nc > 1 {
t.Fatalf("Expected only 1 consumer, got %d", nc)
}
}
// Support functions
// Used to setup superclusters for tests.