mirror of
https://github.com/gogrlx/nats-server.git
synced 2026-04-02 03:38:42 -07:00
Fix for a call into fs.recalculateFirstForSubj() from fs.recalculateFirstForSubj() that did not lock the mb properly.
Signed-off-by: Derek Collison <derek@nats.io>
This commit is contained in:
@@ -3114,9 +3114,7 @@ func (fs *fileStore) firstSeqForSubj(subj string) (uint64, error) {
|
||||
mb.mu.Unlock()
|
||||
return 0, err
|
||||
}
|
||||
ss := mb.fss[subj]
|
||||
mb.mu.Unlock()
|
||||
if ss != nil {
|
||||
if ss := mb.fss[subj]; ss != nil {
|
||||
// Adjust first if it was not where we thought it should be.
|
||||
if i != start {
|
||||
if info, ok := fs.psim[subj]; ok {
|
||||
@@ -3126,8 +3124,10 @@ func (fs *fileStore) firstSeqForSubj(subj string) (uint64, error) {
|
||||
if ss.firstNeedsUpdate {
|
||||
mb.recalculateFirstForSubj(subj, ss.First, ss)
|
||||
}
|
||||
mb.mu.Unlock()
|
||||
return ss.First, nil
|
||||
}
|
||||
mb.mu.Unlock()
|
||||
}
|
||||
return 0, nil
|
||||
}
|
||||
@@ -6578,6 +6578,7 @@ func (mb *msgBlock) recalculateFirstForSubj(subj string, startSeq uint64, ss *Si
|
||||
return
|
||||
}
|
||||
}
|
||||
|
||||
// Mark first as updated.
|
||||
ss.firstNeedsUpdate = false
|
||||
startSeq++
|
||||
|
||||
@@ -29,6 +29,7 @@ import (
|
||||
"os"
|
||||
"path/filepath"
|
||||
"reflect"
|
||||
"sync"
|
||||
"testing"
|
||||
"time"
|
||||
|
||||
@@ -5429,6 +5430,60 @@ func TestFileStoreSyncIntervals(t *testing.T) {
|
||||
checkSyncFlag(false)
|
||||
}
|
||||
|
||||
// https://github.com/nats-io/nats-server/issues/4529
|
||||
// Run this wuth --race and you will see the unlocked access that probably caused this.
|
||||
func TestFileStoreRecalcFirstSequenceBug(t *testing.T) {
|
||||
fcfg := FileStoreConfig{StoreDir: t.TempDir()}
|
||||
fs, err := newFileStore(fcfg, StreamConfig{Name: "zzz", Subjects: []string{"*"}, MaxMsgsPer: 2, Storage: FileStorage})
|
||||
require_NoError(t, err)
|
||||
defer fs.Stop()
|
||||
|
||||
msg := bytes.Repeat([]byte("A"), 22)
|
||||
|
||||
for _, subj := range []string{"A", "A", "B", "B"} {
|
||||
fs.StoreMsg(subj, nil, msg)
|
||||
}
|
||||
// Make sure the buffer is cleared.
|
||||
clearLMBCache := func() {
|
||||
fs.mu.RLock()
|
||||
mb := fs.lmb
|
||||
fs.mu.RUnlock()
|
||||
mb.mu.Lock()
|
||||
mb.clearCacheAndOffset()
|
||||
mb.mu.Unlock()
|
||||
}
|
||||
|
||||
clearLMBCache()
|
||||
|
||||
// Do first here.
|
||||
fs.StoreMsg("A", nil, msg)
|
||||
|
||||
var wg sync.WaitGroup
|
||||
start := make(chan bool)
|
||||
|
||||
wg.Add(1)
|
||||
go func() {
|
||||
defer wg.Done()
|
||||
<-start
|
||||
for i := 0; i < 1_000; i++ {
|
||||
fs.LoadLastMsg("A", nil)
|
||||
clearLMBCache()
|
||||
}
|
||||
}()
|
||||
|
||||
wg.Add(1)
|
||||
go func() {
|
||||
defer wg.Done()
|
||||
<-start
|
||||
for i := 0; i < 1_000; i++ {
|
||||
fs.StoreMsg("A", nil, msg)
|
||||
}
|
||||
}()
|
||||
|
||||
close(start)
|
||||
wg.Wait()
|
||||
}
|
||||
|
||||
///////////////////////////////////////////////////////////////////////////
|
||||
// New WAL based architecture tests
|
||||
///////////////////////////////////////////////////////////////////////////
|
||||
|
||||
Reference in New Issue
Block a user