Added support for DiscardPolicy on streams

Signed-off-by: Derek Collison <derek@nats.io>
This commit is contained in:
Derek Collison
2020-04-29 20:11:53 -07:00
parent 911e7ef35d
commit 7f41b5a6ae
5 changed files with 158 additions and 14 deletions

View File

@@ -554,6 +554,18 @@ func (fs *fileStore) StoreMsg(subj string, msg []byte) (seq uint64, ts int64, er
return 0, 0, ErrStoreClosed
}
// Check if we are discarding new messages when we reach the limit.
if fs.cfg.Discard == DiscardNew {
if fs.cfg.MaxMsgs > 0 && fs.state.Msgs >= uint64(fs.cfg.MaxMsgs) {
fs.mu.Unlock()
return 0, 0, ErrMaxMsgs
}
if fs.cfg.MaxBytes > 0 && fs.state.Bytes+uint64(len(msg)) >= uint64(fs.cfg.MaxBytes) {
fs.mu.Unlock()
return 0, 0, ErrMaxBytes
}
}
seq = fs.state.LastSeq + 1
if fs.state.FirstSeq == 0 {
fs.state.FirstSeq = seq

View File

@@ -25,11 +25,11 @@ import (
// TODO(dlc) - This is a fairly simplistic approach but should do for now.
type memStore struct {
mu sync.RWMutex
cfg StreamConfig
state StreamState
msgs map[uint64]*storedMsg
scb func(int64)
ageChk *time.Timer
config StreamConfig
consumers int
}
@@ -47,7 +47,7 @@ func newMemStore(cfg *StreamConfig) (*memStore, error) {
if cfg.Storage != MemoryStorage {
return nil, fmt.Errorf("memStore requires memory storage type in config")
}
return &memStore{msgs: make(map[uint64]*storedMsg), config: *cfg}, nil
return &memStore{msgs: make(map[uint64]*storedMsg), cfg: *cfg}, nil
}
func (ms *memStore) UpdateConfig(cfg *StreamConfig) error {
@@ -59,15 +59,15 @@ func (ms *memStore) UpdateConfig(cfg *StreamConfig) error {
}
ms.mu.Lock()
ms.config = *cfg
ms.cfg = *cfg
// Limits checks and enforcement.
ms.enforceMsgLimit()
ms.enforceBytesLimit()
// Do age timers.
if ms.ageChk == nil && ms.config.MaxAge != 0 {
if ms.ageChk == nil && ms.cfg.MaxAge != 0 {
ms.startAgeChk()
}
if ms.ageChk != nil && ms.config.MaxAge == 0 {
if ms.ageChk != nil && ms.cfg.MaxAge == 0 {
ms.ageChk.Stop()
ms.ageChk = nil
}
@@ -82,6 +82,19 @@ func (ms *memStore) UpdateConfig(cfg *StreamConfig) error {
// Store stores a message.
func (ms *memStore) StoreMsg(subj string, msg []byte) (uint64, int64, error) {
ms.mu.Lock()
// Check if we are discarding new messages when we reach the limit.
if ms.cfg.Discard == DiscardNew {
if ms.cfg.MaxMsgs > 0 && ms.state.Msgs >= uint64(ms.cfg.MaxMsgs) {
ms.mu.Unlock()
return 0, 0, ErrMaxMsgs
}
if ms.cfg.MaxBytes > 0 && ms.state.Bytes+uint64(len(msg)) >= uint64(ms.cfg.MaxBytes) {
ms.mu.Unlock()
return 0, 0, ErrMaxBytes
}
}
seq := ms.state.LastSeq + 1
if ms.state.FirstSeq == 0 {
ms.state.FirstSeq = seq
@@ -105,7 +118,7 @@ func (ms *memStore) StoreMsg(subj string, msg []byte) (uint64, int64, error) {
ms.enforceBytesLimit()
// Check if we have and need the age expiration timer running.
if ms.ageChk == nil && ms.config.MaxAge != 0 {
if ms.ageChk == nil && ms.cfg.MaxAge != 0 {
ms.startAgeChk()
}
cb := ms.scb
@@ -155,10 +168,10 @@ func (ms *memStore) GetSeqFromTime(t time.Time) uint64 {
// Will check the msg limit and drop firstSeq msg if needed.
// Lock should be held.
func (ms *memStore) enforceMsgLimit() {
if ms.config.MaxMsgs <= 0 || ms.state.Msgs <= uint64(ms.config.MaxMsgs) {
if ms.cfg.MaxMsgs <= 0 || ms.state.Msgs <= uint64(ms.cfg.MaxMsgs) {
return
}
for nmsgs := ms.state.Msgs; nmsgs > uint64(ms.config.MaxMsgs); nmsgs = ms.state.Msgs {
for nmsgs := ms.state.Msgs; nmsgs > uint64(ms.cfg.MaxMsgs); nmsgs = ms.state.Msgs {
ms.deleteFirstMsgOrPanic()
}
}
@@ -166,10 +179,10 @@ func (ms *memStore) enforceMsgLimit() {
// Will check the bytes limit and drop msgs if needed.
// Lock should be held.
func (ms *memStore) enforceBytesLimit() {
if ms.config.MaxBytes <= 0 || ms.state.Bytes <= uint64(ms.config.MaxBytes) {
if ms.cfg.MaxBytes <= 0 || ms.state.Bytes <= uint64(ms.cfg.MaxBytes) {
return
}
for bs := ms.state.Bytes; bs > uint64(ms.config.MaxBytes); bs = ms.state.Bytes {
for bs := ms.state.Bytes; bs > uint64(ms.cfg.MaxBytes); bs = ms.state.Bytes {
ms.deleteFirstMsgOrPanic()
}
}
@@ -177,8 +190,8 @@ func (ms *memStore) enforceBytesLimit() {
// Will start the age check timer.
// Lock should be held.
func (ms *memStore) startAgeChk() {
if ms.ageChk == nil && ms.config.MaxAge != 0 {
ms.ageChk = time.AfterFunc(ms.config.MaxAge, ms.expireMsgs)
if ms.ageChk == nil && ms.cfg.MaxAge != 0 {
ms.ageChk = time.AfterFunc(ms.cfg.MaxAge, ms.expireMsgs)
}
}
@@ -188,7 +201,7 @@ func (ms *memStore) expireMsgs() {
defer ms.mu.Unlock()
now := time.Now().UnixNano()
minAge := now - int64(ms.config.MaxAge)
minAge := now - int64(ms.cfg.MaxAge)
for {
if sm, ok := ms.msgs[ms.state.FirstSeq]; ok && sm.ts <= minAge {
ms.deleteFirstMsgOrPanic()
@@ -197,7 +210,7 @@ func (ms *memStore) expireMsgs() {
ms.ageChk.Stop()
ms.ageChk = nil
} else {
fireIn := time.Duration(sm.ts-now) + ms.config.MaxAge
fireIn := time.Duration(sm.ts-now) + ms.cfg.MaxAge
ms.ageChk.Reset(fireIn)
}
return

View File

@@ -39,6 +39,12 @@ var (
ErrStoreMsgNotFound = errors.New("no message found")
// ErrStoreEOF is returned when message seq is greater than the last sequence.
ErrStoreEOF = errors.New("stream EOF")
// ErrMaxMsgs is returned when we have discard new as a policy and we reached
// the message limit.
ErrMaxMsgs = errors.New("maximum messages exceeded")
// ErrMaxBytes is returned when we have discard new as a policy and we reached
// the bytes limit.
ErrMaxBytes = errors.New("maximum bytes exceeded")
// ErrStoreSnapshotInProgress is returned when RemoveMsg or EraseMsg is called
// while a snapshot is in progress.
ErrStoreSnapshotInProgress = errors.New("snapshot in progress")
@@ -73,6 +79,17 @@ const (
WorkQueuePolicy
)
// Discard Policy determines how we proceed when limits of messages or bytes are hit. The default, DicscardOld will
// remove older messages. DiscardNew will fail to store the new message.
type DiscardPolicy int
const (
// DiscardOld will remove older messages to return to the limits.
DiscardOld = iota
//DiscardNew will error on a StoreMsg call
DiscardNew
)
// StreamStats is information about the given stream.
type StreamState struct {
Msgs uint64 `json:"messages"`
@@ -166,6 +183,40 @@ func (rp *RetentionPolicy) UnmarshalJSON(data []byte) error {
return nil
}
func (dp DiscardPolicy) String() string {
switch dp {
case DiscardOld:
return "DiscardOld"
case DiscardNew:
return "DiscardNew"
default:
return "Unknown Discard Policy"
}
}
func (dp DiscardPolicy) MarshalJSON() ([]byte, error) {
switch dp {
case DiscardOld:
return json.Marshal("old")
case DiscardNew:
return json.Marshal("new")
default:
return nil, fmt.Errorf("can not marshal %v", dp)
}
}
func (dp *DiscardPolicy) UnmarshalJSON(data []byte) error {
switch strings.ToLower(string(data)) {
case jsonString("old"):
*dp = DiscardOld
case jsonString("new"):
*dp = DiscardNew
default:
return fmt.Errorf("can not unmarshal %q", data)
}
return nil
}
const (
memoryStorageString = "memory"
fileStorageString = "file"

View File

@@ -33,6 +33,7 @@ type StreamConfig struct {
MaxConsumers int `json:"max_consumers"`
MaxMsgs int64 `json:"max_msgs"`
MaxBytes int64 `json:"max_bytes"`
Discard DiscardPolicy `json:"discard"`
MaxAge time.Duration `json:"max_age"`
MaxMsgSize int32 `json:"max_msg_size,omitempty"`
Storage StorageType `json:"storage"`

View File

@@ -222,6 +222,73 @@ func TestJetStreamAddStream(t *testing.T) {
}
}
func TestJetStreamAddStreamDiscardNew(t *testing.T) {
cases := []struct {
name string
mconfig *server.StreamConfig
}{
{name: "MemoryStore",
mconfig: &server.StreamConfig{
Name: "foo",
MaxMsgs: 10,
MaxBytes: 4096,
Discard: server.DiscardNew,
Storage: server.MemoryStorage,
Replicas: 1,
}},
{name: "FileStore",
mconfig: &server.StreamConfig{
Name: "foo",
MaxMsgs: 10,
MaxBytes: 4096,
Discard: server.DiscardNew,
Storage: server.FileStorage,
Replicas: 1,
}},
}
for _, c := range cases {
t.Run(c.name, func(t *testing.T) {
s := RunBasicJetStreamServer()
defer s.Shutdown()
mset, err := s.GlobalAccount().AddStream(c.mconfig)
if err != nil {
t.Fatalf("Unexpected error adding stream: %v", err)
}
defer mset.Delete()
nc := clientConnectToServer(t, s)
defer nc.Close()
subj := "foo"
toSend := 10
for i := 0; i < toSend; i++ {
sendStreamMsg(t, nc, subj, fmt.Sprintf("MSG: %d", i+1))
}
// We expect this one to fail due to discard policy.
resp, _ := nc.Request(subj, []byte("discard me"), 100*time.Millisecond)
if resp == nil {
t.Fatalf("No response, possible timeout?")
}
if string(resp.Data) != "-ERR 'maximum messages exceeded'" {
t.Fatalf("Expected to get an error about maximum messages, got %q", resp.Data)
}
// Now do bytes.
mset.Purge()
big := make([]byte, 8192)
resp, _ = nc.Request(subj, big, 100*time.Millisecond)
if resp == nil {
t.Fatalf("No response, possible timeout?")
}
if string(resp.Data) != "-ERR 'maximum bytes exceeded'" {
t.Fatalf("Expected to get an error about maximum bytes, got %q", resp.Data)
}
})
}
}
func TestJetStreamPubAck(t *testing.T) {
s := RunBasicJetStreamServer()
defer s.Shutdown()