diff --git a/server/consumer.go b/server/consumer.go index ba1a0456..3c62104b 100644 --- a/server/consumer.go +++ b/server/consumer.go @@ -3244,6 +3244,11 @@ func (o *consumer) checkPending() { now := time.Now().UnixNano() ttl := int64(o.cfg.AckWait) next := int64(o.ackWait(0)) + // However, if there is backoff, initialize with the largest backoff. + // It will be adjusted as needed. + if l := len(o.cfg.BackOff); l > 0 { + next = int64(o.cfg.BackOff[l-1]) + } var shouldUpdateState bool var state StreamState @@ -3268,12 +3273,20 @@ func (o *consumer) checkPending() { continue } elapsed, deadline := now-p.Timestamp, ttl - if len(o.cfg.BackOff) > 0 && o.rdc != nil { + if len(o.cfg.BackOff) > 0 { + // This is ok even if o.rdc is nil, we would get dc == 0, which is what we want. dc := int(o.rdc[seq]) - if dc >= len(o.cfg.BackOff) { + // This will be the index for the next backoff, will set to last element if needed. + nbi := dc + 1 + if dc+1 >= len(o.cfg.BackOff) { dc = len(o.cfg.BackOff) - 1 + nbi = dc } deadline = int64(o.cfg.BackOff[dc]) + // Set `next` to the next backoff (if smaller than current `next` value). 
+ if nextBackoff := int64(o.cfg.BackOff[nbi]); nextBackoff < next { + next = nextBackoff + } } if elapsed >= deadline { if !o.onRedeliverQueue(seq) { diff --git a/server/jetstream_cluster_test.go b/server/jetstream_cluster_test.go index 85ac948e..e83efabb 100644 --- a/server/jetstream_cluster_test.go +++ b/server/jetstream_cluster_test.go @@ -10682,8 +10682,11 @@ func TestJetStreamClusterStreamTagPlacement(t *testing.T) { reset := func(s *Server) { s.mu.Lock() - s.sys.resetCh <- struct{}{} + rch := s.sys.resetCh s.mu.Unlock() + if rch != nil { + rch <- struct{}{} + } s.sendStatszUpdate() } diff --git a/server/jetstream_test.go b/server/jetstream_test.go index c58e6680..6196b1dd 100644 --- a/server/jetstream_test.go +++ b/server/jetstream_test.go @@ -30,6 +30,7 @@ import ( "path/filepath" "reflect" "runtime" + "runtime/debug" "sort" "strconv" "strings" @@ -16227,6 +16228,84 @@ func TestJetStreamAddStreamWithFilestoreFailure(t *testing.T) { } } +type checkFastState struct { + count int64 + StreamStore +} + +func (s *checkFastState) FastState(state *StreamState) { + // Keep track only when called from checkPending() + if bytes.Contains(debug.Stack(), []byte("checkPending(")) { + atomic.AddInt64(&s.count, 1) + } + s.StreamStore.FastState(state) +} + +func TestJetStreamBackOffCheckPending(t *testing.T) { + s := RunBasicJetStreamServer() + if config := s.JetStreamConfig(); config != nil { + defer removeDir(t, config.StoreDir) + } + defer s.Shutdown() + + mset, err := s.GlobalAccount().addStream(&StreamConfig{Name: "TEST", Subjects: []string{"foo"}}) + if err != nil { + t.Fatalf("Unexpected error adding stream: %v", err) + } + defer mset.delete() + + // Plug our store to see how many times we invoke FastState, which is done in checkPending + mset.mu.Lock() + st := &checkFastState{StreamStore: mset.store} + mset.store = st + mset.mu.Unlock() + + nc := clientConnectToServer(t, s) + defer nc.Close() + + sendStreamMsg(t, nc, "foo", "Hello World!") + + sub, _ := 
nc.SubscribeSync(nats.NewInbox()) + defer sub.Unsubscribe() + nc.Flush() + + o, err := mset.addConsumer(&ConsumerConfig{ + DeliverSubject: sub.Subject, + AckPolicy: AckExplicit, + MaxDeliver: 1000, + BackOff: []time.Duration{50 * time.Millisecond, 250 * time.Millisecond, time.Second}, + }) + if err != nil { + t.Fatalf("Expected no error, got %v", err) + } + defer o.delete() + + // Check the first delivery and the following 2 redeliveries + start := time.Now() + natsNexMsg(t, sub, time.Second) + if dur := time.Since(start); dur >= 50*time.Millisecond { + t.Fatalf("Expected first delivery to be fast, took: %v", dur) + } + start = time.Now() + natsNexMsg(t, sub, time.Second) + if dur := time.Since(start); dur < 25*time.Millisecond || dur > 75*time.Millisecond { + t.Fatalf("Expected first redelivery to be ~50ms, took: %v", dur) + } + start = time.Now() + natsNexMsg(t, sub, time.Second) + if dur := time.Since(start); dur < 200*time.Millisecond || dur > 300*time.Millisecond { + t.Fatalf("Expected first redelivery to be ~250ms, took: %v", dur) + } + // There was a bug that would cause checkPending to be invoked based on the + // ackWait (which in this case would be the first value of BackOff, which + // is 50ms). So we would call checkPending() too many times. + time.Sleep(500 * time.Millisecond) + // Check now, it should have been invoked twice. + if n := atomic.LoadInt64(&st.count); n != 2 { + t.Fatalf("Expected checkPending to be invoked 2 times, was %v", n) + } +} + /////////////////////////////////////////////////////////////////////////// // Simple JetStream Benchmarks ///////////////////////////////////////////////////////////////////////////