From 24c2f3b452c5db6bc79c4504d5538a7bf1ed41ef Mon Sep 17 00:00:00 2001 From: Derek Collison Date: Fri, 24 Feb 2023 17:22:18 -0800 Subject: [PATCH 1/3] Improved performance of subjects details for stream info. This version avoids all disk IO in the filestore version. Signed-off-by: Derek Collison --- server/filestore.go | 35 +++++++++++++++-- server/filestore_test.go | 80 +++++++++++++++++++++++++++++++++++++++ server/jetstream_api.go | 68 +++++++++++++++++---------------- server/memstore.go | 27 ++++++++++++++ server/memstore_test.go | 81 ++++++++++++++++++++++++++++++++++++++++ server/store.go | 3 +- 6 files changed, 257 insertions(+), 37 deletions(-) diff --git a/server/filestore.go b/server/filestore.go index f5af9eec..869eb8af 100644 --- a/server/filestore.go +++ b/server/filestore.go @@ -1746,10 +1746,12 @@ func (fs *fileStore) numFilteredPending(filter string, ss *SimpleState) { if !isAll { wc := subjectHasWildcard(filter) // Do start - if mb := fs.bim[start]; mb != nil { + mb := fs.bim[start] + if mb != nil { _, f, _ := mb.filteredPending(filter, wc, 0) ss.First = f - } else { + } + if ss.First == 0 { // This is a miss. This can happen since psi.fblk is lazy, but should be very rare. for i := start + 1; i <= stop; i++ { mb := fs.bim[i] @@ -1763,7 +1765,7 @@ func (fs *fileStore) numFilteredPending(filter string, ss *SimpleState) { } } // Now last - if mb := fs.bim[stop]; mb != nil { + if mb = fs.bim[stop]; mb != nil { _, _, l := mb.filteredPending(filter, wc, 0) ss.Last = l } @@ -1826,6 +1828,33 @@ func (fs *fileStore) SubjectsState(subject string) map[string]SimpleState { return fss } +// SubjectsTotal return message totals per subject. +func (fs *fileStore) SubjectsTotals(filterSubject string) map[string]uint64 { + fs.mu.RLock() + defer fs.mu.RUnlock() + + if len(fs.psim) == 0 { + return nil + } + + tsa := [32]string{} + fsa := [32]string{} + fts := tokenizeSubjectIntoSlice(fsa[:0], filterSubject) + isAll := filterSubject == _EMPTY_ || filterSubject == fwcs + + fst := make(map[string]uint64) + for subj, psi := range fs.psim { + if isAll { + fst[subj] = psi.total + } else { + if tts := tokenizeSubjectIntoSlice(tsa[:0], subj); isSubsetMatchTokenized(tts, fts) { + fst[subj] = psi.total + } + } + } + return fst +} + // RegisterStorageUpdates registers a callback for updates to storage changes. // It will present number of messages and bytes as a signed integer and an // optional sequence number of the message if a single. diff --git a/server/filestore_test.go b/server/filestore_test.go index 33495630..718d90a8 100644 --- a/server/filestore_test.go +++ b/server/filestore_test.go @@ -5274,3 +5274,83 @@ func TestFileStoreOnlyWritePerSubjectInfoOnExpireWithUpdate(t *testing.T) { require_False(t, needsUpdate()) }) } + +func TestFileStoreSubjectsTotals(t *testing.T) { + testFileStoreAllPermutations(t, func(t *testing.T, fcfg FileStoreConfig) { + fs, err := newFileStore( + fcfg, + StreamConfig{Name: "zzz", Subjects: []string{"*.*"}, Storage: FileStorage}, + ) + require_NoError(t, err) + defer fs.Stop() + + fmap := make(map[int]int) + bmap := make(map[int]int) + + var m map[int]int + var ft string + + for i := 0; i < 10_000; i++ { + // Flip coin for prefix + if rand.Intn(2) == 0 { + ft, m = "foo", fmap + } else { + ft, m = "bar", bmap + } + dt := rand.Intn(100) + subj := fmt.Sprintf("%s.%d", ft, dt) + m[dt]++ + + _, _, err := fs.StoreMsg(subj, nil, []byte("Hello World")) + require_NoError(t, err) + } + + // Now test SubjectsTotal + for dt, total := range fmap { + subj := fmt.Sprintf("foo.%d", dt) + m := fs.SubjectsTotals(subj) + if m[subj] != uint64(total) { + t.Fatalf("Expected %q to have %d total, got %d", subj, total, m[subj]) + } + } + + // Check fmap. + if st := fs.SubjectsTotals("foo.*"); len(st) != len(fmap) { + t.Fatalf("Expected %d subjects for %q, got %d", len(fmap), "foo.*", len(st)) + } else { + expected := 0 + for _, n := range fmap { + expected += n + } + received := uint64(0) + for _, n := range st { + received += n + } + if received != uint64(expected) { + t.Fatalf("Expected %d total but got %d", expected, received) + } + } + + // Check bmap. + if st := fs.SubjectsTotals("bar.*"); len(st) != len(bmap) { + t.Fatalf("Expected %d subjects for %q, got %d", len(bmap), "bar.*", len(st)) + } else { + expected := 0 + for _, n := range bmap { + expected += n + } + received := uint64(0) + for _, n := range st { + received += n + } + if received != uint64(expected) { + t.Fatalf("Expected %d total but got %d", expected, received) + } + } + + // All with pwc match. + if st, expected := fs.SubjectsTotals("*.*"), len(bmap)+len(fmap); len(st) != expected { + t.Fatalf("Expected %d subjects for %q, got %d", expected, "*.*", len(st)) + } + }) +} diff --git a/server/jetstream_api.go b/server/jetstream_api.go index c14106c3..cfe06442 100644 --- a/server/jetstream_api.go +++ b/server/jetstream_api.go @@ -1842,42 +1842,44 @@ func (s *Server) jsStreamInfoRequest(sub *subscription, c *client, a *Account, s // Check if they have asked for subject details. if subjects != _EMPTY_ { - if mss := mset.store.SubjectsState(subjects); len(mss) > 0 { - // As go iterates over map in a non-consistent order, no choice but to buffer it a slice - - buffer := make([]string, 0, len(mss)) - for subj := range mss { - buffer = append(buffer, subj) - } - - // Sort it - sort.Strings(buffer) - - if offset > len(buffer) { - offset = len(buffer) - } - - end := offset + JSMaxSubjectDetails - if end > len(buffer) { - end = len(buffer) - } - - actualSize := end - offset - var sd map[string]uint64 - - if actualSize > 0 { - sd = make(map[string]uint64, actualSize) - for _, ss := range buffer[offset:end] { - sd[ss] = mss[ss].Msgs - } - } - - resp.StreamInfo.State.Subjects = sd + st := mset.store.SubjectsTotals(subjects) + if lst := len(st); lst > 0 { + // Common for both cases. resp.Offset = offset resp.Limit = JSMaxSubjectDetails - resp.Total = len(mss) - } + resp.Total = lst + if offset == 0 && lst <= JSMaxSubjectDetails { + resp.StreamInfo.State.Subjects = st + } else { + // Here we have to filter list due to offset or maximum constraints. + subjs := make([]string, 0, len(st)) + for subj := range st { + subjs = append(subjs, subj) + } + // Sort it + sort.Strings(subjs) + + if offset > len(subjs) { + offset = len(subjs) + } + + end := offset + JSMaxSubjectDetails + if end > len(subjs) { + end = len(subjs) + } + actualSize := end - offset + var sd map[string]uint64 + + if actualSize > 0 { + sd = make(map[string]uint64, actualSize) + for _, ss := range subjs[offset:end] { + sd[ss] = st[ss] + } + } + resp.StreamInfo.State.Subjects = sd + } + } } // Check for out of band catchups. if mset.hasCatchupPeers() { diff --git a/server/memstore.go b/server/memstore.go index ebcd8085..7adb59e7 100644 --- a/server/memstore.go +++ b/server/memstore.go @@ -375,6 +375,33 @@ func (ms *memStore) SubjectsState(subject string) map[string]SimpleState { return fss } +// SubjectsTotal return message totals per subject. +func (ms *memStore) SubjectsTotals(filterSubject string) map[string]uint64 { + ms.mu.RLock() + defer ms.mu.RUnlock() + + if len(ms.fss) == 0 { + return nil + } + + tsa := [32]string{} + fsa := [32]string{} + fts := tokenizeSubjectIntoSlice(fsa[:0], filterSubject) + isAll := filterSubject == _EMPTY_ || filterSubject == fwcs + + fst := make(map[string]uint64) + for subj, ss := range ms.fss { + if isAll { + fst[subj] = ss.Msgs + } else { + if tts := tokenizeSubjectIntoSlice(tsa[:0], subj); isSubsetMatchTokenized(tts, fts) { + fst[subj] = ss.Msgs + } + } + } + return fst +} + // Will check the msg limit for this tracked subject. // Lock should be held. func (ms *memStore) enforcePerSubjectLimit(ss *SimpleState) { diff --git a/server/memstore_test.go b/server/memstore_test.go index fa3a5d12..db09633e 100644 --- a/server/memstore_test.go +++ b/server/memstore_test.go @@ -16,6 +16,7 @@ package server import ( "bytes" "fmt" + "math/rand" "reflect" "testing" "time" @@ -520,3 +521,83 @@ func TestMemStoreStreamCompactMultiBlockSubjectInfo(t *testing.T) { state := ms.State() require_True(t, state.NumSubjects == 500) } + +func TestMemStoreSubjectsTotals(t *testing.T) { + cfg := &StreamConfig{ + Name: "TEST", + Storage: MemoryStorage, + Subjects: []string{"*.*"}, + } + ms, err := newMemStore(cfg) + require_NoError(t, err) + + fmap := make(map[int]int) + bmap := make(map[int]int) + + var m map[int]int + var ft string + + for i := 0; i < 10_000; i++ { + // Flip coin for prefix + if rand.Intn(2) == 0 { + ft, m = "foo", fmap + } else { + ft, m = "bar", bmap + } + dt := rand.Intn(100) + subj := fmt.Sprintf("%s.%d", ft, dt) + m[dt]++ + + _, _, err := ms.StoreMsg(subj, nil, []byte("Hello World")) + require_NoError(t, err) + } + + // Now test SubjectsTotal + for dt, total := range fmap { + subj := fmt.Sprintf("foo.%d", dt) + m := ms.SubjectsTotals(subj) + if m[subj] != uint64(total) { + t.Fatalf("Expected %q to have %d total, got %d", subj, total, m[subj]) + } + } + + // Check fmap. + if st := ms.SubjectsTotals("foo.*"); len(st) != len(fmap) { + t.Fatalf("Expected %d subjects for %q, got %d", len(fmap), "foo.*", len(st)) + } else { + expected := 0 + for _, n := range fmap { + expected += n + } + received := uint64(0) + for _, n := range st { + received += n + } + if received != uint64(expected) { + t.Fatalf("Expected %d total but got %d", expected, received) + } + } + + // Check bmap. + if st := ms.SubjectsTotals("bar.*"); len(st) != len(bmap) { + t.Fatalf("Expected %d subjects for %q, got %d", len(bmap), "bar.*", len(st)) + } else { + expected := 0 + for _, n := range bmap { + expected += n + } + received := uint64(0) + for _, n := range st { + received += n + } + if received != uint64(expected) { + t.Fatalf("Expected %d total but got %d", expected, received) + } + } + + // All with pwc match. + if st, expected := ms.SubjectsTotals("*.*"), len(bmap)+len(fmap); len(st) != expected { + t.Fatalf("Expected %d subjects for %q, got %d", expected, "*.*", len(st)) + } + +} diff --git a/server/store.go b/server/store.go index 6c90c6a9..77477ba5 100644 --- a/server/store.go +++ b/server/store.go @@ -1,4 +1,4 @@ -// Copyright 2019-2022 The NATS Authors +// Copyright 2019-2023 The NATS Authors // Licensed under the Apache License, Version 2.0 (the "License"); // you may not use this file except in compliance with the License. // You may obtain a copy of the License at @@ -95,6 +95,7 @@ type StreamStore interface { GetSeqFromTime(t time.Time) uint64 FilteredState(seq uint64, subject string) SimpleState SubjectsState(filterSubject string) map[string]SimpleState + SubjectsTotals(filterSubject string) map[string]uint64 State() StreamState FastState(*StreamState) Type() StorageType From daacbf5580b8683505b69f6b34dd7cc4dfdde744 Mon Sep 17 00:00:00 2001 From: Derek Collison Date: Sat, 25 Feb 2023 12:30:14 -0800 Subject: [PATCH 2/3] Added optimized store NumPending() call. Optimized and fixed a bug in filestore filteredPending(). Optimized memstore FilteredState(). Added comprehensive tests for NumPending() and FilteredState(). Signed-off-by: Derek Collison --- server/consumer.go | 18 +- server/filestore.go | 429 ++++++++++++++++++++++++++++----------- server/filestore_test.go | 281 ++++++++++++++++++------- server/memstore.go | 171 ++++++++++++---- server/memstore_test.go | 122 ++++++++++- server/store.go | 1 + 6 files changed, 783 insertions(+), 239 deletions(-) diff --git a/server/consumer.go b/server/consumer.go index d53a30a7..cb6647a3 100644 --- a/server/consumer.go +++ b/server/consumer.go @@ -3501,21 +3501,11 @@ func (o *consumer) streamNumPendingLocked() uint64 { func (o *consumer) streamNumPending() uint64 { if o.mset == nil || o.mset.store == nil { o.npc, o.npf = 0, 0 - } else if o.cfg.DeliverPolicy == DeliverLastPerSubject { - o.npc, o.npf = 0, 0 - for _, ss := range o.mset.store.SubjectsState(o.cfg.FilterSubject) { - if o.sseq <= ss.Last { - o.npc++ - if ss.Last > o.npf { - // Set our num pending sequence floor. - o.npf = ss.Last - } - } - } } else { - ss := o.mset.store.FilteredState(o.sseq, o.cfg.FilterSubject) - // Set our num pending and sequence floor. - o.npc, o.npf = int64(ss.Msgs), ss.Last + isLastPerSubject := o.cfg.DeliverPolicy == DeliverLastPerSubject + // Set our num pending and valid sequence floor. + npc, npf := o.mset.store.NumPending(o.sseq, o.cfg.FilterSubject, isLastPerSubject) + o.npc, o.npf = int64(npc), npf } return o.numPending() diff --git a/server/filestore.go b/server/filestore.go index 869eb8af..00fa70ab 100644 --- a/server/filestore.go +++ b/server/filestore.go @@ -71,6 +71,7 @@ type StoreCipher int const ( ChaCha StoreCipher = iota AES + NoCipher ) func (cipher StoreCipher) String() string { @@ -79,6 +80,8 @@ func (cipher StoreCipher) String() string { return "ChaCha20-Poly1305" case AES: return "AES-GCM" + case NoCipher: + return "None" default: return "Unknown StoreCipher" } @@ -1526,31 +1529,15 @@ func (mb *msgBlock) filteredPending(subj string, wc bool, seq uint64) (total, fi // This will traverse a message block and generate the filtered pending. // Lock should be held. -func (mb *msgBlock) filteredPendingLocked(filter string, wc bool, seq uint64) (total, first, last uint64) { +func (mb *msgBlock) filteredPendingLocked(filter string, wc bool, sseq uint64) (total, first, last uint64) { isAll := filter == _EMPTY_ || filter == fwcs // First check if we can optimize this part. // This means we want all and the starting sequence was before this block. - if isAll && seq <= mb.first.seq { + if isAll && sseq <= mb.first.seq { return mb.msgs, mb.first.seq, mb.last.seq } - // Make sure we have fss loaded. - mb.ensurePerSubjectInfoLoaded() - - subs := []string{filter} - // If we have a wildcard match against all tracked subjects we know about. - if wc || isAll { - subs = subs[:0] - for subj := range mb.fss { - if isAll || subjectIsSubsetMatch(subj, filter) { - subs = append(subs, subj) - } - } - } - // If we load the cache for a linear scan we want to expire that cache upon exit. - var shouldExpire bool - update := func(ss *SimpleState) { total += ss.Msgs if first == 0 || ss.First < first { @@ -1561,98 +1548,70 @@ func (mb *msgBlock) filteredPendingLocked(filter string, wc bool, seq uint64) (t } } - for i, subj := range subs { - // If the starting seq is less then or equal that means we want all and we do not need to load any messages. - ss := mb.fss[subj] - if ss == nil || seq > ss.Last { - continue + // Make sure we have fss loaded. + mb.ensurePerSubjectInfoLoaded() + + tsa := [32]string{} + fsa := [32]string{} + fts := tokenizeSubjectIntoSlice(fsa[:0], filter) + + // 1. See if we match any subs from fss. + // 2. If we match and the sseq is past ss.Last then we can use meta only. + // 3. If we match and we need to do a partial, break and clear any totals and do a full scan like num pending. + + isMatch := func(subj string) bool { + if !wc { + return subj == filter } - - // If the seq we are starting at is less then the simple state's first sequence we can just return the total msgs. - if seq <= ss.First { - update(ss) - continue - } - - // We may need to scan this one block since we have a partial set to consider. - // If we are all inclusive then we can do simple math and avoid the scan. - if allInclusive := ss.Msgs == ss.Last-ss.First+1; allInclusive { - update(ss) - // Make sure to compensate for the diff from the head. - if seq > ss.First { - first, total = seq, total-(seq-ss.First) - } - continue - } - - // We need to scan this block to compute the correct number of pending for this block. - // We want to only do this once so we will adjust subs and test against them all here. - - if mb.cacheNotLoaded() { - mb.loadMsgsWithLock() - shouldExpire = true - } - - var all, lseq uint64 - // Grab last applicable sequence as a union of all applicable subjects. - for _, subj := range subs[i:] { - if ss := mb.fss[subj]; ss != nil { - all += ss.Msgs - if ss.Last > lseq { - lseq = ss.Last - } - } - } - numScanIn, numScanOut := lseq-seq, seq-mb.first.seq - - var smv StoreMsg - - isMatch := func(seq uint64) bool { - if sm, _ := mb.cacheLookup(seq, &smv); sm != nil { - if len(subs) == 1 && sm.subj == subs[0] { - return true - } - for _, subj := range subs { - if sm.subj == subj { - return true - } - } - } - return false - } - - // Decide on whether to scan those included or those excluded based on which scan amount is less. - if numScanIn < numScanOut { - for tseq := seq; tseq <= lseq; tseq++ { - if isMatch(tseq) { - total++ - if first == 0 || tseq < first { - first = tseq - } - last = tseq - } - } - } else { - // Here its more efficient to scan the out nodes. - var discard uint64 - for tseq := mb.first.seq; tseq < seq; tseq++ { - if isMatch(tseq) { - discard++ - } - } - total += (all - discard) - // Now make sure we match our first - for tseq := seq; tseq <= lseq; tseq++ { - if isMatch(tseq) { - first = tseq - break - } - } - } - // We can bail since we scanned all remaining in this pass. - break + tts := tokenizeSubjectIntoSlice(tsa[:0], subj) + return isSubsetMatchTokenized(tts, fts) } + var havePartial bool + for subj, ss := range mb.fss { + if isAll || isMatch(subj) { + if sseq <= ss.First { + update(ss) + } else if sseq <= ss.Last { + // We matched but its a partial. + havePartial = true + break + } + } + } + + // If we did not encounter any partials we can return here. + if !havePartial { + return total, first, last + } + + // If we are here we need to scan the msgs. + // Clear what we had. + total, first, last = 0, 0, 0 + + // If we load the cache for a linear scan we want to expire that cache upon exit. + var shouldExpire bool + if mb.cacheNotLoaded() { + mb.loadMsgsWithLock() + shouldExpire = true + } + + var smv StoreMsg + for seq := sseq; seq <= mb.last.seq; seq++ { + sm, _ := mb.cacheLookup(seq, &smv) + if sm == nil { + continue + } + if isAll || isMatch(sm.subj) { + total++ + if first == 0 || seq < first { + first = seq + } + if seq > last { + last = seq + } + } + } // If we loaded this block for this operation go ahead and expire it here. if shouldExpire { mb.tryForceExpireCacheLocked() @@ -1828,8 +1787,231 @@ func (fs *fileStore) SubjectsState(subject string) map[string]SimpleState { return fss } +// NumPending will return the number of pending messages matching the filter subject starting at sequence. +// Optimized for stream num pending calculations for consumers. +func (fs *fileStore) NumPending(sseq uint64, filter string, lastPerSubject bool) (total, validThrough uint64) { + fs.mu.RLock() + defer fs.mu.RUnlock() + + // This can always be last for these purposes. + validThrough = fs.state.LastSeq + + if fs.state.Msgs == 0 || sseq > fs.state.LastSeq { + return 0, validThrough + } + + // Track starting for both block for the sseq and staring block that matches any subject. + var seqStart, subjStart int + + // See if we need to figure out starting block per sseq. + if sseq > fs.state.FirstSeq { + seqStart, _ = fs.selectMsgBlockWithIndex(sseq) + } + + tsa := [32]string{} + fsa := [32]string{} + fts := tokenizeSubjectIntoSlice(fsa[:0], filter) + isAll := filter == _EMPTY_ || filter == fwcs + wc := subjectHasWildcard(filter) + + isMatch := func(subj string) bool { + if isAll { + return true + } + if !wc { + return subj == filter + } + tts := tokenizeSubjectIntoSlice(tsa[:0], subj) + return isSubsetMatchTokenized(tts, fts) + } + + // If we would need to scan more from the beginning, revert back to calculating directly here. + // TODO(dlc) - Redo properly with sublists etc for subject-based filtering. + if lastPerSubject || seqStart >= (len(fs.blks)/2) { + // If we need to track seen for last per subject. + var seen map[string]bool + if lastPerSubject { + seen = make(map[string]bool) + } + + for i := seqStart; i < len(fs.blks); i++ { + mb := fs.blks[i] + mb.mu.Lock() + var t uint64 + if isAll && sseq <= mb.first.seq { + if lastPerSubject { + for subj := range mb.fss { + if !seen[subj] { + total++ + seen[subj] = true + } + } + } else { + total += mb.msgs + } + mb.mu.Unlock() + continue + } + + // If we are here we need to at least scan the subject fss. + // Make sure we have fss loaded. + mb.ensurePerSubjectInfoLoaded() + var havePartial bool + for subj, ss := range mb.fss { + if !seen[subj] && isMatch(subj) { + if lastPerSubject { + // Can't have a partials with last by subject. + if sseq <= ss.Last { + t++ + seen[subj] = true + } + } else { + if sseq <= ss.First { + t += ss.Msgs + } else if sseq <= ss.Last { + // We matched but its a partial. + havePartial = true + break + } + } + } + } + // See if we need to scan msgs here. + if havePartial { + // Clear on partial. + t = 0 + // If we load the cache for a linear scan we want to expire that cache upon exit. + var shouldExpire bool + if mb.cacheNotLoaded() { + mb.loadMsgsWithLock() + shouldExpire = true + } + var smv StoreMsg + for seq := sseq; seq <= mb.last.seq; seq++ { + if sm, _ := mb.cacheLookup(seq, &smv); sm != nil && (isAll || isMatch(sm.subj)) { + t++ + } + } + // If we loaded this block for this operation go ahead and expire it here. + if shouldExpire { + mb.tryForceExpireCacheLocked() + } + } + mb.mu.Unlock() + total += t + } + return total, validThrough + } + + // If we are here its better to calculate totals from psim and adjust downward by scanning less blocks. + // TODO(dlc) - Eventually when sublist uses generics, make this sublist driven instead. + start := uint32(math.MaxUint32) + for subj, psi := range fs.psim { + if isMatch(subj) { + if lastPerSubject { + total++ + // Keep track of start index for this subject. + // Use last block in this case. + if psi.lblk < start { + start = psi.lblk + } + } else { + total += psi.total + // Keep track of start index for this subject. + if psi.fblk < start { + start = psi.fblk + } + } + } + } + // See if we were asked for all, if so we are done. + if sseq <= fs.state.FirstSeq { + return total, validThrough + } + + // If we are here we need to calculate partials for the first blocks. + subjStart = int(start) + firstSubjBlk := fs.bim[uint32(subjStart)] + var firstSubjBlkFound bool + var smv StoreMsg + + // Adjust in case not found. + if firstSubjBlk == nil { + firstSubjBlkFound = true + } + + // Track how many we need to adjust against the total. + var adjust uint64 + + for i := 0; i <= seqStart; i++ { + mb := fs.blks[i] + + // We can skip blks if we know they are below the first one that has any subject matches. + if !firstSubjBlkFound { + if mb == firstSubjBlk { + firstSubjBlkFound = true + } else { + continue + } + } + + // We need to scan this block. + var shouldExpire bool + mb.mu.Lock() + // Check if we should include all of this block in adjusting. If so work with metadata. + if sseq > mb.last.seq { + // We need to adjust for all matches in this block. + // We will scan fss state vs messages themselves. + // Make sure we have fss loaded. + mb.ensurePerSubjectInfoLoaded() + for subj, ss := range mb.fss { + if isMatch(subj) { + if lastPerSubject { + adjust++ + } else { + adjust += ss.Msgs + } + } + } + } else { + // This is the last block. We need to scan per message here. + if mb.cacheNotLoaded() { + if err := mb.loadMsgsWithLock(); err != nil { + mb.mu.Unlock() + return 0, 0 + } + shouldExpire = true + } + + var last = mb.last.seq + if sseq < last { + last = sseq + } + for seq := mb.first.seq; seq < last; seq++ { + sm, _ := mb.cacheLookup(seq, &smv) + if sm == nil { + continue + } + // Check if it matches our filter. + if isMatch(sm.subj) && sm.seq < sseq { + adjust++ + } + } + } + // If we loaded the block try to force expire. + if shouldExpire { + mb.tryForceExpireCacheLocked() + } + mb.mu.Unlock() + } + // Make final adjustment. + total -= adjust + + return total, validThrough +} + // SubjectsTotal return message totals per subject. -func (fs *fileStore) SubjectsTotals(filterSubject string) map[string]uint64 { +func (fs *fileStore) SubjectsTotals(filter string) map[string]uint64 { fs.mu.RLock() defer fs.mu.RUnlock() @@ -1839,17 +2021,22 @@ func (fs *fileStore) SubjectsTotals(filterSubject string) map[string]uint64 { tsa := [32]string{} fsa := [32]string{} - fts := tokenizeSubjectIntoSlice(fsa[:0], filterSubject) - isAll := filterSubject == _EMPTY_ || filterSubject == fwcs + fts := tokenizeSubjectIntoSlice(fsa[:0], filter) + isAll := filter == _EMPTY_ || filter == fwcs + wc := subjectHasWildcard(filter) + + isMatch := func(subj string) bool { + if !wc { + return subj == filter + } + tts := tokenizeSubjectIntoSlice(tsa[:0], subj) + return isSubsetMatchTokenized(tts, fts) + } fst := make(map[string]uint64) for subj, psi := range fs.psim { - if isAll { + if isAll || isMatch(subj) { fst[subj] = psi.total - } else { - if tts := tokenizeSubjectIntoSlice(tsa[:0], subj); isSubsetMatchTokenized(tts, fts) { - fst[subj] = psi.total - } } } return fst @@ -3618,14 +3805,20 @@ func (fs *fileStore) syncBlocks() { // Return nil if not in the set. // Read lock should be held. func (fs *fileStore) selectMsgBlock(seq uint64) *msgBlock { + _, mb := fs.selectMsgBlockWithIndex(seq) + return mb +} + +func (fs *fileStore) selectMsgBlockWithIndex(seq uint64) (int, *msgBlock) { // Check for out of range. if seq < fs.state.FirstSeq || seq > fs.state.LastSeq { - return nil + return -1, nil } // Starting index, defaults to beginning. si := 0 + // TODO(dlc) - Use new AVL and make this real for anything beyond certain size. // Max threshold before we probe for a starting block to start our linear search. const maxl = 256 if nb := len(fs.blks); nb > maxl { @@ -3643,11 +3836,11 @@ func (fs *fileStore) selectMsgBlock(seq uint64) *msgBlock { for i := si; i < len(fs.blks); i++ { mb := fs.blks[i] if seq <= atomic.LoadUint64(&mb.last.seq) { - return mb + return i, mb } } - return nil + return -1, nil } // Select the message block where this message should be found. diff --git a/server/filestore_test.go b/server/filestore_test.go index 718d90a8..92ccfd2f 100644 --- a/server/filestore_test.go +++ b/server/filestore_test.go @@ -39,6 +39,7 @@ import ( func testFileStoreAllPermutations(t *testing.T, fn func(t *testing.T, fcfg FileStoreConfig)) { for _, fcfg := range []FileStoreConfig{ + {Cipher: NoCipher}, {Cipher: ChaCha}, {Cipher: AES}, } { @@ -800,7 +801,9 @@ func TestFileStoreCompact(t *testing.T) { } return h.Sum(nil), nil } - + if fcfg.Cipher == NoCipher { + prf = nil + } fs, err := newFileStoreWithCreated( fcfg, StreamConfig{Name: "zzz", Storage: FileStorage}, @@ -989,6 +992,9 @@ func TestFileStoreStreamTruncate(t *testing.T) { } return h.Sum(nil), nil } + if fcfg.Cipher == NoCipher { + prf = nil + } fs, err := newFileStoreWithCreated( fcfg, @@ -3460,6 +3466,9 @@ func TestFileStoreSparseCompaction(t *testing.T) { } return h.Sum(nil), nil } + if fcfg.Cipher == NoCipher { + prf = nil + } fs, err = newFileStoreWithCreated(fcfg, cfg, time.Now(), prf) if err != nil { @@ -3772,6 +3781,9 @@ func TestFileStoreCompactReclaimHeadSpace(t *testing.T) { } return h.Sum(nil), nil } + if fcfg.Cipher == NoCipher { + prf = nil + } fs, err = newFileStoreWithCreated( fcfg, @@ -4024,6 +4036,9 @@ func TestFileStoreShortIndexWriteBug(t *testing.T) { } return h.Sum(nil), nil } + if fcfg.Cipher == NoCipher { + prf = nil + } created := time.Now() @@ -4087,6 +4102,9 @@ func TestFileStoreDoubleCompactWithWriteInBetweenEncryptedBug(t *testing.T) { } return h.Sum(nil), nil } + if fcfg.Cipher == NoCipher { + prf = nil + } fs, err := newFileStoreWithCreated( fcfg, @@ -5276,81 +5294,204 @@ func TestFileStoreOnlyWritePerSubjectInfoOnExpireWithUpdate(t *testing.T) { } func TestFileStoreSubjectsTotals(t *testing.T) { - testFileStoreAllPermutations(t, func(t *testing.T, fcfg FileStoreConfig) { - fs, err := newFileStore( - fcfg, - StreamConfig{Name: "zzz", Subjects: []string{"*.*"}, Storage: FileStorage}, - ) + // No need for all permutations here. + storeDir := t.TempDir() + fcfg := FileStoreConfig{ + StoreDir: storeDir, + } + fs, err := newFileStore(fcfg, StreamConfig{Name: "zzz", Subjects: []string{"*.*"}, Storage: FileStorage}) + require_NoError(t, err) + defer fs.Stop() + + fmap := make(map[int]int) + bmap := make(map[int]int) + + var m map[int]int + var ft string + + for i := 0; i < 10_000; i++ { + // Flip coin for prefix + if rand.Intn(2) == 0 { + ft, m = "foo", fmap + } else { + ft, m = "bar", bmap + } + dt := rand.Intn(100) + subj := fmt.Sprintf("%s.%d", ft, dt) + m[dt]++ + + _, _, err := fs.StoreMsg(subj, nil, []byte("Hello World")) require_NoError(t, err) - defer fs.Stop() + } - fmap := make(map[int]int) - bmap := make(map[int]int) - - var m map[int]int - var ft string - - for i := 0; i < 10_000; i++ { - // Flip coin for prefix - if rand.Intn(2) == 0 { - ft, m = "foo", fmap - } else { - ft, m = "bar", bmap - } - dt := rand.Intn(100) - subj := fmt.Sprintf("%s.%d", ft, dt) - m[dt]++ - - _, _, err := fs.StoreMsg(subj, nil, []byte("Hello World")) - require_NoError(t, err) + // Now test SubjectsTotal + for dt, total := range fmap { + subj := fmt.Sprintf("foo.%d", dt) + m := fs.SubjectsTotals(subj) + if m[subj] != uint64(total) { + t.Fatalf("Expected %q to have %d total, got %d", subj, total, m[subj]) } + } - // Now test SubjectsTotal - for dt, total := range fmap { - subj := fmt.Sprintf("foo.%d", dt) - m := fs.SubjectsTotals(subj) - if m[subj] != uint64(total) { - t.Fatalf("Expected %q to have %d total, got %d", subj, total, m[subj]) - } + // Check fmap. + if st := fs.SubjectsTotals("foo.*"); len(st) != len(fmap) { + t.Fatalf("Expected %d subjects for %q, got %d", len(fmap), "foo.*", len(st)) + } else { + expected := 0 + for _, n := range fmap { + expected += n } + received := uint64(0) + for _, n := range st { + received += n + } + if received != uint64(expected) { + t.Fatalf("Expected %d total but got %d", expected, received) + } + } - // Check fmap. - if st := fs.SubjectsTotals("foo.*"); len(st) != len(fmap) { - t.Fatalf("Expected %d subjects for %q, got %d", len(fmap), "foo.*", len(st)) - } else { - expected := 0 - for _, n := range fmap { - expected += n - } - received := uint64(0) - for _, n := range st { - received += n - } - if received != uint64(expected) { - t.Fatalf("Expected %d total but got %d", expected, received) - } + // Check bmap. + if st := fs.SubjectsTotals("bar.*"); len(st) != len(bmap) { + t.Fatalf("Expected %d subjects for %q, got %d", len(bmap), "bar.*", len(st)) + } else { + expected := 0 + for _, n := range bmap { + expected += n } + received := uint64(0) + for _, n := range st { + received += n + } + if received != uint64(expected) { + t.Fatalf("Expected %d total but got %d", expected, received) + } + } - // Check bmap. - if st := fs.SubjectsTotals("bar.*"); len(st) != len(bmap) { - t.Fatalf("Expected %d subjects for %q, got %d", len(bmap), "bar.*", len(st)) - } else { - expected := 0 - for _, n := range bmap { - expected += n - } - received := uint64(0) - for _, n := range st { - received += n - } - if received != uint64(expected) { - t.Fatalf("Expected %d total but got %d", expected, received) - } - } - - // All with pwc match. - if st, expected := fs.SubjectsTotals("*.*"), len(bmap)+len(fmap); len(st) != expected { - t.Fatalf("Expected %d subjects for %q, got %d", expected, "*.*", len(st)) - } - }) + // All with pwc match. + if st, expected := fs.SubjectsTotals("*.*"), len(bmap)+len(fmap); len(st) != expected { + t.Fatalf("Expected %d subjects for %q, got %d", expected, "*.*", len(st)) + } +} + +func TestFileStoreNumPending(t *testing.T) { + // No need for all permutations here. + storeDir := t.TempDir() + fcfg := FileStoreConfig{ + StoreDir: storeDir, + BlockSize: 2 * 1024, // Create many blocks on purpose. + } + fs, err := newFileStore(fcfg, StreamConfig{Name: "zzz", Subjects: []string{"*.*.*.*"}, Storage: FileStorage}) + require_NoError(t, err) + defer fs.Stop() + + tokens := []string{"foo", "bar", "baz"} + genSubj := func() string { + return fmt.Sprintf("%s.%s.%s.%s", + tokens[rand.Intn(len(tokens))], + tokens[rand.Intn(len(tokens))], + tokens[rand.Intn(len(tokens))], + tokens[rand.Intn(len(tokens))], + ) + } + + for i := 0; i < 50_000; i++ { + subj := genSubj() + _, _, err := fs.StoreMsg(subj, nil, []byte("Hello World")) + require_NoError(t, err) + } + + state := fs.State() + + // Scan one by one for sanity check against other calculations. + sanityCheck := func(sseq uint64, filter string) SimpleState { + t.Helper() + var ss SimpleState + var smv StoreMsg + // For here we know 0 is invalid, set to 1. + if sseq == 0 { + sseq = 1 + } + for seq := sseq; seq <= state.LastSeq; seq++ { + sm, err := fs.LoadMsg(seq, &smv) + if err != nil { + t.Logf("Encountered error %v loading sequence: %d", err, seq) + continue + } + if subjectIsSubsetMatch(sm.subj, filter) { + ss.Msgs++ + ss.Last = seq + if ss.First == 0 || seq < ss.First { + ss.First = seq + } + } + } + return ss + } + + check := func(sseq uint64, filter string) { + t.Helper() + np, lvs := fs.NumPending(sseq, filter, false) + ss := fs.FilteredState(sseq, filter) + sss := sanityCheck(sseq, filter) + if lvs != state.LastSeq { + t.Fatalf("Expected NumPending to return valid through last of %d but got %d", state.LastSeq, lvs) + } + if ss.Msgs != np { + t.Fatalf("NumPending of %d did not match ss.Msgs of %d", np, ss.Msgs) + } + if ss != sss { + t.Fatalf("Failed sanity check, expected %+v got %+v", sss, ss) + } + } + + sanityCheckLastOnly := func(sseq uint64, filter string) SimpleState { + t.Helper() + var ss SimpleState + var smv StoreMsg + // For here we know 0 is invalid, set to 1. + if sseq == 0 { + sseq = 1 + } + seen := make(map[string]bool) + for seq := state.LastSeq; seq >= sseq; seq-- { + sm, err := fs.LoadMsg(seq, &smv) + if err != nil { + t.Logf("Encountered error %v loading sequence: %d", err, seq) + continue + } + if !seen[sm.subj] && subjectIsSubsetMatch(sm.subj, filter) { + ss.Msgs++ + if ss.Last == 0 { + ss.Last = seq + } + if ss.First == 0 || seq < ss.First { + ss.First = seq + } + seen[sm.subj] = true + } + } + return ss + } + + checkLastOnly := func(sseq uint64, filter string) { + t.Helper() + np, lvs := fs.NumPending(sseq, filter, true) + ss := sanityCheckLastOnly(sseq, filter) + if lvs != state.LastSeq { + t.Fatalf("Expected NumPending to return valid through last of %d but got %d", state.LastSeq, lvs) + } + if ss.Msgs != np { + t.Fatalf("NumPending of %d did not match ss.Msgs of %d", np, ss.Msgs) + } + } + + startSeqs := []uint64{0, 1, 2, 200, 444, 555, 2222, 8888, 12_345, 28_222, 33_456, 44_400, 49_999} + checkSubs := []string{"foo.>", "*.bar.>", "foo.bar.*.baz", "*.bar.>", "*.foo.bar.*", "foo.foo.bar.baz"} + + for _, filter := range checkSubs { + for _, start := range startSeqs { + check(start, filter) + checkLastOnly(start, filter) + } + } } diff --git a/server/memstore.go b/server/memstore.go index 7adb59e7..c36acd93 100644 --- a/server/memstore.go +++ b/server/memstore.go @@ -289,10 +289,10 @@ func (ms *memStore) GetSeqFromTime(t time.Time) uint64 { func (ms *memStore) FilteredState(sseq uint64, subj string) SimpleState { ms.mu.RLock() defer ms.mu.RUnlock() - return ms.filteredStateLocked(sseq, subj) + return ms.filteredStateLocked(sseq, subj, false) } -func (ms *memStore) filteredStateLocked(sseq uint64, subj string) SimpleState { +func (ms *memStore) filteredStateLocked(sseq uint64, filter string, lastPerSubject bool) SimpleState { var ss SimpleState if sseq < ms.state.FirstSeq { @@ -304,49 +304,139 @@ func (ms *memStore) filteredStateLocked(sseq uint64, subj string) SimpleState { return ss } - // Empty same as everything. - if subj == _EMPTY_ { - subj = fwcs + isAll := filter == _EMPTY_ || filter == fwcs + + // First check if we can optimize this part. + // This means we want all and the starting sequence was before this block. + if isAll && sseq <= ms.state.FirstSeq { + total := ms.state.Msgs + if lastPerSubject { + total = uint64(len(ms.fss)) + } + return SimpleState{ + Msgs: total, + First: ms.state.FirstSeq, + Last: ms.state.LastSeq, + } } - wc := subjectHasWildcard(subj) - subs := []string{subj} - if wc { - subs = subs[:0] - for fsubj := range ms.fss { - if subjectIsSubsetMatch(fsubj, subj) { - subs = append(subs, fsubj) + tsa := [32]string{} + fsa := [32]string{} + fts := tokenizeSubjectIntoSlice(fsa[:0], filter) + wc := subjectHasWildcard(filter) + + // 1. See if we match any subs from fss. + // 2. If we match and the sseq is past ss.Last then we can use meta only. + // 3. If we match and we need to do a partial, break and clear any totals and do a full scan like num pending. + + isMatch := func(subj string) bool { + if isAll { + return true + } + if !wc { + return subj == filter + } + tts := tokenizeSubjectIntoSlice(tsa[:0], subj) + return isSubsetMatchTokenized(tts, fts) + } + + update := func(fss *SimpleState) { + msgs, first, last := fss.Msgs, fss.First, fss.Last + if lastPerSubject { + msgs, first = 1, last + } + ss.Msgs += msgs + if ss.First == 0 || first < ss.First { + ss.First = first + } + if last > ss.Last { + ss.Last = last + } + } + + var havePartial bool + // We will track start and end sequences as we go. + for subj, fss := range ms.fss { + if isMatch(subj) { + if sseq <= fss.First { + update(fss) + } else if sseq <= fss.Last { + // We matched but its a partial. + havePartial = true + // Don't break here, we will update to keep tracking last. + update(fss) } } } - fseq, lseq := ms.state.LastSeq, uint64(0) - for _, subj := range subs { - ss := ms.fss[subj] - if ss == nil { - continue - } - if ss.First < fseq { - fseq = ss.First - } - if ss.Last > lseq { - lseq = ss.Last - } - } - if fseq < sseq { - fseq = sseq + + // If we did not encounter any partials we can return here. + if !havePartial { + return ss } - // FIXME(dlc) - Optimize better like filestore. - eq := compareFn(subj) - for seq := fseq; seq <= lseq; seq++ { - if sm, ok := ms.msgs[seq]; ok && eq(sm.subj, subj) { - ss.Msgs++ - if ss.First == 0 { - ss.First = seq + // If we are here we need to scan the msgs. + // Capture first and last sequences for scan and then clear what we had. + first, last := ss.First, ss.Last + // To track if we decide to exclude and we need to calculate first. + var needScanFirst bool + if first < sseq { + first = sseq + needScanFirst = true + } + + // Now we want to check if it is better to scan inclusive and recalculate that way + // or leave and scan exclusive and adjust our totals. + // ss.Last is always correct here. + toScan, toExclude := last-first, first-ms.state.FirstSeq+ms.state.LastSeq-ss.Last + var seen map[string]bool + if lastPerSubject { + seen = make(map[string]bool) + } + if toScan < toExclude { + ss.Msgs, ss.First = 0, 0 + for seq := first; seq <= last; seq++ { + if sm, ok := ms.msgs[seq]; ok && !seen[sm.subj] && isMatch(sm.subj) { + ss.Msgs++ + if ss.First == 0 { + ss.First = seq + } + if seen != nil { + seen[sm.subj] = true + } + } + } + } else { + // We will adjust from the totals above by scanning what we need to exclude. + ss.First = first + var adjust uint64 + for seq := ms.state.FirstSeq; seq < first; seq++ { + if sm, ok := ms.msgs[seq]; ok && !seen[sm.subj] && isMatch(sm.subj) { + adjust++ + if seen != nil { + seen[sm.subj] = true + } + } + } + // Now do range at end. + for seq := last + 1; seq < ms.state.LastSeq; seq++ { + if sm, ok := ms.msgs[seq]; ok && !seen[sm.subj] && isMatch(sm.subj) { + adjust++ + if seen != nil { + seen[sm.subj] = true + } + } + } + ss.Msgs -= adjust + if needScanFirst { + for seq := first; seq < last; seq++ { + if sm, ok := ms.msgs[seq]; ok && isMatch(sm.subj) { + ss.First = seq + break + } } - ss.Last = seq } } + return ss } @@ -402,6 +492,15 @@ func (ms *memStore) SubjectsTotals(filterSubject string) map[string]uint64 { return fst } +// NumPending will return the number of pending messages matching the filter subject starting at sequence. +func (ms *memStore) NumPending(sseq uint64, filter string, lastPerSubject bool) (total, validThrough uint64) { + ms.mu.RLock() + defer ms.mu.RUnlock() + + ss := ms.filteredStateLocked(sseq, filter, lastPerSubject) + return ss.Msgs, ms.state.LastSeq +} + // Will check the msg limit for this tracked subject. // Lock should be held. func (ms *memStore) enforcePerSubjectLimit(ss *SimpleState) { @@ -745,7 +844,7 @@ func (ms *memStore) LoadLastMsg(subject string, smp *StoreMsg) (*StoreMsg, error if subject == _EMPTY_ || subject == fwcs { sm, ok = ms.msgs[ms.state.LastSeq] - } else if ss := ms.filteredStateLocked(1, subject); ss.Msgs > 0 { + } else if ss := ms.filteredStateLocked(1, subject, true); ss.Msgs > 0 { sm, ok = ms.msgs[ss.Last] } if !ok || sm == nil { diff --git a/server/memstore_test.go b/server/memstore_test.go index db09633e..78adbe43 100644 --- a/server/memstore_test.go +++ b/server/memstore_test.go @@ -599,5 +599,125 @@ func TestMemStoreSubjectsTotals(t *testing.T) { if st, expected := ms.SubjectsTotals("*.*"), len(bmap)+len(fmap); len(st) != expected { t.Fatalf("Expected %d subjects for %q, got %d", expected, "*.*", len(st)) } - +} + +func TestMemStoreNumPending(t *testing.T) { + cfg := &StreamConfig{ + Name: "TEST", + Storage: MemoryStorage, + Subjects: []string{"*.*.*.*"}, + } + ms, err := newMemStore(cfg) + require_NoError(t, err) + + tokens := []string{"foo", "bar", "baz"} + genSubj := func() string { + return fmt.Sprintf("%s.%s.%s.%s", + tokens[rand.Intn(len(tokens))], + tokens[rand.Intn(len(tokens))], + tokens[rand.Intn(len(tokens))], + tokens[rand.Intn(len(tokens))], + ) + } + + for i := 0; i < 50_000; i++ { + subj := genSubj() + _, _, err := ms.StoreMsg(subj, nil, []byte("Hello World")) + require_NoError(t, err) + } + + state := ms.State() + + // Scan one by one for sanity check against other calculations. + sanityCheck := func(sseq uint64, filter string) SimpleState { + t.Helper() + var ss SimpleState + var smv StoreMsg + // For here we know 0 is invalid, set to 1. + if sseq == 0 { + sseq = 1 + } + for seq := sseq; seq <= state.LastSeq; seq++ { + sm, err := ms.LoadMsg(seq, &smv) + if err != nil { + t.Logf("Encountered error %v loading sequence: %d", err, seq) + continue + } + if subjectIsSubsetMatch(sm.subj, filter) { + ss.Msgs++ + ss.Last = seq + if ss.First == 0 || seq < ss.First { + ss.First = seq + } + } + } + return ss + } + + check := func(sseq uint64, filter string) { + t.Helper() + np, lvs := ms.NumPending(sseq, filter, false) + ss := ms.FilteredState(sseq, filter) + sss := sanityCheck(sseq, filter) + if lvs != state.LastSeq { + t.Fatalf("Expected NumPending to return valid through last of %d but got %d", state.LastSeq, lvs) + } + if ss.Msgs != np { + t.Fatalf("NumPending of %d did not match ss.Msgs of %d", np, ss.Msgs) + } + if ss != sss { + t.Fatalf("Failed sanity check, expected %+v got %+v", sss, ss) + } + } + + sanityCheckLastOnly := func(sseq uint64, filter string) SimpleState { + t.Helper() + var ss SimpleState + var smv StoreMsg + // For here we know 0 is invalid, set to 1. + if sseq == 0 { + sseq = 1 + } + seen := make(map[string]bool) + for seq := state.LastSeq; seq >= sseq; seq-- { + sm, err := ms.LoadMsg(seq, &smv) + if err != nil { + t.Logf("Encountered error %v loading sequence: %d", err, seq) + continue + } + if !seen[sm.subj] && subjectIsSubsetMatch(sm.subj, filter) { + ss.Msgs++ + if ss.Last == 0 { + ss.Last = seq + } + if ss.First == 0 || seq < ss.First { + ss.First = seq + } + seen[sm.subj] = true + } + } + return ss + } + + checkLastOnly := func(sseq uint64, filter string) { + t.Helper() + np, lvs := ms.NumPending(sseq, filter, true) + ss := sanityCheckLastOnly(sseq, filter) + if lvs != state.LastSeq { + t.Fatalf("Expected NumPending to return valid through last of %d but got %d", state.LastSeq, lvs) + } + if ss.Msgs != np { + t.Fatalf("NumPending of %d did not match ss.Msgs of %d", np, ss.Msgs) + } + } + + startSeqs := []uint64{0, 1, 2, 200, 444, 555, 2222, 8888, 12_345, 28_222, 33_456, 44_400, 49_999} + checkSubs := []string{"foo.>", "*.bar.>", "foo.bar.*.baz", "*.bar.>", "*.foo.bar.*", "foo.foo.bar.baz"} + + for _, filter := range checkSubs { + for _, start := range startSeqs { + check(start, filter) + checkLastOnly(start, filter) + } + } } diff --git a/server/store.go b/server/store.go index 77477ba5..e03ad10a 100644 --- a/server/store.go +++ b/server/store.go @@ -96,6 +96,7 @@ type StreamStore interface { FilteredState(seq uint64, subject string) SimpleState SubjectsState(filterSubject string) map[string]SimpleState SubjectsTotals(filterSubject string) map[string]uint64 + NumPending(sseq uint64, filter string, lastPerSubject bool) (total, validThrough uint64) State() StreamState FastState(*StreamState) Type() StorageType From 4fa0ea32c36e2ff790411bf03fa350737afff3b9 Mon Sep 17 00:00:00 2001 From: Derek Collison Date: Sat, 25 Feb 2023 19:07:27 -0800 Subject: [PATCH 3/3] [FIXED] If a truncate for a raft WAL failed we could spin. Signed-off-by: Derek Collison --- server/jetstream_cluster.go | 6 +++--- server/raft.go | 30 ++++++++++++++++++++---------- 2 files changed, 23 insertions(+), 13 deletions(-) diff --git a/server/jetstream_cluster.go b/server/jetstream_cluster.go index ed2e6d5f..00f935e1 100644 --- a/server/jetstream_cluster.go +++ b/server/jetstream_cluster.go @@ -1008,7 +1008,7 @@ func (js *jetStream) monitorCluster() { if hash := highwayhash.Sum(snap, key); !bytes.Equal(hash[:], lastSnap) { if err := n.InstallSnapshot(snap); err == nil { lastSnap, lastSnapTime = hash[:], time.Now() - } else { + } else if err != errNoSnapAvailable { s.Warnf("Error snapshotting JetStream cluster state: %v", err) } } @@ -1887,7 +1887,7 @@ func (js *jetStream) monitorStream(mset *stream, sa *streamAssignment, sendSnaps if !bytes.Equal(hash[:], lastSnap) || ne >= compactNumMin || nb >= compactSizeMin { if err := n.InstallSnapshot(snap); err == nil { lastSnap, lastSnapTime = hash[:], time.Now() - } else { + } else if err != errNoSnapAvailable { s.Warnf("Failed to install snapshot for '%s > %s' [%s]: %v", mset.acc.Name, mset.name(), n.Group(), err) } } @@ -4051,7 +4051,7 @@ func (js *jetStream) monitorConsumer(o *consumer, ca *consumerAssignment) { if !bytes.Equal(hash[:], lastSnap) || ne >= compactNumMin || nb >= compactSizeMin { if err := n.InstallSnapshot(snap); err == nil { lastSnap, lastSnapTime = hash[:], time.Now() - } else { + } else if err != errNoSnapAvailable { s.Warnf("Failed to install snapshot for '%s > %s > %s' [%s]: %v", o.acc.Name, ca.Stream, ca.Name, n.Group(), err) } } diff --git a/server/raft.go b/server/raft.go index eda9bd40..5b05798a 100644 --- a/server/raft.go +++ b/server/raft.go @@ -975,7 +975,6 @@ func (n *raft) InstallSnapshot(data []byte) error { // Remember our latest snapshot file. n.snapfile = sfile - if _, err := n.wal.Compact(snap.lastIndex + 1); err != nil { n.setWriteErrLocked(err) n.Unlock() @@ -1075,9 +1074,8 @@ func (n *raft) setupLastSnapshot() { n.pterm = snap.lastTerm n.commit = snap.lastIndex n.applied = snap.lastIndex - n.apply.push(&CommittedEntry{n.commit, []*Entry{{EntrySnapshot, snap.data}}}) - if _, err := n.wal.Compact(snap.lastIndex); err != nil { + if _, err := n.wal.Compact(snap.lastIndex + 1); err != nil { n.setWriteErrLocked(err) } } @@ -2737,20 +2735,31 @@ func (n *raft) createCatchup(ae *appendEntry) string { func (n *raft) truncateWAL(term, index uint64) { n.debug("Truncating and repairing WAL") + defer func() { + // Check to see if we invalidated any snapshots that might have held state + // from the entries we are truncating. + if snap, _ := n.loadLastSnapshot(); snap != nil && snap.lastIndex >= index { + os.Remove(n.snapfile) + n.snapfile = _EMPTY_ + } + }() + if err := n.wal.Truncate(index); err != nil { - n.setWriteErrLocked(err) + // If we get an invalid sequence, reset our wal all together. + if err == ErrInvalidSequence { + n.debug("Resetting WAL") + n.wal.Truncate(0) + index, n.pterm, n.pindex = 0, 0, 0 + } else { + n.warn("Error truncating WAL: %v", err) + n.setWriteErrLocked(err) + } return } // Set after we know we have truncated properly. n.pterm, n.pindex = term, index - // Check to see if we invalidated any snapshots that might have held state - // from the entries we are truncating. - if snap, _ := n.loadLastSnapshot(); snap != nil && snap.lastIndex >= index { - os.Remove(n.snapfile) - n.snapfile = _EMPTY_ - } } // Lock should be held @@ -3140,6 +3149,7 @@ func (n *raft) storeToWAL(ae *appendEntry) error { } else { // Truncate back to our last known. n.truncateWAL(n.pterm, n.pindex) + n.cancelCatchup() } return errEntryStoreFailed }