From 8b2315eadd0a5a85d6ad3f773c129f9440e98252 Mon Sep 17 00:00:00 2001 From: Derek Collison Date: Fri, 16 Sep 2022 14:45:33 -0700 Subject: [PATCH] When filtering a source stream use new consumer create API subject. Signed-off-by: Derek Collison --- server/stream.go | 12 ++++++++++-- 1 file changed, 10 insertions(+), 2 deletions(-) diff --git a/server/stream.go b/server/stream.go index d03892b8..e1948961 100644 --- a/server/stream.go +++ b/server/stream.go @@ -2481,13 +2481,21 @@ func (mset *stream) setSourceConsumer(iname string, seq uint64, startTime time.T return } - b, _ := json.Marshal(req) - subject := fmt.Sprintf(JSApiConsumerCreateT, si.name) + var subject string + if req.Config.FilterSubject != _EMPTY_ { + req.Config.Name = fmt.Sprintf("src-%s", createConsumerName()) + subject = fmt.Sprintf(JSApiConsumerCreateExT, si.name, req.Config.Name, req.Config.FilterSubject) + } else { + subject = fmt.Sprintf(JSApiConsumerCreateT, si.name) + } if ext != nil { subject = strings.Replace(subject, JSApiPrefix, ext.ApiPrefix, 1) subject = strings.ReplaceAll(subject, "..", ".") } + // Marshal request. + b, _ := json.Marshal(req) + // We need to create the subscription that will receive the messages prior // to sending the consumer create request, because in some complex topologies // with gateways and optimistic mode, it is possible that the consumer starts