From 8af8cf4e674811ff1111ed9d17bc01e07bdfdfc9 Mon Sep 17 00:00:00 2001 From: Ivan Kozlovic Date: Mon, 8 Feb 2021 14:41:33 -0700 Subject: [PATCH] Remap subject only for service imports Also optimized a test that was taking too long to run. Signed-off-by: Ivan Kozlovic --- server/accounts.go | 4 +++- server/accounts_test.go | 31 ++++++++++++++++++++++--------- server/client.go | 5 +++-- 3 files changed, 28 insertions(+), 12 deletions(-) diff --git a/server/accounts.go b/server/accounts.go index 35bba947..38c10e9e 100644 --- a/server/accounts.go +++ b/server/accounts.go @@ -2027,7 +2027,9 @@ func (a *Account) createRespWildcard() []byte { a.mu.Unlock() // Create subscription and internal callback for all the wildcard response subjects. - c.processSub(wcsub, nil, []byte(sid), a.processServiceImportResponse, false) + if sub, err := c.processSub(wcsub, nil, []byte(sid), a.processServiceImportResponse, false); err == nil { + sub.rsi = true + } return pre } diff --git a/server/accounts_test.go b/server/accounts_test.go index f84326bc..3bc5eef8 100644 --- a/server/accounts_test.go +++ b/server/accounts_test.go @@ -22,6 +22,7 @@ import ( "strconv" "strings" "sync" + "sync/atomic" "testing" "time" @@ -117,14 +118,20 @@ func TestAccountIsolationExportImport(t *testing.T) { pubSubj: 0, "fizz": 0, } - sub, err := ncImp.Subscribe(">", func(m *nats.Msg) { + count := int32(0) + ch := make(chan struct{}, 1) + if _, err := ncImp.Subscribe(">", func(m *nats.Msg) { gotSubjs[m.Subject] += 1 - }) - if err != nil { - t.Fatal(err) + if n := atomic.AddInt32(&count, 1); n == 3 { + ch <- struct{}{} + } + }); err != nil { + t.Fatalf("Error on subscribe: %v", err) } + // Since both prod and cons use same server, flushing here will ensure + // that the interest is registered and known at the time we publish. + ncImp.Flush() - time.Sleep(1 * time.Second) if err := ncExp.Publish(pubSubj, []byte(fmt.Sprintf("ncExp pub %s", pubSubj))); err != nil { t.Fatal(err) } @@ -138,10 +145,6 @@ func TestAccountIsolationExportImport(t *testing.T) { if err := ncImp.Publish("fizz", []byte("ncImp pub fizz")); err != nil { t.Fatal(err) } - time.Sleep(1 * time.Second) - if err := sub.Unsubscribe(); err != nil { - t.Fatal(err) - } wantSubjs := map[string]int{ // Subscriber ncImp should receive publishes from ncExp and ncImp. @@ -149,6 +152,16 @@ func TestAccountIsolationExportImport(t *testing.T) { // Subscriber ncImp should only receive the publish from ncImp. "fizz": 1, } + + // Wait for at least the 3 expected messages + select { + case <-ch: + case <-time.After(time.Second): + t.Fatalf("Expected 3 messages, got %v", atomic.LoadInt32(&count)) + } + // But now wait a bit to see if subscription receives more than expected. + time.Sleep(50 * time.Millisecond) + if got, want := len(gotSubjs), len(wantSubjs); got != want { t.Fatalf("unexpected subjs len, got=%d; want=%d", got, want) } diff --git a/server/client.go b/server/client.go index d328f69b..6a7b4f7a 100644 --- a/server/client.go +++ b/server/client.go @@ -472,7 +472,8 @@ func (c *client) clientType() int { // to optionally have an opts section for non-normal stuff. type subscription struct { client *client - im *streamImport // This is for import stream support. + im *streamImport // This is for import stream support. + rsi bool shadow []*subscription // This is to track shadowed accounts. icb msgHandler subject []byte @@ -3885,7 +3886,7 @@ func (c *client) processMsgResults(acc *Account, r *SublistResult, msg, deliver, } // Remap to the original subject if internal. - if sub.icb != nil { + if sub.icb != nil && sub.rsi { subj = subject }