From ebb24006c2cd23c89ccfafec9e34ab1982b36fd2 Mon Sep 17 00:00:00 2001 From: Derek Collison Date: Wed, 22 Sep 2021 14:34:32 -0700 Subject: [PATCH] Direct consumers used for mirroring should not be affected by max consumer limits Signed-off-by: Derek Collison --- server/consumer.go | 2 +- server/jetstream_cluster.go | 17 ++++++++++--- server/norace_test.go | 49 ++++++++++++++++++++++++++++++++++++- server/stream.go | 13 +++++++--- 4 files changed, 71 insertions(+), 10 deletions(-) diff --git a/server/consumer.go b/server/consumer.go index bd49e033..49ef440b 100644 --- a/server/consumer.go +++ b/server/consumer.go @@ -490,7 +490,7 @@ func (mset *stream) addConsumerWithAssignment(config *ConsumerConfig, oname stri if maxc <= 0 || (mset.jsa.limits.MaxConsumers > 0 && mset.jsa.limits.MaxConsumers < maxc) { maxc = mset.jsa.limits.MaxConsumers } - if maxc > 0 && len(mset.consumers) >= maxc { + if maxc > 0 && mset.numPublicConsumers() >= maxc { mset.mu.Unlock() return nil, NewJSMaximumConsumersLimitError() } diff --git a/server/jetstream_cluster.go b/server/jetstream_cluster.go index 17462648..e11892e4 100644 --- a/server/jetstream_cluster.go +++ b/server/jetstream_cluster.go @@ -4119,10 +4119,19 @@ func (s *Server) jsClusteredConsumerRequest(ci *ClientInfo, acc *Account, subjec } // Check for max consumers here to short circuit if possible. - if maxc := sa.Config.MaxConsumers; maxc > 0 && len(sa.consumers) >= maxc { - resp.Error = NewJSMaximumConsumersLimitError() - s.sendAPIErrResponse(ci, acc, subject, reply, string(rmsg), s.jsonResponse(&resp)) - return + if maxc := sa.Config.MaxConsumers; maxc > 0 { + // Don't count DIRECTS. + total := 0 + for _, ca := range sa.consumers { + if ca.Config != nil && !ca.Config.Direct { + total++ + } + } + if total >= maxc { + resp.Error = NewJSMaximumConsumersLimitError() + s.sendAPIErrResponse(ci, acc, subject, reply, string(rmsg), s.jsonResponse(&resp)) + return + } } // Also short circuit if DeliverLastPerSubject is set with no FilterSubject. diff --git a/server/norace_test.go b/server/norace_test.go index b131ff4a..c7afbadf 100644 --- a/server/norace_test.go +++ b/server/norace_test.go @@ -19,7 +19,6 @@ import ( "bufio" "bytes" "compress/gzip" - crand "crypto/rand" "encoding/binary" "encoding/json" "fmt" @@ -37,6 +36,8 @@ import ( "testing" "time" + crand "crypto/rand" + "github.com/klauspost/compress/s2" "github.com/nats-io/jwt/v2" "github.com/nats-io/nats.go" @@ -3504,3 +3505,49 @@ func TestNoRaceJetStreamClusterInterestRetentionDeadlock(t *testing.T) { return nil }) } + +func TestNoRaceJetStreamClusterMaxConsumersAndDirect(t *testing.T) { + c := createJetStreamClusterExplicit(t, "R3S", 3) + defer c.shutdown() + + // Client based API + s := c.randomServer() + nc, js := jsClientConnect(t, s) + defer nc.Close() + + // We want to max sure max consumer limits do not affect mirrors or sources etc. + _, err := js.AddStream(&nats.StreamConfig{Name: "S", Storage: nats.MemoryStorage, MaxConsumers: 1}) + if err != nil { + t.Fatalf("Unexpected error: %v", err) + } + + var mirrors []string + for i := 0; i < 10; i++ { + // Create a mirror. + mname := fmt.Sprintf("M-%d", i+1) + mirrors = append(mirrors, mname) + _, err = js.AddStream(&nats.StreamConfig{Name: mname, Mirror: &nats.StreamSource{Name: "S"}}) + if err != nil { + t.Fatalf("Unexpected error: %v", err) + } + } + + // Queue up messages. + numRequests := 20 + for i := 0; i < numRequests; i++ { + js.Publish("S", []byte("Q")) + } + + checkFor(t, 5*time.Second, 100*time.Millisecond, func() error { + for _, mname := range mirrors { + si, err := js.StreamInfo(mname) + if err != nil { + t.Fatalf("Unexpected error: %v", err) + } + if si.State.Msgs != uint64(numRequests) { + return fmt.Errorf("Expected %d msgs for %q, got state: %+v", numRequests, mname, si.State) + } + } + return nil + }) +} diff --git a/server/stream.go b/server/stream.go index 9f6378db..ae4c9f72 100644 --- a/server/stream.go +++ b/server/stream.go @@ -3303,8 +3303,8 @@ func (mset *stream) getMsg(seq uint64) (*StoredMsg, error) { // getConsumers will return all the current consumers for this stream. func (mset *stream) getConsumers() []*consumer { - mset.mu.Lock() - defer mset.mu.Unlock() + mset.mu.RLock() + defer mset.mu.RUnlock() var obs []*consumer for _, o := range mset.consumers { @@ -3313,10 +3313,15 @@ func (mset *stream) getConsumers() []*consumer { return obs } +// Lock should be held for this one. +func (mset *stream) numPublicConsumers() int { + return len(mset.consumers) - mset.directs +} + // This returns all consumers that are not DIRECT. func (mset *stream) getPublicConsumers() []*consumer { - mset.mu.Lock() - defer mset.mu.Unlock() + mset.mu.RLock() + defer mset.mu.RUnlock() var obs []*consumer for _, o := range mset.consumers {