diff --git a/server/accounts.go b/server/accounts.go index 8fc5ba6f..c85fa629 100644 --- a/server/accounts.go +++ b/server/accounts.go @@ -1045,6 +1045,8 @@ func (a *Account) createRespWildcard() []byte { a.siReplyClient = c a.mu.Unlock() } + // Now check on leafnode updates. + s.updateLeafNodes(a, sub, 1) } return pre diff --git a/server/leafnode.go b/server/leafnode.go index 449b0111..e3611b6d 100644 --- a/server/leafnode.go +++ b/server/leafnode.go @@ -1493,8 +1493,8 @@ func (c *client) processLeafMsgArgs(arg []byte) error { // processInboundLeafMsg is called to process an inbound msg from a leaf node. func (c *client) processInboundLeafMsg(msg []byte) { // Update statistics - c.in.msgs++ // The msg includes the CR_LF, so pull back out for accounting. + c.in.msgs++ c.in.bytes += int32(len(msg) - LEN_CR_LF) // Check pub permissions diff --git a/test/leafnode_test.go b/test/leafnode_test.go index cbfb7c49..487caf4f 100644 --- a/test/leafnode_test.go +++ b/test/leafnode_test.go @@ -2430,6 +2430,8 @@ func TestLeafNodeServiceImportLikeNGS(t *testing.T) { sl, slOpts := runSolicitLeafServer(opts) defer sl.Shutdown() + checkLeafNodeConnected(t, sl) + // Create a normal direct connect client on B. url = fmt.Sprintf("nats://dlc:pass@%s:%d", opts.Host, opts.Port) nc2, err := nats.Connect(url) @@ -2455,6 +2457,44 @@ func TestLeafNodeServiceImportLikeNGS(t *testing.T) { } } +func TestLeafNodeServiceImportResponderOnLeaf(t *testing.T) { + gwSolicit := 10 * time.Millisecond + ca := createClusterEx(t, true, gwSolicit, true, "A", 3) + defer shutdownCluster(ca) + + // Now create a leafnode server on A that will bind to the NGS account. + opts := ca.opts[1] + sl, slOpts := runSolicitLeafServerToURL(fmt.Sprintf("nats-leaf://ngs:pass@%s:%d", opts.LeafNode.Host, opts.LeafNode.Port)) + defer sl.Shutdown() + + checkLeafNodeConnected(t, sl) + + // Now create a client on the leafnode. + ncl, err := nats.Connect(fmt.Sprintf("nats://%s:%d", slOpts.Host, slOpts.Port)) + if err != nil { + t.Fatalf("Error on connect: %v", err) + } + defer ncl.Close() + + // Create a queue subscriber to send results + ncl.QueueSubscribe("ngs.usage.*", "ngs", func(m *nats.Msg) { + m.Respond([]byte("22")) + }) + ncl.Flush() + + // Create a normal direct connect client on A. Needs to be same server as leafnode. + opts = ca.opts[1] + nc, err := nats.Connect(fmt.Sprintf("nats://dlc:pass@%s:%d", opts.Host, opts.Port)) + if err != nil { + t.Fatalf("Error on connect: %v", err) + } + defer nc.Close() + + if _, err := nc.Request("ngs.usage", []byte("fingers crossed"), 500*time.Millisecond); err != nil { + t.Fatalf("Did not receive response: %v", err) + } +} + func TestLeafNodeSendsAccountingEvents(t *testing.T) { s, opts, conf := runLeafNodeOperatorServer(t) defer os.Remove(conf)