From 7a4c90476142c90e16d41e7fe44c2a054e0f4e30 Mon Sep 17 00:00:00 2001 From: Derek Collison Date: Sat, 18 Sep 2021 09:51:54 -0700 Subject: [PATCH] Improvements to cache management. Signed-off-by: Derek Collison --- server/filestore.go | 132 ++++++++++++++++++++------------------------ 1 file changed, 59 insertions(+), 73 deletions(-) diff --git a/server/filestore.go b/server/filestore.go index ea5f0e19..2e4ac966 100644 --- a/server/filestore.go +++ b/server/filestore.go @@ -1118,7 +1118,7 @@ func (mb *msgBlock) filteredPendingLocked(subj string, wc bool, seq uint64) (tot // We need to scan this block to compute the correct number of pending for this block. // We want to only do this once so we will adjust subs and test against them all here. - if !mb.cacheAlreadyLoaded() { + if mb.cacheNotLoaded() { mb.loadMsgsWithLock() shouldExpire = true } @@ -1184,6 +1184,7 @@ func (mb *msgBlock) filteredPendingLocked(subj string, wc bool, seq uint64) (tot // If we loaded this block for this operation go ahead and expire it here. if shouldExpire { + mb.llts = 0 mb.expireCacheLocked() } @@ -1751,29 +1752,10 @@ func (fs *fileStore) removeMsg(seq uint64, secure, needFSLock bool) (bool, error return false, err } - // If we have a callback grab the message since we need the subject. - // TODO(dlc) - This will cause whole buffer to be loaded which I was trying - // to avoid. Maybe use side cache for subjects or understand when we really need them. - // Meaning if the stream above is only a single subject no need to store, this is just - // for updating stream pending for consumers. - var sm *fileStoredMsg - if fs.scb != nil { - sm, _ = mb.fetchMsg(seq) - } - mb.mu.Lock() - // Check cache. This should be very rare. - if !mb.cacheAlreadyLoaded() { - if err := mb.loadMsgsWithLock(); err != nil { - mb.mu.Unlock() - fsUnlock() - return false, err - } - } - - // See if the sequence numbers is still relevant. Check first and cache first. - if seq < mb.first.seq || seq < mb.cache.fseq || (seq-mb.cache.fseq) >= uint64(len(mb.cache.idx)) { + // See if the sequence numbers is still relevant. + if seq < mb.first.seq { mb.mu.Unlock() fsUnlock() return false, nil @@ -1788,14 +1770,29 @@ func (fs *fileStore) removeMsg(seq uint64, secure, needFSLock bool) (bool, error } } + // We used to not have to load in the messages except with callbacks or the filtered subject state (which is now always on). + // Now just load regardless. + // TODO(dlc) - Figure out a way not to have to load it in, we need subject tracking outside main data block. + if mb.cacheNotLoaded() { + if err := mb.loadMsgsWithLock(); err != nil { + mb.mu.Unlock() + fsUnlock() + return false, err + } + } + + sm, err := mb.cacheLookup(seq) + if err != nil { + mb.mu.Unlock() + fsUnlock() + return false, err + } + // Grab size + msz := fileStoreMsgSize(sm.subj, sm.hdr, sm.msg) + // Set cache timestamp for last remove. mb.lrts = time.Now().UnixNano() - // Grab record length from idx. - slot := seq - mb.cache.fseq - ri, rl, _, _ := mb.slotInfo(int(slot)) - msz := uint64(rl) - // Global stats fs.state.Msgs-- fs.state.Bytes -= msz @@ -1805,21 +1802,13 @@ func (fs *fileStore) removeMsg(seq uint64, secure, needFSLock bool) (bool, error mb.bytes -= msz // If we are tracking multiple subjects here make sure we update that accounting. - if mb.fss != nil { - if sm == nil { - if !mb.cacheAlreadyLoaded() { - mb.loadMsgsWithLock() - } - sm, _ = mb.cacheLookup(seq) - } - if sm != nil { - mb.removeSeqPerSubject(sm.subj, seq) - } - } + mb.removeSeqPerSubject(sm.subj, seq) var shouldWriteIndex, firstSeqNeedsUpdate bool if secure { + // Grab record info. + ri, rl, _, _ := mb.slotInfo(int(seq - mb.cache.fseq)) mb.eraseMsg(seq, int(ri), int(rl)) } @@ -1911,7 +1900,7 @@ func (fs *fileStore) removeMsg(seq uint64, secure, needFSLock bool) (bool, error // writing new messages. We will silently bail on any issues with the underlying block and let someone else detect. // Write lock needs to be held. func (mb *msgBlock) compact() { - if !mb.cacheAlreadyLoaded() { + if mb.cacheNotLoaded() { if err := mb.loadMsgsWithLock(); err != nil { return } @@ -2958,6 +2947,8 @@ func (mb *msgBlock) flushPendingMsgs() error { mb.cache.wp = 0 // Place buffer back in the cache structure. mb.cache.buf = buf + // Mark fseq to 0 + mb.cache.fseq = 0 } return mb.werr @@ -2978,13 +2969,18 @@ func (mb *msgBlock) loadMsgs() error { // Lock should be held. func (mb *msgBlock) cacheAlreadyLoaded() bool { - if mb.cache == nil || mb.cache.off != 0 { + if mb.cache == nil || mb.cache.off != 0 || mb.cache.fseq == 0 || len(mb.cache.buf) == 0 { return false } numEntries := mb.msgs + uint64(len(mb.dmap)) + (mb.first.seq - mb.cache.fseq) return numEntries == uint64(len(mb.cache.idx)) && len(mb.cache.buf) > 0 } +// Lock should be held. +func (mb *msgBlock) cacheNotLoaded() bool { + return !mb.cacheAlreadyLoaded() +} + // Used to load in the block contents. // Lock should be held and all conditionals satisfied prior. func (mb *msgBlock) loadBlock(buf []byte) ([]byte, error) { @@ -3096,19 +3092,13 @@ checkCache: // Fetch a message from this block, possibly reading in and caching the messages. // We assume the block was selected and is correct, so we do not do range checks. func (mb *msgBlock) fetchMsg(seq uint64) (*fileStoredMsg, error) { - var sm *fileStoredMsg - mb.mu.Lock() defer mb.mu.Unlock() - sm, err := mb.cacheLookup(seq) - if err == nil || (err != errNoCache && err != errPartialCache) { - return sm, err - } - - // We have a cache miss here. - if err := mb.loadMsgsWithLock(); err != nil { - return nil, err + if mb.cacheNotLoaded() { + if err := mb.loadMsgsWithLock(); err != nil { + return nil, err + } } return mb.cacheLookup(seq) } @@ -3136,11 +3126,7 @@ const ebit = 1 << 63 // Will do a lookup from cache. // Lock should be held. func (mb *msgBlock) cacheLookup(seq uint64) (*fileStoredMsg, error) { - if mb.cache == nil || len(mb.cache.idx) == 0 { - return nil, errNoCache - } - - if seq < mb.first.seq || seq < mb.cache.fseq || seq > mb.last.seq { + if seq < mb.first.seq || seq > mb.last.seq { return nil, ErrStoreMsgNotFound } @@ -3150,14 +3136,18 @@ func (mb *msgBlock) cacheLookup(seq uint64) (*fileStoredMsg, error) { return nil, errDeletedMsg } } - - if mb.cache.off > 0 { + // Detect no cache loaded. + if mb.cache == nil || mb.cache.fseq == 0 || len(mb.cache.idx) == 0 || len(mb.cache.buf) == 0 { + return nil, errNoCache + } + // Check partial cache status. + if seq < mb.cache.fseq { return nil, errPartialCache } bi, _, hashChecked, err := mb.slotInfo(int(seq - mb.cache.fseq)) if err != nil { - return nil, errPartialCache + return nil, err } // Update cache activity. @@ -3176,6 +3166,9 @@ func (mb *msgBlock) cacheLookup(seq uint64) (*fileStoredMsg, error) { } li := int(bi) - mb.cache.off + if li >= len(mb.cache.buf) { + return nil, errPartialCache + } buf := mb.cache.buf[li:] // Parse from the raw buffer. @@ -3188,16 +3181,7 @@ func (mb *msgBlock) cacheLookup(seq uint64) (*fileStoredMsg, error) { return nil, fmt.Errorf("sequence numbers for cache load did not match, %d vs %d", seq, mseq) } - sm := &fileStoredMsg{ - subj: subj, - hdr: hdr, - msg: msg, - seq: seq, - ts: ts, - mb: mb, - off: int64(bi), - } - return sm, nil + return &fileStoredMsg{subj, hdr, msg, seq, ts, mb, int64(bi)}, nil } // Will return message for the given sequence number. @@ -3432,7 +3416,8 @@ func (mb *msgBlock) writeIndexInfo() error { if len(mb.dmap) > 0 { buf = append(buf, mb.genDeleteMap()...) } - var err error + + // Open our FD if needed. if mb.ifd == nil { ifd, err := os.OpenFile(mb.ifn, os.O_CREATE|os.O_RDWR, defaultFilePerms) if err != nil { @@ -3441,13 +3426,14 @@ func (mb *msgBlock) writeIndexInfo() error { mb.ifd = ifd } - mb.lwits = time.Now().UnixNano() - // Encrypt if needed. if mb.aek != nil { buf = mb.aek.Seal(buf[:0], mb.nonce, buf, nil) } + mb.lwits = time.Now().UnixNano() + + var err error if n, err = mb.ifd.WriteAt(buf, 0); err == nil { mb.liwsz = int64(n) mb.werr = nil @@ -3668,7 +3654,7 @@ func (fs *fileStore) PurgeEx(subject string, sequence, keep uint64) (purged uint } var shouldExpire bool - if !mb.cacheAlreadyLoaded() { + if mb.cacheNotLoaded() { mb.loadMsgsWithLock() shouldExpire = true } @@ -4096,7 +4082,7 @@ func (mb *msgBlock) generatePerSubjectInfo() error { defer mb.mu.Unlock() var shouldExpire bool - if !mb.cacheAlreadyLoaded() { + if mb.cacheNotLoaded() { mb.loadMsgsWithLock() shouldExpire = true }