From 88475014efbc56305aa14e56ff252719617c461c Mon Sep 17 00:00:00 2001 From: Ivan Kozlovic Date: Tue, 24 Nov 2020 14:24:56 -0700 Subject: [PATCH] [FIXED] Split LMSG across routes If a LeafNode message is sent across a route, and the message does not fit in the buffer, the parser would incorrectly process the "pub args" as if it was a ROUTED message, not a LEAF message. This caused clonePubArg() to return an error that would cause the parser to end with a protocol violation. Keep track that we are processing an LMSG so that we can pass that information to clonePubArg() and do proper parsing in split scenario. Resolves #1743 Signed-off-by: Ivan Kozlovic --- server/leafnode_test.go | 81 ++++++++++++++++++++++++++++++++++++++++- server/parser.go | 10 ++++- 2 files changed, 88 insertions(+), 3 deletions(-) diff --git a/server/leafnode_test.go b/server/leafnode_test.go index ee40d4d3..cfbf1c69 100644 --- a/server/leafnode_test.go +++ b/server/leafnode_test.go @@ -1856,7 +1856,9 @@ func TestLeafNodeTwoRemotesBindToSameAccount(t *testing.T) { select { case err := <-l.errCh: - fmt.Printf("@@IK: err=%q\n", err) + if !strings.Contains(err, DuplicateRemoteLeafnodeConnection.String()) { + t.Fatalf("Unexpected error: %v", err) + } case <-time.After(2 * time.Second): t.Fatal("Did not get any error") } @@ -1945,6 +1947,83 @@ func TestLeafNodeNoDuplicateWithinCluster(t *testing.T) { } } +func TestLeafNodeNoRouteParserError(t *testing.T) { + // This set the cluster name to "abc" + oSrv1 := DefaultOptions() + oSrv1.LeafNode.Host = "127.0.0.1" + oSrv1.LeafNode.Port = -1 + srv1 := RunServer(oSrv1) + defer srv1.Shutdown() + + oSrv2 := DefaultOptions() + oSrv2.LeafNode.Host = "127.0.0.1" + oSrv2.LeafNode.Port = -1 + oSrv2.Routes = RoutesFromStr(fmt.Sprintf("nats://127.0.0.1:%d", oSrv1.Cluster.Port)) + srv2 := RunServer(oSrv2) + defer srv2.Shutdown() + + checkClusterFormed(t, srv1, srv2) + + u1, err := url.Parse(fmt.Sprintf("nats://127.0.0.1:%d", oSrv1.LeafNode.Port)) + if err != nil { + t.Fatalf("Error parsing url: %v", err) + } + u2, err := url.Parse(fmt.Sprintf("nats://127.0.0.1:%d", oSrv2.LeafNode.Port)) + if err != nil { + t.Fatalf("Error parsing url: %v", err) + } + remoteLeafs := []*RemoteLeafOpts{&RemoteLeafOpts{URLs: []*url.URL{u1, u2}}} + + oLeaf1 := DefaultOptions() + oLeaf1.LeafNode.Remotes = remoteLeafs + leaf1 := RunServer(oLeaf1) + defer leaf1.Shutdown() + + oLeaf2 := DefaultOptions() + oLeaf2.LeafNode.Remotes = remoteLeafs + oLeaf2.Routes = RoutesFromStr(fmt.Sprintf("nats://127.0.0.1:%d", oLeaf1.Cluster.Port)) + leaf2 := RunServer(oLeaf2) + defer leaf2.Shutdown() + + checkClusterFormed(t, leaf1, leaf2) + + checkLeafNodeConnected(t, leaf1) + checkLeafNodeConnected(t, leaf2) + + ncSrv2 := natsConnect(t, srv2.ClientURL()) + defer ncSrv2.Close() + natsQueueSub(t, ncSrv2, "foo", "queue", func(m *nats.Msg) { + m.Respond([]byte("from srv2")) + }) + + // Check that "foo" interest is available everywhere. + checkSubInterest(t, srv1, globalAccountName, "foo", time.Second) + checkSubInterest(t, srv2, globalAccountName, "foo", time.Second) + checkSubInterest(t, leaf1, globalAccountName, "foo", time.Second) + checkSubInterest(t, leaf2, globalAccountName, "foo", time.Second) + + // Not required, but have a request payload that is more than 100 bytes + reqPayload := make([]byte, 150) + for i := 0; i < len(reqPayload); i++ { + reqPayload[i] = byte((i % 26)) + 'A' + } + + // Send repeated requests (from scratch) from leaf-2: + sendReq := func() { + t.Helper() + + ncLeaf2 := natsConnect(t, leaf2.ClientURL()) + defer ncLeaf2.Close() + + if _, err := ncLeaf2.Request("foo", reqPayload, time.Second); err != nil { + t.Fatalf("Did not receive reply: %v", err) + } + } + for i := 0; i < 100; i++ { + sendReq() + } +} + func TestLeafNodeOperatorBadCfg(t *testing.T) { tmpDir, err := ioutil.TempDir("", "_nats-server") if err != nil { diff --git a/server/parser.go b/server/parser.go index d8b7f87e..d7a0d73f 100644 --- a/server/parser.go +++ b/server/parser.go @@ -132,6 +132,7 @@ const ( func (c *client) parse(buf []byte) error { var i int var b byte + var lmsg bool // Snapshots c.mu.Lock() @@ -463,6 +464,7 @@ func (c *client) parse(buf []byte) error { // Drop all pub args c.pa.arg, c.pa.pacache, c.pa.origin, c.pa.account, c.pa.subject = nil, nil, nil, nil, nil c.pa.reply, c.pa.hdr, c.pa.size, c.pa.szb, c.pa.hdb, c.pa.queues = nil, -1, 0, nil, nil, nil + lmsg = false case OP_A: switch b { case '+': @@ -932,6 +934,7 @@ func (c *client) parse(buf []byte) error { if trace { c.traceInOp("LMSG", arg) } + lmsg = true err = c.processRoutedOriginClusterMsgArgs(arg) } } else if c.kind == LEAF { @@ -1114,7 +1117,7 @@ func (c *client) parse(buf []byte) error { if c.argBuf == nil { // Works also for MSG_ARG, when message comes from ROUTE. - if err := c.clonePubArg(); err != nil { + if err := c.clonePubArg(lmsg); err != nil { goto parseErr } } @@ -1165,13 +1168,16 @@ func protoSnippet(start int, buf []byte) string { // clonePubArg is used when the split buffer scenario has the pubArg in the existing read buffer, but // we need to hold onto it into the next read. -func (c *client) clonePubArg() error { +func (c *client) clonePubArg(lmsg bool) error { // Just copy and re-process original arg buffer. c.argBuf = c.scratch[:0] c.argBuf = append(c.argBuf, c.pa.arg...) switch c.kind { case ROUTER, GATEWAY: + if lmsg { + return c.processRoutedOriginClusterMsgArgs(c.argBuf) + } if c.pa.hdr < 0 { return c.processRoutedMsgArgs(c.argBuf) } else {