From ca0dc728419079bc332525fa7e2686bcaa10d9dc Mon Sep 17 00:00:00 2001 From: "R.I.Pienaar" Date: Fri, 24 Jul 2020 15:24:27 +0200 Subject: [PATCH] allow max consumers to be set Signed-off-by: R.I.Pienaar --- server/jetstream.go | 4 +--- test/jetstream_test.go | 28 ++++++++++++++++++++++++++++ 2 files changed, 29 insertions(+), 3 deletions(-) diff --git a/server/jetstream.go b/server/jetstream.go index 37dde97a..7f396397 100644 --- a/server/jetstream.go +++ b/server/jetstream.go @@ -796,10 +796,8 @@ func (jsa *jsAccount) checkLimits(config *StreamConfig) error { return fmt.Errorf("replicas setting of %d not allowed", config.Replicas) } // Check MaxConsumers - if config.MaxConsumers > 0 && config.MaxConsumers > jsa.limits.MaxConsumers { + if config.MaxConsumers > 0 && jsa.limits.MaxConsumers > 0 && config.MaxConsumers > jsa.limits.MaxConsumers { return fmt.Errorf("maximum consumers exceeds account limit") - } else { - config.MaxConsumers = jsa.limits.MaxConsumers } // Check storage, memory or disk. if config.MaxBytes > 0 { diff --git a/test/jetstream_test.go b/test/jetstream_test.go index 3507babb..5512a666 100644 --- a/test/jetstream_test.go +++ b/test/jetstream_test.go @@ -699,6 +699,34 @@ func TestJetStreamAddStreamBadSubjects(t *testing.T) { expectAPIErr(server.StreamConfig{Name: "MyStream", Subjects: []string{".>"}}) } +func TestJetStreamAddStreamMaxConsumers(t *testing.T) { + s := RunBasicJetStreamServer() + defer s.Shutdown() + + if config := s.JetStreamConfig(); config != nil { + defer os.RemoveAll(config.StoreDir) + } + + nc := clientConnectToServer(t, s) + defer nc.Close() + + cfg := &server.StreamConfig{ + Name: "MAXC", + Subjects: []string{"in.maxc.>"}, + MaxConsumers: 1, + } + + acc := s.GlobalAccount() + mset, err := acc.AddStream(cfg) + if err != nil { + t.Fatalf("Unexpected error adding stream: %v", err) + } + + if mset.Config().MaxConsumers != 1 { + t.Fatalf("Expected 1 MaxConsumers, got %d", mset.Config().MaxConsumers) + } +} + func TestJetStreamAddStreamOverlappingSubjects(t *testing.T) { mconfig := &server.StreamConfig{ Name: "ok",