[CHANGED] LeafNode: remotes from same server binding to same hub account
Previously, the server would reject a second remote leafnode connection from the same server when it was binding to the same account on the hub, even if the remotes were using different local accounts. The remote now reports which local account it is binding from, and the hub also compares that account name when checking for a duplicate leafnode connection, so such configurations are accepted.

Signed-off-by: Ivan Kozlovic <ivan@synadia.com>
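To illustrate the scenario, here is a minimal sketch of a spoke configuration that this change is meant to accept, patterned after the tests in this commit (the hub URL, port 7422, user ha, and account names are placeholders): both remotes authenticate as the same hub user, and therefore bind to the same account on the hub, but each binds to a different local account.

    # Spoke server (sketch): two remotes to the same hub account,
    # bound to different local accounts "A" and "C".
    accounts {
        A { users: [{user: A, password: pwd}] }
        C { users: [{user: C, password: pwd}] }
    }
    leafnodes {
        remotes [
            {
                url: "nats://ha:pwd@127.0.0.1:7422"
                account: "A"
            }
            {
                url: "nats://ha:pwd@127.0.0.1:7422"
                account: "C"
            }
        ]
    }

Before this change, the accept side treated the second of these remotes as a duplicate of the first and rejected it.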
@@ -624,6 +624,9 @@ type ClientOpts struct {
     // Routes and Leafnodes only
     Import *SubjectPermission `json:"import,omitempty"`
     Export *SubjectPermission `json:"export,omitempty"`
+
+    // Leafnodes
+    RemoteAccount string `json:"remote_account,omitempty"`
 }
 
 var defaultOpts = ClientOpts{Verbose: true, Pedantic: true, Echo: true}
@@ -78,6 +78,8 @@ type leaf struct {
     remoteServer string
     // domain name of remote server
     remoteDomain string
+    // account name of remote server
+    remoteAccName string
     // Used to suppress sub and unsub interest. Same as routes but our audience
     // here is tied to this leaf node. This will hold all subscriptions except this
     // leaf nodes. This represents all the interest we want to send to the other side.
@@ -348,8 +350,8 @@ func (s *Server) updateRemoteLeafNodesTLSConfig(opts *Options) {
         return
     }
 
-    s.mu.Lock()
-    defer s.mu.Unlock()
+    s.mu.RLock()
+    defer s.mu.RUnlock()
 
     // Changes in the list of remote leaf nodes is not supported.
     // However, make sure that we don't go over the arrays.
@@ -764,16 +766,17 @@ var credsRe = regexp.MustCompile(`\s*(?:(?:[-]{3,}[^\n]*[-]{3,}\n)(.+)(?:\n\s*[-
 func (c *client) sendLeafConnect(clusterName string, headers bool) error {
     // We support basic user/pass and operator based user JWT with signatures.
     cinfo := leafConnectInfo{
-        Version:     VERSION,
-        ID:          c.srv.info.ID,
-        Domain:      c.srv.info.Domain,
-        Name:        c.srv.info.Name,
-        Hub:         c.leaf.remote.Hub,
-        Cluster:     clusterName,
-        Headers:     headers,
-        JetStream:   c.acc.jetStreamConfigured(),
-        DenyPub:     c.leaf.remote.DenyImports,
-        Compression: c.leaf.compression,
+        Version:       VERSION,
+        ID:            c.srv.info.ID,
+        Domain:        c.srv.info.Domain,
+        Name:          c.srv.info.Name,
+        Hub:           c.leaf.remote.Hub,
+        Cluster:       clusterName,
+        Headers:       headers,
+        JetStream:     c.acc.jetStreamConfigured(),
+        DenyPub:       c.leaf.remote.DenyImports,
+        Compression:   c.leaf.compression,
+        RemoteAccount: c.acc.GetName(),
     }
 
     // If a signature callback is specified, this takes precedence over anything else.
@@ -1310,6 +1313,8 @@ func (c *client) processLeafnodeInfo(info *Info) {
         // Clear deadline that was set in createLeafNode while waiting for the INFO.
         c.nc.SetDeadline(time.Time{})
         resumeConnect = true
+    } else if !firstINFO && didSolicit {
+        c.leaf.remoteAccName = info.RemoteAccount
     }
 
     // Check if we have the remote account information and if so make sure it's stored.
@@ -1503,6 +1508,7 @@ func (s *Server) addLeafNodeConnection(c *client, srvName, clusterName string, c
     }
     myRemoteDomain := c.leaf.remoteDomain
     mySrvName := c.leaf.remoteServer
+    remoteAccName := c.leaf.remoteAccName
     myClustName := c.leaf.remoteCluster
     solicited := c.leaf.remote != nil
     c.mu.Unlock()
@@ -1518,7 +1524,8 @@ func (s *Server) addLeafNodeConnection(c *client, srvName, clusterName string, c
             // We have code for the loop detection elsewhere, which also delays
             // attempt to reconnect.
             if !ol.isSolicitedLeafNode() && ol.leaf.remoteServer == srvName &&
-                ol.leaf.remoteCluster == clusterName && ol.acc.Name == accName {
+                ol.leaf.remoteCluster == clusterName && ol.acc.Name == accName &&
+                remoteAccName != _EMPTY_ && ol.leaf.remoteAccName == remoteAccName {
                 old = ol
             }
             ol.mu.Unlock()
@@ -1693,6 +1700,9 @@ type leafConnectInfo struct {
 
     // Just used to detect wrong connection attempts.
     Gateway string `json:"gateway,omitempty"`
+
+    // Tells the accept side which account the remote is binding to.
+    RemoteAccount string `json:"remote_account,omitempty"`
 }
 
 // processLeafNodeConnect will process the inbound connect args.
@@ -1774,6 +1784,8 @@ func (c *client) processLeafNodeConnect(s *Server, arg []byte, lang string) erro
 
     // Remember the remote server.
     c.leaf.remoteServer = proto.Name
+    // Remember the remote account name
+    c.leaf.remoteAccName = proto.RemoteAccount
 
     // If the other side has declared itself a hub, so we will take on the spoke role.
     if proto.Hub {
@@ -2185,65 +2185,80 @@ func TestLeafNodeLoopDetectedDueToReconnect(t *testing.T) {
     checkLeafNodeConnected(t, sl)
 }
 
-func TestLeafNodeTwoRemotesBindToSameAccount(t *testing.T) {
+func TestLeafNodeTwoRemotesBindToSameHubAccount(t *testing.T) {
     opts := DefaultOptions()
     opts.LeafNode.Host = "127.0.0.1"
     opts.LeafNode.Port = -1
     s := RunServer(opts)
     defer s.Shutdown()
 
-    conf := `
-        listen: 127.0.0.1:-1
-        cluster { name: ln22, listen: 127.0.0.1:-1 }
-        accounts {
-            a { users [ {user: a, password: a} ]}
-            b { users [ {user: b, password: b} ]}
-        }
-        leafnodes {
-            remotes = [
-                {
-                    url: nats-leaf://127.0.0.1:%d
-                    account: a
-                }
-                {
-                    url: nats-leaf://127.0.0.1:%d
-                    account: b
-                }
-            ]
-        }
-    `
-    lconf := createConfFile(t, []byte(fmt.Sprintf(conf, opts.LeafNode.Port, opts.LeafNode.Port)))
-
-    lopts, err := ProcessConfigFile(lconf)
-    if err != nil {
-        t.Fatalf("Error loading config file: %v", err)
-    }
-    lopts.NoLog = false
-    ln, err := NewServer(lopts)
-    if err != nil {
-        t.Fatalf("Error creating server: %v", err)
-    }
-    defer ln.Shutdown()
-    l := &captureErrorLogger{errCh: make(chan string, 10)}
-    ln.SetLogger(l, false, false)
-
-    wg := sync.WaitGroup{}
-    wg.Add(1)
-    go func() {
-        defer wg.Done()
-        ln.Start()
-    }()
-
-    select {
-    case err := <-l.errCh:
-        if !strings.Contains(err, DuplicateRemoteLeafnodeConnection.String()) {
-            t.Fatalf("Unexpected error: %v", err)
-        }
-    case <-time.After(2 * time.Second):
-        t.Fatal("Did not get any error")
-    }
-    ln.Shutdown()
-    wg.Wait()
+    for _, test := range []struct {
+        name    string
+        account string
+        fail    bool
+    }{
+        {"different local accounts", "b", false},
+        {"same local accounts", "a", true},
+    } {
+        t.Run(test.name, func(t *testing.T) {
+            conf := `
+                listen: 127.0.0.1:-1
+                cluster { name: ln22, listen: 127.0.0.1:-1 }
+                accounts {
+                    a { users [ {user: a, password: a} ]}
+                    b { users [ {user: b, password: b} ]}
+                }
+                leafnodes {
+                    remotes = [
+                        {
+                            url: nats-leaf://127.0.0.1:%[1]d
+                            account: a
+                        }
+                        {
+                            url: nats-leaf://127.0.0.1:%[1]d
+                            account: %s
+                        }
+                    ]
+                }
+            `
+            lconf := createConfFile(t, []byte(fmt.Sprintf(conf, opts.LeafNode.Port, test.account)))
+
+            lopts, err := ProcessConfigFile(lconf)
+            if err != nil {
+                t.Fatalf("Error loading config file: %v", err)
+            }
+            lopts.NoLog = false
+            ln, err := NewServer(lopts)
+            if err != nil {
+                t.Fatalf("Error creating server: %v", err)
+            }
+            defer ln.Shutdown()
+            l := &captureErrorLogger{errCh: make(chan string, 10)}
+            ln.SetLogger(l, false, false)
+
+            wg := sync.WaitGroup{}
+            wg.Add(1)
+            go func() {
+                defer wg.Done()
+                ln.Start()
+            }()
+
+            select {
+            case err := <-l.errCh:
+                if test.fail && !strings.Contains(err, DuplicateRemoteLeafnodeConnection.String()) {
+                    t.Fatalf("Did not get expected duplicate connection error: %v", err)
+                } else if !test.fail && strings.Contains(err, DuplicateRemoteLeafnodeConnection.String()) {
+                    t.Fatalf("Incorrectly detected a duplicate connection: %v", err)
+                }
+            case <-time.After(250 * time.Millisecond):
+                if test.fail {
+                    t.Fatal("Did not get expected error")
+                }
+            }
+            ln.Shutdown()
+            wg.Wait()
+        })
+    }
 }
 
 func TestLeafNodeNoDuplicateWithinCluster(t *testing.T) {
@@ -6443,3 +6458,348 @@ func TestLeafNodeWithWeightedDQRequestsToSuperClusterWithStreamImportAccounts(t
     })
     require_True(t, r2.Load() > r1.Load())
 }
+
+func TestLeafNodeTwoRemotesToSameHubAccount(t *testing.T) {
+    conf1 := createConfFile(t, []byte(`
+        port: -1
+        server_name: "hub"
+        accounts {
+            HA { users: [{user: ha, password: pwd}] }
+        }
+        leafnodes {
+            port: -1
+        }
+    `))
+    s1, o1 := RunServerWithConfig(conf1)
+    defer s1.Shutdown()
+
+    conf2 := createConfFile(t, []byte(fmt.Sprintf(`
+        port: -1
+        server_name: "spoke"
+        accounts {
+            A { users: [{user: A, password: pwd}] }
+            B { users: [{user: B, password: pwd}] }
+            C { users: [{user: C, password: pwd}] }
+        }
+        leafnodes {
+            remotes [
+                {
+                    url: "nats://ha:pwd@127.0.0.1:%d"
+                    local: "A"
+                }
+                {
+                    url: "nats://ha:pwd@127.0.0.1:%d"
+                    local: "C"
+                }
+            ]
+        }
+    `, o1.LeafNode.Port, o1.LeafNode.Port)))
+    s2, _ := RunServerWithConfig(conf2)
+    defer s2.Shutdown()
+
+    l := &captureErrorLogger{errCh: make(chan string, 10)}
+    s2.SetLogger(l, false, false)
+
+    checkLeafNodeConnectedCount(t, s2, 2)
+
+    // Make sure we don't get duplicate leafnode connection errors
+    deadline := time.NewTimer(1500 * time.Millisecond)
+    for done := false; !done; {
+        select {
+        case err := <-l.errCh:
+            if strings.Contains(err, DuplicateRemoteLeafnodeConnection.String()) {
+                t.Fatalf("Got error: %v", err)
+            }
+        case <-deadline.C:
+            done = true
+        }
+    }
+
+    nca := natsConnect(t, s2.ClientURL(), nats.UserInfo("A", "pwd"))
+    defer nca.Close()
+    suba := natsSubSync(t, nca, "A")
+    ncb := natsConnect(t, s2.ClientURL(), nats.UserInfo("B", "pwd"))
+    defer ncb.Close()
+    subb := natsSubSync(t, ncb, "B")
+    ncc := natsConnect(t, s2.ClientURL(), nats.UserInfo("C", "pwd"))
+    defer ncc.Close()
+    subc := natsSubSync(t, ncc, "C")
+    subs := map[string]*nats.Subscription{"A": suba, "B": subb, "C": subc}
+
+    for _, subj := range []string{"A", "C"} {
+        checkSubInterest(t, s1, "HA", subj, time.Second)
+    }
+
+    nc := natsConnect(t, s1.ClientURL(), nats.UserInfo("ha", "pwd"))
+    defer nc.Close()
+
+    for _, subj := range []string{"A", "B", "C"} {
+        natsPub(t, nc, subj, []byte("hello"))
+    }
+
+    for _, subj := range []string{"A", "B", "C"} {
+        var expected bool
+        if subj != "B" {
+            expected = true
+        }
+        sub := subs[subj]
+        if expected {
+            natsNexMsg(t, sub, time.Second)
+        } else {
+            if _, err := sub.NextMsg(50 * time.Millisecond); err != nats.ErrTimeout {
+                t.Fatalf("Expected timeout error, got %v", err)
+            }
+        }
+    }
+}
+
+func TestLeafNodeTwoRemotesToSameHubAccountWithClusters(t *testing.T) {
+    hubTmpl := `
+        port: -1
+        server_name: "%s"
+        accounts {
+            HA { users: [{user: HA, password: pwd}] }
+        }
+        cluster {
+            name: "hub"
+            port: -1
+            %s
+        }
+        leafnodes {
+            port: -1
+        }
+    `
+    confH1 := createConfFile(t, []byte(fmt.Sprintf(hubTmpl, "H1", _EMPTY_)))
+    sh1, oh1 := RunServerWithConfig(confH1)
+    defer sh1.Shutdown()
+
+    confH2 := createConfFile(t, []byte(fmt.Sprintf(hubTmpl, "H2", fmt.Sprintf("routes: [\"nats://127.0.0.1:%d\"]", oh1.Cluster.Port))))
+    sh2, oh2 := RunServerWithConfig(confH2)
+    defer sh2.Shutdown()
+
+    checkClusterFormed(t, sh1, sh2)
+
+    spokeTmpl := `
+        port: -1
+        server_name: "%s"
+        accounts {
+            A { users: [{user: A, password: pwd}] }
+            B { users: [{user: B, password: pwd}] }
+        }
+        cluster {
+            name: "spoke"
+            port: -1
+            %s
+        }
+        leafnodes {
+            remotes [
+                {
+                    url: "nats://HA:pwd@127.0.0.1:%d"
+                    local: "A"
+                }
+                {
+                    url: "nats://HA:pwd@127.0.0.1:%d"
+                    local: "B"
+                }
+            ]
+        }
+    `
+    for _, test := range []struct {
+        name        string
+        sp2Leafport int
+    }{
+        {"connect to different hub servers", oh2.LeafNode.Port},
+        {"connect to same hub server", oh1.LeafNode.Port},
+    } {
+        t.Run(test.name, func(t *testing.T) {
+            confSP1 := createConfFile(t, []byte(fmt.Sprintf(spokeTmpl, "SP1", _EMPTY_, oh1.LeafNode.Port, oh1.LeafNode.Port)))
+            sp1, osp1 := RunServerWithConfig(confSP1)
+            defer sp1.Shutdown()
+
+            confSP2 := createConfFile(t, []byte(fmt.Sprintf(spokeTmpl, "SP2",
+                fmt.Sprintf("routes: [\"nats://127.0.0.1:%d\"]", osp1.Cluster.Port), test.sp2Leafport, test.sp2Leafport)))
+            sp2, _ := RunServerWithConfig(confSP2)
+            defer sp2.Shutdown()
+
+            checkClusterFormed(t, sp1, sp2)
+            checkLeafNodeConnectedCount(t, sp1, 2)
+            checkLeafNodeConnectedCount(t, sp2, 2)
+
+            var conns []*nats.Conn
+            createConn := func(s *Server, user string) {
+                t.Helper()
+                nc := natsConnect(t, s.ClientURL(), nats.UserInfo(user, "pwd"))
+                conns = append(conns, nc)
+            }
+            for _, nc := range conns {
+                defer nc.Close()
+            }
+            createConn(sh1, "HA")
+            createConn(sh2, "HA")
+            createConn(sp1, "A")
+            createConn(sp2, "A")
+            createConn(sp1, "B")
+            createConn(sp2, "B")
+
+            check := func(subConn *nats.Conn, subj string, checkA, checkB bool) {
+                t.Helper()
+                sub := natsSubSync(t, subConn, subj)
+                defer sub.Unsubscribe()
+
+                checkSubInterest(t, sh1, "HA", subj, time.Second)
+                checkSubInterest(t, sh2, "HA", subj, time.Second)
+                if checkA {
+                    checkSubInterest(t, sp1, "A", subj, time.Second)
+                    checkSubInterest(t, sp2, "A", subj, time.Second)
+                }
+                if checkB {
+                    checkSubInterest(t, sp1, "B", subj, time.Second)
+                    checkSubInterest(t, sp2, "B", subj, time.Second)
+                }
+
+                for i, ncp := range conns {
+                    // Don't publish from account "A" connections if we are
+                    // dealing with account "B", and vice-versa.
+                    if !checkA && i >= 2 && i <= 3 {
+                        continue
+                    }
+                    if !checkB && i >= 4 {
+                        continue
+                    }
+                    natsPub(t, ncp, subj, []byte("hello"))
+                    natsNexMsg(t, sub, time.Second)
+                    // Make sure we don't get a duplicate
+                    if msg, err := sub.NextMsg(50 * time.Millisecond); err != nats.ErrTimeout {
+                        t.Fatalf("Unexpected message or error: msg=%v - err=%v", msg, err)
+                    }
+                }
+            }
+            check(conns[0], "HA.1", true, true)
+            check(conns[1], "HA.2", true, true)
+            check(conns[2], "SPA.1", true, false)
+            check(conns[3], "SPA.2", true, false)
+            check(conns[4], "SPB.1", false, true)
+            check(conns[5], "SPB.2", false, true)
+        })
+    }
+}
+
+func TestLeafNodeSameLocalAccountToMultipleHubs(t *testing.T) {
+    hub1Conf := createConfFile(t, []byte(`
+        port: -1
+        server_name: hub1
+        accounts {
+            hub1 { users: [{user: hub1, password: pwd}] }
+        }
+        leafnodes {
+            port: -1
+        }
+    `))
+    sh1, oh1 := RunServerWithConfig(hub1Conf)
+    defer sh1.Shutdown()
+
+    hub2Conf := createConfFile(t, []byte(`
+        port: -1
+        server_name: hub2
+        accounts {
+            hub2 { users: [{user: hub2, password: pwd}] }
+        }
+        leafnodes {
+            port: -1
+        }
+    `))
+    sh2, oh2 := RunServerWithConfig(hub2Conf)
+    defer sh2.Shutdown()
+
+    lconf := createConfFile(t, []byte(fmt.Sprintf(`
+        port: -1
+        server_name: leaf
+        accounts {
+            A { users: [{user: A, password: pwd}] }
+            B { users: [{user: B, password: pwd}] }
+            C { users: [{user: C, password: pwd}] }
+        }
+        leafnodes {
+            port: -1
+            remotes [
+                {
+                    url: nats://hub1:pwd@127.0.0.1:%[1]d
+                    local: "A"
+                }
+                {
+                    url: nats://hub1:pwd@127.0.0.1:%[1]d
+                    local: "C"
+                }
+                {
+                    url: nats://hub2:pwd@127.0.0.1:%[2]d
+                    local: "A"
+                }
+                {
+                    url: nats://hub2:pwd@127.0.0.1:%[2]d
+                    local: "B"
+                }
+            ]
+        }
+    `, oh1.LeafNode.Port, oh2.LeafNode.Port)))
+    s, _ := RunServerWithConfig(lconf)
+    defer s.Shutdown()
+
+    // The leafnode to hub1 should have 2 connections (A and C)
+    // while the one to hub2 should have 2 connections (A and B)
+    checkLeafNodeConnectedCount(t, sh1, 2)
+    checkLeafNodeConnectedCount(t, sh2, 2)
+    checkLeafNodeConnectedCount(t, s, 4)
+
+    nca := natsConnect(t, s.ClientURL(), nats.UserInfo("A", "pwd"))
+    defer nca.Close()
+    suba := natsSubSync(t, nca, "A")
+    ncb := natsConnect(t, s.ClientURL(), nats.UserInfo("B", "pwd"))
+    defer ncb.Close()
+    subb := natsSubSync(t, ncb, "B")
+    ncc := natsConnect(t, s.ClientURL(), nats.UserInfo("C", "pwd"))
+    defer ncc.Close()
+    subc := natsSubSync(t, ncc, "C")
+
+    checkSubInterest(t, sh1, "hub1", "A", time.Second)
+    checkSubNoInterest(t, sh1, "hub1", "B", time.Second)
+    checkSubInterest(t, sh1, "hub1", "C", time.Second)
+
+    checkSubInterest(t, sh2, "hub2", "A", time.Second)
+    checkSubInterest(t, sh2, "hub2", "B", time.Second)
+    checkSubNoInterest(t, sh2, "hub2", "C", time.Second)
+
+    nch1 := natsConnect(t, sh1.ClientURL(), nats.UserInfo("hub1", "pwd"))
+    defer nch1.Close()
+    nch2 := natsConnect(t, sh2.ClientURL(), nats.UserInfo("hub2", "pwd"))
+    defer nch2.Close()
+
+    checkNoMsg := func(sub *nats.Subscription) {
+        t.Helper()
+        if msg, err := sub.NextMsg(50 * time.Millisecond); err != nats.ErrTimeout {
+            t.Fatalf("Unexpected message: %s", msg.Data)
+        }
+    }
+
+    checkSub := func(sub *nats.Subscription, subj, payload string) {
+        t.Helper()
+        msg := natsNexMsg(t, sub, time.Second)
+        require_Equal(t, subj, msg.Subject)
+        require_Equal(t, payload, string(msg.Data))
+        // Make sure we don't get duplicates
+        checkNoMsg(sub)
+    }
+
+    natsPub(t, nch1, "A", []byte("msgA1"))
+    checkSub(suba, "A", "msgA1")
+    natsPub(t, nch1, "B", []byte("msgB1"))
+    checkNoMsg(subb)
+    natsPub(t, nch1, "C", []byte("msgC1"))
+    checkSub(subc, "C", "msgC1")
+
+    natsPub(t, nch2, "A", []byte("msgA2"))
+    checkSub(suba, "A", "msgA2")
+    natsPub(t, nch2, "B", []byte("msgB2"))
+    checkSub(subb, "B", "msgB2")
+    natsPub(t, nch2, "C", []byte("msgC2"))
+    checkNoMsg(subc)
+}