From c6031382a1929a8c943adf4da285d19f68fa9fcf Mon Sep 17 00:00:00 2001 From: Derek Collison Date: Sat, 5 Nov 2022 13:56:45 -0700 Subject: [PATCH] Fix for #3499 When we deleted a consumer from an interest policy stream we would make sure to clean up any unacked messages. However we only based start from the ack floor for the consumer and did not take into account the first sequence of the stream. Signed-off-by: Derek Collison --- server/consumer.go | 7 +++++- server/norace_test.go | 56 +++++++++++++++++++++++++++++++++++++++++++ 2 files changed, 62 insertions(+), 1 deletion(-) diff --git a/server/consumer.go b/server/consumer.go index cea2c44f..8b6c8360 100644 --- a/server/consumer.go +++ b/server/consumer.go @@ -4105,13 +4105,18 @@ func (o *consumer) stopWithFlags(dflag, sdflag, doSignal, advisory bool) error { // We will do this consistently on all replicas. Note that if in clustered mode the // non-leader consumers will need to restore state first. if dflag && rp == InterestPolicy { - stop := mset.lastSeq() + state := mset.state() + stop := state.LastSeq o.mu.Lock() if !o.isLeader() { o.readStoredState(stop) } start := o.asflr o.mu.Unlock() + // Make sure we start at worst with first sequence in the stream. + if start < state.FirstSeq { + start = state.FirstSeq + } var rmseqs []uint64 mset.mu.RLock() diff --git a/server/norace_test.go b/server/norace_test.go index b8766e47..8a6dcd99 100644 --- a/server/norace_test.go +++ b/server/norace_test.go @@ -5724,3 +5724,59 @@ func TestNoRaceJetStreamManyPullConsumersNeedAckOptimization(t *testing.T) { last := msgs[len(msgs)-1] last.AckSync() } + +// https://github.com/nats-io/nats-server/issues/3499 +func TestNoRaceJetStreamDeleteConsumerWithInterestStreamAndHighSeqs(t *testing.T) { + s := RunBasicJetStreamServer() + if config := s.JetStreamConfig(); config != nil { + defer removeDir(t, config.StoreDir) + } + defer s.Shutdown() + + // Client for API requests. + nc, js := jsClientConnect(t, s) + defer nc.Close() + + _, err := js.AddStream(&nats.StreamConfig{ + Name: "TEST", + Subjects: []string{"log.>"}, + Retention: nats.InterestPolicy, + }) + require_NoError(t, err) + + _, err = js.AddConsumer("TEST", &nats.ConsumerConfig{ + Durable: "c", + AckPolicy: nats.AckExplicitPolicy, + }) + require_NoError(t, err) + + // Set baseline for time to delete so we can see linear increase as sequence numbers increase. + start := time.Now() + err = js.DeleteConsumer("TEST", "c") + require_NoError(t, err) + elapsed := time.Since(start) + + // Crank up sequence numbers. + msg := []byte(strings.Repeat("ZZZ", 128)) + for i := 0; i < 5_000_000; i++ { + nc.Publish("log.Z", msg) + } + nc.Flush() + + _, err = js.AddConsumer("TEST", &nats.ConsumerConfig{ + Durable: "c", + AckPolicy: nats.AckExplicitPolicy, + }) + require_NoError(t, err) + + // We have a bug that spins unecessarily through all the sequences from this consumer's + // ackfloor(0) and the last sequence for the stream. We will detect by looking for the time + // to delete being 100x more. Should be the same since both times no messages exist in the stream. + start = time.Now() + err = js.DeleteConsumer("TEST", "c") + require_NoError(t, err) + + if e := time.Since(start); e > 100*elapsed { + t.Fatalf("Consumer delete took too long: %v vs baseline %v", e, elapsed) + } +}