Adds sfs to sourceInfo

Adds sfs to SourceInfo such that transforms with just a subject filter (and no transformation, meaning that the transform pointer in streamInfo is nil) can still be reflected in SourceInfo, which is important since the filtering is still happening, just no transformation as well.

Signed-off-by: Jean-Noël Moyne <jnmoyne@gmail.com>
This commit is contained in:
Jean-Noël Moyne
2023-08-16 12:06:47 -07:00
parent 3f28de8e83
commit 62f62d4071

View File

@@ -304,6 +304,7 @@ type sourceInfo struct {
wg sync.WaitGroup
sf string // subject filter
tr *subjectTransform
sfs []string // subject filters
trs []*subjectTransform // subject transforms
}
@@ -1739,8 +1740,10 @@ func (mset *stream) updateWithAdvisory(config *StreamConfig, sendAdvisory bool)
} else {
si = &sourceInfo{name: s.Name, iname: s.iname}
si.trs = make([]*subjectTransform, len(s.SubjectTransforms))
si.sfs = make([]string, len(s.SubjectTransforms))
for i := range s.SubjectTransforms {
// err can be ignored as already validated in config check
si.sfs[i] = s.SubjectTransforms[i].Source
var err error
si.trs[i], err = NewSubjectTransform(s.SubjectTransforms[i].Source, s.SubjectTransforms[i].Destination)
if err != nil {
@@ -2012,18 +2015,23 @@ func (mset *stream) sourceInfo(si *sourceInfo) *StreamSourceInfo {
return nil
}
var trConfigs []SubjectTransformConfig
for _, tr := range si.trs {
if tr != nil {
trConfigs = append(trConfigs, SubjectTransformConfig{Source: tr.src, Destination: tr.dest})
}
}
var ssi = StreamSourceInfo{Name: si.name, Lag: si.lag, Error: si.err, FilterSubject: si.sf, SubjectTransforms: trConfigs}
var ssi = StreamSourceInfo{Name: si.name, Lag: si.lag, Error: si.err, FilterSubject: si.sf}
if si.tr != nil {
ssi.SubjectTransformDest = si.tr.dest
}
trConfigs := make([]SubjectTransformConfig, len(si.sfs))
for i := range si.sfs {
destination := _EMPTY_
if si.trs[i] != nil {
destination = si.trs[i].dest
}
trConfigs[i] = SubjectTransformConfig{si.sfs[i], destination}
}
ssi.SubjectTransforms = trConfigs
// If we have not heard from the source, set Active to -1.
if si.last.IsZero() {
ssi.Active = -1
@@ -2442,6 +2450,7 @@ func (mset *stream) setupMirrorConsumer() error {
// Filters
if mset.cfg.Mirror.FilterSubject != _EMPTY_ {
req.Config.FilterSubject = mset.cfg.Mirror.FilterSubject
mirror.sf = mset.cfg.Mirror.FilterSubject
// Set transform if any
var err error
mirror.tr, err = NewSubjectTransform(mset.cfg.Mirror.FilterSubject, mset.cfg.Mirror.SubjectTransformDest)
@@ -2450,17 +2459,22 @@ func (mset *stream) setupMirrorConsumer() error {
}
}
var filters []string
for _, tr := range mset.cfg.Mirror.SubjectTransforms {
sfs := make([]string, len(mset.cfg.Mirror.SubjectTransforms))
trs := make([]*subjectTransform, len(mset.cfg.Mirror.SubjectTransforms))
for i, tr := range mset.cfg.Mirror.SubjectTransforms {
// will not fail as already checked before that the transform will work
subjectTransform, err := NewSubjectTransform(tr.Source, tr.Destination)
if err != nil {
mset.srv.Errorf("Unable to get transform for mirror consumer: %v", err)
}
mirror.trs = append(mirror.trs, subjectTransform)
filters = append(filters, tr.Source)
sfs[i] = tr.Source
trs[i] = subjectTransform
}
req.Config.FilterSubjects = filters
mirror.sfs = sfs
mirror.trs = trs
req.Config.FilterSubjects = sfs
respCh := make(chan *JSApiConsumerCreateResponse, 1)
reply := infoReplySubject()
@@ -3265,15 +3279,17 @@ func (mset *stream) startingSequenceForSources() {
mset.srv.Errorf("Unable to get subject transform for source: %v", err)
}
} else {
var trs []*subjectTransform
for _, str := range ssi.SubjectTransforms {
sfs := make([]string, len(ssi.SubjectTransforms))
trs := make([]*subjectTransform, len(ssi.SubjectTransforms))
for i, str := range ssi.SubjectTransforms {
tr, err := NewSubjectTransform(str.Source, str.Destination)
if err != nil {
mset.srv.Errorf("Unable to get subject transform for source: %v", err)
}
trs = append(trs, tr)
sfs[i] = str.Source
trs[i] = tr
}
si = &sourceInfo{name: ssi.Name, iname: ssi.iname, trs: trs}
si = &sourceInfo{name: ssi.Name, iname: ssi.iname, sfs: sfs, trs: trs}
}
mset.sources[ssi.iname] = si
}