Cleanup of shadowed subscriptions, fixes #772

Signed-off-by: Derek Collison <derek@nats.io>
This commit is contained in:
Derek Collison
2018-10-08 09:53:41 -07:00
parent 3438468c95
commit ce95e5a84b
2 changed files with 50 additions and 6 deletions

View File

@@ -529,10 +529,8 @@ func TestSimpleMapping(t *testing.T) {
t.Fatalf("Error adding account import to client bar: %v", err)
}
// Normal Subscription on bar client.
go cbar.parse([]byte("SUB import.foo 1\r\nSUB import.foo bar 2\r\nPING\r\n"))
_, err := crBar.ReadString('\n') // Make sure subscriptions were processed.
if err != nil {
// Normal and Queue Subscription on bar client.
if err := cbar.parse([]byte("SUB import.foo 1\r\nSUB import.foo bar 2\r\n")); err != nil {
t.Fatalf("Error for client 'bar' from server: %v", err)
}
@@ -568,6 +566,29 @@ func TestSimpleMapping(t *testing.T) {
}
checkMsg(l, "2")
checkPayload(crBar, []byte("hello\r\n"), t)
// We should have 2 subscriptions in both. Normal and Queue Subscriber
// for barAcc which are local, and 2 that are shadowed in fooAcc.
// Now make sure that when we unsubscribe we clean up properly for both.
if bslc := barAcc.sl.Count(); bslc != 2 {
t.Fatalf("Expected 2 normal subscriptions on barAcc, got %d", bslc)
}
if fslc := fooAcc.sl.Count(); fslc != 2 {
t.Fatalf("Expected 2 shadowed subscriptions on fooAcc, got %d", fslc)
}
// Now unsubscribe.
if err := cbar.parse([]byte("UNSUB 1\r\nUNSUB 2\r\n")); err != nil {
t.Fatalf("Error for client 'bar' from server: %v", err)
}
// We should not have zero on both.
if bslc := barAcc.sl.Count(); bslc != 0 {
t.Fatalf("Expected no normal subscriptions on barAcc, got %d", bslc)
}
if fslc := fooAcc.sl.Count(); fslc != 0 {
t.Fatalf("Expected no shadowed subscriptions on fooAcc, got %d", fslc)
}
}
func TestNoPrefixWildcardMapping(t *testing.T) {

View File

@@ -242,7 +242,8 @@ func (c *client) GetTLSConnectionState() *tls.ConnectionState {
// interest in published messages.
type subscription struct {
client *client
im *streamImport // This is for importing support.
im *streamImport // This is for import stream support.
shadow []*subscription // This is to track shadowed accounts.
subject []byte
queue []byte
sid []byte
@@ -1382,6 +1383,7 @@ func (c *client) checkAccountImports(sub *subscription) error {
var rims [32]*streamImport
var ims = rims[:0]
acc.mu.RLock()
for _, im := range acc.imports.streams {
if isSubsetMatch(tokens, im.prefix+im.from) {
@@ -1390,6 +1392,8 @@ func (c *client) checkAccountImports(sub *subscription) error {
}
acc.mu.RUnlock()
var shadow []*subscription
// Now walk through collected importMaps
for _, im := range ims {
// We have a match for a local subscription with an import from another account.
@@ -1402,9 +1406,20 @@ func (c *client) checkAccountImports(sub *subscription) error {
nsub.subject = sub.subject[len(im.prefix):]
}
if err := im.acc.sl.Insert(&nsub); err != nil {
return fmt.Errorf("Could not add shadow import subscription for account %q", im.acc.Name)
errs := fmt.Sprintf("Could not add shadow import subscription for account %q", im.acc.Name)
c.Debugf(errs)
return fmt.Errorf(errs)
}
if shadow == nil {
shadow = make([]*subscription, 0, len(ims))
}
shadow = append(shadow, &nsub)
}
c.mu.Lock()
sub.shadow = shadow
c.mu.Unlock()
return nil
}
@@ -1455,6 +1470,14 @@ func (c *client) unsubscribe(sub *subscription) {
if c.typ == CLIENT && c.srv != nil && len(sub.queue) > 0 {
c.srv.holdRemoteQSub(sub)
}
// Check to see if we have shadown subscriptions.
for _, nsub := range sub.shadow {
if err := nsub.im.acc.sl.Remove(nsub); err != nil {
c.Debugf("Could not remove shadow import subscription for account %q", nsub.im.acc.Name)
}
}
sub.shadow = nil
}
func (c *client) processUnsub(arg []byte) error {