Support for queue subscriber retries over routes

Signed-off-by: Derek Collison <derek@nats.io>
This commit is contained in:
Derek Collison
2018-06-01 13:43:15 -06:00
parent 3bdab1b777
commit 049db6e854
8 changed files with 397 additions and 124 deletions

View File

@@ -47,8 +47,9 @@ func init() {
const (
// Scratch buffer size for the processMsg() calls.
msgScratchSize = 512
msgHeadProto = "MSG "
msgScratchSize = 512
msgHeadProto = "MSG "
msgHeadProtoLen = len(msgHeadProto)
)
// For controlling dynamic buffer sizes.
@@ -987,7 +988,7 @@ func (c *client) processPub(arg []byte) error {
}
if c.opts.Pedantic && !IsValidLiteralSubject(string(c.pa.subject)) {
c.sendErr("Invalid Subject")
c.sendErr("Invalid Publish Subject")
}
return nil
}
@@ -1093,6 +1094,7 @@ func (c *client) canSubscribe(sub []byte) bool {
return len(c.perms.sub.Match(string(sub)).psubs) > 0
}
// Low level unsubscribe for a given client.
func (c *client) unsubscribe(sub *subscription) {
c.mu.Lock()
defer c.mu.Unlock()
@@ -1103,10 +1105,21 @@ func (c *client) unsubscribe(sub *subscription) {
return
}
c.traceOp("<-> %s", "DELSUB", sub.sid)
delete(c.subs, string(sub.sid))
if c.srv != nil {
c.srv.sl.Remove(sub)
}
// If we are a queue subscriber on a client connection and we have routes,
// we will remember the remote sid and the queue group in case a route
// tries to deliver us a message. Remote queue subscribers are directed
// so we need to know what to do to avoid unnecessary message drops
// from [auto-]unsubscribe.
if c.typ == CLIENT && c.srv != nil &&
len(sub.queue) > 0 && c.srv.NumRoutes() > 0 {
c.srv.holdRemoteQSub(sub)
}
}
func (c *client) processUnsub(arg []byte) error {
@@ -1306,6 +1319,16 @@ func (c *client) pubAllowed() bool {
return allowed
}
// prepMsgHeader will prepare the message header prefix
func (c *client) prepMsgHeader() []byte {
// Use the scratch buffer..
msgh := c.msgb[:msgHeadProtoLen]
// msg header
msgh = append(msgh, c.pa.subject...)
return append(msgh, ' ')
}
// processMsg is called to process an inbound msg from a client.
func (c *client) processMsg(msg []byte) {
// Snapshot server.
@@ -1334,6 +1357,8 @@ func (c *client) processMsg(msg []byte) {
return
}
// Match the subscriptions. We will use our own L1 map if
// it's still valid, avoiding contention on the shared sublist.
var r *SublistResult
var ok bool
@@ -1371,114 +1396,67 @@ func (c *client) processMsg(msg []byte) {
return
}
// Check for pedantic and bad subject.
if c.opts.Pedantic && !IsValidLiteralSubject(string(c.pa.subject)) {
if c.typ == ROUTER {
c.processRoutedMsg(r, msg)
return
}
// Scratch buffer..
msgh := c.msgb[:len(msgHeadProto)]
// msg header
msgh = append(msgh, c.pa.subject...)
msgh = append(msgh, ' ')
// Client connection processing here.
msgh := c.prepMsgHeader()
si := len(msgh)
isRoute := c.typ == ROUTER
isRouteQsub := false
// Used to only send messages once across any given route.
var rmap map[string]struct{}
// If we are a route and we have a queue subscription, deliver direct
// since they are sent direct via L2 semantics. If the match is a queue
// subscription, we will return from here regardless if we find a sub.
if isRoute {
isQueue, sub, err := srv.routeSidQueueSubscriber(c.pa.sid)
if isQueue {
// We got an invalid QRSID, so stop here
if err != nil {
c.Errorf("Unable to deliver messaage: %v", err)
return
// Loop over all normal subscriptions that match.
for _, sub := range r.psubs {
// Check if this is a send to a ROUTER, make sure we only send it
// once. The other side will handle the appropriate re-processing
// and fan-out. Also enforce 1-Hop semantics, so no routing to another.
if sub.client.typ == ROUTER {
// Check to see if we have already sent it here.
if rmap == nil {
rmap = make(map[string]struct{}, srv.numRoutes())
}
sub.client.mu.Lock()
if sub.client.nc == nil ||
sub.client.route == nil ||
sub.client.route.remoteID == "" {
c.Debugf("Bad or Missing ROUTER Identity, not processing msg")
sub.client.mu.Unlock()
continue
}
if _, ok := rmap[sub.client.route.remoteID]; ok {
c.Debugf("Ignoring route, already processed and sent msg")
sub.client.mu.Unlock()
continue
}
rmap[sub.client.route.remoteID] = routeSeen
sub.client.mu.Unlock()
}
// Normal delivery
mh := c.msgHeader(msgh[:si], sub)
c.deliverMsg(sub, mh, msg)
}
// Check to see if we have our own rand yet. Global rand
// has contention with lots of clients, etc.
if c.in.prand == nil {
c.in.prand = rand.New(rand.NewSource(time.Now().UnixNano()))
}
// Process queue subs
for i := 0; i < len(r.qsubs); i++ {
qsubs := r.qsubs[i]
// Find a subscription that is able to deliver this message
// starting at a random index.
startIndex := c.in.prand.Intn(len(qsubs))
for i := 0; i < len(qsubs); i++ {
index := (startIndex + i) % len(qsubs)
sub := qsubs[index]
if sub != nil {
mh := c.msgHeader(msgh[:si], sub)
if c.deliverMsg(sub, mh, msg) {
return
}
}
isRouteQsub = true
// At this point we know fo sure that it's a queue subscription and
// we didn't make a delivery attempt, because either a subscriber limit
// was exceeded or a subscription is already gone.
// So, let the code below find yet another matching subscription.
// We are at risk that a message might go back and forth between routes
// during these attempts, but at the end it shall either be delivered
// (at most once) or dropped.
}
}
// Don't process normal subscriptions in case of a queue subscription resend.
// Otherwise, we'd end up with potentially delivering the same message twice.
if !isRouteQsub {
// Used to only send normal subscriptions once across a given route.
var rmap map[string]struct{}
// Loop over all normal subscriptions that match.
for _, sub := range r.psubs {
// Check if this is a send to a ROUTER, make sure we only send it
// once. The other side will handle the appropriate re-processing
// and fan-out. Also enforce 1-Hop semantics, so no routing to another.
if sub.client.typ == ROUTER {
// Skip if sourced from a ROUTER and going to another ROUTER.
// This is 1-Hop semantics for ROUTERs.
if isRoute {
continue
}
// Check to see if we have already sent it here.
if rmap == nil {
rmap = make(map[string]struct{}, srv.numRoutes())
}
sub.client.mu.Lock()
if sub.client.nc == nil || sub.client.route == nil ||
sub.client.route.remoteID == "" {
c.Debugf("Bad or Missing ROUTER Identity, not processing msg")
sub.client.mu.Unlock()
continue
}
if _, ok := rmap[sub.client.route.remoteID]; ok {
c.Debugf("Ignoring route, already processed")
sub.client.mu.Unlock()
continue
}
rmap[sub.client.route.remoteID] = routeSeen
sub.client.mu.Unlock()
}
// Normal delivery
mh := c.msgHeader(msgh[:si], sub)
c.deliverMsg(sub, mh, msg)
}
}
// Now process any queue subs we have if not a route...
// or if we did not make a delivery attempt yet.
if isRouteQsub || !isRoute {
// Check to see if we have our own rand yet. Global rand
// has contention with lots of clients, etc.
if c.in.prand == nil {
c.in.prand = rand.New(rand.NewSource(time.Now().UnixNano()))
}
// Process queue subs
for i := 0; i < len(r.qsubs); i++ {
qsubs := r.qsubs[i]
// Find a subscription that is able to deliver this message
// starting at a random index.
startIndex := c.in.prand.Intn(len(qsubs))
for i := 0; i < len(qsubs); i++ {
index := (startIndex + i) % len(qsubs)
sub := qsubs[index]
if sub != nil {
mh := c.msgHeader(msgh[:si], sub)
if c.deliverMsg(sub, mh, msg) {
break
}
break
}
}
}

View File

@@ -23,6 +23,7 @@ import (
"regexp"
"strings"
"sync"
"sync/atomic"
"testing"
"time"
@@ -911,3 +912,64 @@ func TestDynamicBuffers(t *testing.T) {
}
c.checkBuffers(t, minBufSize, minBufSize)
}
// Similar to the routed version. Make sure we receive all of the
// messages with auto-unsubscribe enabled.
func TestQueueAutoUnsubscribe(t *testing.T) {
opts := DefaultOptions()
s := RunServer(opts)
defer s.Shutdown()
nc, err := nats.Connect(fmt.Sprintf("nats://%s:%d", opts.Host, opts.Port))
if err != nil {
t.Fatalf("Error on connect: %v", err)
}
defer nc.Close()
rbar := int32(0)
barCb := func(m *nats.Msg) {
atomic.AddInt32(&rbar, 1)
}
rbaz := int32(0)
bazCb := func(m *nats.Msg) {
atomic.AddInt32(&rbaz, 1)
}
// Create 1000 subscriptions with auto-unsubscribe of 1.
// Do two groups, one bar and one baz.
for i := 0; i < 1000; i++ {
qsub, err := nc.QueueSubscribe("foo", "bar", barCb)
if err != nil {
t.Fatalf("Error on subscribe: %v", err)
}
if err := qsub.AutoUnsubscribe(1); err != nil {
t.Fatalf("Error on auto-unsubscribe: %v", err)
}
qsub, err = nc.QueueSubscribe("foo", "baz", bazCb)
if err != nil {
t.Fatalf("Error on subscribe: %v", err)
}
if err := qsub.AutoUnsubscribe(1); err != nil {
t.Fatalf("Error on auto-unsubscribe: %v", err)
}
}
nc.Flush()
expected := int32(1000)
for i := int32(0); i < expected; i++ {
nc.Publish("foo", []byte("Don't Drop Me!"))
}
nc.Flush()
wait := time.Now().Add(5 * time.Second)
for time.Now().Before(wait) {
nbar := atomic.LoadInt32(&rbar)
nbaz := atomic.LoadInt32(&rbaz)
if nbar == expected && nbaz == expected {
return
}
time.Sleep(10 * time.Millisecond)
}
t.Fatalf("Did not receive all %d queue messages, received %d for 'bar' and %d for 'baz'\n",
expected, atomic.LoadInt32(&rbar), atomic.LoadInt32(&rbaz))
}

View File

@@ -109,4 +109,7 @@ const (
// MAX_PUB_ARGS Maximum possible number of arguments from PUB proto.
MAX_PUB_ARGS = 3
// DEFAULT_REMOTE_QSUBS_SWEEPER
DEFAULT_REMOTE_QSUBS_SWEEPER = 30 * time.Second
)

View File

@@ -87,6 +87,7 @@ type Options struct {
TLSCaCert string `json:"-"`
TLSConfig *tls.Config `json:"-"`
WriteDeadline time.Duration `json:"-"`
RQSubsSweep time.Duration `json:"-"`
CustomClientAuthentication Authentication `json:"-"`
CustomRouterAuthentication Authentication `json:"-"`
@@ -949,6 +950,9 @@ func processOptions(opts *Options) {
if opts.WriteDeadline == time.Duration(0) {
opts.WriteDeadline = DEFAULT_FLUSH_DEADLINE
}
if opts.RQSubsSweep == time.Duration(0) {
opts.RQSubsSweep = DEFAULT_REMOTE_QSUBS_SWEEPER
}
}
// ConfigureOptions accepts a flag set and augment it with NATS Server

View File

@@ -40,6 +40,7 @@ func TestDefaultOptions(t *testing.T) {
MaxPayload: MAX_PAYLOAD_SIZE,
MaxPending: MAX_PENDING_SIZE,
WriteDeadline: DEFAULT_FLUSH_DEADLINE,
RQSubsSweep: DEFAULT_REMOTE_QSUBS_SWEEPER,
}
opts := &Options{}

View File

@@ -61,12 +61,196 @@ type connectInfo struct {
Name string `json:"name"`
}
// Used to hold onto mappings for unsubscribed
// routed queue subscribers.
type rqsub struct {
group []byte
atime time.Time
}
// Route protocol constants
const (
ConProto = "CONNECT %s" + _CRLF_
InfoProto = "INFO %s" + _CRLF_
)
// Clear up the timer and any map held for remote qsubs.
func (s *Server) clearRemoteQSubs() {
s.rqsMu.Lock()
defer s.rqsMu.Unlock()
if s.rqsubsTimer != nil {
s.rqsubsTimer.Stop()
s.rqsubsTimer = nil
}
s.rqsubs = nil
}
// Check to see if we can remove any of the remote qsubs mappings
func (s *Server) purgeRemoteQSubs() {
ri := s.getOpts().RQSubsSweep
s.rqsMu.Lock()
exp := time.Now().Add(-ri)
for k, rqsub := range s.rqsubs {
if exp.After(rqsub.atime) {
delete(s.rqsubs, k)
}
}
if s.rqsubsTimer != nil {
// Reset timer.
s.rqsubsTimer = time.AfterFunc(ri, s.purgeRemoteQSubs)
}
s.rqsMu.Unlock()
}
// Lookup a remote queue group sid.
func (s *Server) lookupRemoteQGroup(sid string) []byte {
s.rqsMu.RLock()
rqsub := s.rqsubs[sid]
s.rqsMu.RUnlock()
return rqsub.group
}
// This will hold onto a remote queue subscriber to allow
// for mapping and handling if we get a message after the
// subscription goes away.
func (s *Server) holdRemoteQSub(sub *subscription) {
// Should not happen, but protect anyway.
if len(sub.queue) == 0 {
return
}
// Add the entry
s.rqsMu.Lock()
// Start timer if needed.
if s.rqsubsTimer == nil {
ri := s.getOpts().RQSubsSweep
s.rqsubsTimer = time.AfterFunc(ri, s.purgeRemoteQSubs)
}
// Create map if needed.
if s.rqsubs == nil {
s.rqsubs = make(map[string]rqsub)
}
group := make([]byte, len(sub.queue))
copy(group, sub.queue)
rqsub := rqsub{group: group, atime: time.Now()}
s.rqsubs[routeSid(sub)] = rqsub
s.rqsMu.Unlock()
}
// This is for when we receive a directed message for a queue subscriber
// that has gone away. We reroute like a new message but scope to only
// the queue subscribers that it was originally intended for. We will
// prefer local clients, but will bounce to another route if needed.
func (c *client) reRouteQMsg(r *SublistResult, msgh, msg, group []byte) {
c.Debugf("Attempting redelivery of message for absent queue subscriber on group '%q'", group)
// We only care about qsubs here. Data structure not setup for optimized
// lookup for our specific group however.
var qsubs []*subscription
for _, qs := range r.qsubs {
if len(qs) != 0 && bytes.Compare(group, qs[0].queue) == 0 {
qsubs = qs
break
}
}
// If no match return.
if qsubs == nil {
c.Debugf("Redelivery failed, no queue subscribers for message on group '%q'", group)
return
}
// We have a matched group of queue subscribers.
// We prefer a local subscriber since that was the original target.
// Spin prand if needed.
if c.in.prand == nil {
c.in.prand = rand.New(rand.NewSource(time.Now().UnixNano()))
}
// Hold onto a remote if we come across it to utilize in case no locals exist.
var rsub *subscription
startIndex := c.in.prand.Intn(len(qsubs))
for i := 0; i < len(qsubs); i++ {
index := (startIndex + i) % len(qsubs)
sub := qsubs[index]
if sub == nil {
continue
}
if rsub == nil && bytes.HasPrefix(sub.sid, []byte(QRSID)) {
rsub = sub
continue
}
mh := c.msgHeader(msgh[:len(msgh)], sub)
if c.deliverMsg(sub, mh, msg) {
c.Debugf("Redelivery succeeded for message on group '%q'", group)
return
}
}
// If we are here we failed to find a local, see if we snapshotted a
// remote sub, and if so deliver to that.
if rsub != nil {
mh := c.msgHeader(msgh[:len(msgh)], rsub)
if c.deliverMsg(rsub, mh, msg) {
c.Debugf("Re-routing message on group '%q' to remote server", group)
return
}
}
c.Debugf("Redelivery failed, no queue subscribers for message on group '%q'", group)
}
// processRoutedMsg processes messages inbound from a route.
func (c *client) processRoutedMsg(r *SublistResult, msg []byte) {
// Snapshot server.
srv := c.srv
msgh := c.prepMsgHeader()
si := len(msgh)
// If we have a queue subscription, deliver direct
// since they are sent direct via L2 semantics over routes.
// If the match is a queue subscription, we will return from
// here regardless if we find a sub.
isq, sub, err := srv.routeSidQueueSubscriber(c.pa.sid)
if isq {
if err != nil {
// We got an invalid QRSID, so stop here
c.Errorf("Unable to deliver routed queue message: %v", err)
return
}
didDeliver := false
if sub != nil {
mh := c.msgHeader(msgh[:si], sub)
didDeliver = c.deliverMsg(sub, mh, msg)
}
if !didDeliver && c.srv != nil {
group := c.srv.lookupRemoteQGroup(string(c.pa.sid))
c.reRouteQMsg(r, msgh, msg, group)
}
return
}
// Normal pub/sub message here
// Loop over all normal subscriptions that match.
for _, sub := range r.psubs {
// Check if this is a send to a ROUTER, if so we ignore to
// enforce 1-hop semantics.
if sub.client.typ == ROUTER {
continue
}
sub.client.mu.Lock()
if sub.client.nc == nil {
sub.client.mu.Unlock()
continue
}
sub.client.mu.Unlock()
// Normal delivery
mh := c.msgHeader(msgh[:si], sub)
c.deliverMsg(sub, mh, msg)
}
}
// Lock should be held entering here.
func (c *client) sendConnect(tlsRequired bool) {
var user, pass string
@@ -498,6 +682,8 @@ func (s *Server) routeSidQueueSubscriber(rsid []byte) (bool, *subscription, erro
return true, nil, nil
}
// Creates a routable sid that can be used
// to reach remote subscriptions.
func routeSid(sub *subscription) string {
var qi string
if len(sub.queue) > 0 {

View File

@@ -872,9 +872,10 @@ func TestServerPoolUpdatedWhenRouteGoesAway(t *testing.T) {
nc.Close()
}
func TestRoutedQueueUnsubscribe(t *testing.T) {
func TestRoutedQueueAutoUnsubscribe(t *testing.T) {
optsA, _ := ProcessConfigFile("./configs/seed.conf")
optsA.NoSigs, optsA.NoLog = true, true
optsA.RQSubsSweep = 250 * time.Millisecond
srvA := RunServer(optsA)
defer srvA.Shutdown()
@@ -900,16 +901,28 @@ func TestRoutedQueueUnsubscribe(t *testing.T) {
}
defer ncB.Close()
received := int32(0)
cb := func(m *nats.Msg) {
atomic.AddInt32(&received, 1)
rbar := int32(0)
barCb := func(m *nats.Msg) {
atomic.AddInt32(&rbar, 1)
}
rbaz := int32(0)
bazCb := func(m *nats.Msg) {
atomic.AddInt32(&rbaz, 1)
}
// Create 50 queue subs with auto-unsubscribe to each server.
// Create 250 queue subs with auto-unsubscribe to each server for
// group bar and group baz. So 500 total per queue group.
cons := []*nats.Conn{ncA, ncB}
for _, c := range cons {
for i := 0; i < 50; i++ {
qsub, err := c.QueueSubscribe("foo", "bar", cb)
for i := 0; i < 250; i++ {
qsub, err := c.QueueSubscribe("foo", "bar", barCb)
if err != nil {
t.Fatalf("Error on subscribe: %v", err)
}
if err := qsub.AutoUnsubscribe(1); err != nil {
t.Fatalf("Error on auto-unsubscribe: %v", err)
}
qsub, err = c.QueueSubscribe("foo", "baz", bazCb)
if err != nil {
t.Fatalf("Error on subscribe: %v", err)
}
@@ -920,24 +933,39 @@ func TestRoutedQueueUnsubscribe(t *testing.T) {
c.Flush()
}
total := 100
expected := int32(500)
// Now send messages from each server
for i := 0; i < total; i++ {
for i := int32(0); i < expected; i++ {
c := cons[i%2]
c.Publish("foo", []byte("hello"))
c.Publish("foo", []byte("Don't Drop Me!"))
}
for _, c := range cons {
c.Flush()
}
timeout := time.Now().Add(2 * time.Second)
for time.Now().Before(timeout) {
if atomic.LoadInt32(&received) == int32(total) {
wait := time.Now().Add(5 * time.Second)
for time.Now().Before(wait) {
nbar := atomic.LoadInt32(&rbar)
nbaz := atomic.LoadInt32(&rbaz)
if nbar == expected && nbaz == expected {
time.Sleep(500 * time.Millisecond)
// Now check all mappings are gone.
srvA.mu.Lock()
nrqsa := len(srvA.rqsubs)
srvA.mu.Unlock()
srvB.mu.Lock()
nrqsb := len(srvB.rqsubs)
srvB.mu.Unlock()
if nrqsa != 0 || nrqsb != 0 {
t.Fatalf("Expected rqs mappings to have cleared, but got A:%d, B:%d\n",
nrqsa, nrqsb)
}
return
}
time.Sleep(15 * time.Millisecond)
time.Sleep(10 * time.Millisecond)
}
t.Fatalf("Should have received %v messages, got %v", total, atomic.LoadInt32(&received))
t.Fatalf("Did not receive all %d queue messages, received %d for 'bar' and %d for 'baz'\n",
expected, atomic.LoadInt32(&rbar), atomic.LoadInt32(&rbaz))
}
func TestRouteFailedConnRemovedFromTmpMap(t *testing.T) {

View File

@@ -83,12 +83,20 @@ type Server struct {
routeInfo Info
routeInfoJSON []byte
quitCh chan struct{}
grMu sync.Mutex
grTmpClients map[uint64]*client
grRunning bool
grWG sync.WaitGroup // to wait on various go routines
cproto int64 // number of clients supporting async INFO
configTime time.Time // last time config was loaded
// Tracking for remote QRSID tags.
rqsMu sync.RWMutex
rqsubs map[string]rqsub
rqsubsTimer *time.Timer
// Tracking Go routines
grMu sync.Mutex
grTmpClients map[uint64]*client
grRunning bool
grWG sync.WaitGroup // to wait on various go routines
cproto int64 // number of clients supporting async INFO
configTime time.Time // last time config was loaded
logging struct {
sync.RWMutex
@@ -383,6 +391,8 @@ func (s *Server) Shutdown() {
s.profiler.Close()
}
// Clear any remote qsub mappings
s.clearRemoteQSubs()
s.mu.Unlock()
// Release go routines that wait on that channel
@@ -965,8 +975,9 @@ func (s *Server) removeClient(c *client) {
// NumRoutes will report the number of registered routes.
func (s *Server) NumRoutes() int {
s.mu.Lock()
defer s.mu.Unlock()
return len(s.routes)
nr := len(s.routes)
s.mu.Unlock()
return nr
}
// NumRemotes will report number of registered remotes.