Add MaxDeliver for observables

Signed-off-by: Derek Collison <derek@nats.io>
This commit is contained in:
Derek Collison
2020-01-07 13:32:46 -08:00
parent b3739f9bbf
commit 89ff13a5be
2 changed files with 80 additions and 1 deletions

View File

@@ -42,6 +42,7 @@ type ObservableConfig struct {
DeliverLast bool `json:"deliver_last,omitempty"`
AckPolicy AckPolicy `json:"ack_policy"`
AckWait time.Duration `json:"ack_wait,omitempty"`
MaxDeliver int `json:"max_deliver,omitempty"`
Subject string `json:"subject,omitempty"`
ReplayPolicy ReplayPolicy `json:"replay_policy"`
SampleFrequency string `json:"sample_frequency,omitempty"`
@@ -135,6 +136,7 @@ type Observable struct {
ptmr *time.Timer
rdq []uint64
rdc map[uint64]uint64
maxdc uint64
waiting []string
config ObservableConfig
store ObservableStore
@@ -188,6 +190,10 @@ func (mset *MsgSet) AddObservable(config *ObservableConfig) (*Observable, error)
if config.AckPolicy == AckExplicit && config.AckWait == time.Duration(0) {
config.AckWait = JsAckWaitDefault
}
// Setup default of -1, meaning no limit for MaxDeliver.
if config.MaxDeliver == 0 {
config.MaxDeliver = -1
}
// Make sure any partition subject is also a literal.
if config.Subject != "" {
@@ -255,7 +261,15 @@ func (mset *MsgSet) AddObservable(config *ObservableConfig) (*Observable, error)
}
// Set name, which will be durable name if set, otherwise we create one at random.
o := &Observable{mset: mset, config: *config, dsubj: config.Delivery, active: true, qch: make(chan struct{}), fch: make(chan struct{}), sfreq: int32(sampleFreq)}
o := &Observable{mset: mset,
config: *config,
dsubj: config.Delivery,
active: true,
qch: make(chan struct{}),
fch: make(chan struct{}),
sfreq: int32(sampleFreq),
maxdc: uint64(config.MaxDeliver),
}
if isDurableObservable(config) {
o.name = config.Durable
} else {
@@ -806,6 +820,9 @@ func (o *Observable) getNextMsg() (string, []byte, uint64, uint64, error) {
seq = o.rdq[0]
o.rdq = append(o.rdq[:0], o.rdq[1:]...)
dcount = o.incDeliveryCount(seq)
if o.maxdc > 0 && dcount > o.maxdc {
continue
}
}
subj, msg, _, err := o.mset.store.LoadMsg(seq)
if err == nil {

View File

@@ -209,6 +209,68 @@ func TestJetStreamAddMsgSet(t *testing.T) {
}
}
func TestJetStreamObservableMaxDeliveries(t *testing.T) {
cases := []struct {
name string
mconfig *server.MsgSetConfig
}{
{"MemoryStore", &server.MsgSetConfig{Name: "MY_WQ", Storage: server.MemoryStorage}},
{"FileStore", &server.MsgSetConfig{Name: "MY_WQ", Storage: server.FileStorage}},
}
for _, c := range cases {
t.Run(c.name, func(t *testing.T) {
s := RunBasicJetStreamServer()
defer s.Shutdown()
mset, err := s.GlobalAccount().AddMsgSet(c.mconfig)
if err != nil {
t.Fatalf("Unexpected error adding message set: %v", err)
}
defer mset.Delete()
nc := clientConnectToServer(t, s)
defer nc.Close()
// Queue up our work item.
resp, _ := nc.Request(c.mconfig.Name, []byte("Hello World!"), 50*time.Millisecond)
expectOKResponse(t, resp)
sub, _ := nc.SubscribeSync(nats.NewInbox())
defer sub.Unsubscribe()
nc.Flush()
maxDeliver := 5
ackWait := 10 * time.Millisecond
o, err := mset.AddObservable(&server.ObservableConfig{
Delivery: sub.Subject,
DeliverAll: true,
AckPolicy: server.AckExplicit,
AckWait: ackWait,
MaxDeliver: maxDeliver,
})
if err != nil {
t.Fatalf("Expected no error, got %v", err)
}
defer o.Delete()
// Wait for redeliveries to pile up.
checkFor(t, 250*time.Millisecond, 10*time.Millisecond, func() error {
if nmsgs, _, _ := sub.Pending(); err != nil || nmsgs != maxDeliver {
return fmt.Errorf("Did not receive correct number of messages: %d vs %d", nmsgs, maxDeliver)
}
return nil
})
// Now wait a bit longer and make sure we do not have more than maxDeliveries.
time.Sleep(2 * ackWait)
if nmsgs, _, _ := sub.Pending(); err != nil || nmsgs != maxDeliver {
t.Fatalf("Did not receive correct number of messages: %d vs %d", nmsgs, maxDeliver)
}
})
}
}
func TestJetStreamAddMsgSetMaxMsgSize(t *testing.T) {
cases := []struct {
name string