diff --git a/server/consumer.go b/server/consumer.go index 388f6d5b..9c5e62ca 100644 --- a/server/consumer.go +++ b/server/consumer.go @@ -28,6 +28,7 @@ import ( "sync/atomic" "time" + "github.com/nats-io/nats-server/v2/server/avl" "github.com/nats-io/nuid" "golang.org/x/time/rate" ) @@ -274,7 +275,7 @@ type consumer struct { pending map[uint64]*Pending ptmr *time.Timer rdq []uint64 - rdqi map[uint64]struct{} + rdqi avl.SequenceSet rdc map[uint64]uint64 maxdc uint64 waiting *waitQueue @@ -1077,7 +1078,8 @@ func (o *consumer) setLeader(isLeader bool) { mset.setConsumerAsLeader(o) o.mu.Lock() - o.rdq, o.rdqi = nil, nil + o.rdq = nil + o.rdqi.Empty() // Restore our saved state. During non-leader status we just update our underlying store. o.readStoredState(lseq) @@ -1200,7 +1202,8 @@ func (o *consumer) setLeader(isLeader bool) { } // Make sure to clear out any re delivery queues stopAndClearTimer(&o.ptmr) - o.rdq, o.rdqi = nil, nil + o.rdq = nil + o.rdqi.Empty() o.pending = nil // ok if they are nil, we protect inside unsubscribe() o.unsubscribe(o.ackSub) @@ -4055,12 +4058,9 @@ func (o *consumer) didNotDeliver(seq uint64) { // Lock should be held. func (o *consumer) addToRedeliverQueue(seqs ...uint64) { - if o.rdqi == nil { - o.rdqi = make(map[uint64]struct{}) - } o.rdq = append(o.rdq, seqs...) for _, seq := range seqs { - o.rdqi[seq] = struct{}{} + o.rdqi.Insert(seq) } } @@ -4075,10 +4075,11 @@ func (o *consumer) getNextToRedeliver() uint64 { } seq := o.rdq[0] if len(o.rdq) == 1 { - o.rdq, o.rdqi = nil, nil + o.rdq = nil + o.rdqi.Empty() } else { o.rdq = append(o.rdq[:0], o.rdq[1:]...) - delete(o.rdqi, seq) + o.rdqi.Delete(seq) } return seq } @@ -4087,11 +4088,7 @@ func (o *consumer) getNextToRedeliver() uint64 { // FIXME(dlc) - This is O(n) but should be fast with small redeliver size. // Lock should be held. func (o *consumer) onRedeliverQueue(seq uint64) bool { - if o.rdqi == nil { - return false - } - _, ok := o.rdqi[seq] - return ok + return o.rdqi.Exists(seq) } // Remove a sequence from the redelivery queue. @@ -4103,10 +4100,11 @@ func (o *consumer) removeFromRedeliverQueue(seq uint64) bool { for i, rseq := range o.rdq { if rseq == seq { if len(o.rdq) == 1 { - o.rdq, o.rdqi = nil, nil + o.rdq = nil + o.rdqi.Empty() } else { o.rdq = append(o.rdq[:i], o.rdq[i+1:]...) - delete(o.rdqi, seq) + o.rdqi.Delete(seq) } return true } @@ -4218,7 +4216,8 @@ func (o *consumer) checkPending() { } else { // Make sure to stop timer and clear out any re delivery queues stopAndClearTimer(&o.ptmr) - o.rdq, o.rdqi = nil, nil + o.rdq = nil + o.rdqi.Empty() o.pending = nil } @@ -4553,7 +4552,8 @@ func (o *consumer) purge(sseq uint64, slseq uint64) { // We need to remove all those being queued for redelivery under o.rdq if len(o.rdq) > 0 { rdq := o.rdq - o.rdq, o.rdqi = nil, nil + o.rdq = nil + o.rdqi.Empty() for _, sseq := range rdq { if sseq >= o.sseq { o.addToRedeliverQueue(sseq)