From 0c11279946aa67102b4dfc90cb17821ecaccedbd Mon Sep 17 00:00:00 2001 From: Ivan Kozlovic Date: Wed, 7 Nov 2018 16:33:30 -0700 Subject: [PATCH] Fixed data race with client trace flag This was introduced with https://github.com/nats-io/gnatsd/commit/ec3115ad210cbf8ad0fa1dc722db8975181bec67 Signed-off-by: Ivan Kozlovic --- server/client.go | 4 ++-- server/client_test.go | 47 +++++++++++++++++++++++++++++++++++++++++++ server/parser.go | 11 ++++------ server/parser_test.go | 4 ++-- server/route.go | 4 ++-- 5 files changed, 57 insertions(+), 13 deletions(-) diff --git a/server/client.go b/server/client.go index f35ffa4b..3158e18f 100644 --- a/server/client.go +++ b/server/client.go @@ -1209,8 +1209,8 @@ func (c *client) processPong() { c.mu.Unlock() } -func (c *client) processPub(arg []byte) error { - if c.trace { +func (c *client) processPub(trace bool, arg []byte) error { + if trace { c.traceInOp("PUB", arg) } diff --git a/server/client_test.go b/server/client_test.go index 50cfc4dd..7ef980c3 100644 --- a/server/client_test.go +++ b/server/client_test.go @@ -1114,3 +1114,50 @@ func TestQueueAutoUnsubscribe(t *testing.T) { expected, atomic.LoadInt32(&rbar), atomic.LoadInt32(&rbaz)) }) } + +func TestClientTraceRace(t *testing.T) { + opts := DefaultOptions() + s := RunServer(opts) + defer s.Shutdown() + + // Activate trace logging + s.SetLogger(&DummyLogger{}, false, true) + + nc1, err := nats.Connect(fmt.Sprintf("nats://%s:%d", opts.Host, opts.Port)) + if err != nil { + t.Fatalf("Error on connect: %v", err) + } + defer nc1.Close() + total := 10000 + count := 0 + ch := make(chan bool, 1) + if _, err := nc1.Subscribe("foo", func(_ *nats.Msg) { + count++ + if count == total { + ch <- true + } + }); err != nil { + t.Fatalf("Error on subscribe: %v", err) + } + nc2, err := nats.Connect(fmt.Sprintf("nats://%s:%d", opts.Host, opts.Port)) + if err != nil { + t.Fatalf("Error on connect: %v", err) + } + defer nc2.Close() + + wg := sync.WaitGroup{} + wg.Add(1) + go func() { + defer wg.Done() + for i := 0; i < total; i++ { + nc1.Publish("bar", []byte("hello")) + } + }() + for i := 0; i < total; i++ { + nc2.Publish("foo", []byte("hello")) + } + if err := wait(ch); err != nil { + t.Fatal("Did not get all our messages") + } + wg.Wait() +} diff --git a/server/parser.go b/server/parser.go index 9763e6f1..a06bbaec 100644 --- a/server/parser.go +++ b/server/parser.go @@ -201,7 +201,7 @@ func (c *client) parse(buf []byte) error { } else { arg = buf[c.as : i-c.drop] } - if err := c.processPub(arg); err != nil { + if err := c.processPub(c.trace, arg); err != nil { return err } c.drop, c.as, c.state = OP_START, i+1, MSG_PAYLOAD @@ -642,7 +642,7 @@ func (c *client) parse(buf []byte) error { } else { arg = buf[c.as : i-c.drop] } - if err := c.processRoutedMsgArgs(arg); err != nil { + if err := c.processRoutedMsgArgs(c.trace, arg); err != nil { return err } c.drop, c.as, c.state = 0, i+1, MSG_PAYLOAD @@ -866,13 +866,10 @@ func (c *client) clonePubArg() { c.argBuf = c.scratch[:0] c.argBuf = append(c.argBuf, c.pa.arg...) - trace := c.trace - c.trace = false // This is a routed msg if c.pa.account != nil { - c.processRoutedMsgArgs(c.argBuf) + c.processRoutedMsgArgs(false, c.argBuf) } else { - c.processPub(c.argBuf) + c.processPub(false, c.argBuf) } - c.trace = trace } diff --git a/server/parser_test.go b/server/parser_test.go index fa8eadba..5773e06a 100644 --- a/server/parser_test.go +++ b/server/parser_test.go @@ -288,7 +288,7 @@ func TestParsePubArg(t *testing.T) { subject: "foo", reply: "", size: 2222, szb: "2222"}, } { t.Run(test.arg, func(t *testing.T) { - if err := c.processPub([]byte(test.arg)); err != nil { + if err := c.processPub(false, []byte(test.arg)); err != nil { t.Fatalf("Unexpected parse error: %v\n", err) } if !bytes.Equal(c.pa.subject, []byte(test.subject)) { @@ -311,7 +311,7 @@ func TestParsePubBadSize(t *testing.T) { c := dummyClient() // Setup localized max payload c.mpay = 32768 - if err := c.processPub([]byte("foo 2222222222222222")); err == nil { + if err := c.processPub(false, []byte("foo 2222222222222222")); err == nil { t.Fatalf("Expected parse error for size too large") } } diff --git a/server/route.go b/server/route.go index 426599f0..62be8d50 100644 --- a/server/route.go +++ b/server/route.go @@ -143,8 +143,8 @@ func (c *client) processAccountUnsub(arg []byte) { } // Process an inbound RMSG specification from the remote route. -func (c *client) processRoutedMsgArgs(arg []byte) error { - if c.trace { +func (c *client) processRoutedMsgArgs(trace bool, arg []byte) error { + if trace { c.traceInOp("RMSG", arg) }