mirror of
https://github.com/gogrlx/nats-server.git
synced 2026-04-14 18:20:42 -07:00
Merge pull request #775 from nats-io/shadow
Cleanup of shadowed subscriptions, fixes #772
This commit is contained in:
@@ -529,10 +529,8 @@ func TestSimpleMapping(t *testing.T) {
|
||||
t.Fatalf("Error adding account import to client bar: %v", err)
|
||||
}
|
||||
|
||||
// Normal Subscription on bar client.
|
||||
go cbar.parse([]byte("SUB import.foo 1\r\nSUB import.foo bar 2\r\nPING\r\n"))
|
||||
_, err := crBar.ReadString('\n') // Make sure subscriptions were processed.
|
||||
if err != nil {
|
||||
// Normal and Queue Subscription on bar client.
|
||||
if err := cbar.parse([]byte("SUB import.foo 1\r\nSUB import.foo bar 2\r\n")); err != nil {
|
||||
t.Fatalf("Error for client 'bar' from server: %v", err)
|
||||
}
|
||||
|
||||
@@ -568,6 +566,29 @@ func TestSimpleMapping(t *testing.T) {
|
||||
}
|
||||
checkMsg(l, "2")
|
||||
checkPayload(crBar, []byte("hello\r\n"), t)
|
||||
|
||||
// We should have 2 subscriptions in both. Normal and Queue Subscriber
|
||||
// for barAcc which are local, and 2 that are shadowed in fooAcc.
|
||||
// Now make sure that when we unsubscribe we clean up properly for both.
|
||||
if bslc := barAcc.sl.Count(); bslc != 2 {
|
||||
t.Fatalf("Expected 2 normal subscriptions on barAcc, got %d", bslc)
|
||||
}
|
||||
if fslc := fooAcc.sl.Count(); fslc != 2 {
|
||||
t.Fatalf("Expected 2 shadowed subscriptions on fooAcc, got %d", fslc)
|
||||
}
|
||||
|
||||
// Now unsubscribe.
|
||||
if err := cbar.parse([]byte("UNSUB 1\r\nUNSUB 2\r\n")); err != nil {
|
||||
t.Fatalf("Error for client 'bar' from server: %v", err)
|
||||
}
|
||||
|
||||
// We should have zero on both.
|
||||
if bslc := barAcc.sl.Count(); bslc != 0 {
|
||||
t.Fatalf("Expected no normal subscriptions on barAcc, got %d", bslc)
|
||||
}
|
||||
if fslc := fooAcc.sl.Count(); fslc != 0 {
|
||||
t.Fatalf("Expected no shadowed subscriptions on fooAcc, got %d", fslc)
|
||||
}
|
||||
}
|
||||
|
||||
func TestNoPrefixWildcardMapping(t *testing.T) {
|
||||
|
||||
@@ -242,7 +242,8 @@ func (c *client) GetTLSConnectionState() *tls.ConnectionState {
|
||||
// interest in published messages.
|
||||
type subscription struct {
|
||||
client *client
|
||||
im *streamImport // This is for importing support.
|
||||
im *streamImport // This is for import stream support.
|
||||
shadow []*subscription // This is to track shadowed accounts.
|
||||
subject []byte
|
||||
queue []byte
|
||||
sid []byte
|
||||
@@ -1382,6 +1383,7 @@ func (c *client) checkAccountImports(sub *subscription) error {
|
||||
|
||||
var rims [32]*streamImport
|
||||
var ims = rims[:0]
|
||||
|
||||
acc.mu.RLock()
|
||||
for _, im := range acc.imports.streams {
|
||||
if isSubsetMatch(tokens, im.prefix+im.from) {
|
||||
@@ -1390,6 +1392,8 @@ func (c *client) checkAccountImports(sub *subscription) error {
|
||||
}
|
||||
acc.mu.RUnlock()
|
||||
|
||||
var shadow []*subscription
|
||||
|
||||
// Now walk through collected importMaps
|
||||
for _, im := range ims {
|
||||
// We have a match for a local subscription with an import from another account.
|
||||
@@ -1402,9 +1406,20 @@ func (c *client) checkAccountImports(sub *subscription) error {
|
||||
nsub.subject = sub.subject[len(im.prefix):]
|
||||
}
|
||||
if err := im.acc.sl.Insert(&nsub); err != nil {
|
||||
return fmt.Errorf("Could not add shadow import subscription for account %q", im.acc.Name)
|
||||
errs := fmt.Sprintf("Could not add shadow import subscription for account %q", im.acc.Name)
|
||||
c.Debugf(errs)
|
||||
return fmt.Errorf(errs)
|
||||
}
|
||||
if shadow == nil {
|
||||
shadow = make([]*subscription, 0, len(ims))
|
||||
}
|
||||
shadow = append(shadow, &nsub)
|
||||
}
|
||||
|
||||
c.mu.Lock()
|
||||
sub.shadow = shadow
|
||||
c.mu.Unlock()
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
@@ -1455,6 +1470,14 @@ func (c *client) unsubscribe(sub *subscription) {
|
||||
if c.typ == CLIENT && c.srv != nil && len(sub.queue) > 0 {
|
||||
c.srv.holdRemoteQSub(sub)
|
||||
}
|
||||
|
||||
// Check to see if we have shadown subscriptions.
|
||||
for _, nsub := range sub.shadow {
|
||||
if err := nsub.im.acc.sl.Remove(nsub); err != nil {
|
||||
c.Debugf("Could not remove shadow import subscription for account %q", nsub.im.acc.Name)
|
||||
}
|
||||
}
|
||||
sub.shadow = nil
|
||||
}
|
||||
|
||||
func (c *client) processUnsub(arg []byte) error {
|
||||
|
||||
Reference in New Issue
Block a user