diff --git a/TODO.md b/TODO.md index bdd399ae..1f03b855 100644 --- a/TODO.md +++ b/TODO.md @@ -12,7 +12,6 @@ - [ ] Buffer pools/sync pools? - [ ] IOVec pools and writev for high fanout? - [ ] Add ability to reload config on signal -- [ ] NewSource on Rand to lower lock contention on QueueSubs, or redesign! - [ ] Add ENV and variable support to dconf - [ ] Modify cluster support for single message across routes between pub/sub and d-queue - [ ] Memory limits/warnings? @@ -20,6 +19,7 @@ - [ ] Info updates contain other implicit route servers - [ ] Multi-tenant accounts with isolation of subject space - [ ] Pedantic state +- [X] NewSource on Rand to lower lock contention on QueueSubs, or redesign! - [X] Default sort by cid on connz - [X] Track last activity time per connection? - [X] Add total connections to varz so we won't miss spikes, etc. diff --git a/server/client.go b/server/client.go index 1c6b1f25..8cfc88ce 100644 --- a/server/client.go +++ b/server/client.go @@ -16,6 +16,10 @@ import ( "github.com/nats-io/gnatsd/sublist" ) +func init() { + rand.Seed(time.Now().UnixNano()) +} + const ( // Scratch buffer size for the processMsg() calls. msgScratchSize = 512 @@ -750,8 +754,7 @@ func (c *client) processMsg(msg []byte) { msgh = append(msgh, ' ') si := len(msgh) - var qmap map[string][]*subscription - var qsubs []*subscription + seenQ := map[string]struct{}{} isRoute := c.typ == ROUTER var rmap map[string]struct{} @@ -770,37 +773,27 @@ func (c *client) processMsg(msg []byte) { } // Loop over all subscriptions that match. - for _, v := range r { - sub := v.(*subscription) + // We randomize/shuffle the list to optimize a bit on queue subscribers. + indexes := rand.Perm(len(r)) + for _, i := range indexes { + sub := r[i].(*subscription) - // Process queue group subscriptions by gathering them all up - // here. We will pick the winners when we are done processing - // all of the subscriptions. if sub.queue != nil { // Queue subscriptions handled from routes directly above. if isRoute { continue } - // FIXME(dlc), this can be more efficient - if qmap == nil { - qmap = make(map[string][]*subscription) - } qname := string(sub.queue) - qsubs = qmap[qname] - if qsubs == nil { - qsubs = make([]*subscription, 0, 4) + if _, ok := seenQ[qname]; ok { + continue } - qsubs = append(qsubs, sub) - qmap[qname] = qsubs - continue + seenQ[qname] = struct{}{} } - // Process normal, non-queue group subscriptions. - // If this is a send to a ROUTER, make sure we only send it // once. The other side will handle the appropriate re-processing. // Also enforce 1-Hop. - if sub.client.typ == ROUTER { + if sub.client.typ == ROUTER && sub.queue == nil { // Skip if sourced from a ROUTER and going to another ROUTER. // This is 1-Hop semantics for ROUTERs. if isRoute { @@ -826,18 +819,10 @@ func (c *client) processMsg(msg []byte) { sub.client.mu.Unlock() } + // Process subscription. mh := c.msgHeader(msgh[:si], sub) c.deliverMsg(sub, mh, msg) } - - if qmap != nil { - for _, qsubs := range qmap { - index := rand.Int() % len(qsubs) - sub := qsubs[index] - mh := c.msgHeader(msgh[:si], sub) - c.deliverMsg(sub, mh, msg) - } - } } func (c *client) processPingTimer() { diff --git a/test/proto_test.go b/test/proto_test.go index 5b7e4855..abe259f3 100644 --- a/test/proto_test.go +++ b/test/proto_test.go @@ -39,8 +39,9 @@ func TestProtoBasics(t *testing.T) { // 2 Messages send("SUB * 2\r\nPUB foo 2\r\nok\r\n") matches = expectMsgs(2) - checkMsg(t, matches[0], "foo", "1", "", "2", "ok") - checkMsg(t, matches[1], "foo", "2", "", "2", "ok") + // Could arrive in any order + checkMsg(t, matches[0], "foo", "", "", "2", "ok") + checkMsg(t, matches[1], "foo", "", "", "2", "ok") } func TestProtoErr(t *testing.T) { diff --git a/test/routes_test.go b/test/routes_test.go index 9e7ab4dd..b9560de3 100644 --- a/test/routes_test.go +++ b/test/routes_test.go @@ -304,11 +304,20 @@ func TestRouteQueueSemantics(t *testing.T) { matches = expectMsgs(2) // Expect first to be the normal subscriber, next will be the queue one. - checkMsg(t, matches[0], "foo", "RSID:2:4", "", "2", "ok") + if string(matches[0][SID_INDEX]) != "RSID:2:4" && + string(matches[1][SID_INDEX]) != "RSID:2:4" { + t.Fatalf("Did not received routed sid\n") + } + checkMsg(t, matches[0], "foo", "", "", "2", "ok") checkMsg(t, matches[1], "foo", "", "", "2", "ok") // Check the rsid to verify it is one of the queue group subscribers. - rsid := string(matches[1][SID_INDEX]) + var rsid string + if matches[0][SID_INDEX][0] == 'Q' { + rsid = string(matches[0][SID_INDEX]) + } else { + rsid = string(matches[1][SID_INDEX]) + } if rsid != qrsid1 && rsid != qrsid2 { t.Fatalf("Expected a queue group rsid, got %s\n", rsid) }