Merge pull request #3320 from nats-io/kv_improvements

[IMPROVED] Speed of storing new keys with a large number of pre-existing keys.
This commit is contained in:
Derek Collison
2022-08-02 09:15:06 -07:00
committed by GitHub

View File

@@ -97,7 +97,6 @@ type fileStore struct {
sips int
closed bool
fip bool
tms bool
}
// Represents a message store block and its data.
@@ -310,9 +309,6 @@ func newFileStoreWithCreated(fcfg FileStoreConfig, cfg StreamConfig, created tim
return nil, fmt.Errorf("could not create hash: %v", err)
}
// Always track per subject information.
fs.tms = true
// Recover our message state.
if err := fs.recoverMsgs(); err != nil {
return nil, err
@@ -692,10 +688,8 @@ func (fs *fileStore) recoverMsgBlock(fi os.FileInfo, index uint64) (*msgBlock, e
// Quick sanity check here.
// Note this only checks that the message blk file is not newer then this file, or is empty and we expect empty.
if (mb.rbytes == 0 && mb.msgs == 0) || bytes.Equal(lchk[:], mb.lchk[:]) {
if fs.tms {
if err = mb.readPerSubjectInfo(); err != nil {
return nil, err
}
if err = mb.readPerSubjectInfo(); err != nil {
return nil, err
}
fs.blks = append(fs.blks, mb)
// If we only have one subject registered we can optimize filtered lookups here.
@@ -841,9 +835,7 @@ func (mb *msgBlock) rebuildStateLocked() (*LostStreamData, error) {
}
// Rebuild per subject info.
if mb.fs.tms {
mb.fss = make(map[string]*SimpleState)
}
mb.fss = make(map[string]*SimpleState)
for index, lbuf := uint32(0), uint32(len(buf)); index < lbuf; {
if index+msgHdrSize > lbuf {
@@ -928,7 +920,7 @@ func (mb *msgBlock) rebuildStateLocked() (*LostStreamData, error) {
mb.bytes += uint64(rl)
// Do per subject info.
if slen > 0 && mb.fss != nil {
if slen > 0 {
// For the lookup, we cast the byte slice and there won't be any copy
if ss := mb.fss[string(data[:slen])]; ss != nil {
ss.Msgs++
@@ -1478,7 +1470,7 @@ func (fs *fileStore) FilteredState(sseq uint64, subj string) SimpleState {
}
// If subj is empty or we are not tracking multiple subjects.
if subj == _EMPTY_ || subj == fwcs || !fs.tms {
if subj == _EMPTY_ || subj == fwcs {
total := lseq - sseq + 1
if state := fs.State(); len(state.Deleted) > 0 {
for _, dseq := range state.Deleted {
@@ -1492,68 +1484,34 @@ func (fs *fileStore) FilteredState(sseq uint64, subj string) SimpleState {
}
wc := subjectHasWildcard(subj)
// Are we tracking multiple subject states?
if fs.tms {
fs.mu.RLock()
for _, mb := range fs.blks {
// Skip blocks that are less than our starting sequence.
if sseq > atomic.LoadUint64(&mb.last.seq) {
continue
}
t, f, l := mb.filteredPending(subj, wc, sseq)
ss.Msgs += t
if ss.First == 0 || (f > 0 && f < ss.First) {
ss.First = f
}
if l > ss.Last {
ss.Last = l
}
// Tracking subject state.
fs.mu.RLock()
for _, mb := range fs.blks {
// Skip blocks that are less than our starting sequence.
if sseq > atomic.LoadUint64(&mb.last.seq) {
continue
}
fs.mu.RUnlock()
} else {
// Fallback to linear scan.
var smv StoreMsg
eq := compareFn(subj)
for seq := sseq; seq <= lseq; seq++ {
if sm, _ := fs.msgForSeq(seq, &smv); sm != nil && eq(sm.subj, subj) {
ss.Msgs++
if ss.First == 0 {
ss.First = seq
}
ss.Last = seq
}
t, f, l := mb.filteredPending(subj, wc, sseq)
ss.Msgs += t
if ss.First == 0 || (f > 0 && f < ss.First) {
ss.First = f
}
if l > ss.Last {
ss.Last = l
}
}
fs.mu.RUnlock()
return ss
}
// Will gather complete filtered state for the subject.
// Lock should be held.
func (fs *fileStore) perSubjectState(subj string) (total, first, last uint64) {
if !fs.tms {
return
}
wc := subjectHasWildcard(subj)
for _, mb := range fs.blks {
t, f, l := mb.filteredPending(subj, wc, 1)
total += t
if first == 0 || (f > 0 && f < first) {
first = f
}
if l > last {
last = l
}
}
return total, first, last
}
// SubjectsState returns a map of SimpleState for all matching subjects.
func (fs *fileStore) SubjectsState(subject string) map[string]SimpleState {
fs.mu.RLock()
defer fs.mu.RUnlock()
if !fs.tms || fs.state.Msgs == 0 {
if fs.state.Msgs == 0 {
return nil
}
@@ -1653,9 +1611,7 @@ func (fs *fileStore) newMsgBlockForWrite() (*msgBlock, error) {
// Lock should be held to quiet race detector.
mb.mu.Lock()
mb.setupWriteCache(rbuf)
if fs.tms {
mb.fss = make(map[string]*SimpleState)
}
mb.fss = make(map[string]*SimpleState)
mb.mu.Unlock()
// Now do local hash.
@@ -1745,22 +1701,22 @@ func (fs *fileStore) storeRawMsg(subj string, hdr, msg []byte, seq uint64, ts in
return ErrStoreClosed
}
var pscheck bool
var asl bool
// Per subject max check needed.
var psmc uint64
psmax := fs.cfg.MaxMsgsPer > 0 && len(subj) > 0
if psmax {
psmc = fs.psmc[subj]
}
var fseq uint64
// Check if we are discarding new messages when we reach the limit.
if fs.cfg.Discard == DiscardNew {
var fseq uint64
if fs.cfg.MaxMsgsPer > 0 && len(subj) > 0 {
pscheck = true
var msgs uint64
if msgs, fseq, _ = fs.perSubjectState(subj); msgs >= uint64(fs.cfg.MaxMsgsPer) {
asl = true
}
var asl bool
if psmax && psmc >= uint64(fs.cfg.MaxMsgsPer) {
fseq, asl = fs.firstSeqForSubj(subj), true
}
if fs.cfg.MaxMsgs > 0 && fs.state.Msgs >= uint64(fs.cfg.MaxMsgs) {
if !asl {
return ErrMaxMsgs
}
if fs.cfg.MaxMsgs > 0 && fs.state.Msgs >= uint64(fs.cfg.MaxMsgs) && !asl {
return ErrMaxMsgs
}
if fs.cfg.MaxBytes > 0 && fs.state.Bytes+uint64(len(msg)+len(hdr)) >= uint64(fs.cfg.MaxBytes) {
if !asl || fs.sizeForSeq(fseq) <= len(msg)+len(hdr) {
@@ -1801,10 +1757,13 @@ func (fs *fileStore) storeRawMsg(subj string, hdr, msg []byte, seq uint64, ts in
fs.state.LastTime = now
// Enforce per message limits.
if fs.cfg.MaxMsgsPer > 0 && len(subj) > 0 {
if !pscheck || asl {
fs.enforcePerSubjectLimit(subj)
// We snapshotted psmc before our actual write, so >= comparison needed.
if psmax && psmc >= uint64(fs.cfg.MaxMsgsPer) {
// We may have done this above.
if fseq == 0 {
fseq = fs.firstSeqForSubj(subj)
}
fs.removeMsg(fseq, false, false)
}
// Limits checks and enforcement.
@@ -1924,21 +1883,20 @@ func (fs *fileStore) rebuildFirst() {
}
}
// Will check the msg limit for this tracked subject.
// Optimized helper function to return first sequence.
// subj will always be publish subject here, meaning non-wildcard.
// We assume a fast check that this subj even exists already happened.
// Lock should be held.
func (fs *fileStore) enforcePerSubjectLimit(subj string) {
if fs.closed || fs.sips > 0 || fs.cfg.MaxMsgsPer <= 0 || !fs.tms {
return
}
for {
msgs, first, _ := fs.perSubjectState(subj)
if msgs <= uint64(fs.cfg.MaxMsgsPer) {
return
}
if ok, _ := fs.removeMsg(first, false, false); !ok {
break
func (fs *fileStore) firstSeqForSubj(subj string) uint64 {
for _, mb := range fs.blks {
mb.mu.RLock()
ss := mb.fss[subj]
mb.mu.RUnlock()
if ss != nil {
return ss.First
}
}
return 0
}
// Will check the msg limit and drop firstSeq msg if needed.
@@ -3676,6 +3634,9 @@ func (mb *msgBlock) cacheLookup(seq uint64, sm *StoreMsg) (*StoreMsg, error) {
// enough room for a max bytes condition.
// Lock should be already held.
func (fs *fileStore) sizeForSeq(seq uint64) int {
if seq == 0 {
return 0
}
var smv StoreMsg
if mb := fs.selectMsgBlock(seq); mb != nil {
if sm, _, _ := mb.fetchMsg(seq, &smv); sm != nil {
@@ -3833,6 +3794,11 @@ func (fs *fileStore) loadLast(subj string, sm *StoreMsg) (lsm *StoreMsg, err err
defer fs.mu.RUnlock()
wc := subjectHasWildcard(subj)
// If literal subject check for presence.
if !wc && fs.psmc[subj] == 0 {
return
}
// Walk blocks backwards.
for i := len(fs.blks) - 1; i >= 0; i-- {
mb := fs.blks[i]