mirror of
https://github.com/gogrlx/nats-server.git
synced 2026-04-17 03:24:40 -07:00
always store the filter subject
This avoids a situation where a consumer asks for ORDERS.new today on a stream of ORDERS.new but later someone makes the same stream ORDERS.* and the new consumer would then get messages for other purposes as well Signed-off-by: R.I.Pienaar <rip@devco.net>
This commit is contained in:
@@ -291,19 +291,7 @@ func (mset *stream) addConsumerWithAssignment(config *ConsumerConfig, oname stri
|
||||
|
||||
// Make sure any partition subject is also a literal.
|
||||
if config.FilterSubject != _EMPTY_ {
|
||||
var checkSubject bool
|
||||
|
||||
mset.mu.RLock()
|
||||
// If this is a direct match for the streams only subject clear the filter.
|
||||
if len(mset.cfg.Subjects) == 1 && mset.cfg.Subjects[0] == config.FilterSubject {
|
||||
config.FilterSubject = _EMPTY_
|
||||
} else {
|
||||
checkSubject = true
|
||||
}
|
||||
mset.mu.RUnlock()
|
||||
|
||||
// Make sure this is a valid partition of the interest subjects.
|
||||
if checkSubject && !mset.validSubject(config.FilterSubject) {
|
||||
if !mset.validSubject(config.FilterSubject) {
|
||||
return nil, fmt.Errorf("consumer filter subject is not a valid subset of the interest subjects")
|
||||
}
|
||||
}
|
||||
|
||||
@@ -9283,9 +9283,9 @@ func TestJetStreamAckExplicitMsgRemoval(t *testing.T) {
|
||||
}
|
||||
|
||||
// This test is in support fo clients that want to match on subject, they
|
||||
// can set the filter subject always and if the stream only has one subject
|
||||
// and they match the filter is cleared automatically. This eliminates us
|
||||
// needing to know if a subject is a subset of a stream when looking it up.
|
||||
// can set the filter subject always. We always store the subject so that
|
||||
// should the stream later be edited to expand into more subjects the consumer
|
||||
// still gets what was actually requested
|
||||
func TestJetStreamConsumerFilterSubject(t *testing.T) {
|
||||
s := RunBasicJetStreamServer()
|
||||
defer s.Shutdown()
|
||||
@@ -9310,8 +9310,8 @@ func TestJetStreamConsumerFilterSubject(t *testing.T) {
|
||||
}
|
||||
defer o.delete()
|
||||
|
||||
if o.info().Config.FilterSubject != "" {
|
||||
t.Fatalf("Expected the filter to be cleared")
|
||||
if o.info().Config.FilterSubject != "foo" {
|
||||
t.Fatalf("Expected the filter to be stored")
|
||||
}
|
||||
|
||||
// Now use the original cfg with updated delivery subject and make sure that works ok.
|
||||
|
||||
Reference in New Issue
Block a user