diff --git a/server/leafnode_test.go b/server/leafnode_test.go index cfbf1c69..497f464d 100644 --- a/server/leafnode_test.go +++ b/server/leafnode_test.go @@ -1947,7 +1947,7 @@ func TestLeafNodeNoDuplicateWithinCluster(t *testing.T) { } } -func TestLeafNodeNoRouteParserError(t *testing.T) { +func TestLeafNodeLMsgSplit(t *testing.T) { // This set the cluster name to "abc" oSrv1 := DefaultOptions() oSrv1.LeafNode.Host = "127.0.0.1" @@ -2024,6 +2024,84 @@ func TestLeafNodeNoRouteParserError(t *testing.T) { } } +type parseRouteLSUnsubLogger struct { + DummyLogger + gotTrace chan struct{} + gotErr chan error +} + +func (l *parseRouteLSUnsubLogger) Errorf(format string, v ...interface{}) { + err := fmt.Errorf(format, v...) + select { + case l.gotErr <- err: + default: + } +} + +func (l *parseRouteLSUnsubLogger) Tracef(format string, v ...interface{}) { + trace := fmt.Sprintf(format, v...) + if strings.Contains(trace, "LS- $G foo bar") { + l.gotTrace <- struct{}{} + } +} + +func TestLeafNodeRouteParseLSUnsub(t *testing.T) { + // This set the cluster name to "abc" + oSrv1 := DefaultOptions() + oSrv1.LeafNode.Host = "127.0.0.1" + oSrv1.LeafNode.Port = -1 + srv1 := RunServer(oSrv1) + defer srv1.Shutdown() + + l := &parseRouteLSUnsubLogger{gotTrace: make(chan struct{}, 1), gotErr: make(chan error, 1)} + srv1.SetLogger(l, true, true) + + oSrv2 := DefaultOptions() + oSrv2.LeafNode.Host = "127.0.0.1" + oSrv2.LeafNode.Port = -1 + oSrv2.Routes = RoutesFromStr(fmt.Sprintf("nats://127.0.0.1:%d", oSrv1.Cluster.Port)) + srv2 := RunServer(oSrv2) + defer srv2.Shutdown() + + checkClusterFormed(t, srv1, srv2) + + u2, err := url.Parse(fmt.Sprintf("nats://127.0.0.1:%d", oSrv2.LeafNode.Port)) + if err != nil { + t.Fatalf("Error parsing url: %v", err) + } + remoteLeafs := []*RemoteLeafOpts{&RemoteLeafOpts{URLs: []*url.URL{u2}}} + + oLeaf2 := DefaultOptions() + oLeaf2.LeafNode.Remotes = remoteLeafs + leaf2 := RunServer(oLeaf2) + defer leaf2.Shutdown() + + checkLeafNodeConnected(t, srv2) + checkLeafNodeConnected(t, leaf2) + + ncLeaf2 := natsConnect(t, leaf2.ClientURL()) + defer ncLeaf2.Close() + + sub := natsQueueSubSync(t, ncLeaf2, "foo", "bar") + // The issue was with the unsubscribe of this queue subscription + natsUnsub(t, sub) + + // We should get the trace + select { + case <-l.gotTrace: + // OK! + case <-time.After(100 * time.Millisecond): + t.Fatalf("Did not get LS- trace") + } + // And no error... + select { + case e := <-l.gotErr: + t.Fatalf("There was an error on server 1: %q", e.Error()) + case <-time.After(100 * time.Millisecond): + // OK! + } +} + func TestLeafNodeOperatorBadCfg(t *testing.T) { tmpDir, err := ioutil.TempDir("", "_nats-server") if err != nil { diff --git a/server/parser.go b/server/parser.go index d7a0d73f..e9492b2f 100644 --- a/server/parser.go +++ b/server/parser.go @@ -733,7 +733,12 @@ func (c *client) parse(buf []byte) error { err = c.processUnsub(arg) case ROUTER: if trace && c.srv != nil { - c.traceInOp("RS-", arg) + switch c.op { + case 'R', 'r': + c.traceInOp("RS-", arg) + case 'L', 'l': + c.traceInOp("LS-", arg) + } } err = c.processRemoteUnsub(arg) case GATEWAY: diff --git a/server/route.go b/server/route.go index 58527211..56b2f238 100644 --- a/server/route.go +++ b/server/route.go @@ -1236,10 +1236,10 @@ func (c *client) sendRouteSubOrUnSubProtos(subs []*subscription, isSubProto, tra if len(sub.origin) > 0 && c.route.lnoc { if isSubProto { buf = append(buf, lSubBytes...) + buf = append(buf, sub.origin...) } else { buf = append(buf, lUnsubBytes...) } - buf = append(buf, sub.origin...) buf = append(buf, ' ') } else { if isSubProto {