mirror of
https://github.com/gogrlx/nats-server.git
synced 2026-04-02 03:38:42 -07:00
Partial fix for #2068.
This will not protect against the index file being completely removed. Signed-off-by: Derek Collison <derek@nats.io>
This commit is contained in:
@@ -616,6 +616,11 @@ func (mb *msgBlock) rebuildState() (*LostStreamData, error) {
|
||||
index += rl
|
||||
}
|
||||
|
||||
// For empty msg blocks make sure we recover last seq correctly based off of first.
|
||||
if mb.msgs == 0 {
|
||||
mb.last.seq = mb.first.seq - 1
|
||||
}
|
||||
|
||||
return nil, nil
|
||||
}
|
||||
|
||||
|
||||
@@ -2769,3 +2769,63 @@ func TestFileStoreStreamDeleteCacheBug(t *testing.T) {
|
||||
t.Fatalf("Unexpected error looking up msg: %v", err)
|
||||
}
|
||||
}
|
||||
|
||||
// https://github.com/nats-io/nats-server/issues/2068
|
||||
func TestFileStoreStreamPurgeAndDirtyRestartBug(t *testing.T) {
|
||||
storeDir := createDir(t, JetStreamStoreDir)
|
||||
defer removeDir(t, storeDir)
|
||||
|
||||
fs, err := newFileStore(FileStoreConfig{StoreDir: storeDir}, StreamConfig{Name: "zzz", Storage: FileStorage})
|
||||
if err != nil {
|
||||
t.Fatalf("Unexpected error: %v", err)
|
||||
}
|
||||
defer fs.Stop()
|
||||
|
||||
// Load up some messages.
|
||||
num, subj, hdr, msg := 100, "foo", []byte("name:derek"), []byte("Hello World")
|
||||
for i := 0; i < num; i++ {
|
||||
fs.StoreMsg(subj, hdr, msg)
|
||||
}
|
||||
// Now purge
|
||||
fs.Purge()
|
||||
|
||||
// Snapshot state.
|
||||
state := fs.State()
|
||||
if state.FirstSeq != uint64(num+1) || state.LastSeq != uint64(num) {
|
||||
t.Fatalf("Unexpected state: %+v", state)
|
||||
}
|
||||
|
||||
// Now we will stop the store and corrupt the index such that on restart it will do a rebuild.
|
||||
fs.mu.Lock()
|
||||
lmb := fs.lmb
|
||||
fs.mu.Unlock()
|
||||
|
||||
lmb.mu.RLock()
|
||||
ifn := lmb.ifn
|
||||
lmb.mu.RUnlock()
|
||||
|
||||
fs.Stop()
|
||||
|
||||
fd, err := os.OpenFile(ifn, os.O_RDWR, 0644)
|
||||
if err != nil {
|
||||
t.Fatalf("Error opening the index file: %v", err)
|
||||
}
|
||||
defer fd.Close()
|
||||
fi, _ := fd.Stat()
|
||||
if _, err = fd.WriteAt([]byte{1, 1}, fi.Size()-2); err != nil {
|
||||
t.Fatalf("Error writing the index file: %v", err)
|
||||
}
|
||||
fd.Close()
|
||||
|
||||
// Restart
|
||||
fs, err = newFileStore(FileStoreConfig{StoreDir: storeDir}, StreamConfig{Name: "zzz", Storage: FileStorage})
|
||||
if err != nil {
|
||||
t.Fatalf("Unexpected error: %v", err)
|
||||
}
|
||||
defer fs.Stop()
|
||||
|
||||
state = fs.State()
|
||||
if state.FirstSeq != uint64(num+1) || state.LastSeq != uint64(num) {
|
||||
t.Fatalf("Unexpected state: %+v", state)
|
||||
}
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user