mirror of
https://github.com/gogrlx/nats-server.git
synced 2026-04-17 11:24:44 -07:00
Fix lock bug, capture write errors and report better
Signed-off-by: Derek Collison <derek@nats.io>
This commit is contained in:
@@ -1370,6 +1370,7 @@ func (mb *msgBlock) truncate(sm *fileStoredMsg) (nmsgs, nbytes uint64, err error
|
||||
mb.mfd.ReadAt(lchk[:], eof-8)
|
||||
copy(mb.lchk[0:], lchk[:])
|
||||
} else {
|
||||
mb.mu.Unlock()
|
||||
return 0, 0, fmt.Errorf("failed to truncate msg block %d, file not open", mb.index)
|
||||
}
|
||||
|
||||
@@ -1632,7 +1633,6 @@ func (fs *fileStore) checkMsgs() *LostStreamData {
|
||||
// filestore lock will be held.
|
||||
func (mb *msgBlock) writeMsgRecord(rl, seq uint64, subj string, mhdr, msg []byte, ts int64, flush bool) error {
|
||||
mb.mu.Lock()
|
||||
|
||||
// Make sure we have a cache setup.
|
||||
if mb.cache == nil {
|
||||
mb.cache = &cache{}
|
||||
@@ -1725,7 +1725,9 @@ func (mb *msgBlock) writeMsgRecord(rl, seq uint64, subj string, mhdr, msg []byte
|
||||
return err
|
||||
}
|
||||
if writeIndex {
|
||||
mb.writeIndexInfo()
|
||||
if err := mb.writeIndexInfo(); err != nil {
|
||||
return err
|
||||
}
|
||||
}
|
||||
} else {
|
||||
// Kick the flusher here.
|
||||
@@ -2028,11 +2030,16 @@ func (mb *msgBlock) flushPendingMsgs() error {
|
||||
if err != nil {
|
||||
mb.removeIndexFile()
|
||||
mb.dirtyClose()
|
||||
if ld, err := mb.rebuildState(); err != nil && ld != nil {
|
||||
// Rebuild fs state too.
|
||||
mb.fs.rebuildState(ld)
|
||||
if !isOutOfSpaceErr(err) {
|
||||
if ld, err := mb.rebuildState(); err != nil && ld != nil {
|
||||
// Rebuild fs state too.
|
||||
fs := mb.fs
|
||||
fs.mu.Lock()
|
||||
fs.rebuildState(ld)
|
||||
fs.mu.Unlock()
|
||||
}
|
||||
}
|
||||
break
|
||||
return err
|
||||
}
|
||||
|
||||
woff += int64(n)
|
||||
|
||||
@@ -1876,7 +1876,7 @@ func TestFileStoreWriteFailures(t *testing.T) {
|
||||
// This test should be run inside an environment where this directory
|
||||
// has a limited size.
|
||||
// E.g. Docker
|
||||
// docker run -ti --tmpfs /jswf_test:rw,size=32k --rm -v ~/Development/go/src:/go/src -w /go/src/github.com/nats-io/nats-server/ golang:1.15 /bin/bash
|
||||
// docker run -ti --tmpfs /jswf_test:rw,size=32k --rm -v ~/Development/go/src:/go/src -w /go/src/github.com/nats-io/nats-server/ golang:1.16 /bin/bash
|
||||
tdir := path.Join("/", "jswf_test")
|
||||
if stat, err := os.Stat(tdir); err != nil || !stat.IsDir() {
|
||||
t.SkipNow()
|
||||
|
||||
Reference in New Issue
Block a user