mirror of
https://github.com/gogrlx/nats-server.git
synced 2026-04-17 03:24:40 -07:00
Only update per subject information if we know we have an update.
Signed-off-by: Derek Collison <derek@nats.io>
This commit is contained in:
@@ -168,6 +168,10 @@ type msgBlock struct {
|
||||
noTrack bool
|
||||
closed bool
|
||||
|
||||
// To avoid excessive writes when expiring cache.
|
||||
// These can be big.
|
||||
fssNeedsWrite bool
|
||||
|
||||
// Used to mock write failures.
|
||||
mockWriteErr bool
|
||||
}
|
||||
@@ -1111,6 +1115,7 @@ func (mb *msgBlock) rebuildStateLocked() (*LostStreamData, error) {
|
||||
subj := mb.subjString(data[:slen])
|
||||
mb.fss[subj] = &SimpleState{Msgs: 1, First: seq, Last: seq}
|
||||
}
|
||||
mb.fssNeedsWrite = true
|
||||
}
|
||||
}
|
||||
// Advance to next record.
|
||||
@@ -3238,6 +3243,7 @@ func (mb *msgBlock) writeMsgRecord(rl, seq uint64, subj string, mhdr, msg []byte
|
||||
} else {
|
||||
mb.fss[subj] = &SimpleState{Msgs: 1, First: seq, Last: seq}
|
||||
}
|
||||
mb.fssNeedsWrite = true
|
||||
}
|
||||
|
||||
// Indexing
|
||||
@@ -5288,6 +5294,10 @@ func (mb *msgBlock) removeSeqPerSubject(subj string, seq uint64, smp *StoreMsg)
|
||||
if ss == nil {
|
||||
return
|
||||
}
|
||||
|
||||
// Mark dirty
|
||||
mb.fssNeedsWrite = true
|
||||
|
||||
if ss.Msgs == 1 {
|
||||
delete(mb.fss, subj)
|
||||
return
|
||||
@@ -5378,6 +5388,7 @@ func (mb *msgBlock) generatePerSubjectInfo(hasLock bool) error {
|
||||
} else {
|
||||
mb.fss[sm.subj] = &SimpleState{Msgs: 1, First: seq, Last: seq}
|
||||
}
|
||||
mb.fssNeedsWrite = true
|
||||
}
|
||||
}
|
||||
|
||||
@@ -5423,6 +5434,9 @@ func (mb *msgBlock) loadPerSubjectInfo() ([]byte, error) {
|
||||
// Helper to make sure fss loaded if we are tracking.
|
||||
// Lock should be held
|
||||
func (mb *msgBlock) ensurePerSubjectInfoLoaded() error {
|
||||
// Clear
|
||||
mb.fssNeedsWrite = false
|
||||
|
||||
if mb.fss != nil || mb.noTrack {
|
||||
return nil
|
||||
}
|
||||
@@ -5531,7 +5545,7 @@ func (mb *msgBlock) readPerSubjectInfo(hasLock bool) error {
|
||||
// Lock should be held.
|
||||
func (mb *msgBlock) writePerSubjectInfo() error {
|
||||
// Raft groups do not have any subjects.
|
||||
if len(mb.fss) == 0 || len(mb.sfn) == 0 {
|
||||
if len(mb.fss) == 0 || len(mb.sfn) == 0 || !mb.fssNeedsWrite {
|
||||
return nil
|
||||
}
|
||||
var scratch [4 * binary.MaxVarintLen64]byte
|
||||
@@ -5562,6 +5576,11 @@ func (mb *msgBlock) writePerSubjectInfo() error {
|
||||
err := os.WriteFile(mb.sfn, b.Bytes(), defaultFilePerms)
|
||||
dios <- struct{}{}
|
||||
|
||||
// Clear write flag if no error.
|
||||
if err == nil {
|
||||
mb.fssNeedsWrite = false
|
||||
}
|
||||
|
||||
return err
|
||||
}
|
||||
|
||||
|
||||
@@ -5227,3 +5227,50 @@ func TestFileStoreStreamCompactMultiBlockSubjectInfo(t *testing.T) {
|
||||
require_True(t, state.NumSubjects == 500)
|
||||
})
|
||||
}
|
||||
|
||||
func TestFileStoreOnlyWritePerSubjectInfoOnExpireWithUpdate(t *testing.T) {
|
||||
testFileStoreAllPermutations(t, func(t *testing.T, fcfg FileStoreConfig) {
|
||||
fcfg.CacheExpire = 100 * time.Millisecond
|
||||
|
||||
fs, err := newFileStore(
|
||||
fcfg,
|
||||
StreamConfig{Name: "zzz", Subjects: []string{"foo.*"}, Storage: FileStorage},
|
||||
)
|
||||
require_NoError(t, err)
|
||||
defer fs.Stop()
|
||||
|
||||
for i := 0; i < 1000; i++ {
|
||||
subj := fmt.Sprintf("foo.%d", i)
|
||||
_, _, err := fs.StoreMsg(subj, nil, []byte("Hello World"))
|
||||
require_NoError(t, err)
|
||||
}
|
||||
|
||||
// Grab first msg block.
|
||||
fs.mu.RLock()
|
||||
mb := fs.blks[0]
|
||||
fs.mu.RUnlock()
|
||||
|
||||
needsUpdate := func() bool {
|
||||
mb.mu.RLock()
|
||||
defer mb.mu.RUnlock()
|
||||
return mb.fssNeedsWrite
|
||||
}
|
||||
require_True(t, needsUpdate())
|
||||
time.Sleep(2 * fcfg.CacheExpire)
|
||||
require_False(t, needsUpdate())
|
||||
|
||||
// Make sure reads do not trigger an update.
|
||||
_, err = fs.LoadMsg(1, nil)
|
||||
require_NoError(t, err)
|
||||
require_False(t, needsUpdate())
|
||||
|
||||
// Remove will though.
|
||||
_, err = fs.RemoveMsg(1)
|
||||
require_NoError(t, err)
|
||||
require_True(t, needsUpdate())
|
||||
|
||||
// We should update then clear.
|
||||
time.Sleep(2 * fcfg.CacheExpire)
|
||||
require_False(t, needsUpdate())
|
||||
})
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user