From eb4856e4a7041355ebfbc58b7ecb4de75ccff051 Mon Sep 17 00:00:00 2001 From: Ivan Kozlovic Date: Sat, 16 Apr 2022 13:37:46 -0600 Subject: [PATCH] Cleanup timers on consumer leader change Signed-off-by: Ivan Kozlovic --- server/consumer.go | 5 +++++ server/jetstream_api.go | 4 +++- 2 files changed, 8 insertions(+), 1 deletion(-) diff --git a/server/consumer.go b/server/consumer.go index ab3ad5de..c76ba146 100644 --- a/server/consumer.go +++ b/server/consumer.go @@ -962,6 +962,11 @@ func (o *consumer) setLeader(isLeader bool) { // Reset waiting if we are in pull mode. if o.isPullMode() { o.waiting = newWaitQueue(o.cfg.MaxWaiting) + if !o.isDurable() { + stopAndClearTimer(&o.dtmr) + } + } else if o.srv.gateway.enabled { + stopAndClearTimer(&o.gwdtmr) } o.mu.Unlock() } diff --git a/server/jetstream_api.go b/server/jetstream_api.go index 52e583b1..52fb3fd1 100644 --- a/server/jetstream_api.go +++ b/server/jetstream_api.go @@ -2085,7 +2085,9 @@ func (s *Server) jsConsumerLeaderStepDownRequest(sub *subscription, c *client, _ } // Call actual stepdown. - o.raftNode().StepDown() + if n := o.raftNode(); n != nil { + n.StepDown() + } resp.Success = true s.sendAPIResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(resp))