Also require MaxMsgsPerSubject to be set per peer review feedback.

Signed-off-by: Derek Collison <derek@nats.io>
This commit is contained in:
Derek Collison
2022-09-22 06:56:32 -07:00
parent 2d737edba6
commit 61a3cff274
2 changed files with 23 additions and 5 deletions

View File

@@ -134,7 +134,6 @@ func TestJetStreamClusterDiscardNewAndMaxMsgsPerSubject(t *testing.T) {
Name: "KV",
Subjects: []string{"KV.>"},
Storage: test.storage,
MaxMsgsPer: 1,
AllowDirect: true,
DiscardNewPer: true,
MaxMsgs: 10,
@@ -145,8 +144,17 @@ func TestJetStreamClusterDiscardNewAndMaxMsgsPerSubject(t *testing.T) {
} else if apiErr.ErrCode != 10052 || !strings.Contains(apiErr.Description, "discard new per subject requires discard new policy") {
t.Fatalf("Got wrong error: %+v", apiErr)
}
// Set broad discard new policy to engage DiscardNewPer
cfg.Discard = DiscardNew
// We should also error here since we have not setup max msgs per subject.
if _, apiErr := addStreamWithError(t, nc, cfg); apiErr == nil {
t.Fatalf("Expected API error but got none")
} else if apiErr.ErrCode != 10052 || !strings.Contains(apiErr.Description, "discard new per subject requires max msgs per subject > 0") {
t.Fatalf("Got wrong error: %+v", apiErr)
}
cfg.MaxMsgsPer = 1
addStream(t, nc, cfg)
// We want to test that we reject new messages on a per subject basis if the

View File

@@ -996,8 +996,13 @@ func (s *Server) checkStreamCfg(config *StreamConfig, acc *Account) (StreamConfi
}
// Check for new discard new per subject, we require the discard policy to also be new.
if cfg.DiscardNewPer && cfg.Discard != DiscardNew {
return StreamConfig{}, NewJSStreamInvalidConfigError(fmt.Errorf("discard new per subject requires discard new policy to be set"))
if cfg.DiscardNewPer {
if cfg.Discard != DiscardNew {
return StreamConfig{}, NewJSStreamInvalidConfigError(fmt.Errorf("discard new per subject requires discard new policy to be set"))
}
if cfg.MaxMsgsPer <= 0 {
return StreamConfig{}, NewJSStreamInvalidConfigError(fmt.Errorf("discard new per subject requires max msgs per subject > 0"))
}
}
getStream := func(streamName string) (bool, StreamConfig) {
@@ -1326,8 +1331,13 @@ func (jsa *jsAccount) configUpdateCheck(old, new *StreamConfig, s *Server) (*Str
}
// Check on new discard new per subject.
if cfg.DiscardNewPer && old.Discard != DiscardNew {
return nil, NewJSStreamInvalidConfigError(fmt.Errorf("discard new per subject requires discard new policy to be set"))
if cfg.DiscardNewPer {
if cfg.Discard != DiscardNew {
return nil, NewJSStreamInvalidConfigError(fmt.Errorf("discard new per subject requires discard new policy to be set"))
}
if cfg.MaxMsgsPer <= 0 {
return nil, NewJSStreamInvalidConfigError(fmt.Errorf("discard new per subject requires max msgs per subject > 0"))
}
}
// Do some adjustments for being sealed.