Improve performance on storing msgs when multiple subjects exists with multiple messages and we have store limits that are being hit.

Signed-off-by: Derek Collison <derek@nats.io>
This commit is contained in:
Derek Collison
2023-04-13 15:27:34 -07:00
parent 69b427bc94
commit b597485102
3 changed files with 109 additions and 36 deletions

View File

@@ -1358,7 +1358,7 @@ func (fs *fileStore) expireMsgsOnRecover() {
}
// Update fss
// Make sure we have fss loaded.
mb.removeSeqPerSubject(sm.subj, seq, nil)
mb.removeSeqPerSubject(sm.subj, seq)
fs.removePerSubject(sm.subj)
}
// Make sure we have a proper next first sequence.
@@ -1482,6 +1482,9 @@ func (mb *msgBlock) firstMatching(filter string, wc bool, start uint64, sm *Stor
fseq = mb.last.seq + 1
for _, subj := range subs {
ss := mb.fss[subj]
if ss != nil && ss.firstNeedsUpdate {
mb.recalculateFirstForSubj(subj, ss.First, ss)
}
if ss == nil || start > ss.Last || ss.First >= fseq {
continue
}
@@ -1587,6 +1590,9 @@ func (mb *msgBlock) filteredPendingLocked(filter string, wc bool, sseq uint64) (
var havePartial bool
for subj, ss := range mb.fss {
if isAll || isMatch(subj) {
if ss.firstNeedsUpdate {
mb.recalculateFirstForSubj(subj, ss.First, ss)
}
if sseq <= ss.First {
update(ss)
} else if sseq <= ss.Last {
@@ -1784,6 +1790,9 @@ func (fs *fileStore) SubjectsState(subject string) map[string]SimpleState {
mb.ensurePerSubjectInfoLoaded()
for subj, ss := range mb.fss {
if subject == _EMPTY_ || subject == fwcs || subjectIsSubsetMatch(subj, subject) {
if ss.firstNeedsUpdate {
mb.recalculateFirstForSubj(subj, ss.First, ss)
}
oss := fss[subj]
if oss.First == 0 { // New
fss[subj] = *ss
@@ -1883,6 +1892,9 @@ func (fs *fileStore) NumPending(sseq uint64, filter string, lastPerSubject bool)
seen[subj] = true
}
} else {
if ss.firstNeedsUpdate {
mb.recalculateFirstForSubj(subj, ss.First, ss)
}
if sseq <= ss.First {
t += ss.Msgs
} else if sseq <= ss.Last {
@@ -2474,6 +2486,9 @@ func (fs *fileStore) firstSeqForSubj(subj string) (uint64, error) {
info.fblk = i
}
}
if ss.firstNeedsUpdate {
mb.recalculateFirstForSubj(subj, ss.First, ss)
}
return ss.First, nil
}
}
@@ -2548,6 +2563,9 @@ func (fs *fileStore) enforceMsgPerSubjectLimit() {
mb.mu.Lock()
mb.ensurePerSubjectInfoLoaded()
ss := mb.fss[subj]
if ss != nil && ss.firstNeedsUpdate {
mb.recalculateFirstForSubj(subj, ss.First, ss)
}
mb.mu.Unlock()
if ss == nil {
continue
@@ -2717,7 +2735,7 @@ func (fs *fileStore) removeMsg(seq uint64, secure, viaLimits, needFSLock bool) (
mb.ensurePerSubjectInfoLoaded()
// If we are tracking multiple subjects here make sure we update that accounting.
mb.removeSeqPerSubject(sm.subj, seq, &smv)
mb.removeSeqPerSubject(sm.subj, seq)
fs.removePerSubject(sm.subj)
if secure {
@@ -5094,7 +5112,7 @@ func (fs *fileStore) PurgeEx(subject string, sequence, keep uint64) (purged uint
bytes += rl
}
// FSS updates.
mb.removeSeqPerSubject(sm.subj, seq, &smv)
mb.removeSeqPerSubject(sm.subj, seq)
fs.removePerSubject(sm.subj)
// Check for first message.
@@ -5302,7 +5320,7 @@ func (fs *fileStore) Compact(seq uint64) (uint64, error) {
purged++
}
// Update fss
smb.removeSeqPerSubject(sm.subj, mseq, &smv)
smb.removeSeqPerSubject(sm.subj, mseq)
fs.removePerSubject(sm.subj)
}
}
@@ -5678,7 +5696,7 @@ func (mb *msgBlock) dirtyCloseWithRemove(remove bool) {
// Remove a seq from the fss and select new first.
// Lock should be held.
func (mb *msgBlock) removeSeqPerSubject(subj string, seq uint64, smp *StoreMsg) {
func (mb *msgBlock) removeSeqPerSubject(subj string, seq uint64) {
mb.ensurePerSubjectInfoLoaded()
ss := mb.fss[subj]
if ss == nil {
@@ -5692,37 +5710,64 @@ func (mb *msgBlock) removeSeqPerSubject(subj string, seq uint64, smp *StoreMsg)
}
ss.Msgs--
if seq != ss.First {
return
}
// Only one left.
if ss.Msgs == 1 {
if seq != ss.First {
if seq == ss.Last {
ss.Last = ss.First
mb.fssNeedsWrite = true // Mark dirty
} else {
ss.First = ss.Last
mb.fssNeedsWrite = true // Mark dirty
}
mb.fssNeedsWrite = true // Mark dirty
return
}
// Recalculate first.
// TODO(dlc) - Might want to optimize this.
if seq == ss.First {
var smv StoreMsg
if smp == nil {
smp = &smv
// We can lazily calculate the first sequence when needed.
ss.firstNeedsUpdate = seq == ss.First || ss.firstNeedsUpdate
}
// Will recalulate the first sequence for this subject in this block.
// Will avoid slower path message lookups and scan the cache directly instead.
func (mb *msgBlock) recalculateFirstForSubj(subj string, startSeq uint64, ss *SimpleState) {
// Need to make sure messages are loaded.
if mb.cacheNotLoaded() {
if err := mb.loadMsgsWithLock(); err != nil {
return
}
for tseq := seq + 1; tseq <= ss.Last; tseq++ {
if sm, _ := mb.cacheLookup(tseq, smp); sm != nil {
if sm.subj == subj {
ss.First = tseq
mb.fssNeedsWrite = true // Mark dirty
return
}
// Mark first as updated.
ss.firstNeedsUpdate = false
startSeq++
startSlot := int(startSeq - mb.cache.fseq)
if startSlot >= len(mb.cache.idx) {
ss.First = ss.Last
return
}
var le = binary.LittleEndian
for slot := startSlot; slot < len(mb.cache.idx); slot++ {
li := int(mb.cache.idx[slot]&^hbit) - mb.cache.off
if li >= len(mb.cache.buf) {
ss.First = ss.Last
return
}
buf := mb.cache.buf[li:]
hdr := buf[:msgHdrSize]
slen := int(le.Uint16(hdr[20:]))
if subj == string(buf[msgHdrSize:msgHdrSize+slen]) {
seq := le.Uint64(hdr[4:])
if seq&ebit != 0 {
continue
}
if len(mb.dmap) > 0 {
if _, ok := mb.dmap[seq]; ok {
continue
}
}
ss.First = seq
mb.fssNeedsWrite = true // Mark dirty
return
}
}
}
@@ -5951,6 +5996,9 @@ func (mb *msgBlock) writePerSubjectInfo() error {
n := binary.PutUvarint(scratch[0:], uint64(len(mb.fss)))
b.Write(scratch[0:n])
for subj, ss := range mb.fss {
if ss.firstNeedsUpdate {
mb.recalculateFirstForSubj(subj, ss.First, ss)
}
n := binary.PutUvarint(scratch[0:], uint64(len(subj)))
b.Write(scratch[0:n])
b.WriteString(subj)

View File

@@ -1,4 +1,4 @@
// Copyright 2019-2022 The NATS Authors
// Copyright 2019-2023 The NATS Authors
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
@@ -79,9 +79,9 @@ func (ms *memStore) UpdateConfig(cfg *StreamConfig) error {
// If the value is smaller we need to enforce that.
if ms.maxp != 0 && ms.maxp < maxp {
lm := uint64(ms.maxp)
for _, ss := range ms.fss {
for subj, ss := range ms.fss {
if ss.Msgs > lm {
ms.enforcePerSubjectLimit(ss)
ms.enforcePerSubjectLimit(subj, ss)
}
}
}
@@ -125,6 +125,9 @@ func (ms *memStore) storeRawMsg(subj string, hdr, msg []byte, seq uint64, ts int
return ErrMaxBytes
}
// If we are here we are at a subject maximum, need to determine if dropping last message gives us enough room.
if ss.firstNeedsUpdate {
ms.recalculateFirstForSubj(subj, ss.First, ss)
}
sm, ok := ms.msgs[ss.First]
if !ok || memStoreMsgSize(sm.subj, sm.hdr, sm.msg) < uint64(len(msg)+len(hdr)) {
return ErrMaxBytes
@@ -176,7 +179,7 @@ func (ms *memStore) storeRawMsg(subj string, hdr, msg []byte, seq uint64, ts int
ss.Last = seq
// Check per subject limits.
if ms.maxp > 0 && ss.Msgs > uint64(ms.maxp) {
ms.enforcePerSubjectLimit(ss)
ms.enforcePerSubjectLimit(subj, ss)
}
} else {
ms.fss[subj] = &SimpleState{Msgs: 1, First: seq, Last: seq}
@@ -358,6 +361,9 @@ func (ms *memStore) filteredStateLocked(sseq uint64, filter string, lastPerSubje
// We will track start and end sequences as we go.
for subj, fss := range ms.fss {
if isMatch(subj) {
if fss.firstNeedsUpdate {
ms.recalculateFirstForSubj(subj, fss.First, fss)
}
if sseq <= fss.First {
update(fss)
} else if sseq <= fss.Last {
@@ -452,6 +458,9 @@ func (ms *memStore) SubjectsState(subject string) map[string]SimpleState {
fss := make(map[string]SimpleState)
for subj, ss := range ms.fss {
if subject == _EMPTY_ || subject == fwcs || subjectIsSubsetMatch(subj, subject) {
if ss.firstNeedsUpdate {
ms.recalculateFirstForSubj(subj, ss.First, ss)
}
oss := fss[subj]
if oss.First == 0 { // New
fss[subj] = *ss
@@ -503,11 +512,14 @@ func (ms *memStore) NumPending(sseq uint64, filter string, lastPerSubject bool)
// Will check the msg limit for this tracked subject.
// Lock should be held.
func (ms *memStore) enforcePerSubjectLimit(ss *SimpleState) {
func (ms *memStore) enforcePerSubjectLimit(subj string, ss *SimpleState) {
if ms.maxp <= 0 {
return
}
for nmsgs := ss.Msgs; nmsgs > uint64(ms.maxp); nmsgs = ss.Msgs {
if ss.firstNeedsUpdate {
ms.recalculateFirstForSubj(subj, ss.First, ss)
}
if !ms.removeMsg(ss.First, false) {
break
}
@@ -899,6 +911,9 @@ func (ms *memStore) LoadNextMsg(filter string, wc bool, start uint64, smp *Store
if ss == nil {
continue
}
if ss.firstNeedsUpdate {
ms.recalculateFirstForSubj(subj, ss.First, ss)
}
if ss.First < fseq {
fseq = ss.First
}
@@ -981,19 +996,26 @@ func (ms *memStore) removeSeqPerSubject(subj string, seq uint64) {
return
}
ss.Msgs--
if seq != ss.First {
return
}
// If we know we only have 1 msg left don't need to search for next first.
if ss.Msgs == 1 {
ss.First = ss.Last
return
if seq == ss.Last {
ss.Last = ss.First
} else {
ss.First = ss.Last
}
} else {
ss.firstNeedsUpdate = seq == ss.First || ss.firstNeedsUpdate
}
// TODO(dlc) - Might want to optimize this longer term.
for tseq := seq + 1; tseq <= ss.Last; tseq++ {
}
// Will recalulate the first sequence for this subject in this block.
func (ms *memStore) recalculateFirstForSubj(subj string, startSeq uint64, ss *SimpleState) {
for tseq := startSeq + 1; tseq <= ss.Last; tseq++ {
if sm := ms.msgs[tseq]; sm != nil && sm.subj == subj {
ss.First = tseq
break
ss.firstNeedsUpdate = false
return
}
}
}

View File

@@ -156,6 +156,9 @@ type SimpleState struct {
Msgs uint64 `json:"messages"`
First uint64 `json:"first_seq"`
Last uint64 `json:"last_seq"`
// Internal usage for when the first needs to be updated before use.
firstNeedsUpdate bool
}
// LostStreamData indicates msgs that have been lost.