Merge pull request #3487 from nats-io/discard-new-per

Allow discard new per subject for certain KV type scenarios.
This commit is contained in:
Derek Collison
2022-09-22 08:26:10 -07:00
committed by GitHub
5 changed files with 103 additions and 4 deletions

View File

@@ -1879,6 +1879,10 @@ func (fs *fileStore) storeRawMsg(subj string, hdr, msg []byte, seq uint64, ts in
if fs.cfg.Discard == DiscardNew {
var asl bool
if psmax && psmc >= uint64(fs.cfg.MaxMsgsPer) {
// If we are instructed to discard new per subject, this is an error.
if fs.cfg.DiscardNewPer {
return ErrMaxMsgsPerSubject
}
fseq, err = fs.firstSeqForSubj(subj)
if err != nil {
return err

View File

@@ -18,6 +18,8 @@ package server
import (
"encoding/json"
"errors"
"strings"
"testing"
"time"
@@ -106,3 +108,64 @@ func TestJetStreamClusterRemovePeerByID(t *testing.T) {
require_True(t, resp.Error == nil)
require_True(t, resp.Success)
}
// TestJetStreamClusterDiscardNewAndMaxMsgsPerSubject verifies the
// DiscardNewPer stream option across storage types and replica counts:
// it must be rejected unless the discard policy is DiscardNew and
// MaxMsgsPer is positive, and once enabled a publish that would exceed
// the per-subject limit must be refused even though other stream limits
// have not been reached.
func TestJetStreamClusterDiscardNewAndMaxMsgsPerSubject(t *testing.T) {
	c := createJetStreamClusterExplicit(t, "R3S", 3)
	defer c.shutdown()

	// Client for API requests.
	s := c.randomNonLeader()
	nc, js := jsClientConnect(t, s)
	defer nc.Close()

	cases := []struct {
		name     string
		storage  StorageType
		replicas int
	}{
		{"MEM-R1", MemoryStorage, 1},
		{"FILE-R1", FileStorage, 1},
		{"MEM-R3", MemoryStorage, 3},
		{"FILE-R3", FileStorage, 3},
	}
	for _, tc := range cases {
		t.Run(tc.name, func(t *testing.T) {
			js.DeleteStream("KV")

			cfg := &StreamConfig{
				Name:          "KV",
				Subjects:      []string{"KV.>"},
				Storage:       tc.storage,
				AllowDirect:   true,
				DiscardNewPer: true,
				MaxMsgs:       10,
				Replicas:      tc.replicas,
			}
			// Setting DiscardNewPer without the discard policy also being
			// DiscardNew must be rejected by the API.
			_, apiErr := addStreamWithError(t, nc, cfg)
			if apiErr == nil {
				t.Fatalf("Expected API error but got none")
			}
			if apiErr.ErrCode != 10052 || !strings.Contains(apiErr.Description, "discard new per subject requires discard new policy") {
				t.Fatalf("Got wrong error: %+v", apiErr)
			}

			// With the broad discard policy set, it must still be rejected
			// until a per-subject message limit is configured.
			cfg.Discard = DiscardNew
			_, apiErr = addStreamWithError(t, nc, cfg)
			if apiErr == nil {
				t.Fatalf("Expected API error but got none")
			}
			if apiErr.ErrCode != 10052 || !strings.Contains(apiErr.Description, "discard new per subject requires max msgs per subject > 0") {
				t.Fatalf("Got wrong error: %+v", apiErr)
			}

			// Now a valid config: at most one message per subject.
			cfg.MaxMsgsPer = 1
			addStream(t, nc, cfg)

			// The first publish on a subject succeeds; the second must be
			// rejected on a per-subject basis even though the stream-wide
			// MaxMsgs limit has not been hit.
			_, err := js.Publish("KV.foo", nil)
			require_NoError(t, err)
			_, err = js.Publish("KV.foo", nil)
			// Go client does not have const for this one.
			require_Error(t, err, errors.New("nats: maximum messages per subject exceeded"))
		})
	}
}

View File

@@ -1460,6 +1460,15 @@ func (c *cluster) stableTotalSubs() (total int) {
}
func addStream(t *testing.T, nc *nats.Conn, cfg *StreamConfig) *StreamInfo {
t.Helper()
si, err := addStreamWithError(t, nc, cfg)
if err != nil {
t.Fatalf("Unexpected error: %+v", err)
}
return si
}
func addStreamWithError(t *testing.T, nc *nats.Conn, cfg *StreamConfig) (*StreamInfo, *ApiError) {
t.Helper()
req, err := json.Marshal(cfg)
require_NoError(t, err)
@@ -1471,10 +1480,7 @@ func addStream(t *testing.T, nc *nats.Conn, cfg *StreamConfig) *StreamInfo {
if resp.Type != JSApiStreamCreateResponseType {
t.Fatalf("Invalid response type %s expected %s", resp.Type, JSApiStreamCreateResponseType)
}
if resp.Error != nil {
t.Fatalf("Unexpected error: %+v", resp.Error)
}
return resp.StreamInfo
return resp.StreamInfo, resp.Error
}
func updateStream(t *testing.T, nc *nats.Conn, cfg *StreamConfig) *StreamInfo {

View File

@@ -98,6 +98,9 @@ func (ms *memStore) storeRawMsg(subj string, hdr, msg []byte, seq uint64, ts int
// Check if we are discarding new messages when we reach the limit.
if ms.cfg.Discard == DiscardNew {
if asl && ms.cfg.DiscardNewPer {
return ErrMaxMsgsPerSubject
}
if ms.cfg.MaxMsgs > 0 && ms.state.Msgs >= uint64(ms.cfg.MaxMsgs) {
// If we are tracking max messages per subject and are at the limit we will replace, so this is ok.
if !asl {

View File

@@ -65,6 +65,9 @@ type StreamConfig struct {
// Allow higher performance and unified direct access for mirrors as well.
MirrorDirect bool `json:"mirror_direct"`
// Allow KV like semantics to also discard new on a per subject basis
DiscardNewPer bool `json:"discard_new_per_subject,omitempty"`
// Optional qualifiers. These can not be modified after set to true.
// Sealed will seal a stream so no messages can get out or in.
@@ -992,6 +995,16 @@ func (s *Server) checkStreamCfg(config *StreamConfig, acc *Account) (StreamConfi
return StreamConfig{}, NewJSStreamInvalidConfigError(fmt.Errorf("roll-ups require the purge permission"))
}
// Check for new discard new per subject, we require the discard policy to also be new.
if cfg.DiscardNewPer {
if cfg.Discard != DiscardNew {
return StreamConfig{}, NewJSStreamInvalidConfigError(fmt.Errorf("discard new per subject requires discard new policy to be set"))
}
if cfg.MaxMsgsPer <= 0 {
return StreamConfig{}, NewJSStreamInvalidConfigError(fmt.Errorf("discard new per subject requires max msgs per subject > 0"))
}
}
getStream := func(streamName string) (bool, StreamConfig) {
var exists bool
var cfg StreamConfig
@@ -1317,6 +1330,16 @@ func (jsa *jsAccount) configUpdateCheck(old, new *StreamConfig, s *Server) (*Str
return nil, NewJSStreamInvalidConfigError(fmt.Errorf("stream configuration update can not change RePublish"))
}
// Check on new discard new per subject.
if cfg.DiscardNewPer {
if cfg.Discard != DiscardNew {
return nil, NewJSStreamInvalidConfigError(fmt.Errorf("discard new per subject requires discard new policy to be set"))
}
if cfg.MaxMsgsPer <= 0 {
return nil, NewJSStreamInvalidConfigError(fmt.Errorf("discard new per subject requires max msgs per subject > 0"))
}
}
// Do some adjustments for being sealed.
if cfg.Sealed {
cfg.MaxAge = 0