mirror of
https://github.com/gogrlx/nats-server.git
synced 2026-04-17 11:24:44 -07:00
[FIXED] JetStream: WorkQueue not preventing overlapping consumers
A stream with a WorkQueue retention policy is supposed to allow more than one consumer if they user filtered subjects, but those subjects should not overlap. There was an issue that if a new consumer had a filter subject "wider" than an existing one, the error was not detected and the new consumer was incorrectly accepted. Resolves #3639 Signed-off-by: Ivan Kozlovic <ivan@synadia.com>
This commit is contained in:
@@ -650,9 +650,25 @@ func (mset *stream) addConsumerWithAssignment(config *ConsumerConfig, oname stri
|
||||
mset.mu.Unlock()
|
||||
return nil, NewJSConsumerWQMultipleUnfilteredError()
|
||||
} else if !mset.partitionUnique(config.FilterSubject) {
|
||||
// We have a partition but it is not unique amongst the others.
|
||||
mset.mu.Unlock()
|
||||
return nil, NewJSConsumerWQConsumerNotUniqueError()
|
||||
// Prior to v2.9.7, on a stream with WorkQueue policy, the servers
|
||||
// were not catching the error of having multiple consumers with
|
||||
// overlapping filter subjects depending on the scope, for instance
|
||||
// creating "foo.*.bar" and then "foo.>" was not detected, while
|
||||
// "foo.>" and then "foo.*.bar" would have been. Failing here
|
||||
// in recovery mode would leave the rejected consumer in a bad state,
|
||||
// so we will simply warn here, asking the user to remove this
|
||||
// consumer administratively. Otherwise, if this is the creation
|
||||
// of a new consumer, we will return the error.
|
||||
if isRecovering {
|
||||
s.Warnf("Consumer %q > %q has a filter subject that overlaps "+
|
||||
"with other consumers, which is not allowed for a stream "+
|
||||
"with WorkQueue policy, it should be administratively deleted",
|
||||
cfg.Name, cName)
|
||||
} else {
|
||||
// We have a partition but it is not unique amongst the others.
|
||||
mset.mu.Unlock()
|
||||
return nil, NewJSConsumerWQConsumerNotUniqueError()
|
||||
}
|
||||
}
|
||||
}
|
||||
if config.DeliverPolicy != DeliverAll {
|
||||
|
||||
@@ -2358,13 +2358,13 @@ func TestJetStreamWorkQueueRetentionStream(t *testing.T) {
|
||||
{name: "MemoryStore", mconfig: &StreamConfig{
|
||||
Name: "MWQ",
|
||||
Storage: MemoryStorage,
|
||||
Subjects: []string{"MY_WORK_QUEUE.*"},
|
||||
Subjects: []string{"MY_WORK_QUEUE.>"},
|
||||
Retention: WorkQueuePolicy},
|
||||
},
|
||||
{name: "FileStore", mconfig: &StreamConfig{
|
||||
Name: "MWQ",
|
||||
Storage: FileStorage,
|
||||
Subjects: []string{"MY_WORK_QUEUE.*"},
|
||||
Subjects: []string{"MY_WORK_QUEUE.>"},
|
||||
Retention: WorkQueuePolicy},
|
||||
},
|
||||
}
|
||||
@@ -2430,7 +2430,7 @@ func TestJetStreamWorkQueueRetentionStream(t *testing.T) {
|
||||
if _, err := mset.addConsumer(pConfig("MY_WORK_QUEUE.A")); err == nil {
|
||||
t.Fatalf("Expected an error on attempt for partitioned consumer for a workqueue")
|
||||
}
|
||||
if _, err := mset.addConsumer(pConfig("MY_WORK_QUEUE.A")); err == nil {
|
||||
if _, err := mset.addConsumer(pConfig("MY_WORK_QUEUE.B")); err == nil {
|
||||
t.Fatalf("Expected an error on attempt for partitioned consumer for a workqueue")
|
||||
}
|
||||
|
||||
@@ -2443,6 +2443,22 @@ func TestJetStreamWorkQueueRetentionStream(t *testing.T) {
|
||||
o2.delete()
|
||||
o3.delete()
|
||||
|
||||
// Test with wildcards, first from wider to narrower
|
||||
o, err = mset.addConsumer(pConfig("MY_WORK_QUEUE.>"))
|
||||
require_NoError(t, err)
|
||||
if _, err := mset.addConsumer(pConfig("MY_WORK_QUEUE.*.BAR")); err == nil {
|
||||
t.Fatalf("Expected an error on attempt for partitioned consumer for a workqueue")
|
||||
}
|
||||
o.delete()
|
||||
|
||||
// Now from narrower to wider
|
||||
o, err = mset.addConsumer(pConfig("MY_WORK_QUEUE.*.BAR"))
|
||||
require_NoError(t, err)
|
||||
if _, err := mset.addConsumer(pConfig("MY_WORK_QUEUE.>")); err == nil {
|
||||
t.Fatalf("Expected an error on attempt for partitioned consumer for a workqueue")
|
||||
}
|
||||
o.delete()
|
||||
|
||||
// Push based will be allowed now, including ephemerals.
|
||||
// They can not overlap etc meaning same rules as above apply.
|
||||
o4, err := mset.addConsumer(&ConsumerConfig{
|
||||
|
||||
@@ -4675,7 +4675,8 @@ func (mset *stream) partitionUnique(partition string) bool {
|
||||
if o.cfg.FilterSubject == _EMPTY_ {
|
||||
return false
|
||||
}
|
||||
if subjectIsSubsetMatch(partition, o.cfg.FilterSubject) {
|
||||
if subjectIsSubsetMatch(partition, o.cfg.FilterSubject) ||
|
||||
subjectIsSubsetMatch(o.cfg.FilterSubject, partition) {
|
||||
return false
|
||||
}
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user