Merge branch 'main' into dev

Signed-off-by: Derek Collison <derek@nats.io>
This commit is contained in:
Derek Collison
2023-07-28 10:42:56 -07:00

View File

@@ -330,6 +330,8 @@ const (
wiThresh = int64(30 * time.Second)
// Time threshold to write index info for non FIFO cases
winfThresh = int64(2 * time.Second)
// Checksum size for hash for msg records.
recordHashSize = 8
)
func newFileStore(fcfg FileStoreConfig, cfg StreamConfig) (*fileStore, error) {
@@ -1054,6 +1056,10 @@ func (mb *msgBlock) rebuildState() (*LostStreamData, error) {
func (mb *msgBlock) rebuildStateLocked() (*LostStreamData, error) {
startLastSeq := mb.last.seq
// Remove the .fss file and clear any cache we have set.
mb.clearCacheAndOffset()
mb.removePerSubjectInfoLocked()
buf, err := mb.loadBlock(nil)
if err != nil || len(buf) == 0 {
var ld *LostStreamData
@@ -1079,9 +1085,6 @@ func (mb *msgBlock) rebuildStateLocked() (*LostStreamData, error) {
mb.last.seq, mb.last.ts = 0, 0
firstNeedsSet := true
// Remove the .fss file from disk.
mb.removePerSubjectInfoLocked()
// Check if we need to decrypt.
if mb.bek != nil && len(buf) > 0 {
// Recreate to reset counter.
@@ -1155,12 +1158,7 @@ func (mb *msgBlock) rebuildStateLocked() (*LostStreamData, error) {
rl &^= hbit
dlen := int(rl) - msgHdrSize
// Do some quick sanity checks here.
if dlen < 0 || int(slen) > (dlen-8) || dlen > int(rl) || rl > rlBadThresh {
truncate(index)
return gatherLost(lbuf - index), errBadMsg
}
if index+rl > lbuf {
if dlen < 0 || int(slen) > (dlen-recordHashSize) || dlen > int(rl) || index+rl > lbuf || rl > rlBadThresh {
truncate(index)
return gatherLost(lbuf - index), errBadMsg
}
@@ -1176,15 +1174,17 @@ func (mb *msgBlock) rebuildStateLocked() (*LostStreamData, error) {
addToDmap(seq)
}
index += rl
mb.last.seq = seq
mb.last.ts = ts
if seq >= mb.first.seq {
mb.last.seq = seq
mb.last.ts = ts
}
continue
}
// This is for when we have index info that adjusts for deleted messages
// at the head. So the first.seq will be already set here. If this is larger
// replace what we have with this seq.
if firstNeedsSet && seq > mb.first.seq {
if firstNeedsSet && seq >= mb.first.seq {
firstNeedsSet, mb.first.seq, mb.first.ts = false, seq, ts
}
@@ -1201,12 +1201,12 @@ func (mb *msgBlock) rebuildStateLocked() (*LostStreamData, error) {
hh.Write(hdr[4:20])
hh.Write(data[:slen])
if hasHeaders {
hh.Write(data[slen+4 : dlen-8])
hh.Write(data[slen+4 : dlen-recordHashSize])
} else {
hh.Write(data[slen : dlen-8])
hh.Write(data[slen : dlen-recordHashSize])
}
checksum := hh.Sum(nil)
if !bytes.Equal(checksum, data[len(data)-8:]) {
if !bytes.Equal(checksum, data[len(data)-recordHashSize:]) {
truncate(index)
return gatherLost(lbuf - index), errBadMsg
}
@@ -1247,6 +1247,11 @@ func (mb *msgBlock) rebuildStateLocked() (*LostStreamData, error) {
mb.last.seq = mb.first.seq - 1
}
// Update our fss file if needed.
if len(mb.fss) > 0 {
mb.writePerSubjectInfo()
}
return nil, nil
}
@@ -2674,14 +2679,42 @@ func (fs *fileStore) enforceMsgPerSubjectLimit() {
fs.scb = nil
defer func() { fs.scb = cb }()
var numMsgs uint64
// collect all that are not correct.
needAttention := make(map[string]*psi)
for subj, psi := range fs.psim {
numMsgs += psi.total
if psi.total > maxMsgsPer {
needAttention[subj] = psi
}
}
// We had an issue with a use case where psim (and hence fss) were correct but idx was not and was not properly being caught.
// So do a quick sanity check here. If we detect a skew do a rebuild then re-check.
if numMsgs != fs.state.Msgs {
// Clear any global subject state.
fs.psim = make(map[string]*psi)
for _, mb := range fs.blks {
mb.removeIndexFile()
ld, err := mb.rebuildState()
mb.writeIndexInfo()
if err != nil && ld != nil {
fs.addLostData(ld)
}
fs.populateGlobalPerSubjectInfo(mb)
}
// Rebuild fs state too.
fs.rebuildStateLocked(nil)
// Need to redo blocks that need attention.
needAttention = make(map[string]*psi)
for subj, psi := range fs.psim {
if psi.total > maxMsgsPer {
needAttention[subj] = psi
}
}
}
// Collect all the msgBlks we alter.
blks := make(map[*msgBlock]struct{})
@@ -3121,8 +3154,7 @@ func (mb *msgBlock) compact() {
return
}
// Close cache and index file and wipe delete map, then rebuild.
mb.clearCacheAndOffset()
// Remove index file and wipe delete map, then rebuild.
mb.removeIndexFileLocked()
mb.deleteDmap()
mb.rebuildStateLocked()
@@ -3148,6 +3180,11 @@ func (mb *msgBlock) slotInfo(slot int) (uint32, uint32, bool, error) {
bi := mb.cache.idx[slot]
ri, hashChecked := (bi &^ hbit), (bi&hbit) != 0
// If this is a deleted slot return here.
if bi == dbit {
return 0, 0, false, errDeletedMsg
}
// Determine record length
var rl uint32
if len(mb.cache.idx) > slot+1 {
@@ -4295,7 +4332,7 @@ func (fs *fileStore) selectMsgBlockForStart(minTime time.Time) *msgBlock {
func (mb *msgBlock) indexCacheBuf(buf []byte) error {
var le = binary.LittleEndian
var fseq uint64
var fseq, pseq uint64
var idx []uint32
var index uint32
@@ -4328,7 +4365,7 @@ func (mb *msgBlock) indexCacheBuf(buf []byte) error {
dlen := int(rl) - msgHdrSize
// Do some quick sanity checks here.
if dlen < 0 || int(slen) > dlen || dlen > int(rl) || index+rl > lbuf || rl > 32*1024*1024 {
if dlen < 0 || int(slen) > (dlen-recordHashSize) || dlen > int(rl) || index+rl > lbuf || rl > rlBadThresh {
// This means something is off.
// TODO(dlc) - Add into bad list?
return errCorruptState
@@ -4336,15 +4373,31 @@ func (mb *msgBlock) indexCacheBuf(buf []byte) error {
// Clear erase bit.
seq = seq &^ ebit
// Adjust if we guessed wrong.
if seq != 0 && seq < fseq {
fseq = seq
}
// We defer checksum checks to individual msg cache lookups to amortorize costs and
// not introduce latency for first message from a newly loaded block.
idx = append(idx, index)
mb.cache.lrl = uint32(rl)
index += mb.cache.lrl
if seq >= mb.first.seq {
// Track that we do not have holes.
// Not expected but did see it in the field.
if pseq > 0 && seq != pseq+1 {
if mb.dmap == nil {
mb.dmap = make(map[uint64]struct{})
}
for dseq := pseq + 1; dseq < seq; dseq++ {
idx = append(idx, dbit)
mb.dmap[dseq] = struct{}{}
}
}
pseq = seq
idx = append(idx, index)
mb.cache.lrl = uint32(rl)
// Adjust if we guessed wrong.
if seq != 0 && seq < fseq {
fseq = seq
}
}
index += rl
}
mb.cache.buf = buf
mb.cache.idx = idx
@@ -4686,6 +4739,9 @@ const hbit = 1 << 31
// Used for marking erased messages sequences.
const ebit = 1 << 63
// Used to mark a bad index as deleted.
const dbit = 1 << 30
// Will do a lookup from cache.
// Lock should be held.
func (mb *msgBlock) cacheLookup(seq uint64, sm *StoreMsg) (*StoreMsg, error) {
@@ -4696,6 +4752,7 @@ func (mb *msgBlock) cacheLookup(seq uint64, sm *StoreMsg) (*StoreMsg, error) {
// If we have a delete map check it.
if !mb.dmap.IsEmpty() {
if mb.dmap.Exists(seq) {
mb.llts = time.Now().UnixNano()
return nil, errDeletedMsg
}
}
@@ -4838,9 +4895,9 @@ func (mb *msgBlock) msgFromBuf(buf []byte, sm *StoreMsg, hh hash.Hash64) (*Store
hh.Write(hdr[4:20])
hh.Write(data[:slen])
if hasHeaders {
hh.Write(data[slen+4 : dlen-8])
hh.Write(data[slen+4 : dlen-recordHashSize])
} else {
hh.Write(data[slen : dlen-8])
hh.Write(data[slen : dlen-recordHashSize])
}
if !bytes.Equal(hh.Sum(nil), data[len(data)-8:]) {
return nil, errBadMsg