From 601dbf48c1531c851406bd9585c97d3971e40223 Mon Sep 17 00:00:00 2001 From: Derek Collison Date: Wed, 17 Mar 2021 18:51:18 -0700 Subject: [PATCH 1/2] Protect against the stream nil'ing out the outq for the consumer Signed-off-by: Derek Collison --- server/stream.go | 6 ++++++ 1 file changed, 6 insertions(+) diff --git a/server/stream.go b/server/stream.go index 9ea748e5..c0f29533 100644 --- a/server/stream.go +++ b/server/stream.go @@ -2383,6 +2383,9 @@ type jsOutQ struct { } func (q *jsOutQ) pending() *jsPubMsg { + if q == nil { + return nil + } q.mu.Lock() head := q.head q.head, q.tail = nil, nil @@ -2391,6 +2394,9 @@ func (q *jsOutQ) pending() *jsPubMsg { } func (q *jsOutQ) send(msg *jsPubMsg) { + if q == nil || msg == nil { + return + } q.mu.Lock() var notify bool if q.head == nil { From 566937a880532ad712e516dd6a47a9ab43c1db68 Mon Sep 17 00:00:00 2001 From: Derek Collison Date: Wed, 17 Mar 2021 18:51:34 -0700 Subject: [PATCH 2/2] Updates to memory pressure. 1. Release cache during heavy writes when we move to a new block if no read activity. 2. If we detect a linear scan reading dump the cache on the last seq read for the mb. Signed-off-by: Derek Collison --- server/filestore.go | 96 ++++++++++++++++++++++++++++++++++----------- 1 file changed, 74 insertions(+), 22 deletions(-) diff --git a/server/filestore.go b/server/filestore.go index 1de11b2f..edf68986 100644 --- a/server/filestore.go +++ b/server/filestore.go @@ -103,6 +103,7 @@ type msgBlock struct { lwts int64 llts int64 lrts int64 + llseq uint64 hh hash.Hash64 cache *cache cloads uint64 @@ -718,15 +719,46 @@ func (fs *fileStore) hashKeyForBlock(index uint64) []byte { return []byte(fmt.Sprintf("%s-%d", fs.cfg.Name, index)) } +func (mb *msgBlock) setupWriteCache(buf []byte) { + // Make sure we have a cache setup. + if mb.cache != nil { + return + } + mb.cache = &cache{buf: buf} + // Make sure we set the proper cache offset if we have existing data. + var fi os.FileInfo + if mb.mfd != nil { + fi, _ = mb.mfd.Stat() + } else { + fi, _ = os.Stat(mb.mfn) + } + if fi != nil { + mb.cache.off = int(fi.Size()) + } + mb.startCacheExpireTimer() +} + // This rolls to a new append msg block. // Lock should be held. func (fs *fileStore) newMsgBlockForWrite() (*msgBlock, error) { + var mbuf []byte index := uint64(1) - if fs.lmb != nil { - index = fs.lmb.index + 1 + if lmb := fs.lmb; lmb != nil { + index = lmb.index + 1 + + // Determine if we can reclaim resources here. + if fs.fip { + // Reset write timestamp and see if we can expire this cache. + lmb.mu.Lock() + lmb.closeFDsLocked() + lmb.lwts = 0 + mbuf = lmb.expireCacheLocked() + lmb.mu.Unlock() + } } mb := &msgBlock{fs: fs, index: index, cexp: fs.fcfg.CacheExpire} + mb.setupWriteCache(mbuf) // Now do local hash. key := sha256.Sum256(fs.hashKeyForBlock(index)) @@ -755,7 +787,7 @@ func (fs *fileStore) newMsgBlockForWrite() (*msgBlock, error) { // Set cache time to creation time to start. ts := time.Now().UnixNano() - mb.llts, mb.lrts, mb.lwts = ts, ts, ts + mb.llts, mb.lwts = ts, ts // Remember our last sequence number. mb.first.seq = fs.state.LastSeq + 1 @@ -1533,19 +1565,22 @@ func (mb *msgBlock) clearCache() { func (mb *msgBlock) expireCache() { mb.mu.Lock() defer mb.mu.Unlock() + mb.expireCacheLocked() +} +func (mb *msgBlock) expireCacheLocked() []byte { if mb.cache == nil { if mb.ctmr != nil { mb.ctmr.Stop() mb.ctmr = nil } - return + return nil } // Can't expire if we are flushing or still have pending. if mb.cache.flush || (len(mb.cache.buf)-int(mb.cache.wp) > 0) { mb.resetCacheExpireTimer(mb.cexp) - return + return nil } // Grab timestamp to compare. @@ -1560,11 +1595,12 @@ func (mb *msgBlock) expireCache() { // Check for activity on the cache that would prevent us from expiring. if tns-bufts <= int64(mb.cexp) { mb.resetCacheExpireTimer(mb.cexp - time.Duration(tns-bufts)) - return + return nil } // If we are here we will at least expire the core msg buffer. // We need to capture offset in case we do a write next before a full load. + buf := mb.cache.buf mb.cache.off += len(mb.cache.buf) mb.cache.buf = nil mb.cache.wp = 0 @@ -1576,6 +1612,8 @@ func (mb *msgBlock) expireCache() { } else { mb.resetCacheExpireTimer(mb.cexp) } + + return buf[:0] } func (fs *fileStore) startAgeChk() { @@ -1668,18 +1706,7 @@ func (mb *msgBlock) writeMsgRecord(rl, seq uint64, subj string, mhdr, msg []byte mb.mu.Lock() // Make sure we have a cache setup. if mb.cache == nil { - mb.cache = &cache{} - // Make sure we set the proper cache offset if we have existing data. - var fi os.FileInfo - if mb.mfd != nil { - fi, _ = mb.mfd.Stat() - } else { - fi, _ = os.Stat(mb.mfn) - } - if fi != nil { - mb.cache.off = int(fi.Size()) - } - mb.startCacheExpireTimer() + mb.setupWriteCache(nil) } // Indexing @@ -1802,7 +1829,10 @@ func (mb *msgBlock) setFlushing() { func (mb *msgBlock) closeFDs() error { mb.mu.Lock() defer mb.mu.Unlock() + return mb.closeFDsLocked() +} +func (mb *msgBlock) closeFDsLocked() error { if buf, err := mb.bytesPending(); err == errFlushRunning || len(buf) > 0 { return errPendingData } @@ -1871,9 +1901,6 @@ func (fs *fileStore) writeMsgRecord(seq uint64, ts int64, subj string, hdr, msg // Grab our current last message block. mb := fs.lmb if mb == nil || mb.numBytes()+rl > fs.fcfg.BlockSize { - if fs.fip { - mb.closeFDs() - } if mb, err = fs.newMsgBlockForWrite(); err != nil { return 0, err } @@ -2312,6 +2339,7 @@ func (mb *msgBlock) cacheLookupWithLock(seq uint64) (*fileStoredMsg, error) { // Update cache activity. mb.llts = time.Now().UnixNano() + mb.llseq = seq // We use the high bit to denote we have already checked the checksum. var hh hash.Hash64 @@ -2368,10 +2396,31 @@ func (fs *fileStore) msgForSeq(seq uint64) (*fileStoredMsg, error) { } return nil, err } + + // Check to see if we are the last seq for this message block and are doing + // a linear scan. If that is true and no other write activity is present + // try to expire the cache. + mb.mu.RLock() + shouldTryExpire := seq == mb.last.seq && mb.llseq == seq-1 + mb.mu.RUnlock() + // TODO(dlc) - older design had a check to prefetch when we knew we were // loading in order and getting close to end of current mb. Should add // something like it back in. - return mb.fetchMsg(seq) + fsm, err := mb.fetchMsg(seq) + if err != nil { + return nil, err + } + + // We detected a linear scan and access to the last message. + if shouldTryExpire { + mb.mu.Lock() + mb.llts = 0 + mb.expireCacheLocked() + mb.mu.Unlock() + } + + return fsm, nil } // Internal function to return msg parts from a raw buffer. @@ -2922,7 +2971,10 @@ func (fs *fileStore) removeMsgBlock(mb *msgBlock) { } // Check for us being last message block if mb == fs.lmb { + // Creating a new message write block requires that the lmb lock is not held. + mb.mu.Unlock() fs.newMsgBlockForWrite() + mb.mu.Lock() } }