mirror of
https://github.com/gogrlx/nats-server.git
synced 2026-04-14 02:07:59 -07:00
Merge pull request #2369 from nats-io/import-export-2361
[fixed] subscription on wildcard import that is not a subset
This commit is contained in:
@@ -3294,3 +3294,86 @@ func TestAccountSystemPermsWithGlobalAccess(t *testing.T) {
|
||||
}
|
||||
defer sc.Close()
|
||||
}
|
||||
|
||||
const importSubscriptionOverlapTemplate = `
|
||||
listen: 127.0.0.1:-1
|
||||
accounts: {
|
||||
ACCOUNT_X: {
|
||||
users: [
|
||||
{user: publisher}
|
||||
]
|
||||
exports: [
|
||||
{stream: %s}
|
||||
]
|
||||
},
|
||||
ACCOUNT_Y: {
|
||||
users: [
|
||||
{user: subscriber}
|
||||
]
|
||||
imports: [
|
||||
{stream: {account: ACCOUNT_X, subject: %s }, %s}
|
||||
]
|
||||
}
|
||||
}`
|
||||
|
||||
func TestImportSubscriptionPartialOverlapWithPrefix(t *testing.T) {
|
||||
cf := createConfFile(t, []byte(fmt.Sprintf(importSubscriptionOverlapTemplate, ">", ">", "prefix: myprefix")))
|
||||
defer removeFile(t, cf)
|
||||
|
||||
s, opts := RunServerWithConfig(cf)
|
||||
defer s.Shutdown()
|
||||
|
||||
ncX := natsConnect(t, fmt.Sprintf("nats://%s:%s@127.0.0.1:%d", "publisher", "", opts.Port))
|
||||
defer ncX.Close()
|
||||
|
||||
ncY := natsConnect(t, fmt.Sprintf("nats://%s:%s@127.0.0.1:%d", "subscriber", "", opts.Port))
|
||||
defer ncY.Close()
|
||||
|
||||
for _, subj := range []string{">", "myprefix.*", "myprefix.>", "myprefix.test", "*.>", "*.*", "*.test"} {
|
||||
t.Run(subj, func(t *testing.T) {
|
||||
sub, err := ncY.SubscribeSync(subj)
|
||||
sub.AutoUnsubscribe(1)
|
||||
require_NoError(t, err)
|
||||
require_NoError(t, ncY.Flush())
|
||||
|
||||
ncX.Publish("test", []byte("hello"))
|
||||
|
||||
m, err := sub.NextMsg(time.Second)
|
||||
require_NoError(t, err)
|
||||
require_True(t, string(m.Data) == "hello")
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
func TestImportSubscriptionPartialOverlapWithTransform(t *testing.T) {
|
||||
cf := createConfFile(t, []byte(fmt.Sprintf(importSubscriptionOverlapTemplate, "*.*.>", "*.*.>", "to: myprefix.$2.$1.>")))
|
||||
defer removeFile(t, cf)
|
||||
|
||||
s, opts := RunServerWithConfig(cf)
|
||||
defer s.Shutdown()
|
||||
|
||||
ncX := natsConnect(t, fmt.Sprintf("nats://%s:%s@127.0.0.1:%d", "publisher", "", opts.Port))
|
||||
defer ncX.Close()
|
||||
|
||||
ncY := natsConnect(t, fmt.Sprintf("nats://%s:%s@127.0.0.1:%d", "subscriber", "", opts.Port))
|
||||
defer ncY.Close()
|
||||
|
||||
for _, subj := range []string{">", "*.*.*.>", "*.2.*.>", "*.*.1.>", "*.2.1.>", "*.*.*.*", "*.2.1.*", "*.*.*.test",
|
||||
"*.*.1.test", "*.2.*.test", "*.2.1.test", "myprefix.*.*.*", "myprefix.>", "myprefix.*.>", "myprefix.*.*.>",
|
||||
"myprefix.2.>", "myprefix.2.1.>", "myprefix.*.1.>", "myprefix.2.*.>", "myprefix.2.1.*", "myprefix.*.*.test",
|
||||
"myprefix.2.1.test"} {
|
||||
t.Run(subj, func(t *testing.T) {
|
||||
sub, err := ncY.SubscribeSync(subj)
|
||||
sub.AutoUnsubscribe(1)
|
||||
require_NoError(t, err)
|
||||
require_NoError(t, ncY.Flush())
|
||||
|
||||
ncX.Publish("1.2.test", []byte("hello"))
|
||||
|
||||
m, err := sub.NextMsg(time.Second)
|
||||
require_NoError(t, err)
|
||||
require_True(t, string(m.Data) == "hello")
|
||||
require_Equal(t, m.Subject, "myprefix.2.1.test")
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
110
server/client.go
110
server/client.go
@@ -2478,8 +2478,9 @@ func (c *client) processSubEx(subject, queue, bsid []byte, cb msgHandler, noForw
|
||||
|
||||
// Used to pass stream import matches to addShadowSub
|
||||
type ime struct {
|
||||
im *streamImport
|
||||
dyn bool
|
||||
im *streamImport
|
||||
overlapSubj string
|
||||
dyn bool
|
||||
}
|
||||
|
||||
// If the client's account has stream imports and there are matches for
|
||||
@@ -2491,47 +2492,84 @@ func (c *client) addShadowSubscriptions(acc *Account, sub *subscription) error {
|
||||
}
|
||||
|
||||
var (
|
||||
_ims [16]ime
|
||||
ims = _ims[:0]
|
||||
tokens []string
|
||||
tsa [32]string
|
||||
hasWC bool
|
||||
_ims [16]ime
|
||||
ims = _ims[:0]
|
||||
imTsa [32]string
|
||||
tokens []string
|
||||
tsa [32]string
|
||||
hasWC bool
|
||||
tokensModified bool
|
||||
)
|
||||
|
||||
acc.mu.RLock()
|
||||
// Loop over the import subjects. We have 3 scenarios. If we have an
|
||||
subj := string(sub.subject)
|
||||
if len(acc.imports.streams) > 0 {
|
||||
tokens = tokenizeSubjectIntoSlice(tsa[:0], subj)
|
||||
for _, tk := range tokens {
|
||||
if tk == pwcs {
|
||||
hasWC = true
|
||||
break
|
||||
}
|
||||
}
|
||||
if !hasWC && tokens[len(tokens)-1] == fwcs {
|
||||
hasWC = true
|
||||
}
|
||||
}
|
||||
// Loop over the import subjects. We have 4 scenarios. If we have an
|
||||
// exact match or a superset match we should use the from field from
|
||||
// the import. If we are a subset, we have to dynamically calculate
|
||||
// the subject.
|
||||
// the import. If we are a subset or overlap, we have to dynamically calculate
|
||||
// the subject. On overlap, ime requires the overlap subject.
|
||||
for _, im := range acc.imports.streams {
|
||||
if im.invalid {
|
||||
continue
|
||||
}
|
||||
subj := string(sub.subject)
|
||||
if subj == im.to {
|
||||
ims = append(ims, ime{im, false})
|
||||
ims = append(ims, ime{im, _EMPTY_, false})
|
||||
continue
|
||||
}
|
||||
if tokens == nil {
|
||||
tokens = tsa[:0]
|
||||
start := 0
|
||||
for i := 0; i < len(subj); i++ {
|
||||
// This is not perfect, but the test below will
|
||||
// be more exact, this is just to trigger the
|
||||
// additional test.
|
||||
if subj[i] == pwc || subj[i] == fwc {
|
||||
hasWC = true
|
||||
} else if subj[i] == btsep {
|
||||
tokens = append(tokens, subj[start:i])
|
||||
start = i + 1
|
||||
if tokensModified {
|
||||
// re-tokenize subj to overwrite modifications from a previous iteration
|
||||
tokens = tokenizeSubjectIntoSlice(tsa[:0], subj)
|
||||
tokensModified = false
|
||||
}
|
||||
imTokens := tokenizeSubjectIntoSlice(imTsa[:0], im.to)
|
||||
|
||||
if isSubsetMatchTokenized(tokens, imTokens) {
|
||||
ims = append(ims, ime{im, _EMPTY_, true})
|
||||
} else if hasWC {
|
||||
if isSubsetMatchTokenized(imTokens, tokens) {
|
||||
ims = append(ims, ime{im, _EMPTY_, false})
|
||||
} else {
|
||||
imTokensLen := len(imTokens)
|
||||
for i, t := range tokens {
|
||||
if i >= imTokensLen {
|
||||
break
|
||||
}
|
||||
if t == pwcs && imTokens[i] != fwcs {
|
||||
tokens[i] = imTokens[i]
|
||||
tokensModified = true
|
||||
}
|
||||
}
|
||||
tokensLen := len(tokens)
|
||||
lastIdx := tokensLen - 1
|
||||
if tokens[lastIdx] == fwcs {
|
||||
if imTokensLen >= tokensLen {
|
||||
// rewrite ">" in tokens to be more specific
|
||||
tokens[lastIdx] = imTokens[lastIdx]
|
||||
tokensModified = true
|
||||
if imTokensLen > tokensLen {
|
||||
// copy even more specific parts from import
|
||||
tokens = append(tokens, imTokens[tokensLen:]...)
|
||||
}
|
||||
}
|
||||
}
|
||||
if isSubsetMatchTokenized(tokens, imTokens) {
|
||||
// As isSubsetMatchTokenized was already called with tokens and imTokens,
|
||||
// we wouldn't be here if it where not for tokens being modified.
|
||||
// Hence, Join to re compute the subject string
|
||||
ims = append(ims, ime{im, strings.Join(tokens, tsep), true})
|
||||
}
|
||||
}
|
||||
tokens = append(tokens, subj[start:])
|
||||
}
|
||||
if isSubsetMatch(tokens, im.to) {
|
||||
ims = append(ims, ime{im, true})
|
||||
} else if hasWC && subjectIsSubsetMatch(im.to, subj) {
|
||||
ims = append(ims, ime{im, false})
|
||||
}
|
||||
}
|
||||
acc.mu.RUnlock()
|
||||
@@ -2572,13 +2610,21 @@ func (c *client) addShadowSub(sub *subscription, ime *ime) (*subscription, error
|
||||
if im.rtr == nil {
|
||||
im.rtr = im.tr.reverse()
|
||||
}
|
||||
subj, err := im.rtr.transformSubject(string(nsub.subject))
|
||||
s := string(nsub.subject)
|
||||
if ime.overlapSubj != _EMPTY_ {
|
||||
s = ime.overlapSubj
|
||||
}
|
||||
subj, err := im.rtr.transformSubject(s)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
nsub.subject = []byte(subj)
|
||||
} else if !im.usePub || !ime.dyn {
|
||||
nsub.subject = []byte(im.from)
|
||||
if ime.overlapSubj != _EMPTY_ {
|
||||
nsub.subject = []byte(ime.overlapSubj)
|
||||
} else {
|
||||
nsub.subject = []byte(im.from)
|
||||
}
|
||||
}
|
||||
// Else use original subject
|
||||
c.Debugf("Creating import subscription on %q from account %q", nsub.subject, im.acc.Name)
|
||||
|
||||
@@ -29,6 +29,7 @@ import (
|
||||
// Common byte variables for wildcards and token separator.
|
||||
const (
|
||||
pwc = '*'
|
||||
pwcs = "*"
|
||||
fwc = '>'
|
||||
fwcs = ">"
|
||||
tsep = "."
|
||||
@@ -1186,10 +1187,8 @@ func tokenAt(subject string, index uint8) string {
|
||||
return _EMPTY_
|
||||
}
|
||||
|
||||
// Calls into the function isSubsetMatch()
|
||||
func subjectIsSubsetMatch(subject, test string) bool {
|
||||
tsa := [32]string{}
|
||||
tts := tsa[:0]
|
||||
// use similar to append. meaning, the updated slice will be returned
|
||||
func tokenizeSubjectIntoSlice(tts []string, subject string) []string {
|
||||
start := 0
|
||||
for i := 0; i < len(subject); i++ {
|
||||
if subject[i] == btsep {
|
||||
@@ -1198,27 +1197,31 @@ func subjectIsSubsetMatch(subject, test string) bool {
|
||||
}
|
||||
}
|
||||
tts = append(tts, subject[start:])
|
||||
return tts
|
||||
}
|
||||
|
||||
// Calls into the function isSubsetMatch()
|
||||
func subjectIsSubsetMatch(subject, test string) bool {
|
||||
tsa := [32]string{}
|
||||
tts := tokenizeSubjectIntoSlice(tsa[:0], subject)
|
||||
return isSubsetMatch(tts, test)
|
||||
}
|
||||
|
||||
// This will test a subject as an array of tokens against a test subject
|
||||
// Calls into the function isSubsetMatchTokenized
|
||||
func isSubsetMatch(tokens []string, test string) bool {
|
||||
tsa := [32]string{}
|
||||
tts := tokenizeSubjectIntoSlice(tsa[:0], test)
|
||||
return isSubsetMatchTokenized(tokens, tts)
|
||||
}
|
||||
|
||||
// This will test a subject as an array of tokens against a test subject (also encoded as array of tokens)
|
||||
// and determine if the tokens are matched. Both test subject and tokens
|
||||
// may contain wildcards. So foo.* is a subset match of [">", "*.*", "foo.*"],
|
||||
// but not of foo.bar, etc.
|
||||
func isSubsetMatch(tokens []string, test string) bool {
|
||||
tsa := [32]string{}
|
||||
tts := tsa[:0]
|
||||
start := 0
|
||||
for i := 0; i < len(test); i++ {
|
||||
if test[i] == btsep {
|
||||
tts = append(tts, test[start:i])
|
||||
start = i + 1
|
||||
}
|
||||
}
|
||||
tts = append(tts, test[start:])
|
||||
|
||||
func isSubsetMatchTokenized(tokens, test []string) bool {
|
||||
// Walk the target tokens
|
||||
for i, t2 := range tts {
|
||||
for i, t2 := range test {
|
||||
if i >= len(tokens) {
|
||||
return false
|
||||
}
|
||||
@@ -1241,7 +1244,7 @@ func isSubsetMatch(tokens []string, test string) bool {
|
||||
if !m {
|
||||
return false
|
||||
}
|
||||
if i >= len(tts) {
|
||||
if i >= len(test) {
|
||||
return true
|
||||
}
|
||||
continue
|
||||
@@ -1250,7 +1253,7 @@ func isSubsetMatch(tokens []string, test string) bool {
|
||||
return false
|
||||
}
|
||||
}
|
||||
return len(tokens) == len(tts)
|
||||
return len(tokens) == len(test)
|
||||
}
|
||||
|
||||
// matchLiteral is used to test literal subjects, those that do not have any
|
||||
|
||||
Reference in New Issue
Block a user