From 15bdfbb4aa7168acdf0b69eada9213a7f8a51a82 Mon Sep 17 00:00:00 2001 From: Derek Collison Date: Wed, 7 Nov 2018 19:50:45 -0800 Subject: [PATCH] Fix for #793 Signed-off-by: Derek Collison --- server/accounts_test.go | 93 ++++++++++++++++++++++++++++++++++ server/client.go | 108 +++++++++++++++++++++++++++++----------- server/sublist.go | 15 ++++++ 3 files changed, 188 insertions(+), 28 deletions(-) diff --git a/server/accounts_test.go b/server/accounts_test.go index 571ae403..3d6fed18 100644 --- a/server/accounts_test.go +++ b/server/accounts_test.go @@ -836,6 +836,99 @@ func TestPrefixWildcardMappingWithLiteralSub(t *testing.T) { checkPayload(crBar, []byte("hello\r\n"), t) } +func TestMultipleImportsAndSingleWCSub(t *testing.T) { + s, fooAcc, barAcc := simpleAccountServer(t) + defer s.Shutdown() + + cfoo, _, _ := newClientForServer(s) + defer cfoo.nc.Close() + + if err := cfoo.registerWithAccount(fooAcc); err != nil { + t.Fatalf("Error registering client with 'foo' account: %v", err) + } + cbar, crBar, _ := newClientForServer(s) + defer cbar.nc.Close() + + if err := cbar.registerWithAccount(barAcc); err != nil { + t.Fatalf("Error registering client with 'bar' account: %v", err) + } + + if err := fooAcc.AddStreamExport("foo", []*Account{barAcc}); err != nil { + t.Fatalf("Error adding stream export to account foo: %v", err) + } + if err := fooAcc.AddStreamExport("bar", []*Account{barAcc}); err != nil { + t.Fatalf("Error adding stream export to account foo: %v", err) + } + + if err := barAcc.AddStreamImport(fooAcc, "foo", "pub."); err != nil { + t.Fatalf("Error adding stream import to account bar: %v", err) + } + if err := barAcc.AddStreamImport(fooAcc, "bar", "pub."); err != nil { + t.Fatalf("Error adding stream import to account bar: %v", err) + } + + // Wildcard Subscription on bar client for both imports. + cbar.parse([]byte("SUB pub.* 1\r\n")) + + // Now publish a message on 'foo' and 'bar' + go cfoo.parseAndFlush([]byte("PUB foo 5\r\nhello\r\nPUB bar 5\r\nworld\r\n")) + + // Now check we got the messages from the wildcard subscription. + l, err := crBar.ReadString('\n') + if err != nil { + t.Fatalf("Error reading from client 'bar': %v", err) + } + mraw := msgPat.FindAllStringSubmatch(l, -1) + if len(mraw) == 0 { + t.Fatalf("No message received") + } + matches := mraw[0] + if matches[SUB_INDEX] != "pub.foo" { + t.Fatalf("Did not get correct subject: '%s'", matches[SUB_INDEX]) + } + if matches[SID_INDEX] != "1" { + t.Fatalf("Did not get correct sid: '%s'", matches[SID_INDEX]) + } + checkPayload(crBar, []byte("hello\r\n"), t) + + l, err = crBar.ReadString('\n') + if err != nil { + t.Fatalf("Error reading from client 'bar': %v", err) + } + mraw = msgPat.FindAllStringSubmatch(l, -1) + if len(mraw) == 0 { + t.Fatalf("No message received") + } + matches = mraw[0] + if matches[SUB_INDEX] != "pub.bar" { + t.Fatalf("Did not get correct subject: '%s'", matches[SUB_INDEX]) + } + if matches[SID_INDEX] != "1" { + t.Fatalf("Did not get correct sid: '%s'", matches[SID_INDEX]) + } + checkPayload(crBar, []byte("world\r\n"), t) + + // Check subscription count. + if fslc := fooAcc.sl.Count(); fslc != 2 { + t.Fatalf("Expected 2 shadowed subscriptions on fooAcc, got %d", fslc) + } + if bslc := barAcc.sl.Count(); bslc != 1 { + t.Fatalf("Expected 1 normal subscriptions on barAcc, got %d", bslc) + } + + // Now unsubscribe. + if err := cbar.parse([]byte("UNSUB 1\r\n")); err != nil { + t.Fatalf("Error for client 'bar' from server: %v", err) + } + // We should have zero on both. + if bslc := barAcc.sl.Count(); bslc != 0 { + t.Fatalf("Expected no normal subscriptions on barAcc, got %d", bslc) + } + if fslc := fooAcc.sl.Count(); fslc != 0 { + t.Fatalf("Expected no shadowed subscriptions on fooAcc, got %d", fslc) + } +} + func TestCrossAccountRequestReply(t *testing.T) { s, fooAcc, barAcc := simpleAccountServer(t) defer s.Shutdown() diff --git a/server/client.go b/server/client.go index 3158e18f..2b2fbc43 100644 --- a/server/client.go +++ b/server/client.go @@ -1343,8 +1343,6 @@ func (c *client) processSub(argo []byte) (err error) { return nil } - // We can have two SUB protocols coming from a route due to some - // race conditions. We should make sure that we process only one. sid := string(sub.sid) acc := c.acc @@ -1388,50 +1386,79 @@ func (c *client) addShadowSubscriptions(acc *Account, sub *subscription) error { return ErrMissingAccount } - var rims [32]*streamImport - var ims = rims[:0] - var tokens []string + var ( + rims [32]*streamImport + ims = rims[:0] + rfroms [32]*streamImport + froms = rfroms[:0] + tokens []string + tsa [32]string + hasWC bool + ) acc.mu.RLock() + // Loop over the import subjects. We have 3 scenarios. If we exact + // match or we know the proposed subject is a strict subset of the + // import we can subscribe the subcsription's subject directly. + // The third scenario is where the proposed subject has a wildcard + // and may not be an exact subset, but is a match. Therefore we have to + // subscribe to the import subject, not the subscription's subject. for _, im := range acc.imports.streams { + subj := string(sub.subject) + if subj == im.prefix+im.from { + ims = append(ims, im) + continue + } if tokens == nil { - tokens = strings.Split(string(sub.subject), tsep) + tokens = tsa[:0] + start := 0 + for i := 0; i < len(subj); i++ { + //This is not perfect, but the test below will + // be more exact, this is just to trigger the + // additional test. + if subj[i] == pwc || subj[i] == fwc { + hasWC = true + } else if subj[i] == btsep { + tokens = append(tokens, subj[start:i]) + start = i + 1 + } + } + tokens = append(tokens, subj[start:]) } if isSubsetMatch(tokens, im.prefix+im.from) { ims = append(ims, im) + } else if hasWC { + if subjectIsSubsetMatch(im.prefix+im.from, subj) { + froms = append(froms, im) + } } } acc.mu.RUnlock() var shadow []*subscription + if len(ims) > 0 || len(froms) > 0 { + shadow = make([]*subscription, 0, len(ims)+len(froms)) + } + // Now walk through collected importMaps for _, im := range ims { - // We have a match for a local subscription with an import from another account. // We will create a shadow subscription. - nsub := *sub // copy - nsub.im = im - if im.prefix != "" { - // redo subject here to match subject in the publisher account space. - // Just remove prefix from what they gave us. That maps into other space. - nsub.subject = sub.subject[len(im.prefix):] + nsub, err := c.addShadowSub(sub, im, false) + if err != nil { + return err } - - c.Debugf("Creating import subscription on %q from account %q", nsub.subject, im.acc.Name) - - if err := im.acc.sl.Insert(&nsub); err != nil { - errs := fmt.Sprintf("Could not add shadow import subscription for account %q", im.acc.Name) - c.Debugf(errs) - return fmt.Errorf(errs) + shadow = append(shadow, nsub) + } + // Now walk through importMaps that we need to subscribe + // exactly to the from property. + for _, im := range froms { + // We will create a shadow subscription. + nsub, err := c.addShadowSub(sub, im, true) + if err != nil { + return err } - // Update our route map here. - c.srv.updateRouteSubscriptionMap(im.acc, &nsub, 1) - // FIXME(dlc) - make sure to remove as well! - - if shadow == nil { - shadow = make([]*subscription, 0, len(ims)) - } - shadow = append(shadow, &nsub) + shadow = append(shadow, nsub) } if shadow != nil { @@ -1443,6 +1470,31 @@ func (c *client) addShadowSubscriptions(acc *Account, sub *subscription) error { return nil } +// Add in the shadow subscription. +func (c *client) addShadowSub(sub *subscription, im *streamImport, useFrom bool) (*subscription, error) { + nsub := *sub // copy + nsub.im = im + if useFrom { + nsub.subject = []byte(im.from) + } else if im.prefix != "" { + // redo subject here to match subject in the publisher account space. + // Just remove prefix from what they gave us. That maps into other space. + nsub.subject = sub.subject[len(im.prefix):] + } + + c.Debugf("Creating import subscription on %q from account %q", nsub.subject, im.acc.Name) + + if err := im.acc.sl.Insert(&nsub); err != nil { + errs := fmt.Sprintf("Could not add shadow import subscription for account %q", im.acc.Name) + c.Debugf(errs) + return nil, fmt.Errorf(errs) + } + + // Update our route map here. + c.srv.updateRouteSubscriptionMap(im.acc, &nsub, 1) + return &nsub, nil +} + // canSubscribe determines if the client is authorized to subscribe to the // given subject. Assumes caller is holding lock. func (c *client) canSubscribe(subject string) bool { diff --git a/server/sublist.go b/server/sublist.go index 86141eaf..2389b586 100644 --- a/server/sublist.go +++ b/server/sublist.go @@ -790,6 +790,21 @@ func IsValidLiteralSubject(subject string) bool { return true } +// Calls into the function isSubsetMatch() +func subjectIsSubsetMatch(subject, test string) bool { + tsa := [32]string{} + tts := tsa[:0] + start := 0 + for i := 0; i < len(subject); i++ { + if subject[i] == btsep { + tts = append(tts, subject[start:i]) + start = i + 1 + } + } + tts = append(tts, subject[start:]) + return isSubsetMatch(tts, test) +} + // This will test a subject as an array of tokens against a test subject // and determine if the tokens are matched. Both test subject and tokens // may contain wildcards. So foo.* is a subset match of [">", "*.*", "foo.*"],