From 2253bb6f1a2a4783e3708e9eda500d27b43f4828 Mon Sep 17 00:00:00 2001 From: Ivan Kozlovic Date: Wed, 23 Mar 2022 12:40:46 -0600 Subject: [PATCH] JS: BackOff list caused too frequent checkPending() calls Since the "next" timer value is set to the AckWait value, which is the first element in the BackOff list if present, the check would possibly happen at this interval, even when we were past the first redelivery and the backoff interval had increased. The end-user would still see the redelivery be done at the durations indicated by the BackOff list, but internally, we would be checking at the initial BackOff's ack wait. I added a test that uses the store's interface to detect how many times the checkPending() function is invoked. For this test it should have been invoked twice, but without the fix it was invoked 15 times. Also fixed an unrelated test that could possibly deadlock causing tests to be aborted due to inactivity on Travis. Signed-off-by: Ivan Kozlovic --- server/consumer.go | 17 ++++++- server/jetstream_cluster_test.go | 5 +- server/jetstream_test.go | 79 ++++++++++++++++++++++++++++++++ 3 files changed, 98 insertions(+), 3 deletions(-) diff --git a/server/consumer.go b/server/consumer.go index ba1a0456..3c62104b 100644 --- a/server/consumer.go +++ b/server/consumer.go @@ -3244,6 +3244,11 @@ func (o *consumer) checkPending() { now := time.Now().UnixNano() ttl := int64(o.cfg.AckWait) next := int64(o.ackWait(0)) + // However, if there is backoff, initializes with the largest backoff. + // It will be adjusted as needed. + if l := len(o.cfg.BackOff); l > 0 { + next = int64(o.cfg.BackOff[l-1]) + } var shouldUpdateState bool var state StreamState @@ -3268,12 +3273,20 @@ func (o *consumer) checkPending() { continue } elapsed, deadline := now-p.Timestamp, ttl - if len(o.cfg.BackOff) > 0 && o.rdc != nil { + if len(o.cfg.BackOff) > 0 { + // This is ok even if o.rdc is nil, we would get dc == 0, which is what we want. dc := int(o.rdc[seq]) - if dc >= len(o.cfg.BackOff) { + // This will be the index for the next backoff, will set to last element if needed. + nbi := dc + 1 + if dc+1 >= len(o.cfg.BackOff) { dc = len(o.cfg.BackOff) - 1 + nbi = dc } deadline = int64(o.cfg.BackOff[dc]) + // Set `next` to the next backoff (if smaller than current `next` value). + if nextBackoff := int64(o.cfg.BackOff[nbi]); nextBackoff < next { + next = nextBackoff + } } if elapsed >= deadline { if !o.onRedeliverQueue(seq) { diff --git a/server/jetstream_cluster_test.go b/server/jetstream_cluster_test.go index 85ac948e..e83efabb 100644 --- a/server/jetstream_cluster_test.go +++ b/server/jetstream_cluster_test.go @@ -10682,8 +10682,11 @@ func TestJetStreamClusterStreamTagPlacement(t *testing.T) { reset := func(s *Server) { s.mu.Lock() - s.sys.resetCh <- struct{}{} + rch := s.sys.resetCh s.mu.Unlock() + if rch != nil { + rch <- struct{}{} + } s.sendStatszUpdate() } diff --git a/server/jetstream_test.go b/server/jetstream_test.go index c58e6680..6196b1dd 100644 --- a/server/jetstream_test.go +++ b/server/jetstream_test.go @@ -30,6 +30,7 @@ import ( "path/filepath" "reflect" "runtime" + "runtime/debug" "sort" "strconv" "strings" @@ -16227,6 +16228,84 @@ func TestJetStreamAddStreamWithFilestoreFailure(t *testing.T) { } } +type checkFastState struct { + count int64 + StreamStore +} + +func (s *checkFastState) FastState(state *StreamState) { + // Keep track only when called from checkPending() + if bytes.Contains(debug.Stack(), []byte("checkPending(")) { + atomic.AddInt64(&s.count, 1) + } + s.StreamStore.FastState(state) +} + +func TestJetStreamBackOffCheckPending(t *testing.T) { + s := RunBasicJetStreamServer() + if config := s.JetStreamConfig(); config != nil { + defer removeDir(t, config.StoreDir) + } + defer s.Shutdown() + + mset, err := s.GlobalAccount().addStream(&StreamConfig{Name: "TEST", Subjects: []string{"foo"}}) + if err != nil { + t.Fatalf("Unexpected error adding stream: %v", err) + } + defer mset.delete() + + // Plug or store to see how many times we invoke FastState, which is done in checkPending + mset.mu.Lock() + st := &checkFastState{StreamStore: mset.store} + mset.store = st + mset.mu.Unlock() + + nc := clientConnectToServer(t, s) + defer nc.Close() + + sendStreamMsg(t, nc, "foo", "Hello World!") + + sub, _ := nc.SubscribeSync(nats.NewInbox()) + defer sub.Unsubscribe() + nc.Flush() + + o, err := mset.addConsumer(&ConsumerConfig{ + DeliverSubject: sub.Subject, + AckPolicy: AckExplicit, + MaxDeliver: 1000, + BackOff: []time.Duration{50 * time.Millisecond, 250 * time.Millisecond, time.Second}, + }) + if err != nil { + t.Fatalf("Expected no error, got %v", err) + } + defer o.delete() + + // Check the first delivery and the following 2 redeliveries + start := time.Now() + natsNexMsg(t, sub, time.Second) + if dur := time.Since(start); dur >= 50*time.Millisecond { + t.Fatalf("Expected first delivery to be fast, took: %v", dur) + } + start = time.Now() + natsNexMsg(t, sub, time.Second) + if dur := time.Since(start); dur < 25*time.Millisecond || dur > 75*time.Millisecond { + t.Fatalf("Expected first redelivery to be ~50ms, took: %v", dur) + } + start = time.Now() + natsNexMsg(t, sub, time.Second) + if dur := time.Since(start); dur < 200*time.Millisecond || dur > 300*time.Millisecond { + t.Fatalf("Expected first redelivery to be ~250ms, took: %v", dur) + } + // There was a bug that would cause checkPending to be invoked based on the + // ackWait (which in this case would be the first value of BackOff, which + // is 50ms). So we would call checkPending() too many times. + time.Sleep(500 * time.Millisecond) + // Check now, it should have been invoked twice. + if n := atomic.LoadInt64(&st.count); n != 2 { + t.Fatalf("Expected checkPending to be invoked 2 times, was %v", n) + } +} + /////////////////////////////////////////////////////////////////////////// // Simple JetStream Benchmarks ///////////////////////////////////////////////////////////////////////////