Truncate up to entry, no need for previous

Signed-off-by: Derek Collison <derek@nats.io>
This commit is contained in:
Derek Collison
2021-03-14 05:18:52 -07:00
parent 10afedcc46
commit 3c85df0a44
3 changed files with 38 additions and 17 deletions

View File

@@ -41,7 +41,7 @@ var (
const (
// VERSION is the current version for the server.
VERSION = "2.2.0-RC.8.4"
VERSION = "2.2.0-RC.8.5"
// PROTO is the currently supported protocol.
// 0 was the original

View File

@@ -810,6 +810,40 @@ func TestFileStoreCompact(t *testing.T) {
}
}
func TestFileStoreCompactLastPlusOne(t *testing.T) {
storeDir, _ := ioutil.TempDir("", JetStreamStoreDir)
os.MkdirAll(storeDir, 0755)
defer os.RemoveAll(storeDir)
fs, err := newFileStore(FileStoreConfig{StoreDir: storeDir, BlockSize: 8192, AsyncFlush: false}, StreamConfig{Name: "zzz", Storage: FileStorage})
if err != nil {
t.Fatalf("Unexpected error: %v", err)
}
defer fs.Stop()
subj, msg := "foo", make([]byte, 10_000)
for i := 0; i < 10_000; i++ {
if _, _, err := fs.StoreMsg(subj, nil, msg); err != nil {
t.Fatalf("Unexpected error: %v", err)
}
}
if state := fs.State(); state.Msgs != 10_000 {
t.Fatalf("Expected 1000000 msgs, got %d", state.Msgs)
}
if _, err := fs.Compact(10_001); err != nil {
t.Fatalf("Unexpected error: %v", err)
}
state := fs.State()
if state.Msgs != 0 {
t.Fatalf("Expected no message but got %d", state.Msgs)
}
fmt.Printf("state is %+v\n", state)
fs.StoreMsg(subj, nil, msg)
state = fs.State()
fmt.Printf("state is %+v\n", state)
}
func TestFileStoreCompactPerf(t *testing.T) {
t.SkipNow()

View File

@@ -2210,25 +2210,12 @@ func (n *raft) attemptStepDown(newLeader string) {
func (n *raft) truncateWal(ae *appendEntry) {
n.debug("Truncating and repairing WAL")
// Special case if already at 0.
if ae.pindex == 0 {
n.pindex = ae.pindex
n.pterm = ae.pterm
n.wal.Purge()
return
}
tindex := ae.pindex - 1
if err := n.wal.Truncate(tindex); err != nil {
if err := n.wal.Truncate(ae.pindex); err != nil {
n.setWriteErrLocked(err)
return
}
n.pindex = tindex
if nae, _ := n.loadEntry(tindex); nae != nil {
n.pterm = nae.term
} else {
n.pterm = ae.term
}
n.pindex = ae.pindex
n.pterm = ae.term
}
// processAppendEntry will process an appendEntry.