mirror of
https://github.com/gogrlx/nats-server.git
synced 2026-04-02 03:38:42 -07:00
During startup each filtered consumer could do a linear scan of the stream
to determine number of messages pending. This improves that with a startup cache. Signed-off-by: Derek Collison <derek@nats.io>
This commit is contained in:
@@ -536,7 +536,7 @@ func (mset *stream) addConsumerWithAssignment(config *ConsumerConfig, oname stri
|
||||
}
|
||||
|
||||
// Check if we have filtered subject that is a wildcard.
|
||||
if config.FilterSubject != _EMPTY_ && !subjectIsLiteral(config.FilterSubject) {
|
||||
if config.FilterSubject != _EMPTY_ && subjectHasWildcard(config.FilterSubject) {
|
||||
o.filterWC = true
|
||||
}
|
||||
|
||||
@@ -2909,17 +2909,7 @@ func (o *consumer) setInitialPending() {
|
||||
}
|
||||
} else {
|
||||
// Here we are filtered.
|
||||
// FIXME(dlc) - This could be slow with O(n)
|
||||
for seq := o.sseq; ; seq++ {
|
||||
subj, _, _, _, err := o.mset.store.LoadMsg(seq)
|
||||
if err == ErrStoreMsgNotFound {
|
||||
continue
|
||||
} else if err == ErrStoreEOF {
|
||||
break
|
||||
} else if err == nil && o.isFilteredMatch(subj) {
|
||||
o.sgap++
|
||||
}
|
||||
}
|
||||
o.sgap = o.mset.store.NumFilteredPending(o.sseq, o.cfg.FilterSubject)
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
@@ -78,6 +78,8 @@ type fileStore struct {
|
||||
hh hash.Hash64
|
||||
qch chan struct{}
|
||||
cfs []*consumerFileStore
|
||||
fsi map[string]seqSlice
|
||||
fsis *simpleState
|
||||
closed bool
|
||||
expiring bool
|
||||
fip bool
|
||||
@@ -250,7 +252,7 @@ func newFileStoreWithCreated(fcfg FileStoreConfig, cfg StreamConfig, created tim
|
||||
return nil, fmt.Errorf("could not create message storage directory - %v", err)
|
||||
}
|
||||
if err := os.MkdirAll(odir, 0755); err != nil {
|
||||
return nil, fmt.Errorf("could not create message storage directory - %v", err)
|
||||
return nil, fmt.Errorf("could not create consumer storage directory - %v", err)
|
||||
}
|
||||
|
||||
// Create highway hash for message blocks. Use sha256 of directory as key.
|
||||
@@ -260,11 +262,23 @@ func newFileStoreWithCreated(fcfg FileStoreConfig, cfg StreamConfig, created tim
|
||||
return nil, fmt.Errorf("could not create hash: %v", err)
|
||||
}
|
||||
|
||||
// Recover our state.
|
||||
// Recover our message state.
|
||||
if err := fs.recoverMsgs(); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
// Check to see if we have lots of messages and existing consumers.
|
||||
// If they could be filtered we should generate an index here.
|
||||
const lowWaterMarkMsgs = 8192
|
||||
if fs.state.Msgs > lowWaterMarkMsgs {
|
||||
// If we have one subject that is not a wildcard we can skip.
|
||||
if !(len(cfg.Subjects) == 1 && subjectIsLiteral(cfg.Subjects[0])) {
|
||||
if ofis, _ := ioutil.ReadDir(odir); len(ofis) > 0 {
|
||||
fs.genFilterIndex()
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// Write our meta data iff does not exist.
|
||||
meta := path.Join(fcfg.StoreDir, JetStreamMetaFile)
|
||||
if _, err := os.Stat(meta); err != nil && os.IsNotExist(err) {
|
||||
@@ -663,6 +677,7 @@ func (fs *fileStore) recoverMsgs() error {
|
||||
fs.startAgeChk()
|
||||
fs.expireMsgsLocked()
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
@@ -700,6 +715,108 @@ func (fs *fileStore) GetSeqFromTime(t time.Time) uint64 {
|
||||
return 0
|
||||
}
|
||||
|
||||
type seqSlice []uint64
|
||||
|
||||
func (x seqSlice) Len() int { return len(x) }
|
||||
func (x seqSlice) Less(i, j int) bool { return x[i] < x[j] }
|
||||
func (x seqSlice) Swap(i, j int) { x[i], x[j] = x[j], x[i] }
|
||||
|
||||
func (x seqSlice) Search(n uint64) int {
|
||||
return sort.Search(len(x), func(i int) bool { return x[i] >= n })
|
||||
}
|
||||
|
||||
type simpleState struct {
|
||||
msgs, first, last uint64
|
||||
}
|
||||
|
||||
// This will generate an index for us on startup to determine num pending for
|
||||
// filtered consumers easier.
|
||||
func (fs *fileStore) genFilterIndex() {
|
||||
fs.mu.Lock()
|
||||
defer fs.mu.Unlock()
|
||||
|
||||
fsi := make(map[string]seqSlice)
|
||||
|
||||
for _, mb := range fs.blks {
|
||||
mb.loadMsgs()
|
||||
mb.mu.Lock()
|
||||
fseq, lseq := mb.first.seq, mb.last.seq
|
||||
for seq := fseq; seq <= lseq; seq++ {
|
||||
if sm, err := mb.cacheLookupWithLock(seq); sm != nil && err == nil {
|
||||
fsi[sm.subj] = append(fsi[sm.subj], seq)
|
||||
}
|
||||
}
|
||||
// Expire this cache before moving on.
|
||||
mb.llts = 0
|
||||
mb.expireCacheLocked()
|
||||
mb.mu.Unlock()
|
||||
}
|
||||
|
||||
fs.fsi = fsi
|
||||
fs.fsis = &simpleState{fs.state.Msgs, fs.state.FirstSeq, fs.state.LastSeq}
|
||||
}
|
||||
|
||||
// Clears out the filter index.
|
||||
func (fs *fileStore) clearFilterIndex() {
|
||||
fs.mu.Lock()
|
||||
fs.fsi, fs.fsis = nil, nil
|
||||
fs.mu.Unlock()
|
||||
}
|
||||
|
||||
// Fetch our num filtered pending from our index.
|
||||
// Lock should be held.
|
||||
func (fs *fileStore) getNumFilteredPendingFromIndex(sseq uint64, subj string) (uint64, error) {
|
||||
cstate := simpleState{fs.state.Msgs, fs.state.FirstSeq, fs.state.LastSeq}
|
||||
if fs.fsis == nil || *fs.fsis != cstate {
|
||||
fs.fsi, fs.fsis = nil, nil
|
||||
return 0, errors.New("state changed, index not valid")
|
||||
}
|
||||
var total uint64
|
||||
for tsubj, seqs := range fs.fsi {
|
||||
if subjectIsSubsetMatch(tsubj, subj) {
|
||||
total += uint64(len(seqs[seqs.Search(sseq):]))
|
||||
}
|
||||
}
|
||||
return total, nil
|
||||
}
|
||||
|
||||
// Returns number of messages matching the subject starting at sequence sseq.
|
||||
func (fs *fileStore) NumFilteredPending(sseq uint64, subj string) (total uint64) {
|
||||
fs.mu.RLock()
|
||||
lseq := fs.state.LastSeq
|
||||
if sseq < fs.state.FirstSeq {
|
||||
sseq = fs.state.FirstSeq
|
||||
}
|
||||
if fs.fsi != nil {
|
||||
if np, err := fs.getNumFilteredPendingFromIndex(sseq, subj); err == nil {
|
||||
fs.mu.RUnlock()
|
||||
return np
|
||||
}
|
||||
}
|
||||
fs.mu.RUnlock()
|
||||
|
||||
if subj == _EMPTY_ {
|
||||
if sseq <= lseq {
|
||||
return lseq - sseq
|
||||
}
|
||||
return 0
|
||||
}
|
||||
|
||||
var eq func(string, string) bool
|
||||
if subjectHasWildcard(subj) {
|
||||
eq = subjectIsSubsetMatch
|
||||
} else {
|
||||
eq = func(a, b string) bool { return a == b }
|
||||
}
|
||||
|
||||
for seq := sseq; seq <= lseq; seq++ {
|
||||
if sm, _ := fs.msgForSeq(seq); sm != nil && eq(sm.subj, subj) {
|
||||
total++
|
||||
}
|
||||
}
|
||||
return total
|
||||
}
|
||||
|
||||
// RegisterStorageUpdates registers a callback for updates to storage changes.
|
||||
// It will present number of messages and bytes as a signed integer and an
|
||||
// optional sequence number of the message if a single.
|
||||
@@ -886,6 +1003,11 @@ func (fs *fileStore) storeRawMsg(subj string, hdr, msg []byte, seq uint64, ts in
|
||||
fs.startAgeChk()
|
||||
}
|
||||
|
||||
// If we had an index cache wipe that out.
|
||||
if fs.fsi != nil {
|
||||
fs.fsi, fs.fsis = nil, nil
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
@@ -1119,6 +1241,11 @@ func (fs *fileStore) removeMsg(seq uint64, secure bool) (bool, error) {
|
||||
fs.state.Msgs--
|
||||
fs.state.Bytes -= msz
|
||||
|
||||
// If we had an index cache wipe that out.
|
||||
if fs.fsi != nil {
|
||||
fs.fsi, fs.fsis = nil, nil
|
||||
}
|
||||
|
||||
// Now local mb updates.
|
||||
mb.msgs--
|
||||
mb.bytes -= msz
|
||||
@@ -2407,7 +2534,7 @@ func (fs *fileStore) msgForSeq(seq uint64) (*fileStoredMsg, error) {
|
||||
|
||||
// Check to see if we are the last seq for this message block and are doing
|
||||
// a linear scan. If that is true and we are not the last message block we can
|
||||
// expire try to expire the cache.
|
||||
// try to expire the cache.
|
||||
mb.mu.RLock()
|
||||
shouldTryExpire := mb != lmb && seq == mb.last.seq && mb.llseq == seq-1
|
||||
mb.mu.RUnlock()
|
||||
|
||||
@@ -893,6 +893,7 @@ func (a *Account) EnableJetStream(limits *JetStreamAccountLimits) error {
|
||||
s.Warnf(" Error restoring Consumer state: %v", err)
|
||||
}
|
||||
}
|
||||
mset.clearFilterIndex()
|
||||
}
|
||||
|
||||
// Make sure to cleanup and old remaining snapshots.
|
||||
|
||||
@@ -222,6 +222,37 @@ func (ms *memStore) GetSeqFromTime(t time.Time) uint64 {
|
||||
return uint64(index) + ms.state.FirstSeq
|
||||
}
|
||||
|
||||
// Returns number of messages matching the subject starting at sequence sseq.
|
||||
func (ms *memStore) NumFilteredPending(sseq uint64, subj string) (total uint64) {
|
||||
ms.mu.RLock()
|
||||
defer ms.mu.RUnlock()
|
||||
|
||||
if sseq < ms.state.FirstSeq {
|
||||
sseq = ms.state.FirstSeq
|
||||
}
|
||||
|
||||
if subj == _EMPTY_ {
|
||||
if sseq <= ms.state.LastSeq {
|
||||
return ms.state.LastSeq - sseq
|
||||
}
|
||||
return 0
|
||||
}
|
||||
|
||||
var eq func(string, string) bool
|
||||
if subjectHasWildcard(subj) {
|
||||
eq = subjectIsSubsetMatch
|
||||
} else {
|
||||
eq = func(a, b string) bool { return a == b }
|
||||
}
|
||||
|
||||
for seq := sseq; seq <= ms.state.LastSeq; seq++ {
|
||||
if sm, ok := ms.msgs[seq]; ok && eq(sm.subj, subj) {
|
||||
total++
|
||||
}
|
||||
}
|
||||
return total
|
||||
}
|
||||
|
||||
// Will check the msg limit and drop firstSeq msg if needed.
|
||||
// Lock should be held.
|
||||
func (ms *memStore) enforceMsgLimit() {
|
||||
|
||||
@@ -75,6 +75,7 @@ type StreamStore interface {
|
||||
Compact(seq uint64) (uint64, error)
|
||||
Truncate(seq uint64) error
|
||||
GetSeqFromTime(t time.Time) uint64
|
||||
NumFilteredPending(sseq uint64, subject string) uint64
|
||||
State() StreamState
|
||||
FastState(*StreamState)
|
||||
Type() StorageType
|
||||
|
||||
@@ -2083,6 +2083,16 @@ func (mset *stream) setupStore(fsCfg *FileStoreConfig) error {
|
||||
return nil
|
||||
}
|
||||
|
||||
// Clears out any filtered index from filestores.
|
||||
func (mset *stream) clearFilterIndex() {
|
||||
mset.mu.Lock()
|
||||
defer mset.mu.Unlock()
|
||||
|
||||
if fs, ok := mset.store.(*fileStore); ok {
|
||||
fs.clearFilterIndex()
|
||||
}
|
||||
}
|
||||
|
||||
// Called for any updates to the underlying stream. We pass through the bytes to the
|
||||
// jetstream account. We do local processing for stream pending for consumers, but only
|
||||
// for removals.
|
||||
|
||||
Reference in New Issue
Block a user