Use AVL tree for consumer redeliver map

Signed-off-by: Neil Twigg <neil@nats.io>
This commit is contained in:
Neil Twigg
2023-04-18 12:52:25 +01:00
parent f3e0d6b070
commit 57d888eec4

View File

@@ -28,6 +28,7 @@ import (
"sync/atomic"
"time"
"github.com/nats-io/nats-server/v2/server/avl"
"github.com/nats-io/nuid"
"golang.org/x/time/rate"
)
@@ -274,7 +275,7 @@ type consumer struct {
pending map[uint64]*Pending
ptmr *time.Timer
rdq []uint64
rdqi map[uint64]struct{}
rdqi avl.SequenceSet
rdc map[uint64]uint64
maxdc uint64
waiting *waitQueue
@@ -1077,7 +1078,8 @@ func (o *consumer) setLeader(isLeader bool) {
mset.setConsumerAsLeader(o)
o.mu.Lock()
o.rdq, o.rdqi = nil, nil
o.rdq = nil
o.rdqi.Empty()
// Restore our saved state. During non-leader status we just update our underlying store.
o.readStoredState(lseq)
@@ -1200,7 +1202,8 @@ func (o *consumer) setLeader(isLeader bool) {
}
// Make sure to clear out any re delivery queues
stopAndClearTimer(&o.ptmr)
o.rdq, o.rdqi = nil, nil
o.rdq = nil
o.rdqi.Empty()
o.pending = nil
// ok if they are nil, we protect inside unsubscribe()
o.unsubscribe(o.ackSub)
@@ -4055,12 +4058,9 @@ func (o *consumer) didNotDeliver(seq uint64) {
// Lock should be held.
func (o *consumer) addToRedeliverQueue(seqs ...uint64) {
if o.rdqi == nil {
o.rdqi = make(map[uint64]struct{})
}
o.rdq = append(o.rdq, seqs...)
for _, seq := range seqs {
o.rdqi[seq] = struct{}{}
o.rdqi.Insert(seq)
}
}
@@ -4075,10 +4075,11 @@ func (o *consumer) getNextToRedeliver() uint64 {
}
seq := o.rdq[0]
if len(o.rdq) == 1 {
o.rdq, o.rdqi = nil, nil
o.rdq = nil
o.rdqi.Empty()
} else {
o.rdq = append(o.rdq[:0], o.rdq[1:]...)
delete(o.rdqi, seq)
o.rdqi.Delete(seq)
}
return seq
}
@@ -4087,11 +4088,7 @@ func (o *consumer) getNextToRedeliver() uint64 {
// FIXME(dlc) - This is O(n) but should be fast with small redeliver size.
// Lock should be held.
func (o *consumer) onRedeliverQueue(seq uint64) bool {
if o.rdqi == nil {
return false
}
_, ok := o.rdqi[seq]
return ok
return o.rdqi.Exists(seq)
}
// Remove a sequence from the redelivery queue.
@@ -4103,10 +4100,11 @@ func (o *consumer) removeFromRedeliverQueue(seq uint64) bool {
for i, rseq := range o.rdq {
if rseq == seq {
if len(o.rdq) == 1 {
o.rdq, o.rdqi = nil, nil
o.rdq = nil
o.rdqi.Empty()
} else {
o.rdq = append(o.rdq[:i], o.rdq[i+1:]...)
delete(o.rdqi, seq)
o.rdqi.Delete(seq)
}
return true
}
@@ -4218,7 +4216,8 @@ func (o *consumer) checkPending() {
} else {
// Make sure to stop timer and clear out any re delivery queues
stopAndClearTimer(&o.ptmr)
o.rdq, o.rdqi = nil, nil
o.rdq = nil
o.rdqi.Empty()
o.pending = nil
}
@@ -4553,7 +4552,8 @@ func (o *consumer) purge(sseq uint64, slseq uint64) {
// We need to remove all those being queued for redelivery under o.rdq
if len(o.rdq) > 0 {
rdq := o.rdq
o.rdq, o.rdqi = nil, nil
o.rdq = nil
o.rdqi.Empty()
for _, sseq := range rdq {
if sseq >= o.sseq {
o.addToRedeliverQueue(sseq)