Merge pull request #3595 from nats-io/max-msgs-per-update

[FIXED] Honor MaxMsgsPerSubject during a stream config update.
This commit is contained in:
Derek Collison
2022-11-01 09:41:55 -07:00
committed by GitHub
4 changed files with 96 additions and 1 deletions

View File

@@ -421,6 +421,10 @@ func (fs *fileStore) UpdateConfig(cfg *StreamConfig) error {
fs.ageChk.Stop()
fs.ageChk = nil
}
if cfg.MaxMsgsPer > 0 && cfg.MaxMsgsPer < old_cfg.MaxMsgsPer {
fs.enforceMsgPerSubjectLimit()
}
fs.mu.Unlock()
if cfg.MaxAge != 0 {
@@ -2141,7 +2145,7 @@ func (fs *fileStore) enforceBytesLimit() {
}
}
// Will make sure we have limits honored for max msgs per subject on recovery.
// Will make sure we have limits honored for max msgs per subject on recovery or config update.
// We will make sure to go through all msg blocks etc. but in practice this
// will most likely only be the last one, so can take a more conservative approach.
// Lock should be held.

View File

@@ -4798,3 +4798,45 @@ func TestFileStoreRebuildStateProperlyWithMaxMsgsPerSubject(t *testing.T) {
}
fs.mu.RUnlock()
}
func TestFileStoreUpdateMaxMsgsPerSubject(t *testing.T) {
cfg := StreamConfig{
Name: "TEST",
Storage: FileStorage,
Subjects: []string{"foo"},
MaxMsgsPer: 10,
}
storeDir := createDir(t, JetStreamStoreDir)
defer removeDir(t, storeDir)
fs, err := newFileStore(FileStoreConfig{StoreDir: storeDir}, cfg)
require_NoError(t, err)
defer fs.Stop()
// Make sure this is honored on an update.
cfg.MaxMsgsPer = 50
err = fs.UpdateConfig(&cfg)
require_NoError(t, err)
numStored := 22
for i := 0; i < numStored; i++ {
_, _, err = fs.StoreMsg("foo", nil, nil)
require_NoError(t, err)
}
ss := fs.SubjectsState("foo")["foo"]
if ss.Msgs != uint64(numStored) {
t.Fatalf("Expected to have %d stored, got %d", numStored, ss.Msgs)
}
// Now make sure we trunk if setting to lower value.
cfg.MaxMsgsPer = 10
err = fs.UpdateConfig(&cfg)
require_NoError(t, err)
ss = fs.SubjectsState("foo")["foo"]
if ss.Msgs != 10 {
t.Fatalf("Expected to have %d stored, got %d", 10, ss.Msgs)
}
}

View File

@@ -72,6 +72,18 @@ func (ms *memStore) UpdateConfig(cfg *StreamConfig) error {
ms.ageChk.Stop()
ms.ageChk = nil
}
// Make sure to update MaxMsgsPer
maxp := ms.maxp
ms.maxp = cfg.MaxMsgsPer
// If the value is smaller we need to enforce that.
if ms.maxp != 0 && ms.maxp < maxp {
lm := uint64(ms.maxp)
for _, ss := range ms.fss {
if ss.Msgs > lm {
ms.enforcePerSubjectLimit(ss)
}
}
}
ms.mu.Unlock()
if cfg.MaxAge != 0 {

View File

@@ -405,3 +405,40 @@ func TestMemStorePurgeExWithSubject(t *testing.T) {
ms.PurgeEx("foo", 1, 0)
require_True(t, ms.State().Msgs == 0)
}
func TestMemStoreUpdateMaxMsgsPerSubject(t *testing.T) {
cfg := &StreamConfig{
Name: "TEST",
Storage: MemoryStorage,
Subjects: []string{"foo"},
MaxMsgsPer: 10,
}
ms, err := newMemStore(cfg)
require_NoError(t, err)
// Make sure this is honored on an update.
cfg.MaxMsgsPer = 50
err = ms.UpdateConfig(cfg)
require_NoError(t, err)
numStored := 22
for i := 0; i < numStored; i++ {
_, _, err = ms.StoreMsg("foo", nil, nil)
require_NoError(t, err)
}
ss := ms.SubjectsState("foo")["foo"]
if ss.Msgs != uint64(numStored) {
t.Fatalf("Expected to have %d stored, got %d", numStored, ss.Msgs)
}
// Now make sure we trunk if setting to lower value.
cfg.MaxMsgsPer = 10
err = ms.UpdateConfig(cfg)
require_NoError(t, err)
ss = ms.SubjectsState("foo")["foo"]
if ss.Msgs != 10 {
t.Fatalf("Expected to have %d stored, got %d", 10, ss.Msgs)
}
}