Bitrot protection and tests

Signed-off-by: Derek Collison <derek@nats.io>
This commit is contained in:
Derek Collison
2019-10-24 15:47:15 -07:00
parent 62fcfcd84d
commit 4afdfafd89
2 changed files with 153 additions and 2 deletions

View File

@@ -52,6 +52,7 @@ type fileStore struct {
wmb *bytes.Buffer
fch chan struct{}
qch chan struct{}
bad []uint64
closed bool
}
@@ -247,7 +248,7 @@ func (ms *fileStore) recoverMsgs() error {
if err != nil {
return fmt.Errorf("storage directory not readable")
}
// Recover the blocks.
// Recover all of the msg blocks.
for _, fi := range fis {
var index uint64
if n, err := fmt.Sscanf(fi.Name(), blkScan, &index); err == nil && n == 1 {
@@ -471,6 +472,75 @@ func (ms *fileStore) expireMsgs() {
}
}
// checkMsgBlockFile scans a message block file record by record and verifies
// each record's checksum using hh. It returns the sequence numbers of the
// records that failed validation, in the order they appear in the file.
//
// Every record begins with a msgHdrSize-byte header decoded as little-endian:
// a uint32 record length at offset 0, a uint64 sequence at offset 4 and a
// uint16 subject length at offset 20. The remainder of the record is the
// subject, the message body and a trailing 8-byte checksum computed over
// header bytes [4:20], the subject and the body.
//
// Scanning stops at the first header that cannot be read in full (normally end
// of file). It also stops after a record whose lengths are inconsistent or
// whose data is truncated, since the next record boundary can no longer be
// trusted.
func checkMsgBlockFile(fp *os.File, hh hash.Hash) []uint64 {
	// Size in bytes of the checksum stored at the end of every record.
	const checksumSize = 8

	var le = binary.LittleEndian
	var hdr [msgHdrSize]byte
	var bad []uint64

	r := bufio.NewReaderSize(fp, 32*1024*1024)
	for {
		if _, err := io.ReadFull(r, hdr[0:]); err != nil {
			break
		}
		rl := le.Uint32(hdr[0:])
		seq := le.Uint64(hdr[4:])
		slen := int(le.Uint16(hdr[20:]))
		dlen := int(rl) - msgHdrSize
		// The data must hold the checksum plus at least slen subject bytes.
		// Without this, a corrupted header would make the slicing below panic
		// instead of being reported as a bad record.
		if dlen < checksumSize || slen > dlen-checksumSize {
			bad = append(bad, seq)
			break
		}
		data := make([]byte, dlen)
		if _, err := io.ReadFull(r, data); err != nil {
			bad = append(bad, seq)
			break
		}
		// Offset within data where the stored checksum starts.
		csum := dlen - checksumSize
		hh.Reset()
		hh.Write(hdr[4:20])
		hh.Write(data[:slen])
		hh.Write(data[slen:csum])
		if !bytes.Equal(hh.Sum(nil), data[csum:]) {
			bad = append(bad, seq)
		}
	}
	return bad
}
// checkMsgs verifies the checksums of the messages in every block file found
// in the store's message directory. Sequence numbers that fail verification
// are appended to ms.bad, and ms.bad is returned. If the message directory
// cannot be listed, nil is returned.
func (ms *fileStore) checkMsgs() []uint64 {
	// Flush the write buffer first, if it holds anything, so the files on
	// disk are up to date before they are scanned.
	ms.mu.Lock()
	if ms.wmb.Len() > 0 {
		ms.flushToFileLocked()
	}
	ms.mu.Unlock()

	mdir := path.Join(ms.fcfg.StoreDir, msgDir)
	entries, err := ioutil.ReadDir(mdir)
	if err != nil {
		return nil
	}

	// The hash key is the SHA-256 of the message directory path.
	key := sha256.Sum256([]byte(mdir))
	hh, _ := highwayhash.New64(key[:])

	for _, entry := range entries {
		// Only consider files whose name parses as a block file name.
		var index uint64
		n, err := fmt.Sscanf(entry.Name(), blkScan, &index)
		if err != nil || n != 1 {
			continue
		}
		// Files that cannot be opened are skipped.
		fp, err := os.Open(path.Join(mdir, entry.Name()))
		if err != nil {
			continue
		}
		ms.bad = append(ms.bad, checkMsgBlockFile(fp, hh)...)
		fp.Close()
	}
	return ms.bad
}
// This will kick out our flush routine if it's waiting.
func (ms *fileStore) kickFlusher() {
select {
@@ -538,7 +608,7 @@ func (ms *fileStore) writeMsgRecord(seq uint64, subj string, msg []byte) (uint64
// Calculate hash.
ms.hh.Reset()
ms.hh.Write(hdr[4:12])
ms.hh.Write(hdr[4:20])
ms.hh.Write([]byte(subj))
ms.hh.Write(msg)
checksum := ms.hh.Sum(nil)
@@ -616,10 +686,26 @@ func (ms *fileStore) readAndCacheMsgs(mb *msgBlock, seq uint64) *storedMsg {
// Read in the message regardless.
ts := int64(le.Uint64(hdr[12:]))
slen := le.Uint16(hdr[20:])
// Do some quick sanity checks here.
if dlen < 0 || int(slen) > dlen || dlen > int(rl) {
// This means something is off.
ms.bad = append(ms.bad, seq)
continue
}
data := make([]byte, dlen)
if _, err := io.ReadFull(r, data); err != nil {
break
}
// Check the checksum here.
ms.hh.Reset()
ms.hh.Write(hdr[4:20])
ms.hh.Write(data[:slen])
ms.hh.Write(data[slen : dlen-8])
checksum := ms.hh.Sum(nil)
if !bytes.Equal(checksum, data[len(data)-8:]) {
ms.bad = append(ms.bad, seq)
continue
}
msg := &storedMsg{
subj: string(data[:slen]),
msg: data[slen : dlen-8],

View File

@@ -17,8 +17,11 @@ import (
"bytes"
"fmt"
"io/ioutil"
"math/bits"
"math/rand"
"os"
"path/filepath"
"reflect"
"testing"
"time"
)
@@ -516,3 +519,65 @@ func TestFileStoreAgeLimitRecovery(t *testing.T) {
t.Fatalf("Expected no bytes, got %d", stats.Bytes)
}
}
// TestFileStoreBitRot stores a batch of messages, confirms that checkMsgs
// reports nothing for the untouched store, then bit-reverses one random byte
// of the last message block file and expects checkMsgs to report at least one
// bad sequence. Finally it reopens the store from the same directory and
// expects the same bad sequences to be reported.
func TestFileStoreBitRot(t *testing.T) {
	storeDir, err := ioutil.TempDir("", JetStreamStoreDir)
	if err != nil {
		t.Fatalf("Unexpected error creating store directory: %v", err)
	}
	os.MkdirAll(storeDir, 0755)
	defer os.RemoveAll(storeDir)

	ms, err := newFileStore(FileStoreConfig{StoreDir: storeDir}, MsgSetConfig{Name: "zzz", Storage: FileStorage})
	if err != nil {
		t.Fatalf("Unexpected error: %v", err)
	}
	defer ms.Stop()

	// Store some messages. Does not really matter how many.
	subj, msg := "foo", []byte("Hello World")
	toStore := 100
	for i := 0; i < toStore; i++ {
		ms.StoreMsg(subj, msg)
	}
	stats := ms.Stats()
	if stats.Msgs != uint64(toStore) {
		t.Fatalf("Expected %d msgs, got %d", toStore, stats.Msgs)
	}
	if badSeqs := len(ms.checkMsgs()); badSeqs > 0 {
		t.Fatalf("Expected to have no corrupt msgs, got %d", badSeqs)
	}

	// Now twiddle some bits.
	ms.mu.Lock()
	mfn := ms.lmb.mfn
	contents, rerr := ioutil.ReadFile(mfn)
	var index int
	var werr error
	if rerr == nil && len(contents) > 0 {
		for {
			index = rand.Intn(len(contents))
			// Reverse one byte anywhere. Bytes that read the same in both
			// directions are unchanged by the reversal, so pick again.
			b := contents[index]
			contents[index] = bits.Reverse8(b)
			if b != contents[index] {
				break
			}
		}
		werr = ioutil.WriteFile(mfn, contents, 0644)
	}
	ms.mu.Unlock()

	// Report I/O problems only after the lock is released, so that no
	// deferred cleanup runs while the test still holds ms.mu.
	if rerr != nil {
		t.Fatalf("Unexpected error reading msg block file: %v", rerr)
	}
	if len(contents) == 0 {
		t.Fatalf("Expected msg block file %q to have contents", mfn)
	}
	if werr != nil {
		t.Fatalf("Unexpected error writing msg block file: %v", werr)
	}

	bseqs := ms.checkMsgs()
	if badSeqs := len(bseqs); badSeqs == 0 {
		t.Fatalf("Expected to have corrupt msgs got none: changed [%d]", index)
	}

	// Make sure we can restore.
	ms.Stop()

	ms, err = newFileStore(FileStoreConfig{StoreDir: storeDir}, MsgSetConfig{Name: "zzz", Storage: FileStorage})
	if err != nil {
		t.Fatalf("Unexpected error: %v", err)
	}
	defer ms.Stop()

	// Run the check once and reuse the result, so the failure message shows
	// exactly the list that was compared.
	rseqs := ms.checkMsgs()
	if !reflect.DeepEqual(bseqs, rseqs) {
		t.Fatalf("Different reporting on bad msgs: %+v vs %+v", bseqs, rseqs)
	}
}