Merge pull request #2015 from nats-io/memp

Updates to memory retention in filestore.
This commit is contained in:
Derek Collison
2021-03-18 04:51:14 -07:00
committed by GitHub
2 changed files with 80 additions and 22 deletions

View File

@@ -103,6 +103,7 @@ type msgBlock struct {
lwts int64
llts int64
lrts int64
llseq uint64
hh hash.Hash64
cache *cache
cloads uint64
@@ -718,15 +719,46 @@ func (fs *fileStore) hashKeyForBlock(index uint64) []byte {
return []byte(fmt.Sprintf("%s-%d", fs.cfg.Name, index))
}
// setupWriteCache lazily creates this block's write cache, seeding it with
// buf (may be nil or a recycled buffer from an expired cache). When the block
// already has data on disk, the cache offset is advanced to the current file
// size so cached bytes line up with what is stored. No-op if a cache exists.
func (mb *msgBlock) setupWriteCache(buf []byte) {
	// Already have one, nothing to do.
	if mb.cache != nil {
		return
	}
	mb.cache = &cache{buf: buf}

	// Determine existing on-disk size so the cache offset starts at the
	// end of the file. Prefer the open descriptor when we have one.
	var info os.FileInfo
	if mb.mfd != nil {
		info, _ = mb.mfd.Stat()
	} else {
		info, _ = os.Stat(mb.mfn)
	}
	if info != nil {
		mb.cache.off = int(info.Size())
	}

	mb.startCacheExpireTimer()
}
// This rolls to a new append msg block.
// Lock should be held.
func (fs *fileStore) newMsgBlockForWrite() (*msgBlock, error) {
var mbuf []byte
index := uint64(1)
if fs.lmb != nil {
index = fs.lmb.index + 1
if lmb := fs.lmb; lmb != nil {
index = lmb.index + 1
// Determine if we can reclaim resources here.
if fs.fip {
// Reset write timestamp and see if we can expire this cache.
lmb.mu.Lock()
lmb.closeFDsLocked()
lmb.lwts = 0
mbuf = lmb.expireCacheLocked()
lmb.mu.Unlock()
}
}
mb := &msgBlock{fs: fs, index: index, cexp: fs.fcfg.CacheExpire}
mb.setupWriteCache(mbuf)
// Now do local hash.
key := sha256.Sum256(fs.hashKeyForBlock(index))
@@ -755,7 +787,7 @@ func (fs *fileStore) newMsgBlockForWrite() (*msgBlock, error) {
// Set cache time to creation time to start.
ts := time.Now().UnixNano()
mb.llts, mb.lrts, mb.lwts = ts, ts, ts
mb.llts, mb.lwts = ts, ts
// Remember our last sequence number.
mb.first.seq = fs.state.LastSeq + 1
@@ -1533,19 +1565,22 @@ func (mb *msgBlock) clearCache() {
// expireCache is the locked entry point for cache expiration; it acquires
// the block lock and delegates to expireCacheLocked, discarding any
// recyclable buffer that call returns.
func (mb *msgBlock) expireCache() {
	mb.mu.Lock()
	defer mb.mu.Unlock()
	// Recycled buffer is intentionally dropped here; only callers that
	// immediately create a new block reuse it.
	_ = mb.expireCacheLocked()
}
func (mb *msgBlock) expireCacheLocked() []byte {
if mb.cache == nil {
if mb.ctmr != nil {
mb.ctmr.Stop()
mb.ctmr = nil
}
return
return nil
}
// Can't expire if we are flushing or still have pending.
if mb.cache.flush || (len(mb.cache.buf)-int(mb.cache.wp) > 0) {
mb.resetCacheExpireTimer(mb.cexp)
return
return nil
}
// Grab timestamp to compare.
@@ -1560,11 +1595,12 @@ func (mb *msgBlock) expireCache() {
// Check for activity on the cache that would prevent us from expiring.
if tns-bufts <= int64(mb.cexp) {
mb.resetCacheExpireTimer(mb.cexp - time.Duration(tns-bufts))
return
return nil
}
// If we are here we will at least expire the core msg buffer.
// We need to capture offset in case we do a write next before a full load.
buf := mb.cache.buf
mb.cache.off += len(mb.cache.buf)
mb.cache.buf = nil
mb.cache.wp = 0
@@ -1576,6 +1612,8 @@ func (mb *msgBlock) expireCache() {
} else {
mb.resetCacheExpireTimer(mb.cexp)
}
return buf[:0]
}
func (fs *fileStore) startAgeChk() {
@@ -1668,18 +1706,7 @@ func (mb *msgBlock) writeMsgRecord(rl, seq uint64, subj string, mhdr, msg []byte
mb.mu.Lock()
// Make sure we have a cache setup.
if mb.cache == nil {
mb.cache = &cache{}
// Make sure we set the proper cache offset if we have existing data.
var fi os.FileInfo
if mb.mfd != nil {
fi, _ = mb.mfd.Stat()
} else {
fi, _ = os.Stat(mb.mfn)
}
if fi != nil {
mb.cache.off = int(fi.Size())
}
mb.startCacheExpireTimer()
mb.setupWriteCache(nil)
}
// Indexing
@@ -1802,7 +1829,10 @@ func (mb *msgBlock) setFlushing() {
// closeFDs closes this block's file descriptors under the block lock.
// Returns errPendingData (via closeFDsLocked) when unflushed data prevents
// the close.
func (mb *msgBlock) closeFDs() error {
	mb.mu.Lock()
	err := mb.closeFDsLocked()
	mb.mu.Unlock()
	return err
}
func (mb *msgBlock) closeFDsLocked() error {
if buf, err := mb.bytesPending(); err == errFlushRunning || len(buf) > 0 {
return errPendingData
}
@@ -1871,9 +1901,6 @@ func (fs *fileStore) writeMsgRecord(seq uint64, ts int64, subj string, hdr, msg
// Grab our current last message block.
mb := fs.lmb
if mb == nil || mb.numBytes()+rl > fs.fcfg.BlockSize {
if fs.fip {
mb.closeFDs()
}
if mb, err = fs.newMsgBlockForWrite(); err != nil {
return 0, err
}
@@ -2312,6 +2339,7 @@ func (mb *msgBlock) cacheLookupWithLock(seq uint64) (*fileStoredMsg, error) {
// Update cache activity.
mb.llts = time.Now().UnixNano()
mb.llseq = seq
// We use the high bit to denote we have already checked the checksum.
var hh hash.Hash64
@@ -2368,10 +2396,31 @@ func (fs *fileStore) msgForSeq(seq uint64) (*fileStoredMsg, error) {
}
return nil, err
}
// Check to see if we are the last seq for this message block and are doing
// a linear scan. If that is true and no other write activity is present
// try to expire the cache.
mb.mu.RLock()
shouldTryExpire := seq == mb.last.seq && mb.llseq == seq-1
mb.mu.RUnlock()
// TODO(dlc) - older design had a check to prefetch when we knew we were
// loading in order and getting close to end of current mb. Should add
// something like it back in.
return mb.fetchMsg(seq)
fsm, err := mb.fetchMsg(seq)
if err != nil {
return nil, err
}
// We detected a linear scan and access to the last message.
if shouldTryExpire {
mb.mu.Lock()
mb.llts = 0
mb.expireCacheLocked()
mb.mu.Unlock()
}
return fsm, nil
}
// Internal function to return msg parts from a raw buffer.
@@ -2922,7 +2971,10 @@ func (fs *fileStore) removeMsgBlock(mb *msgBlock) {
}
// Check for us being last message block
if mb == fs.lmb {
// Creating a new message write block requires that the lmb lock is not held.
mb.mu.Unlock()
fs.newMsgBlockForWrite()
mb.mu.Lock()
}
}

View File

@@ -2383,6 +2383,9 @@ type jsOutQ struct {
}
func (q *jsOutQ) pending() *jsPubMsg {
if q == nil {
return nil
}
q.mu.Lock()
head := q.head
q.head, q.tail = nil, nil
@@ -2391,6 +2394,9 @@ func (q *jsOutQ) pending() *jsPubMsg {
}
func (q *jsOutQ) send(msg *jsPubMsg) {
if q == nil || msg == nil {
return
}
q.mu.Lock()
var notify bool
if q.head == nil {