mirror of
https://github.com/gogrlx/nats-server.git
synced 2026-04-14 18:20:42 -07:00
Fixed up consumer stream pending logic from merge with main.
Bumped version Signed-off-by: Derek Collison <derek@nats.io>
This commit is contained in:
@@ -41,7 +41,7 @@ var (
|
||||
|
||||
const (
|
||||
// VERSION is the current version for the server.
|
||||
VERSION = "2.10.0-beta.22"
|
||||
VERSION = "2.10.0-beta.23"
|
||||
|
||||
// PROTO is the currently supported protocol.
|
||||
// 0 was the original
|
||||
|
||||
@@ -3704,23 +3704,16 @@ func (o *consumer) streamNumPending() uint64 {
|
||||
o.npc, o.npf = 0, 0
|
||||
// Consumer without filters.
|
||||
if o.subjf == nil {
|
||||
npc, npf := o.mset.store.NumPending(o.sseq, o.cfg.FilterSubject, isLastPerSubject)
|
||||
npc, npf := o.mset.store.NumPending(o.sseq, _EMPTY_, isLastPerSubject)
|
||||
o.npc, o.npf = int64(npc), npf
|
||||
return o.numPending()
|
||||
}
|
||||
// Consumer with filters.
|
||||
for _, filter := range o.subjf {
|
||||
npc, npf := o.mset.store.NumPending(o.sseq, o.cfg.FilterSubject, isLastPerSubject)
|
||||
npc, npf := o.mset.store.NumPending(o.sseq, filter.subject, isLastPerSubject)
|
||||
o.npc += int64(npc)
|
||||
o.npf = npf // Always last
|
||||
|
||||
for _, ss := range o.mset.store.SubjectsState(filter.subject) {
|
||||
if filter.nextSeq <= ss.Last {
|
||||
o.npc++
|
||||
if ss.Last > o.npf {
|
||||
o.npf = ss.Last
|
||||
}
|
||||
}
|
||||
if npf > o.npf {
|
||||
o.npf = npf // Always last
|
||||
}
|
||||
}
|
||||
return o.numPending()
|
||||
@@ -3728,8 +3721,8 @@ func (o *consumer) streamNumPending() uint64 {
|
||||
// Every other Delivery Policy is handled here.
|
||||
// Consumer without filters.
|
||||
if o.subjf == nil {
|
||||
ss := o.mset.store.FilteredState(o.sseq, _EMPTY_)
|
||||
o.npc, o.npf = int64(ss.Msgs), ss.Last
|
||||
npc, npf := o.mset.store.NumPending(o.sseq, o.cfg.FilterSubject, isLastPerSubject)
|
||||
o.npc, o.npf = int64(npc), npf
|
||||
return o.numPending()
|
||||
}
|
||||
// Consumer with filters.
|
||||
@@ -3739,21 +3732,13 @@ func (o *consumer) streamNumPending() uint64 {
|
||||
if filter.currentSeq < o.sseq {
|
||||
filter.currentSeq = o.sseq
|
||||
}
|
||||
ss := o.mset.store.FilteredState(filter.currentSeq, filter.subject)
|
||||
o.npc += int64(ss.Msgs)
|
||||
if ss.Last > o.npf {
|
||||
o.npf = ss.Last
|
||||
npc, npf := o.mset.store.NumPending(filter.currentSeq, filter.subject, isLastPerSubject)
|
||||
o.npc += int64(npc)
|
||||
if npf > o.npf {
|
||||
o.npf = npf // Always last
|
||||
}
|
||||
}
|
||||
|
||||
=======
|
||||
} else {
|
||||
isLastPerSubject := o.cfg.DeliverPolicy == DeliverLastPerSubject
|
||||
// Set our num pending and valid sequence floor.
|
||||
npc, npf := o.mset.store.NumPending(o.sseq, o.cfg.FilterSubject, isLastPerSubject)
|
||||
o.npc, o.npf = int64(npc), npf
|
||||
>>>>>>> main
|
||||
|
||||
}
|
||||
return o.numPending()
|
||||
}
|
||||
|
||||
|
||||
Reference in New Issue
Block a user