Fix races, add a test and adjust others.

* There was a race during unsubscribe()
* 'go test -race' reports a race in TestSetLogger test. This one could be ignored since we normally invoke SetLogger only on server startup. That being said, Travis failed when I tried to submit a PR for the fix of the unsubscribe race. So proposing to fix the logger too.
This commit is contained in:
Ivan Kozlovic
2015-11-18 20:39:56 -07:00
parent 200f2d95c6
commit 97da466312
3 changed files with 81 additions and 12 deletions

View File

@@ -52,6 +52,8 @@ type client struct {
route *route
sendLocalSubs bool
debug bool
trace bool
}
func (c *client) String() (id string) {
@@ -95,6 +97,8 @@ func (c *client) initClient() {
c.cid = atomic.AddUint64(&s.gcid, 1)
c.bw = bufio.NewWriterSize(c.nc, s.opts.BufSize)
c.subs = hashmap.New()
c.debug = (atomic.LoadInt32(&debug) != 0)
c.trace = (atomic.LoadInt32(&trace) != 0)
// This is a scratch buffer used for processMsg()
// The msg header starts with "MSG ",
@@ -191,7 +195,7 @@ func (c *client) readLoop() {
}
func (c *client) traceMsg(msg []byte) {
if trace == 0 {
if !c.trace {
return
}
// FIXME(dlc), allow limits to printable payload
@@ -207,7 +211,7 @@ func (c *client) traceOutOp(op string, arg []byte) {
}
func (c *client) traceOp(format, op string, arg []byte) {
if trace == 0 {
if !c.trace {
return
}
@@ -329,7 +333,7 @@ func (c *client) processPong() {
}
func (c *client) processMsgArgs(arg []byte) error {
if trace == 1 {
if c.trace {
c.traceInOp("MSG", arg)
}
@@ -378,7 +382,7 @@ func (c *client) processMsgArgs(arg []byte) error {
}
func (c *client) processPub(arg []byte) error {
if trace == 1 {
if c.trace {
c.traceInOp("PUB", arg)
}
@@ -538,17 +542,31 @@ func (c *client) processUnsub(arg []byte) error {
default:
return fmt.Errorf("processUnsub Parse Error: '%s'", arg)
}
if sub, ok := (c.subs.Get(sid)).(*subscription); ok {
var sub *subscription
unsub := false
shouldForward := false
ok := false
c.mu.Lock()
if sub, ok = (c.subs.Get(sid)).(*subscription); ok {
if max > 0 {
sub.max = int64(max)
} else {
// Clear it here to override
sub.max = 0
}
unsub = true
shouldForward = c.typ != ROUTER && c.srv != nil
}
c.mu.Unlock()
if unsub {
c.unsubscribe(sub)
if shouldForward := c.typ != ROUTER && c.srv != nil; shouldForward {
c.srv.broadcastUnSubscribe(sub)
}
}
if shouldForward {
c.srv.broadcastUnSubscribe(sub)
}
if c.opts.Verbose {
c.sendOK()
@@ -641,7 +659,7 @@ func (c *client) deliverMsg(sub *subscription, mh, msg []byte) {
goto writeErr
}
if trace == 1 {
if c.trace {
client.traceOutOp(string(mh[:len(mh)-LEN_CR_LF]), nil)
}
@@ -688,7 +706,7 @@ func (c *client) processMsg(msg []byte) {
atomic.AddInt64(&srv.inBytes, msgSize)
}
if trace == 1 {
if c.trace {
c.traceMsg(msg)
}
if c.opts.Verbose {

View File

@@ -4,12 +4,16 @@ import (
"bufio"
"bytes"
"encoding/json"
"fmt"
"net"
"reflect"
"regexp"
"strings"
"sync"
"testing"
"time"
"github.com/nats-io/nats"
)
type serverInfo struct {
@@ -280,7 +284,7 @@ func TestClientNoBodyPubSubWithReply(t *testing.T) {
func TestClientPubWithQueueSub(t *testing.T) {
_, c, cr := setupClient()
num := 10
num := 100
// Queue SUB/PUB
subs := []byte("SUB foo g1 1\r\nSUB foo g1 2\r\n")
@@ -320,7 +324,7 @@ func TestClientPubWithQueueSub(t *testing.T) {
t.Fatalf("Received wrong # of msgs: %d vs %d\n", received, num)
}
// Threshold for randomness for now
if n1 < 2 || n2 < 2 {
if n1 < 20 || n2 < 20 {
t.Fatalf("Received wrong # of msgs per subscriber: %d - %d\n", n1, n2)
}
}
@@ -561,3 +565,47 @@ func TestTwoTokenPubMatchSingleTokenSub(t *testing.T) {
t.Fatalf("PONG response was expected, got: %q\n", l)
}
}
func TestUnsubRace(t *testing.T) {
s := RunServer(nil)
defer s.Shutdown()
nc, err := nats.Connect(fmt.Sprintf("nats://%s:%d",
DefaultOptions.Host,
DefaultOptions.Port))
if err != nil {
t.Fatalf("Error creating client: %v\n", err)
}
defer nc.Close()
ncp, err := nats.Connect(fmt.Sprintf("nats://%s:%d",
DefaultOptions.Host,
DefaultOptions.Port))
if err != nil {
t.Fatalf("Error creating client: %v\n", err)
}
defer ncp.Close()
sub, _ := nc.Subscribe("foo", func(m *nats.Msg) {
// Just eat it..
})
nc.Flush()
var wg sync.WaitGroup
wg.Add(1)
go func() {
for i := 0; i < 10000; i++ {
ncp.Publish("foo", []byte("hello"))
}
wg.Done()
}()
time.Sleep(5 * time.Millisecond)
sub.Unsubscribe()
wg.Wait()
}

View File

@@ -72,6 +72,9 @@ func TestUnsubMax(t *testing.T) {
for i := 0; i < 100; i++ {
send("PUB foo 2\r\nok\r\n")
}
time.Sleep(50 * time.Millisecond)
matches := expectMsgs(2)
checkMsg(t, matches[0], "foo", "22", "", "2", "ok")
checkMsg(t, matches[1], "foo", "22", "", "2", "ok")