From e07ccf9cc16c0e221780be57ed9e501edfead78c Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Jean-No=C3=ABl=20Moyne?= Date: Sun, 14 May 2023 14:02:19 -0700 Subject: [PATCH] [ADDED] Ability to drop partial wildcard tokens in some subject transforms (#4152) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit - [X] Tests added - [X] Branch rebased on top of current main (`git pull --rebase origin main`) - [X] Changes squashed to a single commit (described [here](http://gitready.com/advanced/2009/02/10/squashing-commits-with-rebase.html)) - [X] Build is green in Travis CI - [X] You have certified that the contribution is your original work and that you license the work to the project under the [Apache 2 license](https://github.com/nats-io/nats-server/blob/main/LICENSE) ### Changes proposed in this pull request: There is currently a blanket requirement that subject transforms destinations MUST use ALL of the partial wildcards defined in the source of the transform. This is because the subject transformed defined for imports must be 'reversible' and therefore the destination transform must use all of the partial wildcard tokens defined in the source of the transform. This reversing of a transform is only used for transforms used by imports, where in any case it doesn't make sense to use any transform other than Wildcard. This PR: - relaxes this requirement to only apply when the transform is used by an import, adding the ability to drop a wildcard token in transforms other than as part of an import. - Improves transform reverse to support both legacy style wildcards $X and the new transform function {{Wildcard(X)}}- Improves reversible transform checking to only allow the use of wildcards in the destination. --------- Signed-off-by: Jean-Noël Moyne --- server/accounts.go | 6 +- server/errors.go | 3 + server/subject_transform.go | 31 +++++++-- server/subject_transform_test.go | 112 ++++++++++++++++++++++--------- test/accounts_cycles_test.go | 44 ++++-------- 5 files changed, 123 insertions(+), 73 deletions(-) diff --git a/server/accounts.go b/server/accounts.go index 94074edc..e1322041 100644 --- a/server/accounts.go +++ b/server/accounts.go @@ -1903,7 +1903,7 @@ func (a *Account) addServiceImport(dest *Account, from, to string, claim *jwt.Im } else { to, _ = transformUntokenize(to) // Create a transform. Do so in reverse such that $ symbols only exist in to - if tr, err = NewSubjectTransform(to, transformTokenize(from)); err != nil { + if tr, err = NewSubjectTransformStrict(to, transformTokenize(from)); err != nil { a.mu.Unlock() return nil, fmt.Errorf("failed to create mapping transform for service import subject from %q to %q: %v", from, to, err) @@ -2256,7 +2256,7 @@ func (si *serviceImport) isRespServiceImport() bool { return si != nil && si.response } -// Sets the response theshold timer for a service export. +// Sets the response threshold timer for a service export. // Account lock should be held func (se *serviceExport) setResponseThresholdTimer() { if se.rtmr != nil { @@ -2459,7 +2459,7 @@ func (a *Account) AddMappedStreamImportWithClaim(account *Account, from, to stri usePub = true } else { // Create a transform - if tr, err = NewSubjectTransform(from, transformTokenize(to)); err != nil { + if tr, err = NewSubjectTransformStrict(from, transformTokenize(to)); err != nil { return fmt.Errorf("failed to create mapping transform for stream import subject from %q to %q: %v", from, to, err) } diff --git a/server/errors.go b/server/errors.go index 0dd73b89..275c0e36 100644 --- a/server/errors.go +++ b/server/errors.go @@ -219,6 +219,9 @@ var ( // ErrorMappingDestinationFunctionTooManyArguments is returned when the mapping destination function is passed too many arguments ErrorMappingDestinationFunctionTooManyArguments = fmt.Errorf("%w: too many arguments passed to the function", ErrInvalidMappingDestination) + + // ErrorMappingDestinationFunctionNotSupportedForImport is returned when you try to use a mapping function other than wildcard in a transform that needs to be reversible (i.e. an import) + ErrorMappingDestinationFunctionNotSupportedForImport = fmt.Errorf("%w: the only mapping function allowed for import transforms is {{Wildcard()}}", ErrInvalidMappingDestination) ) // mappingDestinationErr is a type of subject mapping destination error diff --git a/server/subject_transform.go b/server/subject_transform.go index 03c093ce..4ac93144 100644 --- a/server/subject_transform.go +++ b/server/subject_transform.go @@ -69,7 +69,9 @@ type SubjectTransformer interface { TransformTokenizedSubject(tokens []string) string } -func NewSubjectTransform(src, dest string) (*subjectTransform, error) { +func NewSubjectTransformWithStrict(src, dest string, strict bool) (*subjectTransform, error) { + // strict = true for import subject mappings that need to be reversible + // (meaning can only use the Wildcard function and must use all the pwcs that are present in the source) // No source given is equivalent to the source being ">" if src == _EMPTY_ { src = fwcs @@ -106,6 +108,12 @@ func NewSubjectTransform(src, dest string) (*subjectTransform, error) { return nil, err } + if strict { + if tranformType != NoTransform && tranformType != Wildcard { + return nil, &mappingDestinationErr{token, ErrorMappingDestinationFunctionNotSupportedForImport} + } + } + if tranformType == NoTransform { dtokMappingFunctionTypes = append(dtokMappingFunctionTypes, NoTransform) dtokMappingFunctionTokenIndexes = append(dtokMappingFunctionTokenIndexes, []int{-1}) @@ -128,7 +136,7 @@ func NewSubjectTransform(src, dest string) (*subjectTransform, error) { } } - if nphs < npwcs { + if strict && nphs < npwcs { // not all wildcards are being used in the destination return nil, &mappingDestinationErr{dest, ErrMappingDestinationNotUsingAllWildcards} } @@ -146,6 +154,14 @@ func NewSubjectTransform(src, dest string) (*subjectTransform, error) { }, nil } +func NewSubjectTransform(src, dest string) (*subjectTransform, error) { + return NewSubjectTransformWithStrict(src, dest, false) +} + +func NewSubjectTransformStrict(src, dest string) (*subjectTransform, error) { + return NewSubjectTransformWithStrict(src, dest, true) +} + func getMappingFunctionArgs(functionRegEx *regexp.Regexp, token string) []string { commandStrings := functionRegEx.FindStringSubmatch(token) if len(commandStrings) > 1 { @@ -304,14 +320,15 @@ func transformTokenize(subject string) string { // Helper function to go from transform destination to a subject with partial wildcards and ordered list of placeholders // E.g.: // -// "bar" -> "bar", [] -// "foo.$2.$1" -> "foo.*.*", ["$2","$1"] +// "bar" -> "bar", [] +// "foo.$2.$1" -> "foo.*.*", ["$2","$1"] +// "foo.{{wildcard(2)}}.{{wildcard(1)}}" -> "foo.*.*", ["{{wildcard(2)}}","{{wildcard(1)}}"] func transformUntokenize(subject string) (string, []string) { var phs []string var nda []string for _, token := range strings.Split(subject, tsep) { - if len(token) > 1 && token[0] == '$' && token[1] >= '1' && token[1] <= '9' { + if args := getMappingFunctionArgs(wildcardMappingFunctionRegEx, token); (len(token) > 1 && token[0] == '$' && token[1] >= '1' && token[1] <= '9') || (len(args) == 1 && args[0] != _EMPTY_) { phs = append(phs, token) nda = append(nda, pwcs) } else { @@ -496,7 +513,7 @@ func (tr *subjectTransform) TransformTokenizedSubject(tokens []string) string { // Reverse a subjectTransform. func (tr *subjectTransform) reverse() *subjectTransform { if len(tr.dtokmftokindexesargs) == 0 { - rtr, _ := NewSubjectTransform(tr.dest, tr.src) + rtr, _ := NewSubjectTransformStrict(tr.dest, tr.src) return rtr } // If we are here we need to dynamically get the correct reverse @@ -516,6 +533,6 @@ func (tr *subjectTransform) reverse() *subjectTransform { } } ndest := strings.Join(nda, tsep) - rtr, _ := NewSubjectTransform(nsrc, ndest) + rtr, _ := NewSubjectTransformStrict(nsrc, ndest) return rtr } diff --git a/server/subject_transform_test.go b/server/subject_transform_test.go index d2ce5523..48ebcf55 100644 --- a/server/subject_transform_test.go +++ b/server/subject_transform_test.go @@ -73,58 +73,107 @@ func TestPlaceHolderIndex(t *testing.T) { } } +func TestSubjectTransformHelpers(t *testing.T) { + equals := func(a, b []string) bool { + if len(a) != len(b) { + return false + } + for i, v := range a { + if v != b[i] { + return false + } + } + return true + } + + filter, placeHolders := transformUntokenize("bar") + if filter != "bar" || len(placeHolders) != 0 { + t.Fatalf("transformUntokenize for not returning expected result") + } + + filter, placeHolders = transformUntokenize("foo.$2.$1") + if filter != "foo.*.*" || !equals(placeHolders, []string{"$2", "$1"}) { + t.Fatalf("transformUntokenize for not returning expected result") + } + + filter, placeHolders = transformUntokenize("foo.{{wildcard(2)}}.{{wildcard(1)}}") + if filter != "foo.*.*" || !equals(placeHolders, []string{"{{wildcard(2)}}", "{{wildcard(1)}}"}) { + t.Fatalf("transformUntokenize for not returning expected result") + } + + newReversibleTransform := func(src, dest string) *subjectTransform { + tr, err := NewSubjectTransformStrict(src, dest) + if err != nil { + t.Fatalf("Error getting reversible transform: %s to %s", src, dest) + } + return tr + } + + tr := newReversibleTransform("foo.*.*", "bar.$2.{{Wildcard(1)}}") + subject := "foo.b.a" + transformed := tr.TransformSubject(subject) + reverse := tr.reverse() + if reverse.TransformSubject(transformed) != subject { + t.Fatal("Reversed transform subject not matching") + } +} + func TestSubjectTransforms(t *testing.T) { - shouldErr := func(src, dest string) { + shouldErr := func(src, dest string, strict bool) { t.Helper() - if _, err := NewSubjectTransform(src, dest); err != ErrBadSubject && !errors.Is(err, ErrInvalidMappingDestination) { + if _, err := NewSubjectTransformWithStrict(src, dest, strict); err != ErrBadSubject && !errors.Is(err, ErrInvalidMappingDestination) { t.Fatalf("Did not get an error for src=%q and dest=%q", src, dest) } } - shouldErr("foo.*.*", "bar.$2") // Must place all pwcs. - // Must be valid subjects. - shouldErr("foo", "") - shouldErr("foo..", "bar") + shouldErr("foo", "", false) + shouldErr("foo..", "bar", false) // Wildcards are allowed in src, but must be matched by token placements on the other side. // e.g. foo.* -> bar.$1. - // Need to have as many pwcs as placements on other side. - shouldErr("foo.*", "bar.*") - shouldErr("foo.*", "bar.$2") // Bad pwc token identifier - shouldErr("foo.*", "bar.$1.>") // fwcs have to match. - shouldErr("foo.>", "bar.baz") // fwcs have to match. - shouldErr("foo.*.*", "bar.$2") // Must place all pwcs. - shouldErr("foo.*", "foo.$foo") // invalid $ value - shouldErr("foo.*", "foo.{{wildcard(2)}}") // Mapping function being passed an out of range wildcard index - shouldErr("foo.*", "foo.{{unimplemented(1)}}") // Mapping trying to use an unknown mapping function - shouldErr("foo.*", "foo.{{partition(10)}}") // Not enough arguments passed to the mapping function - shouldErr("foo.*", "foo.{{wildcard(foo)}}") // Invalid argument passed to the mapping function - shouldErr("foo.*", "foo.{{wildcard()}}") // Not enough arguments passed to the mapping function - shouldErr("foo.*", "foo.{{wildcard(1,2)}}") // Too many arguments passed to the mapping function - shouldErr("foo.*", "foo.{{ wildcard5) }}") // Bad mapping function - shouldErr("foo.*", "foo.{{splitLeft(2,2}}") // arg out of range + // Need to have as many pwcs as placements on other side - shouldBeOK := func(src, dest string) *subjectTransform { + shouldErr("foo.*", "bar.*", false) + shouldErr("foo.*", "bar.$2", false) // Bad pwc token identifier + shouldErr("foo.*", "bar.$1.>", false) // fwcs have to match. + shouldErr("foo.>", "bar.baz", false) // fwcs have to match. + shouldErr("foo.*.*", "bar.$2", true) // Must place all pwcs. + shouldErr("foo.*", "foo.$foo", true) // invalid $ value + shouldErr("foo.*", "bar.{{Partition(2,1)}}", true) // can only use Wildcard function (and old-style $x) in import transform + shouldErr("foo.*", "foo.{{wildcard(2)}}", false) // Mapping function being passed an out of range wildcard index + shouldErr("foo.*", "foo.{{unimplemented(1)}}", false) // Mapping trying to use an unknown mapping function + shouldErr("foo.*", "foo.{{partition(10)}}", false) // Not enough arguments passed to the mapping function + shouldErr("foo.*", "foo.{{wildcard(foo)}}", false) // Invalid argument passed to the mapping function + shouldErr("foo.*", "foo.{{wildcard()}}", false) // Not enough arguments passed to the mapping function + shouldErr("foo.*", "foo.{{wildcard(1,2)}}", false) // Too many arguments passed to the mapping function + shouldErr("foo.*", "foo.{{ wildcard5) }}", false) // Bad mapping function + shouldErr("foo.*", "foo.{{splitLeft(2,2}}", false) // arg out of range + + shouldBeOK := func(src, dest string, strict bool) *subjectTransform { t.Helper() - tr, err := NewSubjectTransform(src, dest) + tr, err := NewSubjectTransformWithStrict(src, dest, strict) if err != nil { t.Fatalf("Got an error %v for src=%q and dest=%q", err, src, dest) } return tr } - shouldBeOK("foo", "bar") - shouldBeOK("foo.*.bar.*.baz", "req.$2.$1") - shouldBeOK("baz.>", "mybaz.>") - shouldBeOK("*", "{{splitfromleft(1,1)}}") - shouldBeOK("", "prefix.>") - shouldBeOK("*.*", "{{partition(10,1,2)}}") - shouldBeOK("foo.*.*", "foo.{{wildcard(1)}}.{{wildcard(2)}}.{{partition(5,1,2)}}") + shouldBeOK("foo.*", "bar.{{Wildcard(1)}}", true) + + shouldBeOK("foo.*.*", "bar.$2", false) // don't have to use all pwcs. + shouldBeOK("foo.*.*", "bar.{{wildcard(1)}}", false) // don't have to use all pwcs. + shouldBeOK("foo", "bar", false) + shouldBeOK("foo.*.bar.*.baz", "req.$2.$1", false) + shouldBeOK("baz.>", "mybaz.>", false) + shouldBeOK("*", "{{splitfromleft(1,1)}}", false) + shouldBeOK("", "prefix.>", false) + shouldBeOK("*.*", "{{partition(10,1,2)}}", false) + shouldBeOK("foo.*.*", "foo.{{wildcard(1)}}.{{wildcard(2)}}.{{partition(5,1,2)}}", false) shouldMatch := func(src, dest, sample, expected string) { t.Helper() - tr := shouldBeOK(src, dest) + tr := shouldBeOK(src, dest, false) s, err := tr.Match(sample) if err != nil { t.Fatalf("Got an error %v when expecting a match for %q to %q", err, sample, expected) @@ -137,6 +186,7 @@ func TestSubjectTransforms(t *testing.T) { shouldMatch("", "prefix.>", "foo", "prefix.foo") shouldMatch("foo", "bar", "foo", "bar") shouldMatch("foo.*.bar.*.baz", "req.$2.$1", "foo.A.bar.B.baz", "req.B.A") + shouldMatch("foo.*.bar.*.baz", "req.{{wildcard(2)}}.{{wildcard(1)}}", "foo.A.bar.B.baz", "req.B.A") shouldMatch("baz.>", "my.pre.>", "baz.1.2.3", "my.pre.1.2.3") shouldMatch("baz.>", "foo.bar.>", "baz.1.2.3", "foo.bar.1.2.3") shouldMatch("*", "foo.bar.$1", "foo", "foo.bar.foo") diff --git a/test/accounts_cycles_test.go b/test/accounts_cycles_test.go index 00ff0059..7dab78c7 100644 --- a/test/accounts_cycles_test.go +++ b/test/accounts_cycles_test.go @@ -415,7 +415,7 @@ func TestAccountSubjectMapping(t *testing.T) { } } -// test token and partition subject mapping within an account +// test token subject mapping within an account // Alice imports from Bob with subject mapping func TestAccountImportSubjectMapping(t *testing.T) { conf := createConfFile(t, []byte(` @@ -423,7 +423,7 @@ func TestAccountImportSubjectMapping(t *testing.T) { accounts { A { users: [{user: a, pass: x}] - imports [ {stream: {account: B, subject: "foo.*.*"}, to : "foo.$1.{{wildcard(2)}}.{{partition(10,1,2)}}"}] + imports [ {stream: {account: B, subject: "foo.*.*"}, to : "foo.$1.{{wildcard(2)}}"}] } B { users: [{user: b, pass x}] @@ -442,30 +442,26 @@ func TestAccountImportSubjectMapping(t *testing.T) { subjectsReceived := make(chan string) msg := []byte("HELLO") - sub1, err := ncA.Subscribe("foo.*.*.*", func(m *nats.Msg) { + sub1, err := ncA.Subscribe("foo.*.*", func(m *nats.Msg) { subjectsReceived <- m.Subject }) if err != nil { t.Fatalf("Unexpected error: %v", err) } - sub1.AutoUnsubscribe(numMessages * 2) + sub1.AutoUnsubscribe(numMessages) ncB := clientConnectToServerWithUP(t, opts, "b", "x") defer ncB.Close() - // publish numMessages with an increasing id (should map to partition numbers with the range of 10 partitions) - twice - for j := 0; j < 2; j++ { - for i := 0; i < numMessages; i++ { - err = ncB.Publish(fmt.Sprintf("foo.%d.%d", i, numMessages-i), msg) - if err != nil { - t.Fatalf("Unexpected error: %v", err) - } + // publish numMessages with an increasing id + + for i := 0; i < numMessages; i++ { + err = ncB.Publish(fmt.Sprintf("foo.%d.%d", i, numMessages-i), msg) + if err != nil { + t.Fatalf("Unexpected error: %v", err) } } - // verify all the partition numbers are in the expected range - partitionsReceived := make([]int, numMessages) - for i := 0; i < numMessages; i++ { subject := <-subjectsReceived sTokens := strings.Split(subject, ".") @@ -474,25 +470,9 @@ func TestAccountImportSubjectMapping(t *testing.T) { } t1, _ := strconv.Atoi(sTokens[1]) t2, _ := strconv.Atoi(sTokens[2]) - partitionsReceived[i], err = strconv.Atoi(sTokens[3]) - if err != nil { - t.Fatalf("Unexpected error: %v", err) - } - if partitionsReceived[i] > 9 || partitionsReceived[i] < 0 || t1 != i || t2 != numMessages-i { - t.Fatalf("Error received unexpected %d.%d to partition %d", t1, t2, partitionsReceived[i]) - } - } - - // verify hashing is deterministic by checking it produces the same exact result twice - for i := 0; i < numMessages; i++ { - subject := <-subjectsReceived - partitionNumber, err := strconv.Atoi(strings.Split(subject, ".")[3]) - if err != nil { - t.Fatalf("Unexpected error: %v", err) - } - if partitionsReceived[i] != partitionNumber { - t.Fatalf("Error: same id mapped to two different partitions") + if t1 != i || t2 != numMessages-i { + t.Fatalf("Error received unexpected %d.%d", t1, t2) } } }