Gateways: Fix race for request reply

This addresses the following race:
- client connection creates a subscription on a reply subject
- client connection sends a request
- server sends the subscription to inbound gateway
- server sends the message to outbound gateway (those may be
  to different servers)
- receiving server sends to sub interested in request subject
- app sends reply
- its server then check for interest on the reply's subject

In interestOnly mode, there is a possibility that this server
has not received the interest on the reply subject yet and would
then drop the reply.

This PR detects above scenario and will prefix the reply subject
to identify the origin cluster if it is detected that the last
subscription from the sending connection was created less than
a second ago.
Once the destination has this prefix, the destination cluster
will always send back that message to origin cluster even if
there is no registered interest.

Signed-off-by: Ivan Kozlovic <ivan@synadia.com>
This commit is contained in:
Ivan Kozlovic
2019-03-05 15:55:21 -07:00
parent 2a7b2a9578
commit bb4e8ae0f9
9 changed files with 762 additions and 123 deletions

View File

@@ -300,7 +300,7 @@ func (a *Account) removeServiceImport(subject string) {
delete(a.imports.services, subject)
a.mu.Unlock()
if a.srv != nil && a.srv.gateway.enabled {
a.srv.gatewayHandleServiceImport(a, []byte(subject), -1)
a.srv.gatewayHandleServiceImport(a, []byte(subject), nil, -1)
}
}

View File

