diff --git a/server/client.go b/server/client.go index 3e1063ec..75b47a3d 100644 --- a/server/client.go +++ b/server/client.go @@ -3975,6 +3975,7 @@ func (c *client) teardownConn() { // Update route as normal for a normal subscriber. if sub.queue == nil { srv.updateRouteSubscriptionMap(acc, sub, -1) + srv.updateLeafNodes(acc, sub, -1) } else { // We handle queue subscribers special in case we // have a bunch we can just send one update to the @@ -3989,8 +3990,6 @@ func (c *client) teardownConn() { if srv.gateway.enabled { srv.gatewayUpdateSubInterest(acc.Name, sub, -1) } - // Now check on leafnode updates. - srv.updateLeafNodes(acc, sub, -1) } // Process any qsubs here. for _, esub := range qsubs { diff --git a/server/server_test.go b/server/server_test.go index f39864d7..914fffef 100644 --- a/server/server_test.go +++ b/server/server_test.go @@ -1395,7 +1395,7 @@ func TestConnectErrorReports(t *testing.T) { defer s.Shutdown() // Wait long enough for the number of recurring attempts to happen - time.Sleep(10 * routeConnectDelay) + time.Sleep(50 * routeConnectDelay) s.Shutdown() content, err := ioutil.ReadFile(log) @@ -1444,7 +1444,7 @@ func TestConnectErrorReports(t *testing.T) { defer s.Shutdown() // Wait long enough for the number of recurring attempts to happen - time.Sleep(10 * opts.LeafNode.ReconnectInterval) + time.Sleep(50 * opts.LeafNode.ReconnectInterval) s.Shutdown() content, err = ioutil.ReadFile(log) @@ -1494,7 +1494,7 @@ func TestConnectErrorReports(t *testing.T) { defer s.Shutdown() // Wait long enough for the number of recurring attempts to happen - time.Sleep(10 * gatewayConnectDelay) + time.Sleep(50 * gatewayConnectDelay) s.Shutdown() content, err = ioutil.ReadFile(log) @@ -1569,7 +1569,7 @@ func TestReconnectErrorReports(t *testing.T) { cs.Shutdown() // Wait long enough for the number of recurring attempts to happen - time.Sleep(DEFAULT_ROUTE_RECONNECT + 15*routeConnectDelay) + time.Sleep(DEFAULT_ROUTE_RECONNECT + 50*routeConnectDelay) s.Shutdown() content, err := ioutil.ReadFile(log) @@ -1636,7 +1636,7 @@ func TestReconnectErrorReports(t *testing.T) { cs.Shutdown() // Wait long enough for the number of recurring attempts to happen - time.Sleep(opts.LeafNode.ReconnectInterval + 15*opts.LeafNode.ReconnectInterval) + time.Sleep(50 * opts.LeafNode.ReconnectInterval) s.Shutdown() content, err = ioutil.ReadFile(log) @@ -1699,7 +1699,7 @@ func TestReconnectErrorReports(t *testing.T) { cs.Shutdown() // Wait long enough for the number of recurring attempts to happen - time.Sleep(2*gatewayReconnectDelay + 15*gatewayConnectDelay) + time.Sleep(50 * gatewayConnectDelay) s.Shutdown() content, err = ioutil.ReadFile(log) diff --git a/test/leafnode_test.go b/test/leafnode_test.go index 2c84dac3..d6f61327 100644 --- a/test/leafnode_test.go +++ b/test/leafnode_test.go @@ -14,6 +14,7 @@ package test import ( + "bytes" "crypto/tls" "crypto/x509" "encoding/json" @@ -3594,3 +3595,53 @@ func TestServiceExportWithLeafnodeRestart(t *testing.T) { t.Fatal("Did not receive the correct message") } } + +func TestLeafNodeQueueSubscriberUnsubscribe(t *testing.T) { + s, opts := runLeafServer() + defer s.Shutdown() + + lc := createLeafConn(t, opts.LeafNode.Host, opts.LeafNode.Port) + defer lc.Close() + + leafSend, leafExpect := setupLeaf(t, lc, 1) + leafSend("PING\r\n") + leafExpect(pongRe) + + // Create a client on leaf server and create a queue sub + c1 := createClientConn(t, opts.Host, opts.Port) + defer c1.Close() + + send1, expect1 := setupConn(t, c1) + send1("SUB foo bar 1\r\nPING\r\n") + expect1(pongRe) + + // Leaf should receive an LS+ foo bar 1 + leafExpect(lsubRe) + + // Create a second client on leaf server and create queue sub on same group. + c2 := createClientConn(t, opts.Host, opts.Port) + defer c2.Close() + + send2, expect2 := setupConn(t, c2) + send2("SUB foo bar 1\r\nPING\r\n") + expect2(pongRe) + + // Leaf should receive an LS+ foo bar 2 + leafExpect(lsubRe) + + // Now close c1 + c1.Close() + + // Leaf should receive an indication that the queue group went to 1. + // Which means LS+ foo bar 1. + buf := leafExpect(lsubRe) + if matches := lsubRe.FindAllSubmatch(buf, -1); len(matches) != 1 { + t.Fatalf("Expected only 1 LS+, got %v", len(matches)) + } + // Make sure that we did not get a LS- at the same time. + if bytes.Contains(buf, []byte("LS-")) { + t.Fatalf("Unexpected LS- in response: %q", buf) + } + // Make sure we receive nothing... + expectNothing(t, lc) +}