diff --git a/server/consumer.go b/server/consumer.go index 304fb326..8cc502e9 100644 --- a/server/consumer.go +++ b/server/consumer.go @@ -1091,7 +1091,7 @@ func (o *consumer) setLeader(isLeader bool) { if o.dthresh > 0 && (o.isPullMode() || !o.active) { // Pull consumer. We run the dtmr all the time for this one. stopAndClearTimer(&o.dtmr) - o.dtmr = time.AfterFunc(o.dthresh, func() { o.deleteNotActive() }) + o.dtmr = time.AfterFunc(o.dthresh, o.deleteNotActive) } // If we are not in ReplayInstant mode mark us as in replay state until resolved. @@ -1121,7 +1121,6 @@ func (o *consumer) setLeader(isLeader bool) { if pullMode { // Now start up Go routine to process inbound next message requests. go o.processInboundNextMsgReqs(qch) - } // If we are R>1 spin up our proposal loop. @@ -1140,7 +1139,10 @@ func (o *consumer) setLeader(isLeader bool) { close(o.qch) o.qch = nil } - // Make sure to clear out any re delivery queues + // Stop any inactivity timers. Should only be running on leaders. + stopAndClearTimer(&o.dtmr) + + // Make sure to clear out any re-deliver queues stopAndClearTimer(&o.ptmr) o.rdq, o.rdqi = nil, nil o.pending = nil @@ -1156,9 +1158,6 @@ func (o *consumer) setLeader(isLeader bool) { // Reset waiting if we are in pull mode. if o.isPullMode() { o.waiting = newWaitQueue(o.cfg.MaxWaiting) - if !o.isDurable() { - stopAndClearTimer(&o.dtmr) - } o.nextMsgReqs.drain() } else if o.srv.gateway.enabled { stopAndClearTimer(&o.gwdtmr) @@ -1349,7 +1348,7 @@ func (o *consumer) updateDeliveryInterest(localInterest bool) bool { // If we do not have interest anymore and have a delete threshold set, then set // a timer to delete us. We wait for a bit in case of server reconnect. if !interest && o.dthresh > 0 { - o.dtmr = time.AfterFunc(o.dthresh, func() { o.deleteNotActive() }) + o.dtmr = time.AfterFunc(o.dthresh, o.deleteNotActive) return true } return false @@ -1376,7 +1375,7 @@ func (o *consumer) deleteNotActive() { if o.dtmr != nil { o.dtmr.Reset(o.dthresh - elapsed) } else { - o.dtmr = time.AfterFunc(o.dthresh-elapsed, func() { o.deleteNotActive() }) + o.dtmr = time.AfterFunc(o.dthresh-elapsed, o.deleteNotActive) } o.mu.Unlock() return @@ -1386,7 +1385,7 @@ func (o *consumer) deleteNotActive() { if o.dtmr != nil { o.dtmr.Reset(o.dthresh) } else { - o.dtmr = time.AfterFunc(o.dthresh, func() { o.deleteNotActive() }) + o.dtmr = time.AfterFunc(o.dthresh, o.deleteNotActive) } o.mu.Unlock() return @@ -1640,7 +1639,7 @@ func (o *consumer) updateConfig(cfg *ConsumerConfig) error { stopAndClearTimer(&o.dtmr) // Restart timer only if we are the leader. if o.isLeader() && o.dthresh > 0 { - o.dtmr = time.AfterFunc(o.dthresh, func() { o.deleteNotActive() }) + o.dtmr = time.AfterFunc(o.dthresh, o.deleteNotActive) } } diff --git a/server/jetstream_cluster_3_test.go b/server/jetstream_cluster_3_test.go index 3843239d..eb6d5789 100644 --- a/server/jetstream_cluster_3_test.go +++ b/server/jetstream_cluster_3_test.go @@ -5110,3 +5110,49 @@ func TestJetStreamClusterOrphanConsumerSubjects(t *testing.T) { require_NotEqual(t, info.Cluster.Leader, "") require_Equal(t, len(info.Cluster.Replicas), 2) } + +func TestJetStreamClusterDurableConsumerInactiveThresholdLeaderSwitch(t *testing.T) { + c := createJetStreamClusterExplicit(t, "R3S", 3) + defer c.shutdown() + + nc, js := jsClientConnect(t, c.randomServer()) + defer nc.Close() + + _, err := js.AddStream(&nats.StreamConfig{ + Name: "TEST", + Subjects: []string{"*"}, + Replicas: 3, + }) + require_NoError(t, err) + + // Queue a msg. + sendStreamMsg(t, nc, "foo", "ok") + + thresh := 250 * time.Millisecond + + // This will start the timer. + sub, err := js.PullSubscribe("foo", "dlc", nats.InactiveThreshold(thresh)) + require_NoError(t, err) + + // Switch over leader. + cl := c.consumerLeader(globalAccountName, "TEST", "dlc") + cl.JetStreamStepdownConsumer(globalAccountName, "TEST", "dlc") + c.waitOnConsumerLeader(globalAccountName, "TEST", "dlc") + + // Create activity on this consumer. + msgs, err := sub.Fetch(1) + require_NoError(t, err) + require_True(t, len(msgs) == 1) + + // This is consider activity as well. So we can watch now up to thresh to make sure consumer still active. + msgs[0].AckSync() + + // The consumer should not disappear for next `thresh` interval unless old leader does so. + timeout := time.Now().Add(thresh) + for time.Now().Before(timeout) { + _, err := js.ConsumerInfo("TEST", "dlc") + if err == nats.ErrConsumerNotFound { + t.Fatalf("Consumer deleted when it should not have been") + } + } +}