mirror of
https://github.com/gogrlx/nats-server.git
synced 2026-04-15 10:40:41 -07:00
Fix for race in selectMsgBlock
Signed-off-by: Derek Collison <derek@nats.io>
This commit is contained in:
@@ -1148,28 +1148,35 @@ func (fs *fileStore) syncBlocks() {
|
||||
// Return nil if not in the set.
|
||||
func (fs *fileStore) selectMsgBlock(seq uint64) *msgBlock {
|
||||
fs.mu.RLock()
|
||||
outOfRange := seq < fs.state.FirstSeq || seq > fs.state.LastSeq
|
||||
blks := fs.blks
|
||||
lmb := fs.lmb
|
||||
fs.mu.RUnlock()
|
||||
|
||||
if outOfRange {
|
||||
// Check for out of range.
|
||||
if seq < fs.state.FirstSeq || seq > fs.state.LastSeq {
|
||||
fs.mu.RUnlock()
|
||||
return nil
|
||||
}
|
||||
|
||||
var smb *msgBlock
|
||||
var needsFlush bool
|
||||
|
||||
// blks are sorted in ascending order.
|
||||
// TODO(dlc) - Can be smarter here, when lots of blks maybe use binary search.
|
||||
// For now this is cache friendly for small to medium num blks.
|
||||
for _, mb := range blks {
|
||||
for _, mb := range fs.blks {
|
||||
if seq <= atomic.LoadUint64(&mb.last.seq) {
|
||||
// This detects if what we may be looking for is staged in the write buffer.
|
||||
if mb == lmb {
|
||||
fs.flushPendingWritesUnlocked()
|
||||
if mb == fs.lmb {
|
||||
needsFlush = true
|
||||
}
|
||||
return mb
|
||||
smb = mb
|
||||
break
|
||||
}
|
||||
}
|
||||
return nil
|
||||
fs.mu.RUnlock()
|
||||
|
||||
if needsFlush {
|
||||
fs.flushPendingWritesUnlocked()
|
||||
}
|
||||
|
||||
return smb
|
||||
}
|
||||
|
||||
// Select the message block where this message should be found.
|
||||
|
||||
Reference in New Issue
Block a user