diff --git a/server/accounts_test.go b/server/accounts_test.go index 68c77896..a46582b2 100644 --- a/server/accounts_test.go +++ b/server/accounts_test.go @@ -529,10 +529,8 @@ func TestSimpleMapping(t *testing.T) { t.Fatalf("Error adding account import to client bar: %v", err) } - // Normal Subscription on bar client. - go cbar.parse([]byte("SUB import.foo 1\r\nSUB import.foo bar 2\r\nPING\r\n")) - _, err := crBar.ReadString('\n') // Make sure subscriptions were processed. - if err != nil { + // Normal and Queue Subscription on bar client. + if err := cbar.parse([]byte("SUB import.foo 1\r\nSUB import.foo bar 2\r\n")); err != nil { t.Fatalf("Error for client 'bar' from server: %v", err) } @@ -568,6 +566,29 @@ func TestSimpleMapping(t *testing.T) { } checkMsg(l, "2") checkPayload(crBar, []byte("hello\r\n"), t) + + // We should have 2 subscriptions in both. Normal and Queue Subscriber + // for barAcc which are local, and 2 that are shadowed in fooAcc. + // Now make sure that when we unsubscribe we clean up properly for both. + if bslc := barAcc.sl.Count(); bslc != 2 { + t.Fatalf("Expected 2 normal subscriptions on barAcc, got %d", bslc) + } + if fslc := fooAcc.sl.Count(); fslc != 2 { + t.Fatalf("Expected 2 shadowed subscriptions on fooAcc, got %d", fslc) + } + + // Now unsubscribe. + if err := cbar.parse([]byte("UNSUB 1\r\nUNSUB 2\r\n")); err != nil { + t.Fatalf("Error for client 'bar' from server: %v", err) + } + + // We should have zero on both. + if bslc := barAcc.sl.Count(); bslc != 0 { + t.Fatalf("Expected no normal subscriptions on barAcc, got %d", bslc) + } + if fslc := fooAcc.sl.Count(); fslc != 0 { + t.Fatalf("Expected no shadowed subscriptions on fooAcc, got %d", fslc) + } } func TestNoPrefixWildcardMapping(t *testing.T) { diff --git a/server/client.go b/server/client.go index 1a0f2480..bbbf66fd 100644 --- a/server/client.go +++ b/server/client.go @@ -242,7 +242,8 @@ func (c *client) GetTLSConnectionState() *tls.ConnectionState { // interest in published messages. type subscription struct { client *client - im *streamImport // This is for importing support. + im *streamImport // This is for import stream support. + shadow []*subscription // This is to track shadowed accounts. subject []byte queue []byte sid []byte @@ -1382,6 +1383,7 @@ func (c *client) checkAccountImports(sub *subscription) error { var rims [32]*streamImport var ims = rims[:0] + acc.mu.RLock() for _, im := range acc.imports.streams { if isSubsetMatch(tokens, im.prefix+im.from) { @@ -1390,6 +1392,8 @@ func (c *client) checkAccountImports(sub *subscription) error { } acc.mu.RUnlock() + var shadow []*subscription + // Now walk through collected importMaps for _, im := range ims { // We have a match for a local subscription with an import from another account. @@ -1402,9 +1406,20 @@ func (c *client) checkAccountImports(sub *subscription) error { nsub.subject = sub.subject[len(im.prefix):] } if err := im.acc.sl.Insert(&nsub); err != nil { - return fmt.Errorf("Could not add shadow import subscription for account %q", im.acc.Name) + errs := fmt.Sprintf("Could not add shadow import subscription for account %q", im.acc.Name) + c.Debugf(errs) + return fmt.Errorf(errs) } + if shadow == nil { + shadow = make([]*subscription, 0, len(ims)) + } + shadow = append(shadow, &nsub) } + + c.mu.Lock() + sub.shadow = shadow + c.mu.Unlock() + return nil } @@ -1455,6 +1470,14 @@ func (c *client) unsubscribe(sub *subscription) { if c.typ == CLIENT && c.srv != nil && len(sub.queue) > 0 { c.srv.holdRemoteQSub(sub) } + + // Check to see if we have shadown subscriptions. + for _, nsub := range sub.shadow { + if err := nsub.im.acc.sl.Remove(nsub); err != nil { + c.Debugf("Could not remove shadow import subscription for account %q", nsub.im.acc.Name) + } + } + sub.shadow = nil } func (c *client) processUnsub(arg []byte) error {