From 1028798d3aef18334fa321c6fcefc2d3d3e2eb9b Mon Sep 17 00:00:00 2001 From: Derek Collison Date: Wed, 8 Jan 2020 05:48:13 -0800 Subject: [PATCH] Move Subject to FilterSubject Signed-off-by: Derek Collison --- server/filestore_test.go | 8 ++++---- server/msgset.go | 4 ++-- server/observable.go | 32 ++++++++++++++++---------------- test/jetstream_test.go | 28 ++++++++++++++-------------- 4 files changed, 36 insertions(+), 36 deletions(-) diff --git a/server/filestore_test.go b/server/filestore_test.go index 96984c12..8c2f4444 100644 --- a/server/filestore_test.go +++ b/server/filestore_test.go @@ -739,10 +739,10 @@ func TestFileStoreMeta(t *testing.T) { // Now create an observable. Same deal for them. oconfig := ObservableConfig{ - Delivery: "d", - DeliverAll: true, - Subject: "foo", - AckPolicy: AckAll, + Delivery: "d", + DeliverAll: true, + FilterSubject: "foo", + AckPolicy: AckAll, } oname := "obs22" obs, err := fs.ObservableStore(oname, &oconfig) diff --git a/server/msgset.go b/server/msgset.go index f5c4b45a..e3a90e58 100644 --- a/server/msgset.go +++ b/server/msgset.go @@ -617,10 +617,10 @@ func (mset *MsgSet) waitForMsgs() { // Lock should be held. func (mset *MsgSet) partitionUnique(partition string) bool { for _, o := range mset.obs { - if o.config.Subject == _EMPTY_ { + if o.config.FilterSubject == _EMPTY_ { return false } - if subjectIsSubsetMatch(partition, o.config.Subject) { + if subjectIsSubsetMatch(partition, o.config.FilterSubject) { return false } } diff --git a/server/observable.go b/server/observable.go index 57a71908..273cc182 100644 --- a/server/observable.go +++ b/server/observable.go @@ -43,7 +43,7 @@ type ObservableConfig struct { AckPolicy AckPolicy `json:"ack_policy"` AckWait time.Duration `json:"ack_wait,omitempty"` MaxDeliver int `json:"max_deliver,omitempty"` - Subject string `json:"subject,omitempty"` + FilterSubject string `json:"filter_subject,omitempty"` ReplayPolicy ReplayPolicy `json:"replay_policy"` SampleFrequency string `json:"sample_frequency,omitempty"` } @@ -196,16 +196,16 @@ func (mset *MsgSet) AddObservable(config *ObservableConfig) (*Observable, error) } // Make sure any partition subject is also a literal. - if config.Subject != "" { - if !subjectIsLiteral(config.Subject) { - return nil, fmt.Errorf("observable partition subject has wildcards") + if config.FilterSubject != "" { + if !subjectIsLiteral(config.FilterSubject) { + return nil, fmt.Errorf("observable filter subject has wildcards") } // Make sure this is a valid partition of the interest subjects. - if !mset.validSubject(config.Subject) { - return nil, fmt.Errorf("observable partition not a valid subset of the interest subjects") + if !mset.validSubject(config.FilterSubject) { + return nil, fmt.Errorf("observable filter subject is not a valid subset of the interest subjects") } if config.AckPolicy == AckAll { - return nil, fmt.Errorf("observable with partition can not have an ack policy of ack all") + return nil, fmt.Errorf("observable with filter subject can not have an ack policy of ack all") } } @@ -245,13 +245,13 @@ func (mset *MsgSet) AddObservable(config *ObservableConfig) (*Observable, error) return nil, fmt.Errorf("delivery subject not allowed on workqueue message set") } if len(mset.obs) > 0 { - if config.Subject == _EMPTY_ { + if config.FilterSubject == _EMPTY_ { mset.mu.Unlock() - return nil, fmt.Errorf("multiple non-partioned observables not allowed on workqueue message set") - } else if !mset.partitionUnique(config.Subject) { + return nil, fmt.Errorf("multiple non-filtered observables not allowed on workqueue message set") + } else if !mset.partitionUnique(config.FilterSubject) { // We have a partition but it is not unique amongst the others. mset.mu.Unlock() - return nil, fmt.Errorf("partioned observable not unique on workqueue message set") + return nil, fmt.Errorf("filtered observable not unique on workqueue message set") } } if !config.DeliverAll { @@ -828,7 +828,7 @@ func (o *Observable) getNextMsg() (string, []byte, uint64, uint64, error) { if err == nil { if dcount == 1 { // First delivery. o.sseq++ - if o.config.Subject != "" && subj != o.config.Subject { + if o.config.FilterSubject != _EMPTY_ && subj != o.config.FilterSubject { continue } } @@ -881,7 +881,7 @@ func (o *Observable) processReplay() error { o.mu.Lock() mset := o.mset - partition := o.config.Subject + partition := o.config.FilterSubject pullMode := o.isPullMode() o.mu.Unlock() @@ -1063,7 +1063,7 @@ func (o *Observable) deliverCurrentMsg(subj string, msg []byte, seq uint64) bool // If we are partitioned and we do not match, do not consider this a failure. // Go ahead and return true. - if o.config.Subject != "" && subj != o.config.Subject { + if o.config.FilterSubject != _EMPTY_ && subj != o.config.FilterSubject { o.mu.Unlock() return true } @@ -1233,7 +1233,7 @@ func (o *Observable) selectSubjectLast() { if err == ErrStoreMsgNotFound { continue } - if subj == o.config.Subject { + if subj == o.config.FilterSubject { o.sseq = seq return } @@ -1250,7 +1250,7 @@ func (o *Observable) selectStartingSeqNo() { } else if o.config.DeliverLast { o.sseq = stats.LastSeq // If we are partitioned here we may need to walk backwards. - if o.config.Subject != _EMPTY_ { + if o.config.FilterSubject != _EMPTY_ { o.selectSubjectLast() } } else if o.config.StartTime != noTime { diff --git a/test/jetstream_test.go b/test/jetstream_test.go index 5b9b72c2..790453eb 100644 --- a/test/jetstream_test.go +++ b/test/jetstream_test.go @@ -497,10 +497,10 @@ func TestJetStreamCreateObservable(t *testing.T) { // Subjects can not be AckAll. if _, err := mset.AddObservable(&server.ObservableConfig{ - Delivery: delivery, - DeliverAll: true, - Subject: "foo", - AckPolicy: server.AckAll, + Delivery: delivery, + DeliverAll: true, + FilterSubject: "foo", + AckPolicy: server.AckAll, }); err == nil { t.Fatalf("Expected an error on partitioned observable with ack policy of all") } @@ -807,7 +807,7 @@ func TestJetStreamSubjecting(t *testing.T) { defer sub.Unsubscribe() nc.Flush() - o, err := mset.AddObservable(&server.ObservableConfig{Delivery: delivery, Subject: subjB, DeliverAll: true}) + o, err := mset.AddObservable(&server.ObservableConfig{Delivery: delivery, FilterSubject: subjB, DeliverAll: true}) if err != nil { t.Fatalf("Expected no error with registered interest, got %v", err) } @@ -877,7 +877,7 @@ func TestJetStreamWorkQueueSubjecting(t *testing.T) { } oname := "WQ" - o, err := mset.AddObservable(&server.ObservableConfig{Durable: oname, Subject: subjA, DeliverAll: true, AckPolicy: server.AckExplicit}) + o, err := mset.AddObservable(&server.ObservableConfig{Durable: oname, FilterSubject: subjA, DeliverAll: true, AckPolicy: server.AckExplicit}) if err != nil { t.Fatalf("Expected no error with registered interest, got %v", err) } @@ -1111,7 +1111,7 @@ func TestJetStreamWorkQueueRetentionMsgSet(t *testing.T) { pConfig := func(pname string) *server.ObservableConfig { dname := fmt.Sprintf("PPBO-%d", pindex) pindex += 1 - return &server.ObservableConfig{Durable: dname, DeliverAll: true, Subject: pname, AckPolicy: server.AckExplicit} + return &server.ObservableConfig{Durable: dname, DeliverAll: true, FilterSubject: pname, AckPolicy: server.AckExplicit} } o, err = mset.AddObservable(pConfig("MY_WORK_QUEUE.A")) if err != nil { @@ -1873,12 +1873,12 @@ func TestJetStreamDurableSubjectedObservableReconnect(t *testing.T) { // Now create an observable for foo.AA, only requesting the last one. o, err := mset.AddObservable(&server.ObservableConfig{ - Durable: dname, - Delivery: dsubj, - Subject: "foo.AA", - DeliverLast: true, - AckPolicy: server.AckExplicit, - AckWait: 100 * time.Millisecond, + Durable: dname, + Delivery: dsubj, + FilterSubject: "foo.AA", + DeliverLast: true, + AckPolicy: server.AckExplicit, + AckWait: 100 * time.Millisecond, }) if err != nil { t.Fatalf("Unexpected error: %v", err) @@ -3470,7 +3470,7 @@ func TestJetStreamDeleteMsg(t *testing.T) { sub, _ := nc.SubscribeSync(delivery) nc.Flush() - o, err := mset.AddObservable(&server.ObservableConfig{Delivery: delivery, DeliverAll: true, Subject: "foo"}) + o, err := mset.AddObservable(&server.ObservableConfig{Delivery: delivery, DeliverAll: true, FilterSubject: "foo"}) if err != nil { t.Fatalf("Unexpected error: %v", err) }