From ac2669a02238c5ecc9502ecde9627de0fbd61ec7 Mon Sep 17 00:00:00 2001 From: Tomasz Pietrek Date: Wed, 20 Sep 2023 17:37:05 +0200 Subject: [PATCH] Fix consumer limits Test didn't catch this error, as by default, old JS client sets ack policy to none. If policy is different, it will fail to create consumer with defaults. Signed-off-by: Tomasz Pietrek --- server/consumer.go | 12 ++++++------ server/jetstream_test.go | 3 ++- 2 files changed, 8 insertions(+), 7 deletions(-) diff --git a/server/consumer.go b/server/consumer.go index 3fe09114..a5fd72a2 100644 --- a/server/consumer.go +++ b/server/consumer.go @@ -437,6 +437,12 @@ func setConsumerConfigDefaults(config *ConsumerConfig, streamCfg *StreamConfig, if len(config.BackOff) > 0 { config.AckWait = config.BackOff[0] } + if config.MaxAckPending == 0 { + config.MaxAckPending = streamCfg.ConsumerLimits.MaxAckPending + } + if config.InactiveThreshold == 0 { + config.InactiveThreshold = streamCfg.ConsumerLimits.InactiveThreshold + } // Set proper default for max ack pending if we are ack explicit and none has been set. if (config.AckPolicy == AckExplicit || config.AckPolicy == AckAll) && config.MaxAckPending == 0 { accPending := JsDefaultMaxAckPending @@ -452,12 +458,6 @@ func setConsumerConfigDefaults(config *ConsumerConfig, streamCfg *StreamConfig, if config.DeliverSubject == _EMPTY_ && config.MaxRequestBatch == 0 && lim.MaxRequestBatch > 0 { config.MaxRequestBatch = lim.MaxRequestBatch } - if config.MaxAckPending == 0 { - config.MaxAckPending = streamCfg.ConsumerLimits.MaxAckPending - } - if config.InactiveThreshold == 0 { - config.InactiveThreshold = streamCfg.ConsumerLimits.InactiveThreshold - } } // Check the consumer config. If we are recovering don't check filter subjects. diff --git a/server/jetstream_test.go b/server/jetstream_test.go index 768f762d..4c76f100 100644 --- a/server/jetstream_test.go +++ b/server/jetstream_test.go @@ -21718,7 +21718,8 @@ func TestJetStreamConsumerDefaultsFromStream(t *testing.T) { t.Run("InheritDefaultsFromStream", func(t *testing.T) { ci, err := js.AddConsumer("test", &nats.ConsumerConfig{ - Name: "InheritDefaultsFromStream", + Name: "InheritDefaultsFromStream", + AckPolicy: nats.AckExplicitPolicy, }) require_NoError(t, err)