From ddd665b036c5d86cd5b92b3bc72d4b9a821773a2 Mon Sep 17 00:00:00 2001 From: Matthias Hanel Date: Fri, 16 Jul 2021 17:52:46 -0400 Subject: [PATCH 1/4] [fixed] subscription on wildcard import that is not a subset fixes #2361 The subject used was not a subset of the import. Nor the other way around. Instead it is an overlap that needs to be dynamically computed. Signed-off-by: Matthias Hanel --- server/accounts_test.go | 83 ++++++++++++++++++++++++++++++ server/client.go | 110 +++++++++++++++++++++++++++++----------- server/sublist.go | 47 +++++++++-------- 3 files changed, 187 insertions(+), 53 deletions(-) diff --git a/server/accounts_test.go b/server/accounts_test.go index 64db8b0c..9e010122 100644 --- a/server/accounts_test.go +++ b/server/accounts_test.go @@ -3294,3 +3294,86 @@ func TestAccountSystemPermsWithGlobalAccess(t *testing.T) { } defer sc.Close() } + +const importSubscriptionOverlapTemplate = ` +listen: 127.0.0.1:-1 +accounts: { + ACCOUNT_X: { + users: [ + {user: publisher} + ] + exports: [ + {stream: %s} + ] + }, + ACCOUNT_Y: { + users: [ + {user: subscriber} + ] + imports: [ + {stream: {account: ACCOUNT_X, subject: %s }, %s} + ] + } +}` + +func TestImportSubscriptionPartialOverlapWithPrefix(t *testing.T) { + cf := createConfFile(t, []byte(fmt.Sprintf(importSubscriptionOverlapTemplate, ">", ">", "prefix: myprefix"))) + defer removeFile(t, cf) + + s, opts := RunServerWithConfig(cf) + defer s.Shutdown() + + ncX := natsConnect(t, fmt.Sprintf("nats://%s:%s@127.0.0.1:%d", "publisher", "", opts.Port)) + defer ncX.Close() + + ncY := natsConnect(t, fmt.Sprintf("nats://%s:%s@127.0.0.1:%d", "subscriber", "", opts.Port)) + defer ncY.Close() + + for _, subj := range []string{">", "myprefix.*", "myprefix.>", "myprefix.test", "*.>", "*.*", "*.test"} { + t.Run(subj, func(t *testing.T) { + sub, err := ncY.SubscribeSync(subj) + sub.AutoUnsubscribe(1) + require_NoError(t, err) + require_NoError(t, ncY.Flush()) + + ncX.Publish("test", []byte("hello")) + + m, err := sub.NextMsg(time.Second) + require_NoError(t, err) + require_True(t, string(m.Data) == "hello") + }) + } +} + +func TestImportSubscriptionPartialOverlapWithTransform(t *testing.T) { + cf := createConfFile(t, []byte(fmt.Sprintf(importSubscriptionOverlapTemplate, "*.*.>", "*.*.>", " to: myprefix.$2.$1.>"))) + defer removeFile(t, cf) + + s, opts := RunServerWithConfig(cf) + defer s.Shutdown() + + ncX := natsConnect(t, fmt.Sprintf("nats://%s:%s@127.0.0.1:%d", "publisher", "", opts.Port)) + defer ncX.Close() + + ncY := natsConnect(t, fmt.Sprintf("nats://%s:%s@127.0.0.1:%d", "subscriber", "", opts.Port)) + defer ncY.Close() + + for _, subj := range []string{">", "*.*.*.>", "*.2.*.>", "*.*.1.>", "*.2.1.>", "*.*.*.*", "*.2.1.*", "*.*.*.test", + "*.*.1.test", "*.2.*.test", "*.2.1.test", "myprefix.*.*.*", "myprefix.>", "myprefix.*.>", "myprefix.*.*.>", + "myprefix.2.>", "myprefix.2.1.>", "myprefix.*.1.>", "myprefix.2.*.>", "myprefix.2.1.*", "myprefix.*.*.test", + "myprefix.2.1.test"} { + t.Run(subj, func(t *testing.T) { + sub, err := ncY.SubscribeSync(subj) + sub.AutoUnsubscribe(1) + require_NoError(t, err) + require_NoError(t, ncY.Flush()) + + ncX.Publish("1.2.test", []byte("hello")) + + m, err := sub.NextMsg(time.Second) + require_NoError(t, err) + require_True(t, string(m.Data) == "hello") + require_Equal(t, m.Subject, "myprefix.2.1.test") + }) + } +} diff --git a/server/client.go b/server/client.go index d83ae379..f99e012e 100644 --- a/server/client.go +++ b/server/client.go @@ -2478,8 +2478,9 @@ func (c *client) processSubEx(subject, queue, bsid []byte, cb msgHandler, noForw // Used to pass stream import matches to addShadowSub type ime struct { - im *streamImport - dyn bool + im *streamImport + overlapSubj string + dyn bool } // If the client's account has stream imports and there are matches for @@ -2491,47 +2492,86 @@ func (c *client) addShadowSubscriptions(acc *Account, sub *subscription) error { } var ( - _ims [16]ime - ims = _ims[:0] - tokens []string - tsa [32]string - hasWC bool + _ims [16]ime + ims = _ims[:0] + imTsa [32]string + tokens []string + tsa [32]string + hasWC bool + tokensModified bool ) acc.mu.RLock() - // Loop over the import subjects. We have 3 scenarios. If we have an + subj := string(sub.subject) + if len(acc.imports.streams) > 0 { + tokens = tsa[:0] + tokenizeSubjectIntoSlice(subj, &tokens) + for _, tk := range tokens { + if tk == pwcs { + hasWC = true + } + } + if tokens[len(tokens)-1] == fwcs { + hasWC = true + } + } + // Loop over the import subjects. We have 4 scenarios. If we have an // exact match or a superset match we should use the from field from - // the import. If we are a subset, we have to dynamically calculate - // the subject. + // the import. If we are a subset or overlap, we have to dynamically calculate + // the subject. On overlap, ime requires the overlap subject. for _, im := range acc.imports.streams { if im.invalid { continue } - subj := string(sub.subject) if subj == im.to { - ims = append(ims, ime{im, false}) + ims = append(ims, ime{im, "", false}) continue } - if tokens == nil { + if tokensModified { + // re-tokenize subj to overwrite modifications from a previous iteration tokens = tsa[:0] - start := 0 - for i := 0; i < len(subj); i++ { - // This is not perfect, but the test below will - // be more exact, this is just to trigger the - // additional test. - if subj[i] == pwc || subj[i] == fwc { - hasWC = true - } else if subj[i] == btsep { - tokens = append(tokens, subj[start:i]) - start = i + 1 + tokenizeSubjectIntoSlice(subj, &tokens) + tokensModified = false + } + imTokens := imTsa[:0] + tokenizeSubjectIntoSlice(im.to, &imTokens) + + if isSubsetMatchTokenized(tokens, imTokens) { + ims = append(ims, ime{im, "", true}) + } else if hasWC { + if isSubsetMatchTokenized(imTokens, tokens) { + ims = append(ims, ime{im, "", false}) + } else { + imTokensLen := len(imTokens) + for i, t := range tokens { + if i >= imTokensLen { + break + } + if t == pwcs && imTokens[i] != fwcs { + tokens[i] = imTokens[i] + tokensModified = true + } + } + tokensLen := len(tokens) + lastIdx := tokensLen - 1 + if tokens[lastIdx] == fwcs { + if imTokensLen >= tokensLen { + // rewrite ">" in tokens to be more specific + tokens[lastIdx] = imTokens[lastIdx] + tokensModified = true + if imTokensLen > tokensLen { + // copy even more specific parts from import + tokens = append(tokens, imTokens[tokensLen:]...) + } + } + } + if isSubsetMatchTokenized(tokens, imTokens) { + // As isSubsetMatchTokenized was already called with tokens and imTokens, + // we wouldn't be here if it where not for tokens being modified. + // Hence, Join to re compute the subject string + ims = append(ims, ime{im, strings.Join(tokens, tsep), true}) } } - tokens = append(tokens, subj[start:]) - } - if isSubsetMatch(tokens, im.to) { - ims = append(ims, ime{im, true}) - } else if hasWC && subjectIsSubsetMatch(im.to, subj) { - ims = append(ims, ime{im, false}) } } acc.mu.RUnlock() @@ -2572,13 +2612,21 @@ func (c *client) addShadowSub(sub *subscription, ime *ime) (*subscription, error if im.rtr == nil { im.rtr = im.tr.reverse() } - subj, err := im.rtr.transformSubject(string(nsub.subject)) + s := string(nsub.subject) + if ime.overlapSubj != _EMPTY_ { + s = ime.overlapSubj + } + subj, err := im.rtr.transformSubject(s) if err != nil { return nil, err } nsub.subject = []byte(subj) } else if !im.usePub || !ime.dyn { - nsub.subject = []byte(im.from) + if ime.overlapSubj != _EMPTY_ { + nsub.subject = []byte(ime.overlapSubj) + } else { + nsub.subject = []byte(im.from) + } } // Else use original subject c.Debugf("Creating import subscription on %q from account %q", nsub.subject, im.acc.Name) diff --git a/server/sublist.go b/server/sublist.go index 28b5b9ff..bf9b91f7 100644 --- a/server/sublist.go +++ b/server/sublist.go @@ -29,6 +29,7 @@ import ( // Common byte variables for wildcards and token separator. const ( pwc = '*' + pwcs = "*" fwc = '>' fwcs = ">" tsep = "." @@ -1186,39 +1187,41 @@ func tokenAt(subject string, index uint8) string { return _EMPTY_ } +func tokenizeSubjectIntoSlice(subject string, tts *[]string) { + start := 0 + for i := 0; i < len(subject); i++ { + if subject[i] == btsep { + *tts = append(*tts, subject[start:i]) + start = i + 1 + } + } + *tts = append(*tts, subject[start:]) +} + // Calls into the function isSubsetMatch() func subjectIsSubsetMatch(subject, test string) bool { tsa := [32]string{} tts := tsa[:0] - start := 0 - for i := 0; i < len(subject); i++ { - if subject[i] == btsep { - tts = append(tts, subject[start:i]) - start = i + 1 - } - } - tts = append(tts, subject[start:]) + tokenizeSubjectIntoSlice(subject, &tts) return isSubsetMatch(tts, test) } // This will test a subject as an array of tokens against a test subject -// and determine if the tokens are matched. Both test subject and tokens -// may contain wildcards. So foo.* is a subset match of [">", "*.*", "foo.*"], -// but not of foo.bar, etc. +// Calls into the function isSubsetMatchTokenized func isSubsetMatch(tokens []string, test string) bool { tsa := [32]string{} tts := tsa[:0] - start := 0 - for i := 0; i < len(test); i++ { - if test[i] == btsep { - tts = append(tts, test[start:i]) - start = i + 1 - } - } - tts = append(tts, test[start:]) + tokenizeSubjectIntoSlice(test, &tts) + return isSubsetMatchTokenized(tokens, tts) +} +// This will test a subject as an array of tokens against a test subject (also encoded as array of tokens) +// and determine if the tokens are matched. Both test subject and tokens +// may contain wildcards. So foo.* is a subset match of [">", "*.*", "foo.*"], +// but not of foo.bar, etc. +func isSubsetMatchTokenized(tokens, test []string) bool { // Walk the target tokens - for i, t2 := range tts { + for i, t2 := range test { if i >= len(tokens) { return false } @@ -1241,7 +1244,7 @@ func isSubsetMatch(tokens []string, test string) bool { if !m { return false } - if i >= len(tts) { + if i >= len(test) { return true } continue @@ -1250,7 +1253,7 @@ func isSubsetMatch(tokens []string, test string) bool { return false } } - return len(tokens) == len(tts) + return len(tokens) == len(test) } // matchLiteral is used to test literal subjects, those that do not have any From 06ac80cdbeaa2a58ab05fe932608b3da9b9836f7 Mon Sep 17 00:00:00 2001 From: Matthias Hanel Date: Thu, 22 Jul 2021 11:25:56 -0400 Subject: [PATCH 2/4] Review comments Signed-off-by: Matthias Hanel --- server/accounts_test.go | 2 +- server/client.go | 9 +++++---- 2 files changed, 6 insertions(+), 5 deletions(-) diff --git a/server/accounts_test.go b/server/accounts_test.go index 9e010122..0b952fe6 100644 --- a/server/accounts_test.go +++ b/server/accounts_test.go @@ -3346,7 +3346,7 @@ func TestImportSubscriptionPartialOverlapWithPrefix(t *testing.T) { } func TestImportSubscriptionPartialOverlapWithTransform(t *testing.T) { - cf := createConfFile(t, []byte(fmt.Sprintf(importSubscriptionOverlapTemplate, "*.*.>", "*.*.>", " to: myprefix.$2.$1.>"))) + cf := createConfFile(t, []byte(fmt.Sprintf(importSubscriptionOverlapTemplate, "*.*.>", "*.*.>", "to: myprefix.$2.$1.>"))) defer removeFile(t, cf) s, opts := RunServerWithConfig(cf) diff --git a/server/client.go b/server/client.go index f99e012e..4886369b 100644 --- a/server/client.go +++ b/server/client.go @@ -2509,9 +2509,10 @@ func (c *client) addShadowSubscriptions(acc *Account, sub *subscription) error { for _, tk := range tokens { if tk == pwcs { hasWC = true + break } } - if tokens[len(tokens)-1] == fwcs { + if !hasWC && tokens[len(tokens)-1] == fwcs { hasWC = true } } @@ -2524,7 +2525,7 @@ func (c *client) addShadowSubscriptions(acc *Account, sub *subscription) error { continue } if subj == im.to { - ims = append(ims, ime{im, "", false}) + ims = append(ims, ime{im, _EMPTY_, false}) continue } if tokensModified { @@ -2537,10 +2538,10 @@ func (c *client) addShadowSubscriptions(acc *Account, sub *subscription) error { tokenizeSubjectIntoSlice(im.to, &imTokens) if isSubsetMatchTokenized(tokens, imTokens) { - ims = append(ims, ime{im, "", true}) + ims = append(ims, ime{im, _EMPTY_, true}) } else if hasWC { if isSubsetMatchTokenized(imTokens, tokens) { - ims = append(ims, ime{im, "", false}) + ims = append(ims, ime{im, _EMPTY_, false}) } else { imTokensLen := len(imTokens) for i, t := range tokens { From 0a33f040e910338156e95957de611b07474e95ca Mon Sep 17 00:00:00 2001 From: Matthias Hanel Date: Thu, 22 Jul 2021 16:46:57 -0400 Subject: [PATCH 3/4] fix performance by changing signature of tokenizeSubjectIntoSlice Signed-off-by: Matthias Hanel --- server/client.go | 9 +++------ server/sublist.go | 12 +++++++----- 2 files changed, 10 insertions(+), 11 deletions(-) diff --git a/server/client.go b/server/client.go index 4886369b..f1b60db3 100644 --- a/server/client.go +++ b/server/client.go @@ -2504,8 +2504,7 @@ func (c *client) addShadowSubscriptions(acc *Account, sub *subscription) error { acc.mu.RLock() subj := string(sub.subject) if len(acc.imports.streams) > 0 { - tokens = tsa[:0] - tokenizeSubjectIntoSlice(subj, &tokens) + tokens = tokenizeSubjectIntoSlice(tsa[:0], subj) for _, tk := range tokens { if tk == pwcs { hasWC = true @@ -2530,12 +2529,10 @@ func (c *client) addShadowSubscriptions(acc *Account, sub *subscription) error { } if tokensModified { // re-tokenize subj to overwrite modifications from a previous iteration - tokens = tsa[:0] - tokenizeSubjectIntoSlice(subj, &tokens) + tokens = tokenizeSubjectIntoSlice(tsa[:0], subj) tokensModified = false } - imTokens := imTsa[:0] - tokenizeSubjectIntoSlice(im.to, &imTokens) + imTokens := tokenizeSubjectIntoSlice(imTsa[:0], im.to) if isSubsetMatchTokenized(tokens, imTokens) { ims = append(ims, ime{im, _EMPTY_, true}) diff --git a/server/sublist.go b/server/sublist.go index bf9b91f7..16225d76 100644 --- a/server/sublist.go +++ b/server/sublist.go @@ -1187,22 +1187,24 @@ func tokenAt(subject string, index uint8) string { return _EMPTY_ } -func tokenizeSubjectIntoSlice(subject string, tts *[]string) { +// use similar to append. meaning, the updated slice will be returned +func tokenizeSubjectIntoSlice(tts []string, subject string) []string { start := 0 for i := 0; i < len(subject); i++ { if subject[i] == btsep { - *tts = append(*tts, subject[start:i]) + tts = append(tts, subject[start:i]) start = i + 1 } } - *tts = append(*tts, subject[start:]) + tts = append(tts, subject[start:]) + return tts } // Calls into the function isSubsetMatch() func subjectIsSubsetMatch(subject, test string) bool { tsa := [32]string{} tts := tsa[:0] - tokenizeSubjectIntoSlice(subject, &tts) + tts = tokenizeSubjectIntoSlice(tts, subject) return isSubsetMatch(tts, test) } @@ -1211,7 +1213,7 @@ func subjectIsSubsetMatch(subject, test string) bool { func isSubsetMatch(tokens []string, test string) bool { tsa := [32]string{} tts := tsa[:0] - tokenizeSubjectIntoSlice(test, &tts) + tts = tokenizeSubjectIntoSlice(tts, test) return isSubsetMatchTokenized(tokens, tts) } From ccef1bf327a6fa7fae8581ed54b24e50b2ee1d6e Mon Sep 17 00:00:00 2001 From: Matthias Hanel Date: Thu, 22 Jul 2021 18:53:06 -0400 Subject: [PATCH 4/4] Remove unnecessary lines Signed-off-by: Matthias Hanel --- server/sublist.go | 6 ++---- 1 file changed, 2 insertions(+), 4 deletions(-) diff --git a/server/sublist.go b/server/sublist.go index 16225d76..9520e26d 100644 --- a/server/sublist.go +++ b/server/sublist.go @@ -1203,8 +1203,7 @@ func tokenizeSubjectIntoSlice(tts []string, subject string) []string { // Calls into the function isSubsetMatch() func subjectIsSubsetMatch(subject, test string) bool { tsa := [32]string{} - tts := tsa[:0] - tts = tokenizeSubjectIntoSlice(tts, subject) + tts := tokenizeSubjectIntoSlice(tsa[:0], subject) return isSubsetMatch(tts, test) } @@ -1212,8 +1211,7 @@ func subjectIsSubsetMatch(subject, test string) bool { // Calls into the function isSubsetMatchTokenized func isSubsetMatch(tokens []string, test string) bool { tsa := [32]string{} - tts := tsa[:0] - tts = tokenizeSubjectIntoSlice(tts, test) + tts := tokenizeSubjectIntoSlice(tsa[:0], test) return isSubsetMatchTokenized(tokens, tts) }