From 62f62d40713b5c8b2910d9ceb7c41baecaee4818 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Jean-No=C3=ABl=20Moyne?= Date: Wed, 16 Aug 2023 12:06:47 -0700 Subject: [PATCH] Adds sfs to sourceInfo MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Adds sfs to SourceInfo such that transforms with just a subject filter (and no transformation, meaning that the transform pointer in streamInfo is nil) can still be reflected in SourceInfo, which is important since the filtering is still happening, just no transformation as well. Signed-off-by: Jean-Noël Moyne --- server/stream.go | 48 ++++++++++++++++++++++++++++++++---------------- 1 file changed, 32 insertions(+), 16 deletions(-) diff --git a/server/stream.go b/server/stream.go index 9fe4b92f..f7b8f203 100644 --- a/server/stream.go +++ b/server/stream.go @@ -304,6 +304,7 @@ type sourceInfo struct { wg sync.WaitGroup sf string // subject filter tr *subjectTransform + sfs []string // subject filters trs []*subjectTransform // subject transforms } @@ -1739,8 +1740,10 @@ func (mset *stream) updateWithAdvisory(config *StreamConfig, sendAdvisory bool) } else { si = &sourceInfo{name: s.Name, iname: s.iname} si.trs = make([]*subjectTransform, len(s.SubjectTransforms)) + si.sfs = make([]string, len(s.SubjectTransforms)) for i := range s.SubjectTransforms { // err can be ignored as already validated in config check + si.sfs[i] = s.SubjectTransforms[i].Source var err error si.trs[i], err = NewSubjectTransform(s.SubjectTransforms[i].Source, s.SubjectTransforms[i].Destination) if err != nil { @@ -2012,18 +2015,23 @@ func (mset *stream) sourceInfo(si *sourceInfo) *StreamSourceInfo { return nil } - var trConfigs []SubjectTransformConfig - for _, tr := range si.trs { - if tr != nil { - trConfigs = append(trConfigs, SubjectTransformConfig{Source: tr.src, Destination: tr.dest}) - } - } - var ssi = StreamSourceInfo{Name: si.name, Lag: si.lag, Error: si.err, FilterSubject: si.sf, SubjectTransforms: trConfigs} + var ssi = StreamSourceInfo{Name: si.name, Lag: si.lag, Error: si.err, FilterSubject: si.sf} if si.tr != nil { ssi.SubjectTransformDest = si.tr.dest } + trConfigs := make([]SubjectTransformConfig, len(si.sfs)) + for i := range si.sfs { + destination := _EMPTY_ + if si.trs[i] != nil { + destination = si.trs[i].dest + } + trConfigs[i] = SubjectTransformConfig{si.sfs[i], destination} + } + + ssi.SubjectTransforms = trConfigs + // If we have not heard from the source, set Active to -1. if si.last.IsZero() { ssi.Active = -1 @@ -2442,6 +2450,7 @@ func (mset *stream) setupMirrorConsumer() error { // Filters if mset.cfg.Mirror.FilterSubject != _EMPTY_ { req.Config.FilterSubject = mset.cfg.Mirror.FilterSubject + mirror.sf = mset.cfg.Mirror.FilterSubject // Set transform if any var err error mirror.tr, err = NewSubjectTransform(mset.cfg.Mirror.FilterSubject, mset.cfg.Mirror.SubjectTransformDest) @@ -2450,17 +2459,22 @@ func (mset *stream) setupMirrorConsumer() error { } } - var filters []string - for _, tr := range mset.cfg.Mirror.SubjectTransforms { + sfs := make([]string, len(mset.cfg.Mirror.SubjectTransforms)) + trs := make([]*subjectTransform, len(mset.cfg.Mirror.SubjectTransforms)) + + for i, tr := range mset.cfg.Mirror.SubjectTransforms { // will not fail as already checked before that the transform will work subjectTransform, err := NewSubjectTransform(tr.Source, tr.Destination) if err != nil { mset.srv.Errorf("Unable to get transform for mirror consumer: %v", err) } - mirror.trs = append(mirror.trs, subjectTransform) - filters = append(filters, tr.Source) + + sfs[i] = tr.Source + trs[i] = subjectTransform } - req.Config.FilterSubjects = filters + mirror.sfs = sfs + mirror.trs = trs + req.Config.FilterSubjects = sfs respCh := make(chan *JSApiConsumerCreateResponse, 1) reply := infoReplySubject() @@ -3265,15 +3279,17 @@ func (mset *stream) startingSequenceForSources() { mset.srv.Errorf("Unable to get subject transform for source: %v", err) } } else { - var trs []*subjectTransform - for _, str := range ssi.SubjectTransforms { + sfs := make([]string, len(ssi.SubjectTransforms)) + trs := make([]*subjectTransform, len(ssi.SubjectTransforms)) + for i, str := range ssi.SubjectTransforms { tr, err := NewSubjectTransform(str.Source, str.Destination) if err != nil { mset.srv.Errorf("Unable to get subject transform for source: %v", err) } - trs = append(trs, tr) + sfs[i] = str.Source + trs[i] = tr } - si = &sourceInfo{name: ssi.Name, iname: ssi.iname, trs: trs} + si = &sourceInfo{name: ssi.Name, iname: ssi.iname, sfs: sfs, trs: trs} } mset.sources[ssi.iname] = si }