Improvments to writeIndexInfo logic and managing open FDs.

Also hold lock while doing sync and optionally close FDs if idle.

Signed-off-by: Derek Collison <derek@nats.io>
This commit is contained in:
Derek Collison
2021-09-19 11:45:16 -07:00
parent 7a4c904761
commit 4283358dcd

View File

@@ -201,6 +201,8 @@ const (
defaultCacheIdxExpiration = 5 * time.Minute
// default sync interval
defaultSyncInterval = 60 * time.Second
// default idle timeout to close FDs.
closeFDsIdle = 30 * time.Second
// coalesceMinimum
coalesceMinimum = 16 * 1024
// maxFlushWait is maximum we will wait to gather messages to flush.
@@ -605,6 +607,7 @@ func (fs *fileStore) recoverMsgBlock(fi os.FileInfo, index uint64) (*msgBlock, e
// Rewrite this to make sure we are sync'd.
mb.writeIndexInfo()
mb.closeFDs()
fs.blks = append(fs.blks, mb)
fs.lmb = mb
return mb, nil
@@ -884,7 +887,6 @@ func (fs *fileStore) recoverMsgs() error {
if len(fs.blks) > 0 {
sort.Slice(fs.blks, func(i, j int) bool { return fs.blks[i].index < fs.blks[j].index })
fs.lmb = fs.blks[len(fs.blks)-1]
err = fs.enableLastMsgBlockForWriting()
} else {
_, err = fs.newMsgBlockForWrite()
}
@@ -1012,7 +1014,6 @@ func (fs *fileStore) expireMsgsOnRecover() {
fs.lmb = nil
} else {
fs.lmb = fs.blks[lb-1]
fs.enableLastMsgBlockForWriting()
}
}
// Update top level accounting.
@@ -1184,8 +1185,7 @@ func (mb *msgBlock) filteredPendingLocked(subj string, wc bool, seq uint64) (tot
// If we loaded this block for this operation go ahead and expire it here.
if shouldExpire {
mb.llts = 0
mb.expireCacheLocked()
mb.tryForceExpireCacheLocked()
}
return total, first, last
@@ -1352,16 +1352,22 @@ func (fs *fileStore) newMsgBlockForWrite() (*msgBlock, error) {
if lmb := fs.lmb; lmb != nil {
index = lmb.index + 1
// Determine if we can reclaim resources here.
// Make sure to write out our index file if needed.
if lmb.indexNeedsUpdate() {
lmb.writeIndexInfo()
}
// Determine if we can reclaim any resources here.
if fs.fip {
// Reset write timestamp and see if we can expire this cache.
lmb.mu.Lock()
lmb.closeFDsLocked()
if lmb.cache != nil {
// Reset write timestamp and see if we can expire this cache.
lwts, buf, llts := lmb.lwts, lmb.cache.buf, lmb.llts
lmb.lwts = 0
buf, llts := lmb.cache.buf, lmb.llts
lmb.expireCacheLocked()
// We could check for a certain time since last load, but to be safe just reuse if no loads.
lmb.lwts = lwts
// We could check for a certain time since last load, but to be safe just reuse if no loads at all.
if llts == 0 && (lmb.cache == nil || lmb.cache.buf == nil) {
rbuf = buf
}
@@ -1459,30 +1465,6 @@ func (fs *fileStore) genEncryptionKeysForBlock(mb *msgBlock) error {
return nil
}
// Make sure we can write to the last message block.
// Lock should be held.
func (fs *fileStore) enableLastMsgBlockForWriting() error {
mb := fs.lmb
if mb == nil {
return fmt.Errorf("no last message block assigned, can not enable for writing")
}
if mb.mfd != nil {
return nil
}
mfd, err := os.OpenFile(mb.mfn, os.O_CREATE|os.O_RDWR, defaultFilePerms)
if err != nil {
return fmt.Errorf("error opening msg block file [%q]: %v", mb.mfn, err)
}
mb.mfd = mfd
// Spin up our flusher loop if needed.
if !fs.fip {
mb.spinUpFlushLoop()
}
return nil
}
// Stores a raw message with expected sequence number and timestamp.
// Lock should be held.
func (fs *fileStore) storeRawMsg(subj string, hdr, msg []byte, seq uint64, ts int64) error {
@@ -2379,6 +2361,20 @@ func (mb *msgBlock) expireCache() {
mb.expireCacheLocked()
}
func (mb *msgBlock) tryForceExpireCache() {
mb.mu.Lock()
defer mb.mu.Unlock()
mb.tryForceExpireCacheLocked()
}
// We will attempt to force expire this be temp clearing the last load time.
func (mb *msgBlock) tryForceExpireCacheLocked() {
llts := mb.llts
mb.llts = 0
mb.expireCacheLocked()
mb.llts = llts
}
func (mb *msgBlock) expireCacheLocked() {
if mb.cache == nil {
if mb.ctmr != nil {
@@ -2477,7 +2473,9 @@ func (fs *fileStore) checkAndFlushAllBlocks() {
if mb.pendingWriteSize() > 0 {
mb.flushPendingMsgs()
}
mb.writeIndexInfo()
if mb.indexNeedsUpdate() {
mb.writeIndexInfo()
}
}
}
@@ -2497,6 +2495,28 @@ func (fs *fileStore) checkMsgs() *LostStreamData {
return fs.ld
}
// Lock should be held.
func (mb *msgBlock) enableForWriting(fip bool) error {
if mb == nil {
return errNoMsgBlk
}
if mb.mfd != nil {
return nil
}
mfd, err := os.OpenFile(mb.mfn, os.O_CREATE|os.O_RDWR, defaultFilePerms)
if err != nil {
return fmt.Errorf("error opening msg block file [%q]: %v", mb.mfn, err)
}
mb.mfd = mfd
// Spin up our flusher loop if needed.
if !fip {
mb.spinUpFlushLoop()
}
return nil
}
// Will write the message record to the underlying message block.
// filestore lock will be held.
func (mb *msgBlock) writeMsgRecord(rl, seq uint64, subj string, mhdr, msg []byte, ts int64, flush bool) error {
@@ -2505,6 +2525,12 @@ func (mb *msgBlock) writeMsgRecord(rl, seq uint64, subj string, mhdr, msg []byte
if mb.cache == nil {
mb.setupWriteCache(nil)
}
// Enable for writing if our mfd is not open.
if mb.mfd == nil {
if err := mb.enableForWriting(flush); err != nil {
return err
}
}
// Indexing
index := len(mb.cache.buf) + int(mb.cache.off)
@@ -2713,19 +2739,29 @@ func (fs *fileStore) syncBlocks() {
fs.mu.RUnlock()
for _, mb := range blks {
mb.mu.RLock()
mfd := mb.mfd
ifd := mb.ifd
liwsz := mb.liwsz
mb.mu.RUnlock()
if mfd != nil {
mfd.Sync()
// Flush anything that may be pending.
if mb.pendingWriteSize() > 0 {
mb.flushPendingMsgs()
}
if ifd != nil {
ifd.Truncate(liwsz)
ifd.Sync()
if mb.indexNeedsUpdate() {
mb.writeIndexInfo()
}
// Do actual sync. Hold lock for consistency.
mb.mu.Lock()
if !mb.closed {
if mb.mfd != nil {
mb.mfd.Sync()
}
if mb.ifd != nil {
mb.ifd.Truncate(mb.liwsz)
mb.ifd.Sync()
}
// See if we can close FDs do to being idle.
if mb.ifd != nil || mb.mfd != nil && mb.sinceLastWriteActivity() > closeFDsIdle {
mb.dirtyCloseWithRemove(false)
}
}
mb.mu.Unlock()
}
fs.mu.Lock()
@@ -3114,6 +3150,7 @@ var (
errPendingData = errors.New("pending data still present")
errNoEncryption = errors.New("encryption not enabled")
errBadKeySize = errors.New("encryption bad key size")
errNoMsgBlk = errors.New("no message block")
)
// Used for marking messages that have had their checksums checked.
@@ -3227,10 +3264,7 @@ func (fs *fileStore) msgForSeq(seq uint64) (*fileStoredMsg, error) {
// We detected a linear scan and access to the last message.
if shouldTryExpire {
mb.mu.Lock()
mb.llts = 0
mb.expireCacheLocked()
mb.mu.Unlock()
mb.tryForceExpireCache()
}
return fsm, nil
@@ -3389,6 +3423,26 @@ func fileStoreMsgSizeEstimate(slen, maxPayload int) uint64 {
return uint64(emptyRecordLen + slen + 4 + maxPayload)
}
// Determine time since last write or remove of a message.
// Read lock should be held.
func (mb *msgBlock) sinceLastWriteActivity() time.Duration {
if mb.closed {
return 0
}
last := mb.lwts
if mb.lrts > last {
last = mb.lrts
}
return time.Since(time.Unix(0, last).UTC())
}
// Determine if we need to write out this index info.
func (mb *msgBlock) indexNeedsUpdate() bool {
mb.mu.RLock()
defer mb.mu.RUnlock()
return mb.lwits < mb.lwts || mb.lwits < mb.lrts
}
// Write index info to the appropriate file.
// Lock should be held.
func (mb *msgBlock) writeIndexInfo() error {
@@ -3698,10 +3752,8 @@ func (fs *fileStore) PurgeEx(subject string, sequence, keep uint64) (purged uint
// Expire if we were responsible for loading.
if shouldExpire {
// Expire this cache before moving on.
mb.llts = 0
mb.expireCacheLocked()
mb.tryForceExpireCacheLocked()
}
mb.mu.Unlock()
// Update our index info on disk.
mb.writeIndexInfo()
@@ -3907,7 +3959,9 @@ func (fs *fileStore) Truncate(seq uint64) error {
// Set lmb to nlmb and make sure writeable.
fs.lmb = nlmb
fs.enableLastMsgBlockForWriting()
if err := nlmb.enableForWriting(fs.fip); err != nil {
return err
}
var purged, bytes uint64
@@ -4102,8 +4156,7 @@ func (mb *msgBlock) generatePerSubjectInfo() error {
}
if shouldExpire {
// Expire this cache before moving on.
mb.llts = 0
mb.expireCacheLocked()
mb.tryForceExpireCacheLocked()
}
return nil
}
@@ -4385,9 +4438,10 @@ func (fs *fileStore) streamSnapshot(w io.WriteCloser, state *StreamState, includ
for _, mb := range blks {
if mb.pendingWriteSize() > 0 {
mb.flushPendingMsgs()
}
if mb.indexNeedsUpdate() {
mb.writeIndexInfo()
}
mb.mu.Lock()
buf, err := ioutil.ReadFile(mb.ifn)
if err != nil {