From 22ed97c6c9d7bd8f4969172bc3b5a1514f642139 Mon Sep 17 00:00:00 2001 From: Derek Collison Date: Fri, 25 Aug 2023 08:59:47 -0700 Subject: [PATCH] Fix for purge with keep bug and improved search for large number of blocks. Signed-off-by: Derek Collison --- server/filestore.go | 43 +++++++++++++++++----------------------- server/filestore_test.go | 20 +++++++++++++++++++ 2 files changed, 38 insertions(+), 25 deletions(-) diff --git a/server/filestore.go b/server/filestore.go index 2857abec..e30e8a77 100644 --- a/server/filestore.go +++ b/server/filestore.go @@ -4008,28 +4008,30 @@ func (fs *fileStore) selectMsgBlockWithIndex(seq uint64) (int, *msgBlock) { return -1, nil } - // Starting index, defaults to beginning. - si := 0 + const linearThresh = 32 + nb := len(fs.blks) - 1 - // TODO(dlc) - Use new AVL and make this real for anything beyond certain size. - // Max threshold before we probe for a starting block to start our linear search. - const maxl = 256 - if nb := len(fs.blks); nb > maxl { - d := nb / 8 - for _, i := range []int{d, 2 * d, 3 * d, 4 * d, 5 * d, 6 * d, 7 * d} { - mb := fs.blks[i] + if nb < linearThresh { + for i, mb := range fs.blks { if seq <= atomic.LoadUint64(&mb.last.seq) { - break + return i, mb } - si = i } + return -1, nil } - // blks are sorted in ascending order. - for i := si; i < len(fs.blks); i++ { - mb := fs.blks[i] - if seq <= atomic.LoadUint64(&mb.last.seq) { - return i, mb + // Do traditional binary search here since we know the blocks are sorted by sequence first and last. + for low, high, mid := 0, nb, nb/2; low <= high; mid = (low + high) / 2 { + mb := fs.blks[mid] + // Right now these atomic loads do not factor in, so fine to leave. Was considering + // uplifting these to fs scope to avoid atomic load but not needed. + first, last := atomic.LoadUint64(&mb.first.seq), atomic.LoadUint64(&mb.last.seq) + if seq > last { + low = mid + 1 + } else if seq < first { + high = mid - 1 + } else { + return mid, mb } } @@ -5205,16 +5207,7 @@ func (fs *fileStore) PurgeEx(subject string, sequence, keep uint64) (purged uint } if sequence > 1 { return fs.Compact(sequence) - } else if keep > 0 { - fs.mu.RLock() - msgs, lseq := fs.state.Msgs, fs.state.LastSeq - fs.mu.RUnlock() - if keep >= msgs { - return 0, nil - } - return fs.Compact(lseq - keep + 1) } - return 0, nil } eq, wc := compareFn(subject), subjectHasWildcard(subject) diff --git a/server/filestore_test.go b/server/filestore_test.go index 6c902f2a..81032303 100644 --- a/server/filestore_test.go +++ b/server/filestore_test.go @@ -5569,3 +5569,23 @@ func TestFileStoreRecaluclateFirstForSubjBug(t *testing.T) { // Make sure it was update properly. require_True(t, *ss == SimpleState{Msgs: 1, First: 3, Last: 3, firstNeedsUpdate: false}) } + +func TestFileStoreKeepWithDeletedMsgsBug(t *testing.T) { + fs, err := newFileStore(FileStoreConfig{StoreDir: t.TempDir()}, StreamConfig{Name: "zzz", Subjects: []string{"*"}, Storage: FileStorage}) + require_NoError(t, err) + + msg := bytes.Repeat([]byte("A"), 19) + for i := 0; i < 5; i++ { + fs.StoreMsg("A", nil, msg) + fs.StoreMsg("B", nil, msg) + } + + n, err := fs.PurgeEx("A", 0, 0) + require_NoError(t, err) + require_True(t, n == 5) + + // Purge with keep. + n, err = fs.PurgeEx(_EMPTY_, 0, 2) + require_NoError(t, err) + require_True(t, n == 3) +}