diff --git a/server/filestore.go b/server/filestore.go index e9e08a86..2f391a46 100644 --- a/server/filestore.go +++ b/server/filestore.go @@ -1879,6 +1879,10 @@ func (fs *fileStore) storeRawMsg(subj string, hdr, msg []byte, seq uint64, ts in if fs.cfg.Discard == DiscardNew { var asl bool if psmax && psmc >= uint64(fs.cfg.MaxMsgsPer) { + // If we are instructed to discard new per subject, this is an error. + if fs.cfg.DiscardNewPer { + return ErrMaxMsgsPerSubject + } fseq, err = fs.firstSeqForSubj(subj) if err != nil { return err diff --git a/server/jetstream_cluster_3_test.go b/server/jetstream_cluster_3_test.go index 576616d8..9fec4764 100644 --- a/server/jetstream_cluster_3_test.go +++ b/server/jetstream_cluster_3_test.go @@ -18,6 +18,8 @@ package server import ( "encoding/json" + "errors" + "strings" "testing" "time" @@ -106,3 +108,64 @@ func TestJetStreamClusterRemovePeerByID(t *testing.T) { require_True(t, resp.Error == nil) require_True(t, resp.Success) } + +func TestJetStreamClusterDiscardNewAndMaxMsgsPerSubject(t *testing.T) { + c := createJetStreamClusterExplicit(t, "R3S", 3) + defer c.shutdown() + + // Client for API requests. + s := c.randomNonLeader() + nc, js := jsClientConnect(t, s) + defer nc.Close() + + for _, test := range []struct { + name string + storage StorageType + replicas int + }{ + {"MEM-R1", MemoryStorage, 1}, + {"FILE-R1", FileStorage, 1}, + {"MEM-R3", MemoryStorage, 3}, + {"FILE-R3", FileStorage, 3}, + } { + t.Run(test.name, func(t *testing.T) { + js.DeleteStream("KV") + // Make sure setting new without DiscardPolicy also being new is error. + cfg := &StreamConfig{ + Name: "KV", + Subjects: []string{"KV.>"}, + Storage: test.storage, + AllowDirect: true, + DiscardNewPer: true, + MaxMsgs: 10, + Replicas: test.replicas, + } + if _, apiErr := addStreamWithError(t, nc, cfg); apiErr == nil { + t.Fatalf("Expected API error but got none") + } else if apiErr.ErrCode != 10052 || !strings.Contains(apiErr.Description, "discard new per subject requires discard new policy") { + t.Fatalf("Got wrong error: %+v", apiErr) + } + + // Set broad discard new policy to engage DiscardNewPer + cfg.Discard = DiscardNew + // We should also error here since we have not setup max msgs per subject. + if _, apiErr := addStreamWithError(t, nc, cfg); apiErr == nil { + t.Fatalf("Expected API error but got none") + } else if apiErr.ErrCode != 10052 || !strings.Contains(apiErr.Description, "discard new per subject requires max msgs per subject > 0") { + t.Fatalf("Got wrong error: %+v", apiErr) + } + + cfg.MaxMsgsPer = 1 + addStream(t, nc, cfg) + + // We want to test that we reject new messages on a per subject basis if the + // max msgs per subject limit has been hit, even if other limits have not. + _, err := js.Publish("KV.foo", nil) + require_NoError(t, err) + + _, err = js.Publish("KV.foo", nil) + // Go client does not have const for this one. + require_Error(t, err, errors.New("nats: maximum messages per subject exceeded")) + }) + } +} diff --git a/server/jetstream_helpers_test.go b/server/jetstream_helpers_test.go index d79c3a4b..0bc6a525 100644 --- a/server/jetstream_helpers_test.go +++ b/server/jetstream_helpers_test.go @@ -1460,6 +1460,15 @@ func (c *cluster) stableTotalSubs() (total int) { } func addStream(t *testing.T, nc *nats.Conn, cfg *StreamConfig) *StreamInfo { + t.Helper() + si, err := addStreamWithError(t, nc, cfg) + if err != nil { + t.Fatalf("Unexpected error: %+v", err) + } + return si +} + +func addStreamWithError(t *testing.T, nc *nats.Conn, cfg *StreamConfig) (*StreamInfo, *ApiError) { t.Helper() req, err := json.Marshal(cfg) require_NoError(t, err) @@ -1471,10 +1480,7 @@ func addStream(t *testing.T, nc *nats.Conn, cfg *StreamConfig) *StreamInfo { if resp.Type != JSApiStreamCreateResponseType { t.Fatalf("Invalid response type %s expected %s", resp.Type, JSApiStreamCreateResponseType) } - if resp.Error != nil { - t.Fatalf("Unexpected error: %+v", resp.Error) - } - return resp.StreamInfo + return resp.StreamInfo, resp.Error } func updateStream(t *testing.T, nc *nats.Conn, cfg *StreamConfig) *StreamInfo { diff --git a/server/memstore.go b/server/memstore.go index 1cbed57c..ab2c646b 100644 --- a/server/memstore.go +++ b/server/memstore.go @@ -98,6 +98,9 @@ func (ms *memStore) storeRawMsg(subj string, hdr, msg []byte, seq uint64, ts int // Check if we are discarding new messages when we reach the limit. if ms.cfg.Discard == DiscardNew { + if asl && ms.cfg.DiscardNewPer { + return ErrMaxMsgsPerSubject + } if ms.cfg.MaxMsgs > 0 && ms.state.Msgs >= uint64(ms.cfg.MaxMsgs) { // If we are tracking max messages per subject and are at the limit we will replace, so this is ok. if !asl { diff --git a/server/stream.go b/server/stream.go index e1948961..be645873 100644 --- a/server/stream.go +++ b/server/stream.go @@ -65,6 +65,9 @@ type StreamConfig struct { // Allow higher performance and unified direct access for mirrors as well. MirrorDirect bool `json:"mirror_direct"` + // Allow KV like semantics to also discard new on a per subject basis + DiscardNewPer bool `json:"discard_new_per_subject,omitempty"` + // Optional qualifiers. These can not be modified after set to true. // Sealed will seal a stream so no messages can get out or in. @@ -992,6 +995,16 @@ func (s *Server) checkStreamCfg(config *StreamConfig, acc *Account) (StreamConfi return StreamConfig{}, NewJSStreamInvalidConfigError(fmt.Errorf("roll-ups require the purge permission")) } + // Check for new discard new per subject, we require the discard policy to also be new. + if cfg.DiscardNewPer { + if cfg.Discard != DiscardNew { + return StreamConfig{}, NewJSStreamInvalidConfigError(fmt.Errorf("discard new per subject requires discard new policy to be set")) + } + if cfg.MaxMsgsPer <= 0 { + return StreamConfig{}, NewJSStreamInvalidConfigError(fmt.Errorf("discard new per subject requires max msgs per subject > 0")) + } + } + getStream := func(streamName string) (bool, StreamConfig) { var exists bool var cfg StreamConfig @@ -1317,6 +1330,16 @@ func (jsa *jsAccount) configUpdateCheck(old, new *StreamConfig, s *Server) (*Str return nil, NewJSStreamInvalidConfigError(fmt.Errorf("stream configuration update can not change RePublish")) } + // Check on new discard new per subject. + if cfg.DiscardNewPer { + if cfg.Discard != DiscardNew { + return nil, NewJSStreamInvalidConfigError(fmt.Errorf("discard new per subject requires discard new policy to be set")) + } + if cfg.MaxMsgsPer <= 0 { + return nil, NewJSStreamInvalidConfigError(fmt.Errorf("discard new per subject requires max msgs per subject > 0")) + } + } + // Do some adjustments for being sealed. if cfg.Sealed { cfg.MaxAge = 0