Move Subject to FilterSubject

Signed-off-by: Derek Collison <derek@nats.io>
This commit is contained in:
Derek Collison
2020-01-08 05:48:13 -08:00
parent 89ff13a5be
commit 1028798d3a
4 changed files with 36 additions and 36 deletions

View File

@@ -739,10 +739,10 @@ func TestFileStoreMeta(t *testing.T) {
// Now create an observable. Same deal for them.
oconfig := ObservableConfig{
Delivery: "d",
DeliverAll: true,
Subject: "foo",
AckPolicy: AckAll,
Delivery: "d",
DeliverAll: true,
FilterSubject: "foo",
AckPolicy: AckAll,
}
oname := "obs22"
obs, err := fs.ObservableStore(oname, &oconfig)

View File

@@ -617,10 +617,10 @@ func (mset *MsgSet) waitForMsgs() {
// Lock should be held.
func (mset *MsgSet) partitionUnique(partition string) bool {
for _, o := range mset.obs {
if o.config.Subject == _EMPTY_ {
if o.config.FilterSubject == _EMPTY_ {
return false
}
if subjectIsSubsetMatch(partition, o.config.Subject) {
if subjectIsSubsetMatch(partition, o.config.FilterSubject) {
return false
}
}

View File

@@ -43,7 +43,7 @@ type ObservableConfig struct {
AckPolicy AckPolicy `json:"ack_policy"`
AckWait time.Duration `json:"ack_wait,omitempty"`
MaxDeliver int `json:"max_deliver,omitempty"`
Subject string `json:"subject,omitempty"`
FilterSubject string `json:"filter_subject,omitempty"`
ReplayPolicy ReplayPolicy `json:"replay_policy"`
SampleFrequency string `json:"sample_frequency,omitempty"`
}
@@ -196,16 +196,16 @@ func (mset *MsgSet) AddObservable(config *ObservableConfig) (*Observable, error)
}
// Make sure any partition subject is also a literal.
if config.Subject != "" {
if !subjectIsLiteral(config.Subject) {
return nil, fmt.Errorf("observable partition subject has wildcards")
if config.FilterSubject != "" {
if !subjectIsLiteral(config.FilterSubject) {
return nil, fmt.Errorf("observable filter subject has wildcards")
}
// Make sure this is a valid partition of the interest subjects.
if !mset.validSubject(config.Subject) {
return nil, fmt.Errorf("observable partition not a valid subset of the interest subjects")
if !mset.validSubject(config.FilterSubject) {
return nil, fmt.Errorf("observable filter subject is not a valid subset of the interest subjects")
}
if config.AckPolicy == AckAll {
return nil, fmt.Errorf("observable with partition can not have an ack policy of ack all")
return nil, fmt.Errorf("observable with filter subject can not have an ack policy of ack all")
}
}
@@ -245,13 +245,13 @@ func (mset *MsgSet) AddObservable(config *ObservableConfig) (*Observable, error)
return nil, fmt.Errorf("delivery subject not allowed on workqueue message set")
}
if len(mset.obs) > 0 {
if config.Subject == _EMPTY_ {
if config.FilterSubject == _EMPTY_ {
mset.mu.Unlock()
return nil, fmt.Errorf("multiple non-partioned observables not allowed on workqueue message set")
} else if !mset.partitionUnique(config.Subject) {
return nil, fmt.Errorf("multiple non-filtered observables not allowed on workqueue message set")
} else if !mset.partitionUnique(config.FilterSubject) {
// We have a partition but it is not unique amongst the others.
mset.mu.Unlock()
return nil, fmt.Errorf("partioned observable not unique on workqueue message set")
return nil, fmt.Errorf("filtered observable not unique on workqueue message set")
}
}
if !config.DeliverAll {
@@ -828,7 +828,7 @@ func (o *Observable) getNextMsg() (string, []byte, uint64, uint64, error) {
if err == nil {
if dcount == 1 { // First delivery.
o.sseq++
if o.config.Subject != "" && subj != o.config.Subject {
if o.config.FilterSubject != _EMPTY_ && subj != o.config.FilterSubject {
continue
}
}
@@ -881,7 +881,7 @@ func (o *Observable) processReplay() error {
o.mu.Lock()
mset := o.mset
partition := o.config.Subject
partition := o.config.FilterSubject
pullMode := o.isPullMode()
o.mu.Unlock()
@@ -1063,7 +1063,7 @@ func (o *Observable) deliverCurrentMsg(subj string, msg []byte, seq uint64) bool
// If we are partitioned and we do not match, do not consider this a failure.
// Go ahead and return true.
if o.config.Subject != "" && subj != o.config.Subject {
if o.config.FilterSubject != _EMPTY_ && subj != o.config.FilterSubject {
o.mu.Unlock()
return true
}
@@ -1233,7 +1233,7 @@ func (o *Observable) selectSubjectLast() {
if err == ErrStoreMsgNotFound {
continue
}
if subj == o.config.Subject {
if subj == o.config.FilterSubject {
o.sseq = seq
return
}
@@ -1250,7 +1250,7 @@ func (o *Observable) selectStartingSeqNo() {
} else if o.config.DeliverLast {
o.sseq = stats.LastSeq
// If we are partitioned here we may need to walk backwards.
if o.config.Subject != _EMPTY_ {
if o.config.FilterSubject != _EMPTY_ {
o.selectSubjectLast()
}
} else if o.config.StartTime != noTime {

View File

@@ -497,10 +497,10 @@ func TestJetStreamCreateObservable(t *testing.T) {
// Subjects can not be AckAll.
if _, err := mset.AddObservable(&server.ObservableConfig{
Delivery: delivery,
DeliverAll: true,
Subject: "foo",
AckPolicy: server.AckAll,
Delivery: delivery,
DeliverAll: true,
FilterSubject: "foo",
AckPolicy: server.AckAll,
}); err == nil {
t.Fatalf("Expected an error on partitioned observable with ack policy of all")
}
@@ -807,7 +807,7 @@ func TestJetStreamSubjecting(t *testing.T) {
defer sub.Unsubscribe()
nc.Flush()
o, err := mset.AddObservable(&server.ObservableConfig{Delivery: delivery, Subject: subjB, DeliverAll: true})
o, err := mset.AddObservable(&server.ObservableConfig{Delivery: delivery, FilterSubject: subjB, DeliverAll: true})
if err != nil {
t.Fatalf("Expected no error with registered interest, got %v", err)
}
@@ -877,7 +877,7 @@ func TestJetStreamWorkQueueSubjecting(t *testing.T) {
}
oname := "WQ"
o, err := mset.AddObservable(&server.ObservableConfig{Durable: oname, Subject: subjA, DeliverAll: true, AckPolicy: server.AckExplicit})
o, err := mset.AddObservable(&server.ObservableConfig{Durable: oname, FilterSubject: subjA, DeliverAll: true, AckPolicy: server.AckExplicit})
if err != nil {
t.Fatalf("Expected no error with registered interest, got %v", err)
}
@@ -1111,7 +1111,7 @@ func TestJetStreamWorkQueueRetentionMsgSet(t *testing.T) {
pConfig := func(pname string) *server.ObservableConfig {
dname := fmt.Sprintf("PPBO-%d", pindex)
pindex += 1
return &server.ObservableConfig{Durable: dname, DeliverAll: true, Subject: pname, AckPolicy: server.AckExplicit}
return &server.ObservableConfig{Durable: dname, DeliverAll: true, FilterSubject: pname, AckPolicy: server.AckExplicit}
}
o, err = mset.AddObservable(pConfig("MY_WORK_QUEUE.A"))
if err != nil {
@@ -1873,12 +1873,12 @@ func TestJetStreamDurableSubjectedObservableReconnect(t *testing.T) {
// Now create an observable for foo.AA, only requesting the last one.
o, err := mset.AddObservable(&server.ObservableConfig{
Durable: dname,
Delivery: dsubj,
Subject: "foo.AA",
DeliverLast: true,
AckPolicy: server.AckExplicit,
AckWait: 100 * time.Millisecond,
Durable: dname,
Delivery: dsubj,
FilterSubject: "foo.AA",
DeliverLast: true,
AckPolicy: server.AckExplicit,
AckWait: 100 * time.Millisecond,
})
if err != nil {
t.Fatalf("Unexpected error: %v", err)
@@ -3470,7 +3470,7 @@ func TestJetStreamDeleteMsg(t *testing.T) {
sub, _ := nc.SubscribeSync(delivery)
nc.Flush()
o, err := mset.AddObservable(&server.ObservableConfig{Delivery: delivery, DeliverAll: true, Subject: "foo"})
o, err := mset.AddObservable(&server.ObservableConfig{Delivery: delivery, DeliverAll: true, FilterSubject: "foo"})
if err != nil {
t.Fatalf("Unexpected error: %v", err)
}