mirror of
https://github.com/gogrlx/nats-server.git
synced 2026-04-17 03:24:40 -07:00
Changes to handle short writes. Bug fixes to truncation.
Signed-off-by: Derek Collison <derek@nats.io>
This commit is contained in:
@@ -407,8 +407,8 @@ func (fs *fileStore) recoverMsgBlock(fi os.FileInfo, index uint64) *msgBlock {
|
||||
// Close here since we need to rebuild state.
|
||||
file.Close()
|
||||
|
||||
// If we get an error rebuilding the message block state record that with the fs itself.
|
||||
if ld, err := mb.rebuildState(); err != nil && ld != nil {
|
||||
// If we get data loss rebuilding the message block state record that with the fs itself.
|
||||
if ld, _ := mb.rebuildState(); ld != nil {
|
||||
fs.rebuildState(ld)
|
||||
}
|
||||
// Rewrite this to make sure we are sync'd.
|
||||
@@ -486,18 +486,22 @@ func (mb *msgBlock) rebuildState() (*LostStreamData, error) {
|
||||
if mb.mfd != nil {
|
||||
fd = mb.mfd
|
||||
} else {
|
||||
fd, _ := os.Open(mb.mfn)
|
||||
defer fd.Close()
|
||||
fd, err = os.OpenFile(mb.mfn, os.O_RDWR, 0644)
|
||||
if err != nil {
|
||||
defer fd.Close()
|
||||
}
|
||||
}
|
||||
if fd != nil {
|
||||
fd.Truncate(int64(index))
|
||||
fd.Sync()
|
||||
if fd == nil {
|
||||
return
|
||||
}
|
||||
if err := fd.Truncate(int64(index)); err == nil {
|
||||
// Update our checksum.
|
||||
if index >= 8 {
|
||||
var lchk [8]byte
|
||||
mb.mfd.ReadAt(lchk[:], int64(index-8))
|
||||
fd.ReadAt(lchk[:], int64(index-8))
|
||||
copy(mb.lchk[0:], lchk[:])
|
||||
}
|
||||
fd.Sync()
|
||||
}
|
||||
}
|
||||
|
||||
@@ -511,6 +515,11 @@ func (mb *msgBlock) rebuildState() (*LostStreamData, error) {
|
||||
}
|
||||
|
||||
for index, lbuf := uint32(0), uint32(len(buf)); index < lbuf; {
|
||||
if index+msgHdrSize >= lbuf {
|
||||
truncate(index)
|
||||
return gatherLost(lbuf - index), nil
|
||||
}
|
||||
|
||||
hdr := buf[index : index+msgHdrSize]
|
||||
rl := le.Uint32(hdr[0:])
|
||||
slen := le.Uint16(hdr[20:])
|
||||
@@ -1943,6 +1952,9 @@ func (mb *msgBlock) indexCacheBuf(buf []byte) error {
|
||||
lbuf := uint32(len(buf))
|
||||
|
||||
for index < lbuf {
|
||||
if index+msgHdrSize >= lbuf {
|
||||
return errCorruptState
|
||||
}
|
||||
hdr := buf[index : index+msgHdrSize]
|
||||
rl := le.Uint32(hdr[0:])
|
||||
seq := le.Uint64(hdr[4:])
|
||||
@@ -1957,7 +1969,7 @@ func (mb *msgBlock) indexCacheBuf(buf []byte) error {
|
||||
if dlen < 0 || int(slen) > dlen || dlen > int(rl) {
|
||||
// This means something is off.
|
||||
// TODO(dlc) - Add into bad list?
|
||||
return errBadMsg
|
||||
return errCorruptState
|
||||
}
|
||||
// Clear erase bit.
|
||||
seq = seq &^ ebit
|
||||
@@ -2042,10 +2054,9 @@ func (mb *msgBlock) flushPendingMsgs() error {
|
||||
if !isOutOfSpaceErr(err) {
|
||||
if ld, err := mb.rebuildState(); err != nil && ld != nil {
|
||||
// Rebuild fs state too.
|
||||
fs := mb.fs
|
||||
fs.mu.Lock()
|
||||
fs.rebuildState(ld)
|
||||
fs.mu.Unlock()
|
||||
mb.fs.mu.Lock()
|
||||
mb.fs.rebuildState(ld)
|
||||
mb.fs.mu.Unlock()
|
||||
}
|
||||
}
|
||||
return err
|
||||
@@ -2128,7 +2139,14 @@ func (mb *msgBlock) loadMsgs() error {
|
||||
mb.loading = true
|
||||
defer mb.clearLoading()
|
||||
|
||||
var nchecks int
|
||||
|
||||
checkCache:
|
||||
nchecks++
|
||||
if nchecks > 8 {
|
||||
return errCorruptState
|
||||
}
|
||||
|
||||
// Check to see if we have a full cache.
|
||||
if mb.cache != nil && len(mb.cache.idx) == int(mb.msgs) && mb.cache.off == 0 && len(mb.cache.buf) > 0 {
|
||||
return nil
|
||||
@@ -2160,7 +2178,21 @@ checkCache:
|
||||
mb.clearCacheAndOffset()
|
||||
|
||||
if err := mb.indexCacheBuf(buf); err != nil {
|
||||
return err
|
||||
if err == errCorruptState {
|
||||
fs := mb.fs
|
||||
mb.mu.Unlock()
|
||||
var ld *LostStreamData
|
||||
if ld, err = mb.rebuildState(); ld != nil {
|
||||
fs.mu.Lock()
|
||||
fs.rebuildState(ld)
|
||||
fs.mu.Unlock()
|
||||
}
|
||||
mb.mu.Lock()
|
||||
}
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
goto checkCache
|
||||
}
|
||||
|
||||
if len(buf) > 0 {
|
||||
|
||||
@@ -1920,10 +1920,10 @@ func TestFileStoreWriteFailures(t *testing.T) {
|
||||
if stat, err := os.Stat(tdir); err != nil || !stat.IsDir() {
|
||||
t.SkipNow()
|
||||
}
|
||||
defer os.RemoveAll(tdir)
|
||||
|
||||
storeDir := path.Join(tdir, JetStreamStoreDir)
|
||||
os.MkdirAll(storeDir, 0755)
|
||||
defer os.RemoveAll(storeDir)
|
||||
|
||||
subj, msg := "foo", []byte("Hello Write Failures!")
|
||||
fs, err := newFileStore(FileStoreConfig{StoreDir: storeDir}, StreamConfig{Name: "zzz", Storage: FileStorage})
|
||||
@@ -1945,6 +1945,7 @@ func TestFileStoreWriteFailures(t *testing.T) {
|
||||
}
|
||||
|
||||
state := fs.State()
|
||||
|
||||
if state.LastSeq != lseq-1 {
|
||||
t.Fatalf("Expected last seq to be %d, got %d\n", lseq-1, state.LastSeq)
|
||||
}
|
||||
@@ -1955,7 +1956,7 @@ func TestFileStoreWriteFailures(t *testing.T) {
|
||||
t.Fatalf("Expected error loading seq that failed, got none")
|
||||
}
|
||||
// Loading should still work.
|
||||
if _, _, _, _, err := fs.LoadMsg(2); err != nil {
|
||||
if _, _, _, _, err := fs.LoadMsg(1); err != nil {
|
||||
t.Fatalf("Unexpected error: %v", err)
|
||||
}
|
||||
|
||||
@@ -1969,6 +1970,7 @@ func TestFileStoreWriteFailures(t *testing.T) {
|
||||
defer fs.Stop()
|
||||
|
||||
state2 := fs.State()
|
||||
|
||||
// Ignore lost state.
|
||||
state.Lost, state2.Lost = nil, nil
|
||||
if !reflect.DeepEqual(state2, state) {
|
||||
|
||||
Reference in New Issue
Block a user