mirror of
https://github.com/gogrlx/nats-server.git
synced 2026-04-16 19:14:41 -07:00
Merge pull request #916 from nats-io/gw_acc_sub_unsub
Gateways: Rework Account Sub/Unsub
This commit is contained in:
@@ -39,8 +39,23 @@ var (
|
||||
gatewayConnectDelay = defaultGatewayConnectDelay
|
||||
gatewayReconnectDelay = defaultGatewayReconnectDelay
|
||||
gatewayMaxRUnsubBeforeSwitch = defaultGatewayMaxRUnsubBeforeSwitch
|
||||
gatewaySolicitDelay = int64(defaultSolicitGatewaysDelay)
|
||||
)
|
||||
|
||||
// SetGatewaysSolicitDelay sets the initial delay before gateways
|
||||
// connections are initiated.
|
||||
// Used by tests.
|
||||
func SetGatewaysSolicitDelay(delay time.Duration) {
|
||||
atomic.StoreInt64(&gatewaySolicitDelay, int64(delay))
|
||||
}
|
||||
|
||||
// ResetGatewaysSolicitDelay resets the initial delay before gateways
|
||||
// connections are initiated to its default values.
|
||||
// Used by tests.
|
||||
func ResetGatewaysSolicitDelay() {
|
||||
atomic.StoreInt64(&gatewaySolicitDelay, int64(defaultSolicitGatewaysDelay))
|
||||
}
|
||||
|
||||
const (
|
||||
gatewayCmdGossip byte = 1
|
||||
gatewayCmdAllSubsStart byte = 2
|
||||
@@ -123,7 +138,7 @@ type gateway struct {
|
||||
connectURL *url.URL // Needed when sending CONNECT after receiving INFO from remote
|
||||
infoJSON []byte // Needed when sending INFO after receiving INFO from remote
|
||||
outsim *sync.Map // Per-account subject interest (or no-interest) (outbound conn)
|
||||
insim map[string]*insie // Per-account subject no-interest sent or send-all-subs mode (inbound conn)
|
||||
insim map[string]*insie // Per-account subject no-interest sent or modeInterestOnly mode (inbound conn)
|
||||
}
|
||||
|
||||
// Outbound subject interest entry.
|
||||
@@ -151,7 +166,7 @@ type outsie struct {
|
||||
// an RS+ to clear the no-interest in the remote.
|
||||
// When an account is switched to modeInterestOnly (we send
|
||||
// all subs of an account to the remote), then `ni` is nil and
|
||||
// when all subs have been sent, `sap` is set to true.
|
||||
// when all subs have been sent, mode is set to modeInterestOnly
|
||||
type insie struct {
|
||||
ni map[string]struct{} // Record if RS- was sent for given subject
|
||||
mode byte // modeOptimistic or modeInterestOnly
|
||||
@@ -297,7 +312,7 @@ func (s *Server) startGateways() {
|
||||
|
||||
dur := s.getOpts().gatewaysSolicitDelay
|
||||
if dur == 0 {
|
||||
dur = defaultSolicitGatewaysDelay
|
||||
dur = time.Duration(atomic.LoadInt64(&gatewaySolicitDelay))
|
||||
}
|
||||
|
||||
select {
|
||||
@@ -1529,6 +1544,9 @@ func (c *client) processGatewayRUnsub(arg []byte) error {
|
||||
var e *outsie
|
||||
var useSl, newe bool
|
||||
|
||||
c.mu.Lock()
|
||||
defer c.mu.Unlock()
|
||||
|
||||
ei, _ := c.gw.outsim.Load(accName)
|
||||
if ei != nil {
|
||||
e = ei.(*outsie)
|
||||
@@ -1550,8 +1568,6 @@ func (c *client) processGatewayRUnsub(arg []byte) error {
|
||||
// the sublist. Look for it and remove.
|
||||
if useSl {
|
||||
key := arg
|
||||
c.mu.Lock()
|
||||
defer c.mu.Unlock()
|
||||
// m[string()] does not cause mem allocation
|
||||
sub, ok := c.subs[string(key)]
|
||||
// if RS- for a sub that we don't have, just ignore.
|
||||
@@ -1611,6 +1627,9 @@ func (c *client) processGatewayRSub(arg []byte) error {
|
||||
var e *outsie
|
||||
var useSl, newe bool
|
||||
|
||||
c.mu.Lock()
|
||||
defer c.mu.Unlock()
|
||||
|
||||
ei, _ := c.gw.outsim.Load(string(accName))
|
||||
// We should always have an existing entry for plain subs because
|
||||
// in optimistic mode we would have received RS- first, and
|
||||
@@ -1637,8 +1656,6 @@ func (c *client) processGatewayRSub(arg []byte) error {
|
||||
} else {
|
||||
key = arg
|
||||
}
|
||||
c.mu.Lock()
|
||||
defer c.mu.Unlock()
|
||||
// If RS+ for a sub that we already have, ignore.
|
||||
// (m[string()] does not allocate memory)
|
||||
if _, ok := c.subs[string(key)]; ok {
|
||||
@@ -1732,48 +1749,10 @@ func (c *client) gatewayInterest(acc, subj string) (bool, *SublistResult) {
|
||||
return psi, r
|
||||
}
|
||||
|
||||
// This is invoked when an account is registered. We check if we did send
|
||||
// to remote gateways a "no interest" in the past when receiving messages.
|
||||
// If we did, we send to the remote gateway an A+ protocol (see
|
||||
// processGatewayAccountSub()).
|
||||
// <Invoked from outbound connection's readLoop>
|
||||
func (s *Server) endAccountNoInterestForGateways(accName string) {
|
||||
gwsa := [4]*client{}
|
||||
gws := gwsa[:0]
|
||||
s.getInboundGatewayConnections(&gws)
|
||||
if len(gws) == 0 {
|
||||
return
|
||||
}
|
||||
var protoa [256]byte
|
||||
var proto []byte
|
||||
for _, c := range gws {
|
||||
c.mu.Lock()
|
||||
// If value in map, it means we sent an A- and need
|
||||
// to clear and send A+ now.
|
||||
if _, inMap := c.gw.insim[accName]; inMap {
|
||||
delete(c.gw.insim, accName)
|
||||
if proto == nil {
|
||||
proto = protoa[:0]
|
||||
proto = append(proto, aSubBytes...)
|
||||
proto = append(proto, accName...)
|
||||
proto = append(proto, CR_LF...)
|
||||
}
|
||||
if c.trace {
|
||||
c.traceOutOp("", proto[:len(proto)-LEN_CR_LF])
|
||||
}
|
||||
c.sendProto(proto, true)
|
||||
}
|
||||
c.mu.Unlock()
|
||||
}
|
||||
}
|
||||
|
||||
// This is invoked when registering (or unregistering) the first
|
||||
// (or last) subscription on a given account/subject. For each
|
||||
// GWs inbound connections, we will check if we need to send
|
||||
// the protocol. In optimistic mode we would send an RS+ only
|
||||
// if we had previously sent an RS-. If we are in the send-all-subs
|
||||
// mode then the protocol is always sent.
|
||||
// <Invoked from outbound connection's readLoop>
|
||||
// GWs inbound connections, we will check if we need to send an RS+ or A+
|
||||
// protocol.
|
||||
func (s *Server) maybeSendSubOrUnsubToGateways(accName string, sub *subscription, added bool) {
|
||||
if sub.queue != nil {
|
||||
return
|
||||
@@ -1785,18 +1764,24 @@ func (s *Server) maybeSendSubOrUnsubToGateways(accName string, sub *subscription
|
||||
return
|
||||
}
|
||||
var (
|
||||
protoa [512]byte
|
||||
proto []byte
|
||||
subject = string(sub.subject)
|
||||
hasWc = subjectHasWildcard(subject)
|
||||
rsProtoa [512]byte
|
||||
rsProto []byte
|
||||
accProtoa [256]byte
|
||||
accProto []byte
|
||||
proto []byte
|
||||
subject = string(sub.subject)
|
||||
hasWc = subjectHasWildcard(subject)
|
||||
)
|
||||
for _, c := range gws {
|
||||
sendProto := false
|
||||
proto = nil
|
||||
c.mu.Lock()
|
||||
e := c.gw.insim[accName]
|
||||
e, inMap := c.gw.insim[accName]
|
||||
// If there is a inbound subject interest entry...
|
||||
if e != nil {
|
||||
// If there is a map, need to check if we had sent no-interest.
|
||||
if e.ni != nil {
|
||||
sendProto := false
|
||||
// In optimistic mode, we care only about possibly sending RS+ (or A+)
|
||||
// so if e.ni is not nil we do things only when adding a new subscription.
|
||||
if e.ni != nil && added {
|
||||
// For wildcard subjects, we will remove from our no-interest
|
||||
// map, all subjects that are a subset of this wc subject, but we
|
||||
// still send the wc subject and let the remote do its own cleanup.
|
||||
@@ -1815,24 +1800,49 @@ func (s *Server) maybeSendSubOrUnsubToGateways(accName string, sub *subscription
|
||||
// We are in the mode where we always send RS+/- protocols.
|
||||
sendProto = true
|
||||
}
|
||||
}
|
||||
if sendProto {
|
||||
if proto == nil {
|
||||
proto = protoa[:0]
|
||||
if added {
|
||||
proto = append(proto, rSubBytes...)
|
||||
if sendProto {
|
||||
if rsProto == nil {
|
||||
// Construct the RS+/- only once
|
||||
proto = rsProtoa[:0]
|
||||
if added {
|
||||
proto = append(proto, rSubBytes...)
|
||||
} else {
|
||||
proto = append(proto, rUnsubBytes...)
|
||||
}
|
||||
proto = append(proto, accName...)
|
||||
proto = append(proto, ' ')
|
||||
proto = append(proto, sub.subject...)
|
||||
proto = append(proto, CR_LF...)
|
||||
rsProto = proto
|
||||
} else {
|
||||
proto = append(proto, rUnsubBytes...)
|
||||
// Point to the already constructed RS+/-
|
||||
proto = rsProto
|
||||
}
|
||||
proto = append(proto, accName...)
|
||||
proto = append(proto, ' ')
|
||||
proto = append(proto, sub.subject...)
|
||||
proto = append(proto, CR_LF...)
|
||||
}
|
||||
} else if added && inMap {
|
||||
// Here, we have a `nil` entry for this account in
|
||||
// the map, which means that we have previously sent
|
||||
// an A-. We have a new subscription, so we need to
|
||||
// send an A+ and delete the entry from the map so
|
||||
// that we do this only once.
|
||||
delete(c.gw.insim, accName)
|
||||
if accProto == nil {
|
||||
// Construct the A+ only once
|
||||
proto = accProtoa[:0]
|
||||
proto = append(proto, aSubBytes...)
|
||||
proto = append(proto, accName...)
|
||||
proto = append(proto, CR_LF...)
|
||||
accProto = proto
|
||||
} else {
|
||||
// Point to the already constructed A+
|
||||
proto = accProto
|
||||
}
|
||||
}
|
||||
if proto != nil {
|
||||
c.sendProto(proto, true)
|
||||
if c.trace {
|
||||
c.traceOutOp("", proto[:len(proto)-LEN_CR_LF])
|
||||
}
|
||||
c.sendProto(proto, true)
|
||||
}
|
||||
c.mu.Unlock()
|
||||
}
|
||||
@@ -1875,17 +1885,17 @@ func (s *Server) sendQueueSubOrUnsubToGateways(accName string, qsub *subscriptio
|
||||
proto = append(proto, CR_LF...)
|
||||
}
|
||||
c.mu.Lock()
|
||||
c.sendProto(proto, true)
|
||||
if c.trace {
|
||||
c.traceOutOp("", proto[:len(proto)-LEN_CR_LF])
|
||||
}
|
||||
c.sendProto(proto, true)
|
||||
c.mu.Unlock()
|
||||
}
|
||||
}
|
||||
|
||||
// This is invoked when a (queue) subscription is added/removed locally
|
||||
// or in our cluster. We use ref counting to know when to update
|
||||
// the inbound gateways.
|
||||
// This is invoked when a subscription (plain or queue) is
|
||||
// added/removed locally or in our cluster. We use ref counting
|
||||
// to know when to update the inbound gateways.
|
||||
// <Invoked from client or route connection's readLoop or when such
|
||||
// connection is closed>
|
||||
func (s *Server) gatewayUpdateSubInterest(accName string, sub *subscription, change int32) {
|
||||
@@ -1936,6 +1946,9 @@ func (s *Server) gatewayUpdateSubInterest(accName string, sub *subscription, cha
|
||||
if entry.n <= 0 {
|
||||
delete(st, string(key))
|
||||
last = true
|
||||
if len(st) == 0 {
|
||||
delete(accMap, accName)
|
||||
}
|
||||
}
|
||||
}
|
||||
if first || last {
|
||||
@@ -2050,10 +2063,10 @@ func (s *Server) gatewayHandleServiceImport(acc *Account, subject []byte, change
|
||||
s.mu.Lock()
|
||||
for _, r := range s.routes {
|
||||
r.mu.Lock()
|
||||
if r.trace {
|
||||
r.traceOutOp("", rsproto[:len(rsproto)-2])
|
||||
}
|
||||
r.sendProto(rsproto, true)
|
||||
if r.trace {
|
||||
r.traceOutOp("", rsproto[:len(rsproto)-LEN_CR_LF])
|
||||
}
|
||||
r.mu.Unlock()
|
||||
}
|
||||
s.mu.Unlock()
|
||||
@@ -2061,6 +2074,108 @@ func (s *Server) gatewayHandleServiceImport(acc *Account, subject []byte, change
|
||||
s.gatewayUpdateSubInterest(acc.Name, sub, change)
|
||||
}
|
||||
|
||||
// Possibly sends an A- to the remote gateway `c`.
|
||||
// Invoked when processing an inbound message and the account is not found.
|
||||
// A check under a lock that protects processing of SUBs and UNSUBs is
|
||||
// done to make sure that we don't send the A- if a subscription has just
|
||||
// been created at the same time, which would otherwise results in the
|
||||
// remote never sending messages on this account until a new subscription
|
||||
// is created.
|
||||
func (s *Server) gatewayHandleAccountNoInterest(c *client, accName []byte) {
|
||||
// Check and possibly send the A- under this lock.
|
||||
s.gateway.pasi.Lock()
|
||||
defer s.gateway.pasi.Unlock()
|
||||
|
||||
si, inMap := s.gateway.pasi.m[string(accName)]
|
||||
if inMap && si != nil && len(si) > 0 {
|
||||
return
|
||||
}
|
||||
c.sendAccountUnsubToGateway(accName)
|
||||
}
|
||||
|
||||
// Helper that sends an A- to this remote gateway if not already done.
|
||||
// This function should not be invoked directly but instead be invoked
|
||||
// by functions holding the gateway.pasi's Lock.
|
||||
func (c *client) sendAccountUnsubToGateway(accName []byte) {
|
||||
// Check if we have sent the A- or not.
|
||||
c.mu.Lock()
|
||||
if _, sent := c.gw.insim[string(accName)]; !sent {
|
||||
// Add a nil value to indicate that we have sent an A-
|
||||
// so that we know to send A+ when needed.
|
||||
c.gw.insim[string(accName)] = nil
|
||||
var protoa [256]byte
|
||||
proto := protoa[:0]
|
||||
proto = append(proto, aUnsubBytes...)
|
||||
proto = append(proto, accName...)
|
||||
proto = append(proto, CR_LF...)
|
||||
c.sendProto(proto, true)
|
||||
if c.trace {
|
||||
c.traceOutOp("", proto[:len(proto)-LEN_CR_LF])
|
||||
}
|
||||
}
|
||||
c.mu.Unlock()
|
||||
}
|
||||
|
||||
// Possibly sends an A- for this account or RS- for this subject.
|
||||
// Invoked when processing an inbound message and the account is found
|
||||
// but there is no interest on this subject.
|
||||
// A test is done under a lock that protects processing of SUBs and UNSUBs
|
||||
// and if there is no subscription at this time, we send an A-. If there
|
||||
// is at least a subscription, but no interest on this subject, we send
|
||||
// an RS- for this subject (if not already done).
|
||||
func (s *Server) gatewayHandleSubjectNoInterest(c *client, acc *Account, accName, subject []byte) {
|
||||
s.gateway.pasi.Lock()
|
||||
defer s.gateway.pasi.Unlock()
|
||||
|
||||
// If there is at least a subscription, possibly send RS-
|
||||
if acc.sl.Count() != 0 {
|
||||
sendProto := false
|
||||
c.mu.Lock()
|
||||
// Send an RS- protocol if not already done and only if
|
||||
// not in the modeInterestOnly.
|
||||
e := c.gw.insim[string(accName)]
|
||||
if e == nil {
|
||||
e = &insie{ni: make(map[string]struct{})}
|
||||
e.ni[string(subject)] = struct{}{}
|
||||
c.gw.insim[string(accName)] = e
|
||||
sendProto = true
|
||||
} else if e.ni != nil {
|
||||
// If we are not in modeInterestOnly, check if we
|
||||
// have already sent an RS-
|
||||
if _, alreadySent := e.ni[string(subject)]; !alreadySent {
|
||||
// TODO(ik): pick some threshold as to when
|
||||
// we need to switch mode
|
||||
if len(e.ni) > gatewayMaxRUnsubBeforeSwitch {
|
||||
// If too many RS-, switch to all-subs-mode.
|
||||
c.gatewaySwitchAccountToSendAllSubs(e, accName)
|
||||
} else {
|
||||
e.ni[string(subject)] = struct{}{}
|
||||
sendProto = true
|
||||
}
|
||||
}
|
||||
}
|
||||
if sendProto {
|
||||
var (
|
||||
protoa = [512]byte{}
|
||||
proto = protoa[:0]
|
||||
)
|
||||
proto = append(proto, rUnsubBytes...)
|
||||
proto = append(proto, accName...)
|
||||
proto = append(proto, ' ')
|
||||
proto = append(proto, subject...)
|
||||
proto = append(proto, CR_LF...)
|
||||
c.sendProto(proto, true)
|
||||
if c.trace {
|
||||
c.traceOutOp("", proto[:len(proto)-LEN_CR_LF])
|
||||
}
|
||||
}
|
||||
c.mu.Unlock()
|
||||
} else {
|
||||
// There is not a single subscription, send an A- (if not already done).
|
||||
c.sendAccountUnsubToGateway([]byte(acc.Name))
|
||||
}
|
||||
}
|
||||
|
||||
// Process a message coming from a remote gateway. Send to any sub/qsub
|
||||
// in our cluster that is matching. When receiving a message for an
|
||||
// account or subject for which there is no interest in this cluster
|
||||
@@ -2087,30 +2202,14 @@ func (c *client) processInboundGatewayMsg(msg []byte) {
|
||||
|
||||
acc, r := c.getAccAndResultFromCache()
|
||||
if acc == nil {
|
||||
c.Debugf("Unknown account %q for routed message on subject: %q", c.pa.account, c.pa.subject)
|
||||
// Send A- only once...
|
||||
c.mu.Lock()
|
||||
if _, sent := c.gw.insim[string(c.pa.account)]; !sent {
|
||||
// Add a nil value to indicate that we have sent an A-
|
||||
// so that we know to send A+ when/if account gets registered.
|
||||
c.gw.insim[string(c.pa.account)] = nil
|
||||
var protoa [256]byte
|
||||
proto := protoa[:0]
|
||||
proto = append(proto, aUnsubBytes...)
|
||||
proto = append(proto, c.pa.account...)
|
||||
if c.trace {
|
||||
c.traceOutOp("", proto)
|
||||
}
|
||||
proto = append(proto, CR_LF...)
|
||||
c.sendProto(proto, true)
|
||||
}
|
||||
c.mu.Unlock()
|
||||
c.Debugf("Unknown account %q for gateway message on subject: %q", c.pa.account, c.pa.subject)
|
||||
c.srv.gatewayHandleAccountNoInterest(c, c.pa.account)
|
||||
return
|
||||
}
|
||||
|
||||
// Check to see if we need to map/route to another account.
|
||||
if acc.imports.services != nil && isServiceReply(c.pa.subject) {
|
||||
// We are handling an response to a request that we mapped
|
||||
// We are handling a response to a request that we mapped
|
||||
// via service imports, so if we are here we are the
|
||||
// origin server
|
||||
c.checkForImportServices(acc, msg)
|
||||
@@ -2119,48 +2218,11 @@ func (c *client) processInboundGatewayMsg(msg []byte) {
|
||||
// If there is no interest on plain subs, possibly send an RS-,
|
||||
// even if there is qsubs interest.
|
||||
if len(r.psubs) == 0 {
|
||||
sendProto := false
|
||||
c.mu.Lock()
|
||||
// Send an RS- protocol if not already done and only if
|
||||
// not in the send-all-subs mode.
|
||||
e := c.gw.insim[string(c.pa.account)]
|
||||
if e == nil {
|
||||
e = &insie{ni: make(map[string]struct{})}
|
||||
e.ni[string(c.pa.subject)] = struct{}{}
|
||||
c.gw.insim[string(c.pa.account)] = e
|
||||
sendProto = true
|
||||
} else if e.ni != nil {
|
||||
// If we are not in send-all-subs mode, check if we
|
||||
// have already sent an RS-
|
||||
if _, alreadySent := e.ni[string(c.pa.subject)]; !alreadySent {
|
||||
// TODO(ik): pick some threshold as to when
|
||||
// we need to switch mode
|
||||
if len(e.ni) > gatewayMaxRUnsubBeforeSwitch {
|
||||
// If too many RS-, switch to all-subs-mode.
|
||||
c.gatewaySwitchAccountToSendAllSubs(e)
|
||||
} else {
|
||||
e.ni[string(c.pa.subject)] = struct{}{}
|
||||
sendProto = true
|
||||
}
|
||||
}
|
||||
}
|
||||
if sendProto {
|
||||
var (
|
||||
protoa = [512]byte{}
|
||||
proto = protoa[:0]
|
||||
)
|
||||
proto = append(proto, rUnsubBytes...)
|
||||
proto = append(proto, c.pa.account...)
|
||||
proto = append(proto, ' ')
|
||||
proto = append(proto, c.pa.subject...)
|
||||
if c.trace {
|
||||
c.traceOutOp("", proto)
|
||||
}
|
||||
proto = append(proto, CR_LF...)
|
||||
c.sendProto(proto, true)
|
||||
}
|
||||
c.mu.Unlock()
|
||||
if len(r.qsubs) == 0 || len(c.pa.queues) == 0 {
|
||||
c.srv.gatewayHandleSubjectNoInterest(c, acc, c.pa.account, c.pa.subject)
|
||||
|
||||
// If there is also no queue filter, then no point in continuing
|
||||
// (even if r.qsubs i > 0).
|
||||
if len(c.pa.queues) == 0 {
|
||||
return
|
||||
}
|
||||
}
|
||||
@@ -2185,9 +2247,6 @@ func (c *client) gatewayAllSubsReceiveStart(info *Info) {
|
||||
ei, _ := c.gw.outsim.Load(account)
|
||||
if ei != nil {
|
||||
e := ei.(*outsie)
|
||||
// Would not even need locking here since this is
|
||||
// checked only from this go routine, but it's a
|
||||
// one-time event, so...
|
||||
e.Lock()
|
||||
e.mode = modeTransitioning
|
||||
e.Unlock()
|
||||
@@ -2239,12 +2298,12 @@ func getAccountFromGatewayCommand(c *client, info *Info, cmd string) string {
|
||||
// sent.
|
||||
// The client's lock is held on entry.
|
||||
// <Invoked from inbound connection's readLoop>
|
||||
func (c *client) gatewaySwitchAccountToSendAllSubs(e *insie) {
|
||||
func (c *client) gatewaySwitchAccountToSendAllSubs(e *insie, accName []byte) {
|
||||
// Set this map to nil so that the no-interest is
|
||||
// no longer checked.
|
||||
e.ni = nil
|
||||
// Capture this since we are passing it to a go-routine.
|
||||
account := string(c.pa.account)
|
||||
account := string(accName)
|
||||
s := c.srv
|
||||
|
||||
// Function that will create an INFO protocol
|
||||
|
||||
@@ -95,7 +95,7 @@ func checkForRegisteredQSubInterest(t *testing.T, s *Server, gwName, acc, subj s
|
||||
})
|
||||
}
|
||||
|
||||
func checkForNoInterest(t *testing.T, c *client, account, subject string, expectNoInterest bool, timeout time.Duration) {
|
||||
func checkForSubjectNoInterest(t *testing.T, c *client, account, subject string, expectNoInterest bool, timeout time.Duration) {
|
||||
t.Helper()
|
||||
checkFor(t, timeout, 15*time.Millisecond, func() error {
|
||||
ei, _ := c.gw.outsim.Load(account)
|
||||
@@ -1253,6 +1253,9 @@ func setAccountUserPassInOptions(o *Options, accName, username, password string)
|
||||
|
||||
func TestGatewayAccountInterest(t *testing.T) {
|
||||
o2 := testDefaultOptionsForGateway("B")
|
||||
// Add users to cause s2 to require auth. Will add an account with user
|
||||
// later.
|
||||
o2.Users = append([]*User(nil), &User{Username: "test", Password: "pwd"})
|
||||
s2 := runGatewayServer(o2)
|
||||
defer s2.Shutdown()
|
||||
|
||||
@@ -1293,27 +1296,13 @@ func TestGatewayAccountInterest(t *testing.T) {
|
||||
gwcc := s1.getOutboundGatewayConnection("C")
|
||||
checkCount(t, gwcc, 1)
|
||||
|
||||
// S2 should have sent a protocol indicating no interest.
|
||||
// S2 should have sent a protocol indicating no account interest.
|
||||
checkFor(t, time.Second, 15*time.Millisecond, func() error {
|
||||
if _, inMap := gwcb.gw.outsim.Load("$foo"); !inMap {
|
||||
if e, inMap := gwcb.gw.outsim.Load("$foo"); !inMap || e != nil {
|
||||
return fmt.Errorf("Did not receive account no interest")
|
||||
}
|
||||
return nil
|
||||
})
|
||||
checkFor(t, time.Second, 15*time.Millisecond, func() error {
|
||||
ei, inMap := gwcc.gw.outsim.Load("$foo")
|
||||
if !inMap {
|
||||
return fmt.Errorf("Did not receive subject no interest")
|
||||
}
|
||||
e := ei.(*outsie)
|
||||
e.RLock()
|
||||
_, inMap = e.ni["foo"]
|
||||
e.RUnlock()
|
||||
if !inMap {
|
||||
return fmt.Errorf("Did not receive subject no interest")
|
||||
}
|
||||
return nil
|
||||
})
|
||||
// Second send should not go through to B
|
||||
natsPub(t, nc, "foo", []byte("hello"))
|
||||
natsFlush(t, nc)
|
||||
@@ -1322,8 +1311,20 @@ func TestGatewayAccountInterest(t *testing.T) {
|
||||
// but because there is no interest on the subject.
|
||||
checkCount(t, gwcc, 1)
|
||||
|
||||
// Add account to S2, this should clear the no interest for that account.
|
||||
s2.RegisterAccount("$foo")
|
||||
// Add account to S2 and a client, this should clear the no-interest
|
||||
// for that account.
|
||||
s2FooAcc, err := s2.RegisterAccount("$foo")
|
||||
if err != nil {
|
||||
t.Fatalf("Error registering account: %v", err)
|
||||
}
|
||||
s2.mu.Lock()
|
||||
s2.users["ivan"] = &User{Account: s2FooAcc, Username: "ivan", Password: "password"}
|
||||
s2.mu.Unlock()
|
||||
s2Url := fmt.Sprintf("nats://ivan:password@127.0.0.1:%d", o2.Port)
|
||||
ncS2 := natsConnect(t, s2Url)
|
||||
defer ncS2.Close()
|
||||
// Any subscription should cause s2 to send an A+
|
||||
natsSubSync(t, ncS2, "asub")
|
||||
checkFor(t, time.Second, 15*time.Millisecond, func() error {
|
||||
if _, inMap := gwcb.gw.outsim.Load("$foo"); inMap {
|
||||
return fmt.Errorf("NoInterest has not been cleared")
|
||||
@@ -1337,6 +1338,17 @@ func TestGatewayAccountInterest(t *testing.T) {
|
||||
// Still won't go to C since there is no sub interest
|
||||
checkCount(t, gwcc, 1)
|
||||
|
||||
// By closing the client from S2, the sole subscription for this
|
||||
// account will disappear and since S2 sent an A+, it will send
|
||||
// an A-.
|
||||
ncS2.Close()
|
||||
checkFor(t, time.Second, 15*time.Millisecond, func() error {
|
||||
if _, inMap := gwcb.gw.outsim.Load("$foo"); !inMap {
|
||||
return fmt.Errorf("NoInterest should be set")
|
||||
}
|
||||
return nil
|
||||
})
|
||||
|
||||
// Restart C and that should reset the no-interest
|
||||
s3.Shutdown()
|
||||
s3 = runGatewayServer(o3)
|
||||
@@ -1373,6 +1385,14 @@ func TestGatewaySubjectInterest(t *testing.T) {
|
||||
waitForOutboundGateways(t, s1, 1, time.Second)
|
||||
waitForOutboundGateways(t, s2, 1, time.Second)
|
||||
|
||||
// We will create a subscription that we are not testing so
|
||||
// that we don't get an A- in this test.
|
||||
s2Url := fmt.Sprintf("nats://ivan:password@127.0.0.1:%d", o2.Port)
|
||||
ncb := natsConnect(t, s2Url)
|
||||
defer ncb.Close()
|
||||
natsSubSync(t, ncb, "not.used")
|
||||
checkExpectedSubs(t, 1, s2)
|
||||
|
||||
s1Url := fmt.Sprintf("nats://ivan:password@127.0.0.1:%d", o1.Port)
|
||||
nc := natsConnect(t, s1Url)
|
||||
defer nc.Close()
|
||||
@@ -1395,7 +1415,7 @@ func TestGatewaySubjectInterest(t *testing.T) {
|
||||
// S2 should have sent a protocol indicating no subject interest.
|
||||
checkNoInterest := func(t *testing.T, subject string, expectedNoInterest bool) {
|
||||
t.Helper()
|
||||
checkForNoInterest(t, gwcb, "$foo", subject, expectedNoInterest, 2*time.Second)
|
||||
checkForSubjectNoInterest(t, gwcb, "$foo", subject, expectedNoInterest, 2*time.Second)
|
||||
}
|
||||
checkNoInterest(t, "foo", true)
|
||||
// Second send should not go through to B
|
||||
@@ -1404,15 +1424,12 @@ func TestGatewaySubjectInterest(t *testing.T) {
|
||||
checkCount(t, gwcb, 1)
|
||||
|
||||
// Now create subscription interest on B (s2)
|
||||
s2Url := fmt.Sprintf("nats://ivan:password@127.0.0.1:%d", o2.Port)
|
||||
ncb := natsConnect(t, s2Url)
|
||||
defer ncb.Close()
|
||||
ch := make(chan bool, 1)
|
||||
sub := natsSub(t, ncb, "foo", func(_ *nats.Msg) {
|
||||
ch <- true
|
||||
})
|
||||
natsFlush(t, ncb)
|
||||
checkExpectedSubs(t, 1, s2)
|
||||
checkExpectedSubs(t, 2, s2)
|
||||
checkExpectedSubs(t, 0, s1)
|
||||
|
||||
// This should clear the no interest for this subject
|
||||
@@ -1427,7 +1444,7 @@ func TestGatewaySubjectInterest(t *testing.T) {
|
||||
// Now unsubscribe, there won't be an UNSUB sent to the gateway.
|
||||
natsUnsub(t, sub)
|
||||
natsFlush(t, ncb)
|
||||
checkExpectedSubs(t, 0, s2)
|
||||
checkExpectedSubs(t, 1, s2)
|
||||
checkExpectedSubs(t, 0, s1)
|
||||
|
||||
// So now sending a message should go over, but then we should get an RS-
|
||||
@@ -1454,7 +1471,7 @@ func TestGatewaySubjectInterest(t *testing.T) {
|
||||
// This should clear the no-interest on both "foo" and "bar"
|
||||
natsSub(t, ncb, "*", func(_ *nats.Msg) {})
|
||||
natsFlush(t, ncb)
|
||||
checkExpectedSubs(t, 1, s2)
|
||||
checkExpectedSubs(t, 2, s2)
|
||||
checkExpectedSubs(t, 0, s1)
|
||||
checkNoInterest(t, "foo", false)
|
||||
checkNoInterest(t, "bar", false)
|
||||
@@ -1473,6 +1490,11 @@ func TestGatewaySubjectInterest(t *testing.T) {
|
||||
waitForOutboundGateways(t, s1, 1, time.Second)
|
||||
waitForOutboundGateways(t, s2, 1, time.Second)
|
||||
|
||||
ncb = natsConnect(t, s2Url)
|
||||
defer ncb.Close()
|
||||
natsSubSync(t, ncb, "not.used")
|
||||
checkExpectedSubs(t, 1, s2)
|
||||
|
||||
gwcb = s1.getOutboundGatewayConnection("B")
|
||||
checkCount(t, gwcb, 0)
|
||||
natsPub(t, nc, "foo", []byte("hello"))
|
||||
@@ -1510,8 +1532,8 @@ func TestGatewaySubjectInterest(t *testing.T) {
|
||||
natsSub(t, ncb2bis, "foo", func(_ *nats.Msg) {})
|
||||
natsFlush(t, ncb2bis)
|
||||
|
||||
// Wait for subscription to be registered locally on s2bis and remotely on s2
|
||||
checkExpectedSubs(t, 1, s2, s2bis)
|
||||
// Wait for subscriptions to be registered locally on s2bis and remotely on s2
|
||||
checkExpectedSubs(t, 2, s2, s2bis)
|
||||
|
||||
// Check that subject no-interest on A was cleared.
|
||||
checkNoInterest(t, "foo", false)
|
||||
@@ -2957,7 +2979,7 @@ func TestGatewaySendAllSubs(t *testing.T) {
|
||||
natsPub(t, ncA, "newsub", []byte("hello"))
|
||||
natsFlush(t, ncA)
|
||||
aOutboundToC := sa.getOutboundGatewayConnection("C")
|
||||
checkForNoInterest(t, aOutboundToC, globalAccountName, "newsub", true, 2*time.Second)
|
||||
checkForSubjectNoInterest(t, aOutboundToC, globalAccountName, "newsub", true, 2*time.Second)
|
||||
|
||||
newSubSub := natsSub(t, ncC, "newsub", func(_ *nats.Msg) {})
|
||||
natsFlush(t, ncC)
|
||||
@@ -2974,7 +2996,7 @@ func TestGatewaySendAllSubs(t *testing.T) {
|
||||
}
|
||||
return fmt.Errorf("Newsub not registered on B")
|
||||
})
|
||||
checkForNoInterest(t, aOutboundToC, globalAccountName, "newsub", false, 2*time.Second)
|
||||
checkForSubjectNoInterest(t, aOutboundToC, globalAccountName, "newsub", false, 2*time.Second)
|
||||
|
||||
natsUnsub(t, newSubSub)
|
||||
natsFlush(t, ncC)
|
||||
@@ -3308,6 +3330,10 @@ func TestGatewayServiceImport(t *testing.T) {
|
||||
waitForInboundGateways(t, sa, 1, 2*time.Second)
|
||||
waitForInboundGateways(t, sb, 1, 2*time.Second)
|
||||
|
||||
// We need at least a subscription on A otherwise when publishing
|
||||
// to subjects with no interest we would simply get an A-
|
||||
natsSubSync(t, clientA, "not.used")
|
||||
|
||||
// Create a client on B that will use account $foo
|
||||
bURL = fmt.Sprintf("nats://clientBFoo:password@127.0.0.1:%d", ob.Port)
|
||||
clientB = natsConnect(t, bURL)
|
||||
@@ -3606,6 +3632,10 @@ func TestGatewayServiceImportWithQueue(t *testing.T) {
|
||||
waitForInboundGateways(t, sa, 1, 2*time.Second)
|
||||
waitForInboundGateways(t, sb, 1, 2*time.Second)
|
||||
|
||||
// We need at least a subscription on A otherwise when publishing
|
||||
// to subjects with no interest we would simply get an A-
|
||||
natsSubSync(t, clientA, "not.used")
|
||||
|
||||
// Create a client on B that will use account $foo
|
||||
bURL = fmt.Sprintf("nats://clientBFoo:password@127.0.0.1:%d", ob.Port)
|
||||
clientB = natsConnect(t, bURL)
|
||||
@@ -3942,6 +3972,14 @@ func TestGatewayServiceImportComplexSetup(t *testing.T) {
|
||||
checkSubs(t, barB2, "B2", 0)
|
||||
|
||||
// Check that this all work in interest-only mode.
|
||||
|
||||
// We need at least a subscription on B2 otherwise when publishing
|
||||
// to subjects with no interest we would simply get an A-
|
||||
b2URL := fmt.Sprintf("nats://clientBFoo:password@127.0.0.1:%d", ob2.Port)
|
||||
clientB2 := natsConnect(t, b2URL)
|
||||
defer clientB2.Close()
|
||||
natsSubSync(t, clientB2, "not.used")
|
||||
|
||||
// Make A2 flood B2 with subjects that B2 is not interested in.
|
||||
for i := 0; i < 1100; i++ {
|
||||
natsPub(t, clientA, fmt.Sprintf("no.interest.%d", i), []byte("hello"))
|
||||
@@ -4281,6 +4319,14 @@ func TestGatewayServiceExportWithWildcards(t *testing.T) {
|
||||
checkSubs(t, barB2, "B2", 0)
|
||||
|
||||
// Check that this all work in interest-only mode.
|
||||
|
||||
// We need at least a subscription on B2 otherwise when publishing
|
||||
// to subjects with no interest we would simply get an A-
|
||||
b2URL := fmt.Sprintf("nats://clientBFoo:password@127.0.0.1:%d", ob2.Port)
|
||||
clientB2 := natsConnect(t, b2URL)
|
||||
defer clientB2.Close()
|
||||
natsSubSync(t, clientB2, "not.used")
|
||||
|
||||
// Make A2 flood B2 with subjects that B2 is not interested in.
|
||||
for i := 0; i < 1100; i++ {
|
||||
natsPub(t, clientA, fmt.Sprintf("no.interest.%d", i), []byte("hello"))
|
||||
|
||||
@@ -243,8 +243,6 @@ func NewServer(opts *Options) (*Server, error) {
|
||||
// Call this even if there is no gateway defined. It will
|
||||
// initialize the structure so we don't have to check for
|
||||
// it to be nil or not in various places in the code.
|
||||
// Do this before calling registerAccount() since registerAccount
|
||||
// may try to send things to gateways.
|
||||
gws, err := newGateway(opts)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
@@ -695,11 +693,6 @@ func (s *Server) registerAccount(acc *Account) {
|
||||
acc.srv = s
|
||||
acc.mu.Unlock()
|
||||
s.accounts[acc.Name] = acc
|
||||
if s.gateway.enabled {
|
||||
// Check and possibly send an A+ to gateways for which
|
||||
// we had sent an A- because account did not exist at that time.
|
||||
s.endAccountNoInterestForGateways(acc.Name)
|
||||
}
|
||||
s.enableAccountTracking(acc)
|
||||
}
|
||||
|
||||
|
||||
@@ -130,10 +130,16 @@ func TestGatewayAccountInterest(t *testing.T) {
|
||||
gCSend("PING\r\n")
|
||||
gCExpect(pongRe)
|
||||
|
||||
// Now register the $foo account on B, A should receive an A+
|
||||
// because B knows that it previously sent an A-, but since
|
||||
// it did not send one to C, C should not receive the A+.
|
||||
// Now register the $foo account on B and create a subscription,
|
||||
// A should receive an A+ because B knows that it previously sent
|
||||
// an A-, but since it did not send one to C, C should not receive
|
||||
// the A+.
|
||||
sb.RegisterAccount("$foo")
|
||||
client := createClientConn(t, ob.Host, ob.Port)
|
||||
defer client.Close()
|
||||
clientSend, clientExpect := setupConnWithAccount(t, client, "$foo")
|
||||
clientSend("SUB not.used 1234567\r\nPING\r\n")
|
||||
clientExpect(pongRe)
|
||||
gAExpect(asubRe)
|
||||
expectNothing(t, gC)
|
||||
}
|
||||
@@ -146,6 +152,16 @@ func TestGatewaySubjectInterest(t *testing.T) {
|
||||
sb := runGatewayServer(ob)
|
||||
defer sb.Shutdown()
|
||||
|
||||
// Create a client on B
|
||||
client := createClientConn(t, ob.Host, ob.Port)
|
||||
defer client.Close()
|
||||
clientSend, clientExpect := setupConnWithUserPass(t, client, "ivan", "password")
|
||||
// Since we want to test RS+/-, we need to have at
|
||||
// least a subscription on B so that sending from A does
|
||||
// not result in A-
|
||||
clientSend("SUB not.used 1234567\r\nPING\r\n")
|
||||
clientExpect(pongRe)
|
||||
|
||||
gA := createGatewayConn(t, ob.Gateway.Host, ob.Gateway.Port)
|
||||
defer gA.Close()
|
||||
|
||||
@@ -207,10 +223,6 @@ func TestGatewaySubjectInterest(t *testing.T) {
|
||||
// A should receive a RS+ because B knows that it previously
|
||||
// sent a RS-, but since it did not send one to C, C should
|
||||
// not receive the RS+.
|
||||
client := createClientConn(t, ob.Host, ob.Port)
|
||||
defer client.Close()
|
||||
|
||||
clientSend, clientExpect := setupConnWithUserPass(t, client, "ivan", "password")
|
||||
clientSend("SUB foo 1\r\nSUB foo 2\r\n")
|
||||
// Also subscribe to subject that was not used before,
|
||||
// so there should be no RS+ for this one.
|
||||
@@ -381,6 +393,16 @@ func TestGatewaySendAllSubs(t *testing.T) {
|
||||
sb := runGatewayServer(ob)
|
||||
defer sb.Shutdown()
|
||||
|
||||
// Create a client on B
|
||||
client := createClientConn(t, ob.Host, ob.Port)
|
||||
defer client.Close()
|
||||
clientSend, clientExpect := setupConn(t, client)
|
||||
// Since we want to test RS+/-, we need to have at
|
||||
// least a subscription on B so that sending from A does
|
||||
// not result in A-
|
||||
clientSend("SUB not.used 1234567\r\nPING\r\n")
|
||||
clientExpect(pongRe)
|
||||
|
||||
gA := createGatewayConn(t, ob.Gateway.Host, ob.Gateway.Port)
|
||||
defer gA.Close()
|
||||
|
||||
|
||||
@@ -144,6 +144,13 @@ func TestSendRouteSubAndUnsub(t *testing.T) {
|
||||
routeSend("PING\r\n")
|
||||
routeExpect(pongRe)
|
||||
|
||||
// Routes now send their subs list from a go routine,
|
||||
// so it is possible that if we don't wait we get
|
||||
// the client SUB being forwarded, then for the UNSUB,
|
||||
// we get the go routine kicking-in and send the SUB again
|
||||
// (which is ok since it is idempotent on the receiving side)
|
||||
time.Sleep(100 * time.Millisecond)
|
||||
|
||||
// Send SUB via client connection
|
||||
send("SUB foo 22\r\n")
|
||||
|
||||
|
||||
@@ -364,27 +364,26 @@ func TestTLSGatewaysCertificateCNBasedAuth(t *testing.T) {
|
||||
optsC.Gateway.Name = "C"
|
||||
optsC.Gateway.Port = 9997
|
||||
|
||||
routes := make([]*url.URL, 3)
|
||||
routes[0] = gwA
|
||||
routes[1] = gwB
|
||||
routes[2] = gwC
|
||||
gateways := make([]*server.RemoteGatewayOpts, 3)
|
||||
gateways[0] = &server.RemoteGatewayOpts{
|
||||
Name: optsA.Gateway.Name,
|
||||
URLs: routes,
|
||||
URLs: []*url.URL{gwA},
|
||||
}
|
||||
gateways[1] = &server.RemoteGatewayOpts{
|
||||
Name: optsB.Gateway.Name,
|
||||
URLs: routes,
|
||||
URLs: []*url.URL{gwB},
|
||||
}
|
||||
gateways[2] = &server.RemoteGatewayOpts{
|
||||
Name: optsC.Gateway.Name,
|
||||
URLs: routes,
|
||||
URLs: []*url.URL{gwC},
|
||||
}
|
||||
optsA.Gateway.Gateways = gateways
|
||||
optsB.Gateway.Gateways = gateways
|
||||
optsC.Gateway.Gateways = gateways
|
||||
|
||||
server.SetGatewaysSolicitDelay(100 * time.Millisecond)
|
||||
defer server.ResetGatewaysSolicitDelay()
|
||||
|
||||
srvA := RunServer(optsA)
|
||||
defer srvA.Shutdown()
|
||||
|
||||
|
||||
Reference in New Issue
Block a user