Direct consumers used for mirroring should not be affected by max consumer limits

Signed-off-by: Derek Collison <derek@nats.io>
This commit is contained in:
Derek Collison
2021-09-22 14:34:32 -07:00
parent 6c633013fb
commit ebb24006c2
4 changed files with 71 additions and 10 deletions

View File

@@ -490,7 +490,7 @@ func (mset *stream) addConsumerWithAssignment(config *ConsumerConfig, oname stri
if maxc <= 0 || (mset.jsa.limits.MaxConsumers > 0 && mset.jsa.limits.MaxConsumers < maxc) {
maxc = mset.jsa.limits.MaxConsumers
}
if maxc > 0 && len(mset.consumers) >= maxc {
if maxc > 0 && mset.numPublicConsumers() >= maxc {
mset.mu.Unlock()
return nil, NewJSMaximumConsumersLimitError()
}

View File

@@ -4119,10 +4119,19 @@ func (s *Server) jsClusteredConsumerRequest(ci *ClientInfo, acc *Account, subjec
}
// Check for max consumers here to short circuit if possible.
if maxc := sa.Config.MaxConsumers; maxc > 0 && len(sa.consumers) >= maxc {
resp.Error = NewJSMaximumConsumersLimitError()
s.sendAPIErrResponse(ci, acc, subject, reply, string(rmsg), s.jsonResponse(&resp))
return
if maxc := sa.Config.MaxConsumers; maxc > 0 {
// Don't count DIRECTS.
total := 0
for _, ca := range sa.consumers {
if ca.Config != nil && !ca.Config.Direct {
total++
}
}
if total >= maxc {
resp.Error = NewJSMaximumConsumersLimitError()
s.sendAPIErrResponse(ci, acc, subject, reply, string(rmsg), s.jsonResponse(&resp))
return
}
}
// Also short circuit if DeliverLastPerSubject is set with no FilterSubject.

View File

@@ -19,7 +19,6 @@ import (
"bufio"
"bytes"
"compress/gzip"
crand "crypto/rand"
"encoding/binary"
"encoding/json"
"fmt"
@@ -37,6 +36,8 @@ import (
"testing"
"time"
crand "crypto/rand"
"github.com/klauspost/compress/s2"
"github.com/nats-io/jwt/v2"
"github.com/nats-io/nats.go"
@@ -3504,3 +3505,49 @@ func TestNoRaceJetStreamClusterInterestRetentionDeadlock(t *testing.T) {
return nil
})
}
func TestNoRaceJetStreamClusterMaxConsumersAndDirect(t *testing.T) {
c := createJetStreamClusterExplicit(t, "R3S", 3)
defer c.shutdown()
// Client based API
s := c.randomServer()
nc, js := jsClientConnect(t, s)
defer nc.Close()
// We want to max sure max consumer limits do not affect mirrors or sources etc.
_, err := js.AddStream(&nats.StreamConfig{Name: "S", Storage: nats.MemoryStorage, MaxConsumers: 1})
if err != nil {
t.Fatalf("Unexpected error: %v", err)
}
var mirrors []string
for i := 0; i < 10; i++ {
// Create a mirror.
mname := fmt.Sprintf("M-%d", i+1)
mirrors = append(mirrors, mname)
_, err = js.AddStream(&nats.StreamConfig{Name: mname, Mirror: &nats.StreamSource{Name: "S"}})
if err != nil {
t.Fatalf("Unexpected error: %v", err)
}
}
// Queue up messages.
numRequests := 20
for i := 0; i < numRequests; i++ {
js.Publish("S", []byte("Q"))
}
checkFor(t, 5*time.Second, 100*time.Millisecond, func() error {
for _, mname := range mirrors {
si, err := js.StreamInfo(mname)
if err != nil {
t.Fatalf("Unexpected error: %v", err)
}
if si.State.Msgs != uint64(numRequests) {
return fmt.Errorf("Expected %d msgs for %q, got state: %+v", numRequests, mname, si.State)
}
}
return nil
})
}

View File

@@ -3303,8 +3303,8 @@ func (mset *stream) getMsg(seq uint64) (*StoredMsg, error) {
// getConsumers will return all the current consumers for this stream.
func (mset *stream) getConsumers() []*consumer {
mset.mu.Lock()
defer mset.mu.Unlock()
mset.mu.RLock()
defer mset.mu.RUnlock()
var obs []*consumer
for _, o := range mset.consumers {
@@ -3313,10 +3313,15 @@ func (mset *stream) getConsumers() []*consumer {
return obs
}
// Lock should be held for this one.
func (mset *stream) numPublicConsumers() int {
return len(mset.consumers) - mset.directs
}
// This returns all consumers that are not DIRECT.
func (mset *stream) getPublicConsumers() []*consumer {
mset.mu.Lock()
defer mset.mu.Unlock()
mset.mu.RLock()
defer mset.mu.RUnlock()
var obs []*consumer
for _, o := range mset.consumers {