From 3c85df0a443c12439c9cd9c098f3b43b5612c7f2 Mon Sep 17 00:00:00 2001 From: Derek Collison Date: Sun, 14 Mar 2021 05:18:52 -0700 Subject: [PATCH] Truncate up to entry, no need for previous Signed-off-by: Derek Collison --- server/const.go | 2 +- server/filestore_test.go | 34 ++++++++++++++++++++++++++++++++++ server/raft.go | 19 +++---------------- 3 files changed, 38 insertions(+), 17 deletions(-) diff --git a/server/const.go b/server/const.go index 752c0cee..5281968d 100644 --- a/server/const.go +++ b/server/const.go @@ -41,7 +41,7 @@ var ( const ( // VERSION is the current version for the server. - VERSION = "2.2.0-RC.8.4" + VERSION = "2.2.0-RC.8.5" // PROTO is the currently supported protocol. // 0 was the original diff --git a/server/filestore_test.go b/server/filestore_test.go index 62370aa2..944c68c1 100644 --- a/server/filestore_test.go +++ b/server/filestore_test.go @@ -810,6 +810,40 @@ func TestFileStoreCompact(t *testing.T) { } } +func TestFileStoreCompactLastPlusOne(t *testing.T) { + storeDir, _ := ioutil.TempDir("", JetStreamStoreDir) + os.MkdirAll(storeDir, 0755) + defer os.RemoveAll(storeDir) + + fs, err := newFileStore(FileStoreConfig{StoreDir: storeDir, BlockSize: 8192, AsyncFlush: false}, StreamConfig{Name: "zzz", Storage: FileStorage}) + if err != nil { + t.Fatalf("Unexpected error: %v", err) + } + defer fs.Stop() + + subj, msg := "foo", make([]byte, 10_000) + for i := 0; i < 10_000; i++ { + if _, _, err := fs.StoreMsg(subj, nil, msg); err != nil { + t.Fatalf("Unexpected error: %v", err) + } + } + if state := fs.State(); state.Msgs != 10_000 { + t.Fatalf("Expected 1000000 msgs, got %d", state.Msgs) + } + if _, err := fs.Compact(10_001); err != nil { + t.Fatalf("Unexpected error: %v", err) + } + state := fs.State() + if state.Msgs != 0 { + t.Fatalf("Expected no message but got %d", state.Msgs) + } + fmt.Printf("state is %+v\n", state) + + fs.StoreMsg(subj, nil, msg) + state = fs.State() + fmt.Printf("state is %+v\n", state) +} + func TestFileStoreCompactPerf(t *testing.T) { t.SkipNow() diff --git a/server/raft.go b/server/raft.go index 3299f38f..796fc44e 100644 --- a/server/raft.go +++ b/server/raft.go @@ -2210,25 +2210,12 @@ func (n *raft) attemptStepDown(newLeader string) { func (n *raft) truncateWal(ae *appendEntry) { n.debug("Truncating and repairing WAL") - // Special case if already at 0. - if ae.pindex == 0 { - n.pindex = ae.pindex - n.pterm = ae.pterm - n.wal.Purge() - return - } - - tindex := ae.pindex - 1 - if err := n.wal.Truncate(tindex); err != nil { + if err := n.wal.Truncate(ae.pindex); err != nil { n.setWriteErrLocked(err) return } - n.pindex = tindex - if nae, _ := n.loadEntry(tindex); nae != nil { - n.pterm = nae.term - } else { - n.pterm = ae.term - } + n.pindex = ae.pindex + n.pterm = ae.term } // processAppendEntry will process an appendEntry.