Merge pull request #2652 from nats-io/consumer-rdc

Fix for a bug that did not properly decode redelivered state for consumers from a filestore.
This commit is contained in:
Derek Collison
2021-10-28 11:28:21 -07:00
committed by GitHub
3 changed files with 79 additions and 3 deletions

View File

@@ -1408,7 +1408,27 @@ func (o *consumer) ackWait(next time.Duration) time.Duration {
return o.cfg.AckWait + ackWaitDelay
}
// Due to bug in calculation of sequences on restoring redelivered let's do quick sanity check.
func (o *consumer) checkRedelivered() {
var lseq uint64
if mset := o.mset; mset != nil {
lseq = mset.lastSeq()
}
var shouldUpdateState bool
for sseq := range o.rdc {
if sseq < o.asflr || sseq > lseq {
delete(o.rdc, sseq)
o.removeFromRedeliverQueue(sseq)
shouldUpdateState = true
}
}
if shouldUpdateState {
o.writeStoreStateUnlocked()
}
}
// This will restore the state from disk.
// Lock should be held.
func (o *consumer) readStoredState() error {
if o.store == nil {
return nil
@@ -1416,6 +1436,9 @@ func (o *consumer) readStoredState() error {
state, err := o.store.State()
if err == nil && state != nil {
o.applyState(state)
if len(o.rdc) > 0 {
o.checkRedelivered()
}
}
return err
}
@@ -1435,7 +1458,14 @@ func (o *consumer) applyState(state *ConsumerState) {
// Setup tracking timer if we have restored pending.
if len(o.pending) > 0 && o.ptmr == nil {
o.ptmr = time.AfterFunc(o.ackWait(0), o.checkPending)
// This is on startup or leader change. We want to check pending
// sooner in case there are inconsistencies etc. Pick between 1-5 secs.
delay := time.Second + time.Duration(rand.Int63n(4000))*time.Millisecond
// If normal is lower than this just use that.
if o.cfg.AckWait < delay {
delay = o.ackWait(0)
}
o.ptmr = time.AfterFunc(delay, o.checkPending)
}
}
@@ -1462,7 +1492,12 @@ func (o *consumer) setStoreState(state *ConsumerState) error {
func (o *consumer) writeStoreState() error {
o.mu.Lock()
defer o.mu.Unlock()
return o.writeStoreStateUnlocked()
}
// Update our state to the store.
// Lock should be held.
func (o *consumer) writeStoreStateUnlocked() error {
if o.store == nil {
return nil
}
@@ -2583,19 +2618,32 @@ func (o *consumer) checkPending() {
if mset == nil {
return
}
now := time.Now().UnixNano()
ttl := int64(o.cfg.AckWait)
next := int64(o.ackWait(0))
now := time.Now().UnixNano()
var shouldUpdateState bool
var state StreamState
mset.store.FastState(&state)
fseq := state.FirstSeq
// Since we can update timestamps, we have to review all pending.
// We may want to unlock here or warn if list is big.
var expired []uint64
for seq, p := range o.pending {
// Check if these are no longer valid.
if seq < fseq {
delete(o.pending, seq)
delete(o.rdc, seq)
o.removeFromRedeliverQueue(seq)
shouldUpdateState = true
continue
}
elapsed := now - p.Timestamp
if elapsed >= ttl {
if !o.onRedeliverQueue(seq) {
expired = append(expired, seq)
o.signalNewMessages()
}
} else if ttl-elapsed < next {
// Update when we should fire next.
@@ -2615,6 +2663,7 @@ func (o *consumer) checkPending() {
p.Timestamp += off
}
}
o.signalNewMessages()
}
if len(o.pending) > 0 {
@@ -2623,6 +2672,11 @@ func (o *consumer) checkPending() {
o.ptmr.Stop()
o.ptmr = nil
}
// Update our state if needed.
if shouldUpdateState {
o.writeStoreStateUnlocked()
}
}
// SeqFromReply will extract a sequence number from a reply subject.

View File

@@ -5346,6 +5346,8 @@ func decodeConsumerState(buf []byte) (*ConsumerState, error) {
state.Redelivered = make(map[uint64]uint64, numRedelivered)
for i := 0; i < int(numRedelivered); i++ {
if seq, n := readSeq(), readCount(); seq > 0 && n > 0 {
// Adjust seq back.
seq += state.AckFloor.Stream
state.Redelivered[seq] = n
}
}

View File

@@ -1992,6 +1992,26 @@ func TestFileStoreConsumer(t *testing.T) {
updateAndCheck()
}
func TestFileStoreConsumerEncodeDecodeRedelivered(t *testing.T) {
state := &ConsumerState{}
state.Delivered.Consumer = 100
state.Delivered.Stream = 100
state.AckFloor.Consumer = 50
state.AckFloor.Stream = 50
state.Redelivered = map[uint64]uint64{122: 3, 144: 8}
buf := encodeConsumerState(state)
rstate, err := decodeConsumerState(buf)
if err != nil {
t.Fatalf("Unexpected error: %v", err)
}
if !reflect.DeepEqual(state, rstate) {
t.Fatalf("States do not match: %+v vs %+v", state, rstate)
}
}
func TestFileStoreWriteFailures(t *testing.T) {
// This test should be run inside an environment where this directory
// has a limited size.