From c46d8093bc52311a24d6185035879efa08d06b68 Mon Sep 17 00:00:00 2001 From: Pierre Mdawar Date: Thu, 12 Oct 2023 09:39:06 +0300 Subject: [PATCH] Fix updating a non unique consumer on workqueue stream not returning an error --- server/consumer.go | 9 +++- server/jetstream_consumer_test.go | 75 +++++++++++++++++++++++++++++++ server/stream.go | 8 +++- 3 files changed, 89 insertions(+), 3 deletions(-) diff --git a/server/consumer.go b/server/consumer.go index 15fbd128..94e000a4 100644 --- a/server/consumer.go +++ b/server/consumer.go @@ -767,6 +767,13 @@ func (mset *stream) addConsumerWithAssignment(config *ConsumerConfig, oname stri if action == ActionCreate && !reflect.DeepEqual(*config, eo.config()) { return nil, NewJSConsumerAlreadyExistsError() } + // Check for overlapping subjects. + if mset.cfg.Retention == WorkQueuePolicy { + subjects := gatherSubjectFilters(config.FilterSubject, config.FilterSubjects) + if !mset.partitionUnique(cName, subjects) { + return nil, NewJSConsumerWQConsumerNotUniqueError() + } + } err := eo.updateConfig(config) if err == nil { return eo, nil @@ -805,7 +812,7 @@ func (mset *stream) addConsumerWithAssignment(config *ConsumerConfig, oname stri if len(subjects) == 0 { mset.mu.Unlock() return nil, NewJSConsumerWQMultipleUnfilteredError() - } else if !mset.partitionUnique(subjects) { + } else if !mset.partitionUnique(cName, subjects) { // Prior to v2.9.7, on a stream with WorkQueue policy, the servers // were not catching the error of having multiple consumers with // overlapping filter subjects depending on the scope, for instance diff --git a/server/jetstream_consumer_test.go b/server/jetstream_consumer_test.go index 9ed8a249..39b188fe 100644 --- a/server/jetstream_consumer_test.go +++ b/server/jetstream_consumer_test.go @@ -534,6 +534,81 @@ func TestJetStreamConsumerActions(t *testing.T) { require_Error(t, err) } +func TestJetStreamConsumerActionsOnWorkQueuePolicyStream(t *testing.T) { + s := RunBasicJetStreamServer(t) + defer s.Shutdown() + + nc, _ := jsClientConnect(t, s) + defer nc.Close() + acc := s.GlobalAccount() + + mset, err := acc.addStream(&StreamConfig{ + Name: "TEST", + Retention: WorkQueuePolicy, + Subjects: []string{"one", "two", "three", "four", "five.>"}, + }) + require_NoError(t, err) + + _, err = mset.addConsumerWithAction(&ConsumerConfig{ + Durable: "C1", + FilterSubjects: []string{"one", "two"}, + AckPolicy: AckExplicit, + }, ActionCreate) + require_NoError(t, err) + + _, err = mset.addConsumerWithAction(&ConsumerConfig{ + Durable: "C2", + FilterSubjects: []string{"three", "four"}, + AckPolicy: AckExplicit, + }, ActionCreate) + require_NoError(t, err) + + _, err = mset.addConsumerWithAction(&ConsumerConfig{ + Durable: "C3", + FilterSubjects: []string{"five.*"}, + AckPolicy: AckExplicit, + }, ActionCreate) + require_NoError(t, err) + + // Updating a consumer by removing a previous subject filter. + _, err = mset.addConsumerWithAction(&ConsumerConfig{ + Durable: "C1", + FilterSubjects: []string{"one"}, // Remove a subject. + AckPolicy: AckExplicit, + }, ActionUpdate) + require_NoError(t, err) + + // Updating a consumer without overlapping subjects. + _, err = mset.addConsumerWithAction(&ConsumerConfig{ + Durable: "C2", + FilterSubjects: []string{"three", "four", "two"}, // Add previously removed subject. + AckPolicy: AckExplicit, + }, ActionUpdate) + require_NoError(t, err) + + // Creating a consumer with overlapping subjects should return an error. + _, err = mset.addConsumerWithAction(&ConsumerConfig{ + Durable: "C4", + FilterSubjects: []string{"one", "two", "three", "four"}, + AckPolicy: AckExplicit, + }, ActionCreate) + require_Error(t, err) + if !IsNatsErr(err, JSConsumerWQConsumerNotUniqueErr) { + t.Errorf("want error %q, got %q", ApiErrors[JSConsumerWQConsumerNotUniqueErr], err) + } + + // Updating a consumer with overlapping subjects should return an error. + _, err = mset.addConsumerWithAction(&ConsumerConfig{ + Durable: "C3", + FilterSubjects: []string{"one", "two", "three", "four"}, + AckPolicy: AckExplicit, + }, ActionUpdate) + require_Error(t, err) + if !IsNatsErr(err, JSConsumerWQConsumerNotUniqueErr) { + t.Errorf("want error %q, got %q", ApiErrors[JSConsumerWQConsumerNotUniqueErr], err) + } +} + func TestJetStreamConsumerActionsViaAPI(t *testing.T) { s := RunBasicJetStreamServer(t) diff --git a/server/stream.go b/server/stream.go index 72ca5f35..096c850e 100644 --- a/server/stream.go +++ b/server/stream.go @@ -5266,9 +5266,13 @@ func (mset *stream) Store() StreamStore { // Determines if the new proposed partition is unique amongst all consumers. // Lock should be held. -func (mset *stream) partitionUnique(partitions []string) bool { +func (mset *stream) partitionUnique(name string, partitions []string) bool { for _, partition := range partitions { - for _, o := range mset.consumers { + for n, o := range mset.consumers { + // Skip the consumer being checked. + if n == name { + continue + } if o.subjf == nil { return false }