From aecdca874fd745984b636af18f60e5e98dd482ef Mon Sep 17 00:00:00 2001 From: Derek Collison Date: Mon, 6 Jul 2020 12:03:04 -0700 Subject: [PATCH] Pending timers could go negative, this is a fix for #1502 Signed-off-by: Derek Collison --- server/consumer.go | 19 +++++++-------- test/jetstream_test.go | 52 +++++++++++++++++++++++++++++++++++++++++- 2 files changed, 61 insertions(+), 10 deletions(-) diff --git a/server/consumer.go b/server/consumer.go index 292a9f3f..c930bc7a 100644 --- a/server/consumer.go +++ b/server/consumer.go @@ -748,7 +748,7 @@ const ackWaitDelay = time.Millisecond // ackWait returns how long to wait to fire the pending timer. func (o *Consumer) ackWait(next time.Duration) time.Duration { - if next != 0 { + if next > 0 { return next + ackWaitDelay } return o.config.AckWait + ackWaitDelay @@ -1387,12 +1387,14 @@ func (o *Consumer) checkPending() { var expired []uint64 for seq, ts := range o.pending { elapsed := now - ts - if elapsed > ttl && !o.onRedeliverQueue(seq) { - expired = append(expired, seq) - shouldSignal = true - } else if elapsed-ttl < next { + if elapsed >= ttl { + if !o.onRedeliverQueue(seq) { + expired = append(expired, seq) + shouldSignal = true + } + } else if ttl-elapsed < next { // Update when we should fire next. - next = elapsed - ttl + next = ttl - elapsed } } @@ -1401,10 +1403,9 @@ func (o *Consumer) checkPending() { o.rdq = append(o.rdq, expired...) // Now we should update the timestamp here since we are redelivering. // We will use an incrementing time to preserve order for any other redelivery. - now := time.Now() + off := now - o.pending[expired[0]] for _, seq := range expired { - now = now.Add(time.Microsecond) - o.pending[seq] = now.UnixNano() + o.pending[seq] += off } } diff --git a/test/jetstream_test.go b/test/jetstream_test.go index b7ff3371..7e14dbe4 100644 --- a/test/jetstream_test.go +++ b/test/jetstream_test.go @@ -1897,7 +1897,7 @@ func TestJetStreamAckAllRedelivery(t *testing.T) { // Wait for messages. // We will do 5 redeliveries. for i := 1; i <= 5; i++ { - checkFor(t, 500*time.Millisecond, 25*time.Millisecond, func() error { + checkFor(t, 500*time.Millisecond, 10*time.Millisecond, func() error { if nmsgs, _, _ := sub.Pending(); err != nil || nmsgs != toSend*i { return fmt.Errorf("Did not receive correct number of messages: %d vs %d", nmsgs, toSend*i) } @@ -4043,6 +4043,56 @@ func TestJetStreamRedeliverAndLateAck(t *testing.T) { } } +// https://github.com/nats-io/nats-server/issues/1502 +func TestJetStreamPendingNextTimer(t *testing.T) { + s := RunBasicJetStreamServer() + defer s.Shutdown() + + // Forced cleanup of all persisted state. + if config := s.JetStreamConfig(); config != nil { + defer os.RemoveAll(config.StoreDir) + } + + mset, err := s.GlobalAccount().AddStream(&server.StreamConfig{Name: "NT", Storage: server.MemoryStorage, Subjects: []string{"ORDERS.*"}}) + if err != nil { + t.Fatalf("Unexpected error adding stream: %v", err) + } + defer mset.Delete() + + o, err := mset.AddConsumer(&server.ConsumerConfig{ + Durable: "DDD", + AckPolicy: server.AckExplicit, + FilterSubject: "ORDERS.test", + AckWait: 100 * time.Millisecond, + }) + if err != nil { + t.Fatalf("Expected no error with registered interest, got %v", err) + } + defer o.Delete() + + sendAndReceive := func() { + nc := clientConnectToServer(t, s) + defer nc.Close() + + // Queue up message + sendStreamMsg(t, nc, "ORDERS.test", "Hello World! #1") + sendStreamMsg(t, nc, "ORDERS.test", "Hello World! #2") + + nextSubj := o.RequestNextMsgSubject() + for i := 0; i < 2; i++ { + if _, err := nc.Request(nextSubj, nil, time.Second); err != nil { + t.Fatalf("Unexpected error: %v", err) + } + } + nc.Close() + time.Sleep(200 * time.Millisecond) + } + + sendAndReceive() + sendAndReceive() + sendAndReceive() +} + func TestJetStreamCanNotNakAckd(t *testing.T) { cases := []struct { name string