Removes the single subject transform dest field from StreamSource

Co-authored-by: Jean-Noël Moyne <jnmoyne@gmail.com>
Co-authored-by: Neil Twigg <neil@nats.io>

Signed-off-by: Neil Twigg <neil@nats.io>
This commit is contained in:
Jean-Noël Moyne
2023-09-18 16:45:14 -07:00
committed by Neil Twigg
parent 81c0a14193
commit 9fc2603263

View File

@@ -180,25 +180,23 @@ type PeerInfo struct {
// StreamSourceInfo shows information about an upstream stream source.
type StreamSourceInfo struct {
Name string `json:"name"`
External *ExternalStream `json:"external,omitempty"`
Lag uint64 `json:"lag"`
Active time.Duration `json:"active"`
Error *ApiError `json:"error,omitempty"`
FilterSubject string `json:"filter_subject,omitempty"`
SubjectTransformDest string `json:"-"`
SubjectTransforms []SubjectTransformConfig `json:"subject_transforms,omitempty"`
Name string `json:"name"`
External *ExternalStream `json:"external,omitempty"`
Lag uint64 `json:"lag"`
Active time.Duration `json:"active"`
Error *ApiError `json:"error,omitempty"`
FilterSubject string `json:"filter_subject,omitempty"`
SubjectTransforms []SubjectTransformConfig `json:"subject_transforms,omitempty"`
}
// StreamSource dictates how streams can source from other streams.
type StreamSource struct {
Name string `json:"name"`
OptStartSeq uint64 `json:"opt_start_seq,omitempty"`
OptStartTime *time.Time `json:"opt_start_time,omitempty"`
FilterSubject string `json:"filter_subject,omitempty"`
SubjectTransformDest string `json:"-"`
SubjectTransforms []SubjectTransformConfig `json:"subject_transforms,omitempty"`
External *ExternalStream `json:"external,omitempty"`
Name string `json:"name"`
OptStartSeq uint64 `json:"opt_start_seq,omitempty"`
OptStartTime *time.Time `json:"opt_start_time,omitempty"`
FilterSubject string `json:"filter_subject,omitempty"`
SubjectTransforms []SubjectTransformConfig `json:"subject_transforms,omitempty"`
External *ExternalStream `json:"external,omitempty"`
// Internal
iname string // For indexing when stream names are the same for multiple sources.
@@ -313,8 +311,7 @@ type sourceInfo struct {
qch chan struct{}
sip bool // setup in progress
wg sync.WaitGroup
sf string // subject filter
tr *subjectTransform
sf string // subject filter
sfs []string // subject filters
trs []*subjectTransform // subject transforms
}
@@ -477,12 +474,6 @@ func (a *Account) addStreamWithAssignment(config *StreamConfig, fsConfig *FileSt
jsa.mu.Unlock()
return nil, fmt.Errorf("subject filter '%s' for the mirror %w", cfg.Mirror.FilterSubject, ErrBadSubject)
}
if cfg.Mirror.SubjectTransformDest != _EMPTY_ {
if _, err = NewSubjectTransform(cfg.Mirror.FilterSubject, cfg.Mirror.SubjectTransformDest); err != nil {
jsa.mu.Unlock()
return nil, fmt.Errorf("subject transform from '%s' to '%s' for the mirror %w", cfg.Mirror.FilterSubject, cfg.Mirror.SubjectTransformDest, err)
}
}
} else {
for _, st := range cfg.Mirror.SubjectTransforms {
if st.Source != _EMPTY_ && !IsValidSubject(st.Source) {
@@ -508,13 +499,6 @@ func (a *Account) addStreamWithAssignment(config *StreamConfig, fsConfig *FileSt
jsa.mu.Unlock()
return nil, fmt.Errorf("subject filter '%s' for the source: %w", ssi.FilterSubject, ErrBadSubject)
}
// check the transform, if any, is valid
if ssi.SubjectTransformDest != _EMPTY_ {
if _, err = NewSubjectTransform(ssi.FilterSubject, ssi.SubjectTransformDest); err != nil {
jsa.mu.Unlock()
return nil, fmt.Errorf("subject transform from '%s' to '%s' for the source: %w", ssi.FilterSubject, ssi.SubjectTransformDest, err)
}
}
} else {
for _, st := range ssi.SubjectTransforms {
if st.Source != _EMPTY_ && !IsValidSubject(st.Source) {
@@ -703,7 +687,7 @@ func (ssi *StreamSource) composeIName() string {
}
source := ssi.FilterSubject
destination := ssi.SubjectTransformDest
destination := fwcs
if len(ssi.SubjectTransforms) == 0 {
// normalize filter and destination in case they are empty
@@ -1271,7 +1255,7 @@ func (s *Server) checkStreamCfg(config *StreamConfig, acc *Account) (StreamConfi
if len(cfg.Sources) > 0 {
return StreamConfig{}, NewJSMirrorWithSourcesError()
}
if (cfg.Mirror.FilterSubject != _EMPTY_ || cfg.Mirror.SubjectTransformDest != _EMPTY_) && len(cfg.Mirror.SubjectTransforms) != 0 {
if cfg.Mirror.FilterSubject != _EMPTY_ && len(cfg.Mirror.SubjectTransforms) != 0 {
return StreamConfig{}, NewJSMirrorMultipleFiltersNotAllowedError()
}
// Check subject filters overlap.
@@ -1351,7 +1335,7 @@ func (s *Server) checkStreamCfg(config *StreamConfig, acc *Account) (StreamConfi
}
}
if (src.FilterSubject != _EMPTY_ || src.SubjectTransformDest != _EMPTY_) && len(src.SubjectTransforms) != 0 {
if src.FilterSubject != _EMPTY_ && len(src.SubjectTransforms) != 0 {
return StreamConfig{}, NewJSSourceMultipleFiltersNotAllowedError()
}
@@ -1787,12 +1771,6 @@ func (mset *stream) updateWithAdvisory(config *StreamConfig, sendAdvisory bool)
if len(s.SubjectTransforms) == 0 {
si = &sourceInfo{name: s.Name, iname: s.iname, sf: s.FilterSubject}
// set for transform if any
var err error
if si.tr, err = NewSubjectTransform(s.FilterSubject, s.SubjectTransformDest); err != nil {
mset.mu.Unlock()
return fmt.Errorf("stream source subject transform from '%s' to '%s': %w", s.FilterSubject, s.SubjectTransformDest, err)
}
} else {
si = &sourceInfo{name: s.Name, iname: s.iname}
si.trs = make([]*subjectTransform, len(s.SubjectTransforms))
@@ -2073,10 +2051,6 @@ func (mset *stream) sourceInfo(si *sourceInfo) *StreamSourceInfo {
var ssi = StreamSourceInfo{Name: si.name, Lag: si.lag, Error: si.err, FilterSubject: si.sf}
if si.tr != nil {
ssi.SubjectTransformDest = si.tr.dest
}
trConfigs := make([]SubjectTransformConfig, len(si.sfs))
for i := range si.sfs {
destination := _EMPTY_
@@ -2277,18 +2251,15 @@ func (mset *stream) processInboundMirrorMsg(m *inMsg) bool {
}
// Do the subject transform if there's one
if mset.mirror.tr != nil {
m.subj = mset.mirror.tr.TransformSubject(m.subj)
} else {
for _, tr := range mset.mirror.trs {
if tr == nil {
continue
} else {
tsubj, err := tr.Match(m.subj)
if err == nil {
m.subj = tsubj
break
}
for _, tr := range mset.mirror.trs {
if tr == nil {
continue
} else {
tsubj, err := tr.Match(m.subj)
if err == nil {
m.subj = tsubj
break
}
}
}
@@ -2506,12 +2477,6 @@ func (mset *stream) setupMirrorConsumer() error {
if mset.cfg.Mirror.FilterSubject != _EMPTY_ {
req.Config.FilterSubject = mset.cfg.Mirror.FilterSubject
mirror.sf = mset.cfg.Mirror.FilterSubject
// Set transform if any
var err error
mirror.tr, err = NewSubjectTransform(mset.cfg.Mirror.FilterSubject, mset.cfg.Mirror.SubjectTransformDest)
if err != nil {
mset.srv.Errorf("Unable to get transform for mirror consumer: %v", err)
}
}
sfs := make([]string, len(mset.cfg.Mirror.SubjectTransforms))
@@ -3173,18 +3138,15 @@ func (mset *stream) processInboundSourceMsg(si *sourceInfo, m *inMsg) bool {
hdr = genHeader(hdr, JSStreamSource, si.genSourceHeader(m.rply))
// Do the subject transform for the source if there's one
if si.tr != nil {
m.subj = si.tr.TransformSubject(m.subj)
} else {
for _, tr := range si.trs {
if tr == nil {
continue
} else {
tsubj, err := tr.Match(m.subj)
if err == nil {
m.subj = tsubj
break
}
for _, tr := range si.trs {
if tr == nil {
continue
} else {
tsubj, err := tr.Match(m.subj)
if err == nil {
m.subj = tsubj
break
}
}
}
@@ -3345,13 +3307,6 @@ func (mset *stream) startingSequenceForSources() {
if len(ssi.SubjectTransforms) == 0 {
si = &sourceInfo{name: ssi.Name, iname: ssi.iname, sf: ssi.FilterSubject}
// Set the transform if any
// technically no need to check the error as already validated that it will not before
var err error
si.tr, err = NewSubjectTransform(ssi.FilterSubject, ssi.SubjectTransformDest)
if err != nil {
mset.srv.Errorf("Unable to get subject transform for source: %v", err)
}
} else {
sfs := make([]string, len(ssi.SubjectTransforms))
trs := make([]*subjectTransform, len(ssi.SubjectTransforms))