diff --git a/server/filestore.go b/server/filestore.go index 2cd718f0..a1b13edd 100644 --- a/server/filestore.go +++ b/server/filestore.go @@ -137,6 +137,49 @@ type msgBlock struct { closed bool } +// For handling msgBlock buffers more efficiently and be friendlier to GC. +// Generate a new blk buffer. +func newBlkBuffer(sz int) []byte { + if sz <= defaultOtherBlockSize/2 || sz > defaultStreamBlockSize { + return make([]byte, 0, sz) + } + var p *sync.Pool + if sz <= defaultOtherBlockSize { + p = &mb8Free + } else { + p = &mb16Free + } + bp := p.Get().(*[]byte) + return *bp +} + +func freeBlkBuffer(buf []byte) { + sz := cap(buf) + if sz <= defaultOtherBlockSize/2 || sz > defaultStreamBlockSize { + return + } + buf = buf[:0] + if sz <= defaultOtherBlockSize { + mb8Free.Put(&buf) + } else { + mb16Free.Put(&buf) + } +} + +// Just care for now about 2 cases. +var mb16Free = sync.Pool{ + New: func() interface{} { + buf := make([]byte, 0, defaultStreamBlockSize) + return &buf + }, +} +var mb8Free = sync.Pool{ + New: func() interface{} { + buf := make([]byte, 0, defaultOtherBlockSize) + return &buf + }, +} + // Write through caching layer that is also used on loading messages. type cache struct { buf []byte @@ -526,7 +569,7 @@ func (fs *fileStore) recoverMsgBlock(fi os.FileInfo, index uint64) (*msgBlock, e // If we created keys here, let's check the data and if it is plaintext convert here. 
if createdKeys { - buf, err := ioutil.ReadFile(mb.mfn) + buf, err := mb.loadBlock() if err != nil { return nil, err } @@ -638,7 +681,7 @@ func (mb *msgBlock) rebuildState() (*LostStreamData, error) { mb.last.seq, mb.last.ts = 0, 0 firstNeedsSet := true - buf, err := ioutil.ReadFile(mb.mfn) + buf, err := mb.loadBlock() if err != nil { return nil, err } @@ -913,7 +956,12 @@ func (fs *fileStore) GetSeqFromTime(t time.Time) uint64 { func (mb *msgBlock) filteredPending(subj string, wc bool, seq uint64) (total, first, last uint64) { mb.mu.Lock() defer mb.mu.Unlock() + return mb.filteredPendingLocked(subj, wc, seq) +} +// This will traverse a message block and generate the filtered pending. +// Lock should be held. +func (mb *msgBlock) filteredPendingLocked(subj string, wc bool, seq uint64) (total, first, last uint64) { if mb.fss == nil { return 0, 0, 0 } @@ -1143,15 +1191,12 @@ func (fs *fileStore) hashKeyForBlock(index uint64) []byte { return []byte(fmt.Sprintf("%s-%d", fs.cfg.Name, index)) } -func (mb *msgBlock) setupWriteCache(buf []byte) { +func (mb *msgBlock) setupWriteCache() { // Make sure we have a cache setup. if mb.cache != nil { return } - if buf != nil { - buf = buf[:0] - } - mb.cache = &cache{buf: buf} + mb.cache = &cache{buf: newBlkBuffer(int(mb.fs.fcfg.BlockSize))} // Make sure we set the proper cache offset if we have existing data. var fi os.FileInfo if mb.mfd != nil { @@ -1168,7 +1213,6 @@ func (mb *msgBlock) setupWriteCache(buf []byte) { // This rolls to a new append msg block. // Lock should be held. 
func (fs *fileStore) newMsgBlockForWrite() (*msgBlock, error) { - var mbuf []byte index := uint64(1) if lmb := fs.lmb; lmb != nil { index = lmb.index + 1 @@ -1179,7 +1223,7 @@ func (fs *fileStore) newMsgBlockForWrite() (*msgBlock, error) { lmb.mu.Lock() lmb.closeFDsLocked() lmb.lwts = 0 - mbuf = lmb.expireCacheLocked() + lmb.expireCacheLocked() lmb.mu.Unlock() } } @@ -1188,7 +1232,7 @@ func (fs *fileStore) newMsgBlockForWrite() (*msgBlock, error) { // Lock should be held to quiet race detector. mb.mu.Lock() - mb.setupWriteCache(mbuf) + mb.setupWriteCache() if fs.tms { mb.fss = make(map[string]*SimpleState) } @@ -1233,7 +1277,7 @@ func (fs *fileStore) newMsgBlockForWrite() (*msgBlock, error) { ts := time.Now().UnixNano() // Race detector wants these protected. mb.mu.Lock() - mb.llts, mb.lwts = ts, ts + mb.llts, mb.lwts = 0, ts mb.mu.Unlock() // Remember our last sequence number. @@ -2087,19 +2131,19 @@ func (mb *msgBlock) expireCache() { mb.expireCacheLocked() } -func (mb *msgBlock) expireCacheLocked() []byte { +func (mb *msgBlock) expireCacheLocked() { if mb.cache == nil { if mb.ctmr != nil { mb.ctmr.Stop() mb.ctmr = nil } - return nil + return } // Can't expire if we are flushing or still have pending. if mb.cache.flush || (len(mb.cache.buf)-int(mb.cache.wp) > 0) { mb.resetCacheExpireTimer(mb.cexp) - return nil + return } // Grab timestamp to compare. @@ -2114,7 +2158,7 @@ func (mb *msgBlock) expireCacheLocked() []byte { // Check for activity on the cache that would prevent us from expiring. if tns-bufts <= int64(mb.cexp) { mb.resetCacheExpireTimer(mb.cexp - time.Duration(tns-bufts)) - return nil + return } // If we are here we will at least expire the core msg buffer. @@ -2123,6 +2167,7 @@ func (mb *msgBlock) expireCacheLocked() []byte { mb.cache.off += len(mb.cache.buf) mb.cache.buf = nil mb.cache.wp = 0 + freeBlkBuffer(buf) // The idx is used in removes, and will have a longer timeframe. // See if we should also remove the idx. 
@@ -2131,8 +2176,6 @@ func (mb *msgBlock) expireCacheLocked() []byte { } else { mb.resetCacheExpireTimer(mb.cexp) } - - return buf[:0] } func (fs *fileStore) startAgeChk() { @@ -2221,7 +2264,7 @@ func (mb *msgBlock) writeMsgRecord(rl, seq uint64, subj string, mhdr, msg []byte mb.mu.Lock() // Make sure we have a cache setup. if mb.cache == nil { - mb.setupWriteCache(nil) + mb.setupWriteCache() } // Indexing @@ -2755,6 +2798,42 @@ func (mb *msgBlock) cacheAlreadyLoaded() bool { return mb.cache != nil && len(mb.cache.idx) == int(mb.msgs) && mb.cache.off == 0 && len(mb.cache.buf) > 0 } +// Used to load in the block contents. +// Lock should be held and all conditionals satisfied prior. +func (mb *msgBlock) loadBlock() ([]byte, error) { + f, err := os.Open(mb.mfn) + if err != nil { + return nil, err + } + defer f.Close() + + var sz int + if info, err := f.Stat(); err == nil { + sz64 := info.Size() + if int64(int(sz64)) == sz64 { + sz = int(sz64) + } + } + + sz++ // one byte for final read at EOF + buf := newBlkBuffer(sz) + + for { + if len(buf) >= cap(buf) { + d := append(buf[:cap(buf)], 0) + buf = d[:len(buf)] + } + n, err := f.Read(buf[len(buf):cap(buf)]) + buf = buf[:len(buf)+n] + if err != nil { + if err == io.EOF { + err = nil + } + return buf, err + } + } +} + func (mb *msgBlock) loadMsgsWithLock() error { // Check to see if we are loading already. if mb.loading { @@ -2778,7 +2857,6 @@ checkCache: return nil } - mfn := mb.mfn mb.llts = time.Now().UnixNano() // FIXME(dlc) - We could be smarter here. @@ -2794,7 +2872,7 @@ checkCache: // Load in the whole block. We want to hold the mb lock here to avoid any changes to // state. 
- buf, err := ioutil.ReadFile(mfn) + buf, err := mb.loadBlock() if err != nil { return err } @@ -3363,29 +3441,87 @@ func (fs *fileStore) PurgeEx(subject string, sequence, keep uint64) (purged uint } return 0, nil } - eq := compareFn(subject) - if ss := fs.FilteredState(1, subject); ss.Msgs > 0 { - if keep > 0 { - if keep >= ss.Msgs { - return 0, nil - } - ss.Msgs -= keep + + eq, wc := compareFn(subject), subjectHasWildcard(subject) + var firstSeqNeedsUpdate bool + + // If we have a "keep" designation need to get full filtered state so we know how many to purge. + var maxp uint64 + if keep > 0 { + ss := fs.FilteredState(1, subject) + if keep >= ss.Msgs { + return 0, nil } - last := ss.Last - if sequence > 0 { - last = sequence - 1 + maxp = ss.Msgs - keep + } + + fs.mu.Lock() + for _, mb := range fs.blks { + mb.mu.Lock() + t, f, l := mb.filteredPendingLocked(subject, wc, mb.first.seq) + if t == 0 { + mb.mu.Unlock() + continue } - for seq := ss.First; seq <= last; seq++ { - if sm, _ := fs.msgForSeq(seq); sm != nil && eq(sm.subj, subject) { - if ok, _ := fs.removeMsg(sm.seq, false, true); ok { - purged++ - if purged >= ss.Msgs { - break + + var shouldExpire bool + if !mb.cacheAlreadyLoaded() { + mb.loadMsgsWithLock() + shouldExpire = true + } + if sequence > 0 && sequence <= l { + l = sequence - 1 + } + for seq := f; seq <= l; seq++ { + if sm, _ := mb.cacheLookupWithLock(seq); sm != nil && eq(sm.subj, subject) { + rl := fileStoreMsgSize(sm.subj, sm.hdr, sm.msg) + // Do fast in place remove. + // Stats + fs.state.Msgs-- + fs.state.Bytes -= rl + mb.msgs-- + mb.bytes -= rl + // FSS updates. + mb.removeSeqPerSubject(sm.subj, seq) + // Check for first message. + if seq == mb.first.seq { + mb.selectNextFirst() + if mb.isEmpty() { + fs.removeMsgBlock(mb) + firstSeqNeedsUpdate = seq == fs.state.FirstSeq + } else if seq == fs.state.FirstSeq { + fs.state.FirstSeq = mb.first.seq // new one. 
+ fs.state.FirstTime = time.Unix(0, mb.first.ts).UTC() } + } else { + // Out of order delete. + if mb.dmap == nil { + mb.dmap = make(map[uint64]struct{}) + } + mb.dmap[seq] = struct{}{} + } + purged++ + if maxp > 0 && purged >= maxp { + break } } } + // Expire if we were responsible for loading. + if shouldExpire { + // Expire this cache before moving on. + mb.llts = 0 + mb.expireCacheLocked() + } + + mb.mu.Unlock() + // Update our index info on disk. + mb.writeIndexInfo() } + if firstSeqNeedsUpdate { + fs.selectNextFirst() + } + + fs.mu.Unlock() return purged, nil } @@ -4031,8 +4167,7 @@ func (fs *fileStore) streamSnapshot(w io.WriteCloser, state *StreamState, includ } // We could stream but don't want to hold the lock and prevent changes, so just read in and // release the lock for now. - // TODO(dlc) - Maybe reuse buffer? - buf, err = ioutil.ReadFile(mb.mfn) + buf, err = mb.loadBlock() if err != nil { mb.mu.Unlock() writeErr(fmt.Sprintf("Could not read message block [%d]: %v", mb.index, err)) diff --git a/server/norace_test.go b/server/norace_test.go index 4d6b8d52..a6fec7d7 100644 --- a/server/norace_test.go +++ b/server/norace_test.go @@ -1757,8 +1757,6 @@ func TestNoRaceJetStreamClusterSourcesMuxd(t *testing.T) { } func TestNoRaceJetStreamClusterExtendedStreamPurgeStall(t *testing.T) { - t.Skip("fails always") - cerr := func(t *testing.T, err error) { t.Helper() if err != nil { @@ -1769,6 +1767,10 @@ func TestNoRaceJetStreamClusterExtendedStreamPurgeStall(t *testing.T) { s := RunBasicJetStreamServer() defer s.Shutdown() + if config := s.JetStreamConfig(); config != nil { + defer removeDir(t, config.StoreDir) + } + nc, js := jsClientConnect(t, s) defer nc.Close() @@ -1779,28 +1781,20 @@ func TestNoRaceJetStreamClusterExtendedStreamPurgeStall(t *testing.T) { }) cerr(t, err) - // 100kb messages spread over 1000 subjects + // 100kb messages spread over 1000 different subjects body := make([]byte, 100*1024) - for i := 0; i < 100000; i++ { - err := 
nc.Publish(fmt.Sprintf("kv.%d", i%1000), body) - cerr(t, err) + for i := 0; i < 50000; i++ { + if _, err := js.PublishAsync(fmt.Sprintf("kv.%d", i%1000), body); err != nil { + cerr(t, err) + } } - si, err = js.StreamInfo("KV") - cerr(t, err) - if si == nil || si.Config.Name != "KV" { - t.Fatalf("StreamInfo is not correct %+v", si) - } - checkFor(t, 5*time.Second, 200*time.Millisecond, func() error { - si, err = js.StreamInfo("KV") - if err != nil { + if si, err = js.StreamInfo("KV"); err != nil { return err } - - if si.State.Msgs == 100000 { + if si.State.Msgs == 50000 { return nil } - return fmt.Errorf("waiting for more") }) @@ -1815,8 +1809,12 @@ func TestNoRaceJetStreamClusterExtendedStreamPurgeStall(t *testing.T) { if !pres.Success { t.Fatalf("purge failed: %#v", pres) } - if elapsed > time.Second { - t.Fatalf("Purge took %s", elapsed) + if elapsed > 5*time.Second { + t.Fatalf("Purge took too long %s", elapsed) + } + v, _ := s.Varz(nil) + if v.Mem > 600*1024*1024 { // 600MB limit, but in practice < 100MB -> Was ~7GB when failing. + t.Fatalf("Used too much memory: %v", friendlyBytes(v.Mem)) + } }