Pending timers could go negative, this is a fix for #1502

Signed-off-by: Derek Collison <derek@nats.io>
This commit is contained in:
Derek Collison
2020-07-06 12:03:04 -07:00
parent 33deee8a64
commit aecdca874f
2 changed files with 61 additions and 10 deletions

View File

@@ -748,7 +748,7 @@ const ackWaitDelay = time.Millisecond
// ackWait returns how long to wait to fire the pending timer.
func (o *Consumer) ackWait(next time.Duration) time.Duration {
if next != 0 {
if next > 0 {
return next + ackWaitDelay
}
return o.config.AckWait + ackWaitDelay
@@ -1387,12 +1387,14 @@ func (o *Consumer) checkPending() {
var expired []uint64
for seq, ts := range o.pending {
elapsed := now - ts
if elapsed > ttl && !o.onRedeliverQueue(seq) {
expired = append(expired, seq)
shouldSignal = true
} else if elapsed-ttl < next {
if elapsed >= ttl {
if !o.onRedeliverQueue(seq) {
expired = append(expired, seq)
shouldSignal = true
}
} else if ttl-elapsed < next {
// Update when we should fire next.
next = elapsed - ttl
next = ttl - elapsed
}
}
@@ -1401,10 +1403,9 @@ func (o *Consumer) checkPending() {
o.rdq = append(o.rdq, expired...)
// Now we should update the timestamp here since we are redelivering.
// We will use an incrementing time to preserve order for any other redelivery.
now := time.Now()
off := now - o.pending[expired[0]]
for _, seq := range expired {
now = now.Add(time.Microsecond)
o.pending[seq] = now.UnixNano()
o.pending[seq] += off
}
}

View File

@@ -1897,7 +1897,7 @@ func TestJetStreamAckAllRedelivery(t *testing.T) {
// Wait for messages.
// We will do 5 redeliveries.
for i := 1; i <= 5; i++ {
checkFor(t, 500*time.Millisecond, 25*time.Millisecond, func() error {
checkFor(t, 500*time.Millisecond, 10*time.Millisecond, func() error {
if nmsgs, _, _ := sub.Pending(); err != nil || nmsgs != toSend*i {
return fmt.Errorf("Did not receive correct number of messages: %d vs %d", nmsgs, toSend*i)
}
@@ -4043,6 +4043,56 @@ func TestJetStreamRedeliverAndLateAck(t *testing.T) {
}
}
// https://github.com/nats-io/nats-server/issues/1502
func TestJetStreamPendingNextTimer(t *testing.T) {
s := RunBasicJetStreamServer()
defer s.Shutdown()
// Forced cleanup of all persisted state.
if config := s.JetStreamConfig(); config != nil {
defer os.RemoveAll(config.StoreDir)
}
mset, err := s.GlobalAccount().AddStream(&server.StreamConfig{Name: "NT", Storage: server.MemoryStorage, Subjects: []string{"ORDERS.*"}})
if err != nil {
t.Fatalf("Unexpected error adding stream: %v", err)
}
defer mset.Delete()
o, err := mset.AddConsumer(&server.ConsumerConfig{
Durable: "DDD",
AckPolicy: server.AckExplicit,
FilterSubject: "ORDERS.test",
AckWait: 100 * time.Millisecond,
})
if err != nil {
t.Fatalf("Expected no error with registered interest, got %v", err)
}
defer o.Delete()
sendAndReceive := func() {
nc := clientConnectToServer(t, s)
defer nc.Close()
// Queue up message
sendStreamMsg(t, nc, "ORDERS.test", "Hello World! #1")
sendStreamMsg(t, nc, "ORDERS.test", "Hello World! #2")
nextSubj := o.RequestNextMsgSubject()
for i := 0; i < 2; i++ {
if _, err := nc.Request(nextSubj, nil, time.Second); err != nil {
t.Fatalf("Unexpected error: %v", err)
}
}
nc.Close()
time.Sleep(200 * time.Millisecond)
}
sendAndReceive()
sendAndReceive()
sendAndReceive()
}
func TestJetStreamCanNotNakAckd(t *testing.T) {
cases := []struct {
name string