Improvements to cache management.

Signed-off-by: Derek Collison <derek@nats.io>
This commit is contained in:
Derek Collison
2021-09-18 09:51:54 -07:00
parent 0bef39ab5b
commit 7a4c904761

View File

@@ -1118,7 +1118,7 @@ func (mb *msgBlock) filteredPendingLocked(subj string, wc bool, seq uint64) (tot
// We need to scan this block to compute the correct number of pending for this block.
// We want to only do this once so we will adjust subs and test against them all here.
if !mb.cacheAlreadyLoaded() {
if mb.cacheNotLoaded() {
mb.loadMsgsWithLock()
shouldExpire = true
}
@@ -1184,6 +1184,7 @@ func (mb *msgBlock) filteredPendingLocked(subj string, wc bool, seq uint64) (tot
// If we loaded this block for this operation go ahead and expire it here.
if shouldExpire {
mb.llts = 0
mb.expireCacheLocked()
}
@@ -1751,29 +1752,10 @@ func (fs *fileStore) removeMsg(seq uint64, secure, needFSLock bool) (bool, error
return false, err
}
// If we have a callback grab the message since we need the subject.
// TODO(dlc) - This will cause whole buffer to be loaded which I was trying
// to avoid. Maybe use side cache for subjects or understand when we really need them.
// Meaning if the stream above is only a single subject no need to store, this is just
// for updating stream pending for consumers.
var sm *fileStoredMsg
if fs.scb != nil {
sm, _ = mb.fetchMsg(seq)
}
mb.mu.Lock()
// Check cache. This should be very rare.
if !mb.cacheAlreadyLoaded() {
if err := mb.loadMsgsWithLock(); err != nil {
mb.mu.Unlock()
fsUnlock()
return false, err
}
}
// See if the sequence numbers is still relevant. Check first and cache first.
if seq < mb.first.seq || seq < mb.cache.fseq || (seq-mb.cache.fseq) >= uint64(len(mb.cache.idx)) {
// See if the sequence numbers is still relevant.
if seq < mb.first.seq {
mb.mu.Unlock()
fsUnlock()
return false, nil
@@ -1788,14 +1770,29 @@ func (fs *fileStore) removeMsg(seq uint64, secure, needFSLock bool) (bool, error
}
}
// We used to not have to load in the messages except with callbacks or the filtered subject state (which is now always on).
// Now just load regardless.
// TODO(dlc) - Figure out a way not to have to load it in, we need subject tracking outside main data block.
if mb.cacheNotLoaded() {
if err := mb.loadMsgsWithLock(); err != nil {
mb.mu.Unlock()
fsUnlock()
return false, err
}
}
sm, err := mb.cacheLookup(seq)
if err != nil {
mb.mu.Unlock()
fsUnlock()
return false, err
}
// Grab size
msz := fileStoreMsgSize(sm.subj, sm.hdr, sm.msg)
// Set cache timestamp for last remove.
mb.lrts = time.Now().UnixNano()
// Grab record length from idx.
slot := seq - mb.cache.fseq
ri, rl, _, _ := mb.slotInfo(int(slot))
msz := uint64(rl)
// Global stats
fs.state.Msgs--
fs.state.Bytes -= msz
@@ -1805,21 +1802,13 @@ func (fs *fileStore) removeMsg(seq uint64, secure, needFSLock bool) (bool, error
mb.bytes -= msz
// If we are tracking multiple subjects here make sure we update that accounting.
if mb.fss != nil {
if sm == nil {
if !mb.cacheAlreadyLoaded() {
mb.loadMsgsWithLock()
}
sm, _ = mb.cacheLookup(seq)
}
if sm != nil {
mb.removeSeqPerSubject(sm.subj, seq)
}
}
mb.removeSeqPerSubject(sm.subj, seq)
var shouldWriteIndex, firstSeqNeedsUpdate bool
if secure {
// Grab record info.
ri, rl, _, _ := mb.slotInfo(int(seq - mb.cache.fseq))
mb.eraseMsg(seq, int(ri), int(rl))
}
@@ -1911,7 +1900,7 @@ func (fs *fileStore) removeMsg(seq uint64, secure, needFSLock bool) (bool, error
// writing new messages. We will silently bail on any issues with the underlying block and let someone else detect.
// Write lock needs to be held.
func (mb *msgBlock) compact() {
if !mb.cacheAlreadyLoaded() {
if mb.cacheNotLoaded() {
if err := mb.loadMsgsWithLock(); err != nil {
return
}
@@ -2958,6 +2947,8 @@ func (mb *msgBlock) flushPendingMsgs() error {
mb.cache.wp = 0
// Place buffer back in the cache structure.
mb.cache.buf = buf
// Mark fseq to 0
mb.cache.fseq = 0
}
return mb.werr
@@ -2978,13 +2969,18 @@ func (mb *msgBlock) loadMsgs() error {
// Lock should be held.
func (mb *msgBlock) cacheAlreadyLoaded() bool {
if mb.cache == nil || mb.cache.off != 0 {
if mb.cache == nil || mb.cache.off != 0 || mb.cache.fseq == 0 || len(mb.cache.buf) == 0 {
return false
}
numEntries := mb.msgs + uint64(len(mb.dmap)) + (mb.first.seq - mb.cache.fseq)
return numEntries == uint64(len(mb.cache.idx)) && len(mb.cache.buf) > 0
}
// Lock should be held.
func (mb *msgBlock) cacheNotLoaded() bool {
return !mb.cacheAlreadyLoaded()
}
// Used to load in the block contents.
// Lock should be held and all conditionals satisfied prior.
func (mb *msgBlock) loadBlock(buf []byte) ([]byte, error) {
@@ -3096,19 +3092,13 @@ checkCache:
// Fetch a message from this block, possibly reading in and caching the messages.
// We assume the block was selected and is correct, so we do not do range checks.
func (mb *msgBlock) fetchMsg(seq uint64) (*fileStoredMsg, error) {
var sm *fileStoredMsg
mb.mu.Lock()
defer mb.mu.Unlock()
sm, err := mb.cacheLookup(seq)
if err == nil || (err != errNoCache && err != errPartialCache) {
return sm, err
}
// We have a cache miss here.
if err := mb.loadMsgsWithLock(); err != nil {
return nil, err
if mb.cacheNotLoaded() {
if err := mb.loadMsgsWithLock(); err != nil {
return nil, err
}
}
return mb.cacheLookup(seq)
}
@@ -3136,11 +3126,7 @@ const ebit = 1 << 63
// Will do a lookup from cache.
// Lock should be held.
func (mb *msgBlock) cacheLookup(seq uint64) (*fileStoredMsg, error) {
if mb.cache == nil || len(mb.cache.idx) == 0 {
return nil, errNoCache
}
if seq < mb.first.seq || seq < mb.cache.fseq || seq > mb.last.seq {
if seq < mb.first.seq || seq > mb.last.seq {
return nil, ErrStoreMsgNotFound
}
@@ -3150,14 +3136,18 @@ func (mb *msgBlock) cacheLookup(seq uint64) (*fileStoredMsg, error) {
return nil, errDeletedMsg
}
}
if mb.cache.off > 0 {
// Detect no cache loaded.
if mb.cache == nil || mb.cache.fseq == 0 || len(mb.cache.idx) == 0 || len(mb.cache.buf) == 0 {
return nil, errNoCache
}
// Check partial cache status.
if seq < mb.cache.fseq {
return nil, errPartialCache
}
bi, _, hashChecked, err := mb.slotInfo(int(seq - mb.cache.fseq))
if err != nil {
return nil, errPartialCache
return nil, err
}
// Update cache activity.
@@ -3176,6 +3166,9 @@ func (mb *msgBlock) cacheLookup(seq uint64) (*fileStoredMsg, error) {
}
li := int(bi) - mb.cache.off
if li >= len(mb.cache.buf) {
return nil, errPartialCache
}
buf := mb.cache.buf[li:]
// Parse from the raw buffer.
@@ -3188,16 +3181,7 @@ func (mb *msgBlock) cacheLookup(seq uint64) (*fileStoredMsg, error) {
return nil, fmt.Errorf("sequence numbers for cache load did not match, %d vs %d", seq, mseq)
}
sm := &fileStoredMsg{
subj: subj,
hdr: hdr,
msg: msg,
seq: seq,
ts: ts,
mb: mb,
off: int64(bi),
}
return sm, nil
return &fileStoredMsg{subj, hdr, msg, seq, ts, mb, int64(bi)}, nil
}
// Will return message for the given sequence number.
@@ -3432,7 +3416,8 @@ func (mb *msgBlock) writeIndexInfo() error {
if len(mb.dmap) > 0 {
buf = append(buf, mb.genDeleteMap()...)
}
var err error
// Open our FD if needed.
if mb.ifd == nil {
ifd, err := os.OpenFile(mb.ifn, os.O_CREATE|os.O_RDWR, defaultFilePerms)
if err != nil {
@@ -3441,13 +3426,14 @@ func (mb *msgBlock) writeIndexInfo() error {
mb.ifd = ifd
}
mb.lwits = time.Now().UnixNano()
// Encrypt if needed.
if mb.aek != nil {
buf = mb.aek.Seal(buf[:0], mb.nonce, buf, nil)
}
mb.lwits = time.Now().UnixNano()
var err error
if n, err = mb.ifd.WriteAt(buf, 0); err == nil {
mb.liwsz = int64(n)
mb.werr = nil
@@ -3668,7 +3654,7 @@ func (fs *fileStore) PurgeEx(subject string, sequence, keep uint64) (purged uint
}
var shouldExpire bool
if !mb.cacheAlreadyLoaded() {
if mb.cacheNotLoaded() {
mb.loadMsgsWithLock()
shouldExpire = true
}
@@ -4096,7 +4082,7 @@ func (mb *msgBlock) generatePerSubjectInfo() error {
defer mb.mu.Unlock()
var shouldExpire bool
if !mb.cacheAlreadyLoaded() {
if mb.cacheNotLoaded() {
mb.loadMsgsWithLock()
shouldExpire = true
}