Fixed data corruption bug, optimized Compact().

Also trim fs.blks slice appropriately to avoid unbound growth.

Signed-off-by: Derek Collison <derek@nats.io>
This commit is contained in:
Derek Collison
2021-02-28 05:14:25 -08:00
parent 6a03ab9629
commit f5cbd55b46
2 changed files with 103 additions and 27 deletions

View File

@@ -515,6 +515,7 @@ func (mb *msgBlock) rebuildState() (*LostStreamData, error) {
rl := le.Uint32(hdr[0:])
slen := le.Uint16(hdr[20:])
hasHeaders := rl&hbit != 0
// Clear any headers bit that could be set.
rl &^= hbit
dlen := int(rl) - msgHdrSize
@@ -535,6 +536,7 @@ func (mb *msgBlock) rebuildState() (*LostStreamData, error) {
// If the first seq we read does not match our indexed first seq, reset.
if index == 0 && seq > mb.first.seq {
mb.first.seq = seq
mb.first.ts = ts
}
// This is an old erased message, or a new one that we can track.
@@ -558,7 +560,11 @@ func (mb *msgBlock) rebuildState() (*LostStreamData, error) {
hh.Reset()
hh.Write(hdr[4:20])
hh.Write(data[:slen])
hh.Write(data[slen : dlen-8])
if hasHeaders {
hh.Write(data[slen+4 : dlen-8])
} else {
hh.Write(data[slen : dlen-8])
}
checksum := hh.Sum(nil)
if !bytes.Equal(checksum, data[len(data)-8:]) {
truncate(index)
@@ -1173,16 +1179,16 @@ func (fs *fileStore) isClosed() bool {
// Will spin up our flush loop.
// NOTE(review): the rendered diff contained both the removed
// `defer mb.mu.Unlock()` and the added explicit unlocks; this is the
// reconstructed post-commit version, which unlocks before spawning the
// goroutine so flushLoop never contends with a lock held across `go`.
func (mb *msgBlock) spinUpFlushLoop() {
	mb.mu.Lock()
	// Are we already running?
	if mb.flusher {
		mb.mu.Unlock()
		return
	}
	mb.flusher = true
	mb.fch = make(chan struct{}, 1)
	mb.qch = make(chan struct{})
	fch, qch := mb.fch, mb.qch
	mb.mu.Unlock()

	go mb.flushLoop(fch, qch)
}
@@ -2659,22 +2665,10 @@ func (fs *fileStore) Compact(seq uint64) (uint64, error) {
return fs.Purge()
}
// TODO(dlc) - We can be smarter for large compactions and drop whole msg blocks.
var purged uint64
if last := fs.lastSeq(); seq <= last {
for fseq := fs.firstSeq(); fseq < seq; fseq = fs.firstSeq() {
if found, err := fs.removeMsg(fseq, false); err != nil {
if err == ErrStoreMsgNotFound {
continue
} else if err == ErrStoreEOF {
err = nil
}
return purged, err
} else if found {
purged++
}
}
} else {
var bytes uint64
if last := fs.lastSeq(); seq > last {
// We are compacting past the end of our range. Do purge and set sequences correctly
// such that the next message placed will have seq.
var err error
@@ -2682,6 +2676,55 @@ func (fs *fileStore) Compact(seq uint64) (uint64, error) {
return 0, err
}
fs.resetFirst(seq)
} else {
// We have to delete interior messages.
fs.mu.Lock()
smb := fs.selectMsgBlock(seq)
if smb == nil {
fs.mu.Unlock()
return 0, nil
}
// All msgblocks up to this one can be thrown away.
for i, mb := range fs.blks {
if mb == smb {
fs.blks = append(fs.blks[:0:0], fs.blks[i:]...)
break
}
mb.mu.Lock()
purged += mb.msgs
bytes += mb.bytes
mb.dirtyCloseWithRemove(true)
mb.mu.Unlock()
}
fs.mu.Unlock()
if err := smb.loadMsgs(); err != nil {
return purged, err
}
smb.mu.Lock()
for mseq := smb.first.seq; mseq < seq; mseq++ {
if _, rl, _, err := smb.slotInfo(int(mseq - smb.cache.fseq)); err != nil {
smb.bytes -= uint64(rl)
}
smb.msgs--
purged++
}
// Update first entry.
sm, _ := smb.cacheLookupWithLock(seq)
if sm != nil {
smb.first.seq = sm.seq
smb.first.ts = sm.ts
}
smb.mu.Unlock()
if sm != nil {
// Reset our version of first.
fs.state.FirstSeq = sm.seq
fs.state.FirstTime = time.Unix(0, sm.ts).UTC()
fs.state.Msgs -= purged
fs.state.Bytes -= bytes
}
}
return purged, nil
@@ -2764,13 +2807,6 @@ func (fs *fileStore) resetFirst(newFirst uint64) {
fs.mu.Unlock()
}
func (fs *fileStore) firstSeq() uint64 {
fs.mu.RLock()
seq := fs.state.FirstSeq
fs.mu.RUnlock()
return seq
}
func (fs *fileStore) lastSeq() uint64 {
fs.mu.RLock()
seq := fs.state.LastSeq
@@ -2806,7 +2842,8 @@ func (fs *fileStore) removeMsgBlock(mb *msgBlock) {
// Remove from list.
for i, omb := range fs.blks {
if mb == omb {
fs.blks = append(fs.blks[:i], fs.blks[i+1:]...)
blks := append(fs.blks[:i], fs.blks[i+1:]...)
fs.blks = append(blks[:0:0], blks...)
break
}
}
@@ -2820,8 +2857,8 @@ func (fs *fileStore) removeMsgBlock(mb *msgBlock) {
// Lock should not be held.
// NOTE(review): the rendered diff showed both the removed
// `defer mb.mu.Unlock()` and the added explicit `mb.mu.Unlock()`; this is
// the reconstructed post-commit version using the explicit unlock,
// consistent with the same change made to spinUpFlushLoop in this commit.
func (mb *msgBlock) dirtyClose() {
	mb.mu.Lock()
	mb.dirtyCloseWithRemove(false)
	mb.mu.Unlock()
}
// Should be called with lock held.

View File

@@ -810,6 +810,45 @@ func TestFileStoreCompact(t *testing.T) {
}
}
// TestFileStoreCompactPerf measures how long Compact() takes to drop the
// first 90_000 of 100_000 stored messages. It is skipped by default
// (t.SkipNow); remove that call to run it manually and read the timing
// from the test log.
func TestFileStoreCompactPerf(t *testing.T) {
	t.SkipNow()

	storeDir, _ := ioutil.TempDir("", JetStreamStoreDir)
	os.MkdirAll(storeDir, 0755)
	defer os.RemoveAll(storeDir)

	fs, err := newFileStore(FileStoreConfig{StoreDir: storeDir, BlockSize: 8192, AsyncFlush: true}, StreamConfig{Name: "zzz", Storage: FileStorage})
	if err != nil {
		t.Fatalf("Unexpected error: %v", err)
	}
	defer fs.Stop()

	subj, msg := "foo", []byte("Hello World")
	for i := 0; i < 100_000; i++ {
		fs.StoreMsg(subj, nil, msg)
	}
	if state := fs.State(); state.Msgs != 100_000 {
		// Fixed: message previously said "Expected 1000000 msgs" while the
		// condition checks for 100_000.
		t.Fatalf("Expected 100_000 msgs, got %d", state.Msgs)
	}

	start := time.Now()
	n, err := fs.Compact(90_001)
	if err != nil {
		t.Fatalf("Unexpected error: %v", err)
	}
	// t.Logf already appends a newline; no trailing "\n" needed.
	t.Logf("Took %v to compact", time.Since(start))

	if n != 90_000 {
		t.Fatalf("Expected to have purged 90_000 msgs, got %d", n)
	}
	state := fs.State()
	if state.Msgs != 10_000 {
		t.Fatalf("Expected 10_000 msgs, got %d", state.Msgs)
	}
	if state.FirstSeq != 90_001 {
		t.Fatalf("Expected first seq of 90_001, got %d", state.FirstSeq)
	}
}
func TestFileStoreStreamTruncate(t *testing.T) {
storeDir, _ := ioutil.TempDir("", JetStreamStoreDir)
os.MkdirAll(storeDir, 0755)