diff --git a/server/client.go b/server/client.go index 802b5d37..4adda8e9 100644 --- a/server/client.go +++ b/server/client.go @@ -4114,7 +4114,10 @@ func (c *client) processMsgResults(acc *Account, r *SublistResult, msg, deliver, for i := 0; i < len(qsubs); i++ { sub = qsubs[i] if sub.client.kind == LEAF || sub.client.kind == ROUTER { - if rsub == nil { + // If we have assigned an rsub already, replace if the destination is LEAF + // since we want to favor that compared to a ROUTER. We could make sure that + // we override only if previous was a ROUTER and not a LEAF, but we don't have to. + if rsub == nil || sub.client.kind == LEAF { rsub = sub } } else { @@ -4545,6 +4548,7 @@ func (c *client) closeConnection(reason ClosedState) { srv = c.srv noReconnect = c.flags.isSet(noReconnect) acc = c.acc + spoke bool ) // Snapshot for use if we are a client connection. @@ -4560,6 +4564,7 @@ func (c *client) closeConnection(reason ClosedState) { sub.close() subs = append(subs, sub) } + spoke = c.isSpokeLeafNode() } if c.route != nil { @@ -4593,7 +4598,6 @@ func (c *client) closeConnection(reason ClosedState) { // Unregister srv.removeClient(c) - notSpoke := !(kind == LEAF && c.isSpokeLeafNode()) // Update remote subscriptions. if acc != nil && (kind == CLIENT || kind == LEAF) { qsubs := map[string]*qsub{} @@ -4602,29 +4606,36 @@ func (c *client) closeConnection(reason ClosedState) { c.unsubscribe(acc, sub, true, false) // Update route as normal for a normal subscriber. if sub.queue == nil { - if notSpoke { + if !spoke { srv.updateRouteSubscriptionMap(acc, sub, -1) + if srv.gateway.enabled { + srv.gatewayUpdateSubInterest(acc.Name, sub, -1) + } } srv.updateLeafNodes(acc, sub, -1) } else { // We handle queue subscribers special in case we // have a bunch we can just send one update to the // connected routes. + num := int32(1) + if kind == LEAF { + num = sub.qw + } key := string(sub.subject) + " " + string(sub.queue) if esub, ok := qsubs[key]; ok { - esub.n++ + esub.n += num } else { - qsubs[key] = &qsub{sub, 1} + qsubs[key] = &qsub{sub, num} } } - if srv.gateway.enabled && notSpoke { - srv.gatewayUpdateSubInterest(acc.Name, sub, -1) - } } // Process any qsubs here. for _, esub := range qsubs { - if notSpoke { + if !spoke { srv.updateRouteSubscriptionMap(acc, esub.sub, -(esub.n)) + if srv.gateway.enabled { + srv.gatewayUpdateSubInterest(acc.Name, esub.sub, -(esub.n)) + } } srv.updateLeafNodes(acc, esub.sub, -(esub.n)) } diff --git a/server/leafnode.go b/server/leafnode.go index 3f253372..15abe091 100644 --- a/server/leafnode.go +++ b/server/leafnode.go @@ -1592,6 +1592,7 @@ func (c *client) processLeafSub(argo []byte) (err error) { key := string(sub.sid) osub := c.subs[key] updateGWs := false + delta := int32(1) if osub == nil { c.subs[key] = sub // Now place into the account sl. @@ -1605,6 +1606,7 @@ func (c *client) processLeafSub(argo []byte) (err error) { updateGWs = srv.gateway.enabled } else if sub.queue != nil { // For a queue we need to update the weight. + delta = sub.qw - atomic.LoadInt32(&osub.qw) atomic.StoreInt32(&osub.qw, sub.qw) acc.sl.UpdateRemoteQSub(osub) } @@ -1620,14 +1622,14 @@ func (c *client) processLeafSub(argo []byte) (err error) { // other leaf nodes as needed. if !spoke { // If we are routing add to the route map for the associated account. - srv.updateRouteSubscriptionMap(acc, sub, 1) + srv.updateRouteSubscriptionMap(acc, sub, delta) if updateGWs { - srv.gatewayUpdateSubInterest(acc.Name, sub, 1) + srv.gatewayUpdateSubInterest(acc.Name, sub, delta) } } // Now check on leafnode updates for other leaf nodes. We understand solicited // and non-solicited state in this call so we will do the right thing. - srv.updateLeafNodes(acc, sub, 1) + srv.updateLeafNodes(acc, sub, delta) return nil } diff --git a/server/leafnode_test.go b/server/leafnode_test.go index d4559226..7da76420 100644 --- a/server/leafnode_test.go +++ b/server/leafnode_test.go @@ -3513,3 +3513,160 @@ func TestLeafNodeNoPingBeforeConnect(t *testing.T) { } } } + +func TestLeafNodeNoMsgLoop(t *testing.T) { + hubConf := ` + listen: "127.0.0.1:-1" + accounts { + FOO { + users [ + {username: leaf, password: pass} + {username: user, password: pass} + ] + } + } + cluster { + name: "hub" + listen: "127.0.0.1:-1" + %s + } + leafnodes { + listen: "127.0.0.1:-1" + authorization { + account: FOO + } + } + ` + configS1 := createConfFile(t, []byte(fmt.Sprintf(hubConf, ""))) + defer removeFile(t, configS1) + s1, o1 := RunServerWithConfig(configS1) + defer s1.Shutdown() + + configS2S3 := createConfFile(t, []byte(fmt.Sprintf(hubConf, fmt.Sprintf(`routes: ["nats://127.0.0.1:%d"]`, o1.Cluster.Port)))) + defer removeFile(t, configS2S3) + s2, o2 := RunServerWithConfig(configS2S3) + defer s2.Shutdown() + + s3, _ := RunServerWithConfig(configS2S3) + defer s3.Shutdown() + + checkClusterFormed(t, s1, s2, s3) + + contentLN := ` + listen: "127.0.0.1:%d" + accounts { + FOO { + users [ + {username: leaf, password: pass} + {username: user, password: pass} + ] + } + } + leafnodes { + remotes = [ + { + url: "nats://leaf:pass@127.0.0.1:%d" + account: FOO + } + ] + } + ` + lnconf := createConfFile(t, []byte(fmt.Sprintf(contentLN, -1, o1.LeafNode.Port))) + defer removeFile(t, lnconf) + sl1, slo1 := RunServerWithConfig(lnconf) + defer sl1.Shutdown() + + sl2, slo2 := RunServerWithConfig(lnconf) + defer sl2.Shutdown() + + checkLeafNodeConnected(t, sl1) + checkLeafNodeConnected(t, sl2) + + // Create users on each leafnode + nc1, err := nats.Connect(fmt.Sprintf("nats://user:pass@127.0.0.1:%d", slo1.Port)) + if err != nil { + t.Fatalf("Error on connect: %v", err) + } + defer nc1.Close() + + rch := make(chan struct{}, 1) + nc2, err := nats.Connect( + fmt.Sprintf("nats://user:pass@127.0.0.1:%d", slo2.Port), + nats.ReconnectWait(50*time.Millisecond), + nats.ReconnectHandler(func(_ *nats.Conn) { + rch <- struct{}{} + }), + ) + if err != nil { + t.Fatalf("Error on connect: %v", err) + } + defer nc2.Close() + + // Create queue subs on sl2 + nc2.QueueSubscribe("foo", "bar", func(_ *nats.Msg) {}) + nc2.QueueSubscribe("foo", "bar", func(_ *nats.Msg) {}) + nc2.Flush() + + // Wait for interest to propagate to sl1 + checkSubInterest(t, sl1, "FOO", "foo", 250*time.Millisecond) + + // Create sub on sl1 + ch := make(chan *nats.Msg, 10) + nc1.Subscribe("foo", func(m *nats.Msg) { + select { + case ch <- m: + default: + } + }) + nc1.Flush() + + checkSubInterest(t, sl2, "FOO", "foo", 250*time.Millisecond) + + // Produce from sl1 + nc1.Publish("foo", []byte("msg1")) + + // Check message is received by plain sub + select { + case <-ch: + case <-time.After(time.Second): + t.Fatalf("Did not receive message") + } + + // Restart leaf node, this time make sure we connect to 2nd server. + sl2.Shutdown() + + // Use config file but this time reuse the client port and set the 2nd server for + // the remote leaf node port. + lnconf = createConfFile(t, []byte(fmt.Sprintf(contentLN, slo2.Port, o2.LeafNode.Port))) + defer removeFile(t, lnconf) + sl2, _ = RunServerWithConfig(lnconf) + defer sl2.Shutdown() + + checkLeafNodeConnected(t, sl2) + + // Wait for client to reconnect + select { + case <-rch: + case <-time.After(time.Second): + t.Fatalf("Did not reconnect") + } + + // Produce a new messages + for i := 0; i < 10; i++ { + nc1.Publish("foo", []byte(fmt.Sprintf("msg%d", 2+i))) + + // Check sub receives 1 message + select { + case <-ch: + case <-time.After(time.Second): + t.Fatalf("Did not receive message") + } + // Check that there is no more... + select { + case m := <-ch: + t.Fatalf("Loop: received second message %s", m.Data) + case <-time.After(50 * time.Millisecond): + // OK + } + } +} diff --git a/server/route.go b/server/route.go index d9e4614b..fa2c0a69 100644 --- a/server/route.go +++ b/server/route.go @@ -1087,6 +1087,7 @@ func (c *client) processRemoteSub(argo []byte, hasOrigin bool) (err error) { osub := c.subs[key] updateGWs := false + delta := int32(1) if osub == nil { c.subs[key] = sub // Now place into the account sl. @@ -1100,17 +1101,18 @@ func (c *client) processRemoteSub(argo []byte, hasOrigin bool) (err error) { updateGWs = srv.gateway.enabled } else if sub.queue != nil { // For a queue we need to update the weight. + delta = sub.qw - atomic.LoadInt32(&osub.qw) atomic.StoreInt32(&osub.qw, sub.qw) acc.sl.UpdateRemoteQSub(osub) } c.mu.Unlock() if updateGWs { - srv.gatewayUpdateSubInterest(acc.Name, sub, 1) + srv.gatewayUpdateSubInterest(acc.Name, sub, delta) } // Now check on leafnode updates. - srv.updateLeafNodes(acc, sub, 1) + srv.updateLeafNodes(acc, sub, delta) if c.opts.Verbose { c.sendOK()