mirror of
https://github.com/gogrlx/nats-server.git
synced 2026-04-17 03:24:40 -07:00
Fix stream config update of source transforms (#4400)
- [x] Build is green in Travis CI - [X] You have certified that the contribution is your original work and that you license the work to the project under the [Apache 2 license](https://github.com/nats-io/nats-server/blob/main/LICENSE) Fixes potential out of range access during some stream source transform configuration updates and tiny clean up Fixes stream sourcing message header parsing for multi-subject transform in sources
This commit is contained in:
@@ -1730,21 +1730,21 @@ func (mset *stream) updateWithAdvisory(config *StreamConfig, sendAdvisory bool)
|
||||
|
||||
if len(s.SubjectTransforms) == 0 {
|
||||
si = &sourceInfo{name: s.Name, iname: s.iname, sf: s.FilterSubject}
|
||||
// Check for transform.
|
||||
if s.SubjectTransformDest != _EMPTY_ {
|
||||
var err error
|
||||
if si.tr, err = NewSubjectTransform(s.FilterSubject, s.SubjectTransformDest); err != nil {
|
||||
mset.mu.Unlock()
|
||||
return fmt.Errorf("stream source subject transform from '%s' to '%s': %w", s.FilterSubject, s.SubjectTransformDest, err)
|
||||
}
|
||||
// set for transform if any
|
||||
var err error
|
||||
if si.tr, err = NewSubjectTransform(s.FilterSubject, s.SubjectTransformDest); err != nil {
|
||||
mset.mu.Unlock()
|
||||
return fmt.Errorf("stream source subject transform from '%s' to '%s': %w", s.FilterSubject, s.SubjectTransformDest, err)
|
||||
}
|
||||
} else {
|
||||
si = &sourceInfo{name: s.Name, iname: s.iname}
|
||||
si.trs = make([]*subjectTransform, len(s.SubjectTransforms))
|
||||
for i := range s.SubjectTransforms {
|
||||
// err can be ignored as already validated in config check
|
||||
var err error
|
||||
si.trs[i], err = NewSubjectTransform(s.SubjectTransforms[i].Source, s.SubjectTransforms[i].Destination)
|
||||
if err != nil {
|
||||
mset.mu.Unlock()
|
||||
mset.srv.Errorf("Unable to get subject transform for source: %v", err)
|
||||
}
|
||||
}
|
||||
@@ -2442,12 +2442,11 @@ func (mset *stream) setupMirrorConsumer() error {
|
||||
// Filters
|
||||
if mset.cfg.Mirror.FilterSubject != _EMPTY_ {
|
||||
req.Config.FilterSubject = mset.cfg.Mirror.FilterSubject
|
||||
if mset.cfg.Mirror.SubjectTransformDest != _EMPTY_ {
|
||||
var err error
|
||||
mirror.tr, err = NewSubjectTransform(mset.cfg.Mirror.FilterSubject, mset.cfg.Mirror.SubjectTransformDest)
|
||||
if err != nil {
|
||||
mset.srv.Errorf("Unable to get transform for mirror consumer: %v", err)
|
||||
}
|
||||
// Set transform if any
|
||||
var err error
|
||||
mirror.tr, err = NewSubjectTransform(mset.cfg.Mirror.FilterSubject, mset.cfg.Mirror.SubjectTransformDest)
|
||||
if err != nil {
|
||||
mset.srv.Errorf("Unable to get transform for mirror consumer: %v", err)
|
||||
}
|
||||
}
|
||||
|
||||
@@ -3138,7 +3137,7 @@ func (mset *stream) processInboundSourceMsg(si *sourceInfo, m *inMsg) bool {
|
||||
// Generate a new (2.10) style source header (stream name, sequence number, source filter, source destination transform).
|
||||
func (si *sourceInfo) genSourceHeader(reply string) string {
|
||||
var b strings.Builder
|
||||
iNameParts := strings.Fields(si.iname)
|
||||
iNameParts := strings.Split(si.iname, " ")
|
||||
|
||||
b.WriteString(iNameParts[0])
|
||||
b.WriteByte(' ')
|
||||
@@ -3187,7 +3186,7 @@ func streamAndSeq(shdr string) (string, string, uint64) {
|
||||
return streamAndSeqFromAckReply(shdr)
|
||||
}
|
||||
// New version which is stream index name <SPC> sequence
|
||||
fields := strings.Fields(shdr)
|
||||
fields := strings.Split(shdr, " ")
|
||||
nFields := len(fields)
|
||||
|
||||
if nFields != 2 && nFields <= 3 {
|
||||
@@ -3258,14 +3257,12 @@ func (mset *stream) startingSequenceForSources() {
|
||||
|
||||
if len(ssi.SubjectTransforms) == 0 {
|
||||
si = &sourceInfo{name: ssi.Name, iname: ssi.iname, sf: ssi.FilterSubject}
|
||||
// Check for transform.
|
||||
if ssi.SubjectTransformDest != _EMPTY_ {
|
||||
// no need to check the error as already validated that it will not before
|
||||
var err error
|
||||
si.tr, err = NewSubjectTransform(ssi.FilterSubject, ssi.SubjectTransformDest)
|
||||
if err != nil {
|
||||
mset.srv.Errorf("Unable to get subject transform for source: %v", err)
|
||||
}
|
||||
// Set the transform if any
|
||||
// technically no need to check the error as already validated that it will not before
|
||||
var err error
|
||||
si.tr, err = NewSubjectTransform(ssi.FilterSubject, ssi.SubjectTransformDest)
|
||||
if err != nil {
|
||||
mset.srv.Errorf("Unable to get subject transform for source: %v", err)
|
||||
}
|
||||
} else {
|
||||
var trs []*subjectTransform
|
||||
|
||||
Reference in New Issue
Block a user