From e776013bf085ef344c5bcca7eeeee4b9ec011566 Mon Sep 17 00:00:00 2001 From: Derek Collison Date: Sat, 13 Mar 2021 16:21:21 -0500 Subject: [PATCH] Close FDs when we are done writing Signed-off-by: Derek Collison --- server/filestore.go | 39 +++++++++++++++++++++++++++++++++++---- 1 file changed, 35 insertions(+), 4 deletions(-) diff --git a/server/filestore.go b/server/filestore.go index 25cae3e6..e7b72974 100644 --- a/server/filestore.go +++ b/server/filestore.go @@ -1279,6 +1279,16 @@ func (mb *msgBlock) flushLoop(fch, qch chan struct{}) { ts *= 2 } mb.flushPendingMsgs() + // Check if we are no longer the last message block. If we are + // not we can close FDs and exit. + mb.fs.mu.RLock() + notLast := mb != mb.fs.lmb + mb.fs.mu.RUnlock() + if notLast { + if err := mb.closeFDs(); err == nil { + return + } + } } if infoChanged() { mb.writeIndexInfo() @@ -1784,6 +1794,25 @@ func (mb *msgBlock) setFlushing() { } } +// Try to close our FDs if we can. +func (mb *msgBlock) closeFDs() error { + mb.mu.Lock() + defer mb.mu.Unlock() + + if buf, err := mb.bytesPending(); err == errFlushRunning || len(buf) > 0 { + return errPendingData + } + if mb.mfd != nil { + mb.mfd.Close() + mb.mfd = nil + } + if mb.ifd != nil { + mb.ifd.Close() + mb.ifd = nil + } + return nil +} + // bytesPending returns the buffer to be used for writing to the underlying file. // This marks we are in flush and will return nil if asked again until cleared. // Lock should be held. @@ -1838,6 +1867,9 @@ func (fs *fileStore) writeMsgRecord(seq uint64, ts int64, subj string, hdr, msg // Grab our current last message block. mb := fs.lmb if mb == nil || mb.numBytes()+rl > fs.fcfg.BlockSize { + if fs.fip { + mb.closeFDs() + } if mb, err = fs.newMsgBlockForWrite(); err != nil { return 0, err } @@ -2042,9 +2074,7 @@ func (mb *msgBlock) flushPendingMsgs() error { mfd := mb.mfd mb.mu.Unlock() - var tn int - - var n int + var n, tn int // Append new data to the message block file. for lbb := lob; lbb > 0; lbb = len(buf) { @@ -2231,6 +2261,7 @@ var ( errNotReadable = errors.New("storage directory not readable") errFlushRunning = errors.New("flush is already running") errCorruptState = errors.New("corrupt state file") + errPendingData = errors.New("pending data still present") ) // Used for marking messages that have had their checksums checked. @@ -2902,7 +2933,7 @@ func (fs *fileStore) removeMsgBlock(mb *msgBlock) { } } -// Called by purge to simply get rid of the cache and close and fds. +// Called by purge to simply get rid of the cache and close our fds. // Lock should not be held. func (mb *msgBlock) dirtyClose() { mb.mu.Lock()