@@ -271,6 +271,16 @@ type readCache struct {
rsz int32 // Read buffer size
srs int32 // Short reads, used for dynamic buffer resizing.
// When gateways are enabled, this holds the last subscription created
// by this connection and time of creation. When a message needs to
// cross a gateway and has a reply, the reply is prefixed with the cluster
// name of origin if this last subscription is a match for the "reply"
// subject. This is in order to solve req/reply race where the reply may be
// processed in a destination cluster before the subscription interest for
// that reply makes it there (due to different outbound/inbound connections).
lastSub *subscription
lastSubExpire time.Time
}
const (
@@ -2280,7 +2290,7 @@ func (c *client) processInboundClientMsg(msg []byte) {
atomic.LoadInt64(&c.srv.gateway.totalQSubs) > 0 {
collect = true
}
qnames = c.processMsgResults(c.acc, r, msg, c.pa.subject, c.pa.reply, collect)
qnames = c.processMsgResults(c.acc, r, msg, c.pa.subject, c.pa.reply, collect, false)
}
// Now deal with gateways
@@ -2316,7 +2326,7 @@ func (c *client) checkForImportServices(acc *Account, msg []byte) {
// and possibly to inbound GW connections for
// which we are in interest-only mode.
if c.kind == CLIENT && c.srv.gateway.enabled {
c.srv.gatewayHandleServiceImport(rm.acc, nrr, 1)
c.srv.gatewayHandleServiceImport(rm.acc, nrr, c, 1)
}
}
// FIXME(dlc) - Do L1 cache trick from above.
@@ -2329,7 +2339,7 @@ func (c *client) checkForImportServices(acc *Account, msg []byte) {
}
sendToGWs := c.srv.gateway.enabled && (c.kind == CLIENT || c.kind == SYSTEM || c.kind == LEAF)
queues := c.processMsgResults(rm.acc, rr, msg, []byte(rm.to), nrr, sendToGWs)
queues := c.processMsgResults(rm.acc, rr, msg, []byte(rm.to), nrr, sendToGWs, false)
// If this is not a gateway connection but gateway is enabled,
// try to send this converted message to all gateways.
if sendToGWs {
@@ -2374,7 +2384,9 @@ func (c *client) addSubToRouteTargets(sub *subscription) {
}
// This processes the sublist results for a given message.
func (c *client) processMsgResults(acc *Account, r *SublistResult, msg, subject, reply []byte, collect bool) [][]byte {
func (c *client) processMsgResults(acc *Account, r *SublistResult, msg, subject, reply []byte,
collectQueueNames, allowGWQueuesWithoutFilter bool) [][]byte {
var queues [][]byte
// msg header for clients.
msgh := c.msgb[1:msgHeadProtoLen]
@@ -2435,7 +2447,12 @@ func (c *client) processMsgResults(acc *Account, r *SublistResult, msg, subject,
// leaf nodes or routes even if there are no queue filters since we collect
// them above and do not process inline like normal clients.
if c.kind != CLIENT && qf == nil {
goto sendToRoutesOrLeafs
// For gateway connections, if allowGWQueuesWithoutFilter is true,
// really treat this as if it was a client connection and possibly
// pick queue subs.
if !(c.kind == GATEWAY && allowGWQueuesWithoutFilter) {
goto sendToRoutesOrLeafs
}
}
// Check to see if we have our own rand yet. Global rand
@@ -2487,7 +2504,7 @@ func (c *client) processMsgResults(acc *Account, r *SublistResult, msg, subject,
continue
} else {
c.addSubToRouteTargets(sub)
if collect {
if collectQueueNames {
queues = append(queues, sub.queue)
}
}
@@ -2508,7 +2525,7 @@ func (c *client) processMsgResults(acc *Account, r *SublistResult, msg, subject,
if c.deliverMsg(sub, mh, msg) {
// Clear rsub
rsub = nil
if collect {
if collectQueueNames {
queues = append(queues, sub.queue)
}
break
@@ -2519,7 +2536,7 @@ func (c *client) processMsgResults(acc *Account, r *SublistResult, msg, subject,
// If we are here we tried to deliver to a local qsub
// but failed. So we will send it to a remote or leaf node.
c.addSubToRouteTargets(rsub)
if collect {
if collectQueueNames {
queues = append(queues, rsub.queue)
}
}

View File

@@ -15,6 +15,7 @@ package server
import (
"bytes"
"crypto/sha256"
"crypto/tls"
"encoding/json"
"fmt"
@@ -32,7 +33,10 @@ const (
defaultSolicitGatewaysDelay = time.Second
defaultGatewayConnectDelay = time.Second
defaultGatewayReconnectDelay = time.Second
defaultGatewayRecentSubExpiration = time.Second
defaultGatewayMaxRUnsubBeforeSwitch = 1000
gwReplyPrefix = "$GR."
gwReplyStart = len(gwReplyPrefix) + 5 // len of prefix above + len of hash (4) + "."
)
var (
@@ -93,6 +97,7 @@ type srvGateway struct {
info *Info // Gateway Info protocol
infoJSON []byte // Marshal'ed Info protocol
runknown bool // Rejects unknown (not configured) gateway connections
replyPfx []byte // Will be "$GR.<this cluster name hash>."
// We maintain the interest of subjects and queues per account.
// For a given account, entries in the map could be something like this:
@@ -110,8 +115,9 @@ type srvGateway struct {
m map[string]map[string]*sitally
}
resolver netResolver // Used to resolve host name before calling net.Dial()
sqbsz int // Max buffer size to send queue subs protocol. Used for testing.
resolver netResolver // Used to resolve host name before calling net.Dial()
sqbsz int // Max buffer size to send queue subs protocol. Used for testing.
recSubExp time.Duration // For how long do we check if there is a subscription match for a message with reply
}
// Subject interest tally. Also indicates if the key in the map is a
@@ -124,6 +130,7 @@ type sitally struct {
type gatewayCfg struct {
sync.RWMutex
*RemoteGatewayOpts
replyPfx []byte
urls map[string]*url.URL
connAttempts int
implicit bool
@@ -218,6 +225,19 @@ func validateGatewayOptions(o *Options) error {
return nil
}
// Computes a hash of 4 characters for the given gateway name.
// This will be used for routing of replies.
func getReplyPrefixForGateway(name string) []byte {
sha := sha256.New()
sha.Write([]byte(name))
fullHash := []byte(fmt.Sprintf("%x", sha.Sum(nil)))
prefix := make([]byte, 0, len(gwReplyPrefix)+5)
prefix = append(prefix, gwReplyPrefix...)
prefix = append(prefix, fullHash[:4]...)
prefix = append(prefix, '.')
return prefix
}
// Initialize the s.gateway structure. We do this even if the server
// does not have a gateway configured. In some part of the code, the
// server will check the number of outbound gateways, etc.. and so
@@ -232,6 +252,7 @@ func newGateway(opts *Options) (*srvGateway, error) {
URLs: make(map[string]struct{}),
resolver: opts.Gateway.resolver,
runknown: opts.Gateway.RejectUnknown,
replyPfx: getReplyPrefixForGateway(opts.Gateway.Name),
}
gateway.Lock()
defer gateway.Unlock()
@@ -250,6 +271,7 @@ func newGateway(opts *Options) (*srvGateway, error) {
}
cfg := &gatewayCfg{
RemoteGatewayOpts: rgo.clone(),
replyPfx: getReplyPrefixForGateway(rgo.Name),
urls: make(map[string]*url.URL, len(rgo.URLs)),
}
if opts.Gateway.TLSConfig != nil && cfg.TLSConfig == nil {
@@ -270,6 +292,7 @@ func newGateway(opts *Options) (*srvGateway, error) {
if gateway.sqbsz == 0 {
gateway.sqbsz = maxBufSize
}
gateway.recSubExp = defaultGatewayRecentSubExpiration
gateway.enabled = opts.Gateway.Name != "" && opts.Gateway.Port != 0
return gateway, nil
@@ -1029,9 +1052,11 @@ func (s *Server) sendSubsToGateway(c *client, accountName []byte) {
// Instruct to send all subs (RS+/-) for this account from now on.
c.mu.Lock()
e := c.gw.insim[string(accountName)]
if e != nil {
e.mode = modeInterestOnly
if e == nil {
e = &insie{}
c.gw.insim[string(accountName)] = e
}
e.mode = modeInterestOnly
c.mu.Unlock()
} else {
// Send queues for all accounts
@@ -1173,6 +1198,7 @@ func (s *Server) processImplicitGateway(info *Info) {
opts := s.getOpts()
cfg = &gatewayCfg{
RemoteGatewayOpts: &RemoteGatewayOpts{Name: gwName},
replyPfx: getReplyPrefixForGateway(gwName),
urls: make(map[string]*url.URL, len(info.GatewayURLs)),
implicit: true,
}
@@ -1901,7 +1927,7 @@ func (s *Server) maybeSendSubOrUnsubToGateways(accName string, sub *subscription
}
}
if proto != nil {
c.sendProto(proto, true)
c.sendProto(proto, false)
if c.trace {
c.traceOutOp("", proto[:len(proto)-LEN_CR_LF])
}
@@ -1947,7 +1973,7 @@ func (s *Server) sendQueueSubOrUnsubToGateways(accName string, qsub *subscriptio
proto = append(proto, CR_LF...)
}
c.mu.Lock()
c.sendProto(proto, true)
c.sendProto(proto, false)
if c.trace {
c.traceOutOp("", proto[:len(proto)-LEN_CR_LF])
}
@@ -2014,6 +2040,11 @@ func (s *Server) gatewayUpdateSubInterest(accName string, sub *subscription, cha
}
}
if first || last {
if first && sub.client != nil {
c := sub.client
c.in.lastSub = sub
c.in.lastSubExpire = time.Now().Add(s.gateway.recSubExp)
}
if entry.q {
s.sendQueueSubOrUnsubToGateways(accName, sub, first)
} else {
@@ -2022,6 +2053,33 @@ func (s *Server) gatewayUpdateSubInterest(accName string, sub *subscription, cha
}
}
// Returns true if the given subject starts with `$GR.`
func subjectStartsWithGatewayReplyPrefix(subj []byte) bool {
return len(subj) > gwReplyStart && string(subj[:len(gwReplyPrefix)]) == gwReplyPrefix
}
// Evaluates if the given reply should be mapped (adding the origin cluster
// hash as a prefix) or not.
func (c *client) shouldMapReplyForGatewaySend(reply []byte) bool {
if c.in.lastSub == nil {
return false
}
if time.Now().After(c.in.lastSubExpire) {
c.in.lastSub = nil
return false
}
if subjectStartsWithGatewayReplyPrefix(reply) {
return false
}
return matchLiteral(string(reply), string(c.in.lastSub.subject))
}
var subPool = &sync.Pool{
New: func() interface{} {
return &subscription{}
},
}
// May send a message to all outbound gateways. It is possible
// that the message is not sent to a given gateway if for instance
// it is known that this gateway has no interest in the account or
@@ -2038,46 +2096,84 @@ func (c *client) sendMsgToGateways(acc *Account, msg, subject, reply []byte, qgr
for i := 0; i < len(gw.outo); i++ {
gws = append(gws, gw.outo[i])
}
thisClusterReplyPrefix := gw.replyPfx
gw.RUnlock()
if len(gws) == 0 {
return
}
var (
subj = string(subject)
queuesa = [512]byte{}
queues = queuesa[:0]
accName = acc.Name
subj = string(subject)
queuesa = [512]byte{}
queues = queuesa[:0]
accName = acc.Name
mreplya [256]byte
mreply []byte
dstPfx []byte
checkReply = reply != nil
)
// Get a subscription from the pool
sub := subPool.Get().(*subscription)
// Check if the subject is on "$GR.<cluster hash>.",
// and if so, send to that GW regardless of its
// interest on the real subject (that is, skip the
// check of subject interest).
if subjectStartsWithGatewayReplyPrefix(subject) {
dstPfx = subject[:gwReplyStart]
}
for i := 0; i < len(gws); i++ {
gwc := gws[i]
// Plain sub interest and queue sub results for this account/subject
psi, qr := gwc.gatewayInterest(accName, subj)
if !psi && qr == nil {
continue
}
queues = queuesa[:0]
if qr != nil {
for i := 0; i < len(qr.qsubs); i++ {
qsubs := qr.qsubs[i]
if len(qsubs) > 0 {
queue := qsubs[0].queue
add := true
for _, qn := range qgroups {
if bytes.Equal(queue, qn) {
add = false
break
if dstPfx != nil {
gwc.mu.Lock()
ok := gwc.gw.cfg != nil && bytes.Equal(dstPfx, gwc.gw.cfg.replyPfx)
gwc.mu.Unlock()
if !ok {
continue
}
} else {
// Plain sub interest and queue sub results for this account/subject
psi, qr := gwc.gatewayInterest(accName, subj)
if !psi && qr == nil {
continue
}
queues = queuesa[:0]
if qr != nil {
for i := 0; i < len(qr.qsubs); i++ {
qsubs := qr.qsubs[i]
if len(qsubs) > 0 {
queue := qsubs[0].queue
add := true
for _, qn := range qgroups {
if bytes.Equal(queue, qn) {
add = false
break
}
}
if add {
qgroups = append(qgroups, queue)
queues = append(queues, queue...)
queues = append(queues, ' ')
}
}
if add {
qgroups = append(qgroups, queue)
queues = append(queues, queue...)
queues = append(queues, ' ')
}
}
}
if !psi && len(queues) == 0 {
continue
}
}
if !psi && len(queues) == 0 {
continue
if checkReply {
// Check/map only once
checkReply = false
// Assume we will use original
mreply = reply
// If there was a recent matching subscription on that connection
// and the reply is not already mapped, then map (add prefix).
if c.shouldMapReplyForGatewaySend(reply) {
mreply = mreplya[:0]
mreply = append(mreply, thisClusterReplyPrefix...)
mreply = append(mreply, reply...)
}
}
mh := c.msgb[:msgHeadProtoLen]
mh = append(mh, accName...)
@@ -2087,29 +2183,35 @@ func (c *client) sendMsgToGateways(acc *Account, msg, subject, reply []byte, qgr
if len(queues) > 0 {
if reply != nil {
mh = append(mh, "+ "...) // Signal that there is a reply.
mh = append(mh, reply...)
mh = append(mh, mreply...)
mh = append(mh, ' ')
} else {
mh = append(mh, "| "...) // Only queues
}
mh = append(mh, queues...)
} else if reply != nil {
mh = append(mh, reply...)
mh = append(mh, mreply...)
mh = append(mh, ' ')
}
mh = append(mh, c.pa.szb...)
mh = append(mh, CR_LF...)
sub := subscription{client: gwc, subject: c.pa.subject}
c.deliverMsg(&sub, mh, msg)
// We reuse the subscription object that we pass to deliverMsg.
sub.client = gwc
sub.subject = c.pa.subject
c.deliverMsg(sub, mh, msg)
}
// Done with subscription, put back to pool. We don't need
// to reset content since we explicitly set when using it.
subPool.Put(sub)
}
func (s *Server) gatewayHandleServiceImport(acc *Account, subject []byte, change int32) {
func (s *Server) gatewayHandleServiceImport(acc *Account, subject []byte, c *client, change int32) {
sid := make([]byte, 0, len(acc.Name)+len(subject)+1)
sid = append(sid, acc.Name...)
sid = append(sid, ' ')
sid = append(sid, subject...)
sub := &subscription{subject: subject, sid: sid}
sub := &subscription{client: c, subject: subject, sid: sid}
var rspa [1024]byte
rsproto := rspa[:0]
@@ -2125,7 +2227,7 @@ func (s *Server) gatewayHandleServiceImport(acc *Account, subject []byte, change
s.mu.Lock()
for _, r := range s.routes {
r.mu.Lock()
r.sendProto(rsproto, true)
r.sendProto(rsproto, false)
if r.trace {
r.traceOutOp("", rsproto[:len(rsproto)-LEN_CR_LF])
}
@@ -2161,7 +2263,8 @@ func (s *Server) gatewayHandleAccountNoInterest(c *client, accName []byte) {
func (c *client) sendAccountUnsubToGateway(accName []byte) {
// Check if we have sent the A- or not.
c.mu.Lock()
if _, sent := c.gw.insim[string(accName)]; !sent {
e, sent := c.gw.insim[string(accName)]
if e != nil || !sent {
// Add a nil value to indicate that we have sent an A-
// so that we know to send A+ when needed.
c.gw.insim[string(accName)] = nil
@@ -2170,7 +2273,7 @@ func (c *client) sendAccountUnsubToGateway(accName []byte) {
proto = append(proto, aUnsubBytes...)
proto = append(proto, accName...)
proto = append(proto, CR_LF...)
c.sendProto(proto, true)
c.sendProto(proto, false)
if c.trace {
c.traceOutOp("", proto[:len(proto)-LEN_CR_LF])
}
@@ -2226,7 +2329,7 @@ func (s *Server) gatewayHandleSubjectNoInterest(c *client, acc *Account, accName
proto = append(proto, ' ')
proto = append(proto, subject...)
proto = append(proto, CR_LF...)
c.sendProto(proto, true)
c.sendProto(proto, false)
if c.trace {
c.traceOutOp("", proto[:len(proto)-LEN_CR_LF])
}
@@ -2238,6 +2341,17 @@ func (s *Server) gatewayHandleSubjectNoInterest(c *client, acc *Account, accName
}
}
func (g *srvGateway) getReplyPrefix() []byte {
g.RLock()
replyPfx := g.replyPfx
g.RUnlock()
return replyPfx
}
func (s *Server) isGatewayReplyForThisCluster(subj []byte) bool {
return string(s.gateway.getReplyPrefix()) == string(subj[:gwReplyStart])
}
// Process a message coming from a remote gateway. Send to any sub/qsub
// in our cluster that is matching. When receiving a message for an
// account or subject for which there is no interest in this cluster
@@ -2262,6 +2376,34 @@ func (c *client) processInboundGatewayMsg(msg []byte) {
return
}
// If we receive a message on $GR.<cluster>.<subj>
// we will drop the prefix before processing interest
// in this cluster, but we also need to resend to
// other gateways.
sendBackToGateways := false
// First thing to do is to check if the subject starts
// with "$GR.<hash>.".
if subjectStartsWithGatewayReplyPrefix(c.pa.subject) {
// If it does, then is this server/cluster the actual
// destination for this message?
if !c.srv.isGatewayReplyForThisCluster(c.pa.subject) {
// We could report, for now, just drop.
return
}
// Adjust the subject to past the prefix
c.pa.subject = c.pa.subject[gwReplyStart:]
// Use a stack buffer to rewrite c.pa.cache since we
// only need it for getAccAndResultFromCache()
var _pacache [256]byte
pacache := _pacache[:0]
pacache = append(pacache, c.pa.account...)
pacache = append(pacache, ' ')
pacache = append(pacache, c.pa.subject...)
c.pa.pacache = pacache
sendBackToGateways = true
}
acc, r := c.getAccAndResultFromCache()
if acc == nil {
c.Debugf("Unknown account %q for gateway message on subject: %q", c.pa.account, c.pa.subject)
@@ -2277,19 +2419,27 @@ func (c *client) processInboundGatewayMsg(msg []byte) {
c.checkForImportServices(acc, msg)
}
// If there is no interest on plain subs, possibly send an RS-,
// even if there is qsubs interest.
if len(r.psubs) == 0 {
c.srv.gatewayHandleSubjectNoInterest(c, acc, c.pa.account, c.pa.subject)
if !sendBackToGateways {
// If there is no interest on plain subs, possibly send an RS-,
// even if there is qsubs interest.
if len(r.psubs) == 0 {
c.srv.gatewayHandleSubjectNoInterest(c, acc, c.pa.account, c.pa.subject)
// If there is also no queue filter, then no point in continuing
// (even if r.qsubs i > 0).
if len(c.pa.queues) == 0 {
return
// If there is also no queue filter, then no point in continuing
// (even if r.qsubs i > 0).
if len(c.pa.queues) == 0 {
return
}
}
c.processMsgResults(acc, r, msg, c.pa.subject, c.pa.reply, false, false)
} else {
// We normally would not allow sending to a queue unless the
// RMSG contains the queue groups, however, if the incoming
// message was a "$GR." then we need to act as if this was
// a CLIENT connection..
qnames := c.processMsgResults(acc, r, msg, c.pa.subject, c.pa.reply, true, true)
c.sendMsgToGateways(c.acc, msg, c.pa.subject, c.pa.reply, qnames)
}
c.processMsgResults(acc, r, msg, c.pa.subject, c.pa.reply, false)
}
// Indicates that the remote which we are sending messages to

View File

@@ -1406,6 +1406,59 @@ func TestGatewayAccountInterest(t *testing.T) {
checkCount(t, gwcc, 1)
}
func TestGatewayAccountUnsub(t *testing.T) {
ob := testDefaultOptionsForGateway("B")
sb := runGatewayServer(ob)
defer sb.Shutdown()
oa := testGatewayOptionsFromToWithServers(t, "A", "B", sb)
sa := runGatewayServer(oa)
defer sa.Shutdown()
waitForOutboundGateways(t, sa, 1, time.Second)
waitForOutboundGateways(t, sb, 1, time.Second)
waitForInboundGateways(t, sa, 1, time.Second)
waitForInboundGateways(t, sb, 1, time.Second)
// Connect on B
ncb := natsConnect(t, fmt.Sprintf("nats://%s:%d", ob.Host, ob.Port))
defer ncb.Close()
// Create subscription
natsSub(t, ncb, "foo", func(m *nats.Msg) {
ncb.Publish(m.Reply, []byte("reply"))
})
// Connect on A
nca := natsConnect(t, fmt.Sprintf("nats://%s:%d", oa.Host, oa.Port))
defer nca.Close()
// Send a request
if _, err := nca.Request("foo", []byte("req"), time.Second); err != nil {
t.Fatalf("Error getting reply: %v", err)
}
// Now close connection on B
ncb.Close()
// Publish lots of messages on "foo" from A.
// We should receive an A- shortly and the number
// of outbound messages from A to B should not be
// close to the number of messages sent here.
total := 5000
for i := 0; i < total; i++ {
natsPub(t, nca, "foo", []byte("hello"))
}
natsFlush(t, nca)
c := sa.getOutboundGatewayConnection("B")
c.mu.Lock()
out := c.outMsgs
c.mu.Unlock()
if out >= int64(80*total)/100 {
t.Fatalf("Unexpected number of messages sent from A to B: %v", out)
}
}
func TestGatewaySubjectInterest(t *testing.T) {
o1 := testDefaultOptionsForGateway("A")
setAccountUserPassInOptions(o1, "$foo", "ivan", "password")
@@ -2869,18 +2922,16 @@ func TestGatewaySendAllSubs(t *testing.T) {
waitForInboundGateways(t, sb, 2, time.Second)
waitForInboundGateways(t, sc, 2, time.Second)
// On B, create a sub that will reply to requests
bURL := fmt.Sprintf("nats://%s:%d", ob.Host, ob.Port)
ncB := natsConnect(t, bURL)
defer ncB.Close()
natsSub(t, ncB, "foo", func(m *nats.Msg) {
ncB.Publish(m.Reply, m.Data)
})
natsFlush(t, ncB)
checkExpectedSubs(t, 1, sb)
// On A, create a sub to register some interest
aURL := fmt.Sprintf("nats://%s:%d", oa.Host, oa.Port)
ncA := natsConnect(t, aURL)
defer ncA.Close()
natsSub(t, ncA, "sub.on.a.*", func(m *nats.Msg) {})
natsFlush(t, ncA)
checkExpectedSubs(t, 1, sa)
// On C, have some delayed activity while it receives
// unwanted messages and switches to sendAllSubs.
// On C, have some sub activity while it receives
// unwanted messages and switches to interestOnly mode.
cURL := fmt.Sprintf("nats://%s:%d", oc.Host, oc.Port)
ncC := natsConnect(t, cURL)
defer ncC.Close()
@@ -2910,6 +2961,11 @@ func TestGatewaySendAllSubs(t *testing.T) {
}
}()
// From B publish on subjects for which C has an interest
bURL := fmt.Sprintf("nats://%s:%d", ob.Host, ob.Port)
ncB := natsConnect(t, bURL)
defer ncB.Close()
go func() {
defer wg.Done()
time.Sleep(10 * time.Millisecond)
@@ -2925,22 +2981,15 @@ func TestGatewaySendAllSubs(t *testing.T) {
}
}()
// From A, send a lot of requests.
aURL := fmt.Sprintf("nats://%s:%d", oa.Host, oa.Port)
ncA := natsConnect(t, aURL)
defer ncA.Close()
// From B, send a lot of messages that A is interested in,
// but not C.
// TODO(ik): May need to change that if we change the threshold
// for when the switch happens.
total := 300
for i := 0; i < total; i++ {
req := fmt.Sprintf("%d", i)
reply, err := ncA.Request("foo", []byte(req), time.Second)
if err != nil {
if err := ncB.Publish(fmt.Sprintf("sub.on.a.%d", i), []byte("hi")); err != nil {
t.Fatalf("Error waiting for reply: %v", err)
}
if string(reply.Data) != req {
t.Fatalf("Expected reply %q, got %q", req, reply.Data)
}
}
close(done)
@@ -2996,17 +3045,6 @@ func TestGatewaySendAllSubs(t *testing.T) {
return nil
})
for i := 0; i < total; i++ {
req := fmt.Sprintf("%d", i)
reply, err := ncA.Request("foo", []byte(req), time.Second)
if err != nil {
t.Fatalf("Error waiting for reply: %v", err)
}
if string(reply.Data) != req {
t.Fatalf("Expected reply %q, got %q", req, reply.Data)
}
}
// Also, after all that, if a sub is created on C, it should
// be sent to B (but not A). Check that this is the case.
// So first send from A on the subject that we are going to
@@ -3218,7 +3256,7 @@ func TestGatewayServiceImport(t *testing.T) {
subB := natsSubSync(t, clientB, "reply")
natsFlush(t, clientB)
for i := 0; i < 1; i++ {
for i := 0; i < 2; i++ {
// Send the request from clientB on foo.request,
natsPubReq(t, clientB, "foo.request", "reply", []byte("hi"))
natsFlush(t, clientB)
@@ -3263,10 +3301,14 @@ func TestGatewayServiceImport(t *testing.T) {
t.Fatalf("Expected %d outMsgs for A, got %v", expected, vz.OutMsgs)
}
// For B, we expect it to send on the two subjects: test.request and foo.request
// then send the reply (MSG + RMSG).
if i == 0 {
expected = 3
expected = 4
} else {
expected = 5
// The second time, one of the account will be suppressed, so we will get
// only 2 more messages.
expected = 6
}
vz, _ = sb.Varz(nil)
if vz.OutMsgs != expected {
@@ -3522,7 +3564,7 @@ func TestGatewayServiceImportWithQueue(t *testing.T) {
// on server B.
checkForRegisteredQSubInterest(t, sb, "A", "$foo", "test.request", 1, time.Second)
for i := 0; i < 1; i++ {
for i := 0; i < 2; i++ {
// Send the request from clientB on foo.request,
natsPubReq(t, clientB, "foo.request", "reply", []byte("hi"))
natsFlush(t, clientB)
@@ -3559,16 +3601,20 @@ func TestGatewayServiceImportWithQueue(t *testing.T) {
t.Fatalf("Unexpected msg: %v", msg)
}
expected := int64(i + 2)
expected := int64((i + 1) * 2)
vz, _ := sa.Varz(nil)
if vz.OutMsgs != expected {
t.Fatalf("Expected %d outMsgs for A, got %v", expected, vz.OutMsgs)
}
// For B, we expect it to send on the two subjects: test.request and foo.request
// then send the reply (MSG + RMSG).
if i == 0 {
expected = 3
} else {
expected = 4
} else {
// The second time, one of the account will be suppressed, so we will get
// only 2 more messages.
expected = 6
}
vz, _ = sb.Varz(nil)
if vz.OutMsgs != expected {
@@ -4477,3 +4523,67 @@ func TestGatewayMemUsage(t *testing.T) {
s.Shutdown()
}
}
func TestGatewayMapReplyOnlyForRecentSub(t *testing.T) {
o2 := testDefaultOptionsForGateway("B")
s2 := runGatewayServer(o2)
defer s2.Shutdown()
o1 := testGatewayOptionsFromToWithServers(t, "A", "B", s2)
s1 := runGatewayServer(o1)
defer s1.Shutdown()
waitForOutboundGateways(t, s1, 1, time.Second)
waitForOutboundGateways(t, s2, 1, time.Second)
// Change s1's recent sub expiration default value
s1.mu.Lock()
s1.gateway.pasi.Lock()
s1.gateway.recSubExp = 100 * time.Millisecond
s1.gateway.pasi.Unlock()
s1.mu.Unlock()
// Setup a replier on s2
nc2 := natsConnect(t, fmt.Sprintf("nats://%s:%d", o2.Host, o2.Port))
defer nc2.Close()
count := 0
errCh := make(chan error, 1)
natsSub(t, nc2, "foo", func(m *nats.Msg) {
// Send reply regardless..
nc2.Publish(m.Reply, []byte("reply"))
if count == 0 {
if strings.HasPrefix(m.Reply, nats.InboxPrefix) {
errCh <- fmt.Errorf("First reply expected to have a special prefix, got %v", m.Reply)
return
}
count++
} else {
if !strings.HasPrefix(m.Reply, nats.InboxPrefix) {
errCh <- fmt.Errorf("Second reply expected to have normal inbox, got %v", m.Reply)
return
}
errCh <- nil
}
})
natsFlush(t, nc2)
checkExpectedSubs(t, 1, s2)
// Create requestor on s1
nc1 := natsConnect(t, fmt.Sprintf("nats://%s:%d", o1.Host, o1.Port))
defer nc1.Close()
// Send first request, reply should be mapped
nc1.Request("foo", []byte("msg1"), time.Second)
// Wait more than the recent sub expiration (that we have set to 100ms)
time.Sleep(200 * time.Millisecond)
// Send second request (reply should not be mapped)
nc1.Request("foo", []byte("msg2"), time.Second)
select {
case e := <-errCh:
if e != nil {
t.Fatalf(e.Error())
}
case <-time.After(time.Second):
t.Fatalf("Did not get replies")
}
}

View File

@@ -1241,7 +1241,7 @@ func (c *client) processInboundLeafMsg(msg []byte) {
atomic.LoadInt64(&c.srv.gateway.totalQSubs) > 0 {
collect = true
}
qnames = c.processMsgResults(acc, r, msg, c.pa.subject, c.pa.reply, collect)
qnames = c.processMsgResults(acc, r, msg, c.pa.subject, c.pa.reply, collect, false)
}
// Now deal with gateways

View File

@@ -177,7 +177,6 @@ func TestNoRaceRoutedQueueAutoUnsubscribe(t *testing.T) {
nbar := atomic.LoadInt32(&rbar)
nbaz := atomic.LoadInt32(&rbaz)
if nbar == expected && nbaz == expected {
time.Sleep(500 * time.Millisecond)
return nil
}
return fmt.Errorf("Did not receive all %d queue messages, received %d for 'bar' and %d for 'baz'",
@@ -329,3 +328,231 @@ func TestNoRaceSlowConsumerPendingBytes(t *testing.T) {
}
t.Fatal("Connection should have been closed")
}
func TestNoRaceGatewayNoMissingReplies(t *testing.T) {
// This test will have following setup:
//
// responder1 requestor
// | |
// v v
// [A1]<-------gw------------[B1]
// | \ |
// | \______gw__________ | route
// | _\| |
// [ ]--------gw----------->[ ]
// [A2]<-------gw------------[B2]
// [ ] [ ]
// ^
// |
// responder2
//
// There is a possible race that when the requestor creates
// a subscription on the reply subject, the subject interest
// being sent from the inbound gateway, and B1 having none,
// the SUB first goes to B2 before being sent to A1 from
// B2's inbound GW. But the request can go from B1 to A1
// right away and the responder1 connecting to A1 may send
// back the reply before the interest on the reply makes it
// to A1 (from B2).
// This test will also verify that if the responder is instead
// connected to A2, the reply is properly received by requestor
// on B1.
// For this test we want to be in interestOnly mode, so
// make it happen quickly
gatewayMaxRUnsubBeforeSwitch = 1
defer func() { gatewayMaxRUnsubBeforeSwitch = defaultGatewayMaxRUnsubBeforeSwitch }()
// Start with setting up A2 and B2.
ob2 := testDefaultOptionsForGateway("B")
sb2 := runGatewayServer(ob2)
defer sb2.Shutdown()
oa2 := testGatewayOptionsFromToWithServers(t, "A", "B", sb2)
sa2 := runGatewayServer(oa2)
defer sa2.Shutdown()
waitForOutboundGateways(t, sa2, 1, time.Second)
waitForInboundGateways(t, sa2, 1, time.Second)
waitForOutboundGateways(t, sb2, 1, time.Second)
waitForInboundGateways(t, sb2, 1, time.Second)
// Now start A1 which will connect to B2
oa1 := testGatewayOptionsFromToWithServers(t, "A", "B", sb2)
oa1.Routes = RoutesFromStr(fmt.Sprintf("nats://%s:%d", oa2.Cluster.Host, oa2.Cluster.Port))
sa1 := runGatewayServer(oa1)
defer sa1.Shutdown()
waitForOutboundGateways(t, sa1, 1, time.Second)
waitForInboundGateways(t, sb2, 2, time.Second)
checkClusterFormed(t, sa1, sa2)
// Finally, start B1 that will connect to A1.
ob1 := testGatewayOptionsFromToWithServers(t, "B", "A", sa1)
ob1.Routes = RoutesFromStr(fmt.Sprintf("nats://%s:%d", ob2.Cluster.Host, ob2.Cluster.Port))
sb1 := runGatewayServer(ob1)
defer sb1.Shutdown()
// Check that we have the outbound gateway from B1 to A1
checkFor(t, 3*time.Second, 15*time.Millisecond, func() error {
c := sb1.getOutboundGatewayConnection("A")
if c == nil {
return fmt.Errorf("Outbound connection to A not created yet")
}
c.mu.Lock()
name := c.opts.Name
nc := c.nc
c.mu.Unlock()
if name != sa1.ID() {
// Force a disconnect
nc.Close()
return fmt.Errorf("Was unable to have B1 connect to A1")
}
return nil
})
waitForInboundGateways(t, sa1, 1, time.Second)
checkClusterFormed(t, sb1, sb2)
// Slow down GWs connections
// testSlowDownGatewayConnections(t, sa1, sa2, sb1, sb2)
// For this test, since we are using qsubs on A and B, and we
// want to make sure that it is received only on B, make the
// recent sub expiration high (especially when running on
// Travis with GOGC=10
setRecentSubExpiration := func(s *Server) {
s.mu.Lock()
s.gateway.pasi.Lock()
s.gateway.recSubExp = 10 * time.Second
s.gateway.pasi.Unlock()
s.mu.Unlock()
}
setRecentSubExpiration(sb1)
setRecentSubExpiration(sb2)
a1URL := fmt.Sprintf("nats://%s:%d", oa1.Host, oa1.Port)
a2URL := fmt.Sprintf("nats://%s:%d", oa2.Host, oa2.Port)
b1URL := fmt.Sprintf("nats://%s:%d", ob1.Host, ob1.Port)
b2URL := fmt.Sprintf("nats://%s:%d", ob2.Host, ob2.Port)
ncb1 := natsConnect(t, b1URL)
defer ncb1.Close()
ncb2 := natsConnect(t, b2URL)
defer ncb2.Close()
natsSubSync(t, ncb1, "just.a.sub")
natsSubSync(t, ncb2, "just.a.sub")
checkExpectedSubs(t, 2, sb1, sb2)
// For this test, we want A to be checking B's interest in order
// to send messages (which would cause replies to be dropped if
// there is no interest registered on A). So from A servers,
// send to various subjects and cause B's to switch to interestOnly
// mode.
nca1 := natsConnect(t, a1URL)
defer nca1.Close()
for i := 0; i < 10; i++ {
natsPub(t, nca1, fmt.Sprintf("reject.%d", i), []byte("hello"))
}
nca2 := natsConnect(t, a2URL)
defer nca2.Close()
for i := 0; i < 10; i++ {
natsPub(t, nca2, fmt.Sprintf("reject.%d", i), []byte("hello"))
}
checkSwitchedMode := func(t *testing.T, s *Server) {
t.Helper()
checkFor(t, 2*time.Second, 15*time.Millisecond, func() error {
var switchedMode bool
c := s.getOutboundGatewayConnection("B")
ei, _ := c.gw.outsim.Load(globalAccountName)
if ei != nil {
e := ei.(*outsie)
e.RLock()
switchedMode = e.ni == nil && e.mode == modeInterestOnly
e.RUnlock()
}
if !switchedMode {
return fmt.Errorf("Still not switched mode")
}
return nil
})
}
checkSwitchedMode(t, sa1)
checkSwitchedMode(t, sa2)
// Setup a subscriber on _INBOX.> on each of A's servers.
total := 1000
expected := int32(total)
rcvOnA := int32(0)
qrcvOnA := int32(0)
natsSub(t, nca1, "myreply.>", func(_ *nats.Msg) {
atomic.AddInt32(&rcvOnA, 1)
})
natsQueueSub(t, nca2, "myreply.>", "bar", func(_ *nats.Msg) {
atomic.AddInt32(&qrcvOnA, 1)
})
checkExpectedSubs(t, 2, sa1, sa2)
// Ok.. so now we will run the actual test where we
// create a responder on A1 and make sure that every
// single request from B1 gets the reply. Will repeat
// test with responder connected to A2.
sendReqs := func(t *testing.T, subConn *nats.Conn) {
t.Helper()
responder := natsSub(t, subConn, "foo", func(m *nats.Msg) {
nca1.Publish(m.Reply, []byte("reply"))
})
natsFlush(t, subConn)
checkExpectedSubs(t, 3, sa1, sa2)
// We are not going to use Request() because this sets
// a wildcard subscription on an INBOX and less likely
// to produce the race. Instead we will explicitly set
// the subscription on the reply subject and create one
// per request.
for i := 0; i < total/2; i++ {
reply := fmt.Sprintf("myreply.%d", i)
replySub := natsQueueSubSync(t, ncb1, reply, "bar")
natsFlush(t, ncb1)
// Let's make sure we have interest on B2.
if r := sb2.globalAccount().sl.Match(reply); len(r.qsubs) == 0 {
checkFor(t, time.Second, time.Millisecond, func() error {
if r := sb2.globalAccount().sl.Match(reply); len(r.qsubs) == 0 {
return fmt.Errorf("B still not registered interest on %s", reply)
}
return nil
})
}
natsPubReq(t, ncb1, "foo", reply, []byte("request"))
if _, err := replySub.NextMsg(time.Second); err != nil {
t.Fatalf("Did not receive reply: %v", err)
}
natsUnsub(t, replySub)
}
responder.Unsubscribe()
natsFlush(t, subConn)
checkExpectedSubs(t, 2, sa1, sa2)
}
sendReqs(t, nca1)
sendReqs(t, nca2)
checkFor(t, time.Second, 15*time.Millisecond, func() error {
if n := atomic.LoadInt32(&rcvOnA); n != expected {
return fmt.Errorf("Subs on A expected to get %v replies, got %v", expected, n)
}
return nil
})
// We should not have received a single message on the queue sub
// on cluster A because messages will have been delivered to
// the member on cluster B.
if n := atomic.LoadInt32(&qrcvOnA); n != 0 {
t.Fatalf("Queue sub on A should not have received message, got %v", n)
}
}

View File

@@ -299,7 +299,7 @@ func (c *client) processInboundRoutedMsg(msg []byte) {
c.mu.Unlock()
}
}
c.processMsgResults(acc, r, msg, c.pa.subject, c.pa.reply, false)
c.processMsgResults(acc, r, msg, c.pa.subject, c.pa.reply, false, false)
}
// Helper function for routes and gateways to create qfilters need for
@@ -1038,7 +1038,12 @@ func (c *client) sendRouteSubOrUnSubProtos(subs []*subscription, isSubProto, tra
// the lock, which could cause pingTimer to think that this
// connection is stale otherwise.
c.last = time.Now()
c.flushOutbound()
if !c.flushOutbound() {
// Another go routine is flushing already and does not
// have the lock. Give it a chance to finish...
c.mu.Unlock()
c.mu.Lock()
}
if closed = c.flags.isSet(clearConnection); closed {
break
}

View File

@@ -983,6 +983,8 @@ func gatewaysBench(b *testing.B, optimisticMode bool, payload string, numPublish
startCh := make(chan bool)
l := b.N / numPublishers
lastMsgSendOp := []byte("PUB end.test 2\r\nok\r\n")
pubLoop := func(c net.Conn, ch chan bool) {
bw := bufio.NewWriterSize(c, defaultSendBufSize)
@@ -998,7 +1000,7 @@ func gatewaysBench(b *testing.B, optimisticMode bool, payload string, numPublish
return
}
}
if _, err := bw.Write([]byte("PUB end.test 2\r\nok\r\n")); err != nil {
if _, err := bw.Write(lastMsgSendOp); err != nil {
b.Errorf("Received error on PUB write: %v\n", err)
return
}
@@ -1011,6 +1013,7 @@ func gatewaysBench(b *testing.B, optimisticMode bool, payload string, numPublish
// Publish Connections SPINUP
for i := 0; i < numPublishers; i++ {
c := createClientConn(b, oa.Host, oa.Port)
defer c.Close()
doDefaultConnect(b, c)
flushConnection(b, c)
ch := make(chan bool)
@@ -1019,7 +1022,18 @@ func gatewaysBench(b *testing.B, optimisticMode bool, payload string, numPublish
<-ch
}
b.SetBytes(int64(len(sendOp) + len(msgOp)))
// To report the number of bytes:
// from publisher to server on cluster A:
numBytes := len(sendOp)
if subInterest {
// from server in cluster A to server on cluster B:
// RMSG $G foo <payload size> <payload>\r\n
numBytes += len("RMSG $G foo xxxx ") + len(payload) + 2
// From server in cluster B to sub:
numBytes += len(msgOp)
}
b.SetBytes(int64(numBytes))
b.ResetTimer()
// Closing this will start all publishers at once (roughly)
@@ -1031,51 +1045,51 @@ func gatewaysBench(b *testing.B, optimisticMode bool, payload string, numPublish
b.StopTimer()
}
func Benchmark_Gateways___Optimistic_1kx01x0(b *testing.B) {
func Benchmark_Gateways_Optimistic_1kx01x0(b *testing.B) {
gatewaysBench(b, true, sizedString(1024), 1, false)
}
func Benchmark_Gateways___Optimistic_2kx01x0(b *testing.B) {
func Benchmark_Gateways_Optimistic_2kx01x0(b *testing.B) {
gatewaysBench(b, true, sizedString(2048), 1, false)
}
func Benchmark_Gateways___Optimistic_4kx01x0(b *testing.B) {
func Benchmark_Gateways_Optimistic_4kx01x0(b *testing.B) {
gatewaysBench(b, true, sizedString(4096), 1, false)
}
func Benchmark_Gateways___Optimistic_1kx10x0(b *testing.B) {
func Benchmark_Gateways_Optimistic_1kx10x0(b *testing.B) {
gatewaysBench(b, true, sizedString(1024), 10, false)
}
func Benchmark_Gateways___Optimistic_2kx10x0(b *testing.B) {
func Benchmark_Gateways_Optimistic_2kx10x0(b *testing.B) {
gatewaysBench(b, true, sizedString(2048), 10, false)
}
func Benchmark_Gateways___Optimistic_4kx10x0(b *testing.B) {
func Benchmark_Gateways_Optimistic_4kx10x0(b *testing.B) {
gatewaysBench(b, true, sizedString(4096), 10, false)
}
func Benchmark_Gateways___Optimistic_1kx01x1(b *testing.B) {
func Benchmark_Gateways_Optimistic_1kx01x1(b *testing.B) {
gatewaysBench(b, true, sizedString(1024), 1, true)
}
func Benchmark_Gateways___Optimistic_2kx01x1(b *testing.B) {
func Benchmark_Gateways_Optimistic_2kx01x1(b *testing.B) {
gatewaysBench(b, true, sizedString(2048), 1, true)
}
func Benchmark_Gateways___Optimistic_4kx01x1(b *testing.B) {
func Benchmark_Gateways_Optimistic_4kx01x1(b *testing.B) {
gatewaysBench(b, true, sizedString(4096), 1, true)
}
func Benchmark_Gateways___Optimistic_1kx10x1(b *testing.B) {
func Benchmark_Gateways_Optimistic_1kx10x1(b *testing.B) {
gatewaysBench(b, true, sizedString(1024), 10, true)
}
func Benchmark_Gateways___Optimistic_2kx10x1(b *testing.B) {
func Benchmark_Gateways_Optimistic_2kx10x1(b *testing.B) {
gatewaysBench(b, true, sizedString(2048), 10, true)
}
func Benchmark_Gateways___Optimistic_4kx10x1(b *testing.B) {
func Benchmark_Gateways_Optimistic_4kx10x1(b *testing.B) {
gatewaysBench(b, true, sizedString(4096), 10, true)
}
@@ -1126,3 +1140,96 @@ func Benchmark_Gateways_InterestOnly_2kx10x1(b *testing.B) {
func Benchmark_Gateways_InterestOnly_4kx10x1(b *testing.B) {
gatewaysBench(b, false, sizedString(4096), 10, true)
}
// This bench only sends the requests to verify impact of reply
// reply mapping in GW code.
func gatewaySendRequestsBench(b *testing.B, singleReplySub bool) {
server.SetGatewaysSolicitDelay(10 * time.Millisecond)
defer server.ResetGatewaysSolicitDelay()
ob := testDefaultOptionsForGateway("B")
sb := RunServer(ob)
defer sb.Shutdown()
gwbURL, err := url.Parse(fmt.Sprintf("nats://%s:%d", ob.Gateway.Host, ob.Gateway.Port))
if err != nil {
b.Fatalf("Error parsing url: %v", err)
}
oa := testDefaultOptionsForGateway("A")
oa.Gateway.Gateways = []*server.RemoteGatewayOpts{
&server.RemoteGatewayOpts{
Name: "B",
URLs: []*url.URL{gwbURL},
},
}
sa := RunServer(oa)
defer sa.Shutdown()
sub := createClientConn(b, ob.Host, ob.Port)
defer sub.Close()
doDefaultConnect(b, sub)
sendProto(b, sub, "SUB foo 1\r\n")
flushConnection(b, sub)
lenMsg := len("MSG foo reply.xxxxxxxxxx 1 2\r\nok\r\n")
expected := b.N * lenMsg
if !singleReplySub {
expected += b.N * len("$GR.1234.")
}
ch := make(chan bool, 1)
go drainConnection(b, sub, ch, expected)
c := createClientConn(b, oa.Host, oa.Port)
defer c.Close()
doDefaultConnect(b, c)
if singleReplySub {
sendProto(b, c, "SUB reply.* 1\r\n")
}
flushConnection(b, c)
// If a single sub for replies is created, wait for more
// than the duration under which we do reply mapping
if singleReplySub {
time.Sleep(1100 * time.Millisecond)
}
bw := bufio.NewWriterSize(c, defaultSendBufSize)
// From pub to server in cluster A:
numBytes := len("PUB foo reply.0123456789 2\r\nok\r\n")
if !singleReplySub {
// Add the preceding SUB
numBytes += len("SUB reply.0123456789 0123456789\r\n")
}
// From server in cluster A to cluster B
numBytes += len("RMSG $G foo reply.0123456789 2\r\nok\r\n")
// If mapping of reply...
if !singleReplySub {
// the mapping uses about 10 more bytes. So add them
// for RMSG from server to server, and MSG to sub.
numBytes += 20
}
// From server in cluster B to sub
numBytes += lenMsg
b.SetBytes(int64(numBytes))
b.ResetTimer()
var subStr string
for i := 0; i < b.N; i++ {
if !singleReplySub {
subStr = fmt.Sprintf("SUB reply.%010d %010d\r\n", i+1, i+1)
}
bw.Write([]byte(fmt.Sprintf("%sPUB foo reply.%010d 2\r\nok\r\n", subStr, i+1)))
}
bw.Flush()
<-ch
}
func Benchmark_Gateways_Requests_CreateOneSubForAll(b *testing.B) {
gatewaySendRequestsBench(b, true)
}
func Benchmark_Gateways_Requests_CreateOneSubForEach(b *testing.B) {
gatewaySendRequestsBench(b, true)
}

View File

@@ -14,6 +14,7 @@
package test
import (
"bufio"
"bytes"
"fmt"
"net"
@@ -415,11 +416,33 @@ func TestGatewaySendAllSubs(t *testing.T) {
// switch.
for i := 0; i < 10001; i++ {
gASend(fmt.Sprintf("RMSG $G foo.%d 2\r\nok\r\n", i))
if i < 1000 {
if i <= 1000 {
gAExpect(runsubRe)
}
}
// Since B has no sub, we should get 2 INFOs with start/end
// commands.
expectNumberOfProtos(t, gAExpect, infoRe, 2)
// Expect an INFO + RS+ $G not.used + INFO
buf := bufio.NewReader(gA)
for i := 0; i < 3; i++ {
line, _, err := buf.ReadLine()
if err != nil {
t.Fatalf("Error reading: %v", err)
}
switch i {
case 0:
case 2:
if !bytes.HasPrefix(line, []byte("INFO {")) {
t.Fatalf("Expected INFO, got: %s", line)
}
case 1:
if !bytes.HasPrefix(line, []byte("RS+ ")) {
t.Fatalf("Expected RS+, got: %s", line)
}
}
}
// After this point, any new sub or unsub on B should be
// sent to A.
clientSend("SUB foo 1\r\n")
gAExpect(rsubRe)
clientSend("UNSUB 1\r\n")
gAExpect(runsubRe)
}