[FIXED] LeafNode: possible duplicate messages in complex setup

This is specific to the setup described [here](https://github.com/nats-io/nats-server/issues/3191#issuecomment-1296974382)
and does not require JetStream to be reproduced. The added test
reproduces that setup, but without JetStream enabled in
the accounts.

Each cluster has a leafnode for a given account to the other
cluster. The accounts import/export a subject. When a consumer
was connected to cluster "B" and the producer was on cluster "A",
there was a duplicate message: due to the shadow subscription created
by the import/export rules, an additional subscription was
sent across the leafnode.

Resolves #3191

Signed-off-by: Ivan Kozlovic <ivan@synadia.com>
This commit is contained in:
Ivan Kozlovic
2022-11-03 12:34:01 -06:00
parent 0f8aa11422
commit 91c84c03c2
2 changed files with 263 additions and 35 deletions

View File

@@ -845,45 +845,58 @@ func (l *loopDetectedLogger) Errorf(format string, v ...interface{}) {
}
// TestLeafNodeLoop starts two servers whose leafnode remotes point at each
// other, waits for the logger installed on server A to report a loop, then
// restarts server B without remotes and checks that A has a leafnode
// connection.
//
// NOTE(review): this listing looks like a diff rendered without +/- markers,
// so the pre-change and post-change bodies appear interleaved below. Confirm
// against the real file before relying on the statement order shown here.
func TestLeafNodeLoop(t *testing.T) {
// This test requires that we set the port to a known value because
// we want A to point to B and B to A.
oa := DefaultOptions()
oa.LeafNode.ReconnectInterval = 10 * time.Millisecond
oa.LeafNode.Port = 1234
ub, _ := url.Parse("nats://127.0.0.1:5678")
oa.LeafNode.Remotes = []*RemoteLeafOpts{{URLs: []*url.URL{ub}}}
oa.LeafNode.connDelay = 50 * time.Millisecond
sa := RunServer(oa)
defer sa.Shutdown()
// test runs the scenario once. When cluster is false the cluster port and
// name are cleared on both servers; when true, server B is given the
// cluster name "xyz".
test := func(t *testing.T, cluster bool) {
// This test requires that we set the port to a known value because
// we want A to point to B and B to A.
oa := DefaultOptions()
if !cluster {
oa.Cluster.Port = 0
oa.Cluster.Name = _EMPTY_
}
oa.LeafNode.ReconnectInterval = 10 * time.Millisecond
oa.LeafNode.Port = 1234
ub, _ := url.Parse("nats://127.0.0.1:5678")
oa.LeafNode.Remotes = []*RemoteLeafOpts{{URLs: []*url.URL{ub}}}
oa.LeafNode.connDelay = 50 * time.Millisecond
sa := RunServer(oa)
defer sa.Shutdown()
l := &loopDetectedLogger{ch: make(chan string, 1)}
sa.SetLogger(l, false, false)
l := &loopDetectedLogger{ch: make(chan string, 1)}
sa.SetLogger(l, false, false)
ob := DefaultOptions()
ob.Cluster.Name = "xyz"
ob.LeafNode.ReconnectInterval = 10 * time.Millisecond
ob.LeafNode.Port = 5678
ua, _ := url.Parse("nats://127.0.0.1:1234")
ob.LeafNode.Remotes = []*RemoteLeafOpts{{URLs: []*url.URL{ua}}}
ob.LeafNode.connDelay = 50 * time.Millisecond
sb := RunServer(ob)
defer sb.Shutdown()
ob := DefaultOptions()
if !cluster {
ob.Cluster.Port = 0
ob.Cluster.Name = _EMPTY_
} else {
ob.Cluster.Name = "xyz"
}
ob.LeafNode.ReconnectInterval = 10 * time.Millisecond
ob.LeafNode.Port = 5678
ua, _ := url.Parse("nats://127.0.0.1:1234")
ob.LeafNode.Remotes = []*RemoteLeafOpts{{URLs: []*url.URL{ua}}}
ob.LeafNode.connDelay = 50 * time.Millisecond
sb := RunServer(ob)
defer sb.Shutdown()
select {
case <-l.ch:
// OK!
case <-time.After(2 * time.Second):
t.Fatalf("Did not get any error regarding loop")
select {
case <-l.ch:
// OK!
case <-time.After(2 * time.Second):
t.Fatalf("Did not get any error regarding loop")
}
// Restart B on random client/cluster ports and without remotes, so only
// A solicits the leafnode connection.
sb.Shutdown()
ob.Port = -1
ob.Cluster.Port = -1
ob.LeafNode.Remotes = nil
sb = RunServer(ob)
defer sb.Shutdown()
checkLeafNodeConnected(t, sa)
}
sb.Shutdown()
ob.Port = -1
ob.Cluster.Port = -1
ob.LeafNode.Remotes = nil
sb = RunServer(ob)
defer sb.Shutdown()
checkLeafNodeConnected(t, sa)
// Exercise both the standalone and the clustered variants.
t.Run("standalone", func(t *testing.T) { test(t, false) })
t.Run("cluster", func(t *testing.T) { test(t, true) })
}
func TestLeafNodeLoopFromDAG(t *testing.T) {
@@ -4662,3 +4675,212 @@ func TestLeafNodePermsSuppressSubs(t *testing.T) {
// OK
}
}
// TestLeafNodeDuplicateMsg checks that a single message published by a
// client of cluster A is received exactly once by a subscriber connected to
// cluster B, when both clusters have leafnode remotes to each other and the
// accounts import/export the subject.
func TestLeafNodeDuplicateMsg(t *testing.T) {
// This involves 2 clusters with leafnodes to each other with a different
// account, and those accounts import/export a subject that caused
// duplicate messages. This test requires static ports since we need to
// have A->B and B->A.

// Server A_1: accepts leafnodes on 14333 and solicits a leafnode
// connection, bound to account A, to 127.0.0.1:24333 (the port B_1
// listens on below).
a1Conf := createConfFile(t, []byte(`
cluster : {
name : A
port : -1
}
leafnodes : {
port : 14333
remotes : [{
account : A
urls : [nats://leafa:pwd@127.0.0.1:24333]
}]
}
port : -1
server_name : A_1
accounts:{
A:{
users:[
{user: leafa, password: pwd},
{user: usera, password: usera, permissions: {
publish:{ allow:["iot.b.topic"] }
subscribe:{ allow:["iot.a.topic"] }
}}
]
imports:[
{stream:{account:"B", subject:"iot.a.topic"}}
]
},
B:{
users:[
{user: leafb, password: pwd},
]
exports:[
{stream: "iot.a.topic", accounts: ["A"]}
]
}
}
`))
defer removeFile(t, a1Conf)
a1, oa1 := RunServerWithConfig(a1Conf)
defer a1.Shutdown()
// Server A_2: same accounts as A_1, with a route to A_1's cluster port
// (filled in via Sprintf). Accepts leafnodes on 14334 and solicits a
// leafnode connection to 127.0.0.1:24334 (the port B_2 listens on below).
a2Conf := createConfFile(t, []byte(fmt.Sprintf(`
cluster : {
name : A
port : -1
routes : [nats://127.0.0.1:%d]
}
leafnodes : {
port : 14334
remotes : [{
account : A
urls : [nats://leafa:pwd@127.0.0.1:24334]
}]
}
port : -1
server_name : A_2
accounts:{
A:{
users:[
{user: leafa, password: pwd},
{user: usera, password: usera, permissions: {
publish:{ allow:["iot.b.topic"] }
subscribe:{ allow:["iot.a.topic"] }
}}
]
imports:[
{stream:{account:"B", subject:"iot.a.topic"}}
]
},
B:{
users:[
{user: leafb, password: pwd},
]
exports:[
{stream: "iot.a.topic", accounts: ["A"]}
]
}
}`, oa1.Cluster.Port)))
defer removeFile(t, a2Conf)
a2, _ := RunServerWithConfig(a2Conf)
defer a2.Shutdown()
checkClusterFormed(t, a1, a2)
// Server B_1: accepts leafnodes on 24333 and solicits a leafnode
// connection, bound to account B, to A_1's leafnode port 14333. In this
// cluster the direction is mirrored: account A exports "iot.b.topic" and
// account B imports it.
b1Conf := createConfFile(t, []byte(`
cluster : {
name : B
port : -1
}
leafnodes : {
port : 24333
remotes : [{
account : B
urls : [nats://leafb:pwd@127.0.0.1:14333]
}]
}
port : -1
server_name : B_1
accounts:{
A:{
users:[
{user: leafa, password: pwd},
]
exports:[
{stream: "iot.b.topic", accounts: ["B"]}
]
},
B:{
users:[
{user: leafb, password: pwd},
{user: userb, password: userb, permissions: {
publish:{ allow:["iot.a.topic"] },
subscribe:{ allow:["iot.b.topic"] }
}}
]
imports:[
{stream:{account:"A", subject:"iot.b.topic"}}
]
}
}`))
defer removeFile(t, b1Conf)
b1, ob1 := RunServerWithConfig(b1Conf)
defer b1.Shutdown()
// Server B_2: same accounts as B_1, with a route to B_1's cluster port
// (filled in via Sprintf). Accepts leafnodes on 24334 and solicits a
// leafnode connection to A_2's leafnode port 14334.
b2Conf := createConfFile(t, []byte(fmt.Sprintf(`
cluster : {
name : B
port : -1
routes : [nats://127.0.0.1:%d]
}
leafnodes : {
port : 24334
remotes : [{
account : B
urls : [nats://leafb:pwd@127.0.0.1:14334]
}]
}
port : -1
server_name : B_2
accounts:{
A:{
users:[
{user: leafa, password: pwd},
]
exports:[
{stream: "iot.b.topic", accounts: ["B"]}
]
},
B:{
users:[
{user: leafb, password: pwd},
{user: userb, password: userb, permissions: {
publish:{ allow:["iot.a.topic"] },
subscribe:{ allow:["iot.b.topic"] }
}}
]
imports:[
{stream:{account:"A", subject:"iot.b.topic"}}
]
}
}`, ob1.Cluster.Port)))
defer removeFile(t, b2Conf)
b2, _ := RunServerWithConfig(b2Conf)
defer b2.Shutdown()
checkClusterFormed(t, b1, b2)
// Every server is expected to report 2 leafnode connections; presumably
// the one it solicited plus the one it accepted from its counterpart
// (verify against checkLeafNodeConnectedCount).
checkLeafNodeConnectedCount(t, a1, 2)
checkLeafNodeConnectedCount(t, a2, 2)
checkLeafNodeConnectedCount(t, b1, 2)
checkLeafNodeConnectedCount(t, b2, 2)
// check subscribes to "iot.b.topic" on subSrv as userb, publishes one
// message on that subject from pubSrv as usera, and fails the test if the
// subscriber gets a second message.
check := func(t *testing.T, subSrv *Server, pubSrv *Server) {
sc := natsConnect(t, subSrv.ClientURL(), nats.UserInfo("userb", "userb"))
defer sc.Close()
subject := "iot.b.topic"
sub := natsSubSync(t, sc, subject)
// Wait for this to be available in A cluster
checkSubInterest(t, a1, "A", subject, time.Second)
checkSubInterest(t, a2, "A", subject, time.Second)
pb := natsConnect(t, pubSrv.ClientURL(), nats.UserInfo("usera", "usera"))
defer pb.Close()
natsPub(t, pb, subject, []byte("msg"))
// Consume the one expected copy of the message (presumably the helper
// fails the test if nothing arrives within the timeout).
natsNexMsg(t, sub, time.Second)
// Should be only 1
// A nil error here means another message arrived within 100ms, i.e. a
// duplicate was delivered.
if msg, err := sub.NextMsg(100 * time.Millisecond); err == nil {
t.Fatalf("Received duplicate on %q: %s", msg.Subject, msg.Data)
}
}
// Cover every combination of subscriber server (B_1/B_2) and publisher
// server (A_1/A_2).
t.Run("sub_b1_pub_a1", func(t *testing.T) { check(t, b1, a1) })
t.Run("sub_b1_pub_a2", func(t *testing.T) { check(t, b1, a2) })
t.Run("sub_b2_pub_a1", func(t *testing.T) { check(t, b2, a1) })
t.Run("sub_b2_pub_a2", func(t *testing.T) { check(t, b2, a2) })
}