diff --git a/server/client.go b/server/client.go index 43206336..1d2d9fa1 100644 --- a/server/client.go +++ b/server/client.go @@ -165,6 +165,8 @@ func (c *client) readLoop() { c.closeConnection() return } + // Grab for updates for last activity. + last := time.Now() if err := c.parse(b[:n]); err != nil { // handled inline if err != ErrMaxPayload && err != ErrAuthorization { @@ -187,6 +189,9 @@ func (c *client) readLoop() { cp.mu.Unlock() cp.closeConnection() cp.mu.Lock() + } else { + // Update outbound last activity. + cp.last = last } } cp.mu.Unlock() @@ -195,6 +200,7 @@ func (c *client) readLoop() { // Check to see if we got closed, e.g. slow consumer c.mu.Lock() nc := c.nc + c.last = last c.mu.Unlock() if nc == nil { return @@ -454,10 +460,6 @@ func (c *client) processPub(arg []byte) error { if c.opts.Pedantic && !sublist.IsValidLiteralSubject(c.pa.subject) { c.sendErr("Invalid Subject") } - // Update last activity. - c.mu.Lock() - c.last = time.Now() - c.mu.Unlock() return nil } @@ -511,8 +513,6 @@ func (c *client) processSub(argo []byte) (err error) { c.mu.Unlock() return nil } - // Update last activity. - c.last = time.Now() // We can have two SUB protocols coming from a route due to some // race conditions. We should make sure that we process only one. @@ -657,9 +657,6 @@ func (c *client) deliverMsg(sub *subscription, mh, msg []byte) { // Update statistics - // Update last activity. - client.last = time.Now() - // The msg includes the CR_LF, so pull back out for accounting. msgSize := int64(len(msg) - LEN_CR_LF) @@ -736,11 +733,6 @@ func (c *client) processMsg(msg []byte) { // Snapshot server. srv := c.srv - if srv != nil { - atomic.AddInt64(&srv.inMsgs, 1) - atomic.AddInt64(&srv.inBytes, msgSize) - } - if c.trace { c.traceMsg(msg) } @@ -751,6 +743,10 @@ func (c *client) processMsg(msg []byte) { return } + // Accounting + atomic.AddInt64(&srv.inMsgs, 1) + atomic.AddInt64(&srv.inBytes, msgSize) + r := srv.sl.Match(c.pa.subject) if len(r) <= 0 { return @@ -764,8 +760,6 @@ func (c *client) processMsg(msg []byte) { msgh = append(msgh, ' ') si := len(msgh) - seenQ := map[string]struct{}{} - isRoute := c.typ == ROUTER var rmap map[string]struct{} @@ -782,28 +776,40 @@ func (c *client) processMsg(msg []byte) { } } - // Loop over all subscriptions that match. - // We randomize/shuffle the list to optimize a bit on queue subscribers. - indexes := rand.Perm(len(r)) - for _, i := range indexes { - sub := r[i].(*subscription) + var qmap map[string][]*subscription + // Loop over all subscriptions that match. + for _, v := range r { + sub := v.(*subscription) + + // Process queue group subscriptions by gathering them all up + // here. We will pick the winners when we are done processing + // all of the subscriptions. if sub.queue != nil { // Queue subscriptions handled from routes directly above. if isRoute { continue } - qname := string(sub.queue) - if _, ok := seenQ[qname]; ok { - continue + // FIXME(dlc), this can be more efficient + if qmap == nil { + qmap = make(map[string][]*subscription) } - seenQ[qname] = struct{}{} + qname := string(sub.queue) + qsubs := qmap[qname] + if qsubs == nil { + qsubs = make([]*subscription, 0, 4) + } + qsubs = append(qsubs, sub) + qmap[qname] = qsubs + continue } + // Process normal, non-queue group subscriptions. + // If this is a send to a ROUTER, make sure we only send it // once. The other side will handle the appropriate re-processing. // Also enforce 1-Hop. - if sub.client.typ == ROUTER && sub.queue == nil { + if sub.client.typ == ROUTER { // Skip if sourced from a ROUTER and going to another ROUTER. // This is 1-Hop semantics for ROUTERs. if isRoute { @@ -829,10 +835,18 @@ func (c *client) processMsg(msg []byte) { sub.client.mu.Unlock() } - // Process subscription. mh := c.msgHeader(msgh[:si], sub) c.deliverMsg(sub, mh, msg) } + + if qmap != nil { + for _, qsubs := range qmap { + index := rand.Intn(len(qsubs)) + sub := qsubs[index] + mh := c.msgHeader(msgh[:si], sub) + c.deliverMsg(sub, mh, msg) + } + } } func (c *client) processPingTimer() { diff --git a/server/util_test.go b/server/util_test.go index e2aa4153..8ce10b65 100644 --- a/server/util_test.go +++ b/server/util_test.go @@ -1,11 +1,13 @@ -// Copyright 2014 Apcera Inc. All rights reserved. +// Copyright 2014-2016 Apcera Inc. All rights reserved. package server import ( + "math/rand" "strconv" "sync" "testing" + "time" ) func TestParseSize(t *testing.T) { @@ -69,3 +71,43 @@ func BenchmarkNoDeferMutex(b *testing.B) { noDeferUnlock(&mu) } } + +func createTestSub() *subscription { + return &subscription{ + subject: []byte("foo"), + queue: []byte("bar"), + sid: []byte("22"), + } +} + +func BenchmarkArrayRand(b *testing.B) { + b.StopTimer() + r := rand.New(rand.NewSource(time.Now().UnixNano())) + // Create an array of 10 items + subs := []*subscription{} + for i := 0; i < 10; i++ { + subs = append(subs, createTestSub()) + } + b.StartTimer() + + for i := 0; i < b.N; i++ { + index := r.Intn(len(subs)) + _ = subs[index] + } +} + +func BenchmarkMapRange(b *testing.B) { + b.StopTimer() + // Create an map of 10 items + subs := map[int]*subscription{} + for i := 0; i < 10; i++ { + subs[i] = createTestSub() + } + b.StartTimer() + + for i := 0; i < b.N; i++ { + for _, _ = range subs { + break + } + } +} diff --git a/sublist/sublist.go b/sublist/sublist.go index 25fbfe0f..da423699 100644 --- a/sublist/sublist.go +++ b/sublist/sublist.go @@ -1,4 +1,4 @@ -// Copyright 2012-2015 Apcera Inc. All rights reserved. +// Copyright 2012-2016 Apcera Inc. All rights reserved. // Sublist is a subject distribution data structure that can match subjects to // interested subscribers. Subscribers can have wildcard subjects to match @@ -11,7 +11,6 @@ import ( "sync/atomic" "time" - "github.com/nats-io/gnatsd/hash" "github.com/nats-io/gnatsd/hashmap" ) @@ -63,7 +62,6 @@ func newNode() *node { // algortihm for the tokens, which should be short. func newLevel() *level { h := hashmap.New() - h.Hash = hash.FNV1A return &level{nodes: h} } @@ -81,7 +79,7 @@ func New() *Sublist { } // Common byte variables for wildcards and token separator. -var ( +const ( _PWC = byte('*') _FWC = byte('>') _SEP = byte('.')