diff --git a/server/filestore.go b/server/filestore.go index 21de257e..b127e66c 100644 --- a/server/filestore.go +++ b/server/filestore.go @@ -330,6 +330,8 @@ const ( wiThresh = int64(30 * time.Second) // Time threshold to write index info for non FIFO cases winfThresh = int64(2 * time.Second) + // Checksum size for hash for msg records. + recordHashSize = 8 ) func newFileStore(fcfg FileStoreConfig, cfg StreamConfig) (*fileStore, error) { @@ -1054,6 +1056,10 @@ func (mb *msgBlock) rebuildState() (*LostStreamData, error) { func (mb *msgBlock) rebuildStateLocked() (*LostStreamData, error) { startLastSeq := mb.last.seq + // Remove the .fss file and clear any cache we have set. + mb.clearCacheAndOffset() + mb.removePerSubjectInfoLocked() + buf, err := mb.loadBlock(nil) if err != nil || len(buf) == 0 { var ld *LostStreamData @@ -1079,9 +1085,6 @@ func (mb *msgBlock) rebuildStateLocked() (*LostStreamData, error) { mb.last.seq, mb.last.ts = 0, 0 firstNeedsSet := true - // Remove the .fss file from disk. - mb.removePerSubjectInfoLocked() - // Check if we need to decrypt. if mb.bek != nil && len(buf) > 0 { // Recreate to reset counter. @@ -1155,12 +1158,7 @@ func (mb *msgBlock) rebuildStateLocked() (*LostStreamData, error) { rl &^= hbit dlen := int(rl) - msgHdrSize // Do some quick sanity checks here. - if dlen < 0 || int(slen) > (dlen-8) || dlen > int(rl) || rl > rlBadThresh { - truncate(index) - return gatherLost(lbuf - index), errBadMsg - } - - if index+rl > lbuf { + if dlen < 0 || int(slen) > (dlen-recordHashSize) || dlen > int(rl) || index+rl > lbuf || rl > rlBadThresh { truncate(index) return gatherLost(lbuf - index), errBadMsg } @@ -1176,15 +1174,17 @@ func (mb *msgBlock) rebuildStateLocked() (*LostStreamData, error) { addToDmap(seq) } index += rl - mb.last.seq = seq - mb.last.ts = ts + if seq >= mb.first.seq { + mb.last.seq = seq + mb.last.ts = ts + } continue } // This is for when we have index info that adjusts for deleted messages // at the head. So the first.seq will be already set here. If this is larger // replace what we have with this seq. - if firstNeedsSet && seq > mb.first.seq { + if firstNeedsSet && seq >= mb.first.seq { firstNeedsSet, mb.first.seq, mb.first.ts = false, seq, ts } @@ -1201,12 +1201,12 @@ func (mb *msgBlock) rebuildStateLocked() (*LostStreamData, error) { hh.Write(hdr[4:20]) hh.Write(data[:slen]) if hasHeaders { - hh.Write(data[slen+4 : dlen-8]) + hh.Write(data[slen+4 : dlen-recordHashSize]) } else { - hh.Write(data[slen : dlen-8]) + hh.Write(data[slen : dlen-recordHashSize]) } checksum := hh.Sum(nil) - if !bytes.Equal(checksum, data[len(data)-8:]) { + if !bytes.Equal(checksum, data[len(data)-recordHashSize:]) { truncate(index) return gatherLost(lbuf - index), errBadMsg } @@ -1247,6 +1247,11 @@ func (mb *msgBlock) rebuildStateLocked() (*LostStreamData, error) { mb.last.seq = mb.first.seq - 1 } + // Update our fss file if needed. + if len(mb.fss) > 0 { + mb.writePerSubjectInfo() + } + return nil, nil } @@ -2674,14 +2679,42 @@ func (fs *fileStore) enforceMsgPerSubjectLimit() { fs.scb = nil defer func() { fs.scb = cb }() + var numMsgs uint64 + // collect all that are not correct. needAttention := make(map[string]*psi) for subj, psi := range fs.psim { + numMsgs += psi.total if psi.total > maxMsgsPer { needAttention[subj] = psi } } + // We had an issue with a use case where psim (and hence fss) were correct but idx was not and was not properly being caught. + // So do a quick sanity check here. If we detect a skew do a rebuild then re-check. + if numMsgs != fs.state.Msgs { + // Clear any global subject state. + fs.psim = make(map[string]*psi) + for _, mb := range fs.blks { + mb.removeIndexFile() + ld, err := mb.rebuildState() + mb.writeIndexInfo() + if err != nil && ld != nil { + fs.addLostData(ld) + } + fs.populateGlobalPerSubjectInfo(mb) + } + // Rebuild fs state too. + fs.rebuildStateLocked(nil) + // Need to redo blocks that need attention. + needAttention = make(map[string]*psi) + for subj, psi := range fs.psim { + if psi.total > maxMsgsPer { + needAttention[subj] = psi + } + } + } + // Collect all the msgBlks we alter. blks := make(map[*msgBlock]struct{}) @@ -3121,8 +3154,7 @@ func (mb *msgBlock) compact() { return } - // Close cache and index file and wipe delete map, then rebuild. - mb.clearCacheAndOffset() + // Remove index file and wipe delete map, then rebuild. mb.removeIndexFileLocked() mb.deleteDmap() mb.rebuildStateLocked() @@ -3148,6 +3180,11 @@ func (mb *msgBlock) slotInfo(slot int) (uint32, uint32, bool, error) { bi := mb.cache.idx[slot] ri, hashChecked := (bi &^ hbit), (bi&hbit) != 0 + // If this is a deleted slot return here. + if bi == dbit { + return 0, 0, false, errDeletedMsg + } + // Determine record length var rl uint32 if len(mb.cache.idx) > slot+1 { @@ -4295,7 +4332,7 @@ func (fs *fileStore) selectMsgBlockForStart(minTime time.Time) *msgBlock { func (mb *msgBlock) indexCacheBuf(buf []byte) error { var le = binary.LittleEndian - var fseq uint64 + var fseq, pseq uint64 var idx []uint32 var index uint32 @@ -4328,7 +4365,7 @@ func (mb *msgBlock) indexCacheBuf(buf []byte) error { dlen := int(rl) - msgHdrSize // Do some quick sanity checks here. - if dlen < 0 || int(slen) > dlen || dlen > int(rl) || index+rl > lbuf || rl > 32*1024*1024 { + if dlen < 0 || int(slen) > (dlen-recordHashSize) || dlen > int(rl) || index+rl > lbuf || rl > rlBadThresh { // This means something is off. // TODO(dlc) - Add into bad list? return errCorruptState @@ -4336,15 +4373,31 @@ func (mb *msgBlock) indexCacheBuf(buf []byte) error { // Clear erase bit. seq = seq &^ ebit - // Adjust if we guessed wrong. - if seq != 0 && seq < fseq { - fseq = seq - } + // We defer checksum checks to individual msg cache lookups to amortorize costs and // not introduce latency for first message from a newly loaded block. - idx = append(idx, index) - mb.cache.lrl = uint32(rl) - index += mb.cache.lrl + if seq >= mb.first.seq { + // Track that we do not have holes. + // Not expected but did see it in the field. + if pseq > 0 && seq != pseq+1 { + if mb.dmap == nil { + mb.dmap = make(map[uint64]struct{}) + } + for dseq := pseq + 1; dseq < seq; dseq++ { + idx = append(idx, dbit) + mb.dmap[dseq] = struct{}{} + } + } + pseq = seq + + idx = append(idx, index) + mb.cache.lrl = uint32(rl) + // Adjust if we guessed wrong. + if seq != 0 && seq < fseq { + fseq = seq + } + } + index += rl } mb.cache.buf = buf mb.cache.idx = idx @@ -4686,6 +4739,9 @@ const hbit = 1 << 31 // Used for marking erased messages sequences. const ebit = 1 << 63 +// Used to mark a bad index as deleted. +const dbit = 1 << 30 + // Will do a lookup from cache. // Lock should be held. func (mb *msgBlock) cacheLookup(seq uint64, sm *StoreMsg) (*StoreMsg, error) { @@ -4696,6 +4752,7 @@ func (mb *msgBlock) cacheLookup(seq uint64, sm *StoreMsg) (*StoreMsg, error) { // If we have a delete map check it. if !mb.dmap.IsEmpty() { if mb.dmap.Exists(seq) { + mb.llts = time.Now().UnixNano() return nil, errDeletedMsg } } @@ -4838,9 +4895,9 @@ func (mb *msgBlock) msgFromBuf(buf []byte, sm *StoreMsg, hh hash.Hash64) (*Store hh.Write(hdr[4:20]) hh.Write(data[:slen]) if hasHeaders { - hh.Write(data[slen+4 : dlen-8]) + hh.Write(data[slen+4 : dlen-recordHashSize]) } else { - hh.Write(data[slen : dlen-8]) + hh.Write(data[slen : dlen-recordHashSize]) } if !bytes.Equal(hh.Sum(nil), data[len(data)-8:]) { return nil, errBadMsg