Fixed data race with client trace flag

This was introduced with commit ec3115ad21

Signed-off-by: Ivan Kozlovic <ivan@synadia.com>
This commit is contained in:
Ivan Kozlovic
2018-11-07 16:33:30 -07:00
parent b46d3fbcd9
commit 0c11279946
5 changed files with 57 additions and 13 deletions

View File

@@ -1209,8 +1209,8 @@ func (c *client) processPong() {
c.mu.Unlock()
}
func (c *client) processPub(arg []byte) error {
if c.trace {
func (c *client) processPub(trace bool, arg []byte) error {
if trace {
c.traceInOp("PUB", arg)
}

View File

@@ -1114,3 +1114,50 @@ func TestQueueAutoUnsubscribe(t *testing.T) {
expected, atomic.LoadInt32(&rbar), atomic.LoadInt32(&rbaz))
})
}
// TestClientTraceRace reproduces the data race on the server-side client's
// trace flag: with trace logging enabled, messages are parsed/processed on one
// path while (per this commit's parser change) the flag used to be temporarily
// mutated on another. Running two connections that publish concurrently, under
// the -race detector, surfaces the race.
func TestClientTraceRace(t *testing.T) {
opts := DefaultOptions()
s := RunServer(opts)
defer s.Shutdown()
// Activate trace logging
s.SetLogger(&DummyLogger{}, false, true)
// nc1 both subscribes and publishes, so its server-side client is read
// from and written to concurrently while trace is on.
nc1, err := nats.Connect(fmt.Sprintf("nats://%s:%d", opts.Host, opts.Port))
if err != nil {
t.Fatalf("Error on connect: %v", err)
}
defer nc1.Close()
total := 10000
count := 0
// Buffered so the final callback's signal send never blocks delivery.
ch := make(chan bool, 1)
// NOTE(review): count is unsynchronized; this assumes the nats client
// dispatches a subscription's callbacks serially — confirm against nats.go.
if _, err := nc1.Subscribe("foo", func(_ *nats.Msg) {
count++
if count == total {
ch <- true
}
}); err != nil {
t.Fatalf("Error on subscribe: %v", err)
}
// Second connection drives the messages that nc1's subscription receives.
nc2, err := nats.Connect(fmt.Sprintf("nats://%s:%d", opts.Host, opts.Port))
if err != nil {
t.Fatalf("Error on connect: %v", err)
}
defer nc2.Close()
wg := sync.WaitGroup{}
wg.Add(1)
// Publish on nc1 (subject "bar", no subscriber) in parallel with nc2's
// publishes to "foo" being delivered back to nc1, so the server parses
// inbound PUBs on nc1's connection while also writing deliveries to it.
go func() {
defer wg.Done()
for i := 0; i < total; i++ {
nc1.Publish("bar", []byte("hello"))
}
}()
for i := 0; i < total; i++ {
nc2.Publish("foo", []byte("hello"))
}
// wait is a test helper; presumably it receives from ch with a timeout.
if err := wait(ch); err != nil {
t.Fatal("Did not get all our messages")
}
// Ensure the publishing goroutine has finished before teardown.
wg.Wait()
}

View File

@@ -201,7 +201,7 @@ func (c *client) parse(buf []byte) error {
} else {
arg = buf[c.as : i-c.drop]
}
if err := c.processPub(arg); err != nil {
if err := c.processPub(c.trace, arg); err != nil {
return err
}
c.drop, c.as, c.state = OP_START, i+1, MSG_PAYLOAD
@@ -642,7 +642,7 @@ func (c *client) parse(buf []byte) error {
} else {
arg = buf[c.as : i-c.drop]
}
if err := c.processRoutedMsgArgs(arg); err != nil {
if err := c.processRoutedMsgArgs(c.trace, arg); err != nil {
return err
}
c.drop, c.as, c.state = 0, i+1, MSG_PAYLOAD
@@ -866,13 +866,10 @@ func (c *client) clonePubArg() {
c.argBuf = c.scratch[:0]
c.argBuf = append(c.argBuf, c.pa.arg...)
trace := c.trace
c.trace = false
// This is a routed msg
if c.pa.account != nil {
c.processRoutedMsgArgs(c.argBuf)
c.processRoutedMsgArgs(false, c.argBuf)
} else {
c.processPub(c.argBuf)
c.processPub(false, c.argBuf)
}
c.trace = trace
}

View File

@@ -288,7 +288,7 @@ func TestParsePubArg(t *testing.T) {
subject: "foo", reply: "", size: 2222, szb: "2222"},
} {
t.Run(test.arg, func(t *testing.T) {
if err := c.processPub([]byte(test.arg)); err != nil {
if err := c.processPub(false, []byte(test.arg)); err != nil {
t.Fatalf("Unexpected parse error: %v\n", err)
}
if !bytes.Equal(c.pa.subject, []byte(test.subject)) {
@@ -311,7 +311,7 @@ func TestParsePubBadSize(t *testing.T) {
c := dummyClient()
// Setup localized max payload
c.mpay = 32768
if err := c.processPub([]byte("foo 2222222222222222")); err == nil {
if err := c.processPub(false, []byte("foo 2222222222222222")); err == nil {
t.Fatalf("Expected parse error for size too large")
}
}

View File

@@ -143,8 +143,8 @@ func (c *client) processAccountUnsub(arg []byte) {
}
// Process an inbound RMSG specification from the remote route.
func (c *client) processRoutedMsgArgs(arg []byte) error {
if c.trace {
func (c *client) processRoutedMsgArgs(trace bool, arg []byte) error {
if trace {
c.traceInOp("RMSG", arg)
}