From ac40ecaef92b99359ef85c417df231fbd2ba3c8b Mon Sep 17 00:00:00 2001 From: Derek Collison Date: Sat, 12 Oct 2019 16:19:17 -0700 Subject: [PATCH] Durable reassignment and takeover Signed-off-by: Derek Collison --- server/observable.go | 56 ++++++++++++++++++++---- test/jetstream_test.go | 97 ++++++++++++++++++++++++++++++++++++++++++ 2 files changed, 145 insertions(+), 8 deletions(-) diff --git a/server/observable.go b/server/observable.go index 946e6ff5..b35d42b9 100644 --- a/server/observable.go +++ b/server/observable.go @@ -137,10 +137,8 @@ func (mset *MsgSet) AddObservable(config *ObservableConfig) (*Observable, error) } // Check if we are not durable that the delivery subject has interest. - if config.Durable == _EMPTY_ && config.Delivery != _EMPTY_ { - if mset.noInterest(config.Delivery) { - return nil, fmt.Errorf("observable requires interest for delivery subject when ephemeral") - } + if config.Delivery != _EMPTY_ && config.Durable == _EMPTY_ && mset.noInterest(config.Delivery) { + return nil, fmt.Errorf("observable requires interest for delivery subject when ephemeral") } // Hold mset lock here/ @@ -180,16 +178,31 @@ func (mset *MsgSet) AddObservable(config *ObservableConfig) (*Observable, error) // Select starting sequence number o.selectStartingSeqNo() - // Now register with mset and create ack subscription. + // Now register with mset and create the ack subscription. c := mset.client if c == nil { mset.mu.Unlock() return nil, fmt.Errorf("message set not valid") } s, a := c.srv, c.acc - if _, ok := mset.obs[o.name]; ok { + // Check if we already have this one registered. + if eo, ok := mset.obs[o.name]; ok { mset.mu.Unlock() - return nil, fmt.Errorf("observable already exists") + if !o.isDurable() || !o.isPushMode() { + return nil, fmt.Errorf("observable already exists") + } + // If we are here we have already registered this durable. If it is still active that is an error. + if eo.Active() { + return nil, fmt.Errorf("observable already exists and is still active") + } + // Since we are here this means we have a potentially new durable so we should update here. + // Check that configs are the same. + if !configsEqualSansDelivery(o.config, eo.config) { + return nil, fmt.Errorf("observable replacement durable config not the same") + } + // Once we are here we have a replacement push based durable. + eo.updateDelivery(o.config.Delivery) + return eo, nil } // Set up the ack subscription for this observable. Will use wildcard for all acks. // We will remember the template to generate replaies with sequence numbers and use @@ -223,12 +236,35 @@ func (mset *MsgSet) AddObservable(config *ObservableConfig) (*Observable, error) o.athresh = JsNotActiveThresholdDefault o.achk = JsActiveCheckIntervalDefault o.atmr = time.AfterFunc(o.achk, o.checkActive) + // If durable and no interest mark as not active to start. + if o.isDurable() && mset.noInterest(config.Delivery) { + o.active = false + } o.mu.Unlock() } return o, nil } +func (o *Observable) updateDelivery(newDelivery string) { + // Update the config and the dsubj + o.mu.Lock() + o.dsubj = newDelivery + o.config.Delivery = newDelivery + // FIXME(dlc) - check partitions, think we need offset. + o.dseq = o.aseq + o.sseq = o.aseq + o.mu.Unlock() + + o.checkActive() +} + +func configsEqualSansDelivery(a, b ObservableConfig) bool { + // These were copied in so can set Delivery here. + a.Delivery, b.Delivery = _EMPTY_, _EMPTY_ + return a == b +} + func (o *Observable) msgSet() *MsgSet { o.mu.Lock() mset := o.mset @@ -508,7 +544,7 @@ func (o *Observable) checkActive() { if o.mset.noInterest(o.config.Delivery) { o.active = false o.nointerest++ - if o.config.Durable == "" && o.nointerest >= o.athresh { + if !o.isDurable() && o.nointerest >= o.athresh { shouldDelete = true } } else { @@ -649,6 +685,10 @@ func isDurableObservable(config *ObservableConfig) bool { return config != nil && config.Durable != _EMPTY_ } +func (o *Observable) isDurable() bool { + return o.config.Durable != _EMPTY_ +} + // Are we in push mode, delivery subject, etc. func (o *Observable) isPushMode() bool { return o.config.Delivery != _EMPTY_ diff --git a/test/jetstream_test.go b/test/jetstream_test.go index d233905e..e703bc5d 100644 --- a/test/jetstream_test.go +++ b/test/jetstream_test.go @@ -1364,3 +1364,100 @@ func TestJetStreamObservableReconnect(t *testing.T) { getMsg(i) } } + +func TestJetStreamDurableObservableReconnect(t *testing.T) { + s := RunBasicJetStreamServer() + defer s.Shutdown() + + mset, err := s.JetStreamAddMsgSet(s.GlobalAccount(), &server.MsgSetConfig{Name: "DT", Subjects: []string{"foo.*"}}) + if err != nil { + t.Fatalf("Unexpected error adding message set: %v", err) + } + defer s.JetStreamDeleteMsgSet(mset) + + nc := clientConnectToServer(t, s) + defer nc.Close() + + dname := "d22" + subj1 := nats.NewInbox() + + o, err := mset.AddObservable(&server.ObservableConfig{Durable: dname, Delivery: subj1, AckPolicy: server.AckExplicit}) + if err != nil { + t.Fatalf("Unexpected error: %v", err) + } + // For test speed. + o.SetActiveCheckParams(50*time.Millisecond, 2) + + sendMsg := func() { + t.Helper() + if err := nc.Publish("foo.22", []byte("OK!")); err != nil { + return + } + } + + // Send 10 msgs + toSend := 10 + for i := 0; i < toSend; i++ { + sendMsg() + } + + sub, _ := nc.SubscribeSync(subj1) + defer sub.Unsubscribe() + + checkFor(t, 250*time.Millisecond, 10*time.Millisecond, func() error { + if nmsgs, _, _ := sub.Pending(); err != nil || nmsgs != toSend { + return fmt.Errorf("Did not receive correct number of messages: %d vs %d", nmsgs, toSend) + } + return nil + }) + + getMsg := func(seqno int) *nats.Msg { + t.Helper() + m, err := sub.NextMsg(time.Second) + if err != nil { + t.Fatalf("Unexpected error: %v", err) + } + if seq := o.SeqFromReply(m.Reply); seq != uint64(seqno) { + t.Fatalf("Expected sequence of %d , got %d", seqno, seq) + } + m.Respond(nil) + return m + } + + // Ack first half + for i := 1; i <= toSend/2; i++ { + m := getMsg(i) + m.Respond(nil) + } + + // We should not be able to try to add an observer with the same name. + if _, err := mset.AddObservable(&server.ObservableConfig{Durable: dname, Delivery: subj1, AckPolicy: server.AckExplicit}); err == nil { + t.Fatalf("Expected and error trying to add a new durable observable while first still active") + } + + // Now unsubscribe and wait to become inactive + sub.Unsubscribe() + checkFor(t, 250*time.Millisecond, 50*time.Millisecond, func() error { + if o.Active() { + return fmt.Errorf("Observable is still active") + } + return nil + }) + + // Now we should be able to replace the delivery subject. + subj2 := nats.NewInbox() + sub, _ = nc.SubscribeSync(subj2) + defer sub.Unsubscribe() + nc.Flush() + + o, err = mset.AddObservable(&server.ObservableConfig{Durable: dname, Delivery: subj2, AckPolicy: server.AckExplicit}) + if err != nil { + t.Fatalf("Unexpected error trying to add a new durable observable: %v", err) + } + + // We should get the remaining messages here. + for i := toSend / 2; i <= toSend; i++ { + m := getMsg(i) + m.Respond(nil) + } +}