diff --git a/server/accounts.go b/server/accounts.go index e81eb158..cde1cc08 100644 --- a/server/accounts.go +++ b/server/accounts.go @@ -300,7 +300,7 @@ func (a *Account) removeServiceImport(subject string) { delete(a.imports.services, subject) a.mu.Unlock() if a.srv != nil && a.srv.gateway.enabled { - a.srv.gatewayHandleServiceImport(a, []byte(subject), -1) + a.srv.gatewayHandleServiceImport(a, []byte(subject), nil, -1) } } diff --git a/server/client.go b/server/client.go index e2ef7030..8976f4a0 100644 --- a/server/client.go +++ b/server/client.go @@ -271,6 +271,16 @@ type readCache struct { rsz int32 // Read buffer size srs int32 // Short reads, used for dynamic buffer resizing. + + // When gateways are enabled, this holds the last subscription created + // by this connection and time of creation. When a message needs to + // cross a gateway and has a reply, the reply is prefixed with the cluster + // name of origin if this last subscription is a match for the "reply" + // subject. This is in order to solve req/reply race where the reply may be + // processed in a destination cluster before the subscription interest for + // that reply makes it there (due to different outbound/inbound connections). + lastSub *subscription + lastSubExpire time.Time } const ( @@ -2280,7 +2290,7 @@ func (c *client) processInboundClientMsg(msg []byte) { atomic.LoadInt64(&c.srv.gateway.totalQSubs) > 0 { collect = true } - qnames = c.processMsgResults(c.acc, r, msg, c.pa.subject, c.pa.reply, collect) + qnames = c.processMsgResults(c.acc, r, msg, c.pa.subject, c.pa.reply, collect, false) } // Now deal with gateways @@ -2316,7 +2326,7 @@ func (c *client) checkForImportServices(acc *Account, msg []byte) { // and possibly to inbound GW connections for // which we are in interest-only mode. if c.kind == CLIENT && c.srv.gateway.enabled { - c.srv.gatewayHandleServiceImport(rm.acc, nrr, 1) + c.srv.gatewayHandleServiceImport(rm.acc, nrr, c, 1) } } // FIXME(dlc) - Do L1 cache trick from above. @@ -2329,7 +2339,7 @@ func (c *client) checkForImportServices(acc *Account, msg []byte) { } sendToGWs := c.srv.gateway.enabled && (c.kind == CLIENT || c.kind == SYSTEM || c.kind == LEAF) - queues := c.processMsgResults(rm.acc, rr, msg, []byte(rm.to), nrr, sendToGWs) + queues := c.processMsgResults(rm.acc, rr, msg, []byte(rm.to), nrr, sendToGWs, false) // If this is not a gateway connection but gateway is enabled, // try to send this converted message to all gateways. if sendToGWs { @@ -2374,7 +2384,9 @@ func (c *client) addSubToRouteTargets(sub *subscription) { } // This processes the sublist results for a given message. -func (c *client) processMsgResults(acc *Account, r *SublistResult, msg, subject, reply []byte, collect bool) [][]byte { +func (c *client) processMsgResults(acc *Account, r *SublistResult, msg, subject, reply []byte, + collectQueueNames, allowGWQueuesWithoutFilter bool) [][]byte { + var queues [][]byte // msg header for clients. msgh := c.msgb[1:msgHeadProtoLen] @@ -2435,7 +2447,12 @@ func (c *client) processMsgResults(acc *Account, r *SublistResult, msg, subject, // leaf nodes or routes even if there are no queue filters since we collect // them above and do not process inline like normal clients. if c.kind != CLIENT && qf == nil { - goto sendToRoutesOrLeafs + // For gateway connections, if allowGWQueuesWithoutFilter is true, + // really treat this as if it was a client connection and possibly + // pick queue subs. + if !(c.kind == GATEWAY && allowGWQueuesWithoutFilter) { + goto sendToRoutesOrLeafs + } } // Check to see if we have our own rand yet. Global rand @@ -2487,7 +2504,7 @@ func (c *client) processMsgResults(acc *Account, r *SublistResult, msg, subject, continue } else { c.addSubToRouteTargets(sub) - if collect { + if collectQueueNames { queues = append(queues, sub.queue) } } @@ -2508,7 +2525,7 @@ func (c *client) processMsgResults(acc *Account, r *SublistResult, msg, subject, if c.deliverMsg(sub, mh, msg) { // Clear rsub rsub = nil - if collect { + if collectQueueNames { queues = append(queues, sub.queue) } break @@ -2519,7 +2536,7 @@ func (c *client) processMsgResults(acc *Account, r *SublistResult, msg, subject, // If we are here we tried to deliver to a local qsub // but failed. So we will send it to a remote or leaf node. c.addSubToRouteTargets(rsub) - if collect { + if collectQueueNames { queues = append(queues, rsub.queue) } } diff --git a/server/gateway.go b/server/gateway.go index a6bdbd4d..d54626bd 100644 --- a/server/gateway.go +++ b/server/gateway.go @@ -15,6 +15,7 @@ package server import ( "bytes" + "crypto/sha256" "crypto/tls" "encoding/json" "fmt" @@ -32,7 +33,10 @@ const ( defaultSolicitGatewaysDelay = time.Second defaultGatewayConnectDelay = time.Second defaultGatewayReconnectDelay = time.Second + defaultGatewayRecentSubExpiration = time.Second defaultGatewayMaxRUnsubBeforeSwitch = 1000 + gwReplyPrefix = "$GR." + gwReplyStart = len(gwReplyPrefix) + 5 // len of prefix above + len of hash (4) + "." ) var ( @@ -93,6 +97,7 @@ type srvGateway struct { info *Info // Gateway Info protocol infoJSON []byte // Marshal'ed Info protocol runknown bool // Rejects unknown (not configured) gateway connections + replyPfx []byte // Will be "$GR.." // We maintain the interest of subjects and queues per account. // For a given account, entries in the map could be something like this: @@ -110,8 +115,9 @@ type srvGateway struct { m map[string]map[string]*sitally } - resolver netResolver // Used to resolve host name before calling net.Dial() - sqbsz int // Max buffer size to send queue subs protocol. Used for testing. + resolver netResolver // Used to resolve host name before calling net.Dial() + sqbsz int // Max buffer size to send queue subs protocol. Used for testing. + recSubExp time.Duration // For how long do we check if there is a subscription match for a message with reply } // Subject interest tally. Also indicates if the key in the map is a @@ -124,6 +130,7 @@ type sitally struct { type gatewayCfg struct { sync.RWMutex *RemoteGatewayOpts + replyPfx []byte urls map[string]*url.URL connAttempts int implicit bool @@ -218,6 +225,19 @@ func validateGatewayOptions(o *Options) error { return nil } +// Computes a hash of 4 characters for the given gateway name. +// This will be used for routing of replies. +func getReplyPrefixForGateway(name string) []byte { + sha := sha256.New() + sha.Write([]byte(name)) + fullHash := []byte(fmt.Sprintf("%x", sha.Sum(nil))) + prefix := make([]byte, 0, len(gwReplyPrefix)+5) + prefix = append(prefix, gwReplyPrefix...) + prefix = append(prefix, fullHash[:4]...) + prefix = append(prefix, '.') + return prefix +} + // Initialize the s.gateway structure. We do this even if the server // does not have a gateway configured. In some part of the code, the // server will check the number of outbound gateways, etc.. and so @@ -232,6 +252,7 @@ func newGateway(opts *Options) (*srvGateway, error) { URLs: make(map[string]struct{}), resolver: opts.Gateway.resolver, runknown: opts.Gateway.RejectUnknown, + replyPfx: getReplyPrefixForGateway(opts.Gateway.Name), } gateway.Lock() defer gateway.Unlock() @@ -250,6 +271,7 @@ func newGateway(opts *Options) (*srvGateway, error) { } cfg := &gatewayCfg{ RemoteGatewayOpts: rgo.clone(), + replyPfx: getReplyPrefixForGateway(rgo.Name), urls: make(map[string]*url.URL, len(rgo.URLs)), } if opts.Gateway.TLSConfig != nil && cfg.TLSConfig == nil { @@ -270,6 +292,7 @@ func newGateway(opts *Options) (*srvGateway, error) { if gateway.sqbsz == 0 { gateway.sqbsz = maxBufSize } + gateway.recSubExp = defaultGatewayRecentSubExpiration gateway.enabled = opts.Gateway.Name != "" && opts.Gateway.Port != 0 return gateway, nil @@ -1029,9 +1052,11 @@ func (s *Server) sendSubsToGateway(c *client, accountName []byte) { // Instruct to send all subs (RS+/-) for this account from now on. c.mu.Lock() e := c.gw.insim[string(accountName)] - if e != nil { - e.mode = modeInterestOnly + if e == nil { + e = &insie{} + c.gw.insim[string(accountName)] = e } + e.mode = modeInterestOnly c.mu.Unlock() } else { // Send queues for all accounts @@ -1173,6 +1198,7 @@ func (s *Server) processImplicitGateway(info *Info) { opts := s.getOpts() cfg = &gatewayCfg{ RemoteGatewayOpts: &RemoteGatewayOpts{Name: gwName}, + replyPfx: getReplyPrefixForGateway(gwName), urls: make(map[string]*url.URL, len(info.GatewayURLs)), implicit: true, } @@ -1901,7 +1927,7 @@ func (s *Server) maybeSendSubOrUnsubToGateways(accName string, sub *subscription } } if proto != nil { - c.sendProto(proto, true) + c.sendProto(proto, false) if c.trace { c.traceOutOp("", proto[:len(proto)-LEN_CR_LF]) } @@ -1947,7 +1973,7 @@ func (s *Server) sendQueueSubOrUnsubToGateways(accName string, qsub *subscriptio proto = append(proto, CR_LF...) } c.mu.Lock() - c.sendProto(proto, true) + c.sendProto(proto, false) if c.trace { c.traceOutOp("", proto[:len(proto)-LEN_CR_LF]) } @@ -2014,6 +2040,11 @@ func (s *Server) gatewayUpdateSubInterest(accName string, sub *subscription, cha } } if first || last { + if first && sub.client != nil { + c := sub.client + c.in.lastSub = sub + c.in.lastSubExpire = time.Now().Add(s.gateway.recSubExp) + } if entry.q { s.sendQueueSubOrUnsubToGateways(accName, sub, first) } else { @@ -2022,6 +2053,33 @@ func (s *Server) gatewayUpdateSubInterest(accName string, sub *subscription, cha } } +// Returns true if the given subject starts with `$GR.` +func subjectStartsWithGatewayReplyPrefix(subj []byte) bool { + return len(subj) > gwReplyStart && string(subj[:len(gwReplyPrefix)]) == gwReplyPrefix +} + +// Evaluates if the given reply should be mapped (adding the origin cluster +// hash as a prefix) or not. +func (c *client) shouldMapReplyForGatewaySend(reply []byte) bool { + if c.in.lastSub == nil { + return false + } + if time.Now().After(c.in.lastSubExpire) { + c.in.lastSub = nil + return false + } + if subjectStartsWithGatewayReplyPrefix(reply) { + return false + } + return matchLiteral(string(reply), string(c.in.lastSub.subject)) +} + +var subPool = &sync.Pool{ + New: func() interface{} { + return &subscription{} + }, +} + // May send a message to all outbound gateways. It is possible // that the message is not sent to a given gateway if for instance // it is known that this gateway has no interest in the account or @@ -2038,46 +2096,84 @@ func (c *client) sendMsgToGateways(acc *Account, msg, subject, reply []byte, qgr for i := 0; i < len(gw.outo); i++ { gws = append(gws, gw.outo[i]) } + thisClusterReplyPrefix := gw.replyPfx gw.RUnlock() if len(gws) == 0 { return } var ( - subj = string(subject) - queuesa = [512]byte{} - queues = queuesa[:0] - accName = acc.Name + subj = string(subject) + queuesa = [512]byte{} + queues = queuesa[:0] + accName = acc.Name + mreplya [256]byte + mreply []byte + dstPfx []byte + checkReply = reply != nil ) + + // Get a subscription from the pool + sub := subPool.Get().(*subscription) + + // Check if the subject is on "$GR..", + // and if so, send to that GW regardless of its + // interest on the real subject (that is, skip the + // check of subject interest). + if subjectStartsWithGatewayReplyPrefix(subject) { + dstPfx = subject[:gwReplyStart] + } for i := 0; i < len(gws); i++ { gwc := gws[i] - // Plain sub interest and queue sub results for this account/subject - psi, qr := gwc.gatewayInterest(accName, subj) - if !psi && qr == nil { - continue - } - queues = queuesa[:0] - if qr != nil { - for i := 0; i < len(qr.qsubs); i++ { - qsubs := qr.qsubs[i] - if len(qsubs) > 0 { - queue := qsubs[0].queue - add := true - for _, qn := range qgroups { - if bytes.Equal(queue, qn) { - add = false - break + if dstPfx != nil { + gwc.mu.Lock() + ok := gwc.gw.cfg != nil && bytes.Equal(dstPfx, gwc.gw.cfg.replyPfx) + gwc.mu.Unlock() + if !ok { + continue + } + } else { + // Plain sub interest and queue sub results for this account/subject + psi, qr := gwc.gatewayInterest(accName, subj) + if !psi && qr == nil { + continue + } + queues = queuesa[:0] + if qr != nil { + for i := 0; i < len(qr.qsubs); i++ { + qsubs := qr.qsubs[i] + if len(qsubs) > 0 { + queue := qsubs[0].queue + add := true + for _, qn := range qgroups { + if bytes.Equal(queue, qn) { + add = false + break + } + } + if add { + qgroups = append(qgroups, queue) + queues = append(queues, queue...) + queues = append(queues, ' ') } - } - if add { - qgroups = append(qgroups, queue) - queues = append(queues, queue...) - queues = append(queues, ' ') } } } + if !psi && len(queues) == 0 { + continue + } } - if !psi && len(queues) == 0 { - continue + if checkReply { + // Check/map only once + checkReply = false + // Assume we will use original + mreply = reply + // If there was a recent matching subscription on that connection + // and the reply is not already mapped, then map (add prefix). + if c.shouldMapReplyForGatewaySend(reply) { + mreply = mreplya[:0] + mreply = append(mreply, thisClusterReplyPrefix...) + mreply = append(mreply, reply...) + } } mh := c.msgb[:msgHeadProtoLen] mh = append(mh, accName...) @@ -2087,29 +2183,35 @@ func (c *client) sendMsgToGateways(acc *Account, msg, subject, reply []byte, qgr if len(queues) > 0 { if reply != nil { mh = append(mh, "+ "...) // Signal that there is a reply. - mh = append(mh, reply...) + mh = append(mh, mreply...) mh = append(mh, ' ') } else { mh = append(mh, "| "...) // Only queues } mh = append(mh, queues...) } else if reply != nil { - mh = append(mh, reply...) + mh = append(mh, mreply...) mh = append(mh, ' ') } mh = append(mh, c.pa.szb...) mh = append(mh, CR_LF...) - sub := subscription{client: gwc, subject: c.pa.subject} - c.deliverMsg(&sub, mh, msg) + + // We reuse the subscription object that we pass to deliverMsg. + sub.client = gwc + sub.subject = c.pa.subject + c.deliverMsg(sub, mh, msg) } + // Done with subscription, put back to pool. We don't need + // to reset content since we explicitly set when using it. + subPool.Put(sub) } -func (s *Server) gatewayHandleServiceImport(acc *Account, subject []byte, change int32) { +func (s *Server) gatewayHandleServiceImport(acc *Account, subject []byte, c *client, change int32) { sid := make([]byte, 0, len(acc.Name)+len(subject)+1) sid = append(sid, acc.Name...) sid = append(sid, ' ') sid = append(sid, subject...) - sub := &subscription{subject: subject, sid: sid} + sub := &subscription{client: c, subject: subject, sid: sid} var rspa [1024]byte rsproto := rspa[:0] @@ -2125,7 +2227,7 @@ func (s *Server) gatewayHandleServiceImport(acc *Account, subject []byte, change s.mu.Lock() for _, r := range s.routes { r.mu.Lock() - r.sendProto(rsproto, true) + r.sendProto(rsproto, false) if r.trace { r.traceOutOp("", rsproto[:len(rsproto)-LEN_CR_LF]) } @@ -2161,7 +2263,8 @@ func (s *Server) gatewayHandleAccountNoInterest(c *client, accName []byte) { func (c *client) sendAccountUnsubToGateway(accName []byte) { // Check if we have sent the A- or not. c.mu.Lock() - if _, sent := c.gw.insim[string(accName)]; !sent { + e, sent := c.gw.insim[string(accName)] + if e != nil || !sent { // Add a nil value to indicate that we have sent an A- // so that we know to send A+ when needed. c.gw.insim[string(accName)] = nil @@ -2170,7 +2273,7 @@ func (c *client) sendAccountUnsubToGateway(accName []byte) { proto = append(proto, aUnsubBytes...) proto = append(proto, accName...) proto = append(proto, CR_LF...) - c.sendProto(proto, true) + c.sendProto(proto, false) if c.trace { c.traceOutOp("", proto[:len(proto)-LEN_CR_LF]) } @@ -2226,7 +2329,7 @@ func (s *Server) gatewayHandleSubjectNoInterest(c *client, acc *Account, accName proto = append(proto, ' ') proto = append(proto, subject...) proto = append(proto, CR_LF...) - c.sendProto(proto, true) + c.sendProto(proto, false) if c.trace { c.traceOutOp("", proto[:len(proto)-LEN_CR_LF]) } @@ -2238,6 +2341,17 @@ func (s *Server) gatewayHandleSubjectNoInterest(c *client, acc *Account, accName } } +func (g *srvGateway) getReplyPrefix() []byte { + g.RLock() + replyPfx := g.replyPfx + g.RUnlock() + return replyPfx +} + +func (s *Server) isGatewayReplyForThisCluster(subj []byte) bool { + return string(s.gateway.getReplyPrefix()) == string(subj[:gwReplyStart]) +} + // Process a message coming from a remote gateway. Send to any sub/qsub // in our cluster that is matching. When receiving a message for an // account or subject for which there is no interest in this cluster @@ -2262,6 +2376,34 @@ func (c *client) processInboundGatewayMsg(msg []byte) { return } + // If we receive a message on $GR.. + // we will drop the prefix before processing interest + // in this cluster, but we also need to resend to + // other gateways. + sendBackToGateways := false + + // First thing to do is to check if the subject starts + // with "$GR..". + if subjectStartsWithGatewayReplyPrefix(c.pa.subject) { + // If it does, then is this server/cluster the actual + // destination for this message? + if !c.srv.isGatewayReplyForThisCluster(c.pa.subject) { + // We could report, for now, just drop. + return + } + // Adjust the subject to past the prefix + c.pa.subject = c.pa.subject[gwReplyStart:] + // Use a stack buffer to rewrite c.pa.cache since we + // only need it for getAccAndResultFromCache() + var _pacache [256]byte + pacache := _pacache[:0] + pacache = append(pacache, c.pa.account...) + pacache = append(pacache, ' ') + pacache = append(pacache, c.pa.subject...) + c.pa.pacache = pacache + sendBackToGateways = true + } + acc, r := c.getAccAndResultFromCache() if acc == nil { c.Debugf("Unknown account %q for gateway message on subject: %q", c.pa.account, c.pa.subject) @@ -2277,19 +2419,27 @@ func (c *client) processInboundGatewayMsg(msg []byte) { c.checkForImportServices(acc, msg) } - // If there is no interest on plain subs, possibly send an RS-, - // even if there is qsubs interest. - if len(r.psubs) == 0 { - c.srv.gatewayHandleSubjectNoInterest(c, acc, c.pa.account, c.pa.subject) + if !sendBackToGateways { + // If there is no interest on plain subs, possibly send an RS-, + // even if there is qsubs interest. + if len(r.psubs) == 0 { + c.srv.gatewayHandleSubjectNoInterest(c, acc, c.pa.account, c.pa.subject) - // If there is also no queue filter, then no point in continuing - // (even if r.qsubs i > 0). - if len(c.pa.queues) == 0 { - return + // If there is also no queue filter, then no point in continuing + // (even if r.qsubs i > 0). + if len(c.pa.queues) == 0 { + return + } } + c.processMsgResults(acc, r, msg, c.pa.subject, c.pa.reply, false, false) + } else { + // We normally would not allow sending to a queue unless the + // RMSG contains the queue groups, however, if the incoming + // message was a "$GR." then we need to act as if this was + // a CLIENT connection.. + qnames := c.processMsgResults(acc, r, msg, c.pa.subject, c.pa.reply, true, true) + c.sendMsgToGateways(c.acc, msg, c.pa.subject, c.pa.reply, qnames) } - - c.processMsgResults(acc, r, msg, c.pa.subject, c.pa.reply, false) } // Indicates that the remote which we are sending messages to diff --git a/server/gateway_test.go b/server/gateway_test.go index b665d6e4..04fb25a3 100644 --- a/server/gateway_test.go +++ b/server/gateway_test.go @@ -1406,6 +1406,59 @@ func TestGatewayAccountInterest(t *testing.T) { checkCount(t, gwcc, 1) } +func TestGatewayAccountUnsub(t *testing.T) { + ob := testDefaultOptionsForGateway("B") + sb := runGatewayServer(ob) + defer sb.Shutdown() + + oa := testGatewayOptionsFromToWithServers(t, "A", "B", sb) + sa := runGatewayServer(oa) + defer sa.Shutdown() + + waitForOutboundGateways(t, sa, 1, time.Second) + waitForOutboundGateways(t, sb, 1, time.Second) + waitForInboundGateways(t, sa, 1, time.Second) + waitForInboundGateways(t, sb, 1, time.Second) + + // Connect on B + ncb := natsConnect(t, fmt.Sprintf("nats://%s:%d", ob.Host, ob.Port)) + defer ncb.Close() + // Create subscription + natsSub(t, ncb, "foo", func(m *nats.Msg) { + ncb.Publish(m.Reply, []byte("reply")) + }) + + // Connect on A + nca := natsConnect(t, fmt.Sprintf("nats://%s:%d", oa.Host, oa.Port)) + defer nca.Close() + // Send a request + if _, err := nca.Request("foo", []byte("req"), time.Second); err != nil { + t.Fatalf("Error getting reply: %v", err) + } + + // Now close connection on B + ncb.Close() + + // Publish lots of messages on "foo" from A. + // We should receive an A- shortly and the number + // of outbound messages from A to B should not be + // close to the number of messages sent here. + total := 5000 + for i := 0; i < total; i++ { + natsPub(t, nca, "foo", []byte("hello")) + } + natsFlush(t, nca) + + c := sa.getOutboundGatewayConnection("B") + c.mu.Lock() + out := c.outMsgs + c.mu.Unlock() + + if out >= int64(80*total)/100 { + t.Fatalf("Unexpected number of messages sent from A to B: %v", out) + } +} + func TestGatewaySubjectInterest(t *testing.T) { o1 := testDefaultOptionsForGateway("A") setAccountUserPassInOptions(o1, "$foo", "ivan", "password") @@ -2869,18 +2922,16 @@ func TestGatewaySendAllSubs(t *testing.T) { waitForInboundGateways(t, sb, 2, time.Second) waitForInboundGateways(t, sc, 2, time.Second) - // On B, create a sub that will reply to requests - bURL := fmt.Sprintf("nats://%s:%d", ob.Host, ob.Port) - ncB := natsConnect(t, bURL) - defer ncB.Close() - natsSub(t, ncB, "foo", func(m *nats.Msg) { - ncB.Publish(m.Reply, m.Data) - }) - natsFlush(t, ncB) - checkExpectedSubs(t, 1, sb) + // On A, create a sub to register some interest + aURL := fmt.Sprintf("nats://%s:%d", oa.Host, oa.Port) + ncA := natsConnect(t, aURL) + defer ncA.Close() + natsSub(t, ncA, "sub.on.a.*", func(m *nats.Msg) {}) + natsFlush(t, ncA) + checkExpectedSubs(t, 1, sa) - // On C, have some delayed activity while it receives - // unwanted messages and switches to sendAllSubs. + // On C, have some sub activity while it receives + // unwanted messages and switches to interestOnly mode. cURL := fmt.Sprintf("nats://%s:%d", oc.Host, oc.Port) ncC := natsConnect(t, cURL) defer ncC.Close() @@ -2910,6 +2961,11 @@ func TestGatewaySendAllSubs(t *testing.T) { } }() + // From B publish on subjects for which C has an interest + bURL := fmt.Sprintf("nats://%s:%d", ob.Host, ob.Port) + ncB := natsConnect(t, bURL) + defer ncB.Close() + go func() { defer wg.Done() time.Sleep(10 * time.Millisecond) @@ -2925,22 +2981,15 @@ func TestGatewaySendAllSubs(t *testing.T) { } }() - // From A, send a lot of requests. - aURL := fmt.Sprintf("nats://%s:%d", oa.Host, oa.Port) - ncA := natsConnect(t, aURL) - defer ncA.Close() + // From B, send a lot of messages that A is interested in, + // but not C. // TODO(ik): May need to change that if we change the threshold // for when the switch happens. total := 300 for i := 0; i < total; i++ { - req := fmt.Sprintf("%d", i) - reply, err := ncA.Request("foo", []byte(req), time.Second) - if err != nil { + if err := ncB.Publish(fmt.Sprintf("sub.on.a.%d", i), []byte("hi")); err != nil { t.Fatalf("Error waiting for reply: %v", err) } - if string(reply.Data) != req { - t.Fatalf("Expected reply %q, got %q", req, reply.Data) - } } close(done) @@ -2996,17 +3045,6 @@ func TestGatewaySendAllSubs(t *testing.T) { return nil }) - for i := 0; i < total; i++ { - req := fmt.Sprintf("%d", i) - reply, err := ncA.Request("foo", []byte(req), time.Second) - if err != nil { - t.Fatalf("Error waiting for reply: %v", err) - } - if string(reply.Data) != req { - t.Fatalf("Expected reply %q, got %q", req, reply.Data) - } - } - // Also, after all that, if a sub is created on C, it should // be sent to B (but not A). Check that this is the case. // So first send from A on the subject that we are going to @@ -3218,7 +3256,7 @@ func TestGatewayServiceImport(t *testing.T) { subB := natsSubSync(t, clientB, "reply") natsFlush(t, clientB) - for i := 0; i < 1; i++ { + for i := 0; i < 2; i++ { // Send the request from clientB on foo.request, natsPubReq(t, clientB, "foo.request", "reply", []byte("hi")) natsFlush(t, clientB) @@ -3263,10 +3301,14 @@ func TestGatewayServiceImport(t *testing.T) { t.Fatalf("Expected %d outMsgs for A, got %v", expected, vz.OutMsgs) } + // For B, we expect it to send on the two subjects: test.request and foo.request + // then send the reply (MSG + RMSG). if i == 0 { - expected = 3 + expected = 4 } else { - expected = 5 + // The second time, one of the account will be suppressed, so we will get + // only 2 more messages. + expected = 6 } vz, _ = sb.Varz(nil) if vz.OutMsgs != expected { @@ -3522,7 +3564,7 @@ func TestGatewayServiceImportWithQueue(t *testing.T) { // on server B. checkForRegisteredQSubInterest(t, sb, "A", "$foo", "test.request", 1, time.Second) - for i := 0; i < 1; i++ { + for i := 0; i < 2; i++ { // Send the request from clientB on foo.request, natsPubReq(t, clientB, "foo.request", "reply", []byte("hi")) natsFlush(t, clientB) @@ -3559,16 +3601,20 @@ func TestGatewayServiceImportWithQueue(t *testing.T) { t.Fatalf("Unexpected msg: %v", msg) } - expected := int64(i + 2) + expected := int64((i + 1) * 2) vz, _ := sa.Varz(nil) if vz.OutMsgs != expected { t.Fatalf("Expected %d outMsgs for A, got %v", expected, vz.OutMsgs) } + // For B, we expect it to send on the two subjects: test.request and foo.request + // then send the reply (MSG + RMSG). if i == 0 { - expected = 3 - } else { expected = 4 + } else { + // The second time, one of the account will be suppressed, so we will get + // only 2 more messages. + expected = 6 } vz, _ = sb.Varz(nil) if vz.OutMsgs != expected { @@ -4477,3 +4523,67 @@ func TestGatewayMemUsage(t *testing.T) { s.Shutdown() } } + +func TestGatewayMapReplyOnlyForRecentSub(t *testing.T) { + o2 := testDefaultOptionsForGateway("B") + s2 := runGatewayServer(o2) + defer s2.Shutdown() + + o1 := testGatewayOptionsFromToWithServers(t, "A", "B", s2) + s1 := runGatewayServer(o1) + defer s1.Shutdown() + + waitForOutboundGateways(t, s1, 1, time.Second) + waitForOutboundGateways(t, s2, 1, time.Second) + + // Change s1's recent sub expiration default value + s1.mu.Lock() + s1.gateway.pasi.Lock() + s1.gateway.recSubExp = 100 * time.Millisecond + s1.gateway.pasi.Unlock() + s1.mu.Unlock() + + // Setup a replier on s2 + nc2 := natsConnect(t, fmt.Sprintf("nats://%s:%d", o2.Host, o2.Port)) + defer nc2.Close() + count := 0 + errCh := make(chan error, 1) + natsSub(t, nc2, "foo", func(m *nats.Msg) { + // Send reply regardless.. + nc2.Publish(m.Reply, []byte("reply")) + if count == 0 { + if strings.HasPrefix(m.Reply, nats.InboxPrefix) { + errCh <- fmt.Errorf("First reply expected to have a special prefix, got %v", m.Reply) + return + } + count++ + } else { + if !strings.HasPrefix(m.Reply, nats.InboxPrefix) { + errCh <- fmt.Errorf("Second reply expected to have normal inbox, got %v", m.Reply) + return + } + errCh <- nil + } + }) + natsFlush(t, nc2) + checkExpectedSubs(t, 1, s2) + + // Create requestor on s1 + nc1 := natsConnect(t, fmt.Sprintf("nats://%s:%d", o1.Host, o1.Port)) + defer nc1.Close() + // Send first request, reply should be mapped + nc1.Request("foo", []byte("msg1"), time.Second) + // Wait more than the recent sub expiration (that we have set to 100ms) + time.Sleep(200 * time.Millisecond) + // Send second request (reply should not be mapped) + nc1.Request("foo", []byte("msg2"), time.Second) + + select { + case e := <-errCh: + if e != nil { + t.Fatalf(e.Error()) + } + case <-time.After(time.Second): + t.Fatalf("Did not get replies") + } +} diff --git a/server/leafnode.go b/server/leafnode.go index bdf01dae..3966c1e1 100644 --- a/server/leafnode.go +++ b/server/leafnode.go @@ -1241,7 +1241,7 @@ func (c *client) processInboundLeafMsg(msg []byte) { atomic.LoadInt64(&c.srv.gateway.totalQSubs) > 0 { collect = true } - qnames = c.processMsgResults(acc, r, msg, c.pa.subject, c.pa.reply, collect) + qnames = c.processMsgResults(acc, r, msg, c.pa.subject, c.pa.reply, collect, false) } // Now deal with gateways diff --git a/server/norace_test.go b/server/norace_test.go index 0897810c..756b4a82 100644 --- a/server/norace_test.go +++ b/server/norace_test.go @@ -177,7 +177,6 @@ func TestNoRaceRoutedQueueAutoUnsubscribe(t *testing.T) { nbar := atomic.LoadInt32(&rbar) nbaz := atomic.LoadInt32(&rbaz) if nbar == expected && nbaz == expected { - time.Sleep(500 * time.Millisecond) return nil } return fmt.Errorf("Did not receive all %d queue messages, received %d for 'bar' and %d for 'baz'", @@ -329,3 +328,231 @@ func TestNoRaceSlowConsumerPendingBytes(t *testing.T) { } t.Fatal("Connection should have been closed") } + +func TestNoRaceGatewayNoMissingReplies(t *testing.T) { + // This test will have following setup: + // + // responder1 requestor + // | | + // v v + // [A1]<-------gw------------[B1] + // | \ | + // | \______gw__________ | route + // | _\| | + // [ ]--------gw----------->[ ] + // [A2]<-------gw------------[B2] + // [ ] [ ] + // ^ + // | + // responder2 + // + // There is a possible race that when the requestor creates + // a subscription on the reply subject, the subject interest + // being sent from the inbound gateway, and B1 having none, + // the SUB first goes to B2 before being sent to A1 from + // B2's inbound GW. But the request can go from B1 to A1 + // right away and the responder1 connecting to A1 may send + // back the reply before the interest on the reply makes it + // to A1 (from B2). + // This test will also verify that if the responder is instead + // connected to A2, the reply is properly received by requestor + // on B1. + + // For this test we want to be in interestOnly mode, so + // make it happen quickly + gatewayMaxRUnsubBeforeSwitch = 1 + defer func() { gatewayMaxRUnsubBeforeSwitch = defaultGatewayMaxRUnsubBeforeSwitch }() + + // Start with setting up A2 and B2. + ob2 := testDefaultOptionsForGateway("B") + sb2 := runGatewayServer(ob2) + defer sb2.Shutdown() + + oa2 := testGatewayOptionsFromToWithServers(t, "A", "B", sb2) + sa2 := runGatewayServer(oa2) + defer sa2.Shutdown() + + waitForOutboundGateways(t, sa2, 1, time.Second) + waitForInboundGateways(t, sa2, 1, time.Second) + waitForOutboundGateways(t, sb2, 1, time.Second) + waitForInboundGateways(t, sb2, 1, time.Second) + + // Now start A1 which will connect to B2 + oa1 := testGatewayOptionsFromToWithServers(t, "A", "B", sb2) + oa1.Routes = RoutesFromStr(fmt.Sprintf("nats://%s:%d", oa2.Cluster.Host, oa2.Cluster.Port)) + sa1 := runGatewayServer(oa1) + defer sa1.Shutdown() + + waitForOutboundGateways(t, sa1, 1, time.Second) + waitForInboundGateways(t, sb2, 2, time.Second) + + checkClusterFormed(t, sa1, sa2) + + // Finally, start B1 that will connect to A1. + ob1 := testGatewayOptionsFromToWithServers(t, "B", "A", sa1) + ob1.Routes = RoutesFromStr(fmt.Sprintf("nats://%s:%d", ob2.Cluster.Host, ob2.Cluster.Port)) + sb1 := runGatewayServer(ob1) + defer sb1.Shutdown() + + // Check that we have the outbound gateway from B1 to A1 + checkFor(t, 3*time.Second, 15*time.Millisecond, func() error { + c := sb1.getOutboundGatewayConnection("A") + if c == nil { + return fmt.Errorf("Outbound connection to A not created yet") + } + c.mu.Lock() + name := c.opts.Name + nc := c.nc + c.mu.Unlock() + if name != sa1.ID() { + // Force a disconnect + nc.Close() + return fmt.Errorf("Was unable to have B1 connect to A1") + } + return nil + }) + + waitForInboundGateways(t, sa1, 1, time.Second) + checkClusterFormed(t, sb1, sb2) + + // Slow down GWs connections + // testSlowDownGatewayConnections(t, sa1, sa2, sb1, sb2) + + // For this test, since we are using qsubs on A and B, and we + // want to make sure that it is received only on B, make the + // recent sub expiration high (especially when running on + // Travis with GOGC=10 + setRecentSubExpiration := func(s *Server) { + s.mu.Lock() + s.gateway.pasi.Lock() + s.gateway.recSubExp = 10 * time.Second + s.gateway.pasi.Unlock() + s.mu.Unlock() + } + setRecentSubExpiration(sb1) + setRecentSubExpiration(sb2) + + a1URL := fmt.Sprintf("nats://%s:%d", oa1.Host, oa1.Port) + a2URL := fmt.Sprintf("nats://%s:%d", oa2.Host, oa2.Port) + b1URL := fmt.Sprintf("nats://%s:%d", ob1.Host, ob1.Port) + b2URL := fmt.Sprintf("nats://%s:%d", ob2.Host, ob2.Port) + + ncb1 := natsConnect(t, b1URL) + defer ncb1.Close() + + ncb2 := natsConnect(t, b2URL) + defer ncb2.Close() + + natsSubSync(t, ncb1, "just.a.sub") + natsSubSync(t, ncb2, "just.a.sub") + checkExpectedSubs(t, 2, sb1, sb2) + + // For this test, we want A to be checking B's interest in order + // to send messages (which would cause replies to be dropped if + // there is no interest registered on A). So from A servers, + // send to various subjects and cause B's to switch to interestOnly + // mode. + nca1 := natsConnect(t, a1URL) + defer nca1.Close() + for i := 0; i < 10; i++ { + natsPub(t, nca1, fmt.Sprintf("reject.%d", i), []byte("hello")) + } + nca2 := natsConnect(t, a2URL) + defer nca2.Close() + for i := 0; i < 10; i++ { + natsPub(t, nca2, fmt.Sprintf("reject.%d", i), []byte("hello")) + } + + checkSwitchedMode := func(t *testing.T, s *Server) { + t.Helper() + checkFor(t, 2*time.Second, 15*time.Millisecond, func() error { + var switchedMode bool + c := s.getOutboundGatewayConnection("B") + ei, _ := c.gw.outsim.Load(globalAccountName) + if ei != nil { + e := ei.(*outsie) + e.RLock() + switchedMode = e.ni == nil && e.mode == modeInterestOnly + e.RUnlock() + } + if !switchedMode { + return fmt.Errorf("Still not switched mode") + } + return nil + }) + } + checkSwitchedMode(t, sa1) + checkSwitchedMode(t, sa2) + + // Setup a subscriber on _INBOX.> on each of A's servers. + total := 1000 + expected := int32(total) + rcvOnA := int32(0) + qrcvOnA := int32(0) + natsSub(t, nca1, "myreply.>", func(_ *nats.Msg) { + atomic.AddInt32(&rcvOnA, 1) + }) + natsQueueSub(t, nca2, "myreply.>", "bar", func(_ *nats.Msg) { + atomic.AddInt32(&qrcvOnA, 1) + }) + checkExpectedSubs(t, 2, sa1, sa2) + + // Ok.. so now we will run the actual test where we + // create a responder on A1 and make sure that every + // single request from B1 gets the reply. Will repeat + // test with responder connected to A2. + sendReqs := func(t *testing.T, subConn *nats.Conn) { + t.Helper() + responder := natsSub(t, subConn, "foo", func(m *nats.Msg) { + nca1.Publish(m.Reply, []byte("reply")) + }) + natsFlush(t, subConn) + checkExpectedSubs(t, 3, sa1, sa2) + + // We are not going to use Request() because this sets + // a wildcard subscription on an INBOX and less likely + // to produce the race. Instead we will explicitly set + // the subscription on the reply subject and create one + // per request. + for i := 0; i < total/2; i++ { + reply := fmt.Sprintf("myreply.%d", i) + replySub := natsQueueSubSync(t, ncb1, reply, "bar") + natsFlush(t, ncb1) + + // Let's make sure we have interest on B2. + if r := sb2.globalAccount().sl.Match(reply); len(r.qsubs) == 0 { + checkFor(t, time.Second, time.Millisecond, func() error { + if r := sb2.globalAccount().sl.Match(reply); len(r.qsubs) == 0 { + return fmt.Errorf("B still not registered interest on %s", reply) + } + return nil + }) + } + natsPubReq(t, ncb1, "foo", reply, []byte("request")) + if _, err := replySub.NextMsg(time.Second); err != nil { + t.Fatalf("Did not receive reply: %v", err) + } + natsUnsub(t, replySub) + } + + responder.Unsubscribe() + natsFlush(t, subConn) + checkExpectedSubs(t, 2, sa1, sa2) + } + sendReqs(t, nca1) + sendReqs(t, nca2) + + checkFor(t, time.Second, 15*time.Millisecond, func() error { + if n := atomic.LoadInt32(&rcvOnA); n != expected { + return fmt.Errorf("Subs on A expected to get %v replies, got %v", expected, n) + } + return nil + }) + + // We should not have received a single message on the queue sub + // on cluster A because messages will have been delivered to + // the member on cluster B. + if n := atomic.LoadInt32(&qrcvOnA); n != 0 { + t.Fatalf("Queue sub on A should not have received message, got %v", n) + } +} diff --git a/server/route.go b/server/route.go index 408f984c..1e495063 100644 --- a/server/route.go +++ b/server/route.go @@ -299,7 +299,7 @@ func (c *client) processInboundRoutedMsg(msg []byte) { c.mu.Unlock() } } - c.processMsgResults(acc, r, msg, c.pa.subject, c.pa.reply, false) + c.processMsgResults(acc, r, msg, c.pa.subject, c.pa.reply, false, false) } // Helper function for routes and gateways to create qfilters need for @@ -1038,7 +1038,12 @@ func (c *client) sendRouteSubOrUnSubProtos(subs []*subscription, isSubProto, tra // the lock, which could cause pingTimer to think that this // connection is stale otherwise. c.last = time.Now() - c.flushOutbound() + if !c.flushOutbound() { + // Another go routine is flushing already and does not + // have the lock. Give it a chance to finish... + c.mu.Unlock() + c.mu.Lock() + } if closed = c.flags.isSet(clearConnection); closed { break } diff --git a/test/bench_test.go b/test/bench_test.go index cc6c80cf..17b0f0c4 100644 --- a/test/bench_test.go +++ b/test/bench_test.go @@ -983,6 +983,8 @@ func gatewaysBench(b *testing.B, optimisticMode bool, payload string, numPublish startCh := make(chan bool) l := b.N / numPublishers + lastMsgSendOp := []byte("PUB end.test 2\r\nok\r\n") + pubLoop := func(c net.Conn, ch chan bool) { bw := bufio.NewWriterSize(c, defaultSendBufSize) @@ -998,7 +1000,7 @@ func gatewaysBench(b *testing.B, optimisticMode bool, payload string, numPublish return } } - if _, err := bw.Write([]byte("PUB end.test 2\r\nok\r\n")); err != nil { + if _, err := bw.Write(lastMsgSendOp); err != nil { b.Errorf("Received error on PUB write: %v\n", err) return } @@ -1011,6 +1013,7 @@ func gatewaysBench(b *testing.B, optimisticMode bool, payload string, numPublish // Publish Connections SPINUP for i := 0; i < numPublishers; i++ { c := createClientConn(b, oa.Host, oa.Port) + defer c.Close() doDefaultConnect(b, c) flushConnection(b, c) ch := make(chan bool) @@ -1019,7 +1022,18 @@ func gatewaysBench(b *testing.B, optimisticMode bool, payload string, numPublish <-ch } - b.SetBytes(int64(len(sendOp) + len(msgOp))) + // To report the number of bytes: + // from publisher to server on cluster A: + numBytes := len(sendOp) + if subInterest { + // from server in cluster A to server on cluster B: + // RMSG $G foo \r\n + numBytes += len("RMSG $G foo xxxx ") + len(payload) + 2 + + // From server in cluster B to sub: + numBytes += len(msgOp) + } + b.SetBytes(int64(numBytes)) b.ResetTimer() // Closing this will start all publishers at once (roughly) @@ -1031,51 +1045,51 @@ func gatewaysBench(b *testing.B, optimisticMode bool, payload string, numPublish b.StopTimer() } -func Benchmark_Gateways___Optimistic_1kx01x0(b *testing.B) { +func Benchmark_Gateways_Optimistic_1kx01x0(b *testing.B) { gatewaysBench(b, true, sizedString(1024), 1, false) } -func Benchmark_Gateways___Optimistic_2kx01x0(b *testing.B) { +func Benchmark_Gateways_Optimistic_2kx01x0(b *testing.B) { gatewaysBench(b, true, sizedString(2048), 1, false) } -func Benchmark_Gateways___Optimistic_4kx01x0(b *testing.B) { +func Benchmark_Gateways_Optimistic_4kx01x0(b *testing.B) { gatewaysBench(b, true, sizedString(4096), 1, false) } -func Benchmark_Gateways___Optimistic_1kx10x0(b *testing.B) { +func Benchmark_Gateways_Optimistic_1kx10x0(b *testing.B) { gatewaysBench(b, true, sizedString(1024), 10, false) } -func Benchmark_Gateways___Optimistic_2kx10x0(b *testing.B) { +func Benchmark_Gateways_Optimistic_2kx10x0(b *testing.B) { gatewaysBench(b, true, sizedString(2048), 10, false) } -func Benchmark_Gateways___Optimistic_4kx10x0(b *testing.B) { +func Benchmark_Gateways_Optimistic_4kx10x0(b *testing.B) { gatewaysBench(b, true, sizedString(4096), 10, false) } -func Benchmark_Gateways___Optimistic_1kx01x1(b *testing.B) { +func Benchmark_Gateways_Optimistic_1kx01x1(b *testing.B) { gatewaysBench(b, true, sizedString(1024), 1, true) } -func Benchmark_Gateways___Optimistic_2kx01x1(b *testing.B) { +func Benchmark_Gateways_Optimistic_2kx01x1(b *testing.B) { gatewaysBench(b, true, sizedString(2048), 1, true) } -func Benchmark_Gateways___Optimistic_4kx01x1(b *testing.B) { +func Benchmark_Gateways_Optimistic_4kx01x1(b *testing.B) { gatewaysBench(b, true, sizedString(4096), 1, true) } -func Benchmark_Gateways___Optimistic_1kx10x1(b *testing.B) { +func Benchmark_Gateways_Optimistic_1kx10x1(b *testing.B) { gatewaysBench(b, true, sizedString(1024), 10, true) } -func Benchmark_Gateways___Optimistic_2kx10x1(b *testing.B) { +func Benchmark_Gateways_Optimistic_2kx10x1(b *testing.B) { gatewaysBench(b, true, sizedString(2048), 10, true) } -func Benchmark_Gateways___Optimistic_4kx10x1(b *testing.B) { +func Benchmark_Gateways_Optimistic_4kx10x1(b *testing.B) { gatewaysBench(b, true, sizedString(4096), 10, true) } @@ -1126,3 +1140,96 @@ func Benchmark_Gateways_InterestOnly_2kx10x1(b *testing.B) { func Benchmark_Gateways_InterestOnly_4kx10x1(b *testing.B) { gatewaysBench(b, false, sizedString(4096), 10, true) } + +// This bench only sends the requests to verify impact of reply +// reply mapping in GW code. +func gatewaySendRequestsBench(b *testing.B, singleReplySub bool) { + server.SetGatewaysSolicitDelay(10 * time.Millisecond) + defer server.ResetGatewaysSolicitDelay() + + ob := testDefaultOptionsForGateway("B") + sb := RunServer(ob) + defer sb.Shutdown() + + gwbURL, err := url.Parse(fmt.Sprintf("nats://%s:%d", ob.Gateway.Host, ob.Gateway.Port)) + if err != nil { + b.Fatalf("Error parsing url: %v", err) + } + oa := testDefaultOptionsForGateway("A") + oa.Gateway.Gateways = []*server.RemoteGatewayOpts{ + &server.RemoteGatewayOpts{ + Name: "B", + URLs: []*url.URL{gwbURL}, + }, + } + sa := RunServer(oa) + defer sa.Shutdown() + + sub := createClientConn(b, ob.Host, ob.Port) + defer sub.Close() + doDefaultConnect(b, sub) + sendProto(b, sub, "SUB foo 1\r\n") + flushConnection(b, sub) + + lenMsg := len("MSG foo reply.xxxxxxxxxx 1 2\r\nok\r\n") + expected := b.N * lenMsg + if !singleReplySub { + expected += b.N * len("$GR.1234.") + } + ch := make(chan bool, 1) + go drainConnection(b, sub, ch, expected) + + c := createClientConn(b, oa.Host, oa.Port) + defer c.Close() + doDefaultConnect(b, c) + if singleReplySub { + sendProto(b, c, "SUB reply.* 1\r\n") + } + flushConnection(b, c) + + // If a single sub for replies is created, wait for more + // than the duration under which we do reply mapping + if singleReplySub { + time.Sleep(1100 * time.Millisecond) + } + + bw := bufio.NewWriterSize(c, defaultSendBufSize) + + // From pub to server in cluster A: + numBytes := len("PUB foo reply.0123456789 2\r\nok\r\n") + if !singleReplySub { + // Add the preceding SUB + numBytes += len("SUB reply.0123456789 0123456789\r\n") + } + // From server in cluster A to cluster B + numBytes += len("RMSG $G foo reply.0123456789 2\r\nok\r\n") + // If mapping of reply... + if !singleReplySub { + // the mapping uses about 10 more bytes. So add them + // for RMSG from server to server, and MSG to sub. + numBytes += 20 + } + // From server in cluster B to sub + numBytes += lenMsg + b.SetBytes(int64(numBytes)) + b.ResetTimer() + + var subStr string + for i := 0; i < b.N; i++ { + if !singleReplySub { + subStr = fmt.Sprintf("SUB reply.%010d %010d\r\n", i+1, i+1) + } + bw.Write([]byte(fmt.Sprintf("%sPUB foo reply.%010d 2\r\nok\r\n", subStr, i+1))) + } + bw.Flush() + + <-ch +} + +func Benchmark_Gateways_Requests_CreateOneSubForAll(b *testing.B) { + gatewaySendRequestsBench(b, true) +} + +func Benchmark_Gateways_Requests_CreateOneSubForEach(b *testing.B) { + gatewaySendRequestsBench(b, true) +} diff --git a/test/gateway_test.go b/test/gateway_test.go index 28e5e395..1b209fec 100644 --- a/test/gateway_test.go +++ b/test/gateway_test.go @@ -14,6 +14,7 @@ package test import ( + "bufio" "bytes" "fmt" "net" @@ -415,11 +416,33 @@ func TestGatewaySendAllSubs(t *testing.T) { // switch. for i := 0; i < 10001; i++ { gASend(fmt.Sprintf("RMSG $G foo.%d 2\r\nok\r\n", i)) - if i < 1000 { + if i <= 1000 { gAExpect(runsubRe) } } - // Since B has no sub, we should get 2 INFOs with start/end - // commands. - expectNumberOfProtos(t, gAExpect, infoRe, 2) + // Expect an INFO + RS+ $G not.used + INFO + buf := bufio.NewReader(gA) + for i := 0; i < 3; i++ { + line, _, err := buf.ReadLine() + if err != nil { + t.Fatalf("Error reading: %v", err) + } + switch i { + case 0: + case 2: + if !bytes.HasPrefix(line, []byte("INFO {")) { + t.Fatalf("Expected INFO, got: %s", line) + } + case 1: + if !bytes.HasPrefix(line, []byte("RS+ ")) { + t.Fatalf("Expected RS+, got: %s", line) + } + } + } + // After this point, any new sub or unsub on B should be + // sent to A. + clientSend("SUB foo 1\r\n") + gAExpect(rsubRe) + clientSend("UNSUB 1\r\n") + gAExpect(runsubRe) }