From 395409e2cd01ba8a9d5b80bb91f1d27fe1cc98a0 Mon Sep 17 00:00:00 2001 From: Derek Collison Date: Sat, 25 Feb 2023 19:50:03 -0800 Subject: [PATCH] Fixed up consumer stream pending logic from merge with main. Bumped version Signed-off-by: Derek Collison --- server/const.go | 2 +- server/consumer.go | 37 +++++++++++-------------------------- 2 files changed, 12 insertions(+), 27 deletions(-) diff --git a/server/const.go b/server/const.go index ef10a9d4..23db3f54 100644 --- a/server/const.go +++ b/server/const.go @@ -41,7 +41,7 @@ var ( const ( // VERSION is the current version for the server. - VERSION = "2.10.0-beta.22" + VERSION = "2.10.0-beta.23" // PROTO is the currently supported protocol. // 0 was the original diff --git a/server/consumer.go b/server/consumer.go index 8de08d99..d120d4c5 100644 --- a/server/consumer.go +++ b/server/consumer.go @@ -3704,23 +3704,16 @@ func (o *consumer) streamNumPending() uint64 { o.npc, o.npf = 0, 0 // Consumer without filters. if o.subjf == nil { - npc, npf := o.mset.store.NumPending(o.sseq, o.cfg.FilterSubject, isLastPerSubject) + npc, npf := o.mset.store.NumPending(o.sseq, _EMPTY_, isLastPerSubject) o.npc, o.npf = int64(npc), npf return o.numPending() } // Consumer with filters. for _, filter := range o.subjf { - npc, npf := o.mset.store.NumPending(o.sseq, o.cfg.FilterSubject, isLastPerSubject) + npc, npf := o.mset.store.NumPending(o.sseq, filter.subject, isLastPerSubject) o.npc += int64(npc) - o.npf = npf // Always last - - for _, ss := range o.mset.store.SubjectsState(filter.subject) { - if filter.nextSeq <= ss.Last { - o.npc++ - if ss.Last > o.npf { - o.npf = ss.Last - } - } + if npf > o.npf { + o.npf = npf // Always last } } return o.numPending() @@ -3728,8 +3721,8 @@ func (o *consumer) streamNumPending() uint64 { // Every other Delivery Policy is handled here. // Consumer without filters. if o.subjf == nil { - ss := o.mset.store.FilteredState(o.sseq, _EMPTY_) - o.npc, o.npf = int64(ss.Msgs), ss.Last + npc, npf := o.mset.store.NumPending(o.sseq, o.cfg.FilterSubject, isLastPerSubject) + o.npc, o.npf = int64(npc), npf return o.numPending() } // Consumer with filters. @@ -3739,21 +3732,13 @@ func (o *consumer) streamNumPending() uint64 { if filter.currentSeq < o.sseq { filter.currentSeq = o.sseq } - ss := o.mset.store.FilteredState(filter.currentSeq, filter.subject) - o.npc += int64(ss.Msgs) - if ss.Last > o.npf { - o.npf = ss.Last + npc, npf := o.mset.store.NumPending(filter.currentSeq, filter.subject, isLastPerSubject) + o.npc += int64(npc) + if npf > o.npf { + o.npf = npf // Always last } + } -======= - } else { - isLastPerSubject := o.cfg.DeliverPolicy == DeliverLastPerSubject - // Set our num pending and valid sequence floor. - npc, npf := o.mset.store.NumPending(o.sseq, o.cfg.FilterSubject, isLastPerSubject) - o.npc, o.npf = int64(npc), npf ->>>>>>> main - -} return o.numPending() }