mirror of
https://github.com/gogrlx/nats-server.git
synced 2026-04-14 02:07:59 -07:00
Merge pull request #1424 from nats-io/fix_1421
[FIXED] Possible removal of interest on queue subs with leaf nodes
This commit is contained in:
@@ -3975,6 +3975,7 @@ func (c *client) teardownConn() {
|
||||
// Update route as normal for a normal subscriber.
|
||||
if sub.queue == nil {
|
||||
srv.updateRouteSubscriptionMap(acc, sub, -1)
|
||||
srv.updateLeafNodes(acc, sub, -1)
|
||||
} else {
|
||||
// We handle queue subscribers special in case we
|
||||
// have a bunch we can just send one update to the
|
||||
@@ -3989,8 +3990,6 @@ func (c *client) teardownConn() {
|
||||
if srv.gateway.enabled {
|
||||
srv.gatewayUpdateSubInterest(acc.Name, sub, -1)
|
||||
}
|
||||
// Now check on leafnode updates.
|
||||
srv.updateLeafNodes(acc, sub, -1)
|
||||
}
|
||||
// Process any qsubs here.
|
||||
for _, esub := range qsubs {
|
||||
|
||||
@@ -1395,7 +1395,7 @@ func TestConnectErrorReports(t *testing.T) {
|
||||
defer s.Shutdown()
|
||||
|
||||
// Wait long enough for the number of recurring attempts to happen
|
||||
time.Sleep(10 * routeConnectDelay)
|
||||
time.Sleep(50 * routeConnectDelay)
|
||||
s.Shutdown()
|
||||
|
||||
content, err := ioutil.ReadFile(log)
|
||||
@@ -1444,7 +1444,7 @@ func TestConnectErrorReports(t *testing.T) {
|
||||
defer s.Shutdown()
|
||||
|
||||
// Wait long enough for the number of recurring attempts to happen
|
||||
time.Sleep(10 * opts.LeafNode.ReconnectInterval)
|
||||
time.Sleep(50 * opts.LeafNode.ReconnectInterval)
|
||||
s.Shutdown()
|
||||
|
||||
content, err = ioutil.ReadFile(log)
|
||||
@@ -1494,7 +1494,7 @@ func TestConnectErrorReports(t *testing.T) {
|
||||
defer s.Shutdown()
|
||||
|
||||
// Wait long enough for the number of recurring attempts to happen
|
||||
time.Sleep(10 * gatewayConnectDelay)
|
||||
time.Sleep(50 * gatewayConnectDelay)
|
||||
s.Shutdown()
|
||||
|
||||
content, err = ioutil.ReadFile(log)
|
||||
@@ -1569,7 +1569,7 @@ func TestReconnectErrorReports(t *testing.T) {
|
||||
cs.Shutdown()
|
||||
|
||||
// Wait long enough for the number of recurring attempts to happen
|
||||
time.Sleep(DEFAULT_ROUTE_RECONNECT + 15*routeConnectDelay)
|
||||
time.Sleep(DEFAULT_ROUTE_RECONNECT + 50*routeConnectDelay)
|
||||
s.Shutdown()
|
||||
|
||||
content, err := ioutil.ReadFile(log)
|
||||
@@ -1636,7 +1636,7 @@ func TestReconnectErrorReports(t *testing.T) {
|
||||
cs.Shutdown()
|
||||
|
||||
// Wait long enough for the number of recurring attempts to happen
|
||||
time.Sleep(opts.LeafNode.ReconnectInterval + 15*opts.LeafNode.ReconnectInterval)
|
||||
time.Sleep(50 * opts.LeafNode.ReconnectInterval)
|
||||
s.Shutdown()
|
||||
|
||||
content, err = ioutil.ReadFile(log)
|
||||
@@ -1699,7 +1699,7 @@ func TestReconnectErrorReports(t *testing.T) {
|
||||
cs.Shutdown()
|
||||
|
||||
// Wait long enough for the number of recurring attempts to happen
|
||||
time.Sleep(2*gatewayReconnectDelay + 15*gatewayConnectDelay)
|
||||
time.Sleep(50 * gatewayConnectDelay)
|
||||
s.Shutdown()
|
||||
|
||||
content, err = ioutil.ReadFile(log)
|
||||
|
||||
@@ -14,6 +14,7 @@
|
||||
package test
|
||||
|
||||
import (
|
||||
"bytes"
|
||||
"crypto/tls"
|
||||
"crypto/x509"
|
||||
"encoding/json"
|
||||
@@ -3594,3 +3595,53 @@ func TestServiceExportWithLeafnodeRestart(t *testing.T) {
|
||||
t.Fatal("Did not receive the correct message")
|
||||
}
|
||||
}
|
||||
|
||||
func TestLeafNodeQueueSubscriberUnsubscribe(t *testing.T) {
|
||||
s, opts := runLeafServer()
|
||||
defer s.Shutdown()
|
||||
|
||||
lc := createLeafConn(t, opts.LeafNode.Host, opts.LeafNode.Port)
|
||||
defer lc.Close()
|
||||
|
||||
leafSend, leafExpect := setupLeaf(t, lc, 1)
|
||||
leafSend("PING\r\n")
|
||||
leafExpect(pongRe)
|
||||
|
||||
// Create a client on leaf server and create a queue sub
|
||||
c1 := createClientConn(t, opts.Host, opts.Port)
|
||||
defer c1.Close()
|
||||
|
||||
send1, expect1 := setupConn(t, c1)
|
||||
send1("SUB foo bar 1\r\nPING\r\n")
|
||||
expect1(pongRe)
|
||||
|
||||
// Leaf should receive an LS+ foo bar 1
|
||||
leafExpect(lsubRe)
|
||||
|
||||
// Create a second client on leaf server and create queue sub on same group.
|
||||
c2 := createClientConn(t, opts.Host, opts.Port)
|
||||
defer c2.Close()
|
||||
|
||||
send2, expect2 := setupConn(t, c2)
|
||||
send2("SUB foo bar 1\r\nPING\r\n")
|
||||
expect2(pongRe)
|
||||
|
||||
// Leaf should receive an LS+ foo bar 2
|
||||
leafExpect(lsubRe)
|
||||
|
||||
// Now close c1
|
||||
c1.Close()
|
||||
|
||||
// Leaf should receive an indication that the queue group went to 1.
|
||||
// Which means LS+ foo bar 1.
|
||||
buf := leafExpect(lsubRe)
|
||||
if matches := lsubRe.FindAllSubmatch(buf, -1); len(matches) != 1 {
|
||||
t.Fatalf("Expected only 1 LS+, got %v", len(matches))
|
||||
}
|
||||
// Make sure that we did not get a LS- at the same time.
|
||||
if bytes.Contains(buf, []byte("LS-")) {
|
||||
t.Fatalf("Unexpected LS- in response: %q", buf)
|
||||
}
|
||||
// Make sure we receive nothing...
|
||||
expectNothing(t, lc)
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user