mirror of
https://github.com/gogrlx/nats-server.git
synced 2026-04-17 03:24:40 -07:00
@@ -166,7 +166,7 @@ type stream struct {
|
||||
outq *jsOutQ
|
||||
msgs *inbound
|
||||
store StreamStore
|
||||
ackq *ackMsgQueue
|
||||
ackq *ipQueue // queue of uint64
|
||||
lseq uint64
|
||||
lmsgId string
|
||||
consumers map[string]*consumer
|
||||
@@ -384,7 +384,7 @@ func (a *Account) addStreamWithAssignment(config *StreamConfig, fsConfig *FileSt
|
||||
|
||||
// For no-ack consumers when we are interest retention.
|
||||
if cfg.Retention != LimitsPolicy {
|
||||
mset.ackq = &ackMsgQueue{mch: make(chan struct{}, 1)}
|
||||
mset.ackq = newIPQueue() // of uint64
|
||||
}
|
||||
|
||||
jsa.streams[cfg.Name] = mset
|
||||
@@ -3191,49 +3191,6 @@ func (mset *stream) subjects() []string {
|
||||
return copyStrings(mset.cfg.Subjects)
|
||||
}
|
||||
|
||||
// Linked list for async ack of messages.
|
||||
// When we have a consumer to a stream that is interest based and the
|
||||
// consumer is R=1 and acknone. This is how mirrors and sources replicate.
|
||||
type ackMsgQueue struct {
|
||||
sync.Mutex
|
||||
mch chan struct{}
|
||||
seqs []uint64
|
||||
back []uint64
|
||||
}
|
||||
|
||||
// Push onto the queue.
|
||||
func (q *ackMsgQueue) push(seq uint64) {
|
||||
q.Lock()
|
||||
notify := len(q.seqs) == 0
|
||||
q.seqs = append(q.seqs, seq)
|
||||
q.Unlock()
|
||||
if notify {
|
||||
select {
|
||||
case q.mch <- struct{}{}:
|
||||
default:
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// Pop all pending off.
|
||||
func (q *ackMsgQueue) pop() []uint64 {
|
||||
q.Lock()
|
||||
seqs := q.seqs
|
||||
q.seqs, q.back = q.back, nil
|
||||
q.Unlock()
|
||||
return seqs
|
||||
}
|
||||
|
||||
func (q *ackMsgQueue) recycle(seqs []uint64) {
|
||||
const maxAckQueueReuse = 8 * 1024
|
||||
if cap(seqs) > maxAckQueueReuse {
|
||||
return
|
||||
}
|
||||
q.Lock()
|
||||
q.back = seqs[:0]
|
||||
q.Unlock()
|
||||
}
|
||||
|
||||
func (mset *stream) internalLoop() {
|
||||
mset.mu.RLock()
|
||||
s := mset.srv
|
||||
@@ -3246,10 +3203,10 @@ func (mset *stream) internalLoop() {
|
||||
// For the ack msgs queue for interest retention.
|
||||
var (
|
||||
amch chan struct{}
|
||||
ackq *ackMsgQueue
|
||||
ackq *ipQueue // of uint64
|
||||
)
|
||||
if mset.ackq != nil {
|
||||
ackq, amch = mset.ackq, mset.ackq.mch
|
||||
ackq, amch = mset.ackq, mset.ackq.ch
|
||||
}
|
||||
mset.mu.RUnlock()
|
||||
|
||||
@@ -3303,9 +3260,9 @@ func (mset *stream) internalLoop() {
|
||||
case <-amch:
|
||||
seqs := ackq.pop()
|
||||
for _, seq := range seqs {
|
||||
mset.ackMsg(nil, seq)
|
||||
mset.ackMsg(nil, seq.(uint64))
|
||||
}
|
||||
ackq.recycle(seqs)
|
||||
ackq.recycle(&seqs)
|
||||
case <-qch:
|
||||
return
|
||||
case <-s.quitCh:
|
||||
|
||||
Reference in New Issue
Block a user