mirror of
https://github.com/gogrlx/nats-server.git
synced 2026-04-14 18:20:42 -07:00
Fix for panic from a bug in selecting a block and an index when num blocks > 32 and we used new binary search in NumPending().
The reason would be that we were not accounting for gaps as mb.first.seq can move. The behavior should always return a valid index and mb if seq is inclusive of range from first to last. The panic could orphan held locks for filestore, consumer and possibly stream. Signed-off-by: Derek Collison <derek@nats.io>
This commit is contained in:
@@ -2452,7 +2452,10 @@ func (fs *fileStore) NumPending(sseq uint64, filter string, lastPerSubject bool)
|
||||
|
||||
// See if we need to figure out starting block per sseq.
|
||||
if sseq > fs.state.FirstSeq {
|
||||
seqStart, _ = fs.selectMsgBlockWithIndex(sseq)
|
||||
// This should not, but can return -1, so make sure we check to avoid panic below.
|
||||
if seqStart, _ = fs.selectMsgBlockWithIndex(sseq); seqStart < 0 {
|
||||
seqStart = 0
|
||||
}
|
||||
}
|
||||
|
||||
var tsa, fsa [32]string
|
||||
@@ -4738,6 +4741,7 @@ func (fs *fileStore) selectMsgBlock(seq uint64) *msgBlock {
|
||||
return mb
|
||||
}
|
||||
|
||||
// Lock should be held.
|
||||
func (fs *fileStore) selectMsgBlockWithIndex(seq uint64) (int, *msgBlock) {
|
||||
// Check for out of range.
|
||||
if seq < fs.state.FirstSeq || seq > fs.state.LastSeq {
|
||||
@@ -4765,6 +4769,13 @@ func (fs *fileStore) selectMsgBlockWithIndex(seq uint64) (int, *msgBlock) {
|
||||
if seq > last {
|
||||
low = mid + 1
|
||||
} else if seq < first {
|
||||
// A message block's first sequence can change here meaning we could find a gap.
|
||||
// We want to behave like above, which if inclusive (we check at start) should
|
||||
// always return an index and a valid mb.
|
||||
// If we have a gap then our seq would be > fs.blks[mid-1].last.seq
|
||||
if mid == 0 || seq > atomic.LoadUint64(&fs.blks[mid-1].last.seq) {
|
||||
return mid, mb
|
||||
}
|
||||
high = mid - 1
|
||||
} else {
|
||||
return mid, mb
|
||||
|
||||
@@ -5867,6 +5867,65 @@ func TestFileStoreFullStateTestSysRemovals(t *testing.T) {
|
||||
})
|
||||
}
|
||||
|
||||
func TestFileStoreSelectBlockWithFirstSeqRemovals(t *testing.T) {
|
||||
testFileStoreAllPermutations(t, func(t *testing.T, fcfg FileStoreConfig) {
|
||||
fcfg.BlockSize = 100
|
||||
scfg := StreamConfig{
|
||||
Name: "zzz",
|
||||
Subjects: []string{"*"},
|
||||
MaxMsgsPer: 1,
|
||||
Storage: FileStorage,
|
||||
}
|
||||
|
||||
prf := func(context []byte) ([]byte, error) {
|
||||
h := hmac.New(sha256.New, []byte("dlc22"))
|
||||
if _, err := h.Write(context); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
return h.Sum(nil), nil
|
||||
}
|
||||
if fcfg.Cipher == NoCipher {
|
||||
prf = nil
|
||||
}
|
||||
|
||||
fs, err := newFileStoreWithCreated(fcfg, scfg, time.Now(), prf, nil)
|
||||
require_NoError(t, err)
|
||||
defer fs.Stop()
|
||||
|
||||
// This yields an internal record length of 50 bytes. So 2 msgs per blk.
|
||||
msgLen := 19
|
||||
msg := bytes.Repeat([]byte("A"), msgLen)
|
||||
|
||||
subjects := "0123456789ABCDEFGHIJKLMNOPQRSTUVWXYZabcdefghijklmnopqrstuvwxyz+@$^"
|
||||
// We need over 32 blocks to kick in binary search. So 32*2+1 (65) msgs to get 33 blocks.
|
||||
for i := 0; i < 32*2+1; i++ {
|
||||
subj := string(subjects[i])
|
||||
fs.StoreMsg(subj, nil, msg)
|
||||
}
|
||||
require_True(t, fs.numMsgBlocks() == 33)
|
||||
|
||||
// Now we want to delete the first msg of each block to move the first sequence.
|
||||
// Want to do this via system removes, not user initiated moves.
|
||||
for i := 0; i < len(subjects); i += 2 {
|
||||
subj := string(subjects[i])
|
||||
fs.StoreMsg(subj, nil, msg)
|
||||
}
|
||||
|
||||
var ss StreamState
|
||||
fs.FastState(&ss)
|
||||
|
||||
// We want to make sure that select always returns an index and a non-nil mb.
|
||||
for seq := ss.FirstSeq; seq <= ss.LastSeq; seq++ {
|
||||
fs.mu.RLock()
|
||||
index, mb := fs.selectMsgBlockWithIndex(seq)
|
||||
fs.mu.RUnlock()
|
||||
require_True(t, index >= 0)
|
||||
require_True(t, mb != nil)
|
||||
require_True(t, (seq-1)/2 == uint64(index))
|
||||
}
|
||||
})
|
||||
}
|
||||
|
||||
///////////////////////////////////////////////////////////////////////////
|
||||
// Benchmarks
|
||||
///////////////////////////////////////////////////////////////////////////
|
||||
|
||||
Reference in New Issue
Block a user