[ADDED] Support for multi-filter in stream sources (#4276)

- [X] Tests added
- [X] Branch rebased on top of current main (`git pull --rebase origin
main`)
- [X] Changes squashed to a single commit (described
[here](http://gitready.com/advanced/2009/02/10/squashing-commits-with-rebase.html))
 - [X] Build is green in Travis CI
- [X] You have certified that the contribution is your original work and
that you license the work to the project under the [Apache 2
license](https://github.com/nats-io/nats-server/blob/main/LICENSE)

### Changes proposed in this pull request:

Adds support for multi-filter (and associated transform destinations) to
stream sources

---------

Signed-off-by: Jean-Noël Moyne <jnmoyne@gmail.com>
This commit is contained in:
Jean-Noël Moyne
2023-08-01 10:50:11 -07:00
committed by GitHub
parent e51a42963a
commit 449b27535e
8 changed files with 359 additions and 91 deletions

View File

@@ -1418,5 +1418,45 @@
"help": "",
"url": "",
"deprecates": ""
},
{
"constant": "JSSourceMultipleFiltersNotAllowed",
"code": 400,
"error_code": 10143,
"description": "source with multiple subject filters cannot also have a single subject filter",
"comment": "",
"help": "",
"url": "",
"deprecates": ""
},
{
"constant": "JSSourceInvalidSubjectFilter",
"code": 400,
"error_code": 10144,
"description": "source subject filter is invalid",
"comment": "",
"help": "",
"url": "",
"deprecates": ""
},
{
"constant": "JSSourceInvalidTransformDestination",
"code": 400,
"error_code": 10145,
"description": "source transform destination is invalid",
"comment": "",
"help": "",
"url": "",
"deprecates": ""
},
{
"constant": "JSSourceOverlappingSubjectFilters",
"code": 400,
"error_code": 10146,
"description": "source filters can not overlap",
"comment": "",
"help": "",
"url": "",
"deprecates": ""
}
]

View File

@@ -1377,6 +1377,8 @@ func (s *Server) jsStreamCreateRequest(sub *subscription, c *client, _ *Account,
State: mset.state(),
Config: mset.config(),
TimeStamp: time.Now().UTC(),
Mirror: mset.mirrorInfo(),
Sources: mset.sourcesInfo(),
}
resp.DidCreate = true
s.sendAPIResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(resp))

View File

@@ -290,9 +290,21 @@ const (
// JSSourceInvalidStreamName sourced stream name is invalid
JSSourceInvalidStreamName ErrorIdentifier = 10141
// JSSourceInvalidSubjectFilter source subject filter is invalid
JSSourceInvalidSubjectFilter ErrorIdentifier = 10144
// JSSourceInvalidTransformDestination source transform destination is invalid
JSSourceInvalidTransformDestination ErrorIdentifier = 10145
// JSSourceMaxMessageSizeTooBigErr stream source must have max message size >= target
JSSourceMaxMessageSizeTooBigErr ErrorIdentifier = 10046
// JSSourceMultipleFiltersNotAllowed source with multiple subject filters cannot also have a single subject filter
JSSourceMultipleFiltersNotAllowed ErrorIdentifier = 10143
// JSSourceOverlappingSubjectFilters source filters can not overlap
JSSourceOverlappingSubjectFilters ErrorIdentifier = 10146
// JSStorageResourcesExceededErr insufficient storage resources available
JSStorageResourcesExceededErr ErrorIdentifier = 10047
@@ -529,7 +541,11 @@ var (
JSSourceConsumerSetupFailedErrF: {Code: 500, ErrCode: 10045, Description: "{err}"},
JSSourceDuplicateDetected: {Code: 400, ErrCode: 10140, Description: "duplicate source configuration detected"},
JSSourceInvalidStreamName: {Code: 400, ErrCode: 10141, Description: "sourced stream name is invalid"},
JSSourceInvalidSubjectFilter: {Code: 400, ErrCode: 10144, Description: "source subject filter is invalid"},
JSSourceInvalidTransformDestination: {Code: 400, ErrCode: 10145, Description: "source transform destination is invalid"},
JSSourceMaxMessageSizeTooBigErr: {Code: 400, ErrCode: 10046, Description: "stream source must have max message size >= target"},
JSSourceMultipleFiltersNotAllowed: {Code: 400, ErrCode: 10143, Description: "source with multiple subject filters cannot also have a single subject filter"},
JSSourceOverlappingSubjectFilters: {Code: 400, ErrCode: 10146, Description: "source filters can not overlap"},
JSStorageResourcesExceededErr: {Code: 500, ErrCode: 10047, Description: "insufficient storage resources available"},
JSStreamAssignmentErrF: {Code: 500, ErrCode: 10048, Description: "{err}"},
JSStreamCreateErrF: {Code: 500, ErrCode: 10049, Description: "{err}"},
@@ -1641,6 +1657,26 @@ func NewJSSourceInvalidStreamNameError(opts ...ErrorOption) *ApiError {
return ApiErrors[JSSourceInvalidStreamName]
}
// NewJSSourceInvalidSubjectFilterError creates a new JSSourceInvalidSubjectFilter error: "source subject filter is invalid"
func NewJSSourceInvalidSubjectFilterError(opts ...ErrorOption) *ApiError {
eopts := parseOpts(opts)
if ae, ok := eopts.err.(*ApiError); ok {
return ae
}
return ApiErrors[JSSourceInvalidSubjectFilter]
}
// NewJSSourceInvalidTransformDestinationError creates a new JSSourceInvalidTransformDestination error: "source transform destination is invalid"
func NewJSSourceInvalidTransformDestinationError(opts ...ErrorOption) *ApiError {
eopts := parseOpts(opts)
if ae, ok := eopts.err.(*ApiError); ok {
return ae
}
return ApiErrors[JSSourceInvalidTransformDestination]
}
// NewJSSourceMaxMessageSizeTooBigError creates a new JSSourceMaxMessageSizeTooBigErr error: "stream source must have max message size >= target"
func NewJSSourceMaxMessageSizeTooBigError(opts ...ErrorOption) *ApiError {
eopts := parseOpts(opts)
@@ -1651,6 +1687,26 @@ func NewJSSourceMaxMessageSizeTooBigError(opts ...ErrorOption) *ApiError {
return ApiErrors[JSSourceMaxMessageSizeTooBigErr]
}
// NewJSSourceMultipleFiltersNotAllowedError creates a new JSSourceMultipleFiltersNotAllowed error: "source with multiple subject filters cannot also have a single subject filter"
func NewJSSourceMultipleFiltersNotAllowedError(opts ...ErrorOption) *ApiError {
eopts := parseOpts(opts)
if ae, ok := eopts.err.(*ApiError); ok {
return ae
}
return ApiErrors[JSSourceMultipleFiltersNotAllowed]
}
// NewJSSourceOverlappingSubjectFiltersError creates a new JSSourceOverlappingSubjectFilters error: "source filters can not overlap"
func NewJSSourceOverlappingSubjectFiltersError(opts ...ErrorOption) *ApiError {
eopts := parseOpts(opts)
if ae, ok := eopts.err.(*ApiError); ok {
return ae
}
return ApiErrors[JSSourceOverlappingSubjectFilters]
}
// NewJSStorageResourcesExceededError creates a new JSStorageResourcesExceededErr error: "insufficient storage resources available"
func NewJSStorageResourcesExceededError(opts ...ErrorOption) *ApiError {
eopts := parseOpts(opts)

View File

@@ -11663,12 +11663,13 @@ func TestJetStreamSourceBasics(t *testing.T) {
}
// Test optional start times, filtered subjects etc.
if _, err := js.AddStream(&nats.StreamConfig{Name: "TEST", Subjects: []string{"dlc", "rip"}}); err != nil {
if _, err := js.AddStream(&nats.StreamConfig{Name: "TEST", Subjects: []string{"dlc", "rip", "jnm"}}); err != nil {
t.Fatalf("Unexpected error: %v", err)
}
sendBatch("dlc", 20)
sendBatch("rip", 20)
sendBatch("dlc", 10)
sendBatch("jnm", 10)
cfg = &StreamConfig{
Name: "FMS",
@@ -11681,8 +11682,8 @@ func TestJetStreamSourceBasics(t *testing.T) {
checkFor(t, 2*time.Second, 100*time.Millisecond, func() error {
si, err := js2.StreamInfo("FMS")
require_NoError(t, err)
if si.State.Msgs != 25 {
return fmt.Errorf("Expected 25 msgs, got state: %+v", si.State)
if si.State.Msgs != 35 {
return fmt.Errorf("Expected 35 msgs, got state: %+v", si.State)
}
return nil
})
@@ -11700,7 +11701,7 @@ func TestJetStreamSourceBasics(t *testing.T) {
Name: "FMS2",
Storage: FileStorage,
Sources: []*StreamSource{
{Name: "TEST", OptStartSeq: 11, FilterSubject: "dlc"},
{Name: "TEST", OptStartSeq: 11, FilterSubject: "dlc", SubjectTransformDest: "dlc2"},
},
}
createStream(cfg)
@@ -11722,6 +11723,56 @@ func TestJetStreamSourceBasics(t *testing.T) {
} else if _, _, sseq := streamAndSeq(shdr); sseq != 11 {
t.Fatalf("Expected header sequence of 11, got %d", sseq)
}
if m.Subject != "dlc2" {
t.Fatalf("Expected transformed subject dlc2, but got %s instead", m.Subject)
}
// Test Filters
cfg = &StreamConfig{
Name: "FMS3",
Storage: FileStorage,
Sources: []*StreamSource{
{Name: "TEST", SubjectTransforms: []SubjectTransformConfig{{Source: "dlc", Destination: "dlc2"}, {Source: "rip", Destination: ""}}},
},
}
createStream(cfg)
checkFor(t, 2*time.Second, 100*time.Millisecond, func() error {
si, err := js2.StreamInfo("FMS3")
require_NoError(t, err)
if si.State.Msgs != 50 {
return fmt.Errorf("Expected 50 msgs, got state: %+v", si.State)
}
return nil
})
// Double check first message
if m, err = js.GetMsg("FMS3", 1); err != nil {
t.Fatalf("Unexpected error: %v", err)
}
if shdr := m.Header.Get(JSStreamSource); shdr == _EMPTY_ {
t.Fatalf("Expected a header, got none")
} else if m.Subject != "dlc2" {
t.Fatalf("Expected subject 'dlc2' and got %s", m.Subject)
}
// Double check first message with the other subject
if m, err = js.GetMsg("FMS3", 21); err != nil {
t.Fatalf("Unexpected error: %v", err)
}
if shdr := m.Header.Get(JSStreamSource); shdr == _EMPTY_ {
t.Fatalf("Expected a header, got none")
} else if m.Subject != "rip" {
t.Fatalf("Expected subject 'rip' and got %s", m.Subject)
}
checkFor(t, 2*time.Second, 100*time.Millisecond, func() error {
si, err := js2.StreamInfo("FMS3")
require_NoError(t, err)
if si.State.Subjects["jnm"] != 0 {
return fmt.Errorf("Unexpected messages from the source found")
}
return nil
})
}
func TestJetStreamInputTransform(t *testing.T) {

View File

@@ -92,7 +92,7 @@ type StreamConfig struct {
// SubjectTransformConfig is for applying a subject transform (to matching messages) before doing anything else when a new message is received
type SubjectTransformConfig struct {
Source string `json:"src,omitempty"`
Source string `json:"src"`
Destination string `json:"dest"`
}
@@ -170,23 +170,25 @@ type PeerInfo struct {
// StreamSourceInfo shows information about an upstream stream source.
type StreamSourceInfo struct {
Name string `json:"name"`
External *ExternalStream `json:"external,omitempty"`
Lag uint64 `json:"lag"`
Active time.Duration `json:"active"`
Error *ApiError `json:"error,omitempty"`
FilterSubject string `json:"filter_subject,omitempty"`
SubjectTransformDest string `json:"subject_transform_dest,omitempty"`
Name string `json:"name"`
External *ExternalStream `json:"external,omitempty"`
Lag uint64 `json:"lag"`
Active time.Duration `json:"active"`
Error *ApiError `json:"error,omitempty"`
FilterSubject string `json:"filter_subject,omitempty"`
SubjectTransformDest string `json:"subject_transform_dest,omitempty"`
SubjectTransforms []SubjectTransformConfig `json:"subject_transforms,omitempty"`
}
// StreamSource dictates how streams can source from other streams.
type StreamSource struct {
Name string `json:"name"`
OptStartSeq uint64 `json:"opt_start_seq,omitempty"`
OptStartTime *time.Time `json:"opt_start_time,omitempty"`
FilterSubject string `json:"filter_subject,omitempty"`
SubjectTransformDest string `json:"subject_transform_dest,omitempty"`
External *ExternalStream `json:"external,omitempty"`
Name string `json:"name"`
OptStartSeq uint64 `json:"opt_start_seq,omitempty"`
OptStartTime *time.Time `json:"opt_start_time,omitempty"`
FilterSubject string `json:"filter_subject,omitempty"`
SubjectTransformDest string `json:"subject_transform_dest,omitempty"`
SubjectTransforms []SubjectTransformConfig `json:"subject_transforms,omitempty"`
External *ExternalStream `json:"external,omitempty"`
// Internal
iname string // For indexing when stream names are the same for multiple sources.
@@ -302,6 +304,7 @@ type sourceInfo struct {
wg sync.WaitGroup
sf string // subject filter
tr *subjectTransform
trs []*subjectTransform // subject transforms
}
// For mirrors and direct get
@@ -456,18 +459,33 @@ func (a *Account) addStreamWithAssignment(config *StreamConfig, fsConfig *FileSt
}
// Setup our internal indexed names here for sources and check if the transform (if any) is valid.
for _, ssi := range cfg.Sources {
// check the filter, if any, is valid
if ssi.FilterSubject != _EMPTY_ && !IsValidSubject(ssi.FilterSubject) {
jsa.mu.Unlock()
return nil, fmt.Errorf("subject filter '%s' for the source %w", ssi.FilterSubject, ErrBadSubject)
}
// check the transform, if any, is valid
if ssi.SubjectTransformDest != _EMPTY_ {
if _, err = NewSubjectTransform(ssi.FilterSubject, ssi.SubjectTransformDest); err != nil {
if len(ssi.SubjectTransforms) == 0 {
// check the filter, if any, is valid
if ssi.FilterSubject != _EMPTY_ && !IsValidSubject(ssi.FilterSubject) {
jsa.mu.Unlock()
return nil, fmt.Errorf("subject transform from '%s' to '%s' for the source %w", ssi.FilterSubject, ssi.SubjectTransformDest, err)
return nil, fmt.Errorf("subject filter '%s' for the source: %w", ssi.FilterSubject, ErrBadSubject)
}
// check the transform, if any, is valid
if ssi.SubjectTransformDest != _EMPTY_ {
if _, err = NewSubjectTransform(ssi.FilterSubject, ssi.SubjectTransformDest); err != nil {
jsa.mu.Unlock()
return nil, fmt.Errorf("subject transform from '%s' to '%s' for the source: %w", ssi.FilterSubject, ssi.SubjectTransformDest, err)
}
}
} else {
for _, st := range ssi.SubjectTransforms {
if st.Source != _EMPTY_ && !IsValidSubject(st.Source) {
jsa.mu.Unlock()
return nil, fmt.Errorf("subject filter '%s' for the source: %w", st.Source, ErrBadSubject)
}
// check the transform, if any, is valid
if st.Destination != _EMPTY_ {
if _, err = NewSubjectTransform(st.Source, st.Destination); err != nil {
jsa.mu.Unlock()
return nil, fmt.Errorf("subject transform from '%s' to '%s' for the source: %w", st.Source, st.Destination, err)
}
}
}
}
}
@@ -521,7 +539,7 @@ func (a *Account) addStreamWithAssignment(config *StreamConfig, fsConfig *FileSt
tr, err := NewSubjectTransform(cfg.SubjectTransform.Source, cfg.SubjectTransform.Destination)
if err != nil {
jsa.mu.Unlock()
return nil, fmt.Errorf("stream subject transform from '%s' to '%s' %w", cfg.SubjectTransform.Source, cfg.SubjectTransform.Destination, err)
return nil, fmt.Errorf("stream subject transform from '%s' to '%s': %w", cfg.SubjectTransform.Source, cfg.SubjectTransform.Destination, err)
}
mset.itr = tr
}
@@ -531,7 +549,7 @@ func (a *Account) addStreamWithAssignment(config *StreamConfig, fsConfig *FileSt
tr, err := NewSubjectTransform(cfg.RePublish.Source, cfg.RePublish.Destination)
if err != nil {
jsa.mu.Unlock()
return nil, fmt.Errorf("stream republish transform from '%s' to '%s' %w", cfg.RePublish.Source, cfg.RePublish.Destination, err)
return nil, fmt.Errorf("stream republish transform from '%s' to '%s': %w", cfg.RePublish.Source, cfg.RePublish.Destination, err)
}
// Assign our transform for republishing.
mset.tr = tr
@@ -640,17 +658,36 @@ func (ssi *StreamSource) composeIName() string {
iName = iName + ":" + getHash(ssi.External.ApiPrefix)
}
filter := ssi.FilterSubject
// normalize filter and destination in case they are empty
if filter == _EMPTY_ {
filter = fwcs
}
source := ssi.FilterSubject
destination := ssi.SubjectTransformDest
if destination == _EMPTY_ {
destination = fwcs
if len(ssi.SubjectTransforms) == 0 {
// normalize filter and destination in case they are empty
if source == _EMPTY_ {
source = fwcs
}
if destination == _EMPTY_ {
destination = fwcs
}
} else {
var sources, destinations []string
for _, tr := range ssi.SubjectTransforms {
trsrc, trdest := tr.Source, tr.Destination
if trsrc == _EMPTY_ {
trsrc = fwcs
}
if trdest == _EMPTY_ {
trdest = fwcs
}
sources = append(sources, trsrc)
destinations = append(destinations, trdest)
}
source = strings.Join(sources, "\f")
destination = strings.Join(destinations, "\f")
}
return strings.Join([]string{iName, filter, destination}, " ")
return strings.Join([]string{iName, source, destination}, " ")
}
// Sets the index name.
@@ -1210,43 +1247,63 @@ func (s *Server) checkStreamCfg(config *StreamConfig, acc *Account) (StreamConfi
}
}
if len(cfg.Sources) > 0 {
// check for duplicates
var iNames = make(map[string]struct{})
for _, src := range cfg.Sources {
if !isValidName(src.Name) {
return StreamConfig{}, NewJSSourceInvalidStreamNameError()
}
if _, ok := iNames[src.composeIName()]; !ok {
iNames[src.composeIName()] = struct{}{}
} else {
return StreamConfig{}, NewJSSourceDuplicateDetectedError()
}
// check for duplicates
var iNames = make(map[string]struct{})
for _, src := range cfg.Sources {
if !isValidName(src.Name) {
return StreamConfig{}, NewJSSourceInvalidStreamNameError()
}
for _, src := range cfg.Sources {
// Do not perform checks if External is provided, as it could lead to
// checking against itself (if sourced stream name is the same on different JetStream)
if src.External == nil {
exists, maxMsgSize, subs := hasStream(src.Name)
if len(subs) > 0 {
streamSubs = append(streamSubs, subs...)
if _, ok := iNames[src.composeIName()]; !ok {
iNames[src.composeIName()] = struct{}{}
} else {
return StreamConfig{}, NewJSSourceDuplicateDetectedError()
}
// Do not perform checks if External is provided, as it could lead to
// checking against itself (if sourced stream name is the same on different JetStream)
if src.External == nil {
exists, maxMsgSize, subs := hasStream(src.Name)
if len(subs) > 0 {
streamSubs = append(streamSubs, subs...)
}
if exists {
if cfg.MaxMsgSize > 0 && maxMsgSize > 0 && cfg.MaxMsgSize < maxMsgSize {
return StreamConfig{}, NewJSSourceMaxMessageSizeTooBigError()
}
if exists {
if cfg.MaxMsgSize > 0 && maxMsgSize > 0 && cfg.MaxMsgSize < maxMsgSize {
return StreamConfig{}, NewJSSourceMaxMessageSizeTooBigError()
}
if (src.FilterSubject != _EMPTY_ || src.SubjectTransformDest != _EMPTY_) && len(src.SubjectTransforms) != 0 {
return StreamConfig{}, NewJSSourceMultipleFiltersNotAllowedError()
}
for _, tr := range src.SubjectTransforms {
err := ValidateMappingDestination(tr.Destination)
if err != nil {
return StreamConfig{}, NewJSSourceInvalidTransformDestinationError()
}
}
// Check subject filters overlap.
for outer, tr := range src.SubjectTransforms {
if !IsValidSubject(tr.Source) {
return StreamConfig{}, NewJSSourceInvalidSubjectFilterError()
}
for inner, innertr := range src.SubjectTransforms {
if inner != outer && subjectIsSubsetMatch(tr.Source, innertr.Source) {
return StreamConfig{}, NewJSSourceOverlappingSubjectFiltersError()
}
}
continue
}
if src.External.DeliverPrefix != _EMPTY_ {
deliveryPrefixes = append(deliveryPrefixes, src.External.DeliverPrefix)
}
if src.External.ApiPrefix != _EMPTY_ {
apiPrefixes = append(apiPrefixes, src.External.ApiPrefix)
}
continue
}
if src.External.DeliverPrefix != _EMPTY_ {
deliveryPrefixes = append(deliveryPrefixes, src.External.DeliverPrefix)
}
if src.External.ApiPrefix != _EMPTY_ {
apiPrefixes = append(apiPrefixes, src.External.ApiPrefix)
}
}
// check prefix overlap with subjects
for _, pfx := range deliveryPrefixes {
if !IsValidPublishSubject(pfx) {
@@ -1614,15 +1671,27 @@ func (mset *stream) updateWithAdvisory(config *StreamConfig, sendAdvisory bool)
mset.sources = make(map[string]*sourceInfo)
}
mset.cfg.Sources = append(mset.cfg.Sources, s)
si := &sourceInfo{name: s.Name, iname: s.iname, sf: s.FilterSubject}
// Check for transform.
if s.SubjectTransformDest != _EMPTY_ {
var err error
if si.tr, err = NewSubjectTransform(s.FilterSubject, s.SubjectTransformDest); err != nil {
mset.mu.Unlock()
return fmt.Errorf("stream source subject transform from '%s' to '%s' %w", s.FilterSubject, s.SubjectTransformDest, err)
var si *sourceInfo
if len(s.SubjectTransforms) == 0 {
si = &sourceInfo{name: s.Name, iname: s.iname, sf: s.FilterSubject}
// Check for transform.
if s.SubjectTransformDest != _EMPTY_ {
var err error
if si.tr, err = NewSubjectTransform(s.FilterSubject, s.SubjectTransformDest); err != nil {
mset.mu.Unlock()
return fmt.Errorf("stream source subject transform from '%s' to '%s': %w", s.FilterSubject, s.SubjectTransformDest, err)
}
}
} else {
si = &sourceInfo{name: s.Name, iname: s.iname}
for i := range s.SubjectTransforms {
// err can be ignored as already validated in config check
si.trs[i], _ = NewSubjectTransform(s.SubjectTransforms[i].Source, s.SubjectTransforms[i].Destination)
}
}
mset.sources[s.iname] = si
mset.setStartingSequenceForSource(s.iname, s.External)
mset.setSourceConsumer(s.iname, si.sseq+1, time.Time{})
@@ -1662,7 +1731,7 @@ func (mset *stream) updateWithAdvisory(config *StreamConfig, sendAdvisory bool)
tr, err := NewSubjectTransform(cfg.RePublish.Source, cfg.RePublish.Destination)
if err != nil {
mset.mu.Unlock()
return fmt.Errorf("stream configuration for republish from '%s' to '%s' %w", cfg.RePublish.Source, cfg.RePublish.Destination, err)
return fmt.Errorf("stream configuration for republish from '%s' to '%s': %w", cfg.RePublish.Source, cfg.RePublish.Destination, err)
}
// Assign our transform for republishing.
mset.tr = tr
@@ -1675,7 +1744,7 @@ func (mset *stream) updateWithAdvisory(config *StreamConfig, sendAdvisory bool)
tr, err := NewSubjectTransform(cfg.SubjectTransform.Source, cfg.SubjectTransform.Destination)
if err != nil {
mset.mu.Unlock()
return fmt.Errorf("stream configuration for subject transform from '%s' to '%s' %w", cfg.SubjectTransform.Source, cfg.SubjectTransform.Destination, err)
return fmt.Errorf("stream configuration for subject transform from '%s' to '%s': %w", cfg.SubjectTransform.Source, cfg.SubjectTransform.Destination, err)
}
mset.itr = tr
} else if ocfg.SubjectTransform != nil && cfg.SubjectTransform != nil &&
@@ -1683,7 +1752,7 @@ func (mset *stream) updateWithAdvisory(config *StreamConfig, sendAdvisory bool)
tr, err := NewSubjectTransform(cfg.SubjectTransform.Source, cfg.SubjectTransform.Destination)
if err != nil {
mset.mu.Unlock()
return fmt.Errorf("stream configuration for subject transform from '%s' to '%s' %w", cfg.SubjectTransform.Source, cfg.SubjectTransform.Destination, err)
return fmt.Errorf("stream configuration for subject transform from '%s' to '%s': %w", cfg.SubjectTransform.Source, cfg.SubjectTransform.Destination, err)
}
mset.itr = tr
} else if ocfg.SubjectTransform != nil && cfg.SubjectTransform == nil {
@@ -1853,10 +1922,18 @@ func (mset *stream) sourceInfo(si *sourceInfo) *StreamSourceInfo {
return nil
}
var ssi = StreamSourceInfo{Name: si.name, Lag: si.lag, Error: si.err, FilterSubject: si.sf}
var trConfigs []SubjectTransformConfig
for _, tr := range si.trs {
if tr != nil {
trConfigs = append(trConfigs, SubjectTransformConfig{Source: tr.src, Destination: tr.dest})
}
}
var ssi = StreamSourceInfo{Name: si.name, Lag: si.lag, Error: si.err, FilterSubject: si.sf, SubjectTransforms: trConfigs}
if si.tr != nil {
ssi.SubjectTransformDest = si.tr.dest
}
// If we have not heard from the source, set Active to -1.
if si.last.IsZero() {
ssi.Active = -1
@@ -2582,6 +2659,12 @@ func (mset *stream) setSourceConsumer(iname string, seq uint64, startTime time.T
req.Config.FilterSubject = ssi.FilterSubject
}
var filterSubjects []string
for _, tr := range ssi.SubjectTransforms {
filterSubjects = append(filterSubjects, tr.Source)
}
req.Config.FilterSubjects = filterSubjects
respCh := make(chan *JSApiConsumerCreateResponse, 1)
reply := infoReplySubject()
crSub, err := mset.subscribeInternal(reply, func(sub *subscription, c *client, _ *Account, subject, reply string, rmsg []byte) {
@@ -2879,6 +2962,18 @@ func (mset *stream) processInboundSourceMsg(si *sourceInfo, m *inMsg) bool {
// Do the subject transform for the source if there's one
if si.tr != nil {
m.subj = si.tr.TransformSubject(m.subj)
} else {
for _, tr := range si.trs {
if tr == nil {
continue
} else {
tsubj, err := tr.Match(m.subj)
if err == nil {
m.subj = tsubj
break
}
}
}
}
var err error
@@ -3032,12 +3127,24 @@ func (mset *stream) startingSequenceForSources() {
if ssi.iname == _EMPTY_ {
ssi.setIndexName()
}
si := &sourceInfo{name: ssi.Name, iname: ssi.iname, sf: ssi.FilterSubject}
// Check for transform.
if ssi.SubjectTransformDest != _EMPTY_ {
si.tr, _ = NewSubjectTransform(ssi.FilterSubject, ssi.SubjectTransformDest) // can not return an error because validated in AddStream
}
var si *sourceInfo
if len(ssi.SubjectTransforms) == 0 {
si = &sourceInfo{name: ssi.Name, iname: ssi.iname, sf: ssi.FilterSubject}
// Check for transform.
if ssi.SubjectTransformDest != _EMPTY_ {
// no need to check the error as already validated that it will not before
si.tr, _ = NewSubjectTransform(ssi.FilterSubject, ssi.SubjectTransformDest)
}
} else {
var trs []*subjectTransform
for _, str := range ssi.SubjectTransforms {
tr, _ := NewSubjectTransform(str.Source, str.Destination)
trs = append(trs, tr)
}
si = &sourceInfo{name: ssi.Name, iname: ssi.iname, trs: trs}
}
mset.sources[ssi.iname] = si
}

View File

@@ -73,6 +73,11 @@ func NewSubjectTransformWithStrict(src, dest string, strict bool) (*subjectTrans
// strict = true for import subject mappings that need to be reversible
// (meaning can only use the Wildcard function and must use all the pwcs that are present in the source)
// No source given is equivalent to the source being ">"
if dest == _EMPTY_ {
return nil, nil
}
if src == _EMPTY_ {
src = fwcs
}
@@ -383,6 +388,8 @@ func (tr *subjectTransform) Match(subject string) (string, error) {
}
tts := tokenizeSubject(subject)
// TODO(jnm): optimization -> not sure this is actually needed but was there in initial code
if !isValidLiteralSubject(tts) {
return _EMPTY_, ErrBadSubject
}

View File

@@ -127,7 +127,6 @@ func TestSubjectTransforms(t *testing.T) {
}
// Must be valid subjects.
shouldErr("foo", "", false)
shouldErr("foo..", "bar", false)
// Wildcards are allowed in src, but must be matched by token placements on the other side.
@@ -175,16 +174,19 @@ func TestSubjectTransforms(t *testing.T) {
shouldMatch := func(src, dest, sample, expected string) {
t.Helper()
tr := shouldBeOK(src, dest, false)
s, err := tr.Match(sample)
if err != nil {
t.Fatalf("Got an error %v when expecting a match for %q to %q", err, sample, expected)
}
if s != expected {
t.Fatalf("Dest does not match what was expected. Got %q, expected %q", s, expected)
if tr != nil {
s, err := tr.Match(sample)
if err != nil {
t.Fatalf("Got an error %v when expecting a match for %q to %q", err, sample, expected)
}
if s != expected {
t.Fatalf("Dest does not match what was expected. Got %q, expected %q", s, expected)
}
}
}
shouldMatch("", "prefix.>", "foo", "prefix.foo")
shouldMatch("foo", "", "foo", "foo")
shouldMatch("foo", "bar", "foo", "bar")
shouldMatch("foo.*.bar.*.baz", "req.$2.$1", "foo.A.bar.B.baz", "req.B.A")
shouldMatch("foo.*.bar.*.baz", "req.{{wildcard(2)}}.{{wildcard(1)}}", "foo.A.bar.B.baz", "req.B.A")

View File

@@ -1142,6 +1142,9 @@ func isValidLiteralSubject(tokens []string) bool {
// ValidateMappingDestination returns nil error if the subject is a valid subject mapping destination subject
func ValidateMappingDestination(subject string) error {
if subject == _EMPTY_ {
return nil
}
subjectTokens := strings.Split(subject, tsep)
sfwc := false
for _, t := range subjectTokens {