diff --git a/server/stream.go b/server/stream.go index d03892b8..e1948961 100644 --- a/server/stream.go +++ b/server/stream.go @@ -2481,13 +2481,21 @@ func (mset *stream) setSourceConsumer(iname string, seq uint64, startTime time.T return } - b, _ := json.Marshal(req) - subject := fmt.Sprintf(JSApiConsumerCreateT, si.name) + var subject string + if req.Config.FilterSubject != _EMPTY_ { + req.Config.Name = fmt.Sprintf("src-%s", createConsumerName()) + subject = fmt.Sprintf(JSApiConsumerCreateExT, si.name, req.Config.Name, req.Config.FilterSubject) + } else { + subject = fmt.Sprintf(JSApiConsumerCreateT, si.name) + } if ext != nil { subject = strings.Replace(subject, JSApiPrefix, ext.ApiPrefix, 1) subject = strings.ReplaceAll(subject, "..", ".") } + // Marshal request. + b, _ := json.Marshal(req) + // We need to create the subscription that will receive the messages prior // to sending the consumer create request, because in some complex topologies // with gateways and optimistic mode, it is possible that the consumer starts