From af57f55738f5419b8914650156a2f1b79cb12359 Mon Sep 17 00:00:00 2001 From: Ivan Kozlovic Date: Tue, 26 Jan 2021 13:10:23 -0700 Subject: [PATCH] Fixing some flappers (leafnode and mqtt) Signed-off-by: Ivan Kozlovic --- server/leafnode_test.go | 27 ++++++++++++++++++++++++--- server/mqtt_test.go | 12 ++++++++++-- 2 files changed, 34 insertions(+), 5 deletions(-) diff --git a/server/leafnode_test.go b/server/leafnode_test.go index dae790bc..472117fa 100644 --- a/server/leafnode_test.go +++ b/server/leafnode_test.go @@ -1880,6 +1880,7 @@ func TestLeafNodeTwoRemotesBindToSameAccount(t *testing.T) { func TestLeafNodeNoDuplicateWithinCluster(t *testing.T) { // This set the cluster name to "abc" oSrv1 := DefaultOptions() + oSrv1.ServerName = "srv1" oSrv1.LeafNode.Host = "127.0.0.1" oSrv1.LeafNode.Port = -1 srv1 := RunServer(oSrv1) @@ -1891,6 +1892,7 @@ func TestLeafNodeNoDuplicateWithinCluster(t *testing.T) { } oLeaf1 := DefaultOptions() + oLeaf1.ServerName = "leaf1" oLeaf1.LeafNode.Remotes = []*RemoteLeafOpts{&RemoteLeafOpts{URLs: []*url.URL{u}}} leaf1 := RunServer(oLeaf1) defer leaf1.Shutdown() @@ -1898,6 +1900,7 @@ func TestLeafNodeNoDuplicateWithinCluster(t *testing.T) { leaf1ClusterURL := fmt.Sprintf("nats://127.0.0.1:%d", oLeaf1.Cluster.Port) oLeaf2 := DefaultOptions() + oLeaf2.ServerName = "leaf2" oLeaf2.LeafNode.Remotes = []*RemoteLeafOpts{&RemoteLeafOpts{URLs: []*url.URL{u}}} oLeaf2.Routes = RoutesFromStr(leaf1ClusterURL) leaf2 := RunServer(oLeaf2) @@ -1925,9 +1928,26 @@ func TestLeafNodeNoDuplicateWithinCluster(t *testing.T) { defer ncLeaf2.Close() // Check that "foo" interest is available everywhere. - checkSubInterest(t, srv1, globalAccountName, "foo", time.Second) - checkSubInterest(t, leaf1, globalAccountName, "foo", time.Second) - checkSubInterest(t, leaf2, globalAccountName, "foo", time.Second) + // For this test, we want to make sure that the 2 queue subs are + // registered on all servers, so we don't use checkSubInterest + // which would simply return "true" if there is any interest on "foo". + servers := []*Server{srv1, leaf1, leaf2} + checkFor(t, time.Second, 15*time.Millisecond, func() error { + for _, s := range servers { + acc, err := s.LookupAccount(globalAccountName) + if err != nil { + return err + } + acc.mu.RLock() + r := acc.sl.Match("foo") + ok := len(r.qsubs) == 1 && len(r.qsubs[0]) == 2 + acc.mu.RUnlock() + if !ok { + return fmt.Errorf("interest not propagated on %q", s.Name()) + } + } + return nil + }) // Send requests (from leaf2). For this test to make sure that // there is no duplicate, we want to make sure that we check for @@ -1936,6 +1956,7 @@ func TestLeafNodeNoDuplicateWithinCluster(t *testing.T) { sub := natsSubSync(t, ncLeaf2, "reply_subj") natsFlush(t, ncLeaf2) + // Here we have a single sub on "reply_subj" so using checkSubInterest is ok. checkSubInterest(t, srv1, globalAccountName, "reply_subj", time.Second) checkSubInterest(t, leaf1, globalAccountName, "reply_subj", time.Second) checkSubInterest(t, leaf2, globalAccountName, "reply_subj", time.Second) diff --git a/server/mqtt_test.go b/server/mqtt_test.go index bdb47856..5d3b1d9a 100644 --- a/server/mqtt_test.go +++ b/server/mqtt_test.go @@ -2360,6 +2360,11 @@ func TestMQTTSubWithNATSStream(t *testing.T) { testMQTTSub(t, 1, mc, r, []*mqttFilter{&mqttFilter{filter: "foo/bar", qos: 1}}, []byte{1}) testMQTTFlush(t, mc, nil, r) + mcp, rp := testMQTTConnect(t, &mqttConnInfo{cleanSess: true}, o.MQTT.Host, o.MQTT.Port) + defer mcp.Close() + testMQTTCheckConnAck(t, rp, mqttConnAckRCConnectionAccepted, false) + testMQTTFlush(t, mcp, nil, rp) + nc := natsConnect(t, s.ClientURL()) defer nc.Close() @@ -2405,11 +2410,11 @@ func TestMQTTSubWithNATSStream(t *testing.T) { checkRecv("nats", 0) // Send from MQTT as a QoS0 - testMQTTPublish(t, mc, r, 0, false, false, "foo/bar", 0, []byte("qos0")) + testMQTTPublish(t, mcp, rp, 0, false, false, "foo/bar", 0, []byte("qos0")) checkRecv("qos0", 0) // Send from MQTT as a QoS1 - testMQTTPublish(t, mc, r, 1, false, false, "foo/bar", 1, []byte("qos1")) + testMQTTPublish(t, mcp, rp, 1, false, false, "foo/bar", 1, []byte("qos1")) checkRecv("qos1", mqttPubQos1) } @@ -3748,6 +3753,9 @@ func TestMQTTMaxAckPending(t *testing.T) { testMQTTCheckPubMsg(t, c, r, "foo", mqttPubQos1, []byte("msg2")) testMQTTDisconnect(t, c, nil) + // Give a chance to the server to register that this client is gone. + checkClientsCount(t, s, 1) + // Send 2 messages while sub is offline testMQTTPublish(t, cp, rp, 1, false, false, "foo", 1, []byte("msg3")) testMQTTPublish(t, cp, rp, 1, false, false, "foo", 1, []byte("msg4"))