diff --git a/server/stream.go b/server/stream.go index 9fe4b92f..f7b8f203 100644 --- a/server/stream.go +++ b/server/stream.go @@ -304,6 +304,7 @@ type sourceInfo struct { wg sync.WaitGroup sf string // subject filter tr *subjectTransform + sfs []string // subject filters trs []*subjectTransform // subject transforms } @@ -1739,8 +1740,10 @@ func (mset *stream) updateWithAdvisory(config *StreamConfig, sendAdvisory bool) } else { si = &sourceInfo{name: s.Name, iname: s.iname} si.trs = make([]*subjectTransform, len(s.SubjectTransforms)) + si.sfs = make([]string, len(s.SubjectTransforms)) for i := range s.SubjectTransforms { // err can be ignored as already validated in config check + si.sfs[i] = s.SubjectTransforms[i].Source var err error si.trs[i], err = NewSubjectTransform(s.SubjectTransforms[i].Source, s.SubjectTransforms[i].Destination) if err != nil { @@ -2012,18 +2015,23 @@ func (mset *stream) sourceInfo(si *sourceInfo) *StreamSourceInfo { return nil } - var trConfigs []SubjectTransformConfig - for _, tr := range si.trs { - if tr != nil { - trConfigs = append(trConfigs, SubjectTransformConfig{Source: tr.src, Destination: tr.dest}) - } - } - var ssi = StreamSourceInfo{Name: si.name, Lag: si.lag, Error: si.err, FilterSubject: si.sf, SubjectTransforms: trConfigs} + var ssi = StreamSourceInfo{Name: si.name, Lag: si.lag, Error: si.err, FilterSubject: si.sf} if si.tr != nil { ssi.SubjectTransformDest = si.tr.dest } + trConfigs := make([]SubjectTransformConfig, len(si.sfs)) + for i := range si.sfs { + destination := _EMPTY_ + if si.trs[i] != nil { + destination = si.trs[i].dest + } + trConfigs[i] = SubjectTransformConfig{si.sfs[i], destination} + } + + ssi.SubjectTransforms = trConfigs + // If we have not heard from the source, set Active to -1. if si.last.IsZero() { ssi.Active = -1 @@ -2442,6 +2450,7 @@ func (mset *stream) setupMirrorConsumer() error { // Filters if mset.cfg.Mirror.FilterSubject != _EMPTY_ { req.Config.FilterSubject = mset.cfg.Mirror.FilterSubject + mirror.sf = mset.cfg.Mirror.FilterSubject // Set transform if any var err error mirror.tr, err = NewSubjectTransform(mset.cfg.Mirror.FilterSubject, mset.cfg.Mirror.SubjectTransformDest) @@ -2450,17 +2459,22 @@ func (mset *stream) setupMirrorConsumer() error { } } - var filters []string - for _, tr := range mset.cfg.Mirror.SubjectTransforms { + sfs := make([]string, len(mset.cfg.Mirror.SubjectTransforms)) + trs := make([]*subjectTransform, len(mset.cfg.Mirror.SubjectTransforms)) + + for i, tr := range mset.cfg.Mirror.SubjectTransforms { // will not fail as already checked before that the transform will work subjectTransform, err := NewSubjectTransform(tr.Source, tr.Destination) if err != nil { mset.srv.Errorf("Unable to get transform for mirror consumer: %v", err) } - mirror.trs = append(mirror.trs, subjectTransform) - filters = append(filters, tr.Source) + + sfs[i] = tr.Source + trs[i] = subjectTransform } - req.Config.FilterSubjects = filters + mirror.sfs = sfs + mirror.trs = trs + req.Config.FilterSubjects = sfs respCh := make(chan *JSApiConsumerCreateResponse, 1) reply := infoReplySubject() @@ -3265,15 +3279,17 @@ func (mset *stream) startingSequenceForSources() { mset.srv.Errorf("Unable to get subject transform for source: %v", err) } } else { - var trs []*subjectTransform - for _, str := range ssi.SubjectTransforms { + sfs := make([]string, len(ssi.SubjectTransforms)) + trs := make([]*subjectTransform, len(ssi.SubjectTransforms)) + for i, str := range ssi.SubjectTransforms { tr, err := NewSubjectTransform(str.Source, str.Destination) if err != nil { mset.srv.Errorf("Unable to get subject transform for source: %v", err) } - trs = append(trs, tr) + sfs[i] = str.Source + trs[i] = tr } - si = &sourceInfo{name: ssi.Name, iname: ssi.iname, trs: trs} + si = &sourceInfo{name: ssi.Name, iname: ssi.iname, sfs: sfs, trs: trs} } mset.sources[ssi.iname] = si }