diff --git a/server/filestore.go b/server/filestore.go index 19f7963f..279a6d66 100644 --- a/server/filestore.go +++ b/server/filestore.go @@ -616,6 +616,11 @@ func (mb *msgBlock) rebuildState() (*LostStreamData, error) { index += rl } + // For empty msg blocks make sure we recover last seq correctly based off of first. + if mb.msgs == 0 { + mb.last.seq = mb.first.seq - 1 + } + return nil, nil } diff --git a/server/filestore_test.go b/server/filestore_test.go index 6e0fb800..93ea9e5a 100644 --- a/server/filestore_test.go +++ b/server/filestore_test.go @@ -2769,3 +2769,63 @@ func TestFileStoreStreamDeleteCacheBug(t *testing.T) { t.Fatalf("Unexpected error looking up msg: %v", err) } } + +// https://github.com/nats-io/nats-server/issues/2068 +func TestFileStoreStreamPurgeAndDirtyRestartBug(t *testing.T) { + storeDir := createDir(t, JetStreamStoreDir) + defer removeDir(t, storeDir) + + fs, err := newFileStore(FileStoreConfig{StoreDir: storeDir}, StreamConfig{Name: "zzz", Storage: FileStorage}) + if err != nil { + t.Fatalf("Unexpected error: %v", err) + } + defer fs.Stop() + + // Load up some messages. + num, subj, hdr, msg := 100, "foo", []byte("name:derek"), []byte("Hello World") + for i := 0; i < num; i++ { + fs.StoreMsg(subj, hdr, msg) + } + // Now purge + fs.Purge() + + // Snapshot state. + state := fs.State() + if state.FirstSeq != uint64(num+1) || state.LastSeq != uint64(num) { + t.Fatalf("Unexpected state: %+v", state) + } + + // Now we will stop the store and corrupt the index such that on restart it will do a rebuild. + fs.mu.Lock() + lmb := fs.lmb + fs.mu.Unlock() + + lmb.mu.RLock() + ifn := lmb.ifn + lmb.mu.RUnlock() + + fs.Stop() + + fd, err := os.OpenFile(ifn, os.O_RDWR, 0644) + if err != nil { + t.Fatalf("Error opening the index file: %v", err) + } + defer fd.Close() + fi, _ := fd.Stat() + if _, err = fd.WriteAt([]byte{1, 1}, fi.Size()-2); err != nil { + t.Fatalf("Error writing the index file: %v", err) + } + fd.Close() + + // Restart + fs, err = newFileStore(FileStoreConfig{StoreDir: storeDir}, StreamConfig{Name: "zzz", Storage: FileStorage}) + if err != nil { + t.Fatalf("Unexpected error: %v", err) + } + defer fs.Stop() + + state = fs.State() + if state.FirstSeq != uint64(num+1) || state.LastSeq != uint64(num) { + t.Fatalf("Unexpected state: %+v", state) + } +}