mirror of
https://github.com/gogrlx/nats-server.git
synced 2026-04-02 03:38:42 -07:00
[FIXED] Split LMSG across routes
If a LeafNode message is sent across a route, and the message does not fit in the buffer, the parser would incorrectly process the "pub args" as if it was a ROUTED message, not a LEAF message. This caused clonePubArg() to return an error that would cause the parser to end with a protocol violation. Keep track that we are processing an LMSG so that we can pass that information to clonePubArg() and do proper parsing in split scenario. Resolves #1743 Signed-off-by: Ivan Kozlovic <ivan@synadia.com>
This commit is contained in:
@@ -1856,7 +1856,9 @@ func TestLeafNodeTwoRemotesBindToSameAccount(t *testing.T) {
|
||||
|
||||
select {
|
||||
case err := <-l.errCh:
|
||||
fmt.Printf("@@IK: err=%q\n", err)
|
||||
if !strings.Contains(err, DuplicateRemoteLeafnodeConnection.String()) {
|
||||
t.Fatalf("Unexpected error: %v", err)
|
||||
}
|
||||
case <-time.After(2 * time.Second):
|
||||
t.Fatal("Did not get any error")
|
||||
}
|
||||
@@ -1945,6 +1947,83 @@ func TestLeafNodeNoDuplicateWithinCluster(t *testing.T) {
|
||||
}
|
||||
}
|
||||
|
||||
func TestLeafNodeNoRouteParserError(t *testing.T) {
|
||||
// This set the cluster name to "abc"
|
||||
oSrv1 := DefaultOptions()
|
||||
oSrv1.LeafNode.Host = "127.0.0.1"
|
||||
oSrv1.LeafNode.Port = -1
|
||||
srv1 := RunServer(oSrv1)
|
||||
defer srv1.Shutdown()
|
||||
|
||||
oSrv2 := DefaultOptions()
|
||||
oSrv2.LeafNode.Host = "127.0.0.1"
|
||||
oSrv2.LeafNode.Port = -1
|
||||
oSrv2.Routes = RoutesFromStr(fmt.Sprintf("nats://127.0.0.1:%d", oSrv1.Cluster.Port))
|
||||
srv2 := RunServer(oSrv2)
|
||||
defer srv2.Shutdown()
|
||||
|
||||
checkClusterFormed(t, srv1, srv2)
|
||||
|
||||
u1, err := url.Parse(fmt.Sprintf("nats://127.0.0.1:%d", oSrv1.LeafNode.Port))
|
||||
if err != nil {
|
||||
t.Fatalf("Error parsing url: %v", err)
|
||||
}
|
||||
u2, err := url.Parse(fmt.Sprintf("nats://127.0.0.1:%d", oSrv2.LeafNode.Port))
|
||||
if err != nil {
|
||||
t.Fatalf("Error parsing url: %v", err)
|
||||
}
|
||||
remoteLeafs := []*RemoteLeafOpts{&RemoteLeafOpts{URLs: []*url.URL{u1, u2}}}
|
||||
|
||||
oLeaf1 := DefaultOptions()
|
||||
oLeaf1.LeafNode.Remotes = remoteLeafs
|
||||
leaf1 := RunServer(oLeaf1)
|
||||
defer leaf1.Shutdown()
|
||||
|
||||
oLeaf2 := DefaultOptions()
|
||||
oLeaf2.LeafNode.Remotes = remoteLeafs
|
||||
oLeaf2.Routes = RoutesFromStr(fmt.Sprintf("nats://127.0.0.1:%d", oLeaf1.Cluster.Port))
|
||||
leaf2 := RunServer(oLeaf2)
|
||||
defer leaf2.Shutdown()
|
||||
|
||||
checkClusterFormed(t, leaf1, leaf2)
|
||||
|
||||
checkLeafNodeConnected(t, leaf1)
|
||||
checkLeafNodeConnected(t, leaf2)
|
||||
|
||||
ncSrv2 := natsConnect(t, srv2.ClientURL())
|
||||
defer ncSrv2.Close()
|
||||
natsQueueSub(t, ncSrv2, "foo", "queue", func(m *nats.Msg) {
|
||||
m.Respond([]byte("from srv2"))
|
||||
})
|
||||
|
||||
// Check that "foo" interest is available everywhere.
|
||||
checkSubInterest(t, srv1, globalAccountName, "foo", time.Second)
|
||||
checkSubInterest(t, srv2, globalAccountName, "foo", time.Second)
|
||||
checkSubInterest(t, leaf1, globalAccountName, "foo", time.Second)
|
||||
checkSubInterest(t, leaf2, globalAccountName, "foo", time.Second)
|
||||
|
||||
// Not required, but have a request payload that is more than 100 bytes
|
||||
reqPayload := make([]byte, 150)
|
||||
for i := 0; i < len(reqPayload); i++ {
|
||||
reqPayload[i] = byte((i % 26)) + 'A'
|
||||
}
|
||||
|
||||
// Send repeated requests (from scratch) from leaf-2:
|
||||
sendReq := func() {
|
||||
t.Helper()
|
||||
|
||||
ncLeaf2 := natsConnect(t, leaf2.ClientURL())
|
||||
defer ncLeaf2.Close()
|
||||
|
||||
if _, err := ncLeaf2.Request("foo", reqPayload, time.Second); err != nil {
|
||||
t.Fatalf("Did not receive reply: %v", err)
|
||||
}
|
||||
}
|
||||
for i := 0; i < 100; i++ {
|
||||
sendReq()
|
||||
}
|
||||
}
|
||||
|
||||
func TestLeafNodeOperatorBadCfg(t *testing.T) {
|
||||
tmpDir, err := ioutil.TempDir("", "_nats-server")
|
||||
if err != nil {
|
||||
|
||||
@@ -132,6 +132,7 @@ const (
|
||||
func (c *client) parse(buf []byte) error {
|
||||
var i int
|
||||
var b byte
|
||||
var lmsg bool
|
||||
|
||||
// Snapshots
|
||||
c.mu.Lock()
|
||||
@@ -463,6 +464,7 @@ func (c *client) parse(buf []byte) error {
|
||||
// Drop all pub args
|
||||
c.pa.arg, c.pa.pacache, c.pa.origin, c.pa.account, c.pa.subject = nil, nil, nil, nil, nil
|
||||
c.pa.reply, c.pa.hdr, c.pa.size, c.pa.szb, c.pa.hdb, c.pa.queues = nil, -1, 0, nil, nil, nil
|
||||
lmsg = false
|
||||
case OP_A:
|
||||
switch b {
|
||||
case '+':
|
||||
@@ -932,6 +934,7 @@ func (c *client) parse(buf []byte) error {
|
||||
if trace {
|
||||
c.traceInOp("LMSG", arg)
|
||||
}
|
||||
lmsg = true
|
||||
err = c.processRoutedOriginClusterMsgArgs(arg)
|
||||
}
|
||||
} else if c.kind == LEAF {
|
||||
@@ -1114,7 +1117,7 @@ func (c *client) parse(buf []byte) error {
|
||||
|
||||
if c.argBuf == nil {
|
||||
// Works also for MSG_ARG, when message comes from ROUTE.
|
||||
if err := c.clonePubArg(); err != nil {
|
||||
if err := c.clonePubArg(lmsg); err != nil {
|
||||
goto parseErr
|
||||
}
|
||||
}
|
||||
@@ -1165,13 +1168,16 @@ func protoSnippet(start int, buf []byte) string {
|
||||
|
||||
// clonePubArg is used when the split buffer scenario has the pubArg in the existing read buffer, but
|
||||
// we need to hold onto it into the next read.
|
||||
func (c *client) clonePubArg() error {
|
||||
func (c *client) clonePubArg(lmsg bool) error {
|
||||
// Just copy and re-process original arg buffer.
|
||||
c.argBuf = c.scratch[:0]
|
||||
c.argBuf = append(c.argBuf, c.pa.arg...)
|
||||
|
||||
switch c.kind {
|
||||
case ROUTER, GATEWAY:
|
||||
if lmsg {
|
||||
return c.processRoutedOriginClusterMsgArgs(c.argBuf)
|
||||
}
|
||||
if c.pa.hdr < 0 {
|
||||
return c.processRoutedMsgArgs(c.argBuf)
|
||||
} else {
|
||||
|
||||
Reference in New Issue
Block a user