When filtering a source stream use new consumer create API subject.

Signed-off-by: Derek Collison <derek@nats.io>
This commit is contained in:
Derek Collison
2022-09-16 14:45:33 -07:00
parent 28e5aeb570
commit 8b2315eadd

View File

@@ -2481,13 +2481,21 @@ func (mset *stream) setSourceConsumer(iname string, seq uint64, startTime time.T
return
}
b, _ := json.Marshal(req)
subject := fmt.Sprintf(JSApiConsumerCreateT, si.name)
var subject string
if req.Config.FilterSubject != _EMPTY_ {
req.Config.Name = fmt.Sprintf("src-%s", createConsumerName())
subject = fmt.Sprintf(JSApiConsumerCreateExT, si.name, req.Config.Name, req.Config.FilterSubject)
} else {
subject = fmt.Sprintf(JSApiConsumerCreateT, si.name)
}
if ext != nil {
subject = strings.Replace(subject, JSApiPrefix, ext.ApiPrefix, 1)
subject = strings.ReplaceAll(subject, "..", ".")
}
// Marshal request.
b, _ := json.Marshal(req)
// We need to create the subscription that will receive the messages prior
// to sending the consumer create request, because in some complex topologies
// with gateways and optimistic mode, it is possible that the consumer starts