mirror of
https://github.com/gogrlx/nats-server.git
synced 2026-04-15 18:50:41 -07:00
@@ -836,6 +836,99 @@ func TestPrefixWildcardMappingWithLiteralSub(t *testing.T) {
|
||||
checkPayload(crBar, []byte("hello\r\n"), t)
|
||||
}
|
||||
|
||||
func TestMultipleImportsAndSingleWCSub(t *testing.T) {
|
||||
s, fooAcc, barAcc := simpleAccountServer(t)
|
||||
defer s.Shutdown()
|
||||
|
||||
cfoo, _, _ := newClientForServer(s)
|
||||
defer cfoo.nc.Close()
|
||||
|
||||
if err := cfoo.registerWithAccount(fooAcc); err != nil {
|
||||
t.Fatalf("Error registering client with 'foo' account: %v", err)
|
||||
}
|
||||
cbar, crBar, _ := newClientForServer(s)
|
||||
defer cbar.nc.Close()
|
||||
|
||||
if err := cbar.registerWithAccount(barAcc); err != nil {
|
||||
t.Fatalf("Error registering client with 'bar' account: %v", err)
|
||||
}
|
||||
|
||||
if err := fooAcc.AddStreamExport("foo", []*Account{barAcc}); err != nil {
|
||||
t.Fatalf("Error adding stream export to account foo: %v", err)
|
||||
}
|
||||
if err := fooAcc.AddStreamExport("bar", []*Account{barAcc}); err != nil {
|
||||
t.Fatalf("Error adding stream export to account foo: %v", err)
|
||||
}
|
||||
|
||||
if err := barAcc.AddStreamImport(fooAcc, "foo", "pub."); err != nil {
|
||||
t.Fatalf("Error adding stream import to account bar: %v", err)
|
||||
}
|
||||
if err := barAcc.AddStreamImport(fooAcc, "bar", "pub."); err != nil {
|
||||
t.Fatalf("Error adding stream import to account bar: %v", err)
|
||||
}
|
||||
|
||||
// Wildcard Subscription on bar client for both imports.
|
||||
cbar.parse([]byte("SUB pub.* 1\r\n"))
|
||||
|
||||
// Now publish a message on 'foo' and 'bar'
|
||||
go cfoo.parseAndFlush([]byte("PUB foo 5\r\nhello\r\nPUB bar 5\r\nworld\r\n"))
|
||||
|
||||
// Now check we got the messages from the wildcard subscription.
|
||||
l, err := crBar.ReadString('\n')
|
||||
if err != nil {
|
||||
t.Fatalf("Error reading from client 'bar': %v", err)
|
||||
}
|
||||
mraw := msgPat.FindAllStringSubmatch(l, -1)
|
||||
if len(mraw) == 0 {
|
||||
t.Fatalf("No message received")
|
||||
}
|
||||
matches := mraw[0]
|
||||
if matches[SUB_INDEX] != "pub.foo" {
|
||||
t.Fatalf("Did not get correct subject: '%s'", matches[SUB_INDEX])
|
||||
}
|
||||
if matches[SID_INDEX] != "1" {
|
||||
t.Fatalf("Did not get correct sid: '%s'", matches[SID_INDEX])
|
||||
}
|
||||
checkPayload(crBar, []byte("hello\r\n"), t)
|
||||
|
||||
l, err = crBar.ReadString('\n')
|
||||
if err != nil {
|
||||
t.Fatalf("Error reading from client 'bar': %v", err)
|
||||
}
|
||||
mraw = msgPat.FindAllStringSubmatch(l, -1)
|
||||
if len(mraw) == 0 {
|
||||
t.Fatalf("No message received")
|
||||
}
|
||||
matches = mraw[0]
|
||||
if matches[SUB_INDEX] != "pub.bar" {
|
||||
t.Fatalf("Did not get correct subject: '%s'", matches[SUB_INDEX])
|
||||
}
|
||||
if matches[SID_INDEX] != "1" {
|
||||
t.Fatalf("Did not get correct sid: '%s'", matches[SID_INDEX])
|
||||
}
|
||||
checkPayload(crBar, []byte("world\r\n"), t)
|
||||
|
||||
// Check subscription count.
|
||||
if fslc := fooAcc.sl.Count(); fslc != 2 {
|
||||
t.Fatalf("Expected 2 shadowed subscriptions on fooAcc, got %d", fslc)
|
||||
}
|
||||
if bslc := barAcc.sl.Count(); bslc != 1 {
|
||||
t.Fatalf("Expected 1 normal subscriptions on barAcc, got %d", bslc)
|
||||
}
|
||||
|
||||
// Now unsubscribe.
|
||||
if err := cbar.parse([]byte("UNSUB 1\r\n")); err != nil {
|
||||
t.Fatalf("Error for client 'bar' from server: %v", err)
|
||||
}
|
||||
// We should have zero on both.
|
||||
if bslc := barAcc.sl.Count(); bslc != 0 {
|
||||
t.Fatalf("Expected no normal subscriptions on barAcc, got %d", bslc)
|
||||
}
|
||||
if fslc := fooAcc.sl.Count(); fslc != 0 {
|
||||
t.Fatalf("Expected no shadowed subscriptions on fooAcc, got %d", fslc)
|
||||
}
|
||||
}
|
||||
|
||||
func TestCrossAccountRequestReply(t *testing.T) {
|
||||
s, fooAcc, barAcc := simpleAccountServer(t)
|
||||
defer s.Shutdown()
|
||||
|
||||
108
server/client.go
108
server/client.go
@@ -1343,8 +1343,6 @@ func (c *client) processSub(argo []byte) (err error) {
|
||||
return nil
|
||||
}
|
||||
|
||||
// We can have two SUB protocols coming from a route due to some
|
||||
// race conditions. We should make sure that we process only one.
|
||||
sid := string(sub.sid)
|
||||
acc := c.acc
|
||||
|
||||
@@ -1388,50 +1386,79 @@ func (c *client) addShadowSubscriptions(acc *Account, sub *subscription) error {
|
||||
return ErrMissingAccount
|
||||
}
|
||||
|
||||
var rims [32]*streamImport
|
||||
var ims = rims[:0]
|
||||
var tokens []string
|
||||
var (
|
||||
rims [32]*streamImport
|
||||
ims = rims[:0]
|
||||
rfroms [32]*streamImport
|
||||
froms = rfroms[:0]
|
||||
tokens []string
|
||||
tsa [32]string
|
||||
hasWC bool
|
||||
)
|
||||
|
||||
acc.mu.RLock()
|
||||
// Loop over the import subjects. We have 3 scenarios. If we exact
|
||||
// match or we know the proposed subject is a strict subset of the
|
||||
// import we can subscribe the subcsription's subject directly.
|
||||
// The third scenario is where the proposed subject has a wildcard
|
||||
// and may not be an exact subset, but is a match. Therefore we have to
|
||||
// subscribe to the import subject, not the subscription's subject.
|
||||
for _, im := range acc.imports.streams {
|
||||
subj := string(sub.subject)
|
||||
if subj == im.prefix+im.from {
|
||||
ims = append(ims, im)
|
||||
continue
|
||||
}
|
||||
if tokens == nil {
|
||||
tokens = strings.Split(string(sub.subject), tsep)
|
||||
tokens = tsa[:0]
|
||||
start := 0
|
||||
for i := 0; i < len(subj); i++ {
|
||||
//This is not perfect, but the test below will
|
||||
// be more exact, this is just to trigger the
|
||||
// additional test.
|
||||
if subj[i] == pwc || subj[i] == fwc {
|
||||
hasWC = true
|
||||
} else if subj[i] == btsep {
|
||||
tokens = append(tokens, subj[start:i])
|
||||
start = i + 1
|
||||
}
|
||||
}
|
||||
tokens = append(tokens, subj[start:])
|
||||
}
|
||||
if isSubsetMatch(tokens, im.prefix+im.from) {
|
||||
ims = append(ims, im)
|
||||
} else if hasWC {
|
||||
if subjectIsSubsetMatch(im.prefix+im.from, subj) {
|
||||
froms = append(froms, im)
|
||||
}
|
||||
}
|
||||
}
|
||||
acc.mu.RUnlock()
|
||||
|
||||
var shadow []*subscription
|
||||
|
||||
if len(ims) > 0 || len(froms) > 0 {
|
||||
shadow = make([]*subscription, 0, len(ims)+len(froms))
|
||||
}
|
||||
|
||||
// Now walk through collected importMaps
|
||||
for _, im := range ims {
|
||||
// We have a match for a local subscription with an import from another account.
|
||||
// We will create a shadow subscription.
|
||||
nsub := *sub // copy
|
||||
nsub.im = im
|
||||
if im.prefix != "" {
|
||||
// redo subject here to match subject in the publisher account space.
|
||||
// Just remove prefix from what they gave us. That maps into other space.
|
||||
nsub.subject = sub.subject[len(im.prefix):]
|
||||
nsub, err := c.addShadowSub(sub, im, false)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
c.Debugf("Creating import subscription on %q from account %q", nsub.subject, im.acc.Name)
|
||||
|
||||
if err := im.acc.sl.Insert(&nsub); err != nil {
|
||||
errs := fmt.Sprintf("Could not add shadow import subscription for account %q", im.acc.Name)
|
||||
c.Debugf(errs)
|
||||
return fmt.Errorf(errs)
|
||||
shadow = append(shadow, nsub)
|
||||
}
|
||||
// Now walk through importMaps that we need to subscribe
|
||||
// exactly to the from property.
|
||||
for _, im := range froms {
|
||||
// We will create a shadow subscription.
|
||||
nsub, err := c.addShadowSub(sub, im, true)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
// Update our route map here.
|
||||
c.srv.updateRouteSubscriptionMap(im.acc, &nsub, 1)
|
||||
// FIXME(dlc) - make sure to remove as well!
|
||||
|
||||
if shadow == nil {
|
||||
shadow = make([]*subscription, 0, len(ims))
|
||||
}
|
||||
shadow = append(shadow, &nsub)
|
||||
shadow = append(shadow, nsub)
|
||||
}
|
||||
|
||||
if shadow != nil {
|
||||
@@ -1443,6 +1470,31 @@ func (c *client) addShadowSubscriptions(acc *Account, sub *subscription) error {
|
||||
return nil
|
||||
}
|
||||
|
||||
// Add in the shadow subscription.
|
||||
func (c *client) addShadowSub(sub *subscription, im *streamImport, useFrom bool) (*subscription, error) {
|
||||
nsub := *sub // copy
|
||||
nsub.im = im
|
||||
if useFrom {
|
||||
nsub.subject = []byte(im.from)
|
||||
} else if im.prefix != "" {
|
||||
// redo subject here to match subject in the publisher account space.
|
||||
// Just remove prefix from what they gave us. That maps into other space.
|
||||
nsub.subject = sub.subject[len(im.prefix):]
|
||||
}
|
||||
|
||||
c.Debugf("Creating import subscription on %q from account %q", nsub.subject, im.acc.Name)
|
||||
|
||||
if err := im.acc.sl.Insert(&nsub); err != nil {
|
||||
errs := fmt.Sprintf("Could not add shadow import subscription for account %q", im.acc.Name)
|
||||
c.Debugf(errs)
|
||||
return nil, fmt.Errorf(errs)
|
||||
}
|
||||
|
||||
// Update our route map here.
|
||||
c.srv.updateRouteSubscriptionMap(im.acc, &nsub, 1)
|
||||
return &nsub, nil
|
||||
}
|
||||
|
||||
// canSubscribe determines if the client is authorized to subscribe to the
|
||||
// given subject. Assumes caller is holding lock.
|
||||
func (c *client) canSubscribe(subject string) bool {
|
||||
|
||||
@@ -790,6 +790,21 @@ func IsValidLiteralSubject(subject string) bool {
|
||||
return true
|
||||
}
|
||||
|
||||
// Calls into the function isSubsetMatch()
|
||||
func subjectIsSubsetMatch(subject, test string) bool {
|
||||
tsa := [32]string{}
|
||||
tts := tsa[:0]
|
||||
start := 0
|
||||
for i := 0; i < len(subject); i++ {
|
||||
if subject[i] == btsep {
|
||||
tts = append(tts, subject[start:i])
|
||||
start = i + 1
|
||||
}
|
||||
}
|
||||
tts = append(tts, subject[start:])
|
||||
return isSubsetMatch(tts, test)
|
||||
}
|
||||
|
||||
// This will test a subject as an array of tokens against a test subject
|
||||
// and determine if the tokens are matched. Both test subject and tokens
|
||||
// may contain wildcards. So foo.* is a subset match of [">", "*.*", "foo.*"],
|
||||
|
||||
Reference in New Issue
Block a user