Check for checksum violations on all records, and do so before sequence processing.

Also includes a fix for the bitrot test and a small bug fix for a leaking fd.

Signed-off-by: Derek Collison <derek@nats.io>
This commit is contained in:
Derek Collison
2023-08-31 12:20:41 -07:00
parent 2834142bdd
commit c110ceea94
2 changed files with 39 additions and 26 deletions

View File

@@ -1032,7 +1032,7 @@ func (mb *msgBlock) rebuildStateLocked() (*LostStreamData, error) {
fd = mb.mfd
} else {
fd, err = os.OpenFile(mb.mfn, os.O_RDWR, defaultFilePerms)
if err != nil {
if err == nil {
defer fd.Close()
}
}
@@ -1078,6 +1078,26 @@ func (mb *msgBlock) rebuildStateLocked() (*LostStreamData, error) {
return gatherLost(lbuf - index), errBadMsg
}
// Check for checksum failures before additional processing.
data := buf[index+msgHdrSize : index+rl]
if hh := mb.hh; hh != nil {
hh.Reset()
hh.Write(hdr[4:20])
hh.Write(data[:slen])
if hasHeaders {
hh.Write(data[slen+4 : dlen-recordHashSize])
} else {
hh.Write(data[slen : dlen-recordHashSize])
}
checksum := hh.Sum(nil)
if !bytes.Equal(checksum, data[len(data)-recordHashSize:]) {
truncate(index)
return gatherLost(lbuf - index), errBadMsg
}
copy(mb.lchk[0:], checksum)
}
// Grab our sequence and timestamp.
seq := le.Uint64(hdr[4:])
ts := int64(le.Uint64(hdr[12:]))
@@ -1114,29 +1134,7 @@ func (mb *msgBlock) rebuildStateLocked() (*LostStreamData, error) {
_, deleted = mb.dmap[seq]
}
// Always set last.
mb.last.seq = seq
mb.last.ts = ts
if !deleted {
data := buf[index+msgHdrSize : index+rl]
if hh := mb.hh; hh != nil {
hh.Reset()
hh.Write(hdr[4:20])
hh.Write(data[:slen])
if hasHeaders {
hh.Write(data[slen+4 : dlen-recordHashSize])
} else {
hh.Write(data[slen : dlen-recordHashSize])
}
checksum := hh.Sum(nil)
if !bytes.Equal(checksum, data[len(data)-recordHashSize:]) {
truncate(index)
return gatherLost(lbuf - index), errBadMsg
}
copy(mb.lchk[0:], checksum)
}
if firstNeedsSet {
firstNeedsSet, mb.first.seq, mb.first.ts = false, seq, ts
}
@@ -1162,6 +1160,11 @@ func (mb *msgBlock) rebuildStateLocked() (*LostStreamData, error) {
mb.fssNeedsWrite = true
}
}
// Always set last
mb.last.seq = seq
mb.last.ts = ts
// Advance to next record.
index += rl
}
@@ -4646,7 +4649,7 @@ func (mb *msgBlock) msgFromBuf(buf []byte, sm *StoreMsg, hh hash.Hash64) (*Store
dlen := int(rl) - msgHdrSize
slen := int(le.Uint16(hdr[20:]))
// Simple sanity check.
if dlen < 0 || slen > dlen || int(rl) > len(buf) {
if dlen < 0 || slen > (dlen-recordHashSize) || dlen > int(rl) || int(rl) > len(buf) {
return nil, errBadMsg
}
data := buf[msgHdrSize : msgHdrSize+dlen]

View File

@@ -1278,7 +1278,10 @@ func TestFileStoreBitRot(t *testing.T) {
// Now twiddle some bits.
fs.mu.Lock()
lmb := fs.lmb
contents, _ := os.ReadFile(lmb.mfn)
contents, err := os.ReadFile(lmb.mfn)
require_NoError(t, err)
require_True(t, len(contents) > 0)
var index int
for {
index = rand.Intn(len(contents))
@@ -1296,6 +1299,10 @@ func TestFileStoreBitRot(t *testing.T) {
if len(ld.Msgs) > 0 {
break
}
// If our bitrot caused us to not be able to recover any messages we can break as well.
if state := fs.State(); state.Msgs == 0 {
break
}
// Fail the test if we have tried the 10 times and still did not
// get any corruption report.
if i == 9 {
@@ -1314,7 +1321,10 @@ func TestFileStoreBitRot(t *testing.T) {
// checkMsgs will repair the underlying store, so checkMsgs should be clean now.
if ld := fs.checkMsgs(); ld != nil {
t.Fatalf("Expected no errors restoring checked and fixed filestore, got %+v", ld)
// If we have no msgs left this will report the head msgs as lost again.
if state := fs.State(); state.Msgs > 0 {
t.Fatalf("Expected no errors restoring checked and fixed filestore, got %+v", ld)
}
}
})
}