mirror of
https://github.com/gogrlx/nats-server.git
synced 2026-04-02 03:38:42 -07:00
[FIX] PurgeEx with keep and deleted bug (#4431)
Fix for purge with keep bug with user deletes and improved search for large number of blocks. Signed-off-by: Derek Collison <derek@nats.io>
This commit is contained in:
@@ -4008,28 +4008,30 @@ func (fs *fileStore) selectMsgBlockWithIndex(seq uint64) (int, *msgBlock) {
|
||||
return -1, nil
|
||||
}
|
||||
|
||||
// Starting index, defaults to beginning.
|
||||
si := 0
|
||||
const linearThresh = 32
|
||||
nb := len(fs.blks) - 1
|
||||
|
||||
// TODO(dlc) - Use new AVL and make this real for anything beyond certain size.
|
||||
// Max threshold before we probe for a starting block to start our linear search.
|
||||
const maxl = 256
|
||||
if nb := len(fs.blks); nb > maxl {
|
||||
d := nb / 8
|
||||
for _, i := range []int{d, 2 * d, 3 * d, 4 * d, 5 * d, 6 * d, 7 * d} {
|
||||
mb := fs.blks[i]
|
||||
if nb < linearThresh {
|
||||
for i, mb := range fs.blks {
|
||||
if seq <= atomic.LoadUint64(&mb.last.seq) {
|
||||
break
|
||||
return i, mb
|
||||
}
|
||||
si = i
|
||||
}
|
||||
return -1, nil
|
||||
}
|
||||
|
||||
// blks are sorted in ascending order.
|
||||
for i := si; i < len(fs.blks); i++ {
|
||||
mb := fs.blks[i]
|
||||
if seq <= atomic.LoadUint64(&mb.last.seq) {
|
||||
return i, mb
|
||||
// Do traditional binary search here since we know the blocks are sorted by sequence first and last.
|
||||
for low, high, mid := 0, nb, nb/2; low <= high; mid = (low + high) / 2 {
|
||||
mb := fs.blks[mid]
|
||||
// Right now these atomic loads do not factor in, so fine to leave. Was considering
|
||||
// uplifting these to fs scope to avoid atomic load but not needed.
|
||||
first, last := atomic.LoadUint64(&mb.first.seq), atomic.LoadUint64(&mb.last.seq)
|
||||
if seq > last {
|
||||
low = mid + 1
|
||||
} else if seq < first {
|
||||
high = mid - 1
|
||||
} else {
|
||||
return mid, mb
|
||||
}
|
||||
}
|
||||
|
||||
@@ -5205,16 +5207,7 @@ func (fs *fileStore) PurgeEx(subject string, sequence, keep uint64) (purged uint
|
||||
}
|
||||
if sequence > 1 {
|
||||
return fs.Compact(sequence)
|
||||
} else if keep > 0 {
|
||||
fs.mu.RLock()
|
||||
msgs, lseq := fs.state.Msgs, fs.state.LastSeq
|
||||
fs.mu.RUnlock()
|
||||
if keep >= msgs {
|
||||
return 0, nil
|
||||
}
|
||||
return fs.Compact(lseq - keep + 1)
|
||||
}
|
||||
return 0, nil
|
||||
}
|
||||
|
||||
eq, wc := compareFn(subject), subjectHasWildcard(subject)
|
||||
|
||||
@@ -5569,3 +5569,23 @@ func TestFileStoreRecaluclateFirstForSubjBug(t *testing.T) {
|
||||
// Make sure it was update properly.
|
||||
require_True(t, *ss == SimpleState{Msgs: 1, First: 3, Last: 3, firstNeedsUpdate: false})
|
||||
}
|
||||
|
||||
func TestFileStoreKeepWithDeletedMsgsBug(t *testing.T) {
|
||||
fs, err := newFileStore(FileStoreConfig{StoreDir: t.TempDir()}, StreamConfig{Name: "zzz", Subjects: []string{"*"}, Storage: FileStorage})
|
||||
require_NoError(t, err)
|
||||
|
||||
msg := bytes.Repeat([]byte("A"), 19)
|
||||
for i := 0; i < 5; i++ {
|
||||
fs.StoreMsg("A", nil, msg)
|
||||
fs.StoreMsg("B", nil, msg)
|
||||
}
|
||||
|
||||
n, err := fs.PurgeEx("A", 0, 0)
|
||||
require_NoError(t, err)
|
||||
require_True(t, n == 5)
|
||||
|
||||
// Purge with keep.
|
||||
n, err = fs.PurgeEx(_EMPTY_, 0, 2)
|
||||
require_NoError(t, err)
|
||||
require_True(t, n == 3)
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user