From 2618d39a36d33bc2f0cc2cce2491d9205b17b8be Mon Sep 17 00:00:00 2001 From: Ivan Kozlovic Date: Mon, 3 Dec 2018 20:59:32 -0700 Subject: [PATCH] Allow system messages to cross gateways. Removed the code getting matching subscriptions and trying to exclude non internal interest since as soon as there is routing and/or gateway, it is likely that server would end-up generating the payload and sending. May need to revisit. Signed-off-by: Ivan Kozlovic --- server/events.go | 73 ++++++--------------------------- server/events_test.go | 94 +++++++++++++++++++++++++++++++++++++++---- 2 files changed, 100 insertions(+), 67 deletions(-) diff --git a/server/events.go b/server/events.go index d5f5b580..29597780 100644 --- a/server/events.go +++ b/server/events.go @@ -111,7 +111,6 @@ type DataStats struct { // Used for internally queueing up messages that the server wants to send. type pubMsg struct { - r *SublistResult sub string rply string si *ServerInfo @@ -136,7 +135,6 @@ func (s *Server) internalSendLoop(wg *sync.WaitGroup) { return } c := s.sys.client - acc := s.sys.account sendq := s.sys.sendq id := s.info.ID host := s.info.Host @@ -161,13 +159,10 @@ func (s *Server) internalSendLoop(wg *sync.WaitGroup) { c.pa.subject = []byte(pm.sub) c.pa.size = len(b) c.pa.szb = []byte(strconv.FormatInt(int64(len(b)), 10)) + c.pa.reply = []byte(pm.rply) // Add in NL b = append(b, _CRLF_...) - // Check to see if we need to map/route to another account. - if acc.imports.services != nil { - c.checkForImportServices(acc, b) - } - c.processMsgResults(acc, pm.r, b, c.pa.subject, []byte(pm.rply), nil) + c.processInboundClientMsg(b) c.flushClients() // See if we are doing graceful shutdown. if pm.last { @@ -187,7 +182,6 @@ func (s *Server) sendShutdownEvent() { return } subj := fmt.Sprintf(shutdownEventSubj, s.info.ID) - r := s.sys.account.sl.Match(subj) sendq := s.sys.sendq // Stop any more messages from queueing up. s.sys.sendq = nil @@ -195,19 +189,19 @@ func (s *Server) sendShutdownEvent() { s.sys.subs = nil s.mu.Unlock() // Send to the internal queue and mark as last. - sendq <- &pubMsg{r, subj, _EMPTY_, nil, nil, true} + sendq <- &pubMsg{subj, _EMPTY_, nil, nil, true} } // This will queue up a message to be sent. // Assumes lock is held on entry. -func (s *Server) sendInternalMsg(r *SublistResult, sub, rply string, si *ServerInfo, msg interface{}) { +func (s *Server) sendInternalMsg(sub, rply string, si *ServerInfo, msg interface{}) { if s.sys == nil || s.sys.sendq == nil { return } sendq := s.sys.sendq // Don't hold lock while placing on the channel. s.mu.Unlock() - sendq <- &pubMsg{r, sub, rply, si, msg, false} + sendq <- &pubMsg{sub, rply, si, msg, false} s.mu.Lock() } @@ -469,17 +463,15 @@ func (s *Server) enableAccountTracking(a *Account) { if a == nil || !s.eventsEnabled() || a == s.sys.account { return } - acc := s.sys.account - sc := s.sys.client + + // TODO(ik): Generate payload although message may not be sent. + // May need to ensure we do so only if there is a known interest. + // This can get complicated with gateways. subj := fmt.Sprintf(accConnsReqSubj, a.Name) - r := acc.sl.Match(subj) - if noOutSideInterest(sc, r) { - return - } reply := fmt.Sprintf(connsRespSubj, s.info.ID) m := accNumConnsReq{Account: a.Name} - s.sendInternalMsg(r, subj, reply, &m.Server, &m) + s.sendInternalMsg(subj, reply, &m.Server, &m) } // FIXME(dlc) - make configurable. @@ -492,13 +484,6 @@ func (s *Server) sendAccConnsUpdate(a *Account, subj string) { if !s.eventsEnabled() || a == nil || a == s.sys.account || a == s.gacc { return } - acc := s.sys.account - sc := s.sys.client - - r := acc.sl.Match(subj) - if noOutSideInterest(sc, r) { - return - } a.mu.Lock() // If no limits set, don't update, no need to. @@ -519,7 +504,7 @@ func (s *Server) sendAccConnsUpdate(a *Account, subj string) { } a.mu.Unlock() - s.sendInternalMsg(r, subj, "", &m.Server, &m) + s.sendInternalMsg(subj, "", &m.Server, &m) } // accConnsUpdate is called whenever there is a change to the account's @@ -542,15 +527,9 @@ func (s *Server) accountConnectEvent(c *client) { s.mu.Unlock() return } - acc := s.sys.account - sc := s.sys.client s.mu.Unlock() subj := fmt.Sprintf(connectEventSubj, c.acc.Name) - r := acc.sl.Match(subj) - if noOutSideInterest(sc, r) { - return - } c.mu.Lock() m := ConnectEventMsg{ @@ -568,7 +547,7 @@ func (s *Server) accountConnectEvent(c *client) { c.mu.Unlock() s.mu.Lock() - s.sendInternalMsg(r, subj, "", &m.Server, &m) + s.sendInternalMsg(subj, "", &m.Server, &m) s.mu.Unlock() } @@ -580,15 +559,9 @@ func (s *Server) accountDisconnectEvent(c *client, now time.Time, reason string) s.mu.Unlock() return } - acc := s.sys.account - sc := s.sys.client s.mu.Unlock() subj := fmt.Sprintf(disconnectEventSubj, c.acc.Name) - r := acc.sl.Match(subj) - if noOutSideInterest(sc, r) { - return - } c.mu.Lock() m := DisconnectEventMsg{ @@ -616,7 +589,7 @@ func (s *Server) accountDisconnectEvent(c *client, now time.Time, reason string) c.mu.Unlock() s.mu.Lock() - s.sendInternalMsg(r, subj, "", &m.Server, &m) + s.sendInternalMsg(subj, "", &m.Server, &m) s.mu.Unlock() } @@ -674,26 +647,6 @@ func (s *Server) sysUnsubscribe(sub *subscription) { c.unsubscribe(acc, sub, true) } -func noOutSideInterest(sc *client, r *SublistResult) bool { - if sc == nil || r == nil { - return true - } - nsubs := len(r.psubs) + len(r.qsubs) - if nsubs == 0 { - return true - } - // We will always be no-echo but will determine that on delivery. - // Here we try to avoid generating the payload if there is only us. - // We only check normal subs. If we introduce queue subs into the - // internal subscribers we should add in the check. - for _, sub := range r.psubs { - if sub.client != sc { - return false - } - } - return true -} - func (c *client) flushClients() { last := time.Now() for cp := range c.pcd { diff --git a/server/events_test.go b/server/events_test.go index 4756ccd2..ee091403 100644 --- a/server/events_test.go +++ b/server/events_test.go @@ -105,6 +105,46 @@ func runTrustedCluster(t *testing.T) (*Server, *Options, *Server, *Options) { return sa, optsA, sb, optsB } +func runTrustedGateways(t *testing.T) (*Server, *Options, *Server, *Options, nkeys.KeyPair) { + t.Helper() + + kp, _ := nkeys.FromSeed(oSeed) + pub, _ := kp.PublicKey() + + mr := &MemAccResolver{} + + // Now create a system account. + // NOTE: This can NOT be shared directly between servers. + // Set via server options. + okp, _ := nkeys.FromSeed(oSeed) + akp, _ := nkeys.CreateAccount() + apub, _ := akp.PublicKey() + nac := jwt.NewAccountClaims(apub) + jwt, _ := nac.Encode(okp) + + mr.Store(apub, jwt) + + optsA := testDefaultOptionsForGateway("A") + optsA.Cluster.Host = "127.0.0.1" + optsA.TrustedKeys = []string{pub} + optsA.AccountResolver = mr + optsA.SystemAccount = apub + + sa := RunServer(optsA) + + optsB := testGatewayOptionsFromToWithServers(t, "B", "A", sa) + optsB.TrustedKeys = []string{pub} + optsB.AccountResolver = mr + optsB.SystemAccount = apub + + sb := RunServer(optsB) + + waitForOutboundGateways(t, sa, 1, time.Second) + waitForOutboundGateways(t, sb, 1, time.Second) + + return sa, optsA, sb, optsB, akp +} + func TestSystemAccount(t *testing.T) { s, _ := runTrustedServer(t) defer s.Shutdown() @@ -141,14 +181,16 @@ func TestSystemAccountNewConnection(t *testing.T) { } defer ncs.Close() - sub, _ := ncs.SubscribeSync("$SYS.ACCOUNT.>") - defer sub.Unsubscribe() - ncs.Flush() - - // We can't hear ourselves, so we need to create a second client to + // We may not be able to hear ourselves (if the event is processed + // before we create the sub), so we need to create a second client to // trigger the connect/disconnect events. acc2, akp2 := createAccount(s) + // Be explicit to only receive the event for acc2. + sub, _ := ncs.SubscribeSync(fmt.Sprintf("$SYS.ACCOUNT.%s.>", acc2.Name)) + defer sub.Unsubscribe() + ncs.Flush() + nc, err := nats.Connect(url, createUserCreds(t, s, akp2), nats.Name("TEST EVENTS")) if err != nil { t.Fatalf("Error on connect: %v", err) @@ -327,9 +369,8 @@ func TestSystemAccountInternalSubscriptions(t *testing.T) { // Now make sure we do not hear ourselves. We optimize this for internally // generated messages. - r := SublistResult{psubs: []*subscription{sub}} s.mu.Lock() - s.sendInternalMsg(&r, "foo", "", nil, msg.Data) + s.sendInternalMsg("foo", "", nil, msg.Data) s.mu.Unlock() select { @@ -736,3 +777,42 @@ func TestAccountConnsLimitExceededAfterUpdateDisconnectNewOnly(t *testing.T) { t.Fatalf("Expected all new clients to be closed, only got %d of 5", closed) } } + +func TestSystemAccountWithGateways(t *testing.T) { + sa, oa, sb, ob, akp := runTrustedGateways(t) + defer sa.Shutdown() + defer sb.Shutdown() + + // Create a client on A that will subscribe on $SYS.ACCOUNT.> + urla := fmt.Sprintf("nats://%s:%d", oa.Host, oa.Port) + nca := natsConnect(t, urla, createUserCreds(t, sa, akp)) + defer nca.Close() + + sub, _ := nca.SubscribeSync("$SYS.ACCOUNT.>") + defer sub.Unsubscribe() + nca.Flush() + checkExpectedSubs(t, 6, sa) + + // Create a client on B and see if we receive the event + urlb := fmt.Sprintf("nats://%s:%d", ob.Host, ob.Port) + ncb := natsConnect(t, urlb, createUserCreds(t, sb, akp), nats.Name("TEST EVENTS")) + defer ncb.Close() + + msg, err := sub.NextMsg(time.Second) + if err != nil { + t.Fatalf("Error receiving msg: %v", err) + } + // Basic checks, could expand on that... + accName := sa.SystemAccount().Name + if !strings.HasPrefix(msg.Subject, fmt.Sprintf("$SYS.ACCOUNT.%s.CONNECT", accName)) { + t.Fatalf("Expected subject to start with %q, got %q", "$SYS.ACCOUNT..CONNECT", msg.Subject) + } + tokens := strings.Split(msg.Subject, ".") + if len(tokens) < 4 { + t.Fatalf("Expected 4 tokens, got %d", len(tokens)) + } + account := tokens[2] + if account != accName { + t.Fatalf("Expected %q for account, got %q", accName, account) + } +}