diff --git a/server/consumer.go b/server/consumer.go index db24057e..b801f199 100644 --- a/server/consumer.go +++ b/server/consumer.go @@ -745,14 +745,13 @@ func (mset *stream) addConsumerWithAssignment(config *ConsumerConfig, oname stri o.ackSubj = fmt.Sprintf("%s.*.*.*.*.*", pre) o.nextMsgSubj = fmt.Sprintf(JSApiRequestNextT, mn, o.name) - // If not durable determine the inactive threshold. - if !o.isDurable() { - if o.cfg.InactiveThreshold != 0 { - o.dthresh = o.cfg.InactiveThreshold - } else { - // Add in 1 sec of jitter above and beyond the default of 5s. - o.dthresh = JsDeleteWaitTimeDefault + time.Duration(rand.Int63n(1000))*time.Millisecond - } + // If the user has set the inactive threshold, set that up here. + if o.cfg.InactiveThreshold > 0 { + o.dthresh = o.cfg.InactiveThreshold + } else if !o.isDurable() { + // Ephemerals will always have inactive thresholds. + // Add in 1 sec of jitter above and beyond the default of 5s. + o.dthresh = JsDeleteWaitTimeDefault + time.Duration(rand.Int63n(1000))*time.Millisecond } if o.isPushMode() { @@ -953,11 +952,11 @@ func (o *consumer) setLeader(isLeader bool) { stopAndClearTimer(&o.gwdtmr) o.gwdtmr = time.AfterFunc(time.Second, func() { o.watchGWinterest() }) } - } else if !o.isDurable() { - // Ephemeral pull consumer. We run the dtmr all the time for this one. - if o.dtmr != nil { - stopAndClearTimer(&o.dtmr) - } + } + + if o.dthresh > 0 && (o.isPullMode() || !o.active) { + // Pull consumer. We run the dtmr all the time for this one. + stopAndClearTimer(&o.dtmr) o.dtmr = time.AfterFunc(o.dthresh, func() { o.deleteNotActive() }) } @@ -1200,9 +1199,9 @@ func (o *consumer) updateDeliveryInterest(localInterest bool) bool { // Stop and clear the delete timer always. stopAndClearTimer(&o.dtmr) - // If we do not have interest anymore and we are not durable start + // If we do not have interest anymore and have a delete threshold set, then set // a timer to delete us. We wait for a bit in case of server reconnect. - if !o.isDurable() && !interest { + if !interest && o.dthresh > 0 { o.dtmr = time.AfterFunc(o.dthresh, func() { o.deleteNotActive() }) return true } @@ -4017,25 +4016,6 @@ func validFilteredSubject(filteredSubject string, subjects []string) bool { return false } -// setInActiveDeleteThreshold sets the delete threshold for how long to wait -// before deleting an inactive ephemeral consumer. -func (o *consumer) setInActiveDeleteThreshold(dthresh time.Duration) error { - o.mu.Lock() - defer o.mu.Unlock() - - if o.isDurable() { - return fmt.Errorf("consumer is not ephemeral") - } - deleteWasRunning := o.dtmr != nil - stopAndClearTimer(&o.dtmr) - // Do not add jitter if set via here. - o.dthresh = dthresh - if deleteWasRunning { - o.dtmr = time.AfterFunc(o.dthresh, func() { o.deleteNotActive() }) - } - return nil -} - // switchToEphemeral is called on startup when recovering ephemerals. func (o *consumer) switchToEphemeral() { o.mu.Lock() diff --git a/server/jetstream_helpers_test.go b/server/jetstream_helpers_test.go index 0a1d28ec..6ff138b7 100644 --- a/server/jetstream_helpers_test.go +++ b/server/jetstream_helpers_test.go @@ -1345,3 +1345,19 @@ func addStream(t *testing.T, nc *nats.Conn, cfg *StreamConfig) *StreamInfo { } return resp.StreamInfo } + +// setInActiveDeleteThreshold sets the delete threshold for how long to wait +// before deleting an inactive consumer. +func (o *consumer) setInActiveDeleteThreshold(dthresh time.Duration) error { + o.mu.Lock() + defer o.mu.Unlock() + + deleteWasRunning := o.dtmr != nil + stopAndClearTimer(&o.dtmr) + // Do not add jitter if set via here. + o.dthresh = dthresh + if deleteWasRunning { + o.dtmr = time.AfterFunc(o.dthresh, func() { o.deleteNotActive() }) + } + return nil +} diff --git a/server/jetstream_test.go b/server/jetstream_test.go index 6e721211..d5905099 100644 --- a/server/jetstream_test.go +++ b/server/jetstream_test.go @@ -5372,7 +5372,6 @@ func TestJetStreamConsumerInactiveNoDeadlock(t *testing.T) { // the internal sendq. sub.Unsubscribe() nc.Flush() - }) } } @@ -17839,6 +17838,79 @@ func TestJetStreamDirectMsgGet(t *testing.T) { require_True(t, m.Header.Get("Description") == "Message Not Found") } +func TestJetStreamConsumerInactiveThreshold(t *testing.T) { + s := RunBasicJetStreamServer() + if config := s.JetStreamConfig(); config != nil { + defer removeDir(t, config.StoreDir) + } + defer s.Shutdown() + + nc, js := jsClientConnect(t, s) + defer nc.Close() + + _, err := js.AddStream(&nats.StreamConfig{Name: "TEST", Subjects: []string{"foo"}}) + require_NoError(t, err) + + for i := 0; i < 10; i++ { + js.PublishAsync("foo", []byte("ok")) + } + select { + case <-js.PublishAsyncComplete(): + case <-time.After(5 * time.Second): + t.Fatalf("Did not receive completion signal") + } + + waitOnCleanup := func(ci *nats.ConsumerInfo) { + t.Helper() + checkFor(t, 2*time.Second, 50*time.Millisecond, func() error { + _, err := js.ConsumerInfo(ci.Stream, ci.Name) + if err == nil { + return fmt.Errorf("Consumer still present") + } + return nil + }) + } + + // Test to make sure inactive threshold is enforced for all types. + // Ephemeral and Durable, both push and pull. + + // Ephemeral Push (no bind to deliver subject) + ci, err := js.AddConsumer("TEST", &nats.ConsumerConfig{ + DeliverSubject: "_no_bind_", + InactiveThreshold: 50 * time.Millisecond, + }) + require_NoError(t, err) + waitOnCleanup(ci) + + // Ephemeral Pull + ci, err = js.AddConsumer("TEST", &nats.ConsumerConfig{ + AckPolicy: nats.AckExplicitPolicy, + InactiveThreshold: 50 * time.Millisecond, + }) + require_NoError(t, err) + waitOnCleanup(ci) + + // Support InactiveThresholds for Durables as well. + + // Durable Push (no bind to deliver subject) + ci, err = js.AddConsumer("TEST", &nats.ConsumerConfig{ + Durable: "d1", + DeliverSubject: "_no_bind_", + InactiveThreshold: 50 * time.Millisecond, + }) + require_NoError(t, err) + waitOnCleanup(ci) + + // Durable Pull + ci, err = js.AddConsumer("TEST", &nats.ConsumerConfig{ + Durable: "d2", + AckPolicy: nats.AckExplicitPolicy, + InactiveThreshold: 50 * time.Millisecond, + }) + require_NoError(t, err) + waitOnCleanup(ci) +} + /////////////////////////////////////////////////////////////////////////// // Simple JetStream Benchmarks ///////////////////////////////////////////////////////////////////////////