From 3b235059fa313a63c1113ec485e0dd59299a50cd Mon Sep 17 00:00:00 2001 From: Derek Collison Date: Sat, 5 Aug 2023 12:33:30 -0700 Subject: [PATCH] We were trying to be too smart to save space at the expense of encoding time for filestore. Revert back to very simple but way faster method. Sometimes 100x faster and only ~8% size increase. Signed-off-by: Derek Collison --- server/filestore.go | 93 ++++++++++++++++------------------------ server/filestore_test.go | 37 ---------------- 2 files changed, 37 insertions(+), 93 deletions(-) diff --git a/server/filestore.go b/server/filestore.go index b73b3497..7836db01 100644 --- a/server/filestore.go +++ b/server/filestore.go @@ -6887,8 +6887,21 @@ func (fs *fileStore) fileStoreConfig() FileStoreConfig { return fs.fcfg } -// When we will write a run length encoded record vs adding to the existing avl.SequenceSet. -const rlThresh = 4096 +// Read lock all existing message blocks. +// Lock held on entry. +func (fs *fileStore) readLockAllMsgBlocks() { + for _, mb := range fs.blks { + mb.mu.RLock() + } +} + +// Read unlock all existing message blocks. +// Lock held on entry. +func (fs *fileStore) readUnlockAllMsgBlocks() { + for _, mb := range fs.blks { + mb.mu.RUnlock() + } +} // Binary encoded state snapshot, >= v2.10 server. func (fs *fileStore) EncodedStreamState(failed uint64) ([]byte, error) { @@ -6919,6 +6932,10 @@ func (fs *fileStore) EncodedStreamState(failed uint64) ([]byte, error) { if numDeleted > 0 { var scratch [4 * 1024]byte + + fs.readLockAllMsgBlocks() + defer fs.readUnlockAllMsgBlocks() + for _, db := range fs.deleteBlocks() { switch db := db.(type) { case *DeleteRange: @@ -6943,68 +6960,23 @@ func (fs *fileStore) EncodedStreamState(failed uint64) ([]byte, error) { return b, nil } -// Lock should be held. +// We used to be more sophisticated to save memory, but speed is more important. +// All blocks should be at least read locked. func (fs *fileStore) deleteBlocks() DeleteBlocks { - var ( - dbs DeleteBlocks - adm *avl.SequenceSet - prevLast uint64 - ) + var dbs DeleteBlocks + var prevLast uint64 + for _, mb := range fs.blks { - mb.mu.RLock() // Detect if we have a gap between these blocks. if prevLast > 0 && prevLast+1 != mb.first.seq { - // Detect if we need to encode a run length encoding here. - if gap := mb.first.seq - prevLast - 1; gap > rlThresh { - // Check if we have a running adm, if so write that out first, or if contigous update rle params. - if min, max, num := adm.State(); num > 0 { - // Check if we are all contingous. - if num == max-min+1 { - prevLast, gap = min-1, mb.first.seq-min - } else { - dbs = append(dbs, adm) - } - // Always nil out here. - adm = nil - } - dbs = append(dbs, &DeleteRange{First: prevLast + 1, Num: gap}) - } else { - // Common dmap - if adm == nil { - adm = &avl.SequenceSet{} - adm.SetInitialMin(prevLast + 1) - } - for seq := prevLast + 1; seq < mb.first.seq; seq++ { - adm.Insert(seq) - } - } + gap := mb.first.seq - prevLast - 1 + dbs = append(dbs, &DeleteRange{First: prevLast + 1, Num: gap}) } - if min, max, num := mb.dmap.State(); num > 0 { - // Check in case the mb's dmap is contiguous and over our threshold. - if num == max-min+1 && num > rlThresh { - // Need to write out adm if it exists. - if adm != nil && adm.Size() > 0 { - dbs = append(dbs, adm) - adm = nil - } - dbs = append(dbs, &DeleteRange{First: min, Num: max - min + 1}) - } else { - // Aggregated dmap - if adm == nil { - adm = mb.dmap.Clone() - } else { - adm.Union(&mb.dmap) - } - } + if mb.dmap.Size() > 0 { + dbs = append(dbs, &mb.dmap) } prevLast = mb.last.seq - mb.mu.RUnlock() } - - if adm != nil { - dbs = append(dbs, adm) - } - return dbs } @@ -7013,9 +6985,13 @@ func (fs *fileStore) SyncDeleted(dbs DeleteBlocks) { if len(dbs) == 0 { return } + fs.mu.Lock() defer fs.mu.Unlock() + var needsCheck DeleteBlocks + + fs.readLockAllMsgBlocks() mdbs := fs.deleteBlocks() for i, db := range dbs { // If the block is same as what we have we can skip. @@ -7027,6 +7003,11 @@ func (fs *fileStore) SyncDeleted(dbs DeleteBlocks) { } } // Need to insert these. + needsCheck = append(needsCheck, db) + } + fs.readUnlockAllMsgBlocks() + + for _, db := range needsCheck { db.Range(func(dseq uint64) bool { fs.removeMsg(dseq, false, true, false) return true diff --git a/server/filestore_test.go b/server/filestore_test.go index 24fa033c..f4a3d57a 100644 --- a/server/filestore_test.go +++ b/server/filestore_test.go @@ -30,7 +30,6 @@ import ( "os" "path/filepath" "reflect" - "strconv" "strings" "testing" "time" @@ -5707,39 +5706,3 @@ func TestFileStoreRecaluclateFirstForSubjBug(t *testing.T) { // Make sure it was update properly. require_True(t, *ss == SimpleState{Msgs: 1, First: 3, Last: 3, firstNeedsUpdate: false}) } - -func TestFileStoreStreamEncoderDecoder(t *testing.T) { - fs, err := newFileStore( - FileStoreConfig{StoreDir: t.TempDir()}, - StreamConfig{Name: "zzz", Subjects: []string{"*"}, MaxMsgsPer: 2, Storage: FileStorage}, - ) - require_NoError(t, err) - defer fs.Stop() - - const seed = 2222222 - prand := rand.New(rand.NewSource(seed)) - - tick := time.NewTicker(time.Second) - defer tick.Stop() - done := time.NewTimer(10 * time.Second) - - msg := bytes.Repeat([]byte("ABC"), 33) // ~100bytes - - for running := true; running; { - select { - case <-tick.C: - var state StreamState - fs.FastState(&state) - snap, err := fs.EncodedStreamState(0) - require_NoError(t, err) - ss, err := DecodeStreamState(snap) - require_True(t, len(ss.Deleted) > 0) - require_NoError(t, err) - case <-done.C: - running = false - default: - key := strconv.Itoa(prand.Intn(256_000)) - fs.StoreMsg(key, nil, msg) - } - } -}