diff --git a/server/consumer.go b/server/consumer.go index 6c03e8c0..e6aa00b4 100644 --- a/server/consumer.go +++ b/server/consumer.go @@ -2353,7 +2353,7 @@ func (o *consumer) loopAndGatherMsgs(qch chan struct{}) { // On error either wait or return. if err != nil { - if err == ErrStoreMsgNotFound || err == ErrStoreEOF || err == errMaxAckPending { + if err == ErrStoreMsgNotFound || err == ErrStoreEOF || err == errMaxAckPending || err == errPartialCache { goto waitForMsgs } else { o.mu.Unlock() diff --git a/server/filestore.go b/server/filestore.go index 1cdc93b4..be693975 100644 --- a/server/filestore.go +++ b/server/filestore.go @@ -231,6 +231,9 @@ const ( FileStoreMinBlkSize = 32 * 1000 // 32kib // FileStoreMaxBlkSize is maximum size we will do for a blk size. FileStoreMaxBlkSize = maxBlockSize + + // Check for bad record length value due to corrupt data. + rlBadThresh = 32 * 1024 * 1024 ) func newFileStore(fcfg FileStoreConfig, cfg StreamConfig) (*fileStore, error) { @@ -478,8 +481,11 @@ func (fs *fileStore) writeStreamMeta() error { return nil } -const msgHdrSize = 22 -const checksumSize = 8 +const ( + msgHdrSize = 22 + checksumSize = 8 + emptyRecordLen = msgHdrSize + checksumSize +) // This is the max room needed for index header. const indexHdrSize = 7*binary.MaxVarintLen64 + hdrLen + checksumSize @@ -738,21 +744,20 @@ func (mb *msgBlock) rebuildStateLocked() (*LostStreamData, error) { } for index, lbuf := uint32(0), uint32(len(buf)); index < lbuf; { - if index+msgHdrSize >= lbuf { + if index+msgHdrSize > lbuf { truncate(index) return gatherLost(lbuf - index), nil } hdr := buf[index : index+msgHdrSize] - rl := le.Uint32(hdr[0:]) - slen := le.Uint16(hdr[20:]) + rl, slen := le.Uint32(hdr[0:]), le.Uint16(hdr[20:]) hasHeaders := rl&hbit != 0 // Clear any headers bit that could be set. rl &^= hbit dlen := int(rl) - msgHdrSize // Do some quick sanity checks here. - if dlen < 0 || int(slen) > dlen || dlen > int(rl) { + if dlen < 0 || int(slen) > dlen || dlen > int(rl) || rl > rlBadThresh { truncate(index) return gatherLost(lbuf - index), errBadMsg } @@ -779,9 +784,7 @@ func (mb *msgBlock) rebuildStateLocked() (*LostStreamData, error) { // at the head. So the first.seq will be already set here. If this is larger // replace what we have with this seq. if firstNeedsSet && seq > mb.first.seq { - firstNeedsSet = false - mb.first.seq = seq - mb.first.ts = ts + firstNeedsSet, mb.first.seq, mb.first.ts = false, seq, ts } var deleted bool @@ -813,9 +816,7 @@ func (mb *msgBlock) rebuildStateLocked() (*LostStreamData, error) { } if firstNeedsSet { - firstNeedsSet = false - mb.first.seq = seq - mb.first.ts = ts + firstNeedsSet, mb.first.seq, mb.first.ts = false, seq, ts } mb.msgs++ @@ -1387,7 +1388,7 @@ func (fs *fileStore) newMsgBlockForWrite() (*msgBlock, error) { lmb.lwts = lwts // We could check for a certain time since last load, but to be safe just reuse if no loads at all. if llts == 0 && (lmb.cache == nil || lmb.cache.buf == nil) { - rbuf = buf + rbuf = buf[:0] } } lmb.mu.Unlock() @@ -1849,11 +1850,13 @@ func (fs *fileStore) removeMsg(seq uint64, secure, needFSLock bool) (bool, error qch, fch = mb.qch, mb.fch } cb := fs.scb - mb.mu.Unlock() if secure { - mb.flushPendingMsgs() + if ld, _ := mb.flushPendingMsgsLocked(); ld != nil { + fs.rebuildStateLocked(ld) + } } + mb.mu.Unlock() // Kick outside of lock. if shouldWriteIndex { @@ -1928,7 +1931,7 @@ func (mb *msgBlock) compact() { var smh [msgHdrSize]byte for index, lbuf := uint32(0), uint32(len(buf)); index < lbuf; { - if index+msgHdrSize >= lbuf { + if index+msgHdrSize > lbuf { return } hdr := buf[index : index+msgHdrSize] @@ -1937,7 +1940,7 @@ func (mb *msgBlock) compact() { rl &^= hbit dlen := int(rl) - msgHdrSize // Do some quick sanity checks here. - if dlen < 0 || int(slen) > dlen || dlen > int(rl) || rl > 32*1024*1024 || index+rl > lbuf { + if dlen < 0 || int(slen) > dlen || dlen > int(rl) || rl > rlBadThresh || index+rl > lbuf { return } // Only need to process non-deleted messages. @@ -2387,7 +2390,7 @@ func (mb *msgBlock) tryForceExpireCache() { mb.tryForceExpireCacheLocked() } -// We will attempt to force expire this be temp clearing the last load time. +// We will attempt to force expire this by temporarily clearing the last load time. func (mb *msgBlock) tryForceExpireCacheLocked() { llts := mb.llts mb.llts = 0 @@ -2395,6 +2398,7 @@ func (mb *msgBlock) tryForceExpireCacheLocked() { mb.llts = llts } +// Lock should be held. func (mb *msgBlock) expireCacheLocked() { if mb.cache == nil { if mb.ctmr != nil { @@ -2491,7 +2495,13 @@ func (fs *fileStore) expireMsgs() { func (fs *fileStore) checkAndFlushAllBlocks() { for _, mb := range fs.blks { if mb.pendingWriteSize() > 0 { - mb.flushPendingMsgs() + // Since fs lock is held need to pull this apart in case we need to rebuild state. + mb.mu.Lock() + ld, _ := mb.flushPendingMsgsLocked() + mb.mu.Unlock() + if ld != nil { + fs.rebuildStateLocked(ld) + } } if mb.indexNeedsUpdate() { mb.writeIndexInfo() @@ -2541,6 +2551,8 @@ func (mb *msgBlock) enableForWriting(fip bool) error { // filestore lock will be held. func (mb *msgBlock) writeMsgRecord(rl, seq uint64, subj string, mhdr, msg []byte, ts int64, flush bool) error { mb.mu.Lock() + defer mb.mu.Unlock() + // Make sure we have a cache setup. if mb.cache == nil { mb.setupWriteCache(nil) @@ -2629,15 +2641,18 @@ func (mb *msgBlock) writeMsgRecord(rl, seq uint64, subj string, mhdr, msg []byte } fch, werr := mb.fch, mb.werr - mb.mu.Unlock() // If we should be flushing, or had a write error, do so here. if flush || werr != nil { - if err := mb.flushPendingMsgs(); err != nil { + ld, err := mb.flushPendingMsgsLocked() + if ld != nil && mb.fs != nil { + mb.fs.rebuildStateLocked(ld) + } + if err != nil { return err } if writeIndex { - if err := mb.writeIndexInfo(); err != nil { + if err := mb.writeIndexInfoLocked(); err != nil { return err } } @@ -2873,13 +2888,11 @@ func (mb *msgBlock) indexCacheBuf(buf []byte) error { lbuf := uint32(len(buf)) for index < lbuf { - if index+msgHdrSize >= lbuf { + if index+msgHdrSize > lbuf { return errCorruptState } hdr := buf[index : index+msgHdrSize] - rl := le.Uint32(hdr[0:]) - seq := le.Uint64(hdr[4:]) - slen := le.Uint16(hdr[20:]) + rl, seq, slen := le.Uint32(hdr[0:]), le.Uint64(hdr[4:]), le.Uint16(hdr[20:]) // Clear any headers bit that could be set. rl &^= hbit @@ -2913,21 +2926,27 @@ func (mb *msgBlock) indexCacheBuf(buf []byte) error { // flushPendingMsgs writes out any messages for this message block. func (mb *msgBlock) flushPendingMsgs() error { - // Signals us that we need to rebuild filestore state, but after we release our own lock. + mb.mu.Lock() + fsLostData, err := mb.flushPendingMsgsLocked() + fs := mb.fs + mb.mu.Unlock() + + // Signals us that we need to rebuild filestore state. + if fsLostData != nil && fs != nil { + // Rebuild fs state too. + fs.rebuildState(fsLostData) + } + return err +} + +// flushPendingMsgsLocked writes out any messages for this message block. +// Lock should be held. +func (mb *msgBlock) flushPendingMsgsLocked() (*LostStreamData, error) { + // Signals us that we need to rebuild filestore state. var fsLostData *LostStreamData - mb.mu.Lock() - defer func() { - fs := mb.fs - mb.mu.Unlock() - if fsLostData != nil && fs != nil { - // Rebuild fs state too. - fs.rebuildState(fsLostData) - } - }() - if mb.cache == nil || mb.mfd == nil { - return nil + return nil, nil } buf, err := mb.bytesPending() @@ -2937,7 +2956,7 @@ func (mb *msgBlock) flushPendingMsgs() error { if err == errNoPending || err == errNoCache { err = nil } - return err + return nil, err } woff := int64(mb.cache.off + mb.cache.wp) @@ -2973,7 +2992,7 @@ func (mb *msgBlock) flushPendingMsgs() error { fsLostData = ld } } - return err + return fsLostData, err } // Partial write. if n != lbb { @@ -2992,7 +3011,7 @@ func (mb *msgBlock) flushPendingMsgs() error { // Cache may be gone. if mb.cache == nil || mb.mfd == nil { - return mb.werr + return fsLostData, mb.werr } // Check for additional writes while we were writing to the disk. @@ -3026,7 +3045,7 @@ func (mb *msgBlock) flushPendingMsgs() error { mb.cache.fseq = 0 } - return mb.werr + return fsLostData, mb.werr } // Lock should be held. @@ -3048,7 +3067,7 @@ func (mb *msgBlock) cacheAlreadyLoaded() bool { return false } numEntries := mb.msgs + uint64(len(mb.dmap)) + (mb.first.seq - mb.cache.fseq) - return numEntries == uint64(len(mb.cache.idx)) && len(mb.cache.buf) > 0 + return numEntries == uint64(len(mb.cache.idx)) } // Lock should be held. @@ -3070,6 +3089,8 @@ func (mb *msgBlock) loadBlock(buf []byte) ([]byte, error) { sz64 := info.Size() if int64(int(sz64)) == sz64 { sz = int(sz64) + } else { + return nil, errMsgBlkTooBig } } @@ -3111,17 +3132,20 @@ checkCache: // FIXME(dlc) - We could be smarter here. if buf, _ := mb.bytesPending(); len(buf) > 0 { - mb.mu.Unlock() - err := mb.flushPendingMsgs() - mb.mu.Lock() + ld, err := mb.flushPendingMsgsLocked() + if ld != nil && mb.fs != nil { + // We do not know if fs is locked or not at this point. + // This should be an exceptional condition so do so in Go routine. + go mb.fs.rebuildState(ld) + } if err != nil { return err } goto checkCache } - // Load in the whole block. We want to hold the mb lock here to avoid any changes to - // state. + // Load in the whole block. + // We want to hold the mb lock here to avoid any changes to state. buf, err := mb.loadBlock(nil) if err != nil { return err @@ -3195,6 +3219,7 @@ var ( errNoEncryption = errors.New("encryption not enabled") errBadKeySize = errors.New("encryption bad key size") errNoMsgBlk = errors.New("no message block") + errMsgBlkTooBig = errors.New("message block size exceeded int capacity") ) // Used for marking messages that have had their checksums checked. @@ -3279,8 +3304,7 @@ func (fs *fileStore) msgForSeq(seq uint64) (*fileStoredMsg, error) { seq = fs.state.FirstSeq } // Make sure to snapshot here. - lseq := fs.state.LastSeq - mb, lmb := fs.selectMsgBlock(seq), fs.lmb + mb, lmb, lseq := fs.selectMsgBlock(seq), fs.lmb, fs.state.LastSeq fs.mu.RUnlock() if mb == nil { @@ -3307,7 +3331,7 @@ func (fs *fileStore) msgForSeq(seq uint64) (*fileStoredMsg, error) { // Internal function to return msg parts from a raw buffer. func msgFromBuf(buf []byte, hh hash.Hash64) (string, []byte, []byte, uint64, int64, error) { - if len(buf) < msgHdrSize { + if len(buf) < emptyRecordLen { return _EMPTY_, nil, nil, 0, 0, errBadMsg } var le = binary.LittleEndian @@ -3399,7 +3423,7 @@ func (fs *fileStore) FastState(state *StreamState) { state.LastSeq = fs.state.LastSeq state.LastTime = fs.state.LastTime if state.LastSeq > state.FirstSeq { - state.NumDeleted = int((state.LastSeq - state.FirstSeq) - state.Msgs + 1) + state.NumDeleted = int((state.LastSeq - state.FirstSeq + 1) - state.Msgs) } state.Consumers = len(fs.cfs) fs.mu.RUnlock() @@ -3449,8 +3473,6 @@ func (fs *fileStore) Utilization() (total, reported uint64, err error) { return total, reported, nil } -const emptyRecordLen = 22 + 8 - func fileStoreMsgSize(subj string, hdr, msg []byte) uint64 { if len(hdr) == 0 { // length of the message record (4bytes) + seq(8) + ts(8) + subj_len(2) + subj + msg + hash(8) @@ -3487,6 +3509,14 @@ func (mb *msgBlock) indexNeedsUpdate() bool { // Write index info to the appropriate file. // Filestore lock should be held. func (mb *msgBlock) writeIndexInfo() error { + mb.mu.Lock() + defer mb.mu.Unlock() + return mb.writeIndexInfoLocked() +} + +// Write index info to the appropriate file. +// Filestore lock and mb lock should be held. +func (mb *msgBlock) writeIndexInfoLocked() error { // HEADER: magic version msgs bytes fseq fts lseq lts ndel checksum var hdr [indexHdrSize]byte @@ -3494,9 +3524,6 @@ func (mb *msgBlock) writeIndexInfo() error { hdr[0] = magic hdr[1] = version - mb.mu.Lock() - defer mb.mu.Unlock() - n := hdrLen n += binary.PutUvarint(hdr[n:], mb.msgs) n += binary.PutUvarint(hdr[n:], mb.bytes)