Merge branch 'main' into dev

This commit is contained in:
Derek Collison
2023-08-29 16:48:04 -07:00
2 changed files with 73 additions and 15 deletions

View File

@@ -522,7 +522,7 @@ func (fs *fileStore) UpdateConfig(cfg *StreamConfig) error {
fs.ageChk = nil
}
if cfg.MaxMsgsPer > 0 && cfg.MaxMsgsPer < old_cfg.MaxMsgsPer {
if fs.cfg.MaxMsgsPer > 0 && fs.cfg.MaxMsgsPer < old_cfg.MaxMsgsPer {
fs.enforceMsgPerSubjectLimit()
}
fs.mu.Unlock()
@@ -5081,20 +5081,17 @@ func (fs *fileStore) LoadNextMsg(filter string, wc bool, start uint64, sm *Store
start = fs.state.FirstSeq
}
// TODO(dlc) - If num blocks gets large maybe use selectMsgBlock but have it return index b/c
// we need to keep walking if no match found in first mb.
for _, mb := range fs.blks {
// Skip blocks that are less than our starting sequence.
if start > atomic.LoadUint64(&mb.last.seq) {
continue
}
if sm, expireOk, err := mb.firstMatching(filter, wc, start, sm); err == nil {
if expireOk && mb != fs.lmb {
mb.tryForceExpireCache()
if bi, _ := fs.selectMsgBlockWithIndex(start); bi >= 0 {
for i := bi; i < len(fs.blks); i++ {
mb := fs.blks[i]
if sm, expireOk, err := mb.firstMatching(filter, wc, start, sm); err == nil {
if expireOk && mb != fs.lmb {
mb.tryForceExpireCache()
}
return sm, sm.seq, nil
} else if err != ErrStoreMsgNotFound {
return nil, 0, err
}
return sm, sm.seq, nil
} else if err != ErrStoreMsgNotFound {
return nil, 0, err
}
}
@@ -6237,7 +6234,7 @@ func (mb *msgBlock) recalculateFirstForSubj(subj string, startSeq uint64, ss *Si
slen := int(le.Uint16(hdr[20:]))
if subj == string(buf[msgHdrSize:msgHdrSize+slen]) {
seq := le.Uint64(hdr[4:])
if seq&ebit != 0 {
if seq < mb.first.seq || seq&ebit != 0 {
continue
}
if mb.dmap.Exists(seq) {