diff --git a/server/filestore.go b/server/filestore.go index b73b3497..7836db01 100644 --- a/server/filestore.go +++ b/server/filestore.go @@ -6887,8 +6887,21 @@ func (fs *fileStore) fileStoreConfig() FileStoreConfig { return fs.fcfg } -// When we will write a run length encoded record vs adding to the existing avl.SequenceSet. -const rlThresh = 4096 +// Read lock all existing message blocks. +// Lock held on entry. +func (fs *fileStore) readLockAllMsgBlocks() { + for _, mb := range fs.blks { + mb.mu.RLock() + } +} + +// Read unlock all existing message blocks. +// Lock held on entry. +func (fs *fileStore) readUnlockAllMsgBlocks() { + for _, mb := range fs.blks { + mb.mu.RUnlock() + } +} // Binary encoded state snapshot, >= v2.10 server. func (fs *fileStore) EncodedStreamState(failed uint64) ([]byte, error) { @@ -6919,6 +6932,10 @@ func (fs *fileStore) EncodedStreamState(failed uint64) ([]byte, error) { if numDeleted > 0 { var scratch [4 * 1024]byte + + fs.readLockAllMsgBlocks() + defer fs.readUnlockAllMsgBlocks() + for _, db := range fs.deleteBlocks() { switch db := db.(type) { case *DeleteRange: @@ -6943,68 +6960,23 @@ func (fs *fileStore) EncodedStreamState(failed uint64) ([]byte, error) { return b, nil } -// Lock should be held. +// We used to be more sophisticated to save memory, but speed is more important. +// All blocks should be at least read locked. func (fs *fileStore) deleteBlocks() DeleteBlocks { - var ( - dbs DeleteBlocks - adm *avl.SequenceSet - prevLast uint64 - ) + var dbs DeleteBlocks + var prevLast uint64 + for _, mb := range fs.blks { - mb.mu.RLock() // Detect if we have a gap between these blocks. if prevLast > 0 && prevLast+1 != mb.first.seq { - // Detect if we need to encode a run length encoding here. - if gap := mb.first.seq - prevLast - 1; gap > rlThresh { - // Check if we have a running adm, if so write that out first, or if contigous update rle params. - if min, max, num := adm.State(); num > 0 { - // Check if we are all contingous. - if num == max-min+1 { - prevLast, gap = min-1, mb.first.seq-min - } else { - dbs = append(dbs, adm) - } - // Always nil out here. - adm = nil - } - dbs = append(dbs, &DeleteRange{First: prevLast + 1, Num: gap}) - } else { - // Common dmap - if adm == nil { - adm = &avl.SequenceSet{} - adm.SetInitialMin(prevLast + 1) - } - for seq := prevLast + 1; seq < mb.first.seq; seq++ { - adm.Insert(seq) - } - } + gap := mb.first.seq - prevLast - 1 + dbs = append(dbs, &DeleteRange{First: prevLast + 1, Num: gap}) } - if min, max, num := mb.dmap.State(); num > 0 { - // Check in case the mb's dmap is contiguous and over our threshold. - if num == max-min+1 && num > rlThresh { - // Need to write out adm if it exists. - if adm != nil && adm.Size() > 0 { - dbs = append(dbs, adm) - adm = nil - } - dbs = append(dbs, &DeleteRange{First: min, Num: max - min + 1}) - } else { - // Aggregated dmap - if adm == nil { - adm = mb.dmap.Clone() - } else { - adm.Union(&mb.dmap) - } - } + if mb.dmap.Size() > 0 { + dbs = append(dbs, &mb.dmap) } prevLast = mb.last.seq - mb.mu.RUnlock() } - - if adm != nil { - dbs = append(dbs, adm) - } - return dbs } @@ -7013,9 +6985,13 @@ func (fs *fileStore) SyncDeleted(dbs DeleteBlocks) { if len(dbs) == 0 { return } + fs.mu.Lock() defer fs.mu.Unlock() + var needsCheck DeleteBlocks + + fs.readLockAllMsgBlocks() mdbs := fs.deleteBlocks() for i, db := range dbs { // If the block is same as what we have we can skip. @@ -7027,6 +7003,11 @@ func (fs *fileStore) SyncDeleted(dbs DeleteBlocks) { } } // Need to insert these. + needsCheck = append(needsCheck, db) + } + fs.readUnlockAllMsgBlocks() + + for _, db := range needsCheck { db.Range(func(dseq uint64) bool { fs.removeMsg(dseq, false, true, false) return true diff --git a/server/filestore_test.go b/server/filestore_test.go index 24fa033c..f4a3d57a 100644 --- a/server/filestore_test.go +++ b/server/filestore_test.go @@ -30,7 +30,6 @@ import ( "os" "path/filepath" "reflect" - "strconv" "strings" "testing" "time" @@ -5707,39 +5706,3 @@ func TestFileStoreRecaluclateFirstForSubjBug(t *testing.T) { // Make sure it was update properly. require_True(t, *ss == SimpleState{Msgs: 1, First: 3, Last: 3, firstNeedsUpdate: false}) } - -func TestFileStoreStreamEncoderDecoder(t *testing.T) { - fs, err := newFileStore( - FileStoreConfig{StoreDir: t.TempDir()}, - StreamConfig{Name: "zzz", Subjects: []string{"*"}, MaxMsgsPer: 2, Storage: FileStorage}, - ) - require_NoError(t, err) - defer fs.Stop() - - const seed = 2222222 - prand := rand.New(rand.NewSource(seed)) - - tick := time.NewTicker(time.Second) - defer tick.Stop() - done := time.NewTimer(10 * time.Second) - - msg := bytes.Repeat([]byte("ABC"), 33) // ~100bytes - - for running := true; running; { - select { - case <-tick.C: - var state StreamState - fs.FastState(&state) - snap, err := fs.EncodedStreamState(0) - require_NoError(t, err) - ss, err := DecodeStreamState(snap) - require_True(t, len(ss.Deleted) > 0) - require_NoError(t, err) - case <-done.C: - running = false - default: - key := strconv.Itoa(prand.Intn(256_000)) - fs.StoreMsg(key, nil, msg) - } - } -}