From 049db6e854c93b83f714980c814502fa5ec70472 Mon Sep 17 00:00:00 2001 From: Derek Collison Date: Fri, 1 Jun 2018 13:43:15 -0600 Subject: [PATCH] Support for queue subscriber retries over routes Signed-off-by: Derek Collison --- server/client.go | 180 ++++++++++++++++++---------------------- server/client_test.go | 62 ++++++++++++++ server/const.go | 3 + server/opts.go | 4 + server/opts_test.go | 1 + server/route.go | 186 ++++++++++++++++++++++++++++++++++++++++++ server/routes_test.go | 58 +++++++++---- server/server.go | 27 ++++-- 8 files changed, 397 insertions(+), 124 deletions(-) diff --git a/server/client.go b/server/client.go index 04bb1c08..6e4423b0 100644 --- a/server/client.go +++ b/server/client.go @@ -47,8 +47,9 @@ func init() { const ( // Scratch buffer size for the processMsg() calls. - msgScratchSize = 512 - msgHeadProto = "MSG " + msgScratchSize = 512 + msgHeadProto = "MSG " + msgHeadProtoLen = len(msgHeadProto) ) // For controlling dynamic buffer sizes. @@ -987,7 +988,7 @@ func (c *client) processPub(arg []byte) error { } if c.opts.Pedantic && !IsValidLiteralSubject(string(c.pa.subject)) { - c.sendErr("Invalid Subject") + c.sendErr("Invalid Publish Subject") } return nil } @@ -1093,6 +1094,7 @@ func (c *client) canSubscribe(sub []byte) bool { return len(c.perms.sub.Match(string(sub)).psubs) > 0 } +// Low level unsubscribe for a given client. func (c *client) unsubscribe(sub *subscription) { c.mu.Lock() defer c.mu.Unlock() @@ -1103,10 +1105,21 @@ func (c *client) unsubscribe(sub *subscription) { return } c.traceOp("<-> %s", "DELSUB", sub.sid) + delete(c.subs, string(sub.sid)) if c.srv != nil { c.srv.sl.Remove(sub) } + + // If we are a queue subscriber on a client connection and we have routes, + // we will remember the remote sid and the queue group in case a route + // tries to deliver us a message. Remote queue subscribers are directed + // so we need to know what to do to avoid unnecessary message drops + // from [auto-]unsubscribe. 
+ if c.typ == CLIENT && c.srv != nil && + len(sub.queue) > 0 && c.srv.NumRoutes() > 0 { + c.srv.holdRemoteQSub(sub) + } } func (c *client) processUnsub(arg []byte) error { @@ -1306,6 +1319,16 @@ func (c *client) pubAllowed() bool { return allowed } +// prepMsgHeader will prepare the message header prefix +func (c *client) prepMsgHeader() []byte { + // Use the scratch buffer.. + msgh := c.msgb[:msgHeadProtoLen] + + // msg header + msgh = append(msgh, c.pa.subject...) + return append(msgh, ' ') +} + // processMsg is called to process an inbound msg from a client. func (c *client) processMsg(msg []byte) { // Snapshot server. @@ -1334,6 +1357,8 @@ func (c *client) processMsg(msg []byte) { return } + // Match the subscriptions. We will use our own L1 map if + // it's still valid, avoiding contention on the shared sublist. var r *SublistResult var ok bool @@ -1371,114 +1396,67 @@ func (c *client) processMsg(msg []byte) { return } - // Check for pedantic and bad subject. - if c.opts.Pedantic && !IsValidLiteralSubject(string(c.pa.subject)) { + if c.typ == ROUTER { + c.processRoutedMsg(r, msg) return } - // Scratch buffer.. - msgh := c.msgb[:len(msgHeadProto)] - - // msg header - msgh = append(msgh, c.pa.subject...) - msgh = append(msgh, ' ') + // Client connection processing here. + msgh := c.prepMsgHeader() si := len(msgh) - isRoute := c.typ == ROUTER - isRouteQsub := false + // Used to only send messages once across any given route. + var rmap map[string]struct{} - // If we are a route and we have a queue subscription, deliver direct - // since they are sent direct via L2 semantics. If the match is a queue - // subscription, we will return from here regardless if we find a sub. - if isRoute { - isQueue, sub, err := srv.routeSidQueueSubscriber(c.pa.sid) - if isQueue { - // We got an invalid QRSID, so stop here - if err != nil { - c.Errorf("Unable to deliver messaage: %v", err) - return + // Loop over all normal subscriptions that match. 
+ for _, sub := range r.psubs { + // Check if this is a send to a ROUTER, make sure we only send it + // once. The other side will handle the appropriate re-processing + // and fan-out. Also enforce 1-Hop semantics, so no routing to another. + if sub.client.typ == ROUTER { + // Check to see if we have already sent it here. + if rmap == nil { + rmap = make(map[string]struct{}, srv.numRoutes()) } + sub.client.mu.Lock() + if sub.client.nc == nil || + sub.client.route == nil || + sub.client.route.remoteID == "" { + c.Debugf("Bad or Missing ROUTER Identity, not processing msg") + sub.client.mu.Unlock() + continue + } + if _, ok := rmap[sub.client.route.remoteID]; ok { + c.Debugf("Ignoring route, already processed and sent msg") + sub.client.mu.Unlock() + continue + } + rmap[sub.client.route.remoteID] = routeSeen + sub.client.mu.Unlock() + } + // Normal delivery + mh := c.msgHeader(msgh[:si], sub) + c.deliverMsg(sub, mh, msg) + } + + // Check to see if we have our own rand yet. Global rand + // has contention with lots of clients, etc. + if c.in.prand == nil { + c.in.prand = rand.New(rand.NewSource(time.Now().UnixNano())) + } + // Process queue subs + for i := 0; i < len(r.qsubs); i++ { + qsubs := r.qsubs[i] + // Find a subscription that is able to deliver this message + // starting at a random index. + startIndex := c.in.prand.Intn(len(qsubs)) + for i := 0; i < len(qsubs); i++ { + index := (startIndex + i) % len(qsubs) + sub := qsubs[index] if sub != nil { mh := c.msgHeader(msgh[:si], sub) if c.deliverMsg(sub, mh, msg) { - return - } - } - isRouteQsub = true - // At this point we know fo sure that it's a queue subscription and - // we didn't make a delivery attempt, because either a subscriber limit - // was exceeded or a subscription is already gone. - // So, let the code below find yet another matching subscription. 
- // We are at risk that a message might go back and forth between routes - // during these attempts, but at the end it shall either be delivered - // (at most once) or dropped. - } - } - - // Don't process normal subscriptions in case of a queue subscription resend. - // Otherwise, we'd end up with potentially delivering the same message twice. - if !isRouteQsub { - // Used to only send normal subscriptions once across a given route. - var rmap map[string]struct{} - - // Loop over all normal subscriptions that match. - for _, sub := range r.psubs { - // Check if this is a send to a ROUTER, make sure we only send it - // once. The other side will handle the appropriate re-processing - // and fan-out. Also enforce 1-Hop semantics, so no routing to another. - if sub.client.typ == ROUTER { - // Skip if sourced from a ROUTER and going to another ROUTER. - // This is 1-Hop semantics for ROUTERs. - if isRoute { - continue - } - // Check to see if we have already sent it here. - if rmap == nil { - rmap = make(map[string]struct{}, srv.numRoutes()) - } - sub.client.mu.Lock() - if sub.client.nc == nil || sub.client.route == nil || - sub.client.route.remoteID == "" { - c.Debugf("Bad or Missing ROUTER Identity, not processing msg") - sub.client.mu.Unlock() - continue - } - if _, ok := rmap[sub.client.route.remoteID]; ok { - c.Debugf("Ignoring route, already processed") - sub.client.mu.Unlock() - continue - } - rmap[sub.client.route.remoteID] = routeSeen - sub.client.mu.Unlock() - } - // Normal delivery - mh := c.msgHeader(msgh[:si], sub) - c.deliverMsg(sub, mh, msg) - } - } - - // Now process any queue subs we have if not a route... - // or if we did not make a delivery attempt yet. - if isRouteQsub || !isRoute { - // Check to see if we have our own rand yet. Global rand - // has contention with lots of clients, etc. 
- if c.in.prand == nil { - c.in.prand = rand.New(rand.NewSource(time.Now().UnixNano())) - } - // Process queue subs - for i := 0; i < len(r.qsubs); i++ { - qsubs := r.qsubs[i] - // Find a subscription that is able to deliver this message - // starting at a random index. - startIndex := c.in.prand.Intn(len(qsubs)) - for i := 0; i < len(qsubs); i++ { - index := (startIndex + i) % len(qsubs) - sub := qsubs[index] - if sub != nil { - mh := c.msgHeader(msgh[:si], sub) - if c.deliverMsg(sub, mh, msg) { - break - } + break } } } diff --git a/server/client_test.go b/server/client_test.go index 3e71810e..8b487572 100644 --- a/server/client_test.go +++ b/server/client_test.go @@ -23,6 +23,7 @@ import ( "regexp" "strings" "sync" + "sync/atomic" "testing" "time" @@ -911,3 +912,64 @@ func TestDynamicBuffers(t *testing.T) { } c.checkBuffers(t, minBufSize, minBufSize) } + +// Similar to the routed version. Make sure we receive all of the +// messages with auto-unsubscribe enabled. +func TestQueueAutoUnsubscribe(t *testing.T) { + opts := DefaultOptions() + s := RunServer(opts) + defer s.Shutdown() + + nc, err := nats.Connect(fmt.Sprintf("nats://%s:%d", opts.Host, opts.Port)) + if err != nil { + t.Fatalf("Error on connect: %v", err) + } + defer nc.Close() + + rbar := int32(0) + barCb := func(m *nats.Msg) { + atomic.AddInt32(&rbar, 1) + } + rbaz := int32(0) + bazCb := func(m *nats.Msg) { + atomic.AddInt32(&rbaz, 1) + } + + // Create 1000 subscriptions with auto-unsubscribe of 1. + // Do two groups, one bar and one baz. 
+ for i := 0; i < 1000; i++ { + qsub, err := nc.QueueSubscribe("foo", "bar", barCb) + if err != nil { + t.Fatalf("Error on subscribe: %v", err) + } + if err := qsub.AutoUnsubscribe(1); err != nil { + t.Fatalf("Error on auto-unsubscribe: %v", err) + } + qsub, err = nc.QueueSubscribe("foo", "baz", bazCb) + if err != nil { + t.Fatalf("Error on subscribe: %v", err) + } + if err := qsub.AutoUnsubscribe(1); err != nil { + t.Fatalf("Error on auto-unsubscribe: %v", err) + } + } + nc.Flush() + + expected := int32(1000) + for i := int32(0); i < expected; i++ { + nc.Publish("foo", []byte("Don't Drop Me!")) + } + nc.Flush() + + wait := time.Now().Add(5 * time.Second) + for time.Now().Before(wait) { + nbar := atomic.LoadInt32(&rbar) + nbaz := atomic.LoadInt32(&rbaz) + if nbar == expected && nbaz == expected { + return + } + time.Sleep(10 * time.Millisecond) + } + t.Fatalf("Did not receive all %d queue messages, received %d for 'bar' and %d for 'baz'\n", + expected, atomic.LoadInt32(&rbar), atomic.LoadInt32(&rbaz)) +} diff --git a/server/const.go b/server/const.go index e1540ae9..ec5a161e 100644 --- a/server/const.go +++ b/server/const.go @@ -109,4 +109,7 @@ const ( // MAX_PUB_ARGS Maximum possible number of arguments from PUB proto. 
+ MAX_PUB_ARGS = 3 + + // DEFAULT_REMOTE_QSUBS_SWEEPER is the default interval for sweeping expired remote queue subscriber mappings. + DEFAULT_REMOTE_QSUBS_SWEEPER = 30 * time.Second ) diff --git a/server/opts.go b/server/opts.go index 7fb5346e..095d71ca 100644 --- a/server/opts.go +++ b/server/opts.go @@ -87,6 +87,7 @@ type Options struct { TLSCaCert string `json:"-"` TLSConfig *tls.Config `json:"-"` WriteDeadline time.Duration `json:"-"` + RQSubsSweep time.Duration `json:"-"` CustomClientAuthentication Authentication `json:"-"` CustomRouterAuthentication Authentication `json:"-"` @@ -949,6 +950,9 @@ func processOptions(opts *Options) { if opts.WriteDeadline == time.Duration(0) { opts.WriteDeadline = DEFAULT_FLUSH_DEADLINE } + if opts.RQSubsSweep == time.Duration(0) { + opts.RQSubsSweep = DEFAULT_REMOTE_QSUBS_SWEEPER + } } // ConfigureOptions accepts a flag set and augment it with NATS Server diff --git a/server/opts_test.go b/server/opts_test.go index 9ab69b3b..8b54091b 100644 --- a/server/opts_test.go +++ b/server/opts_test.go @@ -40,6 +40,7 @@ func TestDefaultOptions(t *testing.T) { MaxPayload: MAX_PAYLOAD_SIZE, MaxPending: MAX_PENDING_SIZE, WriteDeadline: DEFAULT_FLUSH_DEADLINE, + RQSubsSweep: DEFAULT_REMOTE_QSUBS_SWEEPER, } opts := &Options{} diff --git a/server/route.go b/server/route.go index af1ebde7..bd210567 100644 --- a/server/route.go +++ b/server/route.go @@ -61,12 +61,196 @@ type connectInfo struct { Name string `json:"name"` } +// Used to hold onto mappings for unsubscribed +// routed queue subscribers. +type rqsub struct { + group []byte + atime time.Time +} + // Route protocol constants const ( ConProto = "CONNECT %s" + _CRLF_ InfoProto = "INFO %s" + _CRLF_ ) +// Clear up the timer and any map held for remote qsubs. 
+func (s *Server) clearRemoteQSubs() { + s.rqsMu.Lock() + defer s.rqsMu.Unlock() + if s.rqsubsTimer != nil { + s.rqsubsTimer.Stop() + s.rqsubsTimer = nil + } + s.rqsubs = nil +} + +// Check to see if we can remove any of the remote qsubs mappings +func (s *Server) purgeRemoteQSubs() { + ri := s.getOpts().RQSubsSweep + s.rqsMu.Lock() + exp := time.Now().Add(-ri) + for k, rqsub := range s.rqsubs { + if exp.After(rqsub.atime) { + delete(s.rqsubs, k) + } + } + if s.rqsubsTimer != nil { + // Reset timer. + s.rqsubsTimer = time.AfterFunc(ri, s.purgeRemoteQSubs) + } + s.rqsMu.Unlock() +} + +// Lookup a remote queue group sid. +func (s *Server) lookupRemoteQGroup(sid string) []byte { + s.rqsMu.RLock() + rqsub := s.rqsubs[sid] + s.rqsMu.RUnlock() + return rqsub.group +} + +// This will hold onto a remote queue subscriber to allow +// for mapping and handling if we get a message after the +// subscription goes away. +func (s *Server) holdRemoteQSub(sub *subscription) { + // Should not happen, but protect anyway. + if len(sub.queue) == 0 { + return + } + // Add the entry + s.rqsMu.Lock() + // Start timer if needed. + if s.rqsubsTimer == nil { + ri := s.getOpts().RQSubsSweep + s.rqsubsTimer = time.AfterFunc(ri, s.purgeRemoteQSubs) + } + // Create map if needed. + if s.rqsubs == nil { + s.rqsubs = make(map[string]rqsub) + } + group := make([]byte, len(sub.queue)) + copy(group, sub.queue) + rqsub := rqsub{group: group, atime: time.Now()} + s.rqsubs[routeSid(sub)] = rqsub + s.rqsMu.Unlock() +} + +// This is for when we receive a directed message for a queue subscriber +// that has gone away. We reroute like a new message but scope to only +// the queue subscribers that it was originally intended for. We will +// prefer local clients, but will bounce to another route if needed. 
+func (c *client) reRouteQMsg(r *SublistResult, msgh, msg, group []byte) { + c.Debugf("Attempting redelivery of message for absent queue subscriber on group '%q'", group) + + // We only care about qsubs here. Data structure not setup for optimized + // lookup for our specific group however. + + var qsubs []*subscription + for _, qs := range r.qsubs { + if len(qs) != 0 && bytes.Compare(group, qs[0].queue) == 0 { + qsubs = qs + break + } + } + + // If no match return. + if qsubs == nil { + c.Debugf("Redelivery failed, no queue subscribers for message on group '%q'", group) + return + } + + // We have a matched group of queue subscribers. + // We prefer a local subscriber since that was the original target. + + // Spin prand if needed. + if c.in.prand == nil { + c.in.prand = rand.New(rand.NewSource(time.Now().UnixNano())) + } + + // Hold onto a remote if we come across it to utilize in case no locals exist. + var rsub *subscription + + startIndex := c.in.prand.Intn(len(qsubs)) + for i := 0; i < len(qsubs); i++ { + index := (startIndex + i) % len(qsubs) + sub := qsubs[index] + if sub == nil { + continue + } + if rsub == nil && bytes.HasPrefix(sub.sid, []byte(QRSID)) { + rsub = sub + continue + } + mh := c.msgHeader(msgh[:len(msgh)], sub) + if c.deliverMsg(sub, mh, msg) { + c.Debugf("Redelivery succeeded for message on group '%q'", group) + return + } + } + // If we are here we failed to find a local, see if we snapshotted a + // remote sub, and if so deliver to that. + if rsub != nil { + mh := c.msgHeader(msgh[:len(msgh)], rsub) + if c.deliverMsg(rsub, mh, msg) { + c.Debugf("Re-routing message on group '%q' to remote server", group) + return + } + } + c.Debugf("Redelivery failed, no queue subscribers for message on group '%q'", group) +} + +// processRoutedMsg processes messages inbound from a route. +func (c *client) processRoutedMsg(r *SublistResult, msg []byte) { + // Snapshot server. 
+ srv := c.srv + + msgh := c.prepMsgHeader() + si := len(msgh) + + // If we have a queue subscription, deliver direct + // since they are sent direct via L2 semantics over routes. + // If the match is a queue subscription, we will return from + // here regardless if we find a sub. + isq, sub, err := srv.routeSidQueueSubscriber(c.pa.sid) + if isq { + if err != nil { + // We got an invalid QRSID, so stop here + c.Errorf("Unable to deliver routed queue message: %v", err) + return + } + didDeliver := false + if sub != nil { + mh := c.msgHeader(msgh[:si], sub) + didDeliver = c.deliverMsg(sub, mh, msg) + } + if !didDeliver && c.srv != nil { + group := c.srv.lookupRemoteQGroup(string(c.pa.sid)) + c.reRouteQMsg(r, msgh, msg, group) + } + return + } + // Normal pub/sub message here + // Loop over all normal subscriptions that match. + for _, sub := range r.psubs { + // Check if this is a send to a ROUTER, if so we ignore to + // enforce 1-hop semantics. + if sub.client.typ == ROUTER { + continue + } + sub.client.mu.Lock() + if sub.client.nc == nil { + sub.client.mu.Unlock() + continue + } + sub.client.mu.Unlock() + + // Normal delivery + mh := c.msgHeader(msgh[:si], sub) + c.deliverMsg(sub, mh, msg) + } +} + // Lock should be held entering here. func (c *client) sendConnect(tlsRequired bool) { var user, pass string @@ -498,6 +682,8 @@ func (s *Server) routeSidQueueSubscriber(rsid []byte) (bool, *subscription, erro return true, nil, nil } +// Creates a routable sid that can be used +// to reach remote subscriptions. 
func routeSid(sub *subscription) string { var qi string if len(sub.queue) > 0 { diff --git a/server/routes_test.go b/server/routes_test.go index 3c34992d..5bc2ccd6 100644 --- a/server/routes_test.go +++ b/server/routes_test.go @@ -872,9 +872,10 @@ func TestServerPoolUpdatedWhenRouteGoesAway(t *testing.T) { nc.Close() } -func TestRoutedQueueUnsubscribe(t *testing.T) { +func TestRoutedQueueAutoUnsubscribe(t *testing.T) { optsA, _ := ProcessConfigFile("./configs/seed.conf") optsA.NoSigs, optsA.NoLog = true, true + optsA.RQSubsSweep = 250 * time.Millisecond srvA := RunServer(optsA) defer srvA.Shutdown() @@ -900,16 +901,28 @@ func TestRoutedQueueUnsubscribe(t *testing.T) { } defer ncB.Close() - received := int32(0) - cb := func(m *nats.Msg) { - atomic.AddInt32(&received, 1) + rbar := int32(0) + barCb := func(m *nats.Msg) { + atomic.AddInt32(&rbar, 1) + } + rbaz := int32(0) + bazCb := func(m *nats.Msg) { + atomic.AddInt32(&rbaz, 1) } - // Create 50 queue subs with auto-unsubscribe to each server. + // Create 250 queue subs with auto-unsubscribe to each server for + // group bar and group baz. So 500 total per queue group. 
cons := []*nats.Conn{ncA, ncB} for _, c := range cons { - for i := 0; i < 50; i++ { - qsub, err := c.QueueSubscribe("foo", "bar", cb) + for i := 0; i < 250; i++ { + qsub, err := c.QueueSubscribe("foo", "bar", barCb) + if err != nil { + t.Fatalf("Error on subscribe: %v", err) + } + if err := qsub.AutoUnsubscribe(1); err != nil { + t.Fatalf("Error on auto-unsubscribe: %v", err) + } + qsub, err = c.QueueSubscribe("foo", "baz", bazCb) if err != nil { t.Fatalf("Error on subscribe: %v", err) } @@ -920,24 +933,39 @@ func TestRoutedQueueUnsubscribe(t *testing.T) { c.Flush() } - total := 100 + expected := int32(500) // Now send messages from each server - for i := 0; i < total; i++ { + for i := int32(0); i < expected; i++ { c := cons[i%2] - c.Publish("foo", []byte("hello")) + c.Publish("foo", []byte("Don't Drop Me!")) } for _, c := range cons { c.Flush() } - timeout := time.Now().Add(2 * time.Second) - for time.Now().Before(timeout) { - if atomic.LoadInt32(&received) == int32(total) { + wait := time.Now().Add(5 * time.Second) + for time.Now().Before(wait) { + nbar := atomic.LoadInt32(&rbar) + nbaz := atomic.LoadInt32(&rbaz) + if nbar == expected && nbaz == expected { + time.Sleep(500 * time.Millisecond) + // Now check all mappings are gone. 
+ srvA.mu.Lock() + nrqsa := len(srvA.rqsubs) + srvA.mu.Unlock() + srvB.mu.Lock() + nrqsb := len(srvB.rqsubs) + srvB.mu.Unlock() + if nrqsa != 0 || nrqsb != 0 { + t.Fatalf("Expected rqs mappings to have cleared, but got A:%d, B:%d\n", + nrqsa, nrqsb) + } return } - time.Sleep(15 * time.Millisecond) + time.Sleep(10 * time.Millisecond) } - t.Fatalf("Should have received %v messages, got %v", total, atomic.LoadInt32(&received)) + t.Fatalf("Did not receive all %d queue messages, received %d for 'bar' and %d for 'baz'\n", + expected, atomic.LoadInt32(&rbar), atomic.LoadInt32(&rbaz)) } func TestRouteFailedConnRemovedFromTmpMap(t *testing.T) { diff --git a/server/server.go b/server/server.go index 75b38fe9..fdff47bd 100644 --- a/server/server.go +++ b/server/server.go @@ -83,12 +83,20 @@ type Server struct { routeInfo Info routeInfoJSON []byte quitCh chan struct{} - grMu sync.Mutex - grTmpClients map[uint64]*client - grRunning bool - grWG sync.WaitGroup // to wait on various go routines - cproto int64 // number of clients supporting async INFO - configTime time.Time // last time config was loaded + + // Tracking for remote QRSID tags. + rqsMu sync.RWMutex + rqsubs map[string]rqsub + rqsubsTimer *time.Timer + + // Tracking Go routines + grMu sync.Mutex + grTmpClients map[uint64]*client + grRunning bool + grWG sync.WaitGroup // to wait on various go routines + + cproto int64 // number of clients supporting async INFO + configTime time.Time // last time config was loaded logging struct { sync.RWMutex @@ -383,6 +391,8 @@ func (s *Server) Shutdown() { s.profiler.Close() } + // Clear any remote qsub mappings + s.clearRemoteQSubs() s.mu.Unlock() // Release go routines that wait on that channel @@ -965,8 +975,9 @@ func (s *Server) removeClient(c *client) { // NumRoutes will report the number of registered routes. 
func (s *Server) NumRoutes() int { s.mu.Lock() - defer s.mu.Unlock() - return len(s.routes) + nr := len(s.routes) + s.mu.Unlock() + return nr } // NumRemotes will report number of registered remotes.