diff --git a/server/accounts.go b/server/accounts.go index 94074edc..e1322041 100644 --- a/server/accounts.go +++ b/server/accounts.go @@ -1903,7 +1903,7 @@ func (a *Account) addServiceImport(dest *Account, from, to string, claim *jwt.Im } else { to, _ = transformUntokenize(to) // Create a transform. Do so in reverse such that $ symbols only exist in to - if tr, err = NewSubjectTransform(to, transformTokenize(from)); err != nil { + if tr, err = NewSubjectTransformStrict(to, transformTokenize(from)); err != nil { a.mu.Unlock() return nil, fmt.Errorf("failed to create mapping transform for service import subject from %q to %q: %v", from, to, err) @@ -2256,7 +2256,7 @@ func (si *serviceImport) isRespServiceImport() bool { return si != nil && si.response } -// Sets the response theshold timer for a service export. +// Sets the response threshold timer for a service export. // Account lock should be held func (se *serviceExport) setResponseThresholdTimer() { if se.rtmr != nil { @@ -2459,7 +2459,7 @@ func (a *Account) AddMappedStreamImportWithClaim(account *Account, from, to stri usePub = true } else { // Create a transform - if tr, err = NewSubjectTransform(from, transformTokenize(to)); err != nil { + if tr, err = NewSubjectTransformStrict(from, transformTokenize(to)); err != nil { return fmt.Errorf("failed to create mapping transform for stream import subject from %q to %q: %v", from, to, err) } diff --git a/server/errors.go b/server/errors.go index 0dd73b89..275c0e36 100644 --- a/server/errors.go +++ b/server/errors.go @@ -219,6 +219,9 @@ var ( // ErrorMappingDestinationFunctionTooManyArguments is returned when the mapping destination function is passed too many arguments ErrorMappingDestinationFunctionTooManyArguments = fmt.Errorf("%w: too many arguments passed to the function", ErrInvalidMappingDestination) + + // ErrorMappingDestinationFunctionNotSupportedForImport is returned when you try to use a mapping function other than wildcard in a transform that needs to be reversible (i.e. an import) + ErrorMappingDestinationFunctionNotSupportedForImport = fmt.Errorf("%w: the only mapping function allowed for import transforms is {{Wildcard()}}", ErrInvalidMappingDestination) ) // mappingDestinationErr is a type of subject mapping destination error diff --git a/server/subject_transform.go b/server/subject_transform.go index 03c093ce..4ac93144 100644 --- a/server/subject_transform.go +++ b/server/subject_transform.go @@ -69,7 +69,9 @@ type SubjectTransformer interface { TransformTokenizedSubject(tokens []string) string } -func NewSubjectTransform(src, dest string) (*subjectTransform, error) { +func NewSubjectTransformWithStrict(src, dest string, strict bool) (*subjectTransform, error) { + // strict = true for import subject mappings that need to be reversible + // (meaning can only use the Wildcard function and must use all the pwcs that are present in the source) // No source given is equivalent to the source being ">" if src == _EMPTY_ { src = fwcs @@ -106,6 +108,12 @@ func NewSubjectTransform(src, dest string) (*subjectTransform, error) { return nil, err } + if strict { + if tranformType != NoTransform && tranformType != Wildcard { + return nil, &mappingDestinationErr{token, ErrorMappingDestinationFunctionNotSupportedForImport} + } + } + if tranformType == NoTransform { dtokMappingFunctionTypes = append(dtokMappingFunctionTypes, NoTransform) dtokMappingFunctionTokenIndexes = append(dtokMappingFunctionTokenIndexes, []int{-1}) @@ -128,7 +136,7 @@ func NewSubjectTransform(src, dest string) (*subjectTransform, error) { } } - if nphs < npwcs { + if strict && nphs < npwcs { // not all wildcards are being used in the destination return nil, &mappingDestinationErr{dest, ErrMappingDestinationNotUsingAllWildcards} } @@ -146,6 +154,14 @@ func NewSubjectTransform(src, dest string) (*subjectTransform, error) { }, nil } +func NewSubjectTransform(src, dest string) (*subjectTransform, error) { + return NewSubjectTransformWithStrict(src, dest, false) +} + +func NewSubjectTransformStrict(src, dest string) (*subjectTransform, error) { + return NewSubjectTransformWithStrict(src, dest, true) +} + func getMappingFunctionArgs(functionRegEx *regexp.Regexp, token string) []string { commandStrings := functionRegEx.FindStringSubmatch(token) if len(commandStrings) > 1 { @@ -304,14 +320,15 @@ func transformTokenize(subject string) string { // Helper function to go from transform destination to a subject with partial wildcards and ordered list of placeholders // E.g.: // -// "bar" -> "bar", [] -// "foo.$2.$1" -> "foo.*.*", ["$2","$1"] +// "bar" -> "bar", [] +// "foo.$2.$1" -> "foo.*.*", ["$2","$1"] +// "foo.{{wildcard(2)}}.{{wildcard(1)}}" -> "foo.*.*", ["{{wildcard(2)}}","{{wildcard(1)}}"] func transformUntokenize(subject string) (string, []string) { var phs []string var nda []string for _, token := range strings.Split(subject, tsep) { - if len(token) > 1 && token[0] == '$' && token[1] >= '1' && token[1] <= '9' { + if args := getMappingFunctionArgs(wildcardMappingFunctionRegEx, token); (len(token) > 1 && token[0] == '$' && token[1] >= '1' && token[1] <= '9') || (len(args) == 1 && args[0] != _EMPTY_) { phs = append(phs, token) nda = append(nda, pwcs) } else { @@ -496,7 +513,7 @@ func (tr *subjectTransform) TransformTokenizedSubject(tokens []string) string { // Reverse a subjectTransform. func (tr *subjectTransform) reverse() *subjectTransform { if len(tr.dtokmftokindexesargs) == 0 { - rtr, _ := NewSubjectTransform(tr.dest, tr.src) + rtr, _ := NewSubjectTransformStrict(tr.dest, tr.src) return rtr } // If we are here we need to dynamically get the correct reverse @@ -516,6 +533,6 @@ func (tr *subjectTransform) reverse() *subjectTransform { } } ndest := strings.Join(nda, tsep) - rtr, _ := NewSubjectTransform(nsrc, ndest) + rtr, _ := NewSubjectTransformStrict(nsrc, ndest) return rtr } diff --git a/server/subject_transform_test.go b/server/subject_transform_test.go index d2ce5523..48ebcf55 100644 --- a/server/subject_transform_test.go +++ b/server/subject_transform_test.go @@ -73,58 +73,107 @@ func TestPlaceHolderIndex(t *testing.T) { } } +func TestSubjectTransformHelpers(t *testing.T) { + equals := func(a, b []string) bool { + if len(a) != len(b) { + return false + } + for i, v := range a { + if v != b[i] { + return false + } + } + return true + } + + filter, placeHolders := transformUntokenize("bar") + if filter != "bar" || len(placeHolders) != 0 { + t.Fatalf("transformUntokenize for not returning expected result") + } + + filter, placeHolders = transformUntokenize("foo.$2.$1") + if filter != "foo.*.*" || !equals(placeHolders, []string{"$2", "$1"}) { + t.Fatalf("transformUntokenize for not returning expected result") + } + + filter, placeHolders = transformUntokenize("foo.{{wildcard(2)}}.{{wildcard(1)}}") + if filter != "foo.*.*" || !equals(placeHolders, []string{"{{wildcard(2)}}", "{{wildcard(1)}}"}) { + t.Fatalf("transformUntokenize for not returning expected result") + } + + newReversibleTransform := func(src, dest string) *subjectTransform { + tr, err := NewSubjectTransformStrict(src, dest) + if err != nil { + t.Fatalf("Error getting reversible transform: %s to %s", src, dest) + } + return tr + } + + tr := newReversibleTransform("foo.*.*", "bar.$2.{{Wildcard(1)}}") + subject := "foo.b.a" + transformed := tr.TransformSubject(subject) + reverse := tr.reverse() + if reverse.TransformSubject(transformed) != subject { + t.Fatal("Reversed transform subject not matching") + } +} + func TestSubjectTransforms(t *testing.T) { - shouldErr := func(src, dest string) { + shouldErr := func(src, dest string, strict bool) { t.Helper() - if _, err := NewSubjectTransform(src, dest); err != ErrBadSubject && !errors.Is(err, ErrInvalidMappingDestination) { + if _, err := NewSubjectTransformWithStrict(src, dest, strict); err != ErrBadSubject && !errors.Is(err, ErrInvalidMappingDestination) { t.Fatalf("Did not get an error for src=%q and dest=%q", src, dest) } } - shouldErr("foo.*.*", "bar.$2") // Must place all pwcs. - // Must be valid subjects. - shouldErr("foo", "") - shouldErr("foo..", "bar") + shouldErr("foo", "", false) + shouldErr("foo..", "bar", false) // Wildcards are allowed in src, but must be matched by token placements on the other side. // e.g. foo.* -> bar.$1. - // Need to have as many pwcs as placements on other side. - shouldErr("foo.*", "bar.*") - shouldErr("foo.*", "bar.$2") // Bad pwc token identifier - shouldErr("foo.*", "bar.$1.>") // fwcs have to match. - shouldErr("foo.>", "bar.baz") // fwcs have to match. - shouldErr("foo.*.*", "bar.$2") // Must place all pwcs. - shouldErr("foo.*", "foo.$foo") // invalid $ value - shouldErr("foo.*", "foo.{{wildcard(2)}}") // Mapping function being passed an out of range wildcard index - shouldErr("foo.*", "foo.{{unimplemented(1)}}") // Mapping trying to use an unknown mapping function - shouldErr("foo.*", "foo.{{partition(10)}}") // Not enough arguments passed to the mapping function - shouldErr("foo.*", "foo.{{wildcard(foo)}}") // Invalid argument passed to the mapping function - shouldErr("foo.*", "foo.{{wildcard()}}") // Not enough arguments passed to the mapping function - shouldErr("foo.*", "foo.{{wildcard(1,2)}}") // Too many arguments passed to the mapping function - shouldErr("foo.*", "foo.{{ wildcard5) }}") // Bad mapping function - shouldErr("foo.*", "foo.{{splitLeft(2,2}}") // arg out of range + // Need to have as many pwcs as placements on other side - shouldBeOK := func(src, dest string) *subjectTransform { + shouldErr("foo.*", "bar.*", false) + shouldErr("foo.*", "bar.$2", false) // Bad pwc token identifier + shouldErr("foo.*", "bar.$1.>", false) // fwcs have to match. + shouldErr("foo.>", "bar.baz", false) // fwcs have to match. + shouldErr("foo.*.*", "bar.$2", true) // Must place all pwcs. + shouldErr("foo.*", "foo.$foo", true) // invalid $ value + shouldErr("foo.*", "bar.{{Partition(2,1)}}", true) // can only use Wildcard function (and old-style $x) in import transform + shouldErr("foo.*", "foo.{{wildcard(2)}}", false) // Mapping function being passed an out of range wildcard index + shouldErr("foo.*", "foo.{{unimplemented(1)}}", false) // Mapping trying to use an unknown mapping function + shouldErr("foo.*", "foo.{{partition(10)}}", false) // Not enough arguments passed to the mapping function + shouldErr("foo.*", "foo.{{wildcard(foo)}}", false) // Invalid argument passed to the mapping function + shouldErr("foo.*", "foo.{{wildcard()}}", false) // Not enough arguments passed to the mapping function + shouldErr("foo.*", "foo.{{wildcard(1,2)}}", false) // Too many arguments passed to the mapping function + shouldErr("foo.*", "foo.{{ wildcard5) }}", false) // Bad mapping function + shouldErr("foo.*", "foo.{{splitLeft(2,2}}", false) // arg out of range + + shouldBeOK := func(src, dest string, strict bool) *subjectTransform { t.Helper() - tr, err := NewSubjectTransform(src, dest) + tr, err := NewSubjectTransformWithStrict(src, dest, strict) if err != nil { t.Fatalf("Got an error %v for src=%q and dest=%q", err, src, dest) } return tr } - shouldBeOK("foo", "bar") - shouldBeOK("foo.*.bar.*.baz", "req.$2.$1") - shouldBeOK("baz.>", "mybaz.>") - shouldBeOK("*", "{{splitfromleft(1,1)}}") - shouldBeOK("", "prefix.>") - shouldBeOK("*.*", "{{partition(10,1,2)}}") - shouldBeOK("foo.*.*", "foo.{{wildcard(1)}}.{{wildcard(2)}}.{{partition(5,1,2)}}") + shouldBeOK("foo.*", "bar.{{Wildcard(1)}}", true) + + shouldBeOK("foo.*.*", "bar.$2", false) // don't have to use all pwcs. + shouldBeOK("foo.*.*", "bar.{{wildcard(1)}}", false) // don't have to use all pwcs. + shouldBeOK("foo", "bar", false) + shouldBeOK("foo.*.bar.*.baz", "req.$2.$1", false) + shouldBeOK("baz.>", "mybaz.>", false) + shouldBeOK("*", "{{splitfromleft(1,1)}}", false) + shouldBeOK("", "prefix.>", false) + shouldBeOK("*.*", "{{partition(10,1,2)}}", false) + shouldBeOK("foo.*.*", "foo.{{wildcard(1)}}.{{wildcard(2)}}.{{partition(5,1,2)}}", false) shouldMatch := func(src, dest, sample, expected string) { t.Helper() - tr := shouldBeOK(src, dest) + tr := shouldBeOK(src, dest, false) s, err := tr.Match(sample) if err != nil { t.Fatalf("Got an error %v when expecting a match for %q to %q", err, sample, expected) @@ -137,6 +186,7 @@ func TestSubjectTransforms(t *testing.T) { shouldMatch("", "prefix.>", "foo", "prefix.foo") shouldMatch("foo", "bar", "foo", "bar") shouldMatch("foo.*.bar.*.baz", "req.$2.$1", "foo.A.bar.B.baz", "req.B.A") + shouldMatch("foo.*.bar.*.baz", "req.{{wildcard(2)}}.{{wildcard(1)}}", "foo.A.bar.B.baz", "req.B.A") shouldMatch("baz.>", "my.pre.>", "baz.1.2.3", "my.pre.1.2.3") shouldMatch("baz.>", "foo.bar.>", "baz.1.2.3", "foo.bar.1.2.3") shouldMatch("*", "foo.bar.$1", "foo", "foo.bar.foo") diff --git a/test/accounts_cycles_test.go b/test/accounts_cycles_test.go index 00ff0059..7dab78c7 100644 --- a/test/accounts_cycles_test.go +++ b/test/accounts_cycles_test.go @@ -415,7 +415,7 @@ func TestAccountSubjectMapping(t *testing.T) { } } -// test token and partition subject mapping within an account +// test token subject mapping within an account // Alice imports from Bob with subject mapping func TestAccountImportSubjectMapping(t *testing.T) { conf := createConfFile(t, []byte(` @@ -423,7 +423,7 @@ func TestAccountImportSubjectMapping(t *testing.T) { accounts { A { users: [{user: a, pass: x}] - imports [ {stream: {account: B, subject: "foo.*.*"}, to : "foo.$1.{{wildcard(2)}}.{{partition(10,1,2)}}"}] + imports [ {stream: {account: B, subject: "foo.*.*"}, to : "foo.$1.{{wildcard(2)}}"}] } B { users: [{user: b, pass x}] @@ -442,30 +442,26 @@ func TestAccountImportSubjectMapping(t *testing.T) { subjectsReceived := make(chan string) msg := []byte("HELLO") - sub1, err := ncA.Subscribe("foo.*.*.*", func(m *nats.Msg) { + sub1, err := ncA.Subscribe("foo.*.*", func(m *nats.Msg) { subjectsReceived <- m.Subject }) if err != nil { t.Fatalf("Unexpected error: %v", err) } - sub1.AutoUnsubscribe(numMessages * 2) + sub1.AutoUnsubscribe(numMessages) ncB := clientConnectToServerWithUP(t, opts, "b", "x") defer ncB.Close() - // publish numMessages with an increasing id (should map to partition numbers with the range of 10 partitions) - twice - for j := 0; j < 2; j++ { - for i := 0; i < numMessages; i++ { - err = ncB.Publish(fmt.Sprintf("foo.%d.%d", i, numMessages-i), msg) - if err != nil { - t.Fatalf("Unexpected error: %v", err) - } + // publish numMessages with an increasing id + + for i := 0; i < numMessages; i++ { + err = ncB.Publish(fmt.Sprintf("foo.%d.%d", i, numMessages-i), msg) + if err != nil { + t.Fatalf("Unexpected error: %v", err) } } - // verify all the partition numbers are in the expected range - partitionsReceived := make([]int, numMessages) - for i := 0; i < numMessages; i++ { subject := <-subjectsReceived sTokens := strings.Split(subject, ".") @@ -474,25 +470,9 @@ func TestAccountImportSubjectMapping(t *testing.T) { } t1, _ := strconv.Atoi(sTokens[1]) t2, _ := strconv.Atoi(sTokens[2]) - partitionsReceived[i], err = strconv.Atoi(sTokens[3]) - if err != nil { - t.Fatalf("Unexpected error: %v", err) - } - if partitionsReceived[i] > 9 || partitionsReceived[i] < 0 || t1 != i || t2 != numMessages-i { - t.Fatalf("Error received unexpected %d.%d to partition %d", t1, t2, partitionsReceived[i]) - } - } - - // verify hashing is deterministic by checking it produces the same exact result twice - for i := 0; i < numMessages; i++ { - subject := <-subjectsReceived - partitionNumber, err := strconv.Atoi(strings.Split(subject, ".")[3]) - if err != nil { - t.Fatalf("Unexpected error: %v", err) - } - if partitionsReceived[i] != partitionNumber { - t.Fatalf("Error: same id mapped to two different partitions") + if t1 != i || t2 != numMessages-i { + t.Fatalf("Error received unexpected %d.%d", t1, t2) } } }