Close FDs when we are done writing

Signed-off-by: Derek Collison <derek@nats.io>
This commit is contained in:
Derek Collison
2021-03-13 16:21:21 -05:00
parent 2d521471fd
commit e776013bf0

View File

@@ -1279,6 +1279,16 @@ func (mb *msgBlock) flushLoop(fch, qch chan struct{}) {
ts *= 2
}
mb.flushPendingMsgs()
// Check if we are no longer the last message block. If we are
// not we can close FDs and exit.
mb.fs.mu.RLock()
notLast := mb != mb.fs.lmb
mb.fs.mu.RUnlock()
if notLast {
if err := mb.closeFDs(); err == nil {
return
}
}
}
if infoChanged() {
mb.writeIndexInfo()
@@ -1784,6 +1794,25 @@ func (mb *msgBlock) setFlushing() {
}
}
// Try to close our FDs if we can.
func (mb *msgBlock) closeFDs() error {
mb.mu.Lock()
defer mb.mu.Unlock()
if buf, err := mb.bytesPending(); err == errFlushRunning || len(buf) > 0 {
return errPendingData
}
if mb.mfd != nil {
mb.mfd.Close()
mb.mfd = nil
}
if mb.ifd != nil {
mb.ifd.Close()
mb.ifd = nil
}
return nil
}
// bytesPending returns the buffer to be used for writing to the underlying file.
// This marks we are in flush and will return nil if asked again until cleared.
// Lock should be held.
@@ -1838,6 +1867,9 @@ func (fs *fileStore) writeMsgRecord(seq uint64, ts int64, subj string, hdr, msg
// Grab our current last message block.
mb := fs.lmb
if mb == nil || mb.numBytes()+rl > fs.fcfg.BlockSize {
if fs.fip {
mb.closeFDs()
}
if mb, err = fs.newMsgBlockForWrite(); err != nil {
return 0, err
}
@@ -2042,9 +2074,7 @@ func (mb *msgBlock) flushPendingMsgs() error {
mfd := mb.mfd
mb.mu.Unlock()
var tn int
var n int
var n, tn int
// Append new data to the message block file.
for lbb := lob; lbb > 0; lbb = len(buf) {
@@ -2231,6 +2261,7 @@ var (
errNotReadable = errors.New("storage directory not readable")
errFlushRunning = errors.New("flush is already running")
errCorruptState = errors.New("corrupt state file")
errPendingData = errors.New("pending data still present")
)
// Used for marking messages that have had their checksums checked.
@@ -2902,7 +2933,7 @@ func (fs *fileStore) removeMsgBlock(mb *msgBlock) {
}
}
// Called by purge to simply get rid of the cache and close and fds.
// Called by purge to simply get rid of the cache and close our fds.
// Lock should not be held.
func (mb *msgBlock) dirtyClose() {
mb.mu.Lock()