From d48ccf4c5a62b40495fafc4a464740a94d4166af Mon Sep 17 00:00:00 2001 From: Derek Collison Date: Wed, 17 Aug 2022 13:16:32 -0700 Subject: [PATCH 1/2] When filestore is used for raft layer do not attempt to track subject metadata. Signed-off-by: Derek Collison --- server/filestore.go | 30 +++++++++++----- server/filestore_test.go | 77 ++++++++++++++++++++++++++++++++++------ 2 files changed, 88 insertions(+), 19 deletions(-) diff --git a/server/filestore.go b/server/filestore.go index 64c2db53..c2fc3ebe 100644 --- a/server/filestore.go +++ b/server/filestore.go @@ -161,12 +161,13 @@ type msgBlock struct { cexp time.Duration ctmr *time.Timer werr error - loading bool - flusher bool dmap map[uint64]struct{} fch chan struct{} qch chan struct{} lchk [8]byte + loading bool + flusher bool + noSubj bool closed bool } @@ -646,6 +647,11 @@ const indexHdrSize = 7*binary.MaxVarintLen64 + hdrLen + checksumSize func (fs *fileStore) recoverMsgBlock(fi os.FileInfo, index uint32) (*msgBlock, error) { mb := &msgBlock{fs: fs, index: index, cexp: fs.fcfg.CacheExpire} + // Check if we should be monitoring subjects. + if len(fs.cfg.Subjects) == 0 { + mb.noSubj = true + } + mdir := filepath.Join(fs.fcfg.StoreDir, msgDir) mb.mfn = filepath.Join(mdir, fi.Name()) mb.ifn = filepath.Join(mdir, fmt.Sprintf(indexScan, index)) @@ -742,7 +748,7 @@ func (fs *fileStore) recoverMsgBlock(fi os.FileInfo, index uint32) (*msgBlock, e // Quick sanity check here. // Note this only checks that the message blk file is not newer then this file, or is empty and we expect empty. if (mb.rbytes == 0 && mb.msgs == 0) || bytes.Equal(lchk[:], mb.lchk[:]) { - if mb.msgs > 0 && fs.psim != nil { + if mb.msgs > 0 && !mb.noSubj && fs.psim != nil { fs.populateGlobalPerSubjectInfo(mb) } fs.addMsgBlock(mb) @@ -754,7 +760,7 @@ func (fs *fileStore) recoverMsgBlock(fi os.FileInfo, index uint32) (*msgBlock, e if ld, _ := mb.rebuildState(); ld != nil { fs.rebuildStateLocked(ld) } - if mb.msgs > 0 && fs.psim != nil { + if mb.msgs > 0 && !mb.noSubj && fs.psim != nil { fs.populateGlobalPerSubjectInfo(mb) } @@ -1784,8 +1790,10 @@ func (fs *fileStore) newMsgBlockForWrite() (*msgBlock, error) { mb := &msgBlock{fs: fs, index: index, cexp: fs.fcfg.CacheExpire} - // If we only have one subject registered we can optimize filtered lookups here. - if len(fs.cfg.Subjects) == 1 { + // Optimize lookups if we have zero or one subject. + if nsubj := len(fs.cfg.Subjects); nsubj == 0 { + mb.noSubj = true + } else if nsubj == 1 { mb.sfilter = fs.cfg.Subjects[0] } @@ -5196,9 +5204,9 @@ func (fs *fileStore) populateGlobalPerSubjectInfo(mb *msgBlock) { // readPerSubjectInfo will attempt to restore the per subject information. func (mb *msgBlock) readPerSubjectInfo(hasLock bool) error { - // Make sure we run the cache expire timer. - mb.llts = time.Now().UnixNano() - mb.startCacheExpireTimer() + if mb.noSubj { + return nil + } defer func() { if !hasLock { @@ -5213,6 +5221,10 @@ func (mb *msgBlock) readPerSubjectInfo(hasLock bool) error { } }() + // Make sure we run the cache expire timer. + mb.llts = time.Now().UnixNano() + mb.startCacheExpireTimer() + buf, err := mb.loadPerSubjectInfo() // On failure re-generate. if err != nil { diff --git a/server/filestore_test.go b/server/filestore_test.go index 33db1505..82da85e4 100644 --- a/server/filestore_test.go +++ b/server/filestore_test.go @@ -980,7 +980,7 @@ func TestFileStoreStreamTruncate(t *testing.T) { fs, err := newFileStoreWithCreated( FileStoreConfig{StoreDir: storeDir, BlockSize: 350}, - StreamConfig{Name: "zzz", Storage: FileStorage}, + StreamConfig{Name: "zzz", Subjects: []string{"foo"}, Storage: FileStorage}, time.Now(), prf, ) @@ -1037,7 +1037,7 @@ func TestFileStoreStreamTruncate(t *testing.T) { fs, err = newFileStoreWithCreated( FileStoreConfig{StoreDir: storeDir, BlockSize: 350}, - StreamConfig{Name: "zzz", Storage: FileStorage}, + StreamConfig{Name: "zzz", Subjects: []string{"foo"}, Storage: FileStorage}, time.Now(), prf, ) @@ -1059,7 +1059,10 @@ func TestFileStoreRemovePartialRecovery(t *testing.T) { storeDir := createDir(t, JetStreamStoreDir) defer removeDir(t, storeDir) - fs, err := newFileStore(FileStoreConfig{StoreDir: storeDir}, StreamConfig{Name: "zzz", Storage: FileStorage}) + fs, err := newFileStore( + FileStoreConfig{StoreDir: storeDir}, + StreamConfig{Name: "zzz", Subjects: []string{"foo"}, Storage: FileStorage}, + ) if err != nil { t.Fatalf("Unexpected error: %v", err) } @@ -1088,7 +1091,10 @@ func TestFileStoreRemovePartialRecovery(t *testing.T) { // Make sure we recover same state. fs.Stop() - fs, err = newFileStore(FileStoreConfig{StoreDir: storeDir}, StreamConfig{Name: "zzz", Storage: FileStorage}) + fs, err = newFileStore( + FileStoreConfig{StoreDir: storeDir}, + StreamConfig{Name: "zzz", Subjects: []string{"foo"}, Storage: FileStorage}, + ) if err != nil { t.Fatalf("Unexpected error: %v", err) } @@ -1104,7 +1110,10 @@ func TestFileStoreRemoveOutOfOrderRecovery(t *testing.T) { storeDir := createDir(t, JetStreamStoreDir) defer removeDir(t, storeDir) - fs, err := newFileStore(FileStoreConfig{StoreDir: storeDir}, StreamConfig{Name: "zzz", Storage: FileStorage}) + fs, err := newFileStore( + FileStoreConfig{StoreDir: storeDir}, + StreamConfig{Name: "zzz", Subjects: []string{"foo"}, Storage: FileStorage}, + ) if err != nil { t.Fatalf("Unexpected error: %v", err) } @@ -1145,7 +1154,10 @@ func TestFileStoreRemoveOutOfOrderRecovery(t *testing.T) { // Make sure we recover same state. fs.Stop() - fs, err = newFileStore(FileStoreConfig{StoreDir: storeDir}, StreamConfig{Name: "zzz", Storage: FileStorage}) + fs, err = newFileStore( + FileStoreConfig{StoreDir: storeDir}, + StreamConfig{Name: "zzz", Subjects: []string{"foo"}, Storage: FileStorage}, + ) if err != nil { t.Fatalf("Unexpected error: %v", err) } @@ -1765,7 +1777,7 @@ func TestFileStoreSnapshot(t *testing.T) { fs, err := newFileStore( FileStoreConfig{StoreDir: storeDir}, - StreamConfig{Name: "zzz", Storage: FileStorage}, + StreamConfig{Name: "zzz", Subjects: []string{"foo"}, Storage: FileStorage}, ) if err != nil { t.Fatalf("Unexpected error: %v", err) @@ -1848,7 +1860,7 @@ func TestFileStoreSnapshot(t *testing.T) { fsr, err := newFileStore( FileStoreConfig{StoreDir: rstoreDir}, - StreamConfig{Name: "zzz", Storage: FileStorage}, + StreamConfig{Name: "zzz", Subjects: []string{"foo"}, Storage: FileStorage}, ) if err != nil { t.Fatalf("Error restoring from snapshot: %v", err) @@ -4149,7 +4161,7 @@ func TestFileStoreExpireSubjectMeta(t *testing.T) { fs, err := newFileStore( FileStoreConfig{StoreDir: storeDir, BlockSize: 1024, CacheExpire: time.Second}, - StreamConfig{Name: "zzz", Storage: FileStorage, MaxMsgsPer: 1}, + StreamConfig{Name: "zzz", Subjects: []string{"kv.>"}, Storage: FileStorage, MaxMsgsPer: 1}, ) require_NoError(t, err) defer fs.Stop() @@ -4173,7 +4185,7 @@ func TestFileStoreExpireSubjectMeta(t *testing.T) { fs.Stop() fs, err = newFileStore( FileStoreConfig{StoreDir: storeDir, BlockSize: 1024, CacheExpire: time.Second}, - StreamConfig{Name: "zzz", Storage: FileStorage, MaxMsgsPer: 1}, + StreamConfig{Name: "zzz", Subjects: []string{"kv.>"}, Storage: FileStorage, MaxMsgsPer: 1}, ) require_NoError(t, err) defer fs.Stop() @@ -4366,3 +4378,48 @@ func TestFileStoreEncryptedAES(t *testing.T) { t.Fatalf("Bad recovered consumer state, expected %+v got %+v", state, rstate) } } + +// Make sure we do not go through block loads when we know no subjects will exists, e.g. raft. +func TestFileStoreNoFSSWhenNoSubjects(t *testing.T) { + storeDir := createDir(t, JetStreamStoreDir) + defer os.RemoveAll(storeDir) + + fs, err := newFileStore( + FileStoreConfig{StoreDir: storeDir}, + StreamConfig{Name: "zzz", Storage: FileStorage}, + ) + require_NoError(t, err) + defer fs.Stop() + + n, msg := 100, []byte("raft state") + for i := 0; i < n; i++ { + _, _, err := fs.StoreMsg(_EMPTY_, nil, msg) + require_NoError(t, err) + } + + state := fs.State() + require_True(t, state.Msgs == uint64(n)) + + fs.Stop() + fs, err = newFileStore( + FileStoreConfig{StoreDir: storeDir}, + StreamConfig{Name: "zzz", Storage: FileStorage}, + ) + require_NoError(t, err) + defer fs.Stop() + + // Make sure we did not load the block trying to generate fss. + fs.mu.RLock() + mb := fs.blks[0] + fs.mu.RUnlock() + + mb.mu.Lock() + defer mb.mu.Unlock() + + if mb.cloads > 0 { + t.Fatalf("Expected no cache loads but got %d", mb.cloads) + } + if mb.fss != nil { + t.Fatalf("Expected fss to be nil") + } +} From 35135948a02189a6d0e1c1feb64d55ce956c4341 Mon Sep 17 00:00:00 2001 From: Derek Collison Date: Wed, 17 Aug 2022 14:54:35 -0700 Subject: [PATCH 2/2] Make sure llts update under lock, fss can be force expired so remove. Signed-off-by: Derek Collison --- server/filestore.go | 20 +++++++++++--------- 1 file changed, 11 insertions(+), 9 deletions(-) diff --git a/server/filestore.go b/server/filestore.go index c2fc3ebe..35574cf5 100644 --- a/server/filestore.go +++ b/server/filestore.go @@ -5098,12 +5098,10 @@ func (mb *msgBlock) generatePerSubjectInfo(hasLock bool) error { return nil } - var shouldExpire bool if mb.cacheNotLoaded() { if err := mb.loadMsgsWithLock(); err != nil { return err } - shouldExpire = true } // Create new one regardless. @@ -5132,9 +5130,11 @@ func (mb *msgBlock) generatePerSubjectInfo(hasLock bool) error { } } } - if shouldExpire { - // Expire this cache before moving on. - mb.tryForceExpireCacheLocked() + + if len(mb.fss) > 0 { + // Make sure we run the cache expire timer. + mb.llts = time.Now().UnixNano() + mb.startCacheExpireTimer() } return nil } @@ -5221,10 +5221,6 @@ func (mb *msgBlock) readPerSubjectInfo(hasLock bool) error { } }() - // Make sure we run the cache expire timer. - mb.llts = time.Now().UnixNano() - mb.startCacheExpireTimer() - buf, err := mb.loadPerSubjectInfo() // On failure re-generate. if err != nil { @@ -5261,6 +5257,12 @@ func (mb *msgBlock) readPerSubjectInfo(hasLock bool) error { } mb.fss = fss + // Make sure we run the cache expire timer. + if len(mb.fss) > 0 { + mb.llts = time.Now().UnixNano() + mb.startCacheExpireTimer() + } + if !hasLock { mb.mu.Unlock() }