mirror of
https://github.com/gogrlx/nats-server.git
synced 2026-04-15 18:50:41 -07:00
@@ -303,7 +303,7 @@ func (a *Account) removeServiceImport(subject string) {
|
||||
delete(a.imports.services, subject)
|
||||
a.mu.Unlock()
|
||||
if a.srv != nil && a.srv.gateway.enabled {
|
||||
a.srv.gatewayHandleServiceImport(a, []byte(subject), -1)
|
||||
a.srv.gatewayHandleServiceImport(a, []byte(subject), nil, -1)
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
@@ -143,6 +143,13 @@ const (
|
||||
WrongGateway
|
||||
)
|
||||
|
||||
// Some flags passed to processMsgResultsEx
|
||||
const pmrNoFlag int = 0
|
||||
const (
|
||||
pmrCollectQueueNames int = 1 << iota
|
||||
pmrTreatGatewayAsClient
|
||||
)
|
||||
|
||||
type client struct {
|
||||
// Here first because of use of atomics, and memory alignment.
|
||||
stats
|
||||
@@ -2271,16 +2278,16 @@ func (c *client) processInboundClientMsg(msg []byte) {
|
||||
// Check for no interest, short circuit if so.
|
||||
// This is the fanout scale.
|
||||
if len(r.psubs)+len(r.qsubs) > 0 {
|
||||
var collect bool
|
||||
flag := pmrNoFlag
|
||||
// If we have queue subs in this cluster, then if we run in gateway
|
||||
// mode and the remote gateways have queue subs, then we need to
|
||||
// collect the queue groups this message was sent to so that we
|
||||
// exclude them when sending to gateways.
|
||||
if len(r.qsubs) > 0 && c.srv.gateway.enabled &&
|
||||
atomic.LoadInt64(&c.srv.gateway.totalQSubs) > 0 {
|
||||
collect = true
|
||||
flag = pmrCollectQueueNames
|
||||
}
|
||||
qnames = c.processMsgResults(c.acc, r, msg, c.pa.subject, c.pa.reply, collect)
|
||||
qnames = c.processMsgResults(c.acc, r, msg, c.pa.subject, c.pa.reply, flag)
|
||||
}
|
||||
|
||||
// Now deal with gateways
|
||||
@@ -2316,7 +2323,7 @@ func (c *client) checkForImportServices(acc *Account, msg []byte) {
|
||||
// and possibly to inbound GW connections for
|
||||
// which we are in interest-only mode.
|
||||
if c.kind == CLIENT && c.srv.gateway.enabled {
|
||||
c.srv.gatewayHandleServiceImport(rm.acc, nrr, 1)
|
||||
c.srv.gatewayHandleServiceImport(rm.acc, nrr, c, 1)
|
||||
}
|
||||
}
|
||||
// FIXME(dlc) - Do L1 cache trick from above.
|
||||
@@ -2328,12 +2335,13 @@ func (c *client) checkForImportServices(acc *Account, msg []byte) {
|
||||
c.makeQFilter(rr.qsubs)
|
||||
}
|
||||
|
||||
sendToGWs := c.srv.gateway.enabled && (c.kind == CLIENT || c.kind == SYSTEM || c.kind == LEAF)
|
||||
queues := c.processMsgResults(rm.acc, rr, msg, []byte(rm.to), nrr, sendToGWs)
|
||||
// If this is not a gateway connection but gateway is enabled,
|
||||
// try to send this converted message to all gateways.
|
||||
if sendToGWs {
|
||||
if c.srv.gateway.enabled && (c.kind == CLIENT || c.kind == SYSTEM || c.kind == LEAF) {
|
||||
queues := c.processMsgResults(rm.acc, rr, msg, []byte(rm.to), nrr, pmrCollectQueueNames)
|
||||
c.sendMsgToGateways(rm.acc, msg, []byte(rm.to), nrr, queues)
|
||||
} else {
|
||||
c.processMsgResults(rm.acc, rr, msg, []byte(rm.to), nrr, pmrNoFlag)
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -2374,7 +2382,7 @@ func (c *client) addSubToRouteTargets(sub *subscription) {
|
||||
}
|
||||
|
||||
// This processes the sublist results for a given message.
|
||||
func (c *client) processMsgResults(acc *Account, r *SublistResult, msg, subject, reply []byte, collect bool) [][]byte {
|
||||
func (c *client) processMsgResults(acc *Account, r *SublistResult, msg, subject, reply []byte, flags int) [][]byte {
|
||||
var queues [][]byte
|
||||
// msg header for clients.
|
||||
msgh := c.msgb[1:msgHeadProtoLen]
|
||||
@@ -2435,7 +2443,12 @@ func (c *client) processMsgResults(acc *Account, r *SublistResult, msg, subject,
|
||||
// leaf nodes or routes even if there are no queue filters since we collect
|
||||
// them above and do not process inline like normal clients.
|
||||
if c.kind != CLIENT && qf == nil {
|
||||
goto sendToRoutesOrLeafs
|
||||
// However, if this is a gateway connection which should be treated
|
||||
// as a client, still go and pick queue subscriptions, otherwise
|
||||
// jump to sendToRoutesOrLeafs.
|
||||
if !(c.kind == GATEWAY && (flags&pmrTreatGatewayAsClient != 0)) {
|
||||
goto sendToRoutesOrLeafs
|
||||
}
|
||||
}
|
||||
|
||||
// Check to see if we have our own rand yet. Global rand
|
||||
@@ -2487,7 +2500,7 @@ func (c *client) processMsgResults(acc *Account, r *SublistResult, msg, subject,
|
||||
continue
|
||||
} else {
|
||||
c.addSubToRouteTargets(sub)
|
||||
if collect {
|
||||
if flags&pmrCollectQueueNames != 0 {
|
||||
queues = append(queues, sub.queue)
|
||||
}
|
||||
}
|
||||
@@ -2508,7 +2521,7 @@ func (c *client) processMsgResults(acc *Account, r *SublistResult, msg, subject,
|
||||
if c.deliverMsg(sub, mh, msg) {
|
||||
// Clear rsub
|
||||
rsub = nil
|
||||
if collect {
|
||||
if flags&pmrCollectQueueNames != 0 {
|
||||
queues = append(queues, sub.queue)
|
||||
}
|
||||
break
|
||||
@@ -2519,7 +2532,7 @@ func (c *client) processMsgResults(acc *Account, r *SublistResult, msg, subject,
|
||||
// If we are here we tried to deliver to a local qsub
|
||||
// but failed. So we will send it to a remote or leaf node.
|
||||
c.addSubToRouteTargets(rsub)
|
||||
if collect {
|
||||
if flags&pmrCollectQueueNames != 0 {
|
||||
queues = append(queues, rsub.queue)
|
||||
}
|
||||
}
|
||||
|
||||
@@ -15,6 +15,7 @@ package server
|
||||
|
||||
import (
|
||||
"bytes"
|
||||
"crypto/sha256"
|
||||
"crypto/tls"
|
||||
"encoding/json"
|
||||
"fmt"
|
||||
@@ -32,7 +33,10 @@ const (
|
||||
defaultSolicitGatewaysDelay = time.Second
|
||||
defaultGatewayConnectDelay = time.Second
|
||||
defaultGatewayReconnectDelay = time.Second
|
||||
defaultGatewayRecentSubExpiration = 5 * time.Second
|
||||
defaultGatewayMaxRUnsubBeforeSwitch = 1000
|
||||
gwReplyPrefix = "$GR."
|
||||
gwReplyStart = len(gwReplyPrefix) + 5 // len of prefix above + len of hash (4) + "."
|
||||
)
|
||||
|
||||
var (
|
||||
@@ -93,6 +97,7 @@ type srvGateway struct {
|
||||
info *Info // Gateway Info protocol
|
||||
infoJSON []byte // Marshal'ed Info protocol
|
||||
runknown bool // Rejects unknown (not configured) gateway connections
|
||||
replyPfx []byte // Will be "$GR.<this cluster name hash>."
|
||||
|
||||
// We maintain the interest of subjects and queues per account.
|
||||
// For a given account, entries in the map could be something like this:
|
||||
@@ -110,8 +115,12 @@ type srvGateway struct {
|
||||
m map[string]map[string]*sitally
|
||||
}
|
||||
|
||||
resolver netResolver // Used to resolve host name before calling net.Dial()
|
||||
sqbsz int // Max buffer size to send queue subs protocol. Used for testing.
|
||||
// This is to track recent subscriptions for a given connection
|
||||
rsubs sync.Map
|
||||
|
||||
resolver netResolver // Used to resolve host name before calling net.Dial()
|
||||
sqbsz int // Max buffer size to send queue subs protocol. Used for testing.
|
||||
recSubExp time.Duration // For how long do we check if there is a subscription match for a message with reply
|
||||
}
|
||||
|
||||
// Subject interest tally. Also indicates if the key in the map is a
|
||||
@@ -124,6 +133,7 @@ type sitally struct {
|
||||
type gatewayCfg struct {
|
||||
sync.RWMutex
|
||||
*RemoteGatewayOpts
|
||||
replyPfx []byte
|
||||
urls map[string]*url.URL
|
||||
connAttempts int
|
||||
implicit bool
|
||||
@@ -218,6 +228,19 @@ func validateGatewayOptions(o *Options) error {
|
||||
return nil
|
||||
}
|
||||
|
||||
// Computes a hash of 4 characters for the given gateway name.
|
||||
// This will be used for routing of replies.
|
||||
func getReplyPrefixForGateway(name string) []byte {
|
||||
sha := sha256.New()
|
||||
sha.Write([]byte(name))
|
||||
fullHash := []byte(fmt.Sprintf("%x", sha.Sum(nil)))
|
||||
prefix := make([]byte, 0, len(gwReplyPrefix)+5)
|
||||
prefix = append(prefix, gwReplyPrefix...)
|
||||
prefix = append(prefix, fullHash[:4]...)
|
||||
prefix = append(prefix, '.')
|
||||
return prefix
|
||||
}
|
||||
|
||||
// Initialize the s.gateway structure. We do this even if the server
|
||||
// does not have a gateway configured. In some part of the code, the
|
||||
// server will check the number of outbound gateways, etc.. and so
|
||||
@@ -232,6 +255,7 @@ func newGateway(opts *Options) (*srvGateway, error) {
|
||||
URLs: make(map[string]struct{}),
|
||||
resolver: opts.Gateway.resolver,
|
||||
runknown: opts.Gateway.RejectUnknown,
|
||||
replyPfx: getReplyPrefixForGateway(opts.Gateway.Name),
|
||||
}
|
||||
gateway.Lock()
|
||||
defer gateway.Unlock()
|
||||
@@ -250,6 +274,7 @@ func newGateway(opts *Options) (*srvGateway, error) {
|
||||
}
|
||||
cfg := &gatewayCfg{
|
||||
RemoteGatewayOpts: rgo.clone(),
|
||||
replyPfx: getReplyPrefixForGateway(rgo.Name),
|
||||
urls: make(map[string]*url.URL, len(rgo.URLs)),
|
||||
}
|
||||
if opts.Gateway.TLSConfig != nil && cfg.TLSConfig == nil {
|
||||
@@ -270,6 +295,7 @@ func newGateway(opts *Options) (*srvGateway, error) {
|
||||
if gateway.sqbsz == 0 {
|
||||
gateway.sqbsz = maxBufSize
|
||||
}
|
||||
gateway.recSubExp = defaultGatewayRecentSubExpiration
|
||||
|
||||
gateway.enabled = opts.Gateway.Name != "" && opts.Gateway.Port != 0
|
||||
return gateway, nil
|
||||
@@ -1029,9 +1055,11 @@ func (s *Server) sendSubsToGateway(c *client, accountName []byte) {
|
||||
// Instruct to send all subs (RS+/-) for this account from now on.
|
||||
c.mu.Lock()
|
||||
e := c.gw.insim[string(accountName)]
|
||||
if e != nil {
|
||||
e.mode = modeInterestOnly
|
||||
if e == nil {
|
||||
e = &insie{}
|
||||
c.gw.insim[string(accountName)] = e
|
||||
}
|
||||
e.mode = modeInterestOnly
|
||||
c.mu.Unlock()
|
||||
} else {
|
||||
// Send queues for all accounts
|
||||
@@ -1173,6 +1201,7 @@ func (s *Server) processImplicitGateway(info *Info) {
|
||||
opts := s.getOpts()
|
||||
cfg = &gatewayCfg{
|
||||
RemoteGatewayOpts: &RemoteGatewayOpts{Name: gwName},
|
||||
replyPfx: getReplyPrefixForGateway(gwName),
|
||||
urls: make(map[string]*url.URL, len(info.GatewayURLs)),
|
||||
implicit: true,
|
||||
}
|
||||
@@ -1901,7 +1930,7 @@ func (s *Server) maybeSendSubOrUnsubToGateways(accName string, sub *subscription
|
||||
}
|
||||
}
|
||||
if proto != nil {
|
||||
c.sendProto(proto, true)
|
||||
c.sendProto(proto, false)
|
||||
if c.trace {
|
||||
c.traceOutOp("", proto[:len(proto)-LEN_CR_LF])
|
||||
}
|
||||
@@ -1947,7 +1976,7 @@ func (s *Server) sendQueueSubOrUnsubToGateways(accName string, qsub *subscriptio
|
||||
proto = append(proto, CR_LF...)
|
||||
}
|
||||
c.mu.Lock()
|
||||
c.sendProto(proto, true)
|
||||
c.sendProto(proto, false)
|
||||
if c.trace {
|
||||
c.traceOutOp("", proto[:len(proto)-LEN_CR_LF])
|
||||
}
|
||||
@@ -2014,6 +2043,30 @@ func (s *Server) gatewayUpdateSubInterest(accName string, sub *subscription, cha
|
||||
}
|
||||
}
|
||||
if first || last {
|
||||
if sub.client != nil {
|
||||
rsubs := &s.gateway.rsubs
|
||||
c := sub.client
|
||||
sli, _ := rsubs.Load(c)
|
||||
if first {
|
||||
var sl *Sublist
|
||||
if sli == nil {
|
||||
sl = NewSublistNoCache()
|
||||
rsubs.Store(c, sl)
|
||||
} else {
|
||||
sl = sli.(*Sublist)
|
||||
}
|
||||
sl.Insert(sub)
|
||||
time.AfterFunc(s.gateway.recSubExp, func() {
|
||||
sl.Remove(sub)
|
||||
})
|
||||
} else if sli != nil {
|
||||
sl := sli.(*Sublist)
|
||||
sl.Remove(sub)
|
||||
if sl.Count() == 0 {
|
||||
rsubs.Delete(c)
|
||||
}
|
||||
}
|
||||
}
|
||||
if entry.q {
|
||||
s.sendQueueSubOrUnsubToGateways(accName, sub, first)
|
||||
} else {
|
||||
@@ -2022,6 +2075,35 @@ func (s *Server) gatewayUpdateSubInterest(accName string, sub *subscription, cha
|
||||
}
|
||||
}
|
||||
|
||||
// Returns true if the given subject starts with `$GR.`
|
||||
func subjectStartsWithGatewayReplyPrefix(subj []byte) bool {
|
||||
return len(subj) > gwReplyStart && string(subj[:len(gwReplyPrefix)]) == gwReplyPrefix
|
||||
}
|
||||
|
||||
// Evaluates if the given reply should be mapped (adding the origin cluster
|
||||
// hash as a prefix) or not.
|
||||
func (g *srvGateway) shouldMapReplyForGatewaySend(c *client, reply []byte) bool {
|
||||
sli, _ := g.rsubs.Load(c)
|
||||
if sli == nil {
|
||||
return false
|
||||
}
|
||||
sl := sli.(*Sublist)
|
||||
if sl.Count() == 0 {
|
||||
return false
|
||||
}
|
||||
if subjectStartsWithGatewayReplyPrefix(reply) {
|
||||
return false
|
||||
}
|
||||
r := sl.Match(string(reply))
|
||||
return len(r.psubs)+len(r.qsubs) > 0
|
||||
}
|
||||
|
||||
var subPool = &sync.Pool{
|
||||
New: func() interface{} {
|
||||
return &subscription{}
|
||||
},
|
||||
}
|
||||
|
||||
// May send a message to all outbound gateways. It is possible
|
||||
// that the message is not sent to a given gateway if for instance
|
||||
// it is known that this gateway has no interest in the account or
|
||||
@@ -2038,46 +2120,84 @@ func (c *client) sendMsgToGateways(acc *Account, msg, subject, reply []byte, qgr
|
||||
for i := 0; i < len(gw.outo); i++ {
|
||||
gws = append(gws, gw.outo[i])
|
||||
}
|
||||
thisClusterReplyPrefix := gw.replyPfx
|
||||
gw.RUnlock()
|
||||
if len(gws) == 0 {
|
||||
return
|
||||
}
|
||||
var (
|
||||
subj = string(subject)
|
||||
queuesa = [512]byte{}
|
||||
queues = queuesa[:0]
|
||||
accName = acc.Name
|
||||
subj = string(subject)
|
||||
queuesa = [512]byte{}
|
||||
queues = queuesa[:0]
|
||||
accName = acc.Name
|
||||
mreplya [256]byte
|
||||
mreply []byte
|
||||
dstPfx []byte
|
||||
checkReply = reply != nil
|
||||
)
|
||||
|
||||
// Get a subscription from the pool
|
||||
sub := subPool.Get().(*subscription)
|
||||
|
||||
// Check if the subject is on "$GR.<cluster hash>.",
|
||||
// and if so, send to that GW regardless of its
|
||||
// interest on the real subject (that is, skip the
|
||||
// check of subject interest).
|
||||
if subjectStartsWithGatewayReplyPrefix(subject) {
|
||||
dstPfx = subject[:gwReplyStart]
|
||||
}
|
||||
for i := 0; i < len(gws); i++ {
|
||||
gwc := gws[i]
|
||||
// Plain sub interest and queue sub results for this account/subject
|
||||
psi, qr := gwc.gatewayInterest(accName, subj)
|
||||
if !psi && qr == nil {
|
||||
continue
|
||||
}
|
||||
queues = queuesa[:0]
|
||||
if qr != nil {
|
||||
for i := 0; i < len(qr.qsubs); i++ {
|
||||
qsubs := qr.qsubs[i]
|
||||
if len(qsubs) > 0 {
|
||||
queue := qsubs[0].queue
|
||||
add := true
|
||||
for _, qn := range qgroups {
|
||||
if bytes.Equal(queue, qn) {
|
||||
add = false
|
||||
break
|
||||
if dstPfx != nil {
|
||||
gwc.mu.Lock()
|
||||
ok := gwc.gw.cfg != nil && bytes.Equal(dstPfx, gwc.gw.cfg.replyPfx)
|
||||
gwc.mu.Unlock()
|
||||
if !ok {
|
||||
continue
|
||||
}
|
||||
} else {
|
||||
// Plain sub interest and queue sub results for this account/subject
|
||||
psi, qr := gwc.gatewayInterest(accName, subj)
|
||||
if !psi && qr == nil {
|
||||
continue
|
||||
}
|
||||
queues = queuesa[:0]
|
||||
if qr != nil {
|
||||
for i := 0; i < len(qr.qsubs); i++ {
|
||||
qsubs := qr.qsubs[i]
|
||||
if len(qsubs) > 0 {
|
||||
queue := qsubs[0].queue
|
||||
add := true
|
||||
for _, qn := range qgroups {
|
||||
if bytes.Equal(queue, qn) {
|
||||
add = false
|
||||
break
|
||||
}
|
||||
}
|
||||
if add {
|
||||
qgroups = append(qgroups, queue)
|
||||
queues = append(queues, queue...)
|
||||
queues = append(queues, ' ')
|
||||
}
|
||||
}
|
||||
if add {
|
||||
qgroups = append(qgroups, queue)
|
||||
queues = append(queues, queue...)
|
||||
queues = append(queues, ' ')
|
||||
}
|
||||
}
|
||||
}
|
||||
if !psi && len(queues) == 0 {
|
||||
continue
|
||||
}
|
||||
}
|
||||
if !psi && len(queues) == 0 {
|
||||
continue
|
||||
if checkReply {
|
||||
// Check/map only once
|
||||
checkReply = false
|
||||
// Assume we will use original
|
||||
mreply = reply
|
||||
// If there was a recent matching subscription on that connection
|
||||
// and the reply is not already mapped, then map (add prefix).
|
||||
if gw.shouldMapReplyForGatewaySend(c, reply) {
|
||||
mreply = mreplya[:0]
|
||||
mreply = append(mreply, thisClusterReplyPrefix...)
|
||||
mreply = append(mreply, reply...)
|
||||
}
|
||||
}
|
||||
mh := c.msgb[:msgHeadProtoLen]
|
||||
mh = append(mh, accName...)
|
||||
@@ -2087,29 +2207,37 @@ func (c *client) sendMsgToGateways(acc *Account, msg, subject, reply []byte, qgr
|
||||
if len(queues) > 0 {
|
||||
if reply != nil {
|
||||
mh = append(mh, "+ "...) // Signal that there is a reply.
|
||||
mh = append(mh, reply...)
|
||||
mh = append(mh, mreply...)
|
||||
mh = append(mh, ' ')
|
||||
} else {
|
||||
mh = append(mh, "| "...) // Only queues
|
||||
}
|
||||
mh = append(mh, queues...)
|
||||
} else if reply != nil {
|
||||
mh = append(mh, reply...)
|
||||
mh = append(mh, mreply...)
|
||||
mh = append(mh, ' ')
|
||||
}
|
||||
mh = append(mh, c.pa.szb...)
|
||||
mh = append(mh, CR_LF...)
|
||||
sub := subscription{client: gwc, subject: c.pa.subject}
|
||||
c.deliverMsg(&sub, mh, msg)
|
||||
|
||||
// We reuse the subscription object that we pass to deliverMsg.
|
||||
// So set/reset important fields.
|
||||
sub.nm, sub.max = 0, 0
|
||||
sub.client = gwc
|
||||
sub.subject = c.pa.subject
|
||||
c.deliverMsg(sub, mh, msg)
|
||||
}
|
||||
// Done with subscription, put back to pool. We don't need
|
||||
// to reset content since we explicitly set when using it.
|
||||
subPool.Put(sub)
|
||||
}
|
||||
|
||||
func (s *Server) gatewayHandleServiceImport(acc *Account, subject []byte, change int32) {
|
||||
func (s *Server) gatewayHandleServiceImport(acc *Account, subject []byte, c *client, change int32) {
|
||||
sid := make([]byte, 0, len(acc.Name)+len(subject)+1)
|
||||
sid = append(sid, acc.Name...)
|
||||
sid = append(sid, ' ')
|
||||
sid = append(sid, subject...)
|
||||
sub := &subscription{subject: subject, sid: sid}
|
||||
sub := &subscription{client: c, subject: subject, sid: sid}
|
||||
|
||||
var rspa [1024]byte
|
||||
rsproto := rspa[:0]
|
||||
@@ -2125,7 +2253,7 @@ func (s *Server) gatewayHandleServiceImport(acc *Account, subject []byte, change
|
||||
s.mu.Lock()
|
||||
for _, r := range s.routes {
|
||||
r.mu.Lock()
|
||||
r.sendProto(rsproto, true)
|
||||
r.sendProto(rsproto, false)
|
||||
if r.trace {
|
||||
r.traceOutOp("", rsproto[:len(rsproto)-LEN_CR_LF])
|
||||
}
|
||||
@@ -2161,7 +2289,8 @@ func (s *Server) gatewayHandleAccountNoInterest(c *client, accName []byte) {
|
||||
func (c *client) sendAccountUnsubToGateway(accName []byte) {
|
||||
// Check if we have sent the A- or not.
|
||||
c.mu.Lock()
|
||||
if _, sent := c.gw.insim[string(accName)]; !sent {
|
||||
e, sent := c.gw.insim[string(accName)]
|
||||
if e != nil || !sent {
|
||||
// Add a nil value to indicate that we have sent an A-
|
||||
// so that we know to send A+ when needed.
|
||||
c.gw.insim[string(accName)] = nil
|
||||
@@ -2170,7 +2299,7 @@ func (c *client) sendAccountUnsubToGateway(accName []byte) {
|
||||
proto = append(proto, aUnsubBytes...)
|
||||
proto = append(proto, accName...)
|
||||
proto = append(proto, CR_LF...)
|
||||
c.sendProto(proto, true)
|
||||
c.sendProto(proto, false)
|
||||
if c.trace {
|
||||
c.traceOutOp("", proto[:len(proto)-LEN_CR_LF])
|
||||
}
|
||||
@@ -2226,7 +2355,7 @@ func (s *Server) gatewayHandleSubjectNoInterest(c *client, acc *Account, accName
|
||||
proto = append(proto, ' ')
|
||||
proto = append(proto, subject...)
|
||||
proto = append(proto, CR_LF...)
|
||||
c.sendProto(proto, true)
|
||||
c.sendProto(proto, false)
|
||||
if c.trace {
|
||||
c.traceOutOp("", proto[:len(proto)-LEN_CR_LF])
|
||||
}
|
||||
@@ -2238,6 +2367,17 @@ func (s *Server) gatewayHandleSubjectNoInterest(c *client, acc *Account, accName
|
||||
}
|
||||
}
|
||||
|
||||
func (g *srvGateway) getReplyPrefix() []byte {
|
||||
g.RLock()
|
||||
replyPfx := g.replyPfx
|
||||
g.RUnlock()
|
||||
return replyPfx
|
||||
}
|
||||
|
||||
func (s *Server) isGatewayReplyForThisCluster(subj []byte) bool {
|
||||
return string(s.gateway.getReplyPrefix()) == string(subj[:gwReplyStart])
|
||||
}
|
||||
|
||||
// Process a message coming from a remote gateway. Send to any sub/qsub
|
||||
// in our cluster that is matching. When receiving a message for an
|
||||
// account or subject for which there is no interest in this cluster
|
||||
@@ -2262,6 +2402,34 @@ func (c *client) processInboundGatewayMsg(msg []byte) {
|
||||
return
|
||||
}
|
||||
|
||||
// If we receive a message on $GR.<cluster>.<subj>
|
||||
// we will drop the prefix before processing interest
|
||||
// in this cluster, but we also need to resend to
|
||||
// other gateways.
|
||||
sendBackToGateways := false
|
||||
|
||||
// First thing to do is to check if the subject starts
|
||||
// with "$GR.<hash>.".
|
||||
if subjectStartsWithGatewayReplyPrefix(c.pa.subject) {
|
||||
// If it does, then is this server/cluster the actual
|
||||
// destination for this message?
|
||||
if !c.srv.isGatewayReplyForThisCluster(c.pa.subject) {
|
||||
// We could report, for now, just drop.
|
||||
return
|
||||
}
|
||||
// Adjust the subject to past the prefix
|
||||
c.pa.subject = c.pa.subject[gwReplyStart:]
|
||||
// Use a stack buffer to rewrite c.pa.cache since we
|
||||
// only need it for getAccAndResultFromCache()
|
||||
var _pacache [256]byte
|
||||
pacache := _pacache[:0]
|
||||
pacache = append(pacache, c.pa.account...)
|
||||
pacache = append(pacache, ' ')
|
||||
pacache = append(pacache, c.pa.subject...)
|
||||
c.pa.pacache = pacache
|
||||
sendBackToGateways = true
|
||||
}
|
||||
|
||||
acc, r := c.getAccAndResultFromCache()
|
||||
if acc == nil {
|
||||
c.Debugf("Unknown account %q for gateway message on subject: %q", c.pa.account, c.pa.subject)
|
||||
@@ -2277,19 +2445,28 @@ func (c *client) processInboundGatewayMsg(msg []byte) {
|
||||
c.checkForImportServices(acc, msg)
|
||||
}
|
||||
|
||||
// If there is no interest on plain subs, possibly send an RS-,
|
||||
// even if there is qsubs interest.
|
||||
if len(r.psubs) == 0 {
|
||||
c.srv.gatewayHandleSubjectNoInterest(c, acc, c.pa.account, c.pa.subject)
|
||||
if !sendBackToGateways {
|
||||
// If there is no interest on plain subs, possibly send an RS-,
|
||||
// even if there is qsubs interest.
|
||||
if len(r.psubs) == 0 {
|
||||
c.srv.gatewayHandleSubjectNoInterest(c, acc, c.pa.account, c.pa.subject)
|
||||
|
||||
// If there is also no queue filter, then no point in continuing
|
||||
// (even if r.qsubs i > 0).
|
||||
if len(c.pa.queues) == 0 {
|
||||
return
|
||||
// If there is also no queue filter, then no point in continuing
|
||||
// (even if r.qsubs i > 0).
|
||||
if len(c.pa.queues) == 0 {
|
||||
return
|
||||
}
|
||||
}
|
||||
c.processMsgResults(acc, r, msg, c.pa.subject, c.pa.reply, pmrNoFlag)
|
||||
} else {
|
||||
// We normally would not allow sending to a queue unless the
|
||||
// RMSG contains the queue groups, however, if the incoming
|
||||
// message was a "$GR." then we need to act as if this was
|
||||
// a CLIENT connection..
|
||||
qnames := c.processMsgResults(acc, r, msg, c.pa.subject, c.pa.reply,
|
||||
pmrCollectQueueNames|pmrTreatGatewayAsClient)
|
||||
c.sendMsgToGateways(c.acc, msg, c.pa.subject, c.pa.reply, qnames)
|
||||
}
|
||||
|
||||
c.processMsgResults(acc, r, msg, c.pa.subject, c.pa.reply, false)
|
||||
}
|
||||
|
||||
// Indicates that the remote which we are sending messages to
|
||||
|
||||
@@ -1406,6 +1406,59 @@ func TestGatewayAccountInterest(t *testing.T) {
|
||||
checkCount(t, gwcc, 1)
|
||||
}
|
||||
|
||||
func TestGatewayAccountUnsub(t *testing.T) {
|
||||
ob := testDefaultOptionsForGateway("B")
|
||||
sb := runGatewayServer(ob)
|
||||
defer sb.Shutdown()
|
||||
|
||||
oa := testGatewayOptionsFromToWithServers(t, "A", "B", sb)
|
||||
sa := runGatewayServer(oa)
|
||||
defer sa.Shutdown()
|
||||
|
||||
waitForOutboundGateways(t, sa, 1, time.Second)
|
||||
waitForOutboundGateways(t, sb, 1, time.Second)
|
||||
waitForInboundGateways(t, sa, 1, time.Second)
|
||||
waitForInboundGateways(t, sb, 1, time.Second)
|
||||
|
||||
// Connect on B
|
||||
ncb := natsConnect(t, fmt.Sprintf("nats://%s:%d", ob.Host, ob.Port))
|
||||
defer ncb.Close()
|
||||
// Create subscription
|
||||
natsSub(t, ncb, "foo", func(m *nats.Msg) {
|
||||
ncb.Publish(m.Reply, []byte("reply"))
|
||||
})
|
||||
|
||||
// Connect on A
|
||||
nca := natsConnect(t, fmt.Sprintf("nats://%s:%d", oa.Host, oa.Port))
|
||||
defer nca.Close()
|
||||
// Send a request
|
||||
if _, err := nca.Request("foo", []byte("req"), time.Second); err != nil {
|
||||
t.Fatalf("Error getting reply: %v", err)
|
||||
}
|
||||
|
||||
// Now close connection on B
|
||||
ncb.Close()
|
||||
|
||||
// Publish lots of messages on "foo" from A.
|
||||
// We should receive an A- shortly and the number
|
||||
// of outbound messages from A to B should not be
|
||||
// close to the number of messages sent here.
|
||||
total := 5000
|
||||
for i := 0; i < total; i++ {
|
||||
natsPub(t, nca, "foo", []byte("hello"))
|
||||
}
|
||||
natsFlush(t, nca)
|
||||
|
||||
c := sa.getOutboundGatewayConnection("B")
|
||||
c.mu.Lock()
|
||||
out := c.outMsgs
|
||||
c.mu.Unlock()
|
||||
|
||||
if out >= int64(80*total)/100 {
|
||||
t.Fatalf("Unexpected number of messages sent from A to B: %v", out)
|
||||
}
|
||||
}
|
||||
|
||||
func TestGatewaySubjectInterest(t *testing.T) {
|
||||
o1 := testDefaultOptionsForGateway("A")
|
||||
setAccountUserPassInOptions(o1, "$foo", "ivan", "password")
|
||||
@@ -2869,18 +2922,16 @@ func TestGatewaySendAllSubs(t *testing.T) {
|
||||
waitForInboundGateways(t, sb, 2, time.Second)
|
||||
waitForInboundGateways(t, sc, 2, time.Second)
|
||||
|
||||
// On B, create a sub that will reply to requests
|
||||
bURL := fmt.Sprintf("nats://%s:%d", ob.Host, ob.Port)
|
||||
ncB := natsConnect(t, bURL)
|
||||
defer ncB.Close()
|
||||
natsSub(t, ncB, "foo", func(m *nats.Msg) {
|
||||
ncB.Publish(m.Reply, m.Data)
|
||||
})
|
||||
natsFlush(t, ncB)
|
||||
checkExpectedSubs(t, 1, sb)
|
||||
// On A, create a sub to register some interest
|
||||
aURL := fmt.Sprintf("nats://%s:%d", oa.Host, oa.Port)
|
||||
ncA := natsConnect(t, aURL)
|
||||
defer ncA.Close()
|
||||
natsSub(t, ncA, "sub.on.a.*", func(m *nats.Msg) {})
|
||||
natsFlush(t, ncA)
|
||||
checkExpectedSubs(t, 1, sa)
|
||||
|
||||
// On C, have some delayed activity while it receives
|
||||
// unwanted messages and switches to sendAllSubs.
|
||||
// On C, have some sub activity while it receives
|
||||
// unwanted messages and switches to interestOnly mode.
|
||||
cURL := fmt.Sprintf("nats://%s:%d", oc.Host, oc.Port)
|
||||
ncC := natsConnect(t, cURL)
|
||||
defer ncC.Close()
|
||||
@@ -2910,6 +2961,11 @@ func TestGatewaySendAllSubs(t *testing.T) {
|
||||
}
|
||||
}()
|
||||
|
||||
// From B publish on subjects for which C has an interest
|
||||
bURL := fmt.Sprintf("nats://%s:%d", ob.Host, ob.Port)
|
||||
ncB := natsConnect(t, bURL)
|
||||
defer ncB.Close()
|
||||
|
||||
go func() {
|
||||
defer wg.Done()
|
||||
time.Sleep(10 * time.Millisecond)
|
||||
@@ -2925,22 +2981,15 @@ func TestGatewaySendAllSubs(t *testing.T) {
|
||||
}
|
||||
}()
|
||||
|
||||
// From A, send a lot of requests.
|
||||
aURL := fmt.Sprintf("nats://%s:%d", oa.Host, oa.Port)
|
||||
ncA := natsConnect(t, aURL)
|
||||
defer ncA.Close()
|
||||
// From B, send a lot of messages that A is interested in,
|
||||
// but not C.
|
||||
// TODO(ik): May need to change that if we change the threshold
|
||||
// for when the switch happens.
|
||||
total := 300
|
||||
for i := 0; i < total; i++ {
|
||||
req := fmt.Sprintf("%d", i)
|
||||
reply, err := ncA.Request("foo", []byte(req), time.Second)
|
||||
if err != nil {
|
||||
if err := ncB.Publish(fmt.Sprintf("sub.on.a.%d", i), []byte("hi")); err != nil {
|
||||
t.Fatalf("Error waiting for reply: %v", err)
|
||||
}
|
||||
if string(reply.Data) != req {
|
||||
t.Fatalf("Expected reply %q, got %q", req, reply.Data)
|
||||
}
|
||||
}
|
||||
close(done)
|
||||
|
||||
@@ -2994,17 +3043,6 @@ func TestGatewaySendAllSubs(t *testing.T) {
|
||||
return nil
|
||||
})
|
||||
|
||||
for i := 0; i < total; i++ {
|
||||
req := fmt.Sprintf("%d", i)
|
||||
reply, err := ncA.Request("foo", []byte(req), time.Second)
|
||||
if err != nil {
|
||||
t.Fatalf("Error waiting for reply: %v", err)
|
||||
}
|
||||
if string(reply.Data) != req {
|
||||
t.Fatalf("Expected reply %q, got %q", req, reply.Data)
|
||||
}
|
||||
}
|
||||
|
||||
// Also, after all that, if a sub is created on C, it should
|
||||
// be sent to B (but not A). Check that this is the case.
|
||||
// So first send from A on the subject that we are going to
|
||||
@@ -3216,7 +3254,7 @@ func TestGatewayServiceImport(t *testing.T) {
|
||||
subB := natsSubSync(t, clientB, "reply")
|
||||
natsFlush(t, clientB)
|
||||
|
||||
for i := 0; i < 1; i++ {
|
||||
for i := 0; i < 2; i++ {
|
||||
// Send the request from clientB on foo.request,
|
||||
natsPubReq(t, clientB, "foo.request", "reply", []byte("hi"))
|
||||
natsFlush(t, clientB)
|
||||
@@ -3261,10 +3299,14 @@ func TestGatewayServiceImport(t *testing.T) {
|
||||
t.Fatalf("Expected %d outMsgs for A, got %v", expected, vz.OutMsgs)
|
||||
}
|
||||
|
||||
// For B, we expect it to send on the two subjects: test.request and foo.request
|
||||
// then send the reply (MSG + RMSG).
|
||||
if i == 0 {
|
||||
expected = 3
|
||||
expected = 4
|
||||
} else {
|
||||
expected = 5
|
||||
// The second time, one of the account will be suppressed, so we will get
|
||||
// only 2 more messages.
|
||||
expected = 6
|
||||
}
|
||||
vz, _ = sb.Varz(nil)
|
||||
if vz.OutMsgs != expected {
|
||||
@@ -3520,7 +3562,7 @@ func TestGatewayServiceImportWithQueue(t *testing.T) {
|
||||
// on server B.
|
||||
checkForRegisteredQSubInterest(t, sb, "A", "$foo", "test.request", 1, time.Second)
|
||||
|
||||
for i := 0; i < 1; i++ {
|
||||
for i := 0; i < 2; i++ {
|
||||
// Send the request from clientB on foo.request,
|
||||
natsPubReq(t, clientB, "foo.request", "reply", []byte("hi"))
|
||||
natsFlush(t, clientB)
|
||||
@@ -3557,16 +3599,20 @@ func TestGatewayServiceImportWithQueue(t *testing.T) {
|
||||
t.Fatalf("Unexpected msg: %v", msg)
|
||||
}
|
||||
|
||||
expected := int64(i + 2)
|
||||
expected := int64((i + 1) * 2)
|
||||
vz, _ := sa.Varz(nil)
|
||||
if vz.OutMsgs != expected {
|
||||
t.Fatalf("Expected %d outMsgs for A, got %v", expected, vz.OutMsgs)
|
||||
}
|
||||
|
||||
// For B, we expect it to send on the two subjects: test.request and foo.request
|
||||
// then send the reply (MSG + RMSG).
|
||||
if i == 0 {
|
||||
expected = 3
|
||||
} else {
|
||||
expected = 4
|
||||
} else {
|
||||
// The second time, one of the account will be suppressed, so we will get
|
||||
// only 2 more messages.
|
||||
expected = 6
|
||||
}
|
||||
vz, _ = sb.Varz(nil)
|
||||
if vz.OutMsgs != expected {
|
||||
@@ -4476,3 +4522,67 @@ func TestGatewayMemUsage(t *testing.T) {
|
||||
s.Shutdown()
|
||||
}
|
||||
}
|
||||
|
||||
func TestGatewayMapReplyOnlyForRecentSub(t *testing.T) {
|
||||
o2 := testDefaultOptionsForGateway("B")
|
||||
s2 := runGatewayServer(o2)
|
||||
defer s2.Shutdown()
|
||||
|
||||
o1 := testGatewayOptionsFromToWithServers(t, "A", "B", s2)
|
||||
s1 := runGatewayServer(o1)
|
||||
defer s1.Shutdown()
|
||||
|
||||
waitForOutboundGateways(t, s1, 1, time.Second)
|
||||
waitForOutboundGateways(t, s2, 1, time.Second)
|
||||
|
||||
// Change s1's recent sub expiration default value
|
||||
s1.mu.Lock()
|
||||
s1.gateway.pasi.Lock()
|
||||
s1.gateway.recSubExp = 100 * time.Millisecond
|
||||
s1.gateway.pasi.Unlock()
|
||||
s1.mu.Unlock()
|
||||
|
||||
// Setup a replier on s2
|
||||
nc2 := natsConnect(t, fmt.Sprintf("nats://%s:%d", o2.Host, o2.Port))
|
||||
defer nc2.Close()
|
||||
count := 0
|
||||
errCh := make(chan error, 1)
|
||||
natsSub(t, nc2, "foo", func(m *nats.Msg) {
|
||||
// Send reply regardless..
|
||||
nc2.Publish(m.Reply, []byte("reply"))
|
||||
if count == 0 {
|
||||
if strings.HasPrefix(m.Reply, nats.InboxPrefix) {
|
||||
errCh <- fmt.Errorf("First reply expected to have a special prefix, got %v", m.Reply)
|
||||
return
|
||||
}
|
||||
count++
|
||||
} else {
|
||||
if !strings.HasPrefix(m.Reply, nats.InboxPrefix) {
|
||||
errCh <- fmt.Errorf("Second reply expected to have normal inbox, got %v", m.Reply)
|
||||
return
|
||||
}
|
||||
errCh <- nil
|
||||
}
|
||||
})
|
||||
natsFlush(t, nc2)
|
||||
checkExpectedSubs(t, 1, s2)
|
||||
|
||||
// Create requestor on s1
|
||||
nc1 := natsConnect(t, fmt.Sprintf("nats://%s:%d", o1.Host, o1.Port))
|
||||
defer nc1.Close()
|
||||
// Send first request, reply should be mapped
|
||||
nc1.Request("foo", []byte("msg1"), time.Second)
|
||||
// Wait more than the recent sub expiration (that we have set to 100ms)
|
||||
time.Sleep(200 * time.Millisecond)
|
||||
// Send second request (reply should not be mapped)
|
||||
nc1.Request("foo", []byte("msg2"), time.Second)
|
||||
|
||||
select {
|
||||
case e := <-errCh:
|
||||
if e != nil {
|
||||
t.Fatalf(e.Error())
|
||||
}
|
||||
case <-time.After(time.Second):
|
||||
t.Fatalf("Did not get replies")
|
||||
}
|
||||
}
|
||||
|
||||
@@ -1232,16 +1232,16 @@ func (c *client) processInboundLeafMsg(msg []byte) {
|
||||
// Check for no interest, short circuit if so.
|
||||
// This is the fanout scale.
|
||||
if len(r.psubs)+len(r.qsubs) > 0 {
|
||||
var collect bool
|
||||
flag := pmrNoFlag
|
||||
// If we have queue subs in this cluster, then if we run in gateway
|
||||
// mode and the remote gateways have queue subs, then we need to
|
||||
// collect the queue groups this message was sent to so that we
|
||||
// exclude them when sending to gateways.
|
||||
if len(r.qsubs) > 0 && c.srv.gateway.enabled &&
|
||||
atomic.LoadInt64(&c.srv.gateway.totalQSubs) > 0 {
|
||||
collect = true
|
||||
flag = pmrCollectQueueNames
|
||||
}
|
||||
qnames = c.processMsgResults(acc, r, msg, c.pa.subject, c.pa.reply, collect)
|
||||
qnames = c.processMsgResults(acc, r, msg, c.pa.subject, c.pa.reply, flag)
|
||||
}
|
||||
|
||||
// Now deal with gateways
|
||||
|
||||
@@ -177,7 +177,6 @@ func TestNoRaceRoutedQueueAutoUnsubscribe(t *testing.T) {
|
||||
nbar := atomic.LoadInt32(&rbar)
|
||||
nbaz := atomic.LoadInt32(&rbaz)
|
||||
if nbar == expected && nbaz == expected {
|
||||
time.Sleep(500 * time.Millisecond)
|
||||
return nil
|
||||
}
|
||||
return fmt.Errorf("Did not receive all %d queue messages, received %d for 'bar' and %d for 'baz'",
|
||||
@@ -329,3 +328,231 @@ func TestNoRaceSlowConsumerPendingBytes(t *testing.T) {
|
||||
}
|
||||
t.Fatal("Connection should have been closed")
|
||||
}
|
||||
|
||||
func TestNoRaceGatewayNoMissingReplies(t *testing.T) {
|
||||
// This test will have following setup:
|
||||
//
|
||||
// responder1 requestor
|
||||
// | |
|
||||
// v v
|
||||
// [A1]<-------gw------------[B1]
|
||||
// | \ |
|
||||
// | \______gw__________ | route
|
||||
// | _\| |
|
||||
// [ ]--------gw----------->[ ]
|
||||
// [A2]<-------gw------------[B2]
|
||||
// [ ] [ ]
|
||||
// ^
|
||||
// |
|
||||
// responder2
|
||||
//
|
||||
// There is a possible race that when the requestor creates
|
||||
// a subscription on the reply subject, the subject interest
|
||||
// being sent from the inbound gateway, and B1 having none,
|
||||
// the SUB first goes to B2 before being sent to A1 from
|
||||
// B2's inbound GW. But the request can go from B1 to A1
|
||||
// right away and the responder1 connecting to A1 may send
|
||||
// back the reply before the interest on the reply makes it
|
||||
// to A1 (from B2).
|
||||
// This test will also verify that if the responder is instead
|
||||
// connected to A2, the reply is properly received by requestor
|
||||
// on B1.
|
||||
|
||||
// For this test we want to be in interestOnly mode, so
|
||||
// make it happen quickly
|
||||
gatewayMaxRUnsubBeforeSwitch = 1
|
||||
defer func() { gatewayMaxRUnsubBeforeSwitch = defaultGatewayMaxRUnsubBeforeSwitch }()
|
||||
|
||||
// Start with setting up A2 and B2.
|
||||
ob2 := testDefaultOptionsForGateway("B")
|
||||
sb2 := runGatewayServer(ob2)
|
||||
defer sb2.Shutdown()
|
||||
|
||||
oa2 := testGatewayOptionsFromToWithServers(t, "A", "B", sb2)
|
||||
sa2 := runGatewayServer(oa2)
|
||||
defer sa2.Shutdown()
|
||||
|
||||
waitForOutboundGateways(t, sa2, 1, time.Second)
|
||||
waitForInboundGateways(t, sa2, 1, time.Second)
|
||||
waitForOutboundGateways(t, sb2, 1, time.Second)
|
||||
waitForInboundGateways(t, sb2, 1, time.Second)
|
||||
|
||||
// Now start A1 which will connect to B2
|
||||
oa1 := testGatewayOptionsFromToWithServers(t, "A", "B", sb2)
|
||||
oa1.Routes = RoutesFromStr(fmt.Sprintf("nats://%s:%d", oa2.Cluster.Host, oa2.Cluster.Port))
|
||||
sa1 := runGatewayServer(oa1)
|
||||
defer sa1.Shutdown()
|
||||
|
||||
waitForOutboundGateways(t, sa1, 1, time.Second)
|
||||
waitForInboundGateways(t, sb2, 2, time.Second)
|
||||
|
||||
checkClusterFormed(t, sa1, sa2)
|
||||
|
||||
// Finally, start B1 that will connect to A1.
|
||||
ob1 := testGatewayOptionsFromToWithServers(t, "B", "A", sa1)
|
||||
ob1.Routes = RoutesFromStr(fmt.Sprintf("nats://%s:%d", ob2.Cluster.Host, ob2.Cluster.Port))
|
||||
sb1 := runGatewayServer(ob1)
|
||||
defer sb1.Shutdown()
|
||||
|
||||
// Check that we have the outbound gateway from B1 to A1
|
||||
checkFor(t, 3*time.Second, 15*time.Millisecond, func() error {
|
||||
c := sb1.getOutboundGatewayConnection("A")
|
||||
if c == nil {
|
||||
return fmt.Errorf("Outbound connection to A not created yet")
|
||||
}
|
||||
c.mu.Lock()
|
||||
name := c.opts.Name
|
||||
nc := c.nc
|
||||
c.mu.Unlock()
|
||||
if name != sa1.ID() {
|
||||
// Force a disconnect
|
||||
nc.Close()
|
||||
return fmt.Errorf("Was unable to have B1 connect to A1")
|
||||
}
|
||||
return nil
|
||||
})
|
||||
|
||||
waitForInboundGateways(t, sa1, 1, time.Second)
|
||||
checkClusterFormed(t, sb1, sb2)
|
||||
|
||||
// Slow down GWs connections
|
||||
// testSlowDownGatewayConnections(t, sa1, sa2, sb1, sb2)
|
||||
|
||||
// For this test, since we are using qsubs on A and B, and we
|
||||
// want to make sure that it is received only on B, make the
|
||||
// recent sub expiration high (especially when running on
|
||||
// Travis with GOGC=10
|
||||
setRecentSubExpiration := func(s *Server) {
|
||||
s.mu.Lock()
|
||||
s.gateway.pasi.Lock()
|
||||
s.gateway.recSubExp = 10 * time.Second
|
||||
s.gateway.pasi.Unlock()
|
||||
s.mu.Unlock()
|
||||
}
|
||||
setRecentSubExpiration(sb1)
|
||||
setRecentSubExpiration(sb2)
|
||||
|
||||
a1URL := fmt.Sprintf("nats://%s:%d", oa1.Host, oa1.Port)
|
||||
a2URL := fmt.Sprintf("nats://%s:%d", oa2.Host, oa2.Port)
|
||||
b1URL := fmt.Sprintf("nats://%s:%d", ob1.Host, ob1.Port)
|
||||
b2URL := fmt.Sprintf("nats://%s:%d", ob2.Host, ob2.Port)
|
||||
|
||||
ncb1 := natsConnect(t, b1URL)
|
||||
defer ncb1.Close()
|
||||
|
||||
ncb2 := natsConnect(t, b2URL)
|
||||
defer ncb2.Close()
|
||||
|
||||
natsSubSync(t, ncb1, "just.a.sub")
|
||||
natsSubSync(t, ncb2, "just.a.sub")
|
||||
checkExpectedSubs(t, 2, sb1, sb2)
|
||||
|
||||
// For this test, we want A to be checking B's interest in order
|
||||
// to send messages (which would cause replies to be dropped if
|
||||
// there is no interest registered on A). So from A servers,
|
||||
// send to various subjects and cause B's to switch to interestOnly
|
||||
// mode.
|
||||
nca1 := natsConnect(t, a1URL)
|
||||
defer nca1.Close()
|
||||
for i := 0; i < 10; i++ {
|
||||
natsPub(t, nca1, fmt.Sprintf("reject.%d", i), []byte("hello"))
|
||||
}
|
||||
nca2 := natsConnect(t, a2URL)
|
||||
defer nca2.Close()
|
||||
for i := 0; i < 10; i++ {
|
||||
natsPub(t, nca2, fmt.Sprintf("reject.%d", i), []byte("hello"))
|
||||
}
|
||||
|
||||
checkSwitchedMode := func(t *testing.T, s *Server) {
|
||||
t.Helper()
|
||||
checkFor(t, 2*time.Second, 15*time.Millisecond, func() error {
|
||||
var switchedMode bool
|
||||
c := s.getOutboundGatewayConnection("B")
|
||||
ei, _ := c.gw.outsim.Load(globalAccountName)
|
||||
if ei != nil {
|
||||
e := ei.(*outsie)
|
||||
e.RLock()
|
||||
switchedMode = e.ni == nil && e.mode == modeInterestOnly
|
||||
e.RUnlock()
|
||||
}
|
||||
if !switchedMode {
|
||||
return fmt.Errorf("Still not switched mode")
|
||||
}
|
||||
return nil
|
||||
})
|
||||
}
|
||||
checkSwitchedMode(t, sa1)
|
||||
checkSwitchedMode(t, sa2)
|
||||
|
||||
// Setup a subscriber on _INBOX.> on each of A's servers.
|
||||
total := 1000
|
||||
expected := int32(total)
|
||||
rcvOnA := int32(0)
|
||||
qrcvOnA := int32(0)
|
||||
natsSub(t, nca1, "myreply.>", func(_ *nats.Msg) {
|
||||
atomic.AddInt32(&rcvOnA, 1)
|
||||
})
|
||||
natsQueueSub(t, nca2, "myreply.>", "bar", func(_ *nats.Msg) {
|
||||
atomic.AddInt32(&qrcvOnA, 1)
|
||||
})
|
||||
checkExpectedSubs(t, 2, sa1, sa2)
|
||||
|
||||
// Ok.. so now we will run the actual test where we
|
||||
// create a responder on A1 and make sure that every
|
||||
// single request from B1 gets the reply. Will repeat
|
||||
// test with responder connected to A2.
|
||||
sendReqs := func(t *testing.T, subConn *nats.Conn) {
|
||||
t.Helper()
|
||||
responder := natsSub(t, subConn, "foo", func(m *nats.Msg) {
|
||||
nca1.Publish(m.Reply, []byte("reply"))
|
||||
})
|
||||
natsFlush(t, subConn)
|
||||
checkExpectedSubs(t, 3, sa1, sa2)
|
||||
|
||||
// We are not going to use Request() because this sets
|
||||
// a wildcard subscription on an INBOX and less likely
|
||||
// to produce the race. Instead we will explicitly set
|
||||
// the subscription on the reply subject and create one
|
||||
// per request.
|
||||
for i := 0; i < total/2; i++ {
|
||||
reply := fmt.Sprintf("myreply.%d", i)
|
||||
replySub := natsQueueSubSync(t, ncb1, reply, "bar")
|
||||
natsFlush(t, ncb1)
|
||||
|
||||
// Let's make sure we have interest on B2.
|
||||
if r := sb2.globalAccount().sl.Match(reply); len(r.qsubs) == 0 {
|
||||
checkFor(t, time.Second, time.Millisecond, func() error {
|
||||
if r := sb2.globalAccount().sl.Match(reply); len(r.qsubs) == 0 {
|
||||
return fmt.Errorf("B still not registered interest on %s", reply)
|
||||
}
|
||||
return nil
|
||||
})
|
||||
}
|
||||
natsPubReq(t, ncb1, "foo", reply, []byte("request"))
|
||||
if _, err := replySub.NextMsg(time.Second); err != nil {
|
||||
t.Fatalf("Did not receive reply: %v", err)
|
||||
}
|
||||
natsUnsub(t, replySub)
|
||||
}
|
||||
|
||||
responder.Unsubscribe()
|
||||
natsFlush(t, subConn)
|
||||
checkExpectedSubs(t, 2, sa1, sa2)
|
||||
}
|
||||
sendReqs(t, nca1)
|
||||
sendReqs(t, nca2)
|
||||
|
||||
checkFor(t, time.Second, 15*time.Millisecond, func() error {
|
||||
if n := atomic.LoadInt32(&rcvOnA); n != expected {
|
||||
return fmt.Errorf("Subs on A expected to get %v replies, got %v", expected, n)
|
||||
}
|
||||
return nil
|
||||
})
|
||||
|
||||
// We should not have received a single message on the queue sub
|
||||
// on cluster A because messages will have been delivered to
|
||||
// the member on cluster B.
|
||||
if n := atomic.LoadInt32(&qrcvOnA); n != 0 {
|
||||
t.Fatalf("Queue sub on A should not have received message, got %v", n)
|
||||
}
|
||||
}
|
||||
|
||||
@@ -299,7 +299,7 @@ func (c *client) processInboundRoutedMsg(msg []byte) {
|
||||
c.mu.Unlock()
|
||||
}
|
||||
}
|
||||
c.processMsgResults(acc, r, msg, c.pa.subject, c.pa.reply, false)
|
||||
c.processMsgResults(acc, r, msg, c.pa.subject, c.pa.reply, pmrNoFlag)
|
||||
}
|
||||
|
||||
// Helper function for routes and gateways to create qfilters need for
|
||||
|
||||
@@ -983,6 +983,8 @@ func gatewaysBench(b *testing.B, optimisticMode bool, payload string, numPublish
|
||||
startCh := make(chan bool)
|
||||
l := b.N / numPublishers
|
||||
|
||||
lastMsgSendOp := []byte("PUB end.test 2\r\nok\r\n")
|
||||
|
||||
pubLoop := func(c net.Conn, ch chan bool) {
|
||||
bw := bufio.NewWriterSize(c, defaultSendBufSize)
|
||||
|
||||
@@ -998,7 +1000,7 @@ func gatewaysBench(b *testing.B, optimisticMode bool, payload string, numPublish
|
||||
return
|
||||
}
|
||||
}
|
||||
if _, err := bw.Write([]byte("PUB end.test 2\r\nok\r\n")); err != nil {
|
||||
if _, err := bw.Write(lastMsgSendOp); err != nil {
|
||||
b.Errorf("Received error on PUB write: %v\n", err)
|
||||
return
|
||||
}
|
||||
@@ -1006,11 +1008,13 @@ func gatewaysBench(b *testing.B, optimisticMode bool, payload string, numPublish
|
||||
b.Errorf("Received error on FLUSH write: %v\n", err)
|
||||
return
|
||||
}
|
||||
flushConnection(b, c)
|
||||
}
|
||||
|
||||
// Publish Connections SPINUP
|
||||
for i := 0; i < numPublishers; i++ {
|
||||
c := createClientConn(b, oa.Host, oa.Port)
|
||||
defer c.Close()
|
||||
doDefaultConnect(b, c)
|
||||
flushConnection(b, c)
|
||||
ch := make(chan bool)
|
||||
@@ -1019,7 +1023,18 @@ func gatewaysBench(b *testing.B, optimisticMode bool, payload string, numPublish
|
||||
<-ch
|
||||
}
|
||||
|
||||
b.SetBytes(int64(len(sendOp) + len(msgOp)))
|
||||
// To report the number of bytes:
|
||||
// from publisher to server on cluster A:
|
||||
numBytes := len(sendOp)
|
||||
if subInterest {
|
||||
// from server in cluster A to server on cluster B:
|
||||
// RMSG $G foo <payload size> <payload>\r\n
|
||||
numBytes += len("RMSG $G foo xxxx ") + len(payload) + 2
|
||||
|
||||
// From server in cluster B to sub:
|
||||
numBytes += len(msgOp)
|
||||
}
|
||||
b.SetBytes(int64(numBytes))
|
||||
b.ResetTimer()
|
||||
|
||||
// Closing this will start all publishers at once (roughly)
|
||||
@@ -1031,51 +1046,51 @@ func gatewaysBench(b *testing.B, optimisticMode bool, payload string, numPublish
|
||||
b.StopTimer()
|
||||
}
|
||||
|
||||
func Benchmark_Gateways___Optimistic_1kx01x0(b *testing.B) {
|
||||
func Benchmark_Gateways_Optimistic_1kx01x0(b *testing.B) {
|
||||
gatewaysBench(b, true, sizedString(1024), 1, false)
|
||||
}
|
||||
|
||||
func Benchmark_Gateways___Optimistic_2kx01x0(b *testing.B) {
|
||||
func Benchmark_Gateways_Optimistic_2kx01x0(b *testing.B) {
|
||||
gatewaysBench(b, true, sizedString(2048), 1, false)
|
||||
}
|
||||
|
||||
func Benchmark_Gateways___Optimistic_4kx01x0(b *testing.B) {
|
||||
func Benchmark_Gateways_Optimistic_4kx01x0(b *testing.B) {
|
||||
gatewaysBench(b, true, sizedString(4096), 1, false)
|
||||
}
|
||||
|
||||
func Benchmark_Gateways___Optimistic_1kx10x0(b *testing.B) {
|
||||
func Benchmark_Gateways_Optimistic_1kx10x0(b *testing.B) {
|
||||
gatewaysBench(b, true, sizedString(1024), 10, false)
|
||||
}
|
||||
|
||||
func Benchmark_Gateways___Optimistic_2kx10x0(b *testing.B) {
|
||||
func Benchmark_Gateways_Optimistic_2kx10x0(b *testing.B) {
|
||||
gatewaysBench(b, true, sizedString(2048), 10, false)
|
||||
}
|
||||
|
||||
func Benchmark_Gateways___Optimistic_4kx10x0(b *testing.B) {
|
||||
func Benchmark_Gateways_Optimistic_4kx10x0(b *testing.B) {
|
||||
gatewaysBench(b, true, sizedString(4096), 10, false)
|
||||
}
|
||||
|
||||
func Benchmark_Gateways___Optimistic_1kx01x1(b *testing.B) {
|
||||
func Benchmark_Gateways_Optimistic_1kx01x1(b *testing.B) {
|
||||
gatewaysBench(b, true, sizedString(1024), 1, true)
|
||||
}
|
||||
|
||||
func Benchmark_Gateways___Optimistic_2kx01x1(b *testing.B) {
|
||||
func Benchmark_Gateways_Optimistic_2kx01x1(b *testing.B) {
|
||||
gatewaysBench(b, true, sizedString(2048), 1, true)
|
||||
}
|
||||
|
||||
func Benchmark_Gateways___Optimistic_4kx01x1(b *testing.B) {
|
||||
func Benchmark_Gateways_Optimistic_4kx01x1(b *testing.B) {
|
||||
gatewaysBench(b, true, sizedString(4096), 1, true)
|
||||
}
|
||||
|
||||
func Benchmark_Gateways___Optimistic_1kx10x1(b *testing.B) {
|
||||
func Benchmark_Gateways_Optimistic_1kx10x1(b *testing.B) {
|
||||
gatewaysBench(b, true, sizedString(1024), 10, true)
|
||||
}
|
||||
|
||||
func Benchmark_Gateways___Optimistic_2kx10x1(b *testing.B) {
|
||||
func Benchmark_Gateways_Optimistic_2kx10x1(b *testing.B) {
|
||||
gatewaysBench(b, true, sizedString(2048), 10, true)
|
||||
}
|
||||
|
||||
func Benchmark_Gateways___Optimistic_4kx10x1(b *testing.B) {
|
||||
func Benchmark_Gateways_Optimistic_4kx10x1(b *testing.B) {
|
||||
gatewaysBench(b, true, sizedString(4096), 10, true)
|
||||
}
|
||||
|
||||
@@ -1127,6 +1142,99 @@ func Benchmark_Gateways_InterestOnly_4kx10x1(b *testing.B) {
|
||||
gatewaysBench(b, false, sizedString(4096), 10, true)
|
||||
}
|
||||
|
||||
// This bench only sends the requests to verify impact of reply
|
||||
// reply mapping in GW code.
|
||||
func gatewaySendRequestsBench(b *testing.B, singleReplySub bool) {
|
||||
server.SetGatewaysSolicitDelay(10 * time.Millisecond)
|
||||
defer server.ResetGatewaysSolicitDelay()
|
||||
|
||||
ob := testDefaultOptionsForGateway("B")
|
||||
sb := RunServer(ob)
|
||||
defer sb.Shutdown()
|
||||
|
||||
gwbURL, err := url.Parse(fmt.Sprintf("nats://%s:%d", ob.Gateway.Host, ob.Gateway.Port))
|
||||
if err != nil {
|
||||
b.Fatalf("Error parsing url: %v", err)
|
||||
}
|
||||
oa := testDefaultOptionsForGateway("A")
|
||||
oa.Gateway.Gateways = []*server.RemoteGatewayOpts{
|
||||
&server.RemoteGatewayOpts{
|
||||
Name: "B",
|
||||
URLs: []*url.URL{gwbURL},
|
||||
},
|
||||
}
|
||||
sa := RunServer(oa)
|
||||
defer sa.Shutdown()
|
||||
|
||||
sub := createClientConn(b, ob.Host, ob.Port)
|
||||
defer sub.Close()
|
||||
doDefaultConnect(b, sub)
|
||||
sendProto(b, sub, "SUB foo 1\r\n")
|
||||
flushConnection(b, sub)
|
||||
|
||||
lenMsg := len("MSG foo reply.xxxxxxxxxx 1 2\r\nok\r\n")
|
||||
expected := b.N * lenMsg
|
||||
if !singleReplySub {
|
||||
expected += b.N * len("$GR.1234.")
|
||||
}
|
||||
ch := make(chan bool, 1)
|
||||
go drainConnection(b, sub, ch, expected)
|
||||
|
||||
c := createClientConn(b, oa.Host, oa.Port)
|
||||
defer c.Close()
|
||||
doDefaultConnect(b, c)
|
||||
flushConnection(b, c)
|
||||
|
||||
// From pub to server in cluster A:
|
||||
numBytes := len("PUB foo reply.0123456789 2\r\nok\r\n")
|
||||
if !singleReplySub {
|
||||
// Add the preceding SUB
|
||||
numBytes += len("SUB reply.0123456789 0123456789\r\n")
|
||||
// And UNSUB...
|
||||
numBytes += len("UNSUB 0123456789\r\n")
|
||||
}
|
||||
// From server in cluster A to cluster B
|
||||
numBytes += len("RMSG $G foo reply.0123456789 2\r\nok\r\n")
|
||||
// If mapping of reply...
|
||||
if !singleReplySub {
|
||||
// the mapping uses about 10 more bytes. So add them
|
||||
// for RMSG from server to server, and MSG to sub.
|
||||
numBytes += 20
|
||||
}
|
||||
// From server in cluster B to sub
|
||||
numBytes += lenMsg
|
||||
b.SetBytes(int64(numBytes))
|
||||
|
||||
bw := bufio.NewWriterSize(c, defaultSendBufSize)
|
||||
var subStr string
|
||||
|
||||
b.ResetTimer()
|
||||
|
||||
for i := 0; i < b.N; i++ {
|
||||
if !singleReplySub {
|
||||
subStr = fmt.Sprintf("SUB reply.%010d %010d\r\n", i+1, i+1)
|
||||
}
|
||||
bw.Write([]byte(fmt.Sprintf("%sPUB foo reply.%010d 2\r\nok\r\n", subStr, i+1)))
|
||||
// Simulate that we are doing actual request/reply and therefore
|
||||
// unsub'ing the subs on the reply subject.
|
||||
if !singleReplySub && i > 1000 {
|
||||
bw.Write([]byte(fmt.Sprintf("UNSUB %010d\r\n", (i - 1000))))
|
||||
}
|
||||
}
|
||||
bw.Flush()
|
||||
flushConnection(b, c)
|
||||
|
||||
<-ch
|
||||
}
|
||||
|
||||
func Benchmark_Gateways_Requests_CreateOneSubForAll(b *testing.B) {
|
||||
gatewaySendRequestsBench(b, true)
|
||||
}
|
||||
|
||||
func Benchmark_Gateways_Requests_CreateOneSubForEach(b *testing.B) {
|
||||
gatewaySendRequestsBench(b, false)
|
||||
}
|
||||
|
||||
// Run some benchmarks against interest churn across routes.
|
||||
// Watching for contention retrieving accounts, etc.
|
||||
func Benchmark_________________LookupAccount(b *testing.B) {
|
||||
|
||||
@@ -14,6 +14,7 @@
|
||||
package test
|
||||
|
||||
import (
|
||||
"bufio"
|
||||
"bytes"
|
||||
"fmt"
|
||||
"net"
|
||||
@@ -415,11 +416,33 @@ func TestGatewaySendAllSubs(t *testing.T) {
|
||||
// switch.
|
||||
for i := 0; i < 10001; i++ {
|
||||
gASend(fmt.Sprintf("RMSG $G foo.%d 2\r\nok\r\n", i))
|
||||
if i < 1000 {
|
||||
if i <= 1000 {
|
||||
gAExpect(runsubRe)
|
||||
}
|
||||
}
|
||||
// Since B has no sub, we should get 2 INFOs with start/end
|
||||
// commands.
|
||||
expectNumberOfProtos(t, gAExpect, infoRe, 2)
|
||||
// Expect an INFO + RS+ $G not.used + INFO
|
||||
buf := bufio.NewReader(gA)
|
||||
for i := 0; i < 3; i++ {
|
||||
line, _, err := buf.ReadLine()
|
||||
if err != nil {
|
||||
t.Fatalf("Error reading: %v", err)
|
||||
}
|
||||
switch i {
|
||||
case 0:
|
||||
case 2:
|
||||
if !bytes.HasPrefix(line, []byte("INFO {")) {
|
||||
t.Fatalf("Expected INFO, got: %s", line)
|
||||
}
|
||||
case 1:
|
||||
if !bytes.HasPrefix(line, []byte("RS+ ")) {
|
||||
t.Fatalf("Expected RS+, got: %s", line)
|
||||
}
|
||||
}
|
||||
}
|
||||
// After this point, any new sub or unsub on B should be
|
||||
// sent to A.
|
||||
clientSend("SUB foo 1\r\n")
|
||||
gAExpect(rsubRe)
|
||||
clientSend("UNSUB 1\r\n")
|
||||
gAExpect(runsubRe)
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user