mirror of
https://github.com/gogrlx/nats-server.git
synced 2026-04-02 03:38:42 -07:00
Merge pull request #2948 from nats-io/js_backoff_check_pending
JS: BackOff list caused too frequent checkPending() calls
This commit is contained in:
@@ -3244,6 +3244,11 @@ func (o *consumer) checkPending() {
|
||||
now := time.Now().UnixNano()
|
||||
ttl := int64(o.cfg.AckWait)
|
||||
next := int64(o.ackWait(0))
|
||||
// However, if there is backoff, initializes with the largest backoff.
|
||||
// It will be adjusted as needed.
|
||||
if l := len(o.cfg.BackOff); l > 0 {
|
||||
next = int64(o.cfg.BackOff[l-1])
|
||||
}
|
||||
|
||||
var shouldUpdateState bool
|
||||
var state StreamState
|
||||
@@ -3268,12 +3273,20 @@ func (o *consumer) checkPending() {
|
||||
continue
|
||||
}
|
||||
elapsed, deadline := now-p.Timestamp, ttl
|
||||
if len(o.cfg.BackOff) > 0 && o.rdc != nil {
|
||||
if len(o.cfg.BackOff) > 0 {
|
||||
// This is ok even if o.rdc is nil, we would get dc == 0, which is what we want.
|
||||
dc := int(o.rdc[seq])
|
||||
if dc >= len(o.cfg.BackOff) {
|
||||
// This will be the index for the next backoff, will set to last element if needed.
|
||||
nbi := dc + 1
|
||||
if dc+1 >= len(o.cfg.BackOff) {
|
||||
dc = len(o.cfg.BackOff) - 1
|
||||
nbi = dc
|
||||
}
|
||||
deadline = int64(o.cfg.BackOff[dc])
|
||||
// Set `next` to the next backoff (if smaller than current `next` value).
|
||||
if nextBackoff := int64(o.cfg.BackOff[nbi]); nextBackoff < next {
|
||||
next = nextBackoff
|
||||
}
|
||||
}
|
||||
if elapsed >= deadline {
|
||||
if !o.onRedeliverQueue(seq) {
|
||||
|
||||
@@ -10682,8 +10682,11 @@ func TestJetStreamClusterStreamTagPlacement(t *testing.T) {
|
||||
|
||||
reset := func(s *Server) {
|
||||
s.mu.Lock()
|
||||
s.sys.resetCh <- struct{}{}
|
||||
rch := s.sys.resetCh
|
||||
s.mu.Unlock()
|
||||
if rch != nil {
|
||||
rch <- struct{}{}
|
||||
}
|
||||
s.sendStatszUpdate()
|
||||
}
|
||||
|
||||
|
||||
@@ -30,6 +30,7 @@ import (
|
||||
"path/filepath"
|
||||
"reflect"
|
||||
"runtime"
|
||||
"runtime/debug"
|
||||
"sort"
|
||||
"strconv"
|
||||
"strings"
|
||||
@@ -16227,6 +16228,84 @@ func TestJetStreamAddStreamWithFilestoreFailure(t *testing.T) {
|
||||
}
|
||||
}
|
||||
|
||||
// checkFastState wraps a StreamStore so a test can count how many times
// FastState() is invoked from consumer.checkPending() (see the FastState
// method override below). All other StreamStore methods are promoted
// unchanged from the embedded store.
type checkFastState struct {
	// count is incremented atomically on each FastState() call that
	// originates from checkPending(); read with atomic.LoadInt64.
	count int64
	StreamStore
}
|
||||
|
||||
func (s *checkFastState) FastState(state *StreamState) {
|
||||
// Keep track only when called from checkPending()
|
||||
if bytes.Contains(debug.Stack(), []byte("checkPending(")) {
|
||||
atomic.AddInt64(&s.count, 1)
|
||||
}
|
||||
s.StreamStore.FastState(state)
|
||||
}
|
||||
|
||||
func TestJetStreamBackOffCheckPending(t *testing.T) {
|
||||
s := RunBasicJetStreamServer()
|
||||
if config := s.JetStreamConfig(); config != nil {
|
||||
defer removeDir(t, config.StoreDir)
|
||||
}
|
||||
defer s.Shutdown()
|
||||
|
||||
mset, err := s.GlobalAccount().addStream(&StreamConfig{Name: "TEST", Subjects: []string{"foo"}})
|
||||
if err != nil {
|
||||
t.Fatalf("Unexpected error adding stream: %v", err)
|
||||
}
|
||||
defer mset.delete()
|
||||
|
||||
// Plug or store to see how many times we invoke FastState, which is done in checkPending
|
||||
mset.mu.Lock()
|
||||
st := &checkFastState{StreamStore: mset.store}
|
||||
mset.store = st
|
||||
mset.mu.Unlock()
|
||||
|
||||
nc := clientConnectToServer(t, s)
|
||||
defer nc.Close()
|
||||
|
||||
sendStreamMsg(t, nc, "foo", "Hello World!")
|
||||
|
||||
sub, _ := nc.SubscribeSync(nats.NewInbox())
|
||||
defer sub.Unsubscribe()
|
||||
nc.Flush()
|
||||
|
||||
o, err := mset.addConsumer(&ConsumerConfig{
|
||||
DeliverSubject: sub.Subject,
|
||||
AckPolicy: AckExplicit,
|
||||
MaxDeliver: 1000,
|
||||
BackOff: []time.Duration{50 * time.Millisecond, 250 * time.Millisecond, time.Second},
|
||||
})
|
||||
if err != nil {
|
||||
t.Fatalf("Expected no error, got %v", err)
|
||||
}
|
||||
defer o.delete()
|
||||
|
||||
// Check the first delivery and the following 2 redeliveries
|
||||
start := time.Now()
|
||||
natsNexMsg(t, sub, time.Second)
|
||||
if dur := time.Since(start); dur >= 50*time.Millisecond {
|
||||
t.Fatalf("Expected first delivery to be fast, took: %v", dur)
|
||||
}
|
||||
start = time.Now()
|
||||
natsNexMsg(t, sub, time.Second)
|
||||
if dur := time.Since(start); dur < 25*time.Millisecond || dur > 75*time.Millisecond {
|
||||
t.Fatalf("Expected first redelivery to be ~50ms, took: %v", dur)
|
||||
}
|
||||
start = time.Now()
|
||||
natsNexMsg(t, sub, time.Second)
|
||||
if dur := time.Since(start); dur < 200*time.Millisecond || dur > 300*time.Millisecond {
|
||||
t.Fatalf("Expected first redelivery to be ~250ms, took: %v", dur)
|
||||
}
|
||||
// There was a bug that would cause checkPending to be invoked based on the
|
||||
// ackWait (which in this case would be the first value of BackOff, which
|
||||
// is 50ms). So we would call checkPending() too many times.
|
||||
time.Sleep(500 * time.Millisecond)
|
||||
// Check now, it should have been invoked twice.
|
||||
if n := atomic.LoadInt64(&st.count); n != 2 {
|
||||
t.Fatalf("Expected checkPending to be invoked 2 times, was %v", n)
|
||||
}
|
||||
}
|
||||
|
||||
///////////////////////////////////////////////////////////////////////////
|
||||
// Simple JetStream Benchmarks
|
||||
///////////////////////////////////////////////////////////////////////////
|
||||
|
||||
Reference in New Issue
Block a user