mirror of
https://github.com/gogrlx/nats-server.git
synced 2026-04-02 11:48:43 -07:00
Remap subject only for service imports
Also optimized a test that was taking too long to run. Signed-off-by: Ivan Kozlovic <ivan@synadia.com>
This commit is contained in:
@@ -2027,7 +2027,9 @@ func (a *Account) createRespWildcard() []byte {
|
||||
a.mu.Unlock()
|
||||
|
||||
// Create subscription and internal callback for all the wildcard response subjects.
|
||||
c.processSub(wcsub, nil, []byte(sid), a.processServiceImportResponse, false)
|
||||
if sub, err := c.processSub(wcsub, nil, []byte(sid), a.processServiceImportResponse, false); err == nil {
|
||||
sub.rsi = true
|
||||
}
|
||||
|
||||
return pre
|
||||
}
|
||||
|
||||
@@ -22,6 +22,7 @@ import (
|
||||
"strconv"
|
||||
"strings"
|
||||
"sync"
|
||||
"sync/atomic"
|
||||
"testing"
|
||||
"time"
|
||||
|
||||
@@ -117,14 +118,20 @@ func TestAccountIsolationExportImport(t *testing.T) {
|
||||
pubSubj: 0,
|
||||
"fizz": 0,
|
||||
}
|
||||
sub, err := ncImp.Subscribe(">", func(m *nats.Msg) {
|
||||
count := int32(0)
|
||||
ch := make(chan struct{}, 1)
|
||||
if _, err := ncImp.Subscribe(">", func(m *nats.Msg) {
|
||||
gotSubjs[m.Subject] += 1
|
||||
})
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
if n := atomic.AddInt32(&count, 1); n == 3 {
|
||||
ch <- struct{}{}
|
||||
}
|
||||
}); err != nil {
|
||||
t.Fatalf("Error on subscribe: %v", err)
|
||||
}
|
||||
// Since both prod and cons use same server, flushing here will ensure
|
||||
// that the interest is registered and known at the time we publish.
|
||||
ncImp.Flush()
|
||||
|
||||
time.Sleep(1 * time.Second)
|
||||
if err := ncExp.Publish(pubSubj, []byte(fmt.Sprintf("ncExp pub %s", pubSubj))); err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
@@ -138,10 +145,6 @@ func TestAccountIsolationExportImport(t *testing.T) {
|
||||
if err := ncImp.Publish("fizz", []byte("ncImp pub fizz")); err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
time.Sleep(1 * time.Second)
|
||||
if err := sub.Unsubscribe(); err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
|
||||
wantSubjs := map[string]int{
|
||||
// Subscriber ncImp should receive publishes from ncExp and ncImp.
|
||||
@@ -149,6 +152,16 @@ func TestAccountIsolationExportImport(t *testing.T) {
|
||||
// Subscriber ncImp should only receive the publish from ncImp.
|
||||
"fizz": 1,
|
||||
}
|
||||
|
||||
// Wait for at least the 3 expected messages
|
||||
select {
|
||||
case <-ch:
|
||||
case <-time.After(time.Second):
|
||||
t.Fatalf("Expected 3 messages, got %v", atomic.LoadInt32(&count))
|
||||
}
|
||||
// But now wait a bit to see if subscription receives more than expected.
|
||||
time.Sleep(50 * time.Millisecond)
|
||||
|
||||
if got, want := len(gotSubjs), len(wantSubjs); got != want {
|
||||
t.Fatalf("unexpected subjs len, got=%d; want=%d", got, want)
|
||||
}
|
||||
|
||||
@@ -472,7 +472,8 @@ func (c *client) clientType() int {
|
||||
// to optionally have an opts section for non-normal stuff.
|
||||
type subscription struct {
|
||||
client *client
|
||||
im *streamImport // This is for import stream support.
|
||||
im *streamImport // This is for import stream support.
|
||||
rsi bool
|
||||
shadow []*subscription // This is to track shadowed accounts.
|
||||
icb msgHandler
|
||||
subject []byte
|
||||
@@ -3885,7 +3886,7 @@ func (c *client) processMsgResults(acc *Account, r *SublistResult, msg, deliver,
|
||||
}
|
||||
|
||||
// Remap to the original subject if internal.
|
||||
if sub.icb != nil {
|
||||
if sub.icb != nil && sub.rsi {
|
||||
subj = subject
|
||||
}
|
||||
|
||||
|
||||
Reference in New Issue
Block a user