diff --git a/server/filestore.go b/server/filestore.go index 43559e0c..ac8ec473 100644 --- a/server/filestore.go +++ b/server/filestore.go @@ -515,6 +515,7 @@ func (mb *msgBlock) rebuildState() (*LostStreamData, error) { rl := le.Uint32(hdr[0:]) slen := le.Uint16(hdr[20:]) + hasHeaders := rl&hbit != 0 // Clear any headers bit that could be set. rl &^= hbit dlen := int(rl) - msgHdrSize @@ -535,6 +536,7 @@ func (mb *msgBlock) rebuildState() (*LostStreamData, error) { // If the first seq we read does not match our indexed first seq, reset. if index == 0 && seq > mb.first.seq { mb.first.seq = seq + mb.first.ts = ts } // This is an old erased message, or a new one that we can track. @@ -558,7 +560,11 @@ func (mb *msgBlock) rebuildState() (*LostStreamData, error) { hh.Reset() hh.Write(hdr[4:20]) hh.Write(data[:slen]) - hh.Write(data[slen : dlen-8]) + if hasHeaders { + hh.Write(data[slen+4 : dlen-8]) + } else { + hh.Write(data[slen : dlen-8]) + } checksum := hh.Sum(nil) if !bytes.Equal(checksum, data[len(data)-8:]) { truncate(index) @@ -1173,16 +1179,16 @@ func (fs *fileStore) isClosed() bool { // Will spin up our flush loop. func (mb *msgBlock) spinUpFlushLoop() { mb.mu.Lock() + defer mb.mu.Unlock() + // Are we already running? if mb.flusher { - mb.mu.Unlock() return } mb.flusher = true mb.fch = make(chan struct{}, 1) mb.qch = make(chan struct{}) fch, qch := mb.fch, mb.qch - mb.mu.Unlock() go mb.flushLoop(fch, qch) } @@ -2659,22 +2665,10 @@ func (fs *fileStore) Compact(seq uint64) (uint64, error) { return fs.Purge() } - // TODO(dlc) - We can be smarter for large compactions and drop whole msg blocks. 
var purged uint64 - if last := fs.lastSeq(); seq <= last { - for fseq := fs.firstSeq(); fseq < seq; fseq = fs.firstSeq() { - if found, err := fs.removeMsg(fseq, false); err != nil { - if err == ErrStoreMsgNotFound { - continue - } else if err == ErrStoreEOF { - err = nil - } - return purged, err - } else if found { - purged++ - } - } - } else { + var bytes uint64 + + if last := fs.lastSeq(); seq > last { // We are compacting past the end of our range. Do purge and set sequences correctly // such that the next message placed will have seq. var err error @@ -2682,6 +2676,59 @@ func (fs *fileStore) Compact(seq uint64) (uint64, error) { return 0, err } fs.resetFirst(seq) + } else { + // We have to delete interior messages. + fs.mu.Lock() + smb := fs.selectMsgBlock(seq) + if smb == nil { + fs.mu.Unlock() + return 0, nil + } + // All msgblocks up to this one can be thrown away. + for i, mb := range fs.blks { + if mb == smb { + fs.blks = append(fs.blks[:0:0], fs.blks[i:]...) + break + } + mb.mu.Lock() + purged += mb.msgs + bytes += mb.bytes + mb.dirtyCloseWithRemove(true) + mb.mu.Unlock() + } + fs.mu.Unlock() + + if err := smb.loadMsgs(); err != nil { + return purged, err + } + + smb.mu.Lock() + for mseq := smb.first.seq; mseq < seq; mseq++ { + // The record length is only meaningful when the slot lookup succeeds; track it for the store-wide total too. + if _, rl, _, err := smb.slotInfo(int(mseq - smb.cache.fseq)); err == nil { + smb.bytes -= uint64(rl) + bytes += uint64(rl) + } + smb.msgs-- + purged++ + } + // Update first entry. + sm, _ := smb.cacheLookupWithLock(seq) + if sm != nil { + smb.first.seq = sm.seq + smb.first.ts = sm.ts + } + smb.mu.Unlock() + + if sm != nil { + // Reset our version of first. fs.state is guarded by fs.mu, which was released above, so retake it. 
+ fs.mu.Lock() + fs.state.FirstSeq = sm.seq + fs.state.FirstTime = time.Unix(0, sm.ts).UTC() + fs.state.Msgs -= purged + fs.state.Bytes -= bytes + fs.mu.Unlock() + } } return purged, nil @@ -2764,13 +2811,6 @@ func (fs *fileStore) resetFirst(newFirst uint64) { fs.mu.Unlock() } -func (fs *fileStore) firstSeq() uint64 { - fs.mu.RLock() - seq := fs.state.FirstSeq - fs.mu.RUnlock() - return seq -} - func (fs *fileStore) lastSeq() uint64 { fs.mu.RLock() seq := fs.state.LastSeq @@ -2806,7 +2846,8 @@ func (fs *fileStore) removeMsgBlock(mb *msgBlock) { // Remove from list. for i, omb := range fs.blks { if mb == omb { - fs.blks = append(fs.blks[:i], fs.blks[i+1:]...) + blks := append(fs.blks[:i], fs.blks[i+1:]...) + fs.blks = append(blks[:0:0], blks...) break } } @@ -2820,8 +2861,8 @@ func (fs *fileStore) removeMsgBlock(mb *msgBlock) { // Lock should not be held. func (mb *msgBlock) dirtyClose() { mb.mu.Lock() + defer mb.mu.Unlock() mb.dirtyCloseWithRemove(false) - mb.mu.Unlock() } // Should be called with lock held. diff --git a/server/filestore_test.go b/server/filestore_test.go index 9ac0bd8e..0c0c9f61 100644 --- a/server/filestore_test.go +++ b/server/filestore_test.go @@ -810,6 +810,45 @@ func TestFileStoreCompact(t *testing.T) { } } +func TestFileStoreCompactPerf(t *testing.T) { + t.SkipNow() + + storeDir, _ := ioutil.TempDir("", JetStreamStoreDir) + os.MkdirAll(storeDir, 0755) + defer os.RemoveAll(storeDir) + + fs, err := newFileStore(FileStoreConfig{StoreDir: storeDir, BlockSize: 8192, AsyncFlush: true}, StreamConfig{Name: "zzz", Storage: FileStorage}) + if err != nil { + t.Fatalf("Unexpected error: %v", err) + } + defer fs.Stop() + + subj, msg := "foo", []byte("Hello World") + for i := 0; i < 100_000; i++ { + fs.StoreMsg(subj, nil, msg) + } + if state := fs.State(); state.Msgs != 100_000 { + t.Fatalf("Expected 100_000 msgs, got %d", state.Msgs) + } + start := time.Now() + n, err := fs.Compact(90_001) + if err != nil { + t.Fatalf("Unexpected error: %v", err) + } + t.Logf("Took %v to compact\n", 
time.Since(start)) + + if n != 90_000 { + t.Fatalf("Expected to have purged 90_000 msgs, got %d", n) + } + state := fs.State() + if state.Msgs != 10_000 { + t.Fatalf("Expected 10_000 msgs, got %d", state.Msgs) + } + if state.FirstSeq != 90_001 { + t.Fatalf("Expected first seq of 90_001, got %d", state.FirstSeq) + } +} + func TestFileStoreStreamTruncate(t *testing.T) { storeDir, _ := ioutil.TempDir("", JetStreamStoreDir) os.MkdirAll(storeDir, 0755)