mirror of
https://github.com/gogrlx/nats-server.git
synced 2026-04-17 03:24:40 -07:00
Fix for #3499
When we deleted a consumer from an interest policy stream we would make sure to clean up any unacked messages. However we only based start from the ack floor for the consumer and did not take into account the first sequence of the stream. Signed-off-by: Derek Collison <derek@nats.io>
This commit is contained in:
@@ -4105,13 +4105,18 @@ func (o *consumer) stopWithFlags(dflag, sdflag, doSignal, advisory bool) error {
|
||||
// We will do this consistently on all replicas. Note that if in clustered mode the
|
||||
// non-leader consumers will need to restore state first.
|
||||
if dflag && rp == InterestPolicy {
|
||||
stop := mset.lastSeq()
|
||||
state := mset.state()
|
||||
stop := state.LastSeq
|
||||
o.mu.Lock()
|
||||
if !o.isLeader() {
|
||||
o.readStoredState(stop)
|
||||
}
|
||||
start := o.asflr
|
||||
o.mu.Unlock()
|
||||
// Make sure we start at worst with first sequence in the stream.
|
||||
if start < state.FirstSeq {
|
||||
start = state.FirstSeq
|
||||
}
|
||||
|
||||
var rmseqs []uint64
|
||||
mset.mu.RLock()
|
||||
|
||||
@@ -5724,3 +5724,59 @@ func TestNoRaceJetStreamManyPullConsumersNeedAckOptimization(t *testing.T) {
|
||||
last := msgs[len(msgs)-1]
|
||||
last.AckSync()
|
||||
}
|
||||
|
||||
// https://github.com/nats-io/nats-server/issues/3499
|
||||
func TestNoRaceJetStreamDeleteConsumerWithInterestStreamAndHighSeqs(t *testing.T) {
|
||||
s := RunBasicJetStreamServer()
|
||||
if config := s.JetStreamConfig(); config != nil {
|
||||
defer removeDir(t, config.StoreDir)
|
||||
}
|
||||
defer s.Shutdown()
|
||||
|
||||
// Client for API requests.
|
||||
nc, js := jsClientConnect(t, s)
|
||||
defer nc.Close()
|
||||
|
||||
_, err := js.AddStream(&nats.StreamConfig{
|
||||
Name: "TEST",
|
||||
Subjects: []string{"log.>"},
|
||||
Retention: nats.InterestPolicy,
|
||||
})
|
||||
require_NoError(t, err)
|
||||
|
||||
_, err = js.AddConsumer("TEST", &nats.ConsumerConfig{
|
||||
Durable: "c",
|
||||
AckPolicy: nats.AckExplicitPolicy,
|
||||
})
|
||||
require_NoError(t, err)
|
||||
|
||||
// Set baseline for time to delete so we can see linear increase as sequence numbers increase.
|
||||
start := time.Now()
|
||||
err = js.DeleteConsumer("TEST", "c")
|
||||
require_NoError(t, err)
|
||||
elapsed := time.Since(start)
|
||||
|
||||
// Crank up sequence numbers.
|
||||
msg := []byte(strings.Repeat("ZZZ", 128))
|
||||
for i := 0; i < 5_000_000; i++ {
|
||||
nc.Publish("log.Z", msg)
|
||||
}
|
||||
nc.Flush()
|
||||
|
||||
_, err = js.AddConsumer("TEST", &nats.ConsumerConfig{
|
||||
Durable: "c",
|
||||
AckPolicy: nats.AckExplicitPolicy,
|
||||
})
|
||||
require_NoError(t, err)
|
||||
|
||||
// We have a bug that spins unecessarily through all the sequences from this consumer's
|
||||
// ackfloor(0) and the last sequence for the stream. We will detect by looking for the time
|
||||
// to delete being 100x more. Should be the same since both times no messages exist in the stream.
|
||||
start = time.Now()
|
||||
err = js.DeleteConsumer("TEST", "c")
|
||||
require_NoError(t, err)
|
||||
|
||||
if e := time.Since(start); e > 100*elapsed {
|
||||
t.Fatalf("Consumer delete took too long: %v vs baseline %v", e, elapsed)
|
||||
}
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user