From 3a39786972ac6f96442005ed221bcbb3b4006c07 Mon Sep 17 00:00:00 2001
From: Derek Collison
Date: Fri, 1 Sep 2023 08:48:28 -0700
Subject: [PATCH] When we fail to deliver a message for a consumer, either
 through didNotDeliver() or LoadMsg() failure re-adjust delivered count and
 waitingRequest accounting.

Signed-off-by: Derek Collison
---
 server/consumer.go       | 46 ++++++++++++++++++-
 server/filestore_test.go | 96 ++++++++++++++++++++++++++++++++++++++++
 server/stream.go         |  2 +-
 3 files changed, 141 insertions(+), 3 deletions(-)

diff --git a/server/consumer.go b/server/consumer.go
index ea6fbbe8..1cba6883 100644
--- a/server/consumer.go
+++ b/server/consumer.go
@@ -3030,6 +3030,22 @@ func (o *consumer) incDeliveryCount(sseq uint64) uint64 {
 	return o.rdc[sseq] + 1
 }
 
+// Used if we have to adjust on failed delivery or bad lookups.
+// Those failed attempts should not increase deliver count.
+// Lock should be held.
+func (o *consumer) decDeliveryCount(sseq uint64) {
+	if o.rdc == nil {
+		return
+	}
+	if dc, ok := o.rdc[sseq]; ok {
+		if dc == 1 {
+			delete(o.rdc, sseq)
+		} else {
+			o.rdc[sseq] -= 1
+		}
+	}
+}
+
 // send a delivery exceeded advisory.
 func (o *consumer) notifyDeliveryExceeded(sseq, dc uint64) {
 	e := JSConsumerDeliveryExceededAdvisory{
@@ -3104,6 +3120,8 @@ func (o *consumer) getNextMsg() (*jsPubMsg, uint64, error) {
 		if sm == nil || err != nil {
 			pmsg.returnToPool()
 			pmsg, dc = nil, 0
+			// Adjust back deliver count.
+			o.decDeliveryCount(seq)
 		}
 		return pmsg, dc, err
 	}
@@ -3205,6 +3223,7 @@ func (o *consumer) processWaiting(eos bool) (int, int, int, time.Time) {
 				interest = true
 			}
 		}
+
 		// If interest, update batch pending requests counter and update fexp timer.
 		if interest {
 			brp += wr.n
@@ -3508,8 +3527,12 @@ func (o *consumer) loopAndGatherMsgs(qch chan struct{}) {
 			if err == ErrStoreEOF {
 				o.checkNumPendingOnEOF()
 			}
-			if err == ErrStoreMsgNotFound || err == errDeletedMsg || err == ErrStoreEOF || err == errMaxAckPending || err == errPartialCache {
+			if err == ErrStoreMsgNotFound || err == errDeletedMsg || err == ErrStoreEOF || err == errMaxAckPending {
 				goto waitForMsgs
+			} else if err == errPartialCache {
+				s.Warnf("Unexpected partial cache error looking up message for consumer")
+				goto waitForMsgs
+
 			} else {
 				s.Errorf("Received an error looking up message for consumer: %v", err)
 				goto waitForMsgs
@@ -3906,20 +3929,39 @@ func (o *consumer) trackPending(sseq, dseq uint64) {
 	}
 }
 
+// Credit back a failed delivery.
+// lock should be held.
+func (o *consumer) creditWaitingRequest(reply string) {
+	for i, rp := 0, o.waiting.rp; i < o.waiting.n; i++ {
+		if wr := o.waiting.reqs[rp]; wr != nil {
+			if wr.reply == reply {
+				wr.n++
+				wr.d--
+				return
+			}
+		}
+		rp = (rp + 1) % cap(o.waiting.reqs)
+	}
+}
+
 // didNotDeliver is called when a delivery for a consumer message failed.
 // Depending on our state, we will process the failure.
-func (o *consumer) didNotDeliver(seq uint64) {
+func (o *consumer) didNotDeliver(seq uint64, subj string) {
 	o.mu.Lock()
 	mset := o.mset
 	if mset == nil {
 		o.mu.Unlock()
 		return
 	}
+	// Adjust back deliver count.
+	o.decDeliveryCount(seq)
+
 	var checkDeliveryInterest bool
 	if o.isPushMode() {
 		o.active = false
 		checkDeliveryInterest = true
 	} else if o.pending != nil {
+		o.creditWaitingRequest(subj)
 		// pull mode and we have pending.
 		if _, ok := o.pending[seq]; ok {
 			// We found this messsage on pending, we need
diff --git a/server/filestore_test.go b/server/filestore_test.go
index 0c4fd4c9..a1c35603 100644
--- a/server/filestore_test.go
+++ b/server/filestore_test.go
@@ -5727,3 +5727,99 @@ func TestFileStoreTombstoneBackwardCompatibility(t *testing.T) {
 
 	checkPurgeState()
 }
+
+// Test that loads from lmb under lots of writes do not return errPartialCache.
+func TestFileStoreErrPartialLoad(t *testing.T) {
+	fs, err := newFileStore(FileStoreConfig{StoreDir: t.TempDir()}, StreamConfig{Name: "zzz", Subjects: []string{"*"}, Storage: FileStorage})
+	require_NoError(t, err)
+	defer fs.Stop()
+
+	put := func(num int) {
+		for i := 0; i < num; i++ {
+			fs.StoreMsg("Z", nil, []byte("ZZZZZZZZZZZZZ"))
+		}
+	}
+
+	put(100)
+
+	// Dump cache of lmb.
+	clearCache := func() {
+		fs.mu.RLock()
+		lmb := fs.lmb
+		fs.mu.RUnlock()
+		lmb.mu.Lock()
+		lmb.clearCache()
+		lmb.mu.Unlock()
+	}
+	clearCache()
+
+	qch := make(chan struct{})
+	defer close(qch)
+
+	for i := 0; i < 10; i++ {
+		go func() {
+			for {
+				select {
+				case <-qch:
+					return
+				default:
+					put(5)
+				}
+			}
+		}()
+	}
+
+	time.Sleep(100 * time.Millisecond)
+
+	var smv StoreMsg
+	for i := 0; i < 10_000; i++ {
+		fs.mu.RLock()
+		lmb := fs.lmb
+		fs.mu.RUnlock()
+		lmb.mu.Lock()
+		first, last := fs.lmb.first.seq, fs.lmb.last.seq
+		if i%100 == 0 {
+			lmb.clearCache()
+		}
+		lmb.mu.Unlock()
+
+		if spread := int(last - first); spread > 0 {
+			seq := first + uint64(rand.Intn(spread))
+			_, err = fs.LoadMsg(seq, &smv)
+			require_NoError(t, err)
+		}
+	}
+}
+
+func TestFileStoreErrPartialLoadOnSyncClose(t *testing.T) {
+	fs, err := newFileStore(
+		FileStoreConfig{StoreDir: t.TempDir(), BlockSize: 500},
+		StreamConfig{Name: "zzz", Subjects: []string{"*"}, Storage: FileStorage},
+	)
+	require_NoError(t, err)
+	defer fs.Stop()
+
+	// This yields an internal record length of 50 bytes. So 10 msgs per blk.
+	msgLen := 19
+	msg := bytes.Repeat([]byte("A"), msgLen)
+
+	// Load up half the block.
+	for _, subj := range []string{"A", "B", "C", "D", "E"} {
+		fs.StoreMsg(subj, nil, msg)
+	}
+
+	// Now simulate the sync timer closing the last block.
+	fs.mu.RLock()
+	lmb := fs.lmb
+	fs.mu.RUnlock()
+	require_True(t, lmb != nil)
+
+	lmb.mu.Lock()
+	lmb.expireCacheLocked()
+	lmb.dirtyCloseWithRemove(false)
+	lmb.mu.Unlock()
+
+	fs.StoreMsg("Z", nil, msg)
+	_, err = fs.LoadMsg(1, nil)
+	require_NoError(t, err)
+}
diff --git a/server/stream.go b/server/stream.go
index a2d86f64..fb792845 100644
--- a/server/stream.go
+++ b/server/stream.go
@@ -4455,7 +4455,7 @@ func (mset *stream) internalLoop() {
 				// Check to see if this is a delivery for a consumer and
 				// we failed to deliver the message. If so alert the consumer.
 				if pm.o != nil && pm.seq > 0 && !didDeliver {
-					pm.o.didNotDeliver(pm.seq)
+					pm.o.didNotDeliver(pm.seq, pm.dsubj)
 				}
 				pm.returnToPool()
 			}
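
Illustrative sketch (not part of the patch, and not the server's real types): the toy Go program below models the two pieces of accounting the consumer.go changes roll back on a failed delivery. The names redeliveries, waitingRequest, inc, dec, and credit are invented stand-ins for o.rdc, the waiting-request queue, incDeliveryCount, decDeliveryCount, and creditWaitingRequest; the only point is that an attempt that never reaches the client must give back both the delivery-count increment and the pull request's message credit.

// Toy illustration only. Hypothetical stand-ins for the consumer's
// bookkeeping; not server code.
package main

import "fmt"

// redeliveries stands in for o.rdc: stream sequence -> redelivery count.
type redeliveries map[uint64]uint64

// inc mirrors incDeliveryCount: record one more attempt and return the total
// delivery count (the first delivery is implicit, hence the +1).
func (r redeliveries) inc(seq uint64) uint64 {
	r[seq]++
	return r[seq] + 1
}

// dec mirrors decDeliveryCount: a failed attempt must not inflate the count,
// so undo the increment and drop the entry once it falls back to zero.
func (r redeliveries) dec(seq uint64) {
	if n, ok := r[seq]; ok {
		if n == 1 {
			delete(r, seq)
		} else {
			r[seq]--
		}
	}
}

// waitingRequest is a toy pull request: n messages still owed, d delivered.
type waitingRequest struct{ n, d int }

// credit mirrors creditWaitingRequest: hand the failed message back to the
// request so it can be served again later.
func (wr *waitingRequest) credit() { wr.n++; wr.d-- }

func main() {
	rdc := redeliveries{}
	wr := &waitingRequest{n: 10}
	const seq = 7

	// Attempt a redelivery: charge the delivery count and the pull request.
	dc := rdc.inc(seq)
	wr.n--
	wr.d++
	fmt.Printf("attempt:  delivery count=%d request=%+v\n", dc, *wr)

	// The delivery failed (LoadMsg error or the send never reached the
	// client), so both charges are rolled back; otherwise MaxDeliver and the
	// pull batch would be debited for a message the client never saw.
	rdc.dec(seq)
	wr.credit()
	fmt.Printf("rollback: tracked redeliveries=%d request=%+v\n", len(rdc), *wr)
}

Running the sketch prints the counters charged on the attempt and then restored after the rollback, which is the net effect the patch ensures on failed deliveries.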