mirror of
https://github.com/gogrlx/nats-server.git
synced 2026-04-15 18:50:41 -07:00
Merge pull request #1848 from nats-io/fix_some_flappers
Fixing some flappers (leafnode and mqtt)
This commit is contained in:
@@ -1880,6 +1880,7 @@ func TestLeafNodeTwoRemotesBindToSameAccount(t *testing.T) {
|
||||
func TestLeafNodeNoDuplicateWithinCluster(t *testing.T) {
|
||||
// This set the cluster name to "abc"
|
||||
oSrv1 := DefaultOptions()
|
||||
oSrv1.ServerName = "srv1"
|
||||
oSrv1.LeafNode.Host = "127.0.0.1"
|
||||
oSrv1.LeafNode.Port = -1
|
||||
srv1 := RunServer(oSrv1)
|
||||
@@ -1891,6 +1892,7 @@ func TestLeafNodeNoDuplicateWithinCluster(t *testing.T) {
|
||||
}
|
||||
|
||||
oLeaf1 := DefaultOptions()
|
||||
oLeaf1.ServerName = "leaf1"
|
||||
oLeaf1.LeafNode.Remotes = []*RemoteLeafOpts{&RemoteLeafOpts{URLs: []*url.URL{u}}}
|
||||
leaf1 := RunServer(oLeaf1)
|
||||
defer leaf1.Shutdown()
|
||||
@@ -1898,6 +1900,7 @@ func TestLeafNodeNoDuplicateWithinCluster(t *testing.T) {
|
||||
leaf1ClusterURL := fmt.Sprintf("nats://127.0.0.1:%d", oLeaf1.Cluster.Port)
|
||||
|
||||
oLeaf2 := DefaultOptions()
|
||||
oLeaf2.ServerName = "leaf2"
|
||||
oLeaf2.LeafNode.Remotes = []*RemoteLeafOpts{&RemoteLeafOpts{URLs: []*url.URL{u}}}
|
||||
oLeaf2.Routes = RoutesFromStr(leaf1ClusterURL)
|
||||
leaf2 := RunServer(oLeaf2)
|
||||
@@ -1925,9 +1928,26 @@ func TestLeafNodeNoDuplicateWithinCluster(t *testing.T) {
|
||||
defer ncLeaf2.Close()
|
||||
|
||||
// Check that "foo" interest is available everywhere.
|
||||
checkSubInterest(t, srv1, globalAccountName, "foo", time.Second)
|
||||
checkSubInterest(t, leaf1, globalAccountName, "foo", time.Second)
|
||||
checkSubInterest(t, leaf2, globalAccountName, "foo", time.Second)
|
||||
// For this test, we want to make sure that the 2 queue subs are
|
||||
// registered on all servers, so we don't use checkSubInterest
|
||||
// which would simply return "true" if there is any interest on "foo".
|
||||
servers := []*Server{srv1, leaf1, leaf2}
|
||||
checkFor(t, time.Second, 15*time.Millisecond, func() error {
|
||||
for _, s := range servers {
|
||||
acc, err := s.LookupAccount(globalAccountName)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
acc.mu.RLock()
|
||||
r := acc.sl.Match("foo")
|
||||
ok := len(r.qsubs) == 1 && len(r.qsubs[0]) == 2
|
||||
acc.mu.RUnlock()
|
||||
if !ok {
|
||||
return fmt.Errorf("interest not propagated on %q", s.Name())
|
||||
}
|
||||
}
|
||||
return nil
|
||||
})
|
||||
|
||||
// Send requests (from leaf2). For this test to make sure that
|
||||
// there is no duplicate, we want to make sure that we check for
|
||||
@@ -1936,6 +1956,7 @@ func TestLeafNodeNoDuplicateWithinCluster(t *testing.T) {
|
||||
sub := natsSubSync(t, ncLeaf2, "reply_subj")
|
||||
natsFlush(t, ncLeaf2)
|
||||
|
||||
// Here we have a single sub on "reply_subj" so using checkSubInterest is ok.
|
||||
checkSubInterest(t, srv1, globalAccountName, "reply_subj", time.Second)
|
||||
checkSubInterest(t, leaf1, globalAccountName, "reply_subj", time.Second)
|
||||
checkSubInterest(t, leaf2, globalAccountName, "reply_subj", time.Second)
|
||||
|
||||
@@ -2360,6 +2360,11 @@ func TestMQTTSubWithNATSStream(t *testing.T) {
|
||||
testMQTTSub(t, 1, mc, r, []*mqttFilter{&mqttFilter{filter: "foo/bar", qos: 1}}, []byte{1})
|
||||
testMQTTFlush(t, mc, nil, r)
|
||||
|
||||
mcp, rp := testMQTTConnect(t, &mqttConnInfo{cleanSess: true}, o.MQTT.Host, o.MQTT.Port)
|
||||
defer mcp.Close()
|
||||
testMQTTCheckConnAck(t, rp, mqttConnAckRCConnectionAccepted, false)
|
||||
testMQTTFlush(t, mcp, nil, rp)
|
||||
|
||||
nc := natsConnect(t, s.ClientURL())
|
||||
defer nc.Close()
|
||||
|
||||
@@ -2405,11 +2410,11 @@ func TestMQTTSubWithNATSStream(t *testing.T) {
|
||||
checkRecv("nats", 0)
|
||||
|
||||
// Send from MQTT as a QoS0
|
||||
testMQTTPublish(t, mc, r, 0, false, false, "foo/bar", 0, []byte("qos0"))
|
||||
testMQTTPublish(t, mcp, rp, 0, false, false, "foo/bar", 0, []byte("qos0"))
|
||||
checkRecv("qos0", 0)
|
||||
|
||||
// Send from MQTT as a QoS1
|
||||
testMQTTPublish(t, mc, r, 1, false, false, "foo/bar", 1, []byte("qos1"))
|
||||
testMQTTPublish(t, mcp, rp, 1, false, false, "foo/bar", 1, []byte("qos1"))
|
||||
checkRecv("qos1", mqttPubQos1)
|
||||
}
|
||||
|
||||
@@ -3748,6 +3753,9 @@ func TestMQTTMaxAckPending(t *testing.T) {
|
||||
testMQTTCheckPubMsg(t, c, r, "foo", mqttPubQos1, []byte("msg2"))
|
||||
testMQTTDisconnect(t, c, nil)
|
||||
|
||||
// Give a chance to the server to register that this client is gone.
|
||||
checkClientsCount(t, s, 1)
|
||||
|
||||
// Send 2 messages while sub is offline
|
||||
testMQTTPublish(t, cp, rp, 1, false, false, "foo", 1, []byte("msg3"))
|
||||
testMQTTPublish(t, cp, rp, 1, false, false, "foo", 1, []byte("msg4"))
|
||||
|
||||
Reference in New Issue
Block a user