mirror of
https://github.com/gogrlx/nats-server.git
synced 2026-04-02 03:38:42 -07:00
Stabilize filestore to eliminate sporadic errPartialCache errors in certain situations. Related to #2732
The filestore would release a msgBlock lock while trying to load a cache block if it thought it needed to flush pending data. With async false, this should be very rare but was possible after careful inspection. I constructed an artificial test with sleeps throughout the filestore code to reproduce. It involved having 2 Go routines that were reading through and waiting on the last msg block, and another one that was writing. After the write, but before we flushed after releasing the lock, we would also artificially sleep. This would lead to the second read seeing the cache load was already in progress and return no error. If the load was for a sequence before the current write sequence, and async was false, the cache fseq would be higher than what was requested. This would cause errPartialCache to be returned. Once returned to the consumer level in loopAndGather, it would exit that Go routine and the consumer would cease to function. This change removes the unlock of a msgBlock to perform a flush, ensuring that two cache loads would not yield errPartialCache. I also updated the consumer so that, in case this does happen in the future, it does not exit the loopAndGather Go routine. Signed-off-by: Derek Collison <derek@nats.io>
This commit is contained in:
@@ -2353,7 +2353,7 @@ func (o *consumer) loopAndGatherMsgs(qch chan struct{}) {
|
||||
|
||||
// On error either wait or return.
|
||||
if err != nil {
|
||||
if err == ErrStoreMsgNotFound || err == ErrStoreEOF || err == errMaxAckPending {
|
||||
if err == ErrStoreMsgNotFound || err == ErrStoreEOF || err == errMaxAckPending || err == errPartialCache {
|
||||
goto waitForMsgs
|
||||
} else {
|
||||
o.mu.Unlock()
|
||||
|
||||
@@ -231,6 +231,9 @@ const (
|
||||
FileStoreMinBlkSize = 32 * 1000 // 32kib
|
||||
// FileStoreMaxBlkSize is maximum size we will do for a blk size.
|
||||
FileStoreMaxBlkSize = maxBlockSize
|
||||
|
||||
// Check for bad record length value due to corrupt data.
|
||||
rlBadThresh = 32 * 1024 * 1024
|
||||
)
|
||||
|
||||
func newFileStore(fcfg FileStoreConfig, cfg StreamConfig) (*fileStore, error) {
|
||||
@@ -478,8 +481,11 @@ func (fs *fileStore) writeStreamMeta() error {
|
||||
return nil
|
||||
}
|
||||
|
||||
const msgHdrSize = 22
|
||||
const checksumSize = 8
|
||||
const (
|
||||
msgHdrSize = 22
|
||||
checksumSize = 8
|
||||
emptyRecordLen = msgHdrSize + checksumSize
|
||||
)
|
||||
|
||||
// This is the max room needed for index header.
|
||||
const indexHdrSize = 7*binary.MaxVarintLen64 + hdrLen + checksumSize
|
||||
@@ -738,21 +744,20 @@ func (mb *msgBlock) rebuildStateLocked() (*LostStreamData, error) {
|
||||
}
|
||||
|
||||
for index, lbuf := uint32(0), uint32(len(buf)); index < lbuf; {
|
||||
if index+msgHdrSize >= lbuf {
|
||||
if index+msgHdrSize > lbuf {
|
||||
truncate(index)
|
||||
return gatherLost(lbuf - index), nil
|
||||
}
|
||||
|
||||
hdr := buf[index : index+msgHdrSize]
|
||||
rl := le.Uint32(hdr[0:])
|
||||
slen := le.Uint16(hdr[20:])
|
||||
rl, slen := le.Uint32(hdr[0:]), le.Uint16(hdr[20:])
|
||||
|
||||
hasHeaders := rl&hbit != 0
|
||||
// Clear any headers bit that could be set.
|
||||
rl &^= hbit
|
||||
dlen := int(rl) - msgHdrSize
|
||||
// Do some quick sanity checks here.
|
||||
if dlen < 0 || int(slen) > dlen || dlen > int(rl) {
|
||||
if dlen < 0 || int(slen) > dlen || dlen > int(rl) || rl > rlBadThresh {
|
||||
truncate(index)
|
||||
return gatherLost(lbuf - index), errBadMsg
|
||||
}
|
||||
@@ -779,9 +784,7 @@ func (mb *msgBlock) rebuildStateLocked() (*LostStreamData, error) {
|
||||
// at the head. So the first.seq will be already set here. If this is larger
|
||||
// replace what we have with this seq.
|
||||
if firstNeedsSet && seq > mb.first.seq {
|
||||
firstNeedsSet = false
|
||||
mb.first.seq = seq
|
||||
mb.first.ts = ts
|
||||
firstNeedsSet, mb.first.seq, mb.first.ts = false, seq, ts
|
||||
}
|
||||
|
||||
var deleted bool
|
||||
@@ -813,9 +816,7 @@ func (mb *msgBlock) rebuildStateLocked() (*LostStreamData, error) {
|
||||
}
|
||||
|
||||
if firstNeedsSet {
|
||||
firstNeedsSet = false
|
||||
mb.first.seq = seq
|
||||
mb.first.ts = ts
|
||||
firstNeedsSet, mb.first.seq, mb.first.ts = false, seq, ts
|
||||
}
|
||||
|
||||
mb.msgs++
|
||||
@@ -1387,7 +1388,7 @@ func (fs *fileStore) newMsgBlockForWrite() (*msgBlock, error) {
|
||||
lmb.lwts = lwts
|
||||
// We could check for a certain time since last load, but to be safe just reuse if no loads at all.
|
||||
if llts == 0 && (lmb.cache == nil || lmb.cache.buf == nil) {
|
||||
rbuf = buf
|
||||
rbuf = buf[:0]
|
||||
}
|
||||
}
|
||||
lmb.mu.Unlock()
|
||||
@@ -1849,11 +1850,13 @@ func (fs *fileStore) removeMsg(seq uint64, secure, needFSLock bool) (bool, error
|
||||
qch, fch = mb.qch, mb.fch
|
||||
}
|
||||
cb := fs.scb
|
||||
mb.mu.Unlock()
|
||||
|
||||
if secure {
|
||||
mb.flushPendingMsgs()
|
||||
if ld, _ := mb.flushPendingMsgsLocked(); ld != nil {
|
||||
fs.rebuildStateLocked(ld)
|
||||
}
|
||||
}
|
||||
mb.mu.Unlock()
|
||||
|
||||
// Kick outside of lock.
|
||||
if shouldWriteIndex {
|
||||
@@ -1928,7 +1931,7 @@ func (mb *msgBlock) compact() {
|
||||
var smh [msgHdrSize]byte
|
||||
|
||||
for index, lbuf := uint32(0), uint32(len(buf)); index < lbuf; {
|
||||
if index+msgHdrSize >= lbuf {
|
||||
if index+msgHdrSize > lbuf {
|
||||
return
|
||||
}
|
||||
hdr := buf[index : index+msgHdrSize]
|
||||
@@ -1937,7 +1940,7 @@ func (mb *msgBlock) compact() {
|
||||
rl &^= hbit
|
||||
dlen := int(rl) - msgHdrSize
|
||||
// Do some quick sanity checks here.
|
||||
if dlen < 0 || int(slen) > dlen || dlen > int(rl) || rl > 32*1024*1024 || index+rl > lbuf {
|
||||
if dlen < 0 || int(slen) > dlen || dlen > int(rl) || rl > rlBadThresh || index+rl > lbuf {
|
||||
return
|
||||
}
|
||||
// Only need to process non-deleted messages.
|
||||
@@ -2387,7 +2390,7 @@ func (mb *msgBlock) tryForceExpireCache() {
|
||||
mb.tryForceExpireCacheLocked()
|
||||
}
|
||||
|
||||
// We will attempt to force expire this be temp clearing the last load time.
|
||||
// We will attempt to force expire this by temporarily clearing the last load time.
|
||||
func (mb *msgBlock) tryForceExpireCacheLocked() {
|
||||
llts := mb.llts
|
||||
mb.llts = 0
|
||||
@@ -2395,6 +2398,7 @@ func (mb *msgBlock) tryForceExpireCacheLocked() {
|
||||
mb.llts = llts
|
||||
}
|
||||
|
||||
// Lock should be held.
|
||||
func (mb *msgBlock) expireCacheLocked() {
|
||||
if mb.cache == nil {
|
||||
if mb.ctmr != nil {
|
||||
@@ -2491,7 +2495,13 @@ func (fs *fileStore) expireMsgs() {
|
||||
func (fs *fileStore) checkAndFlushAllBlocks() {
|
||||
for _, mb := range fs.blks {
|
||||
if mb.pendingWriteSize() > 0 {
|
||||
mb.flushPendingMsgs()
|
||||
// Since fs lock is held need to pull this apart in case we need to rebuild state.
|
||||
mb.mu.Lock()
|
||||
ld, _ := mb.flushPendingMsgsLocked()
|
||||
mb.mu.Unlock()
|
||||
if ld != nil {
|
||||
fs.rebuildStateLocked(ld)
|
||||
}
|
||||
}
|
||||
if mb.indexNeedsUpdate() {
|
||||
mb.writeIndexInfo()
|
||||
@@ -2541,6 +2551,8 @@ func (mb *msgBlock) enableForWriting(fip bool) error {
|
||||
// filestore lock will be held.
|
||||
func (mb *msgBlock) writeMsgRecord(rl, seq uint64, subj string, mhdr, msg []byte, ts int64, flush bool) error {
|
||||
mb.mu.Lock()
|
||||
defer mb.mu.Unlock()
|
||||
|
||||
// Make sure we have a cache setup.
|
||||
if mb.cache == nil {
|
||||
mb.setupWriteCache(nil)
|
||||
@@ -2629,15 +2641,18 @@ func (mb *msgBlock) writeMsgRecord(rl, seq uint64, subj string, mhdr, msg []byte
|
||||
}
|
||||
|
||||
fch, werr := mb.fch, mb.werr
|
||||
mb.mu.Unlock()
|
||||
|
||||
// If we should be flushing, or had a write error, do so here.
|
||||
if flush || werr != nil {
|
||||
if err := mb.flushPendingMsgs(); err != nil {
|
||||
ld, err := mb.flushPendingMsgsLocked()
|
||||
if ld != nil && mb.fs != nil {
|
||||
mb.fs.rebuildStateLocked(ld)
|
||||
}
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
if writeIndex {
|
||||
if err := mb.writeIndexInfo(); err != nil {
|
||||
if err := mb.writeIndexInfoLocked(); err != nil {
|
||||
return err
|
||||
}
|
||||
}
|
||||
@@ -2873,13 +2888,11 @@ func (mb *msgBlock) indexCacheBuf(buf []byte) error {
|
||||
lbuf := uint32(len(buf))
|
||||
|
||||
for index < lbuf {
|
||||
if index+msgHdrSize >= lbuf {
|
||||
if index+msgHdrSize > lbuf {
|
||||
return errCorruptState
|
||||
}
|
||||
hdr := buf[index : index+msgHdrSize]
|
||||
rl := le.Uint32(hdr[0:])
|
||||
seq := le.Uint64(hdr[4:])
|
||||
slen := le.Uint16(hdr[20:])
|
||||
rl, seq, slen := le.Uint32(hdr[0:]), le.Uint64(hdr[4:]), le.Uint16(hdr[20:])
|
||||
|
||||
// Clear any headers bit that could be set.
|
||||
rl &^= hbit
|
||||
@@ -2913,21 +2926,27 @@ func (mb *msgBlock) indexCacheBuf(buf []byte) error {
|
||||
|
||||
// flushPendingMsgs writes out any messages for this message block.
|
||||
func (mb *msgBlock) flushPendingMsgs() error {
|
||||
// Signals us that we need to rebuild filestore state, but after we release our own lock.
|
||||
mb.mu.Lock()
|
||||
fsLostData, err := mb.flushPendingMsgsLocked()
|
||||
fs := mb.fs
|
||||
mb.mu.Unlock()
|
||||
|
||||
// Signals us that we need to rebuild filestore state.
|
||||
if fsLostData != nil && fs != nil {
|
||||
// Rebuild fs state too.
|
||||
fs.rebuildState(fsLostData)
|
||||
}
|
||||
return err
|
||||
}
|
||||
|
||||
// flushPendingMsgsLocked writes out any messages for this message block.
|
||||
// Lock should be held.
|
||||
func (mb *msgBlock) flushPendingMsgsLocked() (*LostStreamData, error) {
|
||||
// Signals us that we need to rebuild filestore state.
|
||||
var fsLostData *LostStreamData
|
||||
|
||||
mb.mu.Lock()
|
||||
defer func() {
|
||||
fs := mb.fs
|
||||
mb.mu.Unlock()
|
||||
if fsLostData != nil && fs != nil {
|
||||
// Rebuild fs state too.
|
||||
fs.rebuildState(fsLostData)
|
||||
}
|
||||
}()
|
||||
|
||||
if mb.cache == nil || mb.mfd == nil {
|
||||
return nil
|
||||
return nil, nil
|
||||
}
|
||||
|
||||
buf, err := mb.bytesPending()
|
||||
@@ -2937,7 +2956,7 @@ func (mb *msgBlock) flushPendingMsgs() error {
|
||||
if err == errNoPending || err == errNoCache {
|
||||
err = nil
|
||||
}
|
||||
return err
|
||||
return nil, err
|
||||
}
|
||||
|
||||
woff := int64(mb.cache.off + mb.cache.wp)
|
||||
@@ -2973,7 +2992,7 @@ func (mb *msgBlock) flushPendingMsgs() error {
|
||||
fsLostData = ld
|
||||
}
|
||||
}
|
||||
return err
|
||||
return fsLostData, err
|
||||
}
|
||||
// Partial write.
|
||||
if n != lbb {
|
||||
@@ -2992,7 +3011,7 @@ func (mb *msgBlock) flushPendingMsgs() error {
|
||||
|
||||
// Cache may be gone.
|
||||
if mb.cache == nil || mb.mfd == nil {
|
||||
return mb.werr
|
||||
return fsLostData, mb.werr
|
||||
}
|
||||
|
||||
// Check for additional writes while we were writing to the disk.
|
||||
@@ -3026,7 +3045,7 @@ func (mb *msgBlock) flushPendingMsgs() error {
|
||||
mb.cache.fseq = 0
|
||||
}
|
||||
|
||||
return mb.werr
|
||||
return fsLostData, mb.werr
|
||||
}
|
||||
|
||||
// Lock should be held.
|
||||
@@ -3048,7 +3067,7 @@ func (mb *msgBlock) cacheAlreadyLoaded() bool {
|
||||
return false
|
||||
}
|
||||
numEntries := mb.msgs + uint64(len(mb.dmap)) + (mb.first.seq - mb.cache.fseq)
|
||||
return numEntries == uint64(len(mb.cache.idx)) && len(mb.cache.buf) > 0
|
||||
return numEntries == uint64(len(mb.cache.idx))
|
||||
}
|
||||
|
||||
// Lock should be held.
|
||||
@@ -3070,6 +3089,8 @@ func (mb *msgBlock) loadBlock(buf []byte) ([]byte, error) {
|
||||
sz64 := info.Size()
|
||||
if int64(int(sz64)) == sz64 {
|
||||
sz = int(sz64)
|
||||
} else {
|
||||
return nil, errMsgBlkTooBig
|
||||
}
|
||||
}
|
||||
|
||||
@@ -3111,17 +3132,20 @@ checkCache:
|
||||
|
||||
// FIXME(dlc) - We could be smarter here.
|
||||
if buf, _ := mb.bytesPending(); len(buf) > 0 {
|
||||
mb.mu.Unlock()
|
||||
err := mb.flushPendingMsgs()
|
||||
mb.mu.Lock()
|
||||
ld, err := mb.flushPendingMsgsLocked()
|
||||
if ld != nil && mb.fs != nil {
|
||||
// We do not know if fs is locked or not at this point.
|
||||
// This should be an exceptional condition so do so in Go routine.
|
||||
go mb.fs.rebuildState(ld)
|
||||
}
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
goto checkCache
|
||||
}
|
||||
|
||||
// Load in the whole block. We want to hold the mb lock here to avoid any changes to
|
||||
// state.
|
||||
// Load in the whole block.
|
||||
// We want to hold the mb lock here to avoid any changes to state.
|
||||
buf, err := mb.loadBlock(nil)
|
||||
if err != nil {
|
||||
return err
|
||||
@@ -3195,6 +3219,7 @@ var (
|
||||
errNoEncryption = errors.New("encryption not enabled")
|
||||
errBadKeySize = errors.New("encryption bad key size")
|
||||
errNoMsgBlk = errors.New("no message block")
|
||||
errMsgBlkTooBig = errors.New("message block size exceeded int capacity")
|
||||
)
|
||||
|
||||
// Used for marking messages that have had their checksums checked.
|
||||
@@ -3279,8 +3304,7 @@ func (fs *fileStore) msgForSeq(seq uint64) (*fileStoredMsg, error) {
|
||||
seq = fs.state.FirstSeq
|
||||
}
|
||||
// Make sure to snapshot here.
|
||||
lseq := fs.state.LastSeq
|
||||
mb, lmb := fs.selectMsgBlock(seq), fs.lmb
|
||||
mb, lmb, lseq := fs.selectMsgBlock(seq), fs.lmb, fs.state.LastSeq
|
||||
fs.mu.RUnlock()
|
||||
|
||||
if mb == nil {
|
||||
@@ -3307,7 +3331,7 @@ func (fs *fileStore) msgForSeq(seq uint64) (*fileStoredMsg, error) {
|
||||
|
||||
// Internal function to return msg parts from a raw buffer.
|
||||
func msgFromBuf(buf []byte, hh hash.Hash64) (string, []byte, []byte, uint64, int64, error) {
|
||||
if len(buf) < msgHdrSize {
|
||||
if len(buf) < emptyRecordLen {
|
||||
return _EMPTY_, nil, nil, 0, 0, errBadMsg
|
||||
}
|
||||
var le = binary.LittleEndian
|
||||
@@ -3399,7 +3423,7 @@ func (fs *fileStore) FastState(state *StreamState) {
|
||||
state.LastSeq = fs.state.LastSeq
|
||||
state.LastTime = fs.state.LastTime
|
||||
if state.LastSeq > state.FirstSeq {
|
||||
state.NumDeleted = int((state.LastSeq - state.FirstSeq) - state.Msgs + 1)
|
||||
state.NumDeleted = int((state.LastSeq - state.FirstSeq + 1) - state.Msgs)
|
||||
}
|
||||
state.Consumers = len(fs.cfs)
|
||||
fs.mu.RUnlock()
|
||||
@@ -3449,8 +3473,6 @@ func (fs *fileStore) Utilization() (total, reported uint64, err error) {
|
||||
return total, reported, nil
|
||||
}
|
||||
|
||||
const emptyRecordLen = 22 + 8
|
||||
|
||||
func fileStoreMsgSize(subj string, hdr, msg []byte) uint64 {
|
||||
if len(hdr) == 0 {
|
||||
// length of the message record (4bytes) + seq(8) + ts(8) + subj_len(2) + subj + msg + hash(8)
|
||||
@@ -3487,6 +3509,14 @@ func (mb *msgBlock) indexNeedsUpdate() bool {
|
||||
// Write index info to the appropriate file.
|
||||
// Filestore lock should be held.
|
||||
func (mb *msgBlock) writeIndexInfo() error {
|
||||
mb.mu.Lock()
|
||||
defer mb.mu.Unlock()
|
||||
return mb.writeIndexInfoLocked()
|
||||
}
|
||||
|
||||
// Write index info to the appropriate file.
|
||||
// Filestore lock and mb lock should be held.
|
||||
func (mb *msgBlock) writeIndexInfoLocked() error {
|
||||
// HEADER: magic version msgs bytes fseq fts lseq lts ndel checksum
|
||||
var hdr [indexHdrSize]byte
|
||||
|
||||
@@ -3494,9 +3524,6 @@ func (mb *msgBlock) writeIndexInfo() error {
|
||||
hdr[0] = magic
|
||||
hdr[1] = version
|
||||
|
||||
mb.mu.Lock()
|
||||
defer mb.mu.Unlock()
|
||||
|
||||
n := hdrLen
|
||||
n += binary.PutUvarint(hdr[n:], mb.msgs)
|
||||
n += binary.PutUvarint(hdr[n:], mb.bytes)
|
||||
|
||||
Reference in New Issue
Block a user