Can't remove based on interest directly

Signed-off-by: Derek Collison <derek@nats.io>
This commit is contained in:
Derek Collison
2021-03-04 19:11:24 -08:00
parent 7b1b9a7946
commit 4bfd6485f6
2 changed files with 7 additions and 3 deletions

View File

@@ -1964,7 +1964,7 @@ func (o *consumer) deliverMsg(dsubj, subj string, hdr, msg []byte, seq, dc uint6
// If we are ack none and mset is interest only we should make sure stream removes interest.
if ap == AckNone && mset.cfg.Retention == InterestPolicy && !mset.checkInterest(seq, o) {
mset.store.RemoveMsg(seq)
mset.rmch <- seq
}
if ap == AckExplicit || ap == AckAll {
@@ -2460,6 +2460,7 @@ func (o *consumer) stopWithFlags(dflag, doSignal, advisory bool) error {
seqs = append(seqs, seq)
}
o.mu.Unlock()
// Sort just to keep pending sparse array state small.
sort.Slice(seqs, func(i, j int) bool { return seqs[i] < seqs[j] })
for _, seq := range seqs {

View File

@@ -139,6 +139,7 @@ type stream struct {
mch chan struct{}
msgs *inbound
store StreamStore
rmch chan uint64
lseq uint64
lmsgId string
consumers map[string]*consumer
@@ -305,6 +306,7 @@ func (a *Account) addStreamWithAssignment(config *StreamConfig, fsConfig *FileSt
consumers: make(map[string]*consumer),
mch: make(chan struct{}, 1),
msgs: &inbound{},
rmch: make(chan uint64, 8192),
qch: make(chan struct{}),
}
@@ -2339,7 +2341,7 @@ func (mset *stream) internalLoop() {
c := s.createInternalJetStreamClient()
c.registerWithAccount(mset.acc)
defer c.closeConnection(ClientClosed)
outq, qch, mch := mset.outq, mset.qch, mset.mch
outq, qch, mch, rmch := mset.outq, mset.qch, mset.mch, mset.rmch
isClustered := mset.node != nil
mset.mu.RUnlock()
@@ -2380,7 +2382,6 @@ func (mset *stream) internalLoop() {
pm = next
}
c.flushClients(10 * time.Millisecond)
case <-mch:
for im := mset.pending(); im != nil; {
// If we are clustered we need to propose this message to the underlying raft group.
@@ -2394,6 +2395,8 @@ func (mset *stream) internalLoop() {
im.next, im.hdr, im.msg = nil, nil, nil
im = next
}
case seq := <-rmch:
mset.store.RemoveMsg(seq)
case <-qch:
return
case <-s.quitCh: