From 97da466312d2ccf627f8a39b1172c8e3da59d975 Mon Sep 17 00:00:00 2001 From: Ivan Kozlovic Date: Wed, 18 Nov 2015 20:39:56 -0700 Subject: [PATCH] Fix races, add a test and adjust others. * There was a race during unsubscribe() * 'go test -race' reports a race in TestSetLogger test. This one could be ignored since we normally invoke SetLogger only on server startup. That being said, Travis failed when I tried to submit a PR for the fix of the unsubscribe race. So proposing to fix the logger too. --- server/client.go | 38 ++++++++++++++++++++++--------- server/client_test.go | 52 +++++++++++++++++++++++++++++++++++++++++-- test/proto_test.go | 3 +++ 3 files changed, 81 insertions(+), 12 deletions(-) diff --git a/server/client.go b/server/client.go index 99296c0a..c37dabc3 100644 --- a/server/client.go +++ b/server/client.go @@ -52,6 +52,8 @@ type client struct { route *route sendLocalSubs bool + debug bool + trace bool } func (c *client) String() (id string) { @@ -95,6 +97,8 @@ func (c *client) initClient() { c.cid = atomic.AddUint64(&s.gcid, 1) c.bw = bufio.NewWriterSize(c.nc, s.opts.BufSize) c.subs = hashmap.New() + c.debug = (atomic.LoadInt32(&debug) != 0) + c.trace = (atomic.LoadInt32(&trace) != 0) // This is a scratch buffer used for processMsg() // The msg header starts with "MSG ", @@ -191,7 +195,7 @@ func (c *client) readLoop() { } func (c *client) traceMsg(msg []byte) { - if trace == 0 { + if !c.trace { return } // FIXME(dlc), allow limits to printable payload @@ -207,7 +211,7 @@ func (c *client) traceOutOp(op string, arg []byte) { } func (c *client) traceOp(format, op string, arg []byte) { - if trace == 0 { + if !c.trace { return } @@ -329,7 +333,7 @@ func (c *client) processPong() { } func (c *client) processMsgArgs(arg []byte) error { - if trace == 1 { + if c.trace { c.traceInOp("MSG", arg) } @@ -378,7 +382,7 @@ func (c *client) processMsgArgs(arg []byte) error { } func (c *client) processPub(arg []byte) error { - if trace == 1 { + if c.trace { c.traceInOp("PUB", arg) } @@ -538,17 +542,31 @@ func (c *client) processUnsub(arg []byte) error { default: return fmt.Errorf("processUnsub Parse Error: '%s'", arg) } - if sub, ok := (c.subs.Get(sid)).(*subscription); ok { + + var sub *subscription + + unsub := false + shouldForward := false + ok := false + + c.mu.Lock() + if sub, ok = (c.subs.Get(sid)).(*subscription); ok { if max > 0 { sub.max = int64(max) } else { // Clear it here to override sub.max = 0 } + unsub = true + shouldForward = c.typ != ROUTER && c.srv != nil + } + c.mu.Unlock() + + if unsub { c.unsubscribe(sub) - if shouldForward := c.typ != ROUTER && c.srv != nil; shouldForward { - c.srv.broadcastUnSubscribe(sub) - } + } + if shouldForward { + c.srv.broadcastUnSubscribe(sub) } if c.opts.Verbose { c.sendOK() @@ -641,7 +659,7 @@ func (c *client) deliverMsg(sub *subscription, mh, msg []byte) { goto writeErr } - if trace == 1 { + if c.trace { client.traceOutOp(string(mh[:len(mh)-LEN_CR_LF]), nil) } @@ -688,7 +706,7 @@ func (c *client) processMsg(msg []byte) { atomic.AddInt64(&srv.inBytes, msgSize) } - if trace == 1 { + if c.trace { c.traceMsg(msg) } if c.opts.Verbose { diff --git a/server/client_test.go b/server/client_test.go index 19bcb845..a847cb10 100644 --- a/server/client_test.go +++ b/server/client_test.go @@ -4,12 +4,16 @@ import ( "bufio" "bytes" "encoding/json" + "fmt" "net" "reflect" "regexp" "strings" + "sync" "testing" "time" + + "github.com/nats-io/nats" ) type serverInfo struct { @@ -280,7 +284,7 @@ func TestClientNoBodyPubSubWithReply(t *testing.T) { func TestClientPubWithQueueSub(t *testing.T) { _, c, cr := setupClient() - num := 10 + num := 100 // Queue SUB/PUB subs := []byte("SUB foo g1 1\r\nSUB foo g1 2\r\n") @@ -320,7 +324,7 @@ func TestClientPubWithQueueSub(t *testing.T) { t.Fatalf("Received wrong # of msgs: %d vs %d\n", received, num) } // Threshold for randomness for now - if n1 < 2 || n2 < 2 { + if n1 < 20 || n2 < 20 { t.Fatalf("Received wrong # of msgs per subscriber: %d - %d\n", n1, n2) } } @@ -561,3 +565,47 @@ func TestTwoTokenPubMatchSingleTokenSub(t *testing.T) { t.Fatalf("PONG response was expected, got: %q\n", l) } } + +func TestUnsubRace(t *testing.T) { + s := RunServer(nil) + defer s.Shutdown() + + nc, err := nats.Connect(fmt.Sprintf("nats://%s:%d", + DefaultOptions.Host, + DefaultOptions.Port)) + if err != nil { + t.Fatalf("Error creating client: %v\n", err) + } + defer nc.Close() + + ncp, err := nats.Connect(fmt.Sprintf("nats://%s:%d", + DefaultOptions.Host, + DefaultOptions.Port)) + if err != nil { + t.Fatalf("Error creating client: %v\n", err) + } + defer ncp.Close() + + sub, _ := nc.Subscribe("foo", func(m *nats.Msg) { + // Just eat it.. + }) + + nc.Flush() + + var wg sync.WaitGroup + + wg.Add(1) + + go func() { + for i := 0; i < 10000; i++ { + ncp.Publish("foo", []byte("hello")) + } + wg.Done() + }() + + time.Sleep(5 * time.Millisecond) + + sub.Unsubscribe() + + wg.Wait() +} diff --git a/test/proto_test.go b/test/proto_test.go index ec5aa6d4..8a1248ea 100644 --- a/test/proto_test.go +++ b/test/proto_test.go @@ -72,6 +72,9 @@ func TestUnsubMax(t *testing.T) { for i := 0; i < 100; i++ { send("PUB foo 2\r\nok\r\n") } + + time.Sleep(50 * time.Millisecond) + matches := expectMsgs(2) checkMsg(t, matches[0], "foo", "22", "", "2", "ok") checkMsg(t, matches[1], "foo", "22", "", "2", "ok")