From 3a14a984fcdea4e1d37189ee889fd1f361809830 Mon Sep 17 00:00:00 2001 From: Derek Collison Date: Thu, 28 Oct 2021 08:22:30 -0700 Subject: [PATCH 1/2] Fix for a bug that did not properly decode redelivered state for consumers from a filestore. This also caused state abnormalities in a user's setup so added code to clean up bad state as needed. Signed-off-by: Derek Collison --- server/consumer.go | 56 +++++++++++++++++++++++++++++++++++++--- server/filestore.go | 2 ++ server/filestore_test.go | 20 ++++++++++++++ 3 files changed, 75 insertions(+), 3 deletions(-) diff --git a/server/consumer.go b/server/consumer.go index 53872c53..5d0cd1bd 100644 --- a/server/consumer.go +++ b/server/consumer.go @@ -1408,7 +1408,27 @@ func (o *consumer) ackWait(next time.Duration) time.Duration { return o.cfg.AckWait + ackWaitDelay } +// Due to bug in calculation of sequences on restoring redelivered let's do quick sanity check. +func (o *consumer) checkRedelivered() { + var lseq uint64 + if mset := o.mset; mset != nil { + lseq = mset.lastSeq() + } + var shouldUpdateState bool + for sseq := range o.rdc { + if sseq < o.asflr || sseq > lseq { + delete(o.rdc, sseq) + o.removeFromRedeliverQueue(sseq) + shouldUpdateState = true + } + } + if shouldUpdateState { + o.writeStoreStateUnlocked() + } +} + // This will restore the state from disk. +// Lock should be held. func (o *consumer) readStoredState() error { if o.store == nil { return nil @@ -1416,6 +1436,9 @@ func (o *consumer) readStoredState() error { state, err := o.store.State() if err == nil && state != nil { o.applyState(state) + if len(o.rdc) > 0 { + o.checkRedelivered() + } } return err } @@ -1435,7 +1458,10 @@ func (o *consumer) applyState(state *ConsumerState) { // Setup tracking timer if we have restored pending. if len(o.pending) > 0 && o.ptmr == nil { - o.ptmr = time.AfterFunc(o.ackWait(0), o.checkPending) + // This is on startup or leader change. We want to check pending + // sooner in case there are inconsistencies etc. Pick between 1-5 secs. + delay := time.Second + time.Duration(rand.Int63n(4000))*time.Millisecond + o.ptmr = time.AfterFunc(delay, o.checkPending) } } @@ -1462,7 +1488,12 @@ func (o *consumer) setStoreState(state *ConsumerState) error { func (o *consumer) writeStoreState() error { o.mu.Lock() defer o.mu.Unlock() + return o.writeStoreStateUnlocked() +} +// Update our state to the store. +// Lock should be held. +func (o *consumer) writeStoreStateUnlocked() error { if o.store == nil { return nil } @@ -2583,19 +2614,32 @@ func (o *consumer) checkPending() { if mset == nil { return } + + now := time.Now().UnixNano() ttl := int64(o.cfg.AckWait) next := int64(o.ackWait(0)) - now := time.Now().UnixNano() + + var shouldUpdateState bool + var state StreamState + mset.store.FastState(&state) + fseq := state.FirstSeq // Since we can update timestamps, we have to review all pending. // We may want to unlock here or warn if list is big. var expired []uint64 for seq, p := range o.pending { + // Check if these are no longer valid. + if seq < fseq { + delete(o.pending, seq) + delete(o.rdc, seq) + o.removeFromRedeliverQueue(seq) + shouldUpdateState = true + continue + } elapsed := now - p.Timestamp if elapsed >= ttl { if !o.onRedeliverQueue(seq) { expired = append(expired, seq) - o.signalNewMessages() } } else if ttl-elapsed < next { // Update when we should fire next. @@ -2615,6 +2659,7 @@ func (o *consumer) checkPending() { p.Timestamp += off } } + o.signalNewMessages() } if len(o.pending) > 0 { @@ -2623,6 +2668,11 @@ func (o *consumer) checkPending() { o.ptmr.Stop() o.ptmr = nil } + + // Update our state if needed. + if shouldUpdateState { + o.writeStoreStateUnlocked() + } } // SeqFromReply will extract a sequence number from a reply subject. diff --git a/server/filestore.go b/server/filestore.go index 45023111..b81a8789 100644 --- a/server/filestore.go +++ b/server/filestore.go @@ -5346,6 +5346,8 @@ func decodeConsumerState(buf []byte) (*ConsumerState, error) { state.Redelivered = make(map[uint64]uint64, numRedelivered) for i := 0; i < int(numRedelivered); i++ { if seq, n := readSeq(), readCount(); seq > 0 && n > 0 { + // Adjust seq back. + seq += state.AckFloor.Stream state.Redelivered[seq] = n } } diff --git a/server/filestore_test.go b/server/filestore_test.go index e9d8914c..ea2df053 100644 --- a/server/filestore_test.go +++ b/server/filestore_test.go @@ -1992,6 +1992,26 @@ func TestFileStoreConsumer(t *testing.T) { updateAndCheck() } +func TestFileStoreConsumerEncodeDecodeRedelivered(t *testing.T) { + state := &ConsumerState{} + + state.Delivered.Consumer = 100 + state.Delivered.Stream = 100 + state.AckFloor.Consumer = 50 + state.AckFloor.Stream = 50 + + state.Redelivered = map[uint64]uint64{122: 3, 144: 8} + buf := encodeConsumerState(state) + + rstate, err := decodeConsumerState(buf) + if err != nil { + t.Fatalf("Unexpected error: %v", err) + } + if !reflect.DeepEqual(state, rstate) { + t.Fatalf("States do not match: %+v vs %+v", state, rstate) + } +} + func TestFileStoreWriteFailures(t *testing.T) { // This test should be run inside an environment where this directory // has a limited size. From 003b6996f10817b7c6d50d0068f6a8a471174944 Mon Sep 17 00:00:00 2001 From: Derek Collison Date: Thu, 28 Oct 2021 08:54:32 -0700 Subject: [PATCH 2/2] If AckWait less then restart check interval use AckWait Signed-off-by: Derek Collison --- server/consumer.go | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/server/consumer.go b/server/consumer.go index 5d0cd1bd..1d3ee4d1 100644 --- a/server/consumer.go +++ b/server/consumer.go @@ -1461,6 +1461,10 @@ func (o *consumer) applyState(state *ConsumerState) { // This is on startup or leader change. We want to check pending // sooner in case there are inconsistencies etc. Pick between 1-5 secs. delay := time.Second + time.Duration(rand.Int63n(4000))*time.Millisecond + // If normal is lower than this just use that. + if o.cfg.AckWait < delay { + delay = o.ackWait(0) + } o.ptmr = time.AfterFunc(delay, o.checkPending) } }