mirror of
https://github.com/gogrlx/nats-server.git
synced 2026-04-02 11:48:43 -07:00
[FIXED] Message loop with cluster, leaf nodes and queue subs
In a setup with a cluster of servers to which 2 different leaf nodes attach, and with queue subs attached to one of the leaf nodes, if that leaf server is restarted and reconnects to another server in the cluster, there was a risk of an infinite message loop between some servers in the "hub" cluster. Signed-off-by: Ivan Kozlovic <ivan@synadia.com>
This commit is contained in:
@@ -4114,7 +4114,10 @@ func (c *client) processMsgResults(acc *Account, r *SublistResult, msg, deliver,
|
||||
for i := 0; i < len(qsubs); i++ {
|
||||
sub = qsubs[i]
|
||||
if sub.client.kind == LEAF || sub.client.kind == ROUTER {
|
||||
if rsub == nil {
|
||||
// If we have assigned an rsub already, replace if the destination is LEAF
|
||||
// since we want to favor that compared to a ROUTER. We could make sure that
|
||||
// we override only if previous was a ROUTER and not a LEAF, but we don't have to.
|
||||
if rsub == nil || sub.client.kind == LEAF {
|
||||
rsub = sub
|
||||
}
|
||||
} else {
|
||||
@@ -4545,6 +4548,7 @@ func (c *client) closeConnection(reason ClosedState) {
|
||||
srv = c.srv
|
||||
noReconnect = c.flags.isSet(noReconnect)
|
||||
acc = c.acc
|
||||
spoke bool
|
||||
)
|
||||
|
||||
// Snapshot for use if we are a client connection.
|
||||
@@ -4560,6 +4564,7 @@ func (c *client) closeConnection(reason ClosedState) {
|
||||
sub.close()
|
||||
subs = append(subs, sub)
|
||||
}
|
||||
spoke = c.isSpokeLeafNode()
|
||||
}
|
||||
|
||||
if c.route != nil {
|
||||
@@ -4593,7 +4598,6 @@ func (c *client) closeConnection(reason ClosedState) {
|
||||
// Unregister
|
||||
srv.removeClient(c)
|
||||
|
||||
notSpoke := !(kind == LEAF && c.isSpokeLeafNode())
|
||||
// Update remote subscriptions.
|
||||
if acc != nil && (kind == CLIENT || kind == LEAF) {
|
||||
qsubs := map[string]*qsub{}
|
||||
@@ -4602,29 +4606,36 @@ func (c *client) closeConnection(reason ClosedState) {
|
||||
c.unsubscribe(acc, sub, true, false)
|
||||
// Update route as normal for a normal subscriber.
|
||||
if sub.queue == nil {
|
||||
if notSpoke {
|
||||
if !spoke {
|
||||
srv.updateRouteSubscriptionMap(acc, sub, -1)
|
||||
if srv.gateway.enabled {
|
||||
srv.gatewayUpdateSubInterest(acc.Name, sub, -1)
|
||||
}
|
||||
}
|
||||
srv.updateLeafNodes(acc, sub, -1)
|
||||
} else {
|
||||
// We handle queue subscribers special in case we
|
||||
// have a bunch we can just send one update to the
|
||||
// connected routes.
|
||||
num := int32(1)
|
||||
if kind == LEAF {
|
||||
num = sub.qw
|
||||
}
|
||||
key := string(sub.subject) + " " + string(sub.queue)
|
||||
if esub, ok := qsubs[key]; ok {
|
||||
esub.n++
|
||||
esub.n += num
|
||||
} else {
|
||||
qsubs[key] = &qsub{sub, 1}
|
||||
qsubs[key] = &qsub{sub, num}
|
||||
}
|
||||
}
|
||||
if srv.gateway.enabled && notSpoke {
|
||||
srv.gatewayUpdateSubInterest(acc.Name, sub, -1)
|
||||
}
|
||||
}
|
||||
// Process any qsubs here.
|
||||
for _, esub := range qsubs {
|
||||
if notSpoke {
|
||||
if !spoke {
|
||||
srv.updateRouteSubscriptionMap(acc, esub.sub, -(esub.n))
|
||||
if srv.gateway.enabled {
|
||||
srv.gatewayUpdateSubInterest(acc.Name, esub.sub, -(esub.n))
|
||||
}
|
||||
}
|
||||
srv.updateLeafNodes(acc, esub.sub, -(esub.n))
|
||||
}
|
||||
|
||||
@@ -1592,6 +1592,7 @@ func (c *client) processLeafSub(argo []byte) (err error) {
|
||||
key := string(sub.sid)
|
||||
osub := c.subs[key]
|
||||
updateGWs := false
|
||||
delta := int32(1)
|
||||
if osub == nil {
|
||||
c.subs[key] = sub
|
||||
// Now place into the account sl.
|
||||
@@ -1605,6 +1606,7 @@ func (c *client) processLeafSub(argo []byte) (err error) {
|
||||
updateGWs = srv.gateway.enabled
|
||||
} else if sub.queue != nil {
|
||||
// For a queue we need to update the weight.
|
||||
delta = sub.qw - atomic.LoadInt32(&osub.qw)
|
||||
atomic.StoreInt32(&osub.qw, sub.qw)
|
||||
acc.sl.UpdateRemoteQSub(osub)
|
||||
}
|
||||
@@ -1620,14 +1622,14 @@ func (c *client) processLeafSub(argo []byte) (err error) {
|
||||
// other leaf nodes as needed.
|
||||
if !spoke {
|
||||
// If we are routing add to the route map for the associated account.
|
||||
srv.updateRouteSubscriptionMap(acc, sub, 1)
|
||||
srv.updateRouteSubscriptionMap(acc, sub, delta)
|
||||
if updateGWs {
|
||||
srv.gatewayUpdateSubInterest(acc.Name, sub, 1)
|
||||
srv.gatewayUpdateSubInterest(acc.Name, sub, delta)
|
||||
}
|
||||
}
|
||||
// Now check on leafnode updates for other leaf nodes. We understand solicited
|
||||
// and non-solicited state in this call so we will do the right thing.
|
||||
srv.updateLeafNodes(acc, sub, 1)
|
||||
srv.updateLeafNodes(acc, sub, delta)
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
@@ -3513,3 +3513,160 @@ func TestLeafNodeNoPingBeforeConnect(t *testing.T) {
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func TestLeafNodeNoMsgLoop(t *testing.T) {
|
||||
hubConf := `
|
||||
listen: "127.0.0.1:-1"
|
||||
accounts {
|
||||
FOO {
|
||||
users [
|
||||
{username: leaf, password: pass}
|
||||
{username: user, password: pass}
|
||||
]
|
||||
}
|
||||
}
|
||||
cluster {
|
||||
name: "hub"
|
||||
listen: "127.0.0.1:-1"
|
||||
%s
|
||||
}
|
||||
leafnodes {
|
||||
listen: "127.0.0.1:-1"
|
||||
authorization {
|
||||
account: FOO
|
||||
}
|
||||
}
|
||||
`
|
||||
configS1 := createConfFile(t, []byte(fmt.Sprintf(hubConf, "")))
|
||||
defer removeFile(t, configS1)
|
||||
s1, o1 := RunServerWithConfig(configS1)
|
||||
defer s1.Shutdown()
|
||||
|
||||
configS2S3 := createConfFile(t, []byte(fmt.Sprintf(hubConf, fmt.Sprintf(`routes: ["nats://127.0.0.1:%d"]`, o1.Cluster.Port))))
|
||||
defer removeFile(t, configS2S3)
|
||||
s2, o2 := RunServerWithConfig(configS2S3)
|
||||
defer s2.Shutdown()
|
||||
|
||||
s3, _ := RunServerWithConfig(configS2S3)
|
||||
defer s3.Shutdown()
|
||||
|
||||
checkClusterFormed(t, s1, s2, s3)
|
||||
|
||||
contentLN := `
|
||||
listen: "127.0.0.1:%d"
|
||||
accounts {
|
||||
FOO {
|
||||
users [
|
||||
{username: leaf, password: pass}
|
||||
{username: user, password: pass}
|
||||
]
|
||||
}
|
||||
}
|
||||
leafnodes {
|
||||
remotes = [
|
||||
{
|
||||
url: "nats://leaf:pass@127.0.0.1:%d"
|
||||
account: FOO
|
||||
}
|
||||
]
|
||||
}
|
||||
`
|
||||
lnconf := createConfFile(t, []byte(fmt.Sprintf(contentLN, -1, o1.LeafNode.Port)))
|
||||
defer removeFile(t, lnconf)
|
||||
sl1, slo1 := RunServerWithConfig(lnconf)
|
||||
defer sl1.Shutdown()
|
||||
|
||||
sl2, slo2 := RunServerWithConfig(lnconf)
|
||||
defer sl2.Shutdown()
|
||||
|
||||
checkLeafNodeConnected(t, sl1)
|
||||
checkLeafNodeConnected(t, sl2)
|
||||
|
||||
// Create users on each leafnode
|
||||
nc1, err := nats.Connect(fmt.Sprintf("nats://user:pass@127.0.0.1:%d", slo1.Port))
|
||||
if err != nil {
|
||||
t.Fatalf("Error on connect: %v", err)
|
||||
}
|
||||
defer nc1.Close()
|
||||
|
||||
rch := make(chan struct{}, 1)
|
||||
nc2, err := nats.Connect(
|
||||
fmt.Sprintf("nats://user:pass@127.0.0.1:%d", slo2.Port),
|
||||
nats.ReconnectWait(50*time.Millisecond),
|
||||
nats.ReconnectHandler(func(_ *nats.Conn) {
|
||||
rch <- struct{}{}
|
||||
}),
|
||||
)
|
||||
if err != nil {
|
||||
t.Fatalf("Error on connect: %v", err)
|
||||
}
|
||||
defer nc2.Close()
|
||||
|
||||
// Create queue subs on sl2
|
||||
nc2.QueueSubscribe("foo", "bar", func(_ *nats.Msg) {})
|
||||
nc2.QueueSubscribe("foo", "bar", func(_ *nats.Msg) {})
|
||||
nc2.Flush()
|
||||
|
||||
// Wait for interest to propagate to sl1
|
||||
checkSubInterest(t, sl1, "FOO", "foo", 250*time.Millisecond)
|
||||
|
||||
// Create sub on sl1
|
||||
ch := make(chan *nats.Msg, 10)
|
||||
nc1.Subscribe("foo", func(m *nats.Msg) {
|
||||
select {
|
||||
case ch <- m:
|
||||
default:
|
||||
}
|
||||
})
|
||||
nc1.Flush()
|
||||
|
||||
checkSubInterest(t, sl2, "FOO", "foo", 250*time.Millisecond)
|
||||
|
||||
// Produce from sl1
|
||||
nc1.Publish("foo", []byte("msg1"))
|
||||
|
||||
// Check message is received by plain sub
|
||||
select {
|
||||
case <-ch:
|
||||
case <-time.After(time.Second):
|
||||
t.Fatalf("Did not receive message")
|
||||
}
|
||||
|
||||
// Restart leaf node, this time make sure we connect to 2nd server.
|
||||
sl2.Shutdown()
|
||||
|
||||
// Use config file but this time reuse the client port and set the 2nd server for
|
||||
// the remote leaf node port.
|
||||
lnconf = createConfFile(t, []byte(fmt.Sprintf(contentLN, slo2.Port, o2.LeafNode.Port)))
|
||||
defer removeFile(t, lnconf)
|
||||
sl2, _ = RunServerWithConfig(lnconf)
|
||||
defer sl2.Shutdown()
|
||||
|
||||
checkLeafNodeConnected(t, sl2)
|
||||
|
||||
// Wait for client to reconnect
|
||||
select {
|
||||
case <-rch:
|
||||
case <-time.After(time.Second):
|
||||
t.Fatalf("Did not reconnect")
|
||||
}
|
||||
|
||||
// Produce a new messages
|
||||
for i := 0; i < 10; i++ {
|
||||
nc1.Publish("foo", []byte(fmt.Sprintf("msg%d", 2+i)))
|
||||
|
||||
// Check sub receives 1 message
|
||||
select {
|
||||
case <-ch:
|
||||
case <-time.After(time.Second):
|
||||
t.Fatalf("Did not receive message")
|
||||
}
|
||||
// Check that there is no more...
|
||||
select {
|
||||
case m := <-ch:
|
||||
t.Fatalf("Loop: received second message %s", m.Data)
|
||||
case <-time.After(50 * time.Millisecond):
|
||||
// OK
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@@ -1087,6 +1087,7 @@ func (c *client) processRemoteSub(argo []byte, hasOrigin bool) (err error) {
|
||||
|
||||
osub := c.subs[key]
|
||||
updateGWs := false
|
||||
delta := int32(1)
|
||||
if osub == nil {
|
||||
c.subs[key] = sub
|
||||
// Now place into the account sl.
|
||||
@@ -1100,17 +1101,18 @@ func (c *client) processRemoteSub(argo []byte, hasOrigin bool) (err error) {
|
||||
updateGWs = srv.gateway.enabled
|
||||
} else if sub.queue != nil {
|
||||
// For a queue we need to update the weight.
|
||||
delta = sub.qw - atomic.LoadInt32(&osub.qw)
|
||||
atomic.StoreInt32(&osub.qw, sub.qw)
|
||||
acc.sl.UpdateRemoteQSub(osub)
|
||||
}
|
||||
c.mu.Unlock()
|
||||
|
||||
if updateGWs {
|
||||
srv.gatewayUpdateSubInterest(acc.Name, sub, 1)
|
||||
srv.gatewayUpdateSubInterest(acc.Name, sub, delta)
|
||||
}
|
||||
|
||||
// Now check on leafnode updates.
|
||||
srv.updateLeafNodes(acc, sub, 1)
|
||||
srv.updateLeafNodes(acc, sub, delta)
|
||||
|
||||
if c.opts.Verbose {
|
||||
c.sendOK()
|
||||
|
||||
Reference in New Issue
Block a user