Merge pull request #2306 from nats-io/fspm

Improvements to filtered purge and general memory usage for filestore.
This commit is contained in:
Derek Collison
2021-06-22 16:13:38 -07:00
committed by GitHub
2 changed files with 194 additions and 57 deletions

View File

@@ -137,6 +137,49 @@ type msgBlock struct {
closed bool
}
// For handling msgBlock buffers more efficiently and be friendlier to GC.
// newBlkBuffer returns a zero-length buffer with at least sz bytes of
// capacity. Sizes matching one of the two pooled classes are served from
// the free pools; anything outside those classes is allocated directly.
func newBlkBuffer(sz int) []byte {
	switch {
	case sz <= defaultOtherBlockSize/2 || sz > defaultStreamBlockSize:
		// Not a pooled size class, allocate fresh and let GC reclaim it.
		return make([]byte, 0, sz)
	case sz <= defaultOtherBlockSize:
		return *(mb8Free.Get().(*[]byte))
	default:
		return *(mb16Free.Get().(*[]byte))
	}
}
// freeBlkBuffer returns buf to the appropriate free pool when its capacity
// matches one of the pooled size classes; otherwise it is left for the GC.
func freeBlkBuffer(buf []byte) {
	switch sz := cap(buf); {
	case sz <= defaultOtherBlockSize/2 || sz > defaultStreamBlockSize:
		// Not a pooled size class, nothing to recycle.
	case sz <= defaultOtherBlockSize:
		b := buf[:0]
		mb8Free.Put(&b)
	default:
		b := buf[:0]
		mb16Free.Put(&b)
	}
}
// Just care for now about 2 cases.
// Free lists for the two block buffer size classes we recycle.
var (
	// mb16Free recycles buffers sized for full stream blocks.
	mb16Free = sync.Pool{
		New: func() interface{} {
			b := make([]byte, 0, defaultStreamBlockSize)
			return &b
		},
	}
	// mb8Free recycles buffers sized for the smaller "other" blocks.
	mb8Free = sync.Pool{
		New: func() interface{} {
			b := make([]byte, 0, defaultOtherBlockSize)
			return &b
		},
	}
)
// Write through caching layer that is also used on loading messages.
type cache struct {
buf []byte
@@ -526,12 +569,13 @@ func (fs *fileStore) recoverMsgBlock(fi os.FileInfo, index uint64) (*msgBlock, e
// If we created keys here, let's check the data and if it is plaintext convert here.
if createdKeys {
buf, err := ioutil.ReadFile(mb.mfn)
buf, err := mb.loadBlock()
if err != nil {
return nil, err
}
if err := mb.indexCacheBuf(buf); err != nil {
// This likely indicates this was already encrypted or corrupt.
mb.cache = nil
return nil, err
}
// Undo cache from above for later.
@@ -544,6 +588,7 @@ func (fs *fileStore) recoverMsgBlock(fi os.FileInfo, index uint64) (*msgBlock, e
if err := ioutil.WriteFile(mb.mfn, buf, defaultFilePerms); err != nil {
return nil, err
}
freeBlkBuffer(buf)
// Remove the index file here since it will be in plaintext as well so we just rebuild.
os.Remove(mb.ifn)
}
@@ -638,10 +683,11 @@ func (mb *msgBlock) rebuildState() (*LostStreamData, error) {
mb.last.seq, mb.last.ts = 0, 0
firstNeedsSet := true
buf, err := ioutil.ReadFile(mb.mfn)
buf, err := mb.loadBlock()
if err != nil {
return nil, err
}
defer freeBlkBuffer(buf)
// Check if we need to decrypt.
if mb.bek != nil && len(buf) > 0 {
@@ -913,7 +959,12 @@ func (fs *fileStore) GetSeqFromTime(t time.Time) uint64 {
// filteredPending reports the total count plus the first and last sequence
// of pending messages matching subj (wc marks a wildcard filter) starting
// at seq. It acquires the block lock and delegates to the locked variant.
func (mb *msgBlock) filteredPending(subj string, wc bool, seq uint64) (total, first, last uint64) {
	mb.mu.Lock()
	defer mb.mu.Unlock()
	total, first, last = mb.filteredPendingLocked(subj, wc, seq)
	return
}
// This will traverse a message block and generate the filtered pending.
// Lock should be held.
func (mb *msgBlock) filteredPendingLocked(subj string, wc bool, seq uint64) (total, first, last uint64) {
if mb.fss == nil {
return 0, 0, 0
}
@@ -1143,15 +1194,12 @@ func (fs *fileStore) hashKeyForBlock(index uint64) []byte {
return []byte(fmt.Sprintf("%s-%d", fs.cfg.Name, index))
}
func (mb *msgBlock) setupWriteCache(buf []byte) {
func (mb *msgBlock) setupWriteCache() {
// Make sure we have a cache setup.
if mb.cache != nil {
return
}
if buf != nil {
buf = buf[:0]
}
mb.cache = &cache{buf: buf}
mb.cache = &cache{buf: newBlkBuffer(int(mb.fs.fcfg.BlockSize))}
// Make sure we set the proper cache offset if we have existing data.
var fi os.FileInfo
if mb.mfd != nil {
@@ -1168,7 +1216,6 @@ func (mb *msgBlock) setupWriteCache(buf []byte) {
// This rolls to a new append msg block.
// Lock should be held.
func (fs *fileStore) newMsgBlockForWrite() (*msgBlock, error) {
var mbuf []byte
index := uint64(1)
if lmb := fs.lmb; lmb != nil {
index = lmb.index + 1
@@ -1179,7 +1226,7 @@ func (fs *fileStore) newMsgBlockForWrite() (*msgBlock, error) {
lmb.mu.Lock()
lmb.closeFDsLocked()
lmb.lwts = 0
mbuf = lmb.expireCacheLocked()
lmb.expireCacheLocked()
lmb.mu.Unlock()
}
}
@@ -1188,7 +1235,7 @@ func (fs *fileStore) newMsgBlockForWrite() (*msgBlock, error) {
// Lock should be held to quiet race detector.
mb.mu.Lock()
mb.setupWriteCache(mbuf)
mb.setupWriteCache()
if fs.tms {
mb.fss = make(map[string]*SimpleState)
}
@@ -1233,7 +1280,7 @@ func (fs *fileStore) newMsgBlockForWrite() (*msgBlock, error) {
ts := time.Now().UnixNano()
// Race detector wants these protected.
mb.mu.Lock()
mb.llts, mb.lwts = ts, ts
mb.llts, mb.lwts = 0, ts
mb.mu.Unlock()
// Remember our last sequence number.
@@ -2087,19 +2134,19 @@ func (mb *msgBlock) expireCache() {
mb.expireCacheLocked()
}
func (mb *msgBlock) expireCacheLocked() []byte {
func (mb *msgBlock) expireCacheLocked() {
if mb.cache == nil {
if mb.ctmr != nil {
mb.ctmr.Stop()
mb.ctmr = nil
}
return nil
return
}
// Can't expire if we are flushing or still have pending.
if mb.cache.flush || (len(mb.cache.buf)-int(mb.cache.wp) > 0) {
mb.resetCacheExpireTimer(mb.cexp)
return nil
return
}
// Grab timestamp to compare.
@@ -2114,7 +2161,7 @@ func (mb *msgBlock) expireCacheLocked() []byte {
// Check for activity on the cache that would prevent us from expiring.
if tns-bufts <= int64(mb.cexp) {
mb.resetCacheExpireTimer(mb.cexp - time.Duration(tns-bufts))
return nil
return
}
// If we are here we will at least expire the core msg buffer.
@@ -2123,6 +2170,7 @@ func (mb *msgBlock) expireCacheLocked() []byte {
mb.cache.off += len(mb.cache.buf)
mb.cache.buf = nil
mb.cache.wp = 0
freeBlkBuffer(buf)
// The idx is used in removes, and will have a longer timeframe.
// See if we should also remove the idx.
@@ -2131,8 +2179,6 @@ func (mb *msgBlock) expireCacheLocked() []byte {
} else {
mb.resetCacheExpireTimer(mb.cexp)
}
return buf[:0]
}
func (fs *fileStore) startAgeChk() {
@@ -2221,7 +2267,7 @@ func (mb *msgBlock) writeMsgRecord(rl, seq uint64, subj string, mhdr, msg []byte
mb.mu.Lock()
// Make sure we have a cache setup.
if mb.cache == nil {
mb.setupWriteCache(nil)
mb.setupWriteCache()
}
// Indexing
@@ -2755,6 +2801,42 @@ func (mb *msgBlock) cacheAlreadyLoaded() bool {
return mb.cache != nil && len(mb.cache.idx) == int(mb.msgs) && mb.cache.off == 0 && len(mb.cache.buf) > 0
}
// Used to load in the block contents.
// Lock should be held and all conditionals satisfied prior.
// loadBlock reads the entire message block file into a buffer obtained
// from newBlkBuffer so large reads can be recycled instead of churning GC.
func (mb *msgBlock) loadBlock() ([]byte, error) {
	f, err := os.Open(mb.mfn)
	if err != nil {
		return nil, err
	}
	defer f.Close()

	// Size the buffer from Stat when possible; guard against the file
	// size overflowing int on 32-bit platforms.
	var sz int
	if fi, serr := f.Stat(); serr == nil {
		if s := fi.Size(); int64(int(s)) == s {
			sz = int(s)
		}
	}
	sz++ // One spare byte so the final read can return 0, io.EOF.

	buf := newBlkBuffer(sz)
	for {
		if len(buf) == cap(buf) {
			// Out of room, grow via append then trim back to the data we hold.
			buf = append(buf, 0)[:len(buf)]
		}
		n, rerr := f.Read(buf[len(buf):cap(buf)])
		buf = buf[:len(buf)+n]
		if rerr != nil {
			if rerr == io.EOF {
				rerr = nil
			}
			return buf, rerr
		}
	}
}
func (mb *msgBlock) loadMsgsWithLock() error {
// Check to see if we are loading already.
if mb.loading {
@@ -2778,7 +2860,6 @@ checkCache:
return nil
}
mfn := mb.mfn
mb.llts = time.Now().UnixNano()
// FIXME(dlc) - We could be smarter here.
@@ -2794,7 +2875,7 @@ checkCache:
// Load in the whole block. We want to hold the mb lock here to avoid any changes to
// state.
buf, err := ioutil.ReadFile(mfn)
buf, err := mb.loadBlock()
if err != nil {
return err
}
@@ -3363,29 +3444,87 @@ func (fs *fileStore) PurgeEx(subject string, sequence, keep uint64) (purged uint
}
return 0, nil
}
eq := compareFn(subject)
if ss := fs.FilteredState(1, subject); ss.Msgs > 0 {
if keep > 0 {
if keep >= ss.Msgs {
return 0, nil
}
ss.Msgs -= keep
eq, wc := compareFn(subject), subjectHasWildcard(subject)
var firstSeqNeedsUpdate bool
// If we have a "keep" designation need to get full filtered state so we know how many to purge.
var maxp uint64
if keep > 0 {
ss := fs.FilteredState(1, subject)
if keep >= ss.Msgs {
return 0, nil
}
last := ss.Last
if sequence > 0 {
last = sequence - 1
maxp = ss.Msgs - keep
}
fs.mu.Lock()
for _, mb := range fs.blks {
mb.mu.Lock()
t, f, l := mb.filteredPendingLocked(subject, wc, mb.first.seq)
if t == 0 {
mb.mu.Unlock()
continue
}
for seq := ss.First; seq <= last; seq++ {
if sm, _ := fs.msgForSeq(seq); sm != nil && eq(sm.subj, subject) {
if ok, _ := fs.removeMsg(sm.seq, false, true); ok {
purged++
if purged >= ss.Msgs {
break
var shouldExpire bool
if !mb.cacheAlreadyLoaded() {
mb.loadMsgsWithLock()
shouldExpire = true
}
if sequence > 0 && sequence <= l {
l = sequence - 1
}
for seq := f; seq <= l; seq++ {
if sm, _ := mb.cacheLookupWithLock(seq); sm != nil && eq(sm.subj, subject) {
rl := fileStoreMsgSize(sm.subj, sm.hdr, sm.msg)
// Do fast in place remove.
// Stats
fs.state.Msgs--
fs.state.Bytes -= rl
mb.msgs--
mb.bytes -= rl
// FSS updates.
mb.removeSeqPerSubject(sm.subj, seq)
// Check for first message.
if seq == mb.first.seq {
mb.selectNextFirst()
if mb.isEmpty() {
fs.removeMsgBlock(mb)
firstSeqNeedsUpdate = seq == fs.state.FirstSeq
} else if seq == fs.state.FirstSeq {
fs.state.FirstSeq = mb.first.seq // new one.
fs.state.FirstTime = time.Unix(0, mb.first.ts).UTC()
}
} else {
// Out of order delete.
if mb.dmap == nil {
mb.dmap = make(map[uint64]struct{})
}
mb.dmap[seq] = struct{}{}
}
purged++
if maxp > 0 && purged >= maxp {
break
}
}
}
// Expire if we were responsible for loading.
if shouldExpire {
// Expire this cache before moving on.
mb.llts = 0
mb.expireCacheLocked()
}
mb.mu.Unlock()
// Update our index info on disk.
mb.writeIndexInfo()
}
if firstSeqNeedsUpdate {
fs.selectNextFirst()
}
fs.mu.Unlock()
return purged, nil
}
@@ -4031,8 +4170,7 @@ func (fs *fileStore) streamSnapshot(w io.WriteCloser, state *StreamState, includ
}
// We could stream but don't want to hold the lock and prevent changes, so just read in and
// release the lock for now.
// TODO(dlc) - Maybe reuse buffer?
buf, err = ioutil.ReadFile(mb.mfn)
buf, err = mb.loadBlock()
if err != nil {
mb.mu.Unlock()
writeErr(fmt.Sprintf("Could not read message block [%d]: %v", mb.index, err))
@@ -4053,6 +4191,7 @@ func (fs *fileStore) streamSnapshot(w io.WriteCloser, state *StreamState, includ
if writeFile(msgPre+fmt.Sprintf(blkScan, mb.index), buf) != nil {
return
}
freeBlkBuffer(buf)
}
// Bail if no consumers requested.

View File

@@ -1757,8 +1757,6 @@ func TestNoRaceJetStreamClusterSourcesMuxd(t *testing.T) {
}
func TestNoRaceJetStreamClusterExtendedStreamPurgeStall(t *testing.T) {
t.Skip("fails always")
cerr := func(t *testing.T, err error) {
t.Helper()
if err != nil {
@@ -1769,6 +1767,10 @@ func TestNoRaceJetStreamClusterExtendedStreamPurgeStall(t *testing.T) {
s := RunBasicJetStreamServer()
defer s.Shutdown()
if config := s.JetStreamConfig(); config != nil {
defer removeDir(t, config.StoreDir)
}
nc, js := jsClientConnect(t, s)
defer nc.Close()
@@ -1779,28 +1781,20 @@ func TestNoRaceJetStreamClusterExtendedStreamPurgeStall(t *testing.T) {
})
cerr(t, err)
// 100kb messages spread over 1000 subjects
// 100kb messages spread over 1000 different subjects
body := make([]byte, 100*1024)
for i := 0; i < 100000; i++ {
err := nc.Publish(fmt.Sprintf("kv.%d", i%1000), body)
cerr(t, err)
for i := 0; i < 50000; i++ {
if _, err := js.PublishAsync(fmt.Sprintf("kv.%d", i%1000), body); err != nil {
cerr(t, err)
}
}
si, err = js.StreamInfo("KV")
cerr(t, err)
if si == nil || si.Config.Name != "KV" {
t.Fatalf("StreamInfo is not correct %+v", si)
}
checkFor(t, 5*time.Second, 200*time.Millisecond, func() error {
si, err = js.StreamInfo("KV")
if err != nil {
if si, err = js.StreamInfo("KV"); err != nil {
return err
}
if si.State.Msgs == 100000 {
if si.State.Msgs == 50000 {
return nil
}
return fmt.Errorf("waiting for more")
})
@@ -1815,8 +1809,12 @@ func TestNoRaceJetStreamClusterExtendedStreamPurgeStall(t *testing.T) {
if !pres.Success {
t.Fatalf("purge failed: %#v", pres)
}
if elapsed > time.Second {
t.Fatalf("Purge took %s", elapsed)
if elapsed > 5*time.Second {
t.Fatalf("Purge took too long %s", elapsed)
}
v, _ := s.Varz(nil)
if v.Mem > 600*1024*1024 { // 600MB limit but in practice < 100MB -> Was ~7GB when failing.
t.Fatalf("Used too much memory: %v", friendlyBytes(v.Mem))
}
}