From 1fa5e731779f3b5e43243c8dbf61251c71aedc92 Mon Sep 17 00:00:00 2001 From: Derek Collison Date: Mon, 31 Oct 2022 17:25:20 -0700 Subject: [PATCH] Honor MaxMsgsPerSubject when a stream config is updated, including enforcing a lower limit. Signed-off-by: Derek Collison --- server/filestore.go | 6 +++++- server/filestore_test.go | 42 ++++++++++++++++++++++++++++++++++++++++ server/memstore.go | 12 ++++++++++++ server/memstore_test.go | 37 +++++++++++++++++++++++++++++++++++ 4 files changed, 96 insertions(+), 1 deletion(-) diff --git a/server/filestore.go b/server/filestore.go index 1897a44f..c23eeca5 100644 --- a/server/filestore.go +++ b/server/filestore.go @@ -421,6 +421,10 @@ func (fs *fileStore) UpdateConfig(cfg *StreamConfig) error { fs.ageChk.Stop() fs.ageChk = nil } + + if cfg.MaxMsgsPer > 0 && cfg.MaxMsgsPer < old_cfg.MaxMsgsPer { + fs.enforceMsgPerSubjectLimit() + } fs.mu.Unlock() if cfg.MaxAge != 0 { @@ -2141,7 +2145,7 @@ func (fs *fileStore) enforceBytesLimit() { } } -// Will make sure we have limits honored for max msgs per subject on recovery. +// Will make sure we have limits honored for max msgs per subject on recovery or config update. // We will make sure to go through all msg blocks etc. but in practice this // will most likely only be the last one, so can take a more conservative approach. // Lock should be held. diff --git a/server/filestore_test.go b/server/filestore_test.go index a3bb3bfe..c26d8f67 100644 --- a/server/filestore_test.go +++ b/server/filestore_test.go @@ -4798,3 +4798,45 @@ func TestFileStoreRebuildStateProperlyWithMaxMsgsPerSubject(t *testing.T) { } fs.mu.RUnlock() } + +func TestFileStoreUpdateMaxMsgsPerSubject(t *testing.T) { + cfg := StreamConfig{ + Name: "TEST", + Storage: FileStorage, + Subjects: []string{"foo"}, + MaxMsgsPer: 10, + } + + storeDir := createDir(t, JetStreamStoreDir) + defer removeDir(t, storeDir) + + fs, err := newFileStore(FileStoreConfig{StoreDir: storeDir}, cfg) + require_NoError(t, err) + defer fs.Stop() + + // Make sure this is honored on an update. + cfg.MaxMsgsPer = 50 + err = fs.UpdateConfig(&cfg) + require_NoError(t, err) + + numStored := 22 + for i := 0; i < numStored; i++ { + _, _, err = fs.StoreMsg("foo", nil, nil) + require_NoError(t, err) + } + + ss := fs.SubjectsState("foo")["foo"] + if ss.Msgs != uint64(numStored) { + t.Fatalf("Expected to have %d stored, got %d", numStored, ss.Msgs) + } + + // Now make sure we trunk if setting to lower value. + cfg.MaxMsgsPer = 10 + err = fs.UpdateConfig(&cfg) + require_NoError(t, err) + + ss = fs.SubjectsState("foo")["foo"] + if ss.Msgs != 10 { + t.Fatalf("Expected to have %d stored, got %d", 10, ss.Msgs) + } +} diff --git a/server/memstore.go b/server/memstore.go index ab2c646b..dcfda376 100644 --- a/server/memstore.go +++ b/server/memstore.go @@ -72,6 +72,18 @@ func (ms *memStore) UpdateConfig(cfg *StreamConfig) error { ms.ageChk.Stop() ms.ageChk = nil } + // Make sure to update MaxMsgsPer + maxp := ms.maxp + ms.maxp = cfg.MaxMsgsPer + // If the value is smaller we need to enforce that. + if ms.maxp != 0 && ms.maxp < maxp { + lm := uint64(ms.maxp) + for _, ss := range ms.fss { + if ss.Msgs > lm { + ms.enforcePerSubjectLimit(ss) + } + } + } ms.mu.Unlock() if cfg.MaxAge != 0 { diff --git a/server/memstore_test.go b/server/memstore_test.go index 92fa00c5..c19dad85 100644 --- a/server/memstore_test.go +++ b/server/memstore_test.go @@ -405,3 +405,40 @@ func TestMemStorePurgeExWithSubject(t *testing.T) { ms.PurgeEx("foo", 1, 0) require_True(t, ms.State().Msgs == 0) } + +func TestMemStoreUpdateMaxMsgsPerSubject(t *testing.T) { + cfg := &StreamConfig{ + Name: "TEST", + Storage: MemoryStorage, + Subjects: []string{"foo"}, + MaxMsgsPer: 10, + } + ms, err := newMemStore(cfg) + require_NoError(t, err) + + // Make sure this is honored on an update. + cfg.MaxMsgsPer = 50 + err = ms.UpdateConfig(cfg) + require_NoError(t, err) + + numStored := 22 + for i := 0; i < numStored; i++ { + _, _, err = ms.StoreMsg("foo", nil, nil) + require_NoError(t, err) + } + + ss := ms.SubjectsState("foo")["foo"] + if ss.Msgs != uint64(numStored) { + t.Fatalf("Expected to have %d stored, got %d", numStored, ss.Msgs) + } + + // Now make sure we trunk if setting to lower value. + cfg.MaxMsgsPer = 10 + err = ms.UpdateConfig(cfg) + require_NoError(t, err) + + ss = ms.SubjectsState("foo")["foo"] + if ss.Msgs != 10 { + t.Fatalf("Expected to have %d stored, got %d", 10, ss.Msgs) + } +}