From 89ff13a5be680fda3fae2f2b53a12776928b82ed Mon Sep 17 00:00:00 2001 From: Derek Collison Date: Tue, 7 Jan 2020 13:32:46 -0800 Subject: [PATCH] Add MaxDeliver for observables Signed-off-by: Derek Collison --- server/observable.go | 19 ++++++++++++- test/jetstream_test.go | 62 ++++++++++++++++++++++++++++++++++++++++++ 2 files changed, 80 insertions(+), 1 deletion(-) diff --git a/server/observable.go b/server/observable.go index b39195bb..57a71908 100644 --- a/server/observable.go +++ b/server/observable.go @@ -42,6 +42,7 @@ type ObservableConfig struct { DeliverLast bool `json:"deliver_last,omitempty"` AckPolicy AckPolicy `json:"ack_policy"` AckWait time.Duration `json:"ack_wait,omitempty"` + MaxDeliver int `json:"max_deliver,omitempty"` Subject string `json:"subject,omitempty"` ReplayPolicy ReplayPolicy `json:"replay_policy"` SampleFrequency string `json:"sample_frequency,omitempty"` @@ -135,6 +136,7 @@ type Observable struct { ptmr *time.Timer rdq []uint64 rdc map[uint64]uint64 + maxdc uint64 waiting []string config ObservableConfig store ObservableStore @@ -188,6 +190,10 @@ func (mset *MsgSet) AddObservable(config *ObservableConfig) (*Observable, error) if config.AckPolicy == AckExplicit && config.AckWait == time.Duration(0) { config.AckWait = JsAckWaitDefault } + // Setup default of -1, meaning no limit for MaxDeliver. + if config.MaxDeliver == 0 { + config.MaxDeliver = -1 + } // Make sure any partition subject is also a literal. if config.Subject != "" { @@ -255,7 +261,15 @@ func (mset *MsgSet) AddObservable(config *ObservableConfig) (*Observable, error) } // Set name, which will be durable name if set, otherwise we create one at random. - o := &Observable{mset: mset, config: *config, dsubj: config.Delivery, active: true, qch: make(chan struct{}), fch: make(chan struct{}), sfreq: int32(sampleFreq)} + o := &Observable{mset: mset, + config: *config, + dsubj: config.Delivery, + active: true, + qch: make(chan struct{}), + fch: make(chan struct{}), + sfreq: int32(sampleFreq), + maxdc: uint64(config.MaxDeliver), + } if isDurableObservable(config) { o.name = config.Durable } else { @@ -806,6 +820,9 @@ func (o *Observable) getNextMsg() (string, []byte, uint64, uint64, error) { seq = o.rdq[0] o.rdq = append(o.rdq[:0], o.rdq[1:]...) dcount = o.incDeliveryCount(seq) + if o.maxdc > 0 && dcount > o.maxdc { + continue + } } subj, msg, _, err := o.mset.store.LoadMsg(seq) if err == nil { diff --git a/test/jetstream_test.go b/test/jetstream_test.go index 944a8677..5b9b72c2 100644 --- a/test/jetstream_test.go +++ b/test/jetstream_test.go @@ -209,6 +209,68 @@ func TestJetStreamAddMsgSet(t *testing.T) { } } +func TestJetStreamObservableMaxDeliveries(t *testing.T) { + cases := []struct { + name string + mconfig *server.MsgSetConfig + }{ + {"MemoryStore", &server.MsgSetConfig{Name: "MY_WQ", Storage: server.MemoryStorage}}, + {"FileStore", &server.MsgSetConfig{Name: "MY_WQ", Storage: server.FileStorage}}, + } + for _, c := range cases { + t.Run(c.name, func(t *testing.T) { + s := RunBasicJetStreamServer() + defer s.Shutdown() + + mset, err := s.GlobalAccount().AddMsgSet(c.mconfig) + if err != nil { + t.Fatalf("Unexpected error adding message set: %v", err) + } + defer mset.Delete() + + nc := clientConnectToServer(t, s) + defer nc.Close() + + // Queue up our work item. + resp, _ := nc.Request(c.mconfig.Name, []byte("Hello World!"), 50*time.Millisecond) + expectOKResponse(t, resp) + + sub, _ := nc.SubscribeSync(nats.NewInbox()) + defer sub.Unsubscribe() + nc.Flush() + + maxDeliver := 5 + ackWait := 10 * time.Millisecond + + o, err := mset.AddObservable(&server.ObservableConfig{ + Delivery: sub.Subject, + DeliverAll: true, + AckPolicy: server.AckExplicit, + AckWait: ackWait, + MaxDeliver: maxDeliver, + }) + if err != nil { + t.Fatalf("Expected no error, got %v", err) + } + defer o.Delete() + + // Wait for redeliveries to pile up. + checkFor(t, 250*time.Millisecond, 10*time.Millisecond, func() error { + if nmsgs, _, _ := sub.Pending(); err != nil || nmsgs != maxDeliver { + return fmt.Errorf("Did not receive correct number of messages: %d vs %d", nmsgs, maxDeliver) + } + return nil + }) + + // Now wait a bit longer and make sure we do not have more than maxDeliveries. + time.Sleep(2 * ackWait) + if nmsgs, _, _ := sub.Pending(); err != nil || nmsgs != maxDeliver { + t.Fatalf("Did not receive correct number of messages: %d vs %d", nmsgs, maxDeliver) + } + }) + } +} + func TestJetStreamAddMsgSetMaxMsgSize(t *testing.T) { cases := []struct { name string