From bb4e8ae0f9bdbb29fda75d5bc4b83e7f077b8771 Mon Sep 17 00:00:00 2001 From: Ivan Kozlovic Date: Tue, 5 Mar 2019 15:55:21 -0700 Subject: [PATCH] Gateways: Fix race for request reply This addresses the following race: - client connection creates a subscription on a reply subject - client connection sends a request - server sends the subscription to inbound gateway - server sends the message to outbound gateway (those may be to different servers) - receiving server sends to sub interested in request subject - app sends reply - its server then check for interest on the reply's subject In interestOnly mode, there is a possibility that this server has not received the interest on the reply subject yet and would then drop the reply. This PR detects above scenario and will prefix the reply subject to identify the origin cluster if it is detected that the last subscription from the sending connection was created less than a second ago. Once the destination has this prefix, the destination cluster will always send back that message to origin cluster even if there is no registered interest. Signed-off-by: Ivan Kozlovic --- server/accounts.go | 2 +- server/client.go | 33 ++++-- server/gateway.go | 256 ++++++++++++++++++++++++++++++++--------- server/gateway_test.go | 188 +++++++++++++++++++++++------- server/leafnode.go | 2 +- server/norace_test.go | 229 +++++++++++++++++++++++++++++++++++- server/route.go | 9 +- test/bench_test.go | 135 +++++++++++++++++++--- test/gateway_test.go | 31 ++++- 9 files changed, 762 insertions(+), 123 deletions(-) diff --git a/server/accounts.go b/server/accounts.go index e81eb158..cde1cc08 100644 --- a/server/accounts.go +++ b/server/accounts.go @@ -300,7 +300,7 @@ func (a *Account) removeServiceImport(subject string) { delete(a.imports.services, subject) a.mu.Unlock() if a.srv != nil && a.srv.gateway.enabled { - a.srv.gatewayHandleServiceImport(a, []byte(subject), -1) + a.srv.gatewayHandleServiceImport(a, []byte(subject), nil, -1) } } diff --git a/server/client.go b/server/client.go index e2ef7030..8976f4a0 100644 --- a/server/client.go +++ b/server/client.go @@ -271,6 +271,16 @@ type readCache struct { rsz int32 // Read buffer size srs int32 // Short reads, used for dynamic buffer resizing. + + // When gateways are enabled, this holds the last subscription created + // by this connection and time of creation. When a message needs to + // cross a gateway and has a reply, the reply is prefixed with the cluster + // name of origin if this last subscription is a match for the "reply" + // subject. This is in order to solve req/reply race where the reply may be + // processed in a destination cluster before the subscription interest for + // that reply makes it there (due to different outbound/inbound connections). + lastSub *subscription + lastSubExpire time.Time } const ( @@ -2280,7 +2290,7 @@ func (c *client) processInboundClientMsg(msg []byte) { atomic.LoadInt64(&c.srv.gateway.totalQSubs) > 0 { collect = true } - qnames = c.processMsgResults(c.acc, r, msg, c.pa.subject, c.pa.reply, collect) + qnames = c.processMsgResults(c.acc, r, msg, c.pa.subject, c.pa.reply, collect, false) } // Now deal with gateways @@ -2316,7 +2326,7 @@ func (c *client) checkForImportServices(acc *Account, msg []byte) { // and possibly to inbound GW connections for // which we are in interest-only mode. if c.kind == CLIENT && c.srv.gateway.enabled { - c.srv.gatewayHandleServiceImport(rm.acc, nrr, 1) + c.srv.gatewayHandleServiceImport(rm.acc, nrr, c, 1) } } // FIXME(dlc) - Do L1 cache trick from above. @@ -2329,7 +2339,7 @@ func (c *client) checkForImportServices(acc *Account, msg []byte) { } sendToGWs := c.srv.gateway.enabled && (c.kind == CLIENT || c.kind == SYSTEM || c.kind == LEAF) - queues := c.processMsgResults(rm.acc, rr, msg, []byte(rm.to), nrr, sendToGWs) + queues := c.processMsgResults(rm.acc, rr, msg, []byte(rm.to), nrr, sendToGWs, false) // If this is not a gateway connection but gateway is enabled, // try to send this converted message to all gateways. if sendToGWs { @@ -2374,7 +2384,9 @@ func (c *client) addSubToRouteTargets(sub *subscription) { } // This processes the sublist results for a given message. -func (c *client) processMsgResults(acc *Account, r *SublistResult, msg, subject, reply []byte, collect bool) [][]byte { +func (c *client) processMsgResults(acc *Account, r *SublistResult, msg, subject, reply []byte, + collectQueueNames, allowGWQueuesWithoutFilter bool) [][]byte { + var queues [][]byte // msg header for clients. msgh := c.msgb[1:msgHeadProtoLen] @@ -2435,7 +2447,12 @@ func (c *client) processMsgResults(acc *Account, r *SublistResult, msg, subject, // leaf nodes or routes even if there are no queue filters since we collect // them above and do not process inline like normal clients. if c.kind != CLIENT && qf == nil { - goto sendToRoutesOrLeafs + // For gateway connections, if allowGWQueuesWithoutFilter is true, + // really treat this as if it was a client connection and possibly + // pick queue subs. + if !(c.kind == GATEWAY && allowGWQueuesWithoutFilter) { + goto sendToRoutesOrLeafs + } } // Check to see if we have our own rand yet. Global rand @@ -2487,7 +2504,7 @@ func (c *client) processMsgResults(acc *Account, r *SublistResult, msg, subject, continue } else { c.addSubToRouteTargets(sub) - if collect { + if collectQueueNames { queues = append(queues, sub.queue) } } @@ -2508,7 +2525,7 @@ func (c *client) processMsgResults(acc *Account, r *SublistResult, msg, subject, if c.deliverMsg(sub, mh, msg) { // Clear rsub rsub = nil - if collect { + if collectQueueNames { queues = append(queues, sub.queue) } break @@ -2519,7 +2536,7 @@ func (c *client) processMsgResults(acc *Account, r *SublistResult, msg, subject, // If we are here we tried to deliver to a local qsub // but failed. So we will send it to a remote or leaf node. c.addSubToRouteTargets(rsub) - if collect { + if collectQueueNames { queues = append(queues, rsub.queue) } } diff --git a/server/gateway.go b/server/gateway.go index a6bdbd4d..d54626bd 100644 --- a/server/gateway.go +++ b/server/gateway.go @@ -15,6 +15,7 @@ package server import ( "bytes" + "crypto/sha256" "crypto/tls" "encoding/json" "fmt" @@ -32,7 +33,10 @@ const ( defaultSolicitGatewaysDelay = time.Second defaultGatewayConnectDelay = time.Second defaultGatewayReconnectDelay = time.Second + defaultGatewayRecentSubExpiration = time.Second defaultGatewayMaxRUnsubBeforeSwitch = 1000 + gwReplyPrefix = "$GR." + gwReplyStart = len(gwReplyPrefix) + 5 // len of prefix above + len of hash (4) + "." ) var ( @@ -93,6 +97,7 @@ type srvGateway struct { info *Info // Gateway Info protocol infoJSON []byte // Marshal'ed Info protocol runknown bool // Rejects unknown (not configured) gateway connections + replyPfx []byte // Will be "$GR.." // We maintain the interest of subjects and queues per account. // For a given account, entries in the map could be something like this: @@ -110,8 +115,9 @@ type srvGateway struct { m map[string]map[string]*sitally } - resolver netResolver // Used to resolve host name before calling net.Dial() - sqbsz int // Max buffer size to send queue subs protocol. Used for testing. + resolver netResolver // Used to resolve host name before calling net.Dial() + sqbsz int // Max buffer size to send queue subs protocol. Used for testing. + recSubExp time.Duration // For how long do we check if there is a subscription match for a message with reply } // Subject interest tally. Also indicates if the key in the map is a @@ -124,6 +130,7 @@ type sitally struct { type gatewayCfg struct { sync.RWMutex *RemoteGatewayOpts + replyPfx []byte urls map[string]*url.URL connAttempts int implicit bool @@ -218,6 +225,19 @@ func validateGatewayOptions(o *Options) error { return nil } +// Computes a hash of 4 characters for the given gateway name. +// This will be used for routing of replies. +func getReplyPrefixForGateway(name string) []byte { + sha := sha256.New() + sha.Write([]byte(name)) + fullHash := []byte(fmt.Sprintf("%x", sha.Sum(nil))) + prefix := make([]byte, 0, len(gwReplyPrefix)+5) + prefix = append(prefix, gwReplyPrefix...) + prefix = append(prefix, fullHash[:4]...) + prefix = append(prefix, '.') + return prefix +} + // Initialize the s.gateway structure. We do this even if the server // does not have a gateway configured. In some part of the code, the // server will check the number of outbound gateways, etc.. and so @@ -232,6 +252,7 @@ func newGateway(opts *Options) (*srvGateway, error) { URLs: make(map[string]struct{}), resolver: opts.Gateway.resolver, runknown: opts.Gateway.RejectUnknown, + replyPfx: getReplyPrefixForGateway(opts.Gateway.Name), } gateway.Lock() defer gateway.Unlock() @@ -250,6 +271,7 @@ func newGateway(opts *Options) (*srvGateway, error) { } cfg := &gatewayCfg{ RemoteGatewayOpts: rgo.clone(), + replyPfx: getReplyPrefixForGateway(rgo.Name), urls: make(map[string]*url.URL, len(rgo.URLs)), } if opts.Gateway.TLSConfig != nil && cfg.TLSConfig == nil { @@ -270,6 +292,7 @@ func newGateway(opts *Options) (*srvGateway, error) { if gateway.sqbsz == 0 { gateway.sqbsz = maxBufSize } + gateway.recSubExp = defaultGatewayRecentSubExpiration gateway.enabled = opts.Gateway.Name != "" && opts.Gateway.Port != 0 return gateway, nil @@ -1029,9 +1052,11 @@ func (s *Server) sendSubsToGateway(c *client, accountName []byte) { // Instruct to send all subs (RS+/-) for this account from now on. c.mu.Lock() e := c.gw.insim[string(accountName)] - if e != nil { - e.mode = modeInterestOnly + if e == nil { + e = &insie{} + c.gw.insim[string(accountName)] = e } + e.mode = modeInterestOnly c.mu.Unlock() } else { // Send queues for all accounts @@ -1173,6 +1198,7 @@ func (s *Server) processImplicitGateway(info *Info) { opts := s.getOpts() cfg = &gatewayCfg{ RemoteGatewayOpts: &RemoteGatewayOpts{Name: gwName}, + replyPfx: getReplyPrefixForGateway(gwName), urls: make(map[string]*url.URL, len(info.GatewayURLs)), implicit: true, } @@ -1901,7 +1927,7 @@ func (s *Server) maybeSendSubOrUnsubToGateways(accName string, sub *subscription } } if proto != nil { - c.sendProto(proto, true) + c.sendProto(proto, false) if c.trace { c.traceOutOp("", proto[:len(proto)-LEN_CR_LF]) } @@ -1947,7 +1973,7 @@ func (s *Server) sendQueueSubOrUnsubToGateways(accName string, qsub *subscriptio proto = append(proto, CR_LF...) } c.mu.Lock() - c.sendProto(proto, true) + c.sendProto(proto, false) if c.trace { c.traceOutOp("", proto[:len(proto)-LEN_CR_LF]) } @@ -2014,6 +2040,11 @@ func (s *Server) gatewayUpdateSubInterest(accName string, sub *subscription, cha } } if first || last { + if first && sub.client != nil { + c := sub.client + c.in.lastSub = sub + c.in.lastSubExpire = time.Now().Add(s.gateway.recSubExp) + } if entry.q { s.sendQueueSubOrUnsubToGateways(accName, sub, first) } else { @@ -2022,6 +2053,33 @@ func (s *Server) gatewayUpdateSubInterest(accName string, sub *subscription, cha } } +// Returns true if the given subject starts with `$GR.` +func subjectStartsWithGatewayReplyPrefix(subj []byte) bool { + return len(subj) > gwReplyStart && string(subj[:len(gwReplyPrefix)]) == gwReplyPrefix +} + +// Evaluates if the given reply should be mapped (adding the origin cluster +// hash as a prefix) or not. +func (c *client) shouldMapReplyForGatewaySend(reply []byte) bool { + if c.in.lastSub == nil { + return false + } + if time.Now().After(c.in.lastSubExpire) { + c.in.lastSub = nil + return false + } + if subjectStartsWithGatewayReplyPrefix(reply) { + return false + } + return matchLiteral(string(reply), string(c.in.lastSub.subject)) +} + +var subPool = &sync.Pool{ + New: func() interface{} { + return &subscription{} + }, +} + // May send a message to all outbound gateways. It is possible // that the message is not sent to a given gateway if for instance // it is known that this gateway has no interest in the account or @@ -2038,46 +2096,84 @@ func (c *client) sendMsgToGateways(acc *Account, msg, subject, reply []byte, qgr for i := 0; i < len(gw.outo); i++ { gws = append(gws, gw.outo[i]) } + thisClusterReplyPrefix := gw.replyPfx gw.RUnlock() if len(gws) == 0 { return } var ( - subj = string(subject) - queuesa = [512]byte{} - queues = queuesa[:0] - accName = acc.Name + subj = string(subject) + queuesa = [512]byte{} + queues = queuesa[:0] + accName = acc.Name + mreplya [256]byte + mreply []byte + dstPfx []byte + checkReply = reply != nil ) + + // Get a subscription from the pool + sub := subPool.Get().(*subscription) + + // Check if the subject is on "$GR..", + // and if so, send to that GW regardless of its + // interest on the real subject (that is, skip the + // check of subject interest). + if subjectStartsWithGatewayReplyPrefix(subject) { + dstPfx = subject[:gwReplyStart] + } for i := 0; i < len(gws); i++ { gwc := gws[i] - // Plain sub interest and queue sub results for this account/subject - psi, qr := gwc.gatewayInterest(accName, subj) - if !psi && qr == nil { - continue - } - queues = queuesa[:0] - if qr != nil { - for i := 0; i < len(qr.qsubs); i++ { - qsubs := qr.qsubs[i] - if len(qsubs) > 0 { - queue := qsubs[0].queue - add := true - for _, qn := range qgroups { - if bytes.Equal(queue, qn) { - add = false - break + if dstPfx != nil { + gwc.mu.Lock() + ok := gwc.gw.cfg != nil && bytes.Equal(dstPfx, gwc.gw.cfg.replyPfx) + gwc.mu.Unlock() + if !ok { + continue + } + } else { + // Plain sub interest and queue sub results for this account/subject + psi, qr := gwc.gatewayInterest(accName, subj) + if !psi && qr == nil { + continue + } + queues = queuesa[:0] + if qr != nil { + for i := 0; i < len(qr.qsubs); i++ { + qsubs := qr.qsubs[i] + if len(qsubs) > 0 { + queue := qsubs[0].queue + add := true + for _, qn := range qgroups { + if bytes.Equal(queue, qn) { + add = false + break + } + } + if add { + qgroups = append(qgroups, queue) + queues = append(queues, queue...) + queues = append(queues, ' ') } - } - if add { - qgroups = append(qgroups, queue) - queues = append(queues, queue...) - queues = append(queues, ' ') } } } + if !psi && len(queues) == 0 { + continue + } } - if !psi && len(queues) == 0 { - continue + if checkReply { + // Check/map only once + checkReply = false + // Assume we will use original + mreply = reply + // If there was a recent matching subscription on that connection + // and the reply is not already mapped, then map (add prefix). + if c.shouldMapReplyForGatewaySend(reply) { + mreply = mreplya[:0] + mreply = append(mreply, thisClusterReplyPrefix...) + mreply = append(mreply, reply...) + } } mh := c.msgb[:msgHeadProtoLen] mh = append(mh, accName...) @@ -2087,29 +2183,35 @@ func (c *client) sendMsgToGateways(acc *Account, msg, subject, reply []byte, qgr if len(queues) > 0 { if reply != nil { mh = append(mh, "+ "...) // Signal that there is a reply. - mh = append(mh, reply...) + mh = append(mh, mreply...) mh = append(mh, ' ') } else { mh = append(mh, "| "...) // Only queues } mh = append(mh, queues...) } else if reply != nil { - mh = append(mh, reply...) + mh = append(mh, mreply...) mh = append(mh, ' ') } mh = append(mh, c.pa.szb...) mh = append(mh, CR_LF...) - sub := subscription{client: gwc, subject: c.pa.subject} - c.deliverMsg(&sub, mh, msg) + + // We reuse the subscription object that we pass to deliverMsg. + sub.client = gwc + sub.subject = c.pa.subject + c.deliverMsg(sub, mh, msg) } + // Done with subscription, put back to pool. We don't need + // to reset content since we explicitly set when using it. + subPool.Put(sub) } -func (s *Server) gatewayHandleServiceImport(acc *Account, subject []byte, change int32) { +func (s *Server) gatewayHandleServiceImport(acc *Account, subject []byte, c *client, change int32) { sid := make([]byte, 0, len(acc.Name)+len(subject)+1) sid = append(sid, acc.Name...) sid = append(sid, ' ') sid = append(sid, subject...) - sub := &subscription{subject: subject, sid: sid} + sub := &subscription{client: c, subject: subject, sid: sid} var rspa [1024]byte rsproto := rspa[:0] @@ -2125,7 +2227,7 @@ func (s *Server) gatewayHandleServiceImport(acc *Account, subject []byte, change s.mu.Lock() for _, r := range s.routes { r.mu.Lock() - r.sendProto(rsproto, true) + r.sendProto(rsproto, false) if r.trace { r.traceOutOp("", rsproto[:len(rsproto)-LEN_CR_LF]) } @@ -2161,7 +2263,8 @@ func (s *Server) gatewayHandleAccountNoInterest(c *client, accName []byte) { func (c *client) sendAccountUnsubToGateway(accName []byte) { // Check if we have sent the A- or not. c.mu.Lock() - if _, sent := c.gw.insim[string(accName)]; !sent { + e, sent := c.gw.insim[string(accName)] + if e != nil || !sent { // Add a nil value to indicate that we have sent an A- // so that we know to send A+ when needed. c.gw.insim[string(accName)] = nil @@ -2170,7 +2273,7 @@ func (c *client) sendAccountUnsubToGateway(accName []byte) { proto = append(proto, aUnsubBytes...) proto = append(proto, accName...) proto = append(proto, CR_LF...) - c.sendProto(proto, true) + c.sendProto(proto, false) if c.trace { c.traceOutOp("", proto[:len(proto)-LEN_CR_LF]) } @@ -2226,7 +2329,7 @@ func (s *Server) gatewayHandleSubjectNoInterest(c *client, acc *Account, accName proto = append(proto, ' ') proto = append(proto, subject...) proto = append(proto, CR_LF...) - c.sendProto(proto, true) + c.sendProto(proto, false) if c.trace { c.traceOutOp("", proto[:len(proto)-LEN_CR_LF]) } @@ -2238,6 +2341,17 @@ func (s *Server) gatewayHandleSubjectNoInterest(c *client, acc *Account, accName } } +func (g *srvGateway) getReplyPrefix() []byte { + g.RLock() + replyPfx := g.replyPfx + g.RUnlock() + return replyPfx +} + +func (s *Server) isGatewayReplyForThisCluster(subj []byte) bool { + return string(s.gateway.getReplyPrefix()) == string(subj[:gwReplyStart]) +} + // Process a message coming from a remote gateway. Send to any sub/qsub // in our cluster that is matching. When receiving a message for an // account or subject for which there is no interest in this cluster @@ -2262,6 +2376,34 @@ func (c *client) processInboundGatewayMsg(msg []byte) { return } + // If we receive a message on $GR.. + // we will drop the prefix before processing interest + // in this cluster, but we also need to resend to + // other gateways. + sendBackToGateways := false + + // First thing to do is to check if the subject starts + // with "$GR..". + if subjectStartsWithGatewayReplyPrefix(c.pa.subject) { + // If it does, then is this server/cluster the actual + // destination for this message? + if !c.srv.isGatewayReplyForThisCluster(c.pa.subject) { + // We could report, for now, just drop. + return + } + // Adjust the subject to past the prefix + c.pa.subject = c.pa.subject[gwReplyStart:] + // Use a stack buffer to rewrite c.pa.cache since we + // only need it for getAccAndResultFromCache() + var _pacache [256]byte + pacache := _pacache[:0] + pacache = append(pacache, c.pa.account...) + pacache = append(pacache, ' ') + pacache = append(pacache, c.pa.subject...) + c.pa.pacache = pacache + sendBackToGateways = true + } + acc, r := c.getAccAndResultFromCache() if acc == nil { c.Debugf("Unknown account %q for gateway message on subject: %q", c.pa.account, c.pa.subject) @@ -2277,19 +2419,27 @@ func (c *client) processInboundGatewayMsg(msg []byte) { c.checkForImportServices(acc, msg) } - // If there is no interest on plain subs, possibly send an RS-, - // even if there is qsubs interest. - if len(r.psubs) == 0 { - c.srv.gatewayHandleSubjectNoInterest(c, acc, c.pa.account, c.pa.subject) + if !sendBackToGateways { + // If there is no interest on plain subs, possibly send an RS-, + // even if there is qsubs interest. + if len(r.psubs) == 0 { + c.srv.gatewayHandleSubjectNoInterest(c, acc, c.pa.account, c.pa.subject) - // If there is also no queue filter, then no point in continuing - // (even if r.qsubs i > 0). - if len(c.pa.queues) == 0 { - return + // If there is also no queue filter, then no point in continuing + // (even if r.qsubs i > 0). + if len(c.pa.queues) == 0 { + return + } } + c.processMsgResults(acc, r, msg, c.pa.subject, c.pa.reply, false, false) + } else { + // We normally would not allow sending to a queue unless the + // RMSG contains the queue groups, however, if the incoming + // message was a "$GR." then we need to act as if this was + // a CLIENT connection.. + qnames := c.processMsgResults(acc, r, msg, c.pa.subject, c.pa.reply, true, true) + c.sendMsgToGateways(c.acc, msg, c.pa.subject, c.pa.reply, qnames) } - - c.processMsgResults(acc, r, msg, c.pa.subject, c.pa.reply, false) } // Indicates that the remote which we are sending messages to diff --git a/server/gateway_test.go b/server/gateway_test.go index b665d6e4..04fb25a3 100644 --- a/server/gateway_test.go +++ b/server/gateway_test.go @@ -1406,6 +1406,59 @@ func TestGatewayAccountInterest(t *testing.T) { checkCount(t, gwcc, 1) } +func TestGatewayAccountUnsub(t *testing.T) { + ob := testDefaultOptionsForGateway("B") + sb := runGatewayServer(ob) + defer sb.Shutdown() + + oa := testGatewayOptionsFromToWithServers(t, "A", "B", sb) + sa := runGatewayServer(oa) + defer sa.Shutdown() + + waitForOutboundGateways(t, sa, 1, time.Second) + waitForOutboundGateways(t, sb, 1, time.Second) + waitForInboundGateways(t, sa, 1, time.Second) + waitForInboundGateways(t, sb, 1, time.Second) + + // Connect on B + ncb := natsConnect(t, fmt.Sprintf("nats://%s:%d", ob.Host, ob.Port)) + defer ncb.Close() + // Create subscription + natsSub(t, ncb, "foo", func(m *nats.Msg) { + ncb.Publish(m.Reply, []byte("reply")) + }) + + // Connect on A + nca := natsConnect(t, fmt.Sprintf("nats://%s:%d", oa.Host, oa.Port)) + defer nca.Close() + // Send a request + if _, err := nca.Request("foo", []byte("req"), time.Second); err != nil { + t.Fatalf("Error getting reply: %v", err) + } + + // Now close connection on B + ncb.Close() + + // Publish lots of messages on "foo" from A. + // We should receive an A- shortly and the number + // of outbound messages from A to B should not be + // close to the number of messages sent here. + total := 5000 + for i := 0; i < total; i++ { + natsPub(t, nca, "foo", []byte("hello")) + } + natsFlush(t, nca) + + c := sa.getOutboundGatewayConnection("B") + c.mu.Lock() + out := c.outMsgs + c.mu.Unlock() + + if out >= int64(80*total)/100 { + t.Fatalf("Unexpected number of messages sent from A to B: %v", out) + } +} + func TestGatewaySubjectInterest(t *testing.T) { o1 := testDefaultOptionsForGateway("A") setAccountUserPassInOptions(o1, "$foo", "ivan", "password") @@ -2869,18 +2922,16 @@ func TestGatewaySendAllSubs(t *testing.T) { waitForInboundGateways(t, sb, 2, time.Second) waitForInboundGateways(t, sc, 2, time.Second) - // On B, create a sub that will reply to requests - bURL := fmt.Sprintf("nats://%s:%d", ob.Host, ob.Port) - ncB := natsConnect(t, bURL) - defer ncB.Close() - natsSub(t, ncB, "foo", func(m *nats.Msg) { - ncB.Publish(m.Reply, m.Data) - }) - natsFlush(t, ncB) - checkExpectedSubs(t, 1, sb) + // On A, create a sub to register some interest + aURL := fmt.Sprintf("nats://%s:%d", oa.Host, oa.Port) + ncA := natsConnect(t, aURL) + defer ncA.Close() + natsSub(t, ncA, "sub.on.a.*", func(m *nats.Msg) {}) + natsFlush(t, ncA) + checkExpectedSubs(t, 1, sa) - // On C, have some delayed activity while it receives - // unwanted messages and switches to sendAllSubs. + // On C, have some sub activity while it receives + // unwanted messages and switches to interestOnly mode. cURL := fmt.Sprintf("nats://%s:%d", oc.Host, oc.Port) ncC := natsConnect(t, cURL) defer ncC.Close() @@ -2910,6 +2961,11 @@ func TestGatewaySendAllSubs(t *testing.T) { } }() + // From B publish on subjects for which C has an interest + bURL := fmt.Sprintf("nats://%s:%d", ob.Host, ob.Port) + ncB := natsConnect(t, bURL) + defer ncB.Close() + go func() { defer wg.Done() time.Sleep(10 * time.Millisecond) @@ -2925,22 +2981,15 @@ func TestGatewaySendAllSubs(t *testing.T) { } }() - // From A, send a lot of requests. - aURL := fmt.Sprintf("nats://%s:%d", oa.Host, oa.Port) - ncA := natsConnect(t, aURL) - defer ncA.Close() + // From B, send a lot of messages that A is interested in, + // but not C. // TODO(ik): May need to change that if we change the threshold // for when the switch happens. total := 300 for i := 0; i < total; i++ { - req := fmt.Sprintf("%d", i) - reply, err := ncA.Request("foo", []byte(req), time.Second) - if err != nil { + if err := ncB.Publish(fmt.Sprintf("sub.on.a.%d", i), []byte("hi")); err != nil { t.Fatalf("Error waiting for reply: %v", err) } - if string(reply.Data) != req { - t.Fatalf("Expected reply %q, got %q", req, reply.Data) - } } close(done) @@ -2996,17 +3045,6 @@ func TestGatewaySendAllSubs(t *testing.T) { return nil }) - for i := 0; i < total; i++ { - req := fmt.Sprintf("%d", i) - reply, err := ncA.Request("foo", []byte(req), time.Second) - if err != nil { - t.Fatalf("Error waiting for reply: %v", err) - } - if string(reply.Data) != req { - t.Fatalf("Expected reply %q, got %q", req, reply.Data) - } - } - // Also, after all that, if a sub is created on C, it should // be sent to B (but not A). Check that this is the case. // So first send from A on the subject that we are going to @@ -3218,7 +3256,7 @@ func TestGatewayServiceImport(t *testing.T) { subB := natsSubSync(t, clientB, "reply") natsFlush(t, clientB) - for i := 0; i < 1; i++ { + for i := 0; i < 2; i++ { // Send the request from clientB on foo.request, natsPubReq(t, clientB, "foo.request", "reply", []byte("hi")) natsFlush(t, clientB) @@ -3263,10 +3301,14 @@ func TestGatewayServiceImport(t *testing.T) { t.Fatalf("Expected %d outMsgs for A, got %v", expected, vz.OutMsgs) } + // For B, we expect it to send on the two subjects: test.request and foo.request + // then send the reply (MSG + RMSG). if i == 0 { - expected = 3 + expected = 4 } else { - expected = 5 + // The second time, one of the account will be suppressed, so we will get + // only 2 more messages. + expected = 6 } vz, _ = sb.Varz(nil) if vz.OutMsgs != expected { @@ -3522,7 +3564,7 @@ func TestGatewayServiceImportWithQueue(t *testing.T) { // on server B. checkForRegisteredQSubInterest(t, sb, "A", "$foo", "test.request", 1, time.Second) - for i := 0; i < 1; i++ { + for i := 0; i < 2; i++ { // Send the request from clientB on foo.request, natsPubReq(t, clientB, "foo.request", "reply", []byte("hi")) natsFlush(t, clientB) @@ -3559,16 +3601,20 @@ func TestGatewayServiceImportWithQueue(t *testing.T) { t.Fatalf("Unexpected msg: %v", msg) } - expected := int64(i + 2) + expected := int64((i + 1) * 2) vz, _ := sa.Varz(nil) if vz.OutMsgs != expected { t.Fatalf("Expected %d outMsgs for A, got %v", expected, vz.OutMsgs) } + // For B, we expect it to send on the two subjects: test.request and foo.request + // then send the reply (MSG + RMSG). if i == 0 { - expected = 3 - } else { expected = 4 + } else { + // The second time, one of the account will be suppressed, so we will get + // only 2 more messages. + expected = 6 } vz, _ = sb.Varz(nil) if vz.OutMsgs != expected { @@ -4477,3 +4523,67 @@ func TestGatewayMemUsage(t *testing.T) { s.Shutdown() } } + +func TestGatewayMapReplyOnlyForRecentSub(t *testing.T) { + o2 := testDefaultOptionsForGateway("B") + s2 := runGatewayServer(o2) + defer s2.Shutdown() + + o1 := testGatewayOptionsFromToWithServers(t, "A", "B", s2) + s1 := runGatewayServer(o1) + defer s1.Shutdown() + + waitForOutboundGateways(t, s1, 1, time.Second) + waitForOutboundGateways(t, s2, 1, time.Second) + + // Change s1's recent sub expiration default value + s1.mu.Lock() + s1.gateway.pasi.Lock() + s1.gateway.recSubExp = 100 * time.Millisecond + s1.gateway.pasi.Unlock() + s1.mu.Unlock() + + // Setup a replier on s2 + nc2 := natsConnect(t, fmt.Sprintf("nats://%s:%d", o2.Host, o2.Port)) + defer nc2.Close() + count := 0 + errCh := make(chan error, 1) + natsSub(t, nc2, "foo", func(m *nats.Msg) { + // Send reply regardless.. + nc2.Publish(m.Reply, []byte("reply")) + if count == 0 { + if strings.HasPrefix(m.Reply, nats.InboxPrefix) { + errCh <- fmt.Errorf("First reply expected to have a special prefix, got %v", m.Reply) + return + } + count++ + } else { + if !strings.HasPrefix(m.Reply, nats.InboxPrefix) { + errCh <- fmt.Errorf("Second reply expected to have normal inbox, got %v", m.Reply) + return + } + errCh <- nil + } + }) + natsFlush(t, nc2) + checkExpectedSubs(t, 1, s2) + + // Create requestor on s1 + nc1 := natsConnect(t, fmt.Sprintf("nats://%s:%d", o1.Host, o1.Port)) + defer nc1.Close() + // Send first request, reply should be mapped + nc1.Request("foo", []byte("msg1"), time.Second) + // Wait more than the recent sub expiration (that we have set to 100ms) + time.Sleep(200 * time.Millisecond) + // Send second request (reply should not be mapped) + nc1.Request("foo", []byte("msg2"), time.Second) + + select { + case e := <-errCh: + if e != nil { + t.Fatalf(e.Error()) + } + case <-time.After(time.Second): + t.Fatalf("Did not get replies") + } +} diff --git a/server/leafnode.go b/server/leafnode.go index bdf01dae..3966c1e1 100644 --- a/server/leafnode.go +++ b/server/leafnode.go @@ -1241,7 +1241,7 @@ func (c *client) processInboundLeafMsg(msg []byte) { atomic.LoadInt64(&c.srv.gateway.totalQSubs) > 0 { collect = true } - qnames = c.processMsgResults(acc, r, msg, c.pa.subject, c.pa.reply, collect) + qnames = c.processMsgResults(acc, r, msg, c.pa.subject, c.pa.reply, collect, false) } // Now deal with gateways diff --git a/server/norace_test.go b/server/norace_test.go index 0897810c..756b4a82 100644 --- a/server/norace_test.go +++ b/server/norace_test.go @@ -177,7 +177,6 @@ func TestNoRaceRoutedQueueAutoUnsubscribe(t *testing.T) { nbar := atomic.LoadInt32(&rbar) nbaz := atomic.LoadInt32(&rbaz) if nbar == expected && nbaz == expected { - time.Sleep(500 * time.Millisecond) return nil } return fmt.Errorf("Did not receive all %d queue messages, received %d for 'bar' and %d for 'baz'", @@ -329,3 +328,231 @@ func TestNoRaceSlowConsumerPendingBytes(t *testing.T) { } t.Fatal("Connection should have been closed") } + +func TestNoRaceGatewayNoMissingReplies(t *testing.T) { + // This test will have following setup: + // + // responder1 requestor + // | | + // v v + // [A1]<-------gw------------[B1] + // | \ | + // | \______gw__________ | route + // | _\| | + // [ ]--------gw----------->[ ] + // [A2]<-------gw------------[B2] + // [ ] [ ] + // ^ + // | + // responder2 + // + // There is a possible race that when the requestor creates + // a subscription on the reply subject, the subject interest + // being sent from the inbound gateway, and B1 having none, + // the SUB first goes to B2 before being sent to A1 from + // B2's inbound GW. But the request can go from B1 to A1 + // right away and the responder1 connecting to A1 may send + // back the reply before the interest on the reply makes it + // to A1 (from B2). + // This test will also verify that if the responder is instead + // connected to A2, the reply is properly received by requestor + // on B1. + + // For this test we want to be in interestOnly mode, so + // make it happen quickly + gatewayMaxRUnsubBeforeSwitch = 1 + defer func() { gatewayMaxRUnsubBeforeSwitch = defaultGatewayMaxRUnsubBeforeSwitch }() + + // Start with setting up A2 and B2. + ob2 := testDefaultOptionsForGateway("B") + sb2 := runGatewayServer(ob2) + defer sb2.Shutdown() + + oa2 := testGatewayOptionsFromToWithServers(t, "A", "B", sb2) + sa2 := runGatewayServer(oa2) + defer sa2.Shutdown() + + waitForOutboundGateways(t, sa2, 1, time.Second) + waitForInboundGateways(t, sa2, 1, time.Second) + waitForOutboundGateways(t, sb2, 1, time.Second) + waitForInboundGateways(t, sb2, 1, time.Second) + + // Now start A1 which will connect to B2 + oa1 := testGatewayOptionsFromToWithServers(t, "A", "B", sb2) + oa1.Routes = RoutesFromStr(fmt.Sprintf("nats://%s:%d", oa2.Cluster.Host, oa2.Cluster.Port)) + sa1 := runGatewayServer(oa1) + defer sa1.Shutdown() + + waitForOutboundGateways(t, sa1, 1, time.Second) + waitForInboundGateways(t, sb2, 2, time.Second) + + checkClusterFormed(t, sa1, sa2) + + // Finally, start B1 that will connect to A1. + ob1 := testGatewayOptionsFromToWithServers(t, "B", "A", sa1) + ob1.Routes = RoutesFromStr(fmt.Sprintf("nats://%s:%d", ob2.Cluster.Host, ob2.Cluster.Port)) + sb1 := runGatewayServer(ob1) + defer sb1.Shutdown() + + // Check that we have the outbound gateway from B1 to A1 + checkFor(t, 3*time.Second, 15*time.Millisecond, func() error { + c := sb1.getOutboundGatewayConnection("A") + if c == nil { + return fmt.Errorf("Outbound connection to A not created yet") + } + c.mu.Lock() + name := c.opts.Name + nc := c.nc + c.mu.Unlock() + if name != sa1.ID() { + // Force a disconnect + nc.Close() + return fmt.Errorf("Was unable to have B1 connect to A1") + } + return nil + }) + + waitForInboundGateways(t, sa1, 1, time.Second) + checkClusterFormed(t, sb1, sb2) + + // Slow down GWs connections + // testSlowDownGatewayConnections(t, sa1, sa2, sb1, sb2) + + // For this test, since we are using qsubs on A and B, and we + // want to make sure that it is received only on B, make the + // recent sub expiration high (especially when running on + // Travis with GOGC=10 + setRecentSubExpiration := func(s *Server) { + s.mu.Lock() + s.gateway.pasi.Lock() + s.gateway.recSubExp = 10 * time.Second + s.gateway.pasi.Unlock() + s.mu.Unlock() + } + setRecentSubExpiration(sb1) + setRecentSubExpiration(sb2) + + a1URL := fmt.Sprintf("nats://%s:%d", oa1.Host, oa1.Port) + a2URL := fmt.Sprintf("nats://%s:%d", oa2.Host, oa2.Port) + b1URL := fmt.Sprintf("nats://%s:%d", ob1.Host, ob1.Port) + b2URL := fmt.Sprintf("nats://%s:%d", ob2.Host, ob2.Port) + + ncb1 := natsConnect(t, b1URL) + defer ncb1.Close() + + ncb2 := natsConnect(t, b2URL) + defer ncb2.Close() + + natsSubSync(t, ncb1, "just.a.sub") + natsSubSync(t, ncb2, "just.a.sub") + checkExpectedSubs(t, 2, sb1, sb2) + + // For this test, we want A to be checking B's interest in order + // to send messages (which would cause replies to be dropped if + // there is no interest registered on A). So from A servers, + // send to various subjects and cause B's to switch to interestOnly + // mode. + nca1 := natsConnect(t, a1URL) + defer nca1.Close() + for i := 0; i < 10; i++ { + natsPub(t, nca1, fmt.Sprintf("reject.%d", i), []byte("hello")) + } + nca2 := natsConnect(t, a2URL) + defer nca2.Close() + for i := 0; i < 10; i++ { + natsPub(t, nca2, fmt.Sprintf("reject.%d", i), []byte("hello")) + } + + checkSwitchedMode := func(t *testing.T, s *Server) { + t.Helper() + checkFor(t, 2*time.Second, 15*time.Millisecond, func() error { + var switchedMode bool + c := s.getOutboundGatewayConnection("B") + ei, _ := c.gw.outsim.Load(globalAccountName) + if ei != nil { + e := ei.(*outsie) + e.RLock() + switchedMode = e.ni == nil && e.mode == modeInterestOnly + e.RUnlock() + } + if !switchedMode { + return fmt.Errorf("Still not switched mode") + } + return nil + }) + } + checkSwitchedMode(t, sa1) + checkSwitchedMode(t, sa2) + + // Setup a subscriber on _INBOX.> on each of A's servers. + total := 1000 + expected := int32(total) + rcvOnA := int32(0) + qrcvOnA := int32(0) + natsSub(t, nca1, "myreply.>", func(_ *nats.Msg) { + atomic.AddInt32(&rcvOnA, 1) + }) + natsQueueSub(t, nca2, "myreply.>", "bar", func(_ *nats.Msg) { + atomic.AddInt32(&qrcvOnA, 1) + }) + checkExpectedSubs(t, 2, sa1, sa2) + + // Ok.. so now we will run the actual test where we + // create a responder on A1 and make sure that every + // single request from B1 gets the reply. Will repeat + // test with responder connected to A2. + sendReqs := func(t *testing.T, subConn *nats.Conn) { + t.Helper() + responder := natsSub(t, subConn, "foo", func(m *nats.Msg) { + nca1.Publish(m.Reply, []byte("reply")) + }) + natsFlush(t, subConn) + checkExpectedSubs(t, 3, sa1, sa2) + + // We are not going to use Request() because this sets + // a wildcard subscription on an INBOX and less likely + // to produce the race. Instead we will explicitly set + // the subscription on the reply subject and create one + // per request. + for i := 0; i < total/2; i++ { + reply := fmt.Sprintf("myreply.%d", i) + replySub := natsQueueSubSync(t, ncb1, reply, "bar") + natsFlush(t, ncb1) + + // Let's make sure we have interest on B2. + if r := sb2.globalAccount().sl.Match(reply); len(r.qsubs) == 0 { + checkFor(t, time.Second, time.Millisecond, func() error { + if r := sb2.globalAccount().sl.Match(reply); len(r.qsubs) == 0 { + return fmt.Errorf("B still not registered interest on %s", reply) + } + return nil + }) + } + natsPubReq(t, ncb1, "foo", reply, []byte("request")) + if _, err := replySub.NextMsg(time.Second); err != nil { + t.Fatalf("Did not receive reply: %v", err) + } + natsUnsub(t, replySub) + } + + responder.Unsubscribe() + natsFlush(t, subConn) + checkExpectedSubs(t, 2, sa1, sa2) + } + sendReqs(t, nca1) + sendReqs(t, nca2) + + checkFor(t, time.Second, 15*time.Millisecond, func() error { + if n := atomic.LoadInt32(&rcvOnA); n != expected { + return fmt.Errorf("Subs on A expected to get %v replies, got %v", expected, n) + } + return nil + }) + + // We should not have received a single message on the queue sub + // on cluster A because messages will have been delivered to + // the member on cluster B. + if n := atomic.LoadInt32(&qrcvOnA); n != 0 { + t.Fatalf("Queue sub on A should not have received message, got %v", n) + } +} diff --git a/server/route.go b/server/route.go index 408f984c..1e495063 100644 --- a/server/route.go +++ b/server/route.go @@ -299,7 +299,7 @@ func (c *client) processInboundRoutedMsg(msg []byte) { c.mu.Unlock() } } - c.processMsgResults(acc, r, msg, c.pa.subject, c.pa.reply, false) + c.processMsgResults(acc, r, msg, c.pa.subject, c.pa.reply, false, false) } // Helper function for routes and gateways to create qfilters need for @@ -1038,7 +1038,12 @@ func (c *client) sendRouteSubOrUnSubProtos(subs []*subscription, isSubProto, tra // the lock, which could cause pingTimer to think that this // connection is stale otherwise. c.last = time.Now() - c.flushOutbound() + if !c.flushOutbound() { + // Another go routine is flushing already and does not + // have the lock. Give it a chance to finish... + c.mu.Unlock() + c.mu.Lock() + } if closed = c.flags.isSet(clearConnection); closed { break } diff --git a/test/bench_test.go b/test/bench_test.go index cc6c80cf..17b0f0c4 100644 --- a/test/bench_test.go +++ b/test/bench_test.go @@ -983,6 +983,8 @@ func gatewaysBench(b *testing.B, optimisticMode bool, payload string, numPublish startCh := make(chan bool) l := b.N / numPublishers + lastMsgSendOp := []byte("PUB end.test 2\r\nok\r\n") + pubLoop := func(c net.Conn, ch chan bool) { bw := bufio.NewWriterSize(c, defaultSendBufSize) @@ -998,7 +1000,7 @@ func gatewaysBench(b *testing.B, optimisticMode bool, payload string, numPublish return } } - if _, err := bw.Write([]byte("PUB end.test 2\r\nok\r\n")); err != nil { + if _, err := bw.Write(lastMsgSendOp); err != nil { b.Errorf("Received error on PUB write: %v\n", err) return } @@ -1011,6 +1013,7 @@ func gatewaysBench(b *testing.B, optimisticMode bool, payload string, numPublish // Publish Connections SPINUP for i := 0; i < numPublishers; i++ { c := createClientConn(b, oa.Host, oa.Port) + defer c.Close() doDefaultConnect(b, c) flushConnection(b, c) ch := make(chan bool) @@ -1019,7 +1022,18 @@ func gatewaysBench(b *testing.B, optimisticMode bool, payload string, numPublish <-ch } - b.SetBytes(int64(len(sendOp) + len(msgOp))) + // To report the number of bytes: + // from publisher to server on cluster A: + numBytes := len(sendOp) + if subInterest { + // from server in cluster A to server on cluster B: + // RMSG $G foo \r\n + numBytes += len("RMSG $G foo xxxx ") + len(payload) + 2 + + // From server in cluster B to sub: + numBytes += len(msgOp) + } + b.SetBytes(int64(numBytes)) b.ResetTimer() // Closing this will start all publishers at once (roughly) @@ -1031,51 +1045,51 @@ func gatewaysBench(b *testing.B, optimisticMode bool, payload string, numPublish b.StopTimer() } -func Benchmark_Gateways___Optimistic_1kx01x0(b *testing.B) { +func Benchmark_Gateways_Optimistic_1kx01x0(b *testing.B) { gatewaysBench(b, true, sizedString(1024), 1, false) } -func Benchmark_Gateways___Optimistic_2kx01x0(b *testing.B) { +func Benchmark_Gateways_Optimistic_2kx01x0(b *testing.B) { gatewaysBench(b, true, sizedString(2048), 1, false) } -func Benchmark_Gateways___Optimistic_4kx01x0(b *testing.B) { +func Benchmark_Gateways_Optimistic_4kx01x0(b *testing.B) { gatewaysBench(b, true, sizedString(4096), 1, false) } -func Benchmark_Gateways___Optimistic_1kx10x0(b *testing.B) { +func Benchmark_Gateways_Optimistic_1kx10x0(b *testing.B) { gatewaysBench(b, true, sizedString(1024), 10, false) } -func Benchmark_Gateways___Optimistic_2kx10x0(b *testing.B) { +func Benchmark_Gateways_Optimistic_2kx10x0(b *testing.B) { gatewaysBench(b, true, sizedString(2048), 10, false) } -func Benchmark_Gateways___Optimistic_4kx10x0(b *testing.B) { +func Benchmark_Gateways_Optimistic_4kx10x0(b *testing.B) { gatewaysBench(b, true, sizedString(4096), 10, false) } -func Benchmark_Gateways___Optimistic_1kx01x1(b *testing.B) { +func Benchmark_Gateways_Optimistic_1kx01x1(b *testing.B) { gatewaysBench(b, true, sizedString(1024), 1, true) } -func Benchmark_Gateways___Optimistic_2kx01x1(b *testing.B) { +func Benchmark_Gateways_Optimistic_2kx01x1(b *testing.B) { gatewaysBench(b, true, sizedString(2048), 1, true) } -func Benchmark_Gateways___Optimistic_4kx01x1(b *testing.B) { +func Benchmark_Gateways_Optimistic_4kx01x1(b *testing.B) { gatewaysBench(b, true, sizedString(4096), 1, true) } -func Benchmark_Gateways___Optimistic_1kx10x1(b *testing.B) { +func Benchmark_Gateways_Optimistic_1kx10x1(b *testing.B) { gatewaysBench(b, true, sizedString(1024), 10, true) } -func Benchmark_Gateways___Optimistic_2kx10x1(b *testing.B) { +func Benchmark_Gateways_Optimistic_2kx10x1(b *testing.B) { gatewaysBench(b, true, sizedString(2048), 10, true) } -func Benchmark_Gateways___Optimistic_4kx10x1(b *testing.B) { +func Benchmark_Gateways_Optimistic_4kx10x1(b *testing.B) { gatewaysBench(b, true, sizedString(4096), 10, true) } @@ -1126,3 +1140,96 @@ func Benchmark_Gateways_InterestOnly_2kx10x1(b *testing.B) { func Benchmark_Gateways_InterestOnly_4kx10x1(b *testing.B) { gatewaysBench(b, false, sizedString(4096), 10, true) } + +// This bench only sends the requests to verify impact of reply +// reply mapping in GW code. +func gatewaySendRequestsBench(b *testing.B, singleReplySub bool) { + server.SetGatewaysSolicitDelay(10 * time.Millisecond) + defer server.ResetGatewaysSolicitDelay() + + ob := testDefaultOptionsForGateway("B") + sb := RunServer(ob) + defer sb.Shutdown() + + gwbURL, err := url.Parse(fmt.Sprintf("nats://%s:%d", ob.Gateway.Host, ob.Gateway.Port)) + if err != nil { + b.Fatalf("Error parsing url: %v", err) + } + oa := testDefaultOptionsForGateway("A") + oa.Gateway.Gateways = []*server.RemoteGatewayOpts{ + &server.RemoteGatewayOpts{ + Name: "B", + URLs: []*url.URL{gwbURL}, + }, + } + sa := RunServer(oa) + defer sa.Shutdown() + + sub := createClientConn(b, ob.Host, ob.Port) + defer sub.Close() + doDefaultConnect(b, sub) + sendProto(b, sub, "SUB foo 1\r\n") + flushConnection(b, sub) + + lenMsg := len("MSG foo reply.xxxxxxxxxx 1 2\r\nok\r\n") + expected := b.N * lenMsg + if !singleReplySub { + expected += b.N * len("$GR.1234.") + } + ch := make(chan bool, 1) + go drainConnection(b, sub, ch, expected) + + c := createClientConn(b, oa.Host, oa.Port) + defer c.Close() + doDefaultConnect(b, c) + if singleReplySub { + sendProto(b, c, "SUB reply.* 1\r\n") + } + flushConnection(b, c) + + // If a single sub for replies is created, wait for more + // than the duration under which we do reply mapping + if singleReplySub { + time.Sleep(1100 * time.Millisecond) + } + + bw := bufio.NewWriterSize(c, defaultSendBufSize) + + // From pub to server in cluster A: + numBytes := len("PUB foo reply.0123456789 2\r\nok\r\n") + if !singleReplySub { + // Add the preceding SUB + numBytes += len("SUB reply.0123456789 0123456789\r\n") + } + // From server in cluster A to cluster B + numBytes += len("RMSG $G foo reply.0123456789 2\r\nok\r\n") + // If mapping of reply... + if !singleReplySub { + // the mapping uses about 10 more bytes. So add them + // for RMSG from server to server, and MSG to sub. + numBytes += 20 + } + // From server in cluster B to sub + numBytes += lenMsg + b.SetBytes(int64(numBytes)) + b.ResetTimer() + + var subStr string + for i := 0; i < b.N; i++ { + if !singleReplySub { + subStr = fmt.Sprintf("SUB reply.%010d %010d\r\n", i+1, i+1) + } + bw.Write([]byte(fmt.Sprintf("%sPUB foo reply.%010d 2\r\nok\r\n", subStr, i+1))) + } + bw.Flush() + + <-ch +} + +func Benchmark_Gateways_Requests_CreateOneSubForAll(b *testing.B) { + gatewaySendRequestsBench(b, true) +} + +func Benchmark_Gateways_Requests_CreateOneSubForEach(b *testing.B) { + gatewaySendRequestsBench(b, true) +} diff --git a/test/gateway_test.go b/test/gateway_test.go index 28e5e395..1b209fec 100644 --- a/test/gateway_test.go +++ b/test/gateway_test.go @@ -14,6 +14,7 @@ package test import ( + "bufio" "bytes" "fmt" "net" @@ -415,11 +416,33 @@ func TestGatewaySendAllSubs(t *testing.T) { // switch. for i := 0; i < 10001; i++ { gASend(fmt.Sprintf("RMSG $G foo.%d 2\r\nok\r\n", i)) - if i < 1000 { + if i <= 1000 { gAExpect(runsubRe) } } - // Since B has no sub, we should get 2 INFOs with start/end - // commands. - expectNumberOfProtos(t, gAExpect, infoRe, 2) + // Expect an INFO + RS+ $G not.used + INFO + buf := bufio.NewReader(gA) + for i := 0; i < 3; i++ { + line, _, err := buf.ReadLine() + if err != nil { + t.Fatalf("Error reading: %v", err) + } + switch i { + case 0: + case 2: + if !bytes.HasPrefix(line, []byte("INFO {")) { + t.Fatalf("Expected INFO, got: %s", line) + } + case 1: + if !bytes.HasPrefix(line, []byte("RS+ ")) { + t.Fatalf("Expected RS+, got: %s", line) + } + } + } + // After this point, any new sub or unsub on B should be + // sent to A. + clientSend("SUB foo 1\r\n") + gAExpect(rsubRe) + clientSend("UNSUB 1\r\n") + gAExpect(runsubRe) }