From da9046b2e6f3ee2b72b4f1d19eb4fe82e55c0cd5 Mon Sep 17 00:00:00 2001 From: Derek Collison Date: Wed, 9 Feb 2022 13:48:37 -0800 Subject: [PATCH] Snapshot initial consumer info when needed. Signed-off-by: Derek Collison --- server/consumer.go | 34 ++++++++++++++++++++++++++++++++++ server/jetstream_api.go | 2 +- server/jetstream_cluster.go | 5 ++++- 3 files changed, 39 insertions(+), 2 deletions(-) diff --git a/server/consumer.go b/server/consumer.go index f47682bd..fd468e57 100644 --- a/server/consumer.go +++ b/server/consumer.go @@ -239,6 +239,7 @@ type consumer struct { maxdc uint64 waiting *waitQueue cfg ConsumerConfig + ici *ConsumerInfo store ConsumerStore active bool replay bool @@ -849,6 +850,9 @@ func (o *consumer) setLeader(isLeader bool) { } o.mu.Unlock() + // Snapshot initial info. + o.infoWithSnap(true) + // Now start up Go routine to deliver msgs. go o.loopAndGatherMsgs(qch) @@ -1830,8 +1834,33 @@ func (o *consumer) writeStoreStateUnlocked() error { return o.store.Update(&state) } +// Returns an initial info. Only applicable for non-clustered consumers. +// We will clear after we return it, so one shot. +func (o *consumer) initialInfo() *ConsumerInfo { + o.mu.Lock() + ici := o.ici + o.ici = nil // gc friendly + o.mu.Unlock() + if ici == nil { + ici = o.info() + } + return ici +} + +// Clears our initial info. +// Used when we have a leader change in cluster mode but do not send a response. +func (o *consumer) clearInitialInfo() { + o.mu.Lock() + o.ici = nil // gc friendly + o.mu.Unlock() +} + // Info returns our current consumer state. func (o *consumer) info() *ConsumerInfo { + return o.infoWithSnap(false) +} + +func (o *consumer) infoWithSnap(snap bool) *ConsumerInfo { o.mu.RLock() mset := o.mset if mset == nil || mset.srv == nil { @@ -1885,6 +1914,11 @@ func (o *consumer) info() *ConsumerInfo { o.expireWaiting() info.NumWaiting = o.waiting.len() } + // If we were asked to snapshot do so here. + if snap { + o.ici = info + } + return info } diff --git a/server/jetstream_api.go b/server/jetstream_api.go index fde67a31..f01651cb 100644 --- a/server/jetstream_api.go +++ b/server/jetstream_api.go @@ -3244,7 +3244,7 @@ func (s *Server) jsConsumerCreate(sub *subscription, c *client, a *Account, subj s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp)) return } - resp.ConsumerInfo = o.info() + resp.ConsumerInfo = o.initialInfo() s.sendAPIResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(resp)) } diff --git a/server/jetstream_cluster.go b/server/jetstream_cluster.go index 4ceb97ae..f5fa25c8 100644 --- a/server/jetstream_cluster.go +++ b/server/jetstream_cluster.go @@ -3292,6 +3292,9 @@ func (js *jetStream) processConsumerLeaderChange(o *consumer, isLeader bool) { } if !isLeader || hasResponded { + if isLeader { + o.clearInitialInfo() + } return } @@ -3300,7 +3303,7 @@ func (js *jetStream) processConsumerLeaderChange(o *consumer, isLeader bool) { resp.Error = NewJSConsumerCreateError(err, Unless(err)) s.sendAPIErrResponse(client, acc, subject, reply, _EMPTY_, s.jsonResponse(&resp)) } else { - resp.ConsumerInfo = o.info() + resp.ConsumerInfo = o.initialInfo() s.sendAPIResponse(client, acc, subject, reply, _EMPTY_, s.jsonResponse(&resp)) if node := o.raftNode(); node != nil { o.sendCreateAdvisory()