From bcbf28fb3a41a3502bfaf30fdbc743434221bec7 Mon Sep 17 00:00:00 2001 From: Derek Collison Date: Thu, 3 Sep 2020 11:41:14 -0700 Subject: [PATCH] Make sure to clear messages from stream when consumer deleted Signed-off-by: Derek Collison --- server/consumer.go | 18 ++++++++++++++++++ test/jetstream_test.go | 35 +++++++++++++++++++++++++++++++++++ 2 files changed, 53 insertions(+) diff --git a/server/consumer.go b/server/consumer.go index 3660d641..dc006129 100644 --- a/server/consumer.go +++ b/server/consumer.go @@ -1735,8 +1735,26 @@ func (o *Consumer) stop(dflag, doSignal, advisory bool) error { mset.unsubscribe(ackSub) mset.unsubscribe(reqSub) delete(mset.consumers, o.name) + rp := mset.config.Retention mset.mu.Unlock() + // We need to optionally remove all messages since we are interest based retention. + if dflag && rp == InterestPolicy { + var seqs []uint64 + o.mu.Lock() + for seq := range o.pending { + seqs = append(seqs, seq) + } + o.mu.Unlock() + // Sort just to keep pending sparse array state small. + sort.Slice(seqs, func(i, j int) bool { return seqs[i] < seqs[j] }) + for _, seq := range seqs { + if !mset.checkInterest(seq, o) { + mset.store.RemoveMsg(seq) + } + } + } + // Make sure we stamp our update state if !dflag { o.writeState() diff --git a/test/jetstream_test.go b/test/jetstream_test.go index c0203256..8280932b 100644 --- a/test/jetstream_test.go +++ b/test/jetstream_test.go @@ -4999,16 +4999,51 @@ func TestJetStreamInterestRetentionStreamWithDurableRestart(t *testing.T) { } return nil }) + o.Delete() sub, _ = nc.SubscribeSync(nats.NewInbox()) nc.Flush() + cfg.DeliverSubject = sub.Subject + cfg.AckPolicy = server.AckExplicit // Set ack if o, err = mset.AddConsumer(cfg); err != nil { t.Fatalf("Error re-establishing the durable consumer: %v", err) } time.Sleep(100 * time.Millisecond) checkSubPending(0) checkNumMsgs(0) + + // Now queue up some messages. + for i := 1; i <= 10; i++ { + sendStreamMsg(t, nc, "IK", fmt.Sprintf("M%d", i)) + } + checkNumMsgs(10) + checkSubPending(10) + + // Create second consumer + sub2, _ := nc.SubscribeSync(nats.NewInbox()) + nc.Flush() + cfg.DeliverSubject = sub2.Subject + cfg.Durable = "derek" + o2, err := mset.AddConsumer(cfg) + if err != nil { + t.Fatalf("Error creating second durable consumer: %v", err) + } + + // Now queue up some messages. + for i := 11; i <= 20; i++ { + sendStreamMsg(t, nc, "IK", fmt.Sprintf("M%d", i)) + } + checkNumMsgs(20) + checkSubPending(20) + + // Now make sure deleting the consumers will remove messages from + // the stream since we are interest retention based. + o.Delete() + checkNumMsgs(10) + + o2.Delete() + checkNumMsgs(0) }) } }