mirror of
https://github.com/gogrlx/nats-server.git
synced 2026-04-02 11:48:43 -07:00
Make sure to clear messages from stream when consumer deleted
Signed-off-by: Derek Collison <derek@nats.io>
This commit is contained in:
@@ -1735,8 +1735,26 @@ func (o *Consumer) stop(dflag, doSignal, advisory bool) error {
|
||||
mset.unsubscribe(ackSub)
|
||||
mset.unsubscribe(reqSub)
|
||||
delete(mset.consumers, o.name)
|
||||
rp := mset.config.Retention
|
||||
mset.mu.Unlock()
|
||||
|
||||
// We need to optionally remove all messages since we are interest based retention.
|
||||
if dflag && rp == InterestPolicy {
|
||||
var seqs []uint64
|
||||
o.mu.Lock()
|
||||
for seq := range o.pending {
|
||||
seqs = append(seqs, seq)
|
||||
}
|
||||
o.mu.Unlock()
|
||||
// Sort just to keep pending sparse array state small.
|
||||
sort.Slice(seqs, func(i, j int) bool { return seqs[i] < seqs[j] })
|
||||
for _, seq := range seqs {
|
||||
if !mset.checkInterest(seq, o) {
|
||||
mset.store.RemoveMsg(seq)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// Make sure we stamp our update state
|
||||
if !dflag {
|
||||
o.writeState()
|
||||
|
||||
@@ -4999,16 +4999,51 @@ func TestJetStreamInterestRetentionStreamWithDurableRestart(t *testing.T) {
|
||||
}
|
||||
return nil
|
||||
})
|
||||
o.Delete()
|
||||
|
||||
sub, _ = nc.SubscribeSync(nats.NewInbox())
|
||||
nc.Flush()
|
||||
|
||||
cfg.DeliverSubject = sub.Subject
|
||||
cfg.AckPolicy = server.AckExplicit // Set ack
|
||||
if o, err = mset.AddConsumer(cfg); err != nil {
|
||||
t.Fatalf("Error re-establishing the durable consumer: %v", err)
|
||||
}
|
||||
time.Sleep(100 * time.Millisecond)
|
||||
checkSubPending(0)
|
||||
checkNumMsgs(0)
|
||||
|
||||
// Now queue up some messages.
|
||||
for i := 1; i <= 10; i++ {
|
||||
sendStreamMsg(t, nc, "IK", fmt.Sprintf("M%d", i))
|
||||
}
|
||||
checkNumMsgs(10)
|
||||
checkSubPending(10)
|
||||
|
||||
// Create second consumer
|
||||
sub2, _ := nc.SubscribeSync(nats.NewInbox())
|
||||
nc.Flush()
|
||||
cfg.DeliverSubject = sub2.Subject
|
||||
cfg.Durable = "derek"
|
||||
o2, err := mset.AddConsumer(cfg)
|
||||
if err != nil {
|
||||
t.Fatalf("Error creating second durable consumer: %v", err)
|
||||
}
|
||||
|
||||
// Now queue up some messages.
|
||||
for i := 11; i <= 20; i++ {
|
||||
sendStreamMsg(t, nc, "IK", fmt.Sprintf("M%d", i))
|
||||
}
|
||||
checkNumMsgs(20)
|
||||
checkSubPending(20)
|
||||
|
||||
// Now make sure deleting the consumers will remove messages from
|
||||
// the stream since we are interest retention based.
|
||||
o.Delete()
|
||||
checkNumMsgs(10)
|
||||
|
||||
o2.Delete()
|
||||
checkNumMsgs(0)
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user