Adding new subscription for account conns (#1599)

This fits better with similar events
New subject is $SYS.ACCOUNT.%s.SERVER.CONNS
Old subject remains for backwards compatibiliby

Signed-off-by: Matthias Hanel <mh@synadia.com>
This commit is contained in:
Matthias Hanel
2020-09-18 17:52:23 -04:00
committed by GitHub
parent a10a2e9612
commit f76d6e38bd
2 changed files with 79 additions and 27 deletions

View File

@@ -41,7 +41,8 @@ const (
accConnsReqSubj = "$SYS.REQ.ACCOUNT.%s.CONNS"
accUpdateEventSubj = "$SYS.ACCOUNT.%s.CLAIMS.UPDATE"
connsRespSubj = "$SYS._INBOX_.%s"
accConnsEventSubj = "$SYS.SERVER.ACCOUNT.%s.CONNS"
accConnsEventSubjNew = "$SYS.ACCOUNT.%s.SERVER.CONNS"
accConnsEventSubjOld = "$SYS.SERVER.ACCOUNT.%s.CONNS" // kept for backward compatibility
shutdownEventSubj = "$SYS.SERVER.%s.SHUTDOWN"
authErrorEventSubj = "$SYS.SERVER.%s.CLIENT.AUTH.ERR"
serverStatsSubj = "$SYS.SERVER.%s.STATSZ"
@@ -569,9 +570,12 @@ func (s *Server) initEventTracking() {
}
s.sys.inboxPre = subject
// This is for remote updates for connection accounting.
subject = fmt.Sprintf(accConnsEventSubj, "*")
if _, err := s.sysSubscribe(subject, s.remoteConnsUpdate); err != nil {
s.Errorf("Error setting up internal tracking: %v", err)
for _, subj := range []string{accConnsEventSubjOld, accConnsEventSubjNew} {
subject = fmt.Sprintf(subj, "*")
if _, err := s.sysSubscribe(subject, s.remoteConnsUpdate); err != nil {
s.Errorf("Error setting up internal tracking for %s: %v", subject, err)
}
}
// This will be for responses for account info that we send out.
subject = fmt.Sprintf(connsRespSubj, s.info.ID)
@@ -1067,23 +1071,22 @@ func (s *Server) sendLeafNodeConnectMsg(accName string) {
// sendAccConnsUpdate is called to send out our information on the
// account's local connections.
// Lock should be held on entry.
func (s *Server) sendAccConnsUpdate(a *Account, subj string) {
func (s *Server) sendAccConnsUpdate(a *Account, subj ...string) {
if !s.eventsEnabled() || a == nil {
return
}
a.mu.RLock()
// Build event with account name and number of local clients and leafnodes.
m := AccountNumConns{
a.mu.RLock()
m := &AccountNumConns{
Account: a.Name,
Conns: a.numLocalConnections(),
LeafNodes: a.numLocalLeafNodes(),
TotalConns: a.numLocalConnections() + a.numLocalLeafNodes(),
}
a.mu.RUnlock()
s.sendInternalMsg(subj, _EMPTY_, &m.Server, &m)
for _, sub := range subj {
s.sendInternalMsg(sub, _EMPTY_, &m.Server, &m)
}
// Set timer to fire again unless we are at zero.
a.mu.Lock()
if a.numLocalConnections() == 0 {
@@ -1107,8 +1110,7 @@ func (s *Server) accConnsUpdate(a *Account) {
if !s.eventsEnabled() || a == nil {
return
}
subj := fmt.Sprintf(accConnsEventSubj, a.Name)
s.sendAccConnsUpdate(a, subj)
s.sendAccConnsUpdate(a, fmt.Sprintf(accConnsEventSubjOld, a.Name), fmt.Sprintf(accConnsEventSubjNew, a.Name))
}
// server lock should be held

View File

@@ -216,7 +216,24 @@ func TestSystemAccountNewConnection(t *testing.T) {
if err != nil {
t.Fatalf("Error receiving msg: %v", err)
}
connsMsg, err := sub.NextMsg(time.Second)
if err != nil {
t.Fatalf("Error receiving msg: %v", err)
}
if strings.HasPrefix(msg.Subject, fmt.Sprintf("$SYS.ACCOUNT.%s.SERVER.CONNS", acc2.Name)) {
msg, connsMsg = connsMsg, msg
}
if !strings.HasPrefix(connsMsg.Subject, fmt.Sprintf("$SYS.ACCOUNT.%s.SERVER.CONNS", acc2.Name)) {
t.Fatalf("Expected subject to start with %q, got %q", "$SYS.ACCOUNT.<account>.CONNECT", msg.Subject)
}
conns := AccountNumConns{}
if err := json.Unmarshal(connsMsg.Data, &conns); err != nil {
t.Fatalf("Error unmarshalling conns event message: %v", err)
} else if conns.Account != acc2.Name {
t.Fatalf("Wrong account in conns message: %v", conns)
} else if conns.Conns != 1 || conns.TotalConns != 1 || conns.LeafNodes != 0 {
t.Fatalf("Wrong counts in conns message: %v", conns)
}
if !strings.HasPrefix(msg.Subject, fmt.Sprintf("$SYS.ACCOUNT.%s.CONNECT", acc2.Name)) {
t.Fatalf("Expected subject to start with %q, got %q", "$SYS.ACCOUNT.<account>.CONNECT", msg.Subject)
}
@@ -272,7 +289,26 @@ func TestSystemAccountNewConnection(t *testing.T) {
if err != nil {
t.Fatalf("Error receiving msg: %v", err)
}
connsMsg, err = sub.NextMsg(time.Second)
if err != nil {
t.Fatalf("Error receiving msg: %v", err)
}
if strings.HasPrefix(msg.Subject, fmt.Sprintf("$SYS.ACCOUNT.%s.SERVER.CONNS", acc2.Name)) {
msg, connsMsg = connsMsg, msg
}
if !strings.HasPrefix(connsMsg.Subject, fmt.Sprintf("$SYS.ACCOUNT.%s.SERVER.CONNS", acc2.Name)) {
t.Fatalf("Expected subject to start with %q, got %q", "$SYS.ACCOUNT.<account>.CONNECT", msg.Subject)
} else if !strings.Contains(string(connsMsg.Data), `"total_conns": 0`) {
t.Fatalf("Expected event to reflect created connection, got: %s", string(msg.Data))
}
conns = AccountNumConns{}
if err := json.Unmarshal(connsMsg.Data, &conns); err != nil {
t.Fatalf("Error unmarshalling conns event message: %v", err)
} else if conns.Account != acc2.Name {
t.Fatalf("Wrong account in conns message: %v", conns)
} else if conns.Conns != 0 || conns.TotalConns != 0 || conns.LeafNodes != 0 {
t.Fatalf("Wrong counts in conns message: %v", conns)
}
if !strings.HasPrefix(msg.Subject, fmt.Sprintf("$SYS.ACCOUNT.%s.DISCONNECT", acc2.Name)) {
t.Fatalf("Expected subject to start with %q, got %q", "$SYS.ACCOUNT.<account>.DISCONNECT", msg.Subject)
}
@@ -722,7 +758,7 @@ func TestSystemAccountConnectionUpdatesStopAfterNoLocal(t *testing.T) {
copy := append([]byte(nil), msg...)
received <- &nats.Msg{Subject: subject, Reply: reply, Data: copy}
}
subj := fmt.Sprintf(accConnsEventSubj, pub)
subj := fmt.Sprintf(accConnsEventSubjOld, pub)
sub, err := sa.sysSubscribe(subj, cb)
if sub == nil || err != nil {
t.Fatalf("Expected to subscribe, got %v", err)
@@ -1354,7 +1390,7 @@ func TestSystemAccountWithGateways(t *testing.T) {
// If this tests fails with wrong number after 10 seconds we may have
// added a new inititial subscription for the eventing system.
checkExpectedSubs(t, 26, sa)
checkExpectedSubs(t, 27, sa)
// Create a client on B and see if we receive the event
urlb := fmt.Sprintf("nats://%s:%d", ob.Host, ob.Port)
@@ -1365,18 +1401,32 @@ func TestSystemAccountWithGateways(t *testing.T) {
if err != nil {
t.Fatalf("Error receiving msg: %v", err)
}
connsMsgA, err := sub.NextMsg(time.Second)
if err != nil {
t.Fatalf("Error receiving msg: %v", err)
}
connsMsgG, err := sub.NextMsg(time.Second)
if err != nil {
t.Fatalf("Error receiving msg: %v", err)
}
if strings.HasSuffix(connsMsgA.Subject, ".CONNECT") {
msg, connsMsgA = connsMsgA, msg
} else if strings.HasSuffix(connsMsgG.Subject, ".CONNECT") {
msg, connsMsgG = connsMsgG, msg
}
if connsMsgG.Subject != "$SYS.ACCOUNT.$G.SERVER.CONNS" {
connsMsgA, connsMsgG = connsMsgG, connsMsgA
}
if connsMsgG.Subject != "$SYS.ACCOUNT.$G.SERVER.CONNS" {
t.Fatalf("Expected subject $SYS.ACCOUNT.$G.SERVER.CONNS but got %s", connsMsgG.Subject)
}
// Basic checks, could expand on that...
accName := sa.SystemAccount().Name
if !strings.HasPrefix(msg.Subject, fmt.Sprintf("$SYS.ACCOUNT.%s.CONNECT", accName)) {
t.Fatalf("Expected subject to start with %q, got %q", "$SYS.ACCOUNT.<account>.CONNECT", msg.Subject)
if connsMsgA.Subject != fmt.Sprintf("$SYS.ACCOUNT.%s.SERVER.CONNS", accName) {
t.Fatalf("Expected subject to be $SYS.ACCOUNT.%s.SERVER.CONNS but got: %s", accName, connsMsgA.Subject)
}
tokens := strings.Split(msg.Subject, ".")
if len(tokens) < 4 {
t.Fatalf("Expected 4 tokens, got %d", len(tokens))
}
account := tokens[2]
if account != accName {
t.Fatalf("Expected %q for account, got %q", accName, account)
if msg.Subject != fmt.Sprintf("$SYS.ACCOUNT.%s.CONNECT", accName) {
t.Fatalf("Expected subject to be $SYS.ACCOUNT.%s.CONNECT but got: %s", accName, msg.Subject)
}
}
func TestServerEventsStatsZ(t *testing.T) {
@@ -1853,7 +1903,7 @@ func TestConnectionUpdatesTimerProperlySet(t *testing.T) {
cb := func(sub *subscription, _ *client, subject, reply string, msg []byte) {
atomic.AddInt32(&count, 1)
}
subj := fmt.Sprintf(accConnsEventSubj, pub)
subj := fmt.Sprintf(accConnsEventSubjOld, pub)
sub, err := sa.sysSubscribe(subj, cb)
if sub == nil || err != nil {
t.Fatalf("Expected to subscribe, got %v", err)