Merge pull request #2761 from nats-io/fs_partial_err

Fixes a bug where a consumer would stop delivering messages after the fileStore returned errPartialCache.
This commit is contained in:
Derek Collison
2021-12-27 12:03:31 -08:00
committed by GitHub
3 changed files with 154 additions and 83 deletions

View File

@@ -2353,7 +2353,7 @@ func (o *consumer) loopAndGatherMsgs(qch chan struct{}) {
// On error either wait or return.
if err != nil {
if err == ErrStoreMsgNotFound || err == ErrStoreEOF || err == errMaxAckPending {
if err == ErrStoreMsgNotFound || err == ErrStoreEOF || err == errMaxAckPending || err == errPartialCache {
goto waitForMsgs
} else {
o.mu.Unlock()

View File

@@ -231,6 +231,9 @@ const (
FileStoreMinBlkSize = 32 * 1000 // 32kib
// FileStoreMaxBlkSize is maximum size we will do for a blk size.
FileStoreMaxBlkSize = maxBlockSize
// Check for bad record length value due to corrupt data.
rlBadThresh = 32 * 1024 * 1024
)
func newFileStore(fcfg FileStoreConfig, cfg StreamConfig) (*fileStore, error) {
@@ -478,8 +481,11 @@ func (fs *fileStore) writeStreamMeta() error {
return nil
}
const msgHdrSize = 22
const checksumSize = 8
const (
msgHdrSize = 22
checksumSize = 8
emptyRecordLen = msgHdrSize + checksumSize
)
// This is the max room needed for index header.
const indexHdrSize = 7*binary.MaxVarintLen64 + hdrLen + checksumSize
@@ -738,21 +744,20 @@ func (mb *msgBlock) rebuildStateLocked() (*LostStreamData, error) {
}
for index, lbuf := uint32(0), uint32(len(buf)); index < lbuf; {
if index+msgHdrSize >= lbuf {
if index+msgHdrSize > lbuf {
truncate(index)
return gatherLost(lbuf - index), nil
}
hdr := buf[index : index+msgHdrSize]
rl := le.Uint32(hdr[0:])
slen := le.Uint16(hdr[20:])
rl, slen := le.Uint32(hdr[0:]), le.Uint16(hdr[20:])
hasHeaders := rl&hbit != 0
// Clear any headers bit that could be set.
rl &^= hbit
dlen := int(rl) - msgHdrSize
// Do some quick sanity checks here.
if dlen < 0 || int(slen) > dlen || dlen > int(rl) {
if dlen < 0 || int(slen) > dlen || dlen > int(rl) || rl > rlBadThresh {
truncate(index)
return gatherLost(lbuf - index), errBadMsg
}
@@ -779,9 +784,7 @@ func (mb *msgBlock) rebuildStateLocked() (*LostStreamData, error) {
// at the head. So the first.seq will be already set here. If this is larger
// replace what we have with this seq.
if firstNeedsSet && seq > mb.first.seq {
firstNeedsSet = false
mb.first.seq = seq
mb.first.ts = ts
firstNeedsSet, mb.first.seq, mb.first.ts = false, seq, ts
}
var deleted bool
@@ -813,9 +816,7 @@ func (mb *msgBlock) rebuildStateLocked() (*LostStreamData, error) {
}
if firstNeedsSet {
firstNeedsSet = false
mb.first.seq = seq
mb.first.ts = ts
firstNeedsSet, mb.first.seq, mb.first.ts = false, seq, ts
}
mb.msgs++
@@ -1065,7 +1066,7 @@ func (fs *fileStore) GetSeqFromTime(t time.Time) uint64 {
// Linear search, hence the dumb part..
ts := t.UnixNano()
for seq := fseq; seq <= lseq; seq++ {
sm, _ := mb.fetchMsg(seq)
sm, _, _ := mb.fetchMsg(seq)
if sm != nil && sm.ts >= ts {
return sm.seq
}
@@ -1387,7 +1388,7 @@ func (fs *fileStore) newMsgBlockForWrite() (*msgBlock, error) {
lmb.lwts = lwts
// We could check for a certain time since last load, but to be safe just reuse if no loads at all.
if llts == 0 && (lmb.cache == nil || lmb.cache.buf == nil) {
rbuf = buf
rbuf = buf[:0]
}
}
lmb.mu.Unlock()
@@ -1838,7 +1839,6 @@ func (fs *fileStore) removeMsg(seq uint64, secure, needFSLock bool) (bool, error
}
mb.dmap[seq] = struct{}{}
// Check if <25% utilization and minimum size met.
if notLast && mb.rbytes > compactMinimum && mb.rbytes>>2 > mb.bytes {
mb.compact()
}
@@ -1850,11 +1850,13 @@ func (fs *fileStore) removeMsg(seq uint64, secure, needFSLock bool) (bool, error
qch, fch = mb.qch, mb.fch
}
cb := fs.scb
mb.mu.Unlock()
if secure {
mb.flushPendingMsgs()
if ld, _ := mb.flushPendingMsgsLocked(); ld != nil {
fs.rebuildStateLocked(ld)
}
}
mb.mu.Unlock()
// Kick outside of lock.
if shouldWriteIndex {
@@ -1929,7 +1931,7 @@ func (mb *msgBlock) compact() {
var smh [msgHdrSize]byte
for index, lbuf := uint32(0), uint32(len(buf)); index < lbuf; {
if index+msgHdrSize >= lbuf {
if index+msgHdrSize > lbuf {
return
}
hdr := buf[index : index+msgHdrSize]
@@ -1938,7 +1940,7 @@ func (mb *msgBlock) compact() {
rl &^= hbit
dlen := int(rl) - msgHdrSize
// Do some quick sanity checks here.
if dlen < 0 || int(slen) > dlen || dlen > int(rl) || rl > 32*1024*1024 || index+rl > lbuf {
if dlen < 0 || int(slen) > dlen || dlen > int(rl) || rl > rlBadThresh || index+rl > lbuf {
return
}
// Only need to process non-deleted messages.
@@ -2012,8 +2014,8 @@ func (mb *msgBlock) slotInfo(slot int) (uint32, uint32, bool, error) {
return 0, 0, false, errPartialCache
}
bi := mb.cache.idx[slot]
ri := (bi &^ hbit)
hashChecked := (bi & hbit) != 0
ri, hashChecked := (bi &^ hbit), (bi&hbit) != 0
// Determine record length
var rl uint32
if len(mb.cache.idx) > slot+1 {
@@ -2301,7 +2303,7 @@ func (mb *msgBlock) selectNextFirst() {
if sm == nil {
// Slow path, need to unlock.
mb.mu.Unlock()
sm, _ = mb.fetchMsg(seq)
sm, _, _ = mb.fetchMsg(seq)
mb.mu.Lock()
}
if sm != nil {
@@ -2388,7 +2390,7 @@ func (mb *msgBlock) tryForceExpireCache() {
mb.tryForceExpireCacheLocked()
}
// We will attempt to force expire this be temp clearing the last load time.
// We will attempt to force expire this by temporarily clearing the last load time.
func (mb *msgBlock) tryForceExpireCacheLocked() {
llts := mb.llts
mb.llts = 0
@@ -2396,6 +2398,7 @@ func (mb *msgBlock) tryForceExpireCacheLocked() {
mb.llts = llts
}
// Lock should be held.
func (mb *msgBlock) expireCacheLocked() {
if mb.cache == nil {
if mb.ctmr != nil {
@@ -2492,7 +2495,13 @@ func (fs *fileStore) expireMsgs() {
func (fs *fileStore) checkAndFlushAllBlocks() {
for _, mb := range fs.blks {
if mb.pendingWriteSize() > 0 {
mb.flushPendingMsgs()
// Since fs lock is held need to pull this apart in case we need to rebuild state.
mb.mu.Lock()
ld, _ := mb.flushPendingMsgsLocked()
mb.mu.Unlock()
if ld != nil {
fs.rebuildStateLocked(ld)
}
}
if mb.indexNeedsUpdate() {
mb.writeIndexInfo()
@@ -2542,6 +2551,8 @@ func (mb *msgBlock) enableForWriting(fip bool) error {
// filestore lock will be held.
func (mb *msgBlock) writeMsgRecord(rl, seq uint64, subj string, mhdr, msg []byte, ts int64, flush bool) error {
mb.mu.Lock()
defer mb.mu.Unlock()
// Make sure we have a cache setup.
if mb.cache == nil {
mb.setupWriteCache(nil)
@@ -2630,15 +2641,18 @@ func (mb *msgBlock) writeMsgRecord(rl, seq uint64, subj string, mhdr, msg []byte
}
fch, werr := mb.fch, mb.werr
mb.mu.Unlock()
// If we should be flushing, or had a write error, do so here.
if flush || werr != nil {
if err := mb.flushPendingMsgs(); err != nil {
ld, err := mb.flushPendingMsgsLocked()
if ld != nil && mb.fs != nil {
mb.fs.rebuildStateLocked(ld)
}
if err != nil {
return err
}
if writeIndex {
if err := mb.writeIndexInfo(); err != nil {
if err := mb.writeIndexInfoLocked(); err != nil {
return err
}
}
@@ -2801,10 +2815,26 @@ func (fs *fileStore) selectMsgBlock(seq uint64) *msgBlock {
if seq < fs.state.FirstSeq || seq > fs.state.LastSeq {
return nil
}
// Starting index, defaults to beginning.
si := 0
// Max threshold before we probe for a starting block to start our linear search.
const maxl = 256
if nb := len(fs.blks); nb > maxl {
d := nb / 8
for _, i := range []int{d, 2 * d, 3 * d, 4 * d, 5 * d, 6 * d, 7 * d} {
mb := fs.blks[i]
if seq <= atomic.LoadUint64(&mb.last.seq) {
break
}
si = i
}
}
// blks are sorted in ascending order.
// TODO(dlc) - Can be smarter here, when lots of blks maybe use binary search.
// For now this is cache friendly for small to medium numbers of blks.
for _, mb := range fs.blks {
for i := si; i < len(fs.blks); i++ {
mb := fs.blks[i]
if seq <= atomic.LoadUint64(&mb.last.seq) {
return mb
}
@@ -2858,13 +2888,11 @@ func (mb *msgBlock) indexCacheBuf(buf []byte) error {
lbuf := uint32(len(buf))
for index < lbuf {
if index+msgHdrSize >= lbuf {
if index+msgHdrSize > lbuf {
return errCorruptState
}
hdr := buf[index : index+msgHdrSize]
rl := le.Uint32(hdr[0:])
seq := le.Uint64(hdr[4:])
slen := le.Uint16(hdr[20:])
rl, seq, slen := le.Uint32(hdr[0:]), le.Uint64(hdr[4:]), le.Uint16(hdr[20:])
// Clear any headers bit that could be set.
rl &^= hbit
@@ -2898,21 +2926,27 @@ func (mb *msgBlock) indexCacheBuf(buf []byte) error {
// flushPendingMsgs writes out any pending messages for this message block,
// then, with the mb lock released, rebuilds filestore state if the flush
// reported lost stream data.
func (mb *msgBlock) flushPendingMsgs() error {
	// Do the flush under our own lock, but snapshot fs so any rebuild can
	// happen after we release the mb lock.
	mb.mu.Lock()
	fsLostData, err := mb.flushPendingMsgsLocked()
	fs := mb.fs
	mb.mu.Unlock()

	// A non-nil fsLostData signals us that we need to rebuild filestore state.
	if fsLostData != nil && fs != nil {
		// Rebuild fs state too.
		fs.rebuildState(fsLostData)
	}
	return err
}
// flushPendingMsgsLocked writes out any messages for this message block.
// Lock should be held.
func (mb *msgBlock) flushPendingMsgsLocked() (*LostStreamData, error) {
// Signals us that we need to rebuild filestore state.
var fsLostData *LostStreamData
mb.mu.Lock()
defer func() {
fs := mb.fs
mb.mu.Unlock()
if fsLostData != nil && fs != nil {
// Rebuild fs state too.
fs.rebuildState(fsLostData)
}
}()
if mb.cache == nil || mb.mfd == nil {
return nil
return nil, nil
}
buf, err := mb.bytesPending()
@@ -2922,7 +2956,7 @@ func (mb *msgBlock) flushPendingMsgs() error {
if err == errNoPending || err == errNoCache {
err = nil
}
return err
return nil, err
}
woff := int64(mb.cache.off + mb.cache.wp)
@@ -2958,7 +2992,7 @@ func (mb *msgBlock) flushPendingMsgs() error {
fsLostData = ld
}
}
return err
return fsLostData, err
}
// Partial write.
if n != lbb {
@@ -2977,7 +3011,7 @@ func (mb *msgBlock) flushPendingMsgs() error {
// Cache may be gone.
if mb.cache == nil || mb.mfd == nil {
return mb.werr
return fsLostData, mb.werr
}
// Check for additional writes while we were writing to the disk.
@@ -3011,7 +3045,7 @@ func (mb *msgBlock) flushPendingMsgs() error {
mb.cache.fseq = 0
}
return mb.werr
return fsLostData, mb.werr
}
// Lock should be held.
@@ -3033,7 +3067,7 @@ func (mb *msgBlock) cacheAlreadyLoaded() bool {
return false
}
numEntries := mb.msgs + uint64(len(mb.dmap)) + (mb.first.seq - mb.cache.fseq)
return numEntries == uint64(len(mb.cache.idx)) && len(mb.cache.buf) > 0
return numEntries == uint64(len(mb.cache.idx))
}
// Lock should be held.
@@ -3055,6 +3089,8 @@ func (mb *msgBlock) loadBlock(buf []byte) ([]byte, error) {
sz64 := info.Size()
if int64(int(sz64)) == sz64 {
sz = int(sz64)
} else {
return nil, errMsgBlkTooBig
}
}
@@ -3096,17 +3132,20 @@ checkCache:
// FIXME(dlc) - We could be smarter here.
if buf, _ := mb.bytesPending(); len(buf) > 0 {
mb.mu.Unlock()
err := mb.flushPendingMsgs()
mb.mu.Lock()
ld, err := mb.flushPendingMsgsLocked()
if ld != nil && mb.fs != nil {
// We do not know if fs is locked or not at this point.
// This should be an exceptional condition so do so in Go routine.
go mb.fs.rebuildState(ld)
}
if err != nil {
return err
}
goto checkCache
}
// Load in the whole block. We want to hold the mb lock here to avoid any changes to
// state.
// Load in the whole block.
// We want to hold the mb lock here to avoid any changes to state.
buf, err := mb.loadBlock(nil)
if err != nil {
return err
@@ -3151,16 +3190,21 @@ checkCache:
// Fetch a message from this block, possibly reading in and caching the messages.
// We assume the block was selected and is correct, so we do not do range checks.
func (mb *msgBlock) fetchMsg(seq uint64) (*fileStoredMsg, error) {
func (mb *msgBlock) fetchMsg(seq uint64) (*fileStoredMsg, bool, error) {
mb.mu.Lock()
defer mb.mu.Unlock()
if mb.cacheNotLoaded() {
if err := mb.loadMsgsWithLock(); err != nil {
return nil, err
return nil, false, err
}
}
return mb.cacheLookup(seq)
sm, err := mb.cacheLookup(seq)
if err != nil {
return nil, false, err
}
expireOk := seq == mb.last.seq && mb.llseq == seq-1
return sm, expireOk, err
}
var (
@@ -3175,6 +3219,7 @@ var (
errNoEncryption = errors.New("encryption not enabled")
errBadKeySize = errors.New("encryption bad key size")
errNoMsgBlk = errors.New("no message block")
errMsgBlkTooBig = errors.New("message block size exceeded int capacity")
)
// Used for marking messages that have had their checksums checked.
@@ -3259,8 +3304,7 @@ func (fs *fileStore) msgForSeq(seq uint64) (*fileStoredMsg, error) {
seq = fs.state.FirstSeq
}
// Make sure to snapshot here.
lseq := fs.state.LastSeq
mb, lmb := fs.selectMsgBlock(seq), fs.lmb
mb, lmb, lseq := fs.selectMsgBlock(seq), fs.lmb, fs.state.LastSeq
fs.mu.RUnlock()
if mb == nil {
@@ -3271,23 +3315,14 @@ func (fs *fileStore) msgForSeq(seq uint64) (*fileStoredMsg, error) {
return nil, err
}
// Check to see if we are the last seq for this message block and are doing
// a linear scan. If that is true and we are not the last message block we can
// try to expire the cache.
mb.mu.RLock()
shouldTryExpire := mb != lmb && seq == mb.last.seq && mb.llseq == seq-1
mb.mu.RUnlock()
// TODO(dlc) - older design had a check to prefetch when we knew we were
// loading in order and getting close to end of current mb. Should add
// something like it back in.
fsm, err := mb.fetchMsg(seq)
fsm, expireOk, err := mb.fetchMsg(seq)
if err != nil {
return nil, err
}
// We detected a linear scan and access to the last message.
if shouldTryExpire {
// If we are not the last message block we can try to expire the cache.
if mb != lmb && expireOk {
mb.tryForceExpireCache()
}
@@ -3296,7 +3331,7 @@ func (fs *fileStore) msgForSeq(seq uint64) (*fileStoredMsg, error) {
// Internal function to return msg parts from a raw buffer.
func msgFromBuf(buf []byte, hh hash.Hash64) (string, []byte, []byte, uint64, int64, error) {
if len(buf) < msgHdrSize {
if len(buf) < emptyRecordLen {
return _EMPTY_, nil, nil, 0, 0, errBadMsg
}
var le = binary.LittleEndian
@@ -3388,7 +3423,7 @@ func (fs *fileStore) FastState(state *StreamState) {
state.LastSeq = fs.state.LastSeq
state.LastTime = fs.state.LastTime
if state.LastSeq > state.FirstSeq {
state.NumDeleted = int((state.LastSeq - state.FirstSeq) - state.Msgs + 1)
state.NumDeleted = int((state.LastSeq - state.FirstSeq + 1) - state.Msgs)
}
state.Consumers = len(fs.cfs)
fs.mu.RUnlock()
@@ -3438,8 +3473,6 @@ func (fs *fileStore) Utilization() (total, reported uint64, err error) {
return total, reported, nil
}
const emptyRecordLen = 22 + 8
func fileStoreMsgSize(subj string, hdr, msg []byte) uint64 {
if len(hdr) == 0 {
// length of the message record (4bytes) + seq(8) + ts(8) + subj_len(2) + subj + msg + hash(8)
@@ -3476,6 +3509,14 @@ func (mb *msgBlock) indexNeedsUpdate() bool {
// Write index info to the appropriate file.
// Thin locking wrapper: acquires the mb lock and delegates the actual
// write to writeIndexInfoLocked.
// Filestore lock should be held.
func (mb *msgBlock) writeIndexInfo() error {
	mb.mu.Lock()
	defer mb.mu.Unlock()
	return mb.writeIndexInfoLocked()
}
// Write index info to the appropriate file.
// Filestore lock and mb lock should be held.
func (mb *msgBlock) writeIndexInfoLocked() error {
// HEADER: magic version msgs bytes fseq fts lseq lts ndel checksum
var hdr [indexHdrSize]byte
@@ -3483,9 +3524,6 @@ func (mb *msgBlock) writeIndexInfo() error {
hdr[0] = magic
hdr[1] = version
mb.mu.Lock()
defer mb.mu.Unlock()
n := hdrLen
n += binary.PutUvarint(hdr[n:], mb.msgs)
n += binary.PutUvarint(hdr[n:], mb.bytes)
@@ -3988,7 +4026,7 @@ func (fs *fileStore) Truncate(seq uint64) error {
fs.mu.Unlock()
return ErrInvalidSequence
}
lsm, _ := nlmb.fetchMsg(seq)
lsm, _, _ := nlmb.fetchMsg(seq)
if lsm == nil {
fs.mu.Unlock()
return ErrInvalidSequence

View File

@@ -448,7 +448,7 @@ func TestFileStoreWriteExpireWrite(t *testing.T) {
for i := 1; i <= toSend*2; i++ {
subj, _, msg, _, err := fs.LoadMsg(uint64(i))
if err != nil {
t.Fatalf("Unexpected error looking up seq 11: %v", err)
t.Fatalf("Unexpected error looking up seq %d: %v", i, err)
}
str := "Hello World!"
if i > toSend {
@@ -3479,3 +3479,36 @@ func TestFileStoreRemoveLastWriteIndex(t *testing.T) {
t.Fatalf("Index file %q size is 0", fname)
}
}
// Exercises selectMsgBlock lookup performance when a stream spans a very
// large number of message blocks.
func TestFileStoreFetchPerf(t *testing.T) {
	// Comment out to run.
	t.SkipNow()

	storeDir := createDir(t, JetStreamStoreDir)
	defer removeDir(t, storeDir)

	fcfg := FileStoreConfig{StoreDir: storeDir, BlockSize: 8192, AsyncFlush: true}
	scfg := StreamConfig{Name: "TEST", Storage: FileStorage}
	fs, err := newFileStore(fcfg, scfg)
	if err != nil {
		t.Fatalf("Unexpected error: %v", err)
	}
	defer fs.Stop()

	// Will create 25k msg blocks.
	const total = 100_000
	subject, payload := "zzz", bytes.Repeat([]byte("ABC"), 600)
	for i := 0; i < total; i++ {
		if _, _, serr := fs.StoreMsg(subject, nil, payload); serr != nil {
			t.Fatalf("Unexpected error: %v", serr)
		}
	}

	// Time how long it takes us to load all messages.
	start := time.Now()
	for seq := 0; seq < total; seq++ {
		if _, _, _, _, lerr := fs.LoadMsg(uint64(seq)); lerr != nil {
			t.Fatalf("Unexpected error looking up seq %d: %v", seq, lerr)
		}
	}
	fmt.Printf("Elapsed to load all messages is %v\n", time.Since(start))
}