mirror of
https://github.com/gogrlx/nats-server.git
synced 2026-04-02 03:38:42 -07:00
Merge pull request #3377 from nats-io/no-fss
When filestore is used for raft layer do not attempt to track subjects
This commit is contained in:
@@ -161,12 +161,13 @@ type msgBlock struct {
|
||||
cexp time.Duration
|
||||
ctmr *time.Timer
|
||||
werr error
|
||||
loading bool
|
||||
flusher bool
|
||||
dmap map[uint64]struct{}
|
||||
fch chan struct{}
|
||||
qch chan struct{}
|
||||
lchk [8]byte
|
||||
loading bool
|
||||
flusher bool
|
||||
noSubj bool
|
||||
closed bool
|
||||
}
|
||||
|
||||
@@ -646,6 +647,11 @@ const indexHdrSize = 7*binary.MaxVarintLen64 + hdrLen + checksumSize
|
||||
func (fs *fileStore) recoverMsgBlock(fi os.FileInfo, index uint32) (*msgBlock, error) {
|
||||
mb := &msgBlock{fs: fs, index: index, cexp: fs.fcfg.CacheExpire}
|
||||
|
||||
// Check if we should be monitoring subjects.
|
||||
if len(fs.cfg.Subjects) == 0 {
|
||||
mb.noSubj = true
|
||||
}
|
||||
|
||||
mdir := filepath.Join(fs.fcfg.StoreDir, msgDir)
|
||||
mb.mfn = filepath.Join(mdir, fi.Name())
|
||||
mb.ifn = filepath.Join(mdir, fmt.Sprintf(indexScan, index))
|
||||
@@ -742,7 +748,7 @@ func (fs *fileStore) recoverMsgBlock(fi os.FileInfo, index uint32) (*msgBlock, e
|
||||
// Quick sanity check here.
|
||||
// Note this only checks that the message blk file is not newer then this file, or is empty and we expect empty.
|
||||
if (mb.rbytes == 0 && mb.msgs == 0) || bytes.Equal(lchk[:], mb.lchk[:]) {
|
||||
if mb.msgs > 0 && fs.psim != nil {
|
||||
if mb.msgs > 0 && !mb.noSubj && fs.psim != nil {
|
||||
fs.populateGlobalPerSubjectInfo(mb)
|
||||
}
|
||||
fs.addMsgBlock(mb)
|
||||
@@ -754,7 +760,7 @@ func (fs *fileStore) recoverMsgBlock(fi os.FileInfo, index uint32) (*msgBlock, e
|
||||
if ld, _ := mb.rebuildState(); ld != nil {
|
||||
fs.rebuildStateLocked(ld)
|
||||
}
|
||||
if mb.msgs > 0 && fs.psim != nil {
|
||||
if mb.msgs > 0 && !mb.noSubj && fs.psim != nil {
|
||||
fs.populateGlobalPerSubjectInfo(mb)
|
||||
}
|
||||
|
||||
@@ -1784,8 +1790,10 @@ func (fs *fileStore) newMsgBlockForWrite() (*msgBlock, error) {
|
||||
|
||||
mb := &msgBlock{fs: fs, index: index, cexp: fs.fcfg.CacheExpire}
|
||||
|
||||
// If we only have one subject registered we can optimize filtered lookups here.
|
||||
if len(fs.cfg.Subjects) == 1 {
|
||||
// Optimize lookups if we have zero or one subject.
|
||||
if nsubj := len(fs.cfg.Subjects); nsubj == 0 {
|
||||
mb.noSubj = true
|
||||
} else if nsubj == 1 {
|
||||
mb.sfilter = fs.cfg.Subjects[0]
|
||||
}
|
||||
|
||||
@@ -5090,12 +5098,10 @@ func (mb *msgBlock) generatePerSubjectInfo(hasLock bool) error {
|
||||
return nil
|
||||
}
|
||||
|
||||
var shouldExpire bool
|
||||
if mb.cacheNotLoaded() {
|
||||
if err := mb.loadMsgsWithLock(); err != nil {
|
||||
return err
|
||||
}
|
||||
shouldExpire = true
|
||||
}
|
||||
|
||||
// Create new one regardless.
|
||||
@@ -5124,9 +5130,11 @@ func (mb *msgBlock) generatePerSubjectInfo(hasLock bool) error {
|
||||
}
|
||||
}
|
||||
}
|
||||
if shouldExpire {
|
||||
// Expire this cache before moving on.
|
||||
mb.tryForceExpireCacheLocked()
|
||||
|
||||
if len(mb.fss) > 0 {
|
||||
// Make sure we run the cache expire timer.
|
||||
mb.llts = time.Now().UnixNano()
|
||||
mb.startCacheExpireTimer()
|
||||
}
|
||||
return nil
|
||||
}
|
||||
@@ -5196,9 +5204,9 @@ func (fs *fileStore) populateGlobalPerSubjectInfo(mb *msgBlock) {
|
||||
|
||||
// readPerSubjectInfo will attempt to restore the per subject information.
|
||||
func (mb *msgBlock) readPerSubjectInfo(hasLock bool) error {
|
||||
// Make sure we run the cache expire timer.
|
||||
mb.llts = time.Now().UnixNano()
|
||||
mb.startCacheExpireTimer()
|
||||
if mb.noSubj {
|
||||
return nil
|
||||
}
|
||||
|
||||
defer func() {
|
||||
if !hasLock {
|
||||
@@ -5249,6 +5257,12 @@ func (mb *msgBlock) readPerSubjectInfo(hasLock bool) error {
|
||||
}
|
||||
mb.fss = fss
|
||||
|
||||
// Make sure we run the cache expire timer.
|
||||
if len(mb.fss) > 0 {
|
||||
mb.llts = time.Now().UnixNano()
|
||||
mb.startCacheExpireTimer()
|
||||
}
|
||||
|
||||
if !hasLock {
|
||||
mb.mu.Unlock()
|
||||
}
|
||||
|
||||
@@ -980,7 +980,7 @@ func TestFileStoreStreamTruncate(t *testing.T) {
|
||||
|
||||
fs, err := newFileStoreWithCreated(
|
||||
FileStoreConfig{StoreDir: storeDir, BlockSize: 350},
|
||||
StreamConfig{Name: "zzz", Storage: FileStorage},
|
||||
StreamConfig{Name: "zzz", Subjects: []string{"foo"}, Storage: FileStorage},
|
||||
time.Now(),
|
||||
prf,
|
||||
)
|
||||
@@ -1037,7 +1037,7 @@ func TestFileStoreStreamTruncate(t *testing.T) {
|
||||
|
||||
fs, err = newFileStoreWithCreated(
|
||||
FileStoreConfig{StoreDir: storeDir, BlockSize: 350},
|
||||
StreamConfig{Name: "zzz", Storage: FileStorage},
|
||||
StreamConfig{Name: "zzz", Subjects: []string{"foo"}, Storage: FileStorage},
|
||||
time.Now(),
|
||||
prf,
|
||||
)
|
||||
@@ -1059,7 +1059,10 @@ func TestFileStoreRemovePartialRecovery(t *testing.T) {
|
||||
storeDir := createDir(t, JetStreamStoreDir)
|
||||
defer removeDir(t, storeDir)
|
||||
|
||||
fs, err := newFileStore(FileStoreConfig{StoreDir: storeDir}, StreamConfig{Name: "zzz", Storage: FileStorage})
|
||||
fs, err := newFileStore(
|
||||
FileStoreConfig{StoreDir: storeDir},
|
||||
StreamConfig{Name: "zzz", Subjects: []string{"foo"}, Storage: FileStorage},
|
||||
)
|
||||
if err != nil {
|
||||
t.Fatalf("Unexpected error: %v", err)
|
||||
}
|
||||
@@ -1088,7 +1091,10 @@ func TestFileStoreRemovePartialRecovery(t *testing.T) {
|
||||
// Make sure we recover same state.
|
||||
fs.Stop()
|
||||
|
||||
fs, err = newFileStore(FileStoreConfig{StoreDir: storeDir}, StreamConfig{Name: "zzz", Storage: FileStorage})
|
||||
fs, err = newFileStore(
|
||||
FileStoreConfig{StoreDir: storeDir},
|
||||
StreamConfig{Name: "zzz", Subjects: []string{"foo"}, Storage: FileStorage},
|
||||
)
|
||||
if err != nil {
|
||||
t.Fatalf("Unexpected error: %v", err)
|
||||
}
|
||||
@@ -1104,7 +1110,10 @@ func TestFileStoreRemoveOutOfOrderRecovery(t *testing.T) {
|
||||
storeDir := createDir(t, JetStreamStoreDir)
|
||||
defer removeDir(t, storeDir)
|
||||
|
||||
fs, err := newFileStore(FileStoreConfig{StoreDir: storeDir}, StreamConfig{Name: "zzz", Storage: FileStorage})
|
||||
fs, err := newFileStore(
|
||||
FileStoreConfig{StoreDir: storeDir},
|
||||
StreamConfig{Name: "zzz", Subjects: []string{"foo"}, Storage: FileStorage},
|
||||
)
|
||||
if err != nil {
|
||||
t.Fatalf("Unexpected error: %v", err)
|
||||
}
|
||||
@@ -1145,7 +1154,10 @@ func TestFileStoreRemoveOutOfOrderRecovery(t *testing.T) {
|
||||
// Make sure we recover same state.
|
||||
fs.Stop()
|
||||
|
||||
fs, err = newFileStore(FileStoreConfig{StoreDir: storeDir}, StreamConfig{Name: "zzz", Storage: FileStorage})
|
||||
fs, err = newFileStore(
|
||||
FileStoreConfig{StoreDir: storeDir},
|
||||
StreamConfig{Name: "zzz", Subjects: []string{"foo"}, Storage: FileStorage},
|
||||
)
|
||||
if err != nil {
|
||||
t.Fatalf("Unexpected error: %v", err)
|
||||
}
|
||||
@@ -1765,7 +1777,7 @@ func TestFileStoreSnapshot(t *testing.T) {
|
||||
|
||||
fs, err := newFileStore(
|
||||
FileStoreConfig{StoreDir: storeDir},
|
||||
StreamConfig{Name: "zzz", Storage: FileStorage},
|
||||
StreamConfig{Name: "zzz", Subjects: []string{"foo"}, Storage: FileStorage},
|
||||
)
|
||||
if err != nil {
|
||||
t.Fatalf("Unexpected error: %v", err)
|
||||
@@ -1848,7 +1860,7 @@ func TestFileStoreSnapshot(t *testing.T) {
|
||||
|
||||
fsr, err := newFileStore(
|
||||
FileStoreConfig{StoreDir: rstoreDir},
|
||||
StreamConfig{Name: "zzz", Storage: FileStorage},
|
||||
StreamConfig{Name: "zzz", Subjects: []string{"foo"}, Storage: FileStorage},
|
||||
)
|
||||
if err != nil {
|
||||
t.Fatalf("Error restoring from snapshot: %v", err)
|
||||
@@ -4149,7 +4161,7 @@ func TestFileStoreExpireSubjectMeta(t *testing.T) {
|
||||
|
||||
fs, err := newFileStore(
|
||||
FileStoreConfig{StoreDir: storeDir, BlockSize: 1024, CacheExpire: time.Second},
|
||||
StreamConfig{Name: "zzz", Storage: FileStorage, MaxMsgsPer: 1},
|
||||
StreamConfig{Name: "zzz", Subjects: []string{"kv.>"}, Storage: FileStorage, MaxMsgsPer: 1},
|
||||
)
|
||||
require_NoError(t, err)
|
||||
defer fs.Stop()
|
||||
@@ -4173,7 +4185,7 @@ func TestFileStoreExpireSubjectMeta(t *testing.T) {
|
||||
fs.Stop()
|
||||
fs, err = newFileStore(
|
||||
FileStoreConfig{StoreDir: storeDir, BlockSize: 1024, CacheExpire: time.Second},
|
||||
StreamConfig{Name: "zzz", Storage: FileStorage, MaxMsgsPer: 1},
|
||||
StreamConfig{Name: "zzz", Subjects: []string{"kv.>"}, Storage: FileStorage, MaxMsgsPer: 1},
|
||||
)
|
||||
require_NoError(t, err)
|
||||
defer fs.Stop()
|
||||
@@ -4366,3 +4378,48 @@ func TestFileStoreEncryptedAES(t *testing.T) {
|
||||
t.Fatalf("Bad recovered consumer state, expected %+v got %+v", state, rstate)
|
||||
}
|
||||
}
|
||||
|
||||
// Make sure we do not go through block loads when we know no subjects will exists, e.g. raft.
|
||||
func TestFileStoreNoFSSWhenNoSubjects(t *testing.T) {
|
||||
storeDir := createDir(t, JetStreamStoreDir)
|
||||
defer os.RemoveAll(storeDir)
|
||||
|
||||
fs, err := newFileStore(
|
||||
FileStoreConfig{StoreDir: storeDir},
|
||||
StreamConfig{Name: "zzz", Storage: FileStorage},
|
||||
)
|
||||
require_NoError(t, err)
|
||||
defer fs.Stop()
|
||||
|
||||
n, msg := 100, []byte("raft state")
|
||||
for i := 0; i < n; i++ {
|
||||
_, _, err := fs.StoreMsg(_EMPTY_, nil, msg)
|
||||
require_NoError(t, err)
|
||||
}
|
||||
|
||||
state := fs.State()
|
||||
require_True(t, state.Msgs == uint64(n))
|
||||
|
||||
fs.Stop()
|
||||
fs, err = newFileStore(
|
||||
FileStoreConfig{StoreDir: storeDir},
|
||||
StreamConfig{Name: "zzz", Storage: FileStorage},
|
||||
)
|
||||
require_NoError(t, err)
|
||||
defer fs.Stop()
|
||||
|
||||
// Make sure we did not load the block trying to generate fss.
|
||||
fs.mu.RLock()
|
||||
mb := fs.blks[0]
|
||||
fs.mu.RUnlock()
|
||||
|
||||
mb.mu.Lock()
|
||||
defer mb.mu.Unlock()
|
||||
|
||||
if mb.cloads > 0 {
|
||||
t.Fatalf("Expected no cache loads but got %d", mb.cloads)
|
||||
}
|
||||
if mb.fss != nil {
|
||||
t.Fatalf("Expected fss to be nil")
|
||||
}
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user