Allow system messages to cross gateways.

Removed the code getting matching subscriptions and trying
to exclude non internal interest since as soon as there is
routing and/or gateway, it is likely that server would end-up
generating the payload and sending. May need to revisit.

Signed-off-by: Ivan Kozlovic <ivan@synadia.com>
This commit is contained in:
Ivan Kozlovic
2018-12-03 20:59:32 -07:00
parent 1011339375
commit 2618d39a36
2 changed files with 100 additions and 67 deletions

View File

@@ -111,7 +111,6 @@ type DataStats struct {
// Used for internally queueing up messages that the server wants to send.
type pubMsg struct {
r *SublistResult
sub string
rply string
si *ServerInfo
@@ -136,7 +135,6 @@ func (s *Server) internalSendLoop(wg *sync.WaitGroup) {
return
}
c := s.sys.client
acc := s.sys.account
sendq := s.sys.sendq
id := s.info.ID
host := s.info.Host
@@ -161,13 +159,10 @@ func (s *Server) internalSendLoop(wg *sync.WaitGroup) {
c.pa.subject = []byte(pm.sub)
c.pa.size = len(b)
c.pa.szb = []byte(strconv.FormatInt(int64(len(b)), 10))
c.pa.reply = []byte(pm.rply)
// Add in NL
b = append(b, _CRLF_...)
// Check to see if we need to map/route to another account.
if acc.imports.services != nil {
c.checkForImportServices(acc, b)
}
c.processMsgResults(acc, pm.r, b, c.pa.subject, []byte(pm.rply), nil)
c.processInboundClientMsg(b)
c.flushClients()
// See if we are doing graceful shutdown.
if pm.last {
@@ -187,7 +182,6 @@ func (s *Server) sendShutdownEvent() {
return
}
subj := fmt.Sprintf(shutdownEventSubj, s.info.ID)
r := s.sys.account.sl.Match(subj)
sendq := s.sys.sendq
// Stop any more messages from queueing up.
s.sys.sendq = nil
@@ -195,19 +189,19 @@ func (s *Server) sendShutdownEvent() {
s.sys.subs = nil
s.mu.Unlock()
// Send to the internal queue and mark as last.
sendq <- &pubMsg{r, subj, _EMPTY_, nil, nil, true}
sendq <- &pubMsg{subj, _EMPTY_, nil, nil, true}
}
// This will queue up a message to be sent.
// Assumes lock is held on entry.
func (s *Server) sendInternalMsg(r *SublistResult, sub, rply string, si *ServerInfo, msg interface{}) {
func (s *Server) sendInternalMsg(sub, rply string, si *ServerInfo, msg interface{}) {
if s.sys == nil || s.sys.sendq == nil {
return
}
sendq := s.sys.sendq
// Don't hold lock while placing on the channel.
s.mu.Unlock()
sendq <- &pubMsg{r, sub, rply, si, msg, false}
sendq <- &pubMsg{sub, rply, si, msg, false}
s.mu.Lock()
}
@@ -469,17 +463,15 @@ func (s *Server) enableAccountTracking(a *Account) {
if a == nil || !s.eventsEnabled() || a == s.sys.account {
return
}
acc := s.sys.account
sc := s.sys.client
// TODO(ik): Generate payload although message may not be sent.
// May need to ensure we do so only if there is a known interest.
// This can get complicated with gateways.
subj := fmt.Sprintf(accConnsReqSubj, a.Name)
r := acc.sl.Match(subj)
if noOutSideInterest(sc, r) {
return
}
reply := fmt.Sprintf(connsRespSubj, s.info.ID)
m := accNumConnsReq{Account: a.Name}
s.sendInternalMsg(r, subj, reply, &m.Server, &m)
s.sendInternalMsg(subj, reply, &m.Server, &m)
}
// FIXME(dlc) - make configurable.
@@ -492,13 +484,6 @@ func (s *Server) sendAccConnsUpdate(a *Account, subj string) {
if !s.eventsEnabled() || a == nil || a == s.sys.account || a == s.gacc {
return
}
acc := s.sys.account
sc := s.sys.client
r := acc.sl.Match(subj)
if noOutSideInterest(sc, r) {
return
}
a.mu.Lock()
// If no limits set, don't update, no need to.
@@ -519,7 +504,7 @@ func (s *Server) sendAccConnsUpdate(a *Account, subj string) {
}
a.mu.Unlock()
s.sendInternalMsg(r, subj, "", &m.Server, &m)
s.sendInternalMsg(subj, "", &m.Server, &m)
}
// accConnsUpdate is called whenever there is a change to the account's
@@ -542,15 +527,9 @@ func (s *Server) accountConnectEvent(c *client) {
s.mu.Unlock()
return
}
acc := s.sys.account
sc := s.sys.client
s.mu.Unlock()
subj := fmt.Sprintf(connectEventSubj, c.acc.Name)
r := acc.sl.Match(subj)
if noOutSideInterest(sc, r) {
return
}
c.mu.Lock()
m := ConnectEventMsg{
@@ -568,7 +547,7 @@ func (s *Server) accountConnectEvent(c *client) {
c.mu.Unlock()
s.mu.Lock()
s.sendInternalMsg(r, subj, "", &m.Server, &m)
s.sendInternalMsg(subj, "", &m.Server, &m)
s.mu.Unlock()
}
@@ -580,15 +559,9 @@ func (s *Server) accountDisconnectEvent(c *client, now time.Time, reason string)
s.mu.Unlock()
return
}
acc := s.sys.account
sc := s.sys.client
s.mu.Unlock()
subj := fmt.Sprintf(disconnectEventSubj, c.acc.Name)
r := acc.sl.Match(subj)
if noOutSideInterest(sc, r) {
return
}
c.mu.Lock()
m := DisconnectEventMsg{
@@ -616,7 +589,7 @@ func (s *Server) accountDisconnectEvent(c *client, now time.Time, reason string)
c.mu.Unlock()
s.mu.Lock()
s.sendInternalMsg(r, subj, "", &m.Server, &m)
s.sendInternalMsg(subj, "", &m.Server, &m)
s.mu.Unlock()
}
@@ -674,26 +647,6 @@ func (s *Server) sysUnsubscribe(sub *subscription) {
c.unsubscribe(acc, sub, true)
}
func noOutSideInterest(sc *client, r *SublistResult) bool {
if sc == nil || r == nil {
return true
}
nsubs := len(r.psubs) + len(r.qsubs)
if nsubs == 0 {
return true
}
// We will always be no-echo but will determine that on delivery.
// Here we try to avoid generating the payload if there is only us.
// We only check normal subs. If we introduce queue subs into the
// internal subscribers we should add in the check.
for _, sub := range r.psubs {
if sub.client != sc {
return false
}
}
return true
}
func (c *client) flushClients() {
last := time.Now()
for cp := range c.pcd {

View File

@@ -105,6 +105,46 @@ func runTrustedCluster(t *testing.T) (*Server, *Options, *Server, *Options) {
return sa, optsA, sb, optsB
}
func runTrustedGateways(t *testing.T) (*Server, *Options, *Server, *Options, nkeys.KeyPair) {
t.Helper()
kp, _ := nkeys.FromSeed(oSeed)
pub, _ := kp.PublicKey()
mr := &MemAccResolver{}
// Now create a system account.
// NOTE: This can NOT be shared directly between servers.
// Set via server options.
okp, _ := nkeys.FromSeed(oSeed)
akp, _ := nkeys.CreateAccount()
apub, _ := akp.PublicKey()
nac := jwt.NewAccountClaims(apub)
jwt, _ := nac.Encode(okp)
mr.Store(apub, jwt)
optsA := testDefaultOptionsForGateway("A")
optsA.Cluster.Host = "127.0.0.1"
optsA.TrustedKeys = []string{pub}
optsA.AccountResolver = mr
optsA.SystemAccount = apub
sa := RunServer(optsA)
optsB := testGatewayOptionsFromToWithServers(t, "B", "A", sa)
optsB.TrustedKeys = []string{pub}
optsB.AccountResolver = mr
optsB.SystemAccount = apub
sb := RunServer(optsB)
waitForOutboundGateways(t, sa, 1, time.Second)
waitForOutboundGateways(t, sb, 1, time.Second)
return sa, optsA, sb, optsB, akp
}
func TestSystemAccount(t *testing.T) {
s, _ := runTrustedServer(t)
defer s.Shutdown()
@@ -141,14 +181,16 @@ func TestSystemAccountNewConnection(t *testing.T) {
}
defer ncs.Close()
sub, _ := ncs.SubscribeSync("$SYS.ACCOUNT.>")
defer sub.Unsubscribe()
ncs.Flush()
// We can't hear ourselves, so we need to create a second client to
// We may not be able to hear ourselves (if the event is processed
// before we create the sub), so we need to create a second client to
// trigger the connect/disconnect events.
acc2, akp2 := createAccount(s)
// Be explicit to only receive the event for acc2.
sub, _ := ncs.SubscribeSync(fmt.Sprintf("$SYS.ACCOUNT.%s.>", acc2.Name))
defer sub.Unsubscribe()
ncs.Flush()
nc, err := nats.Connect(url, createUserCreds(t, s, akp2), nats.Name("TEST EVENTS"))
if err != nil {
t.Fatalf("Error on connect: %v", err)
@@ -327,9 +369,8 @@ func TestSystemAccountInternalSubscriptions(t *testing.T) {
// Now make sure we do not hear ourselves. We optimize this for internally
// generated messages.
r := SublistResult{psubs: []*subscription{sub}}
s.mu.Lock()
s.sendInternalMsg(&r, "foo", "", nil, msg.Data)
s.sendInternalMsg("foo", "", nil, msg.Data)
s.mu.Unlock()
select {
@@ -736,3 +777,42 @@ func TestAccountConnsLimitExceededAfterUpdateDisconnectNewOnly(t *testing.T) {
t.Fatalf("Expected all new clients to be closed, only got %d of 5", closed)
}
}
func TestSystemAccountWithGateways(t *testing.T) {
sa, oa, sb, ob, akp := runTrustedGateways(t)
defer sa.Shutdown()
defer sb.Shutdown()
// Create a client on A that will subscribe on $SYS.ACCOUNT.>
urla := fmt.Sprintf("nats://%s:%d", oa.Host, oa.Port)
nca := natsConnect(t, urla, createUserCreds(t, sa, akp))
defer nca.Close()
sub, _ := nca.SubscribeSync("$SYS.ACCOUNT.>")
defer sub.Unsubscribe()
nca.Flush()
checkExpectedSubs(t, 6, sa)
// Create a client on B and see if we receive the event
urlb := fmt.Sprintf("nats://%s:%d", ob.Host, ob.Port)
ncb := natsConnect(t, urlb, createUserCreds(t, sb, akp), nats.Name("TEST EVENTS"))
defer ncb.Close()
msg, err := sub.NextMsg(time.Second)
if err != nil {
t.Fatalf("Error receiving msg: %v", err)
}
// Basic checks, could expand on that...
accName := sa.SystemAccount().Name
if !strings.HasPrefix(msg.Subject, fmt.Sprintf("$SYS.ACCOUNT.%s.CONNECT", accName)) {
t.Fatalf("Expected subject to start with %q, got %q", "$SYS.ACCOUNT.<account>.CONNECT", msg.Subject)
}
tokens := strings.Split(msg.Subject, ".")
if len(tokens) < 4 {
t.Fatalf("Expected 4 tokens, got %d", len(tokens))
}
account := tokens[2]
if account != accName {
t.Fatalf("Expected %q for account, got %q", accName, account)
}
}