From c0e8590c0fbb2bcbd9c8311206af3d5462d3f4ca Mon Sep 17 00:00:00 2001 From: Derek Collison Date: Wed, 7 Apr 2021 09:15:01 -0700 Subject: [PATCH] During startup each filtered consumer could do a linear scan of the stream to determine number of messages pending. This improves that with a startup cache. Signed-off-by: Derek Collison --- server/consumer.go | 14 +---- server/filestore.go | 133 +++++++++++++++++++++++++++++++++++++++++++- server/jetstream.go | 1 + server/memstore.go | 31 +++++++++++ server/store.go | 1 + server/stream.go | 10 ++++ 6 files changed, 175 insertions(+), 15 deletions(-) diff --git a/server/consumer.go b/server/consumer.go index dc5a4a53..8c02b3f1 100644 --- a/server/consumer.go +++ b/server/consumer.go @@ -536,7 +536,7 @@ func (mset *stream) addConsumerWithAssignment(config *ConsumerConfig, oname stri } // Check if we have filtered subject that is a wildcard. - if config.FilterSubject != _EMPTY_ && !subjectIsLiteral(config.FilterSubject) { + if config.FilterSubject != _EMPTY_ && subjectHasWildcard(config.FilterSubject) { o.filterWC = true } @@ -2909,17 +2909,7 @@ func (o *consumer) setInitialPending() { } } else { // Here we are filtered. 
- // FIXME(dlc) - This could be slow with O(n) - for seq := o.sseq; ; seq++ { - subj, _, _, _, err := o.mset.store.LoadMsg(seq) - if err == ErrStoreMsgNotFound { - continue - } else if err == ErrStoreEOF { - break - } else if err == nil && o.isFilteredMatch(subj) { - o.sgap++ - } - } + o.sgap = o.mset.store.NumFilteredPending(o.sseq, o.cfg.FilterSubject) } } diff --git a/server/filestore.go b/server/filestore.go index 88b22a9a..3d076cd5 100644 --- a/server/filestore.go +++ b/server/filestore.go @@ -78,6 +78,8 @@ type fileStore struct { hh hash.Hash64 qch chan struct{} cfs []*consumerFileStore + fsi map[string]seqSlice + fsis *simpleState closed bool expiring bool fip bool @@ -250,7 +252,7 @@ func newFileStoreWithCreated(fcfg FileStoreConfig, cfg StreamConfig, created tim return nil, fmt.Errorf("could not create message storage directory - %v", err) } if err := os.MkdirAll(odir, 0755); err != nil { - return nil, fmt.Errorf("could not create message storage directory - %v", err) + return nil, fmt.Errorf("could not create consumer storage directory - %v", err) } // Create highway hash for message blocks. Use sha256 of directory as key. @@ -260,11 +262,23 @@ func newFileStoreWithCreated(fcfg FileStoreConfig, cfg StreamConfig, created tim return nil, fmt.Errorf("could not create hash: %v", err) } - // Recover our state. + // Recover our message state. if err := fs.recoverMsgs(); err != nil { return nil, err } + // Check to see if we have lots of messages and existing consumers. + // If they could be filtered we should generate an index here. + const lowWaterMarkMsgs = 8192 + if fs.state.Msgs > lowWaterMarkMsgs { + // If we have one subject that is not a wildcard we can skip. + if !(len(cfg.Subjects) == 1 && subjectIsLiteral(cfg.Subjects[0])) { + if ofis, _ := ioutil.ReadDir(odir); len(ofis) > 0 { + fs.genFilterIndex() + } + } + } + // Write our meta data iff does not exist. 
meta := path.Join(fcfg.StoreDir, JetStreamMetaFile) if _, err := os.Stat(meta); err != nil && os.IsNotExist(err) { @@ -663,6 +677,7 @@ func (fs *fileStore) recoverMsgs() error { fs.startAgeChk() fs.expireMsgsLocked() } + return nil } @@ -700,6 +715,108 @@ func (fs *fileStore) GetSeqFromTime(t time.Time) uint64 { return 0 } +type seqSlice []uint64 + +func (x seqSlice) Len() int { return len(x) } +func (x seqSlice) Less(i, j int) bool { return x[i] < x[j] } +func (x seqSlice) Swap(i, j int) { x[i], x[j] = x[j], x[i] } + +func (x seqSlice) Search(n uint64) int { + return sort.Search(len(x), func(i int) bool { return x[i] >= n }) +} + +type simpleState struct { + msgs, first, last uint64 +} + +// This will generate an index for us on startup to determine num pending for +// filtered consumers easier. +func (fs *fileStore) genFilterIndex() { + fs.mu.Lock() + defer fs.mu.Unlock() + + fsi := make(map[string]seqSlice) + + for _, mb := range fs.blks { + mb.loadMsgs() + mb.mu.Lock() + fseq, lseq := mb.first.seq, mb.last.seq + for seq := fseq; seq <= lseq; seq++ { + if sm, err := mb.cacheLookupWithLock(seq); sm != nil && err == nil { + fsi[sm.subj] = append(fsi[sm.subj], seq) + } + } + // Expire this cache before moving on. + mb.llts = 0 + mb.expireCacheLocked() + mb.mu.Unlock() + } + + fs.fsi = fsi + fs.fsis = &simpleState{fs.state.Msgs, fs.state.FirstSeq, fs.state.LastSeq} +} + +// Clears out the filter index. +func (fs *fileStore) clearFilterIndex() { + fs.mu.Lock() + fs.fsi, fs.fsis = nil, nil + fs.mu.Unlock() +} + +// Fetch our num filtered pending from our index. +// Lock should be held. 
+func (fs *fileStore) getNumFilteredPendingFromIndex(sseq uint64, subj string) (uint64, error) { + cstate := simpleState{fs.state.Msgs, fs.state.FirstSeq, fs.state.LastSeq} + if fs.fsis == nil || *fs.fsis != cstate { + // Do not clear the index here: NumFilteredPending calls us holding only the read lock. + return 0, errors.New("state changed, index not valid") + } + var total uint64 + for tsubj, seqs := range fs.fsi { + if subjectIsSubsetMatch(tsubj, subj) { + total += uint64(len(seqs[seqs.Search(sseq):])) + } + } + return total, nil +} + +// NumFilteredPending returns the number of messages at or after sequence sseq whose subject matches subj; an empty subj counts every sequence in [sseq, LastSeq]. +func (fs *fileStore) NumFilteredPending(sseq uint64, subj string) (total uint64) { + fs.mu.RLock() + msgs, lseq := fs.state.Msgs, fs.state.LastSeq + if sseq < fs.state.FirstSeq { + sseq = fs.state.FirstSeq + } + if fs.fsi != nil { + if np, err := fs.getNumFilteredPendingFromIndex(sseq, subj); err == nil { + fs.mu.RUnlock() + return np + } + } + fs.mu.RUnlock() + + if subj == _EMPTY_ { + if msgs > 0 && sseq <= lseq { + return lseq - sseq + 1 + } + return 0 + } + + var eq func(string, string) bool + if subjectHasWildcard(subj) { + eq = subjectIsSubsetMatch + } else { + eq = func(a, b string) bool { return a == b } + } + + for seq := sseq; seq <= lseq; seq++ { + if sm, _ := fs.msgForSeq(seq); sm != nil && eq(sm.subj, subj) { + total++ + } + } + return total +} + // RegisterStorageUpdates registers a callback for updates to storage changes. // It will present number of messages and bytes as a signed integer and an // optional sequence number of the message if a single. @@ -886,6 +1003,11 @@ func (fs *fileStore) storeRawMsg(subj string, hdr, msg []byte, seq uint64, ts in fs.startAgeChk() } + // If we had an index cache wipe that out. + if fs.fsi != nil { + fs.fsi, fs.fsis = nil, nil + } + return nil } @@ -1119,6 +1241,11 @@ func (fs *fileStore) removeMsg(seq uint64, secure bool) (bool, error) { fs.state.Msgs-- fs.state.Bytes -= msz + // If we had an index cache wipe that out. 
+ if fs.fsi != nil { + fs.fsi, fs.fsis = nil, nil + } + // Now local mb updates. mb.msgs-- mb.bytes -= msz @@ -2407,7 +2534,7 @@ func (fs *fileStore) msgForSeq(seq uint64) (*fileStoredMsg, error) { // Check to see if we are the last seq for this message block and are doing // a linear scan. If that is true and we are not the last message block we can - // expire try to expire the cache. + // try to expire the cache. mb.mu.RLock() shouldTryExpire := mb != lmb && seq == mb.last.seq && mb.llseq == seq-1 mb.mu.RUnlock() diff --git a/server/jetstream.go b/server/jetstream.go index 95bb56f5..f47fc7d3 100644 --- a/server/jetstream.go +++ b/server/jetstream.go @@ -893,6 +893,7 @@ func (a *Account) EnableJetStream(limits *JetStreamAccountLimits) error { s.Warnf(" Error restoring Consumer state: %v", err) } } + mset.clearFilterIndex() } // Make sure to cleanup and old remaining snapshots. diff --git a/server/memstore.go b/server/memstore.go index fef1a3cc..9566fbc2 100644 --- a/server/memstore.go +++ b/server/memstore.go @@ -222,6 +222,37 @@ func (ms *memStore) GetSeqFromTime(t time.Time) uint64 { return uint64(index) + ms.state.FirstSeq } +// NumFilteredPending returns the number of messages at or after sequence sseq whose subject matches subj; an empty subj counts every sequence in [sseq, LastSeq]. +func (ms *memStore) NumFilteredPending(sseq uint64, subj string) (total uint64) { + ms.mu.RLock() + defer ms.mu.RUnlock() + + if sseq < ms.state.FirstSeq { + sseq = ms.state.FirstSeq + } + + if subj == _EMPTY_ { + if ms.state.Msgs > 0 && sseq <= ms.state.LastSeq { + return ms.state.LastSeq - sseq + 1 + } + return 0 + } + + var eq func(string, string) bool + if subjectHasWildcard(subj) { + eq = subjectIsSubsetMatch + } else { + eq = func(a, b string) bool { return a == b } + } + + for seq := sseq; seq <= ms.state.LastSeq; seq++ { + if sm, ok := ms.msgs[seq]; ok && eq(sm.subj, subj) { + total++ + } + } + return total +} + // Will check the msg limit and drop firstSeq msg if needed. // Lock should be held. 
func (ms *memStore) enforceMsgLimit() { diff --git a/server/store.go b/server/store.go index 90d0f1eb..69a54115 100644 --- a/server/store.go +++ b/server/store.go @@ -75,6 +75,7 @@ type StreamStore interface { Compact(seq uint64) (uint64, error) Truncate(seq uint64) error GetSeqFromTime(t time.Time) uint64 + NumFilteredPending(sseq uint64, subject string) uint64 State() StreamState FastState(*StreamState) Type() StorageType diff --git a/server/stream.go b/server/stream.go index cb25bdda..1089a294 100644 --- a/server/stream.go +++ b/server/stream.go @@ -2083,6 +2083,16 @@ func (mset *stream) setupStore(fsCfg *FileStoreConfig) error { return nil } +// Clears out any filtered index from filestores. +func (mset *stream) clearFilterIndex() { + mset.mu.Lock() + defer mset.mu.Unlock() + + if fs, ok := mset.store.(*fileStore); ok { + fs.clearFilterIndex() + } +} + // Called for any updates to the underlying stream. We pass through the bytes to the // jetstream account. We do local processing for stream pending for consumers, but only // for removals.