Merge pull request #843 from nats-io/conn_reporting

Added server version and cluster name to statsz. Fixed account connection reporting.
This commit is contained in:
Derek Collison
2018-12-06 11:51:01 -08:00
committed by GitHub
4 changed files with 148 additions and 21 deletions

View File

@@ -40,7 +40,7 @@ var (
const (
// VERSION is the current version for the server.
VERSION = "2.0.0-alpha"
VERSION = "2.0.0-beta.1"
// PROTO is the currently supported protocol.
// 0 was the original

View File

@@ -98,9 +98,11 @@ type accNumConnsReq struct {
}
// ServerInfo carries the identity of the server attached to an internal
// message. internalSendLoop overwrites Host, Cluster, ID, Seq and Version on
// every queued message whose ServerInfo pointer is non-nil, just before sending.
// NOTE(review): this listing shows both the previous and the updated field
// lines of the struct; only one set exists in the compiled file.
type ServerInfo struct {
Host string `json:"host"`
ID string `json:"id"`
Seq uint64 `json:"seq"`
Host string `json:"host"`
ID string `json:"id"`
// Cluster is the gateway name when gateways are enabled; otherwise it is
// empty and omitted from the JSON encoding.
Cluster string `json:"cluster,omitempty"`
// Version is set from the server VERSION constant.
Version string `json:"ver"`
Seq uint64 `json:"seq"`
}
// ClientInfo is detailed information about the client forming a connection.
@@ -113,6 +115,7 @@ type ClientInfo struct {
Name string `json:"name,omitempty"`
Lang string `json:"lang,omitempty"`
Version string `json:"ver,omitempty"`
RTT string `json:"rtt,omitempty"`
Stop *time.Time `json:"stop,omitempty"`
}
@@ -183,6 +186,10 @@ func (s *Server) internalSendLoop(wg *sync.WaitGroup) {
id := s.info.ID
host := s.info.Host
seqp := &s.sys.seq
var cluster string
if s.gateway.enabled {
cluster = s.getGatewayName()
}
s.mu.Unlock()
for s.eventsRunning() {
@@ -192,8 +199,10 @@ func (s *Server) internalSendLoop(wg *sync.WaitGroup) {
case pm := <-sendq:
if pm.si != nil {
pm.si.Host = host
pm.si.Cluster = cluster
pm.si.ID = id
pm.si.Seq = seq
pm.si.Version = VERSION
}
var b []byte
if pm.msg != nil {
@@ -511,14 +520,8 @@ func (s *Server) shutdownEventing() {
a.mu.Lock()
a.nrclients = 0
// Now clear state
if a.etmr != nil {
a.etmr.Stop()
a.etmr = nil
}
if a.ctmr != nil {
a.ctmr.Stop()
a.ctmr = nil
}
clearTimer(&a.etmr)
clearTimer(&a.ctmr)
a.clients = nil
a.strack = nil
a.mu.Unlock()
@@ -623,20 +626,14 @@ func (s *Server) sendAccConnsUpdate(a *Account, subj string) {
if !s.eventsEnabled() || a == nil || a == s.gacc {
return
}
// Update timer first
a.mu.Lock()
// Check to see if we have an HB running and update.
if a.ctmr == nil {
a.etmr = time.AfterFunc(eventsHBInterval, func() { s.accConnsUpdate(a) })
} else {
a.etmr.Reset(eventsHBInterval)
}
// If no limits set, don't update, no need to.
if a.mconns == 0 {
a.mu.Unlock()
return
}
// Build event with account name and number of local clients.
m := accNumConns{
Account: a.Name,
@@ -644,7 +641,21 @@ func (s *Server) sendAccConnsUpdate(a *Account, subj string) {
}
a.mu.Unlock()
s.sendInternalMsg(subj, "", &m.Server, &m)
s.sendInternalMsg(subj, _EMPTY_, &m.Server, &m)
// Set timer to fire again unless we are at zero.
a.mu.Lock()
if len(a.clients) == 0 {
clearTimer(&a.etmr)
} else {
// Check to see if we have an HB running and update.
if a.ctmr == nil {
a.etmr = time.AfterFunc(eventsHBInterval, func() { s.accConnsUpdate(a) })
} else {
a.etmr.Reset(eventsHBInterval)
}
}
a.mu.Unlock()
}
// accConnsUpdate is called whenever there is a change to the account's
@@ -715,6 +726,7 @@ func (s *Server) accountDisconnectEvent(c *client, now time.Time, reason string)
Name: c.opts.Name,
Lang: c.opts.Lang,
Version: c.opts.Version,
RTT: c.getRTT(),
},
Sent: DataStats{
Msgs: c.inMsgs,

View File

@@ -97,6 +97,11 @@ func runTrustedCluster(t *testing.T) (*Server, *Options, *Server, *Options, nkey
optsA.TrustedKeys = []string{pub}
optsA.AccountResolver = mr
optsA.SystemAccount = apub
// Add in dummy gateway
optsA.Gateway.Name = "TEST CLUSTER 22"
optsA.Gateway.Host = "127.0.0.1"
optsA.Gateway.Port = -1
optsA.gatewaysSolicitDelay = 30 * time.Second
sa := RunServer(optsA)
@@ -178,7 +183,6 @@ func TestSystemAccountNewConnection(t *testing.T) {
s.setSystemAccount(acc)
url := fmt.Sprintf("nats://%s:%d", opts.Host, opts.Port)
ncs, err := nats.Connect(url, createUserCreds(t, s, akp))
if err != nil {
t.Fatalf("Error on connect: %v", err)
@@ -385,6 +389,107 @@ func TestSystemAccountInternalSubscriptions(t *testing.T) {
}
}
// TestSystemAccountConnectionUpdatesStopAfterNoLocal checks that once every
// local client of an account has disconnected from server B, server A sees a
// final connection update reporting zero connections, no further updates
// follow, and the account's event timer on server B has been cleared.
func TestSystemAccountConnectionUpdatesStopAfterNoLocal(t *testing.T) {
	const (
		numClients  = 4
		totalEvents = 2 * numClients // One update per connect plus one per disconnect.
	)

	sa, _, sb, optsB, _ := runTrustedCluster(t)
	defer sa.Shutdown()
	defer sb.Shutdown()

	// Normal Account
	okp, _ := nkeys.FromSeed(oSeed)
	akp, _ := nkeys.CreateAccount()
	pub, _ := akp.PublicKey()

	nac := jwt.NewAccountClaims(pub)
	nac.Limits.Conn = numClients // Limit to 4 connections.
	// Named ajwt so the jwt package is not shadowed for the rest of the test.
	ajwt, _ := nac.Encode(okp)

	addAccountToMemResolver(sa, pub, ajwt)

	// Listen for updates to the new account connection activity.
	received := make(chan *nats.Msg, 10)
	cb := func(sub *subscription, subject, reply string, msg []byte) {
		// Copy the payload before handing it to the channel.
		data := append([]byte(nil), msg...)
		received <- &nats.Msg{Subject: subject, Reply: reply, Data: data}
	}
	subj := fmt.Sprintf(accConnsEventSubj, pub)
	sub, err := sa.sysSubscribe(subj, cb)
	if sub == nil || err != nil {
		t.Fatalf("Expected to subscribe, got %v", err)
	}
	defer sa.sysUnsubscribe(sub)

	// Create a few users on the new account.
	clients := make([]*nats.Conn, 0, numClients)
	url := fmt.Sprintf("nats://%s:%d", optsB.Host, optsB.Port)
	for i := 0; i < numClients; i++ {
		nc, err := nats.Connect(url, createUserCreds(t, sb, akp))
		if err != nil {
			t.Fatalf("Error on connect: %v", err)
		}
		clients = append(clients, nc)
	}

	// Wait for all 4 connect notifications.
	checkFor(t, time.Second, 50*time.Millisecond, func() error {
		if len(received) == numClients {
			return nil
		}
		return fmt.Errorf("Not enough messages, %d vs %d", len(received), numClients)
	})

	// Now lookup the account doing the events on sb.
	acc, _ := sb.LookupAccount(pub)
	if acc == nil {
		t.Fatalf("Expected to find account %q on server B", pub)
	}
	// Make sure we have the timer running.
	acc.mu.RLock()
	etmr := acc.etmr
	acc.mu.RUnlock()
	if etmr == nil {
		t.Fatalf("Expected event timer for acc conns to be running")
	}

	// Now close all of the connections.
	for _, nc := range clients {
		nc.Close()
	}

	// Nothing has been drained yet, so the 4 disconnect notifications bring
	// the buffered total to 8. Waiting for 4 again would either pass before
	// any disconnect arrived or time out once more than 4 were queued.
	checkFor(t, time.Second, 50*time.Millisecond, func() error {
		if len(received) == totalEvents {
			return nil
		}
		return fmt.Errorf("Not enough messages, %d vs %d", len(received), totalEvents)
	})

	// Drain all but the last message.
	for i := 0; i < totalEvents-1; i++ {
		<-received
	}
	// Check last one.
	msg := <-received
	m := accNumConns{}
	if err := json.Unmarshal(msg.Data, &m); err != nil {
		t.Fatalf("Error unmarshalling account connections request message: %v", err)
	}
	if m.Conns != 0 {
		t.Fatalf("Expected Conns to be 0, got %d", m.Conns)
	}

	// Should not receive any more messages..
	select {
	case <-received:
		t.Fatalf("Did not expect a message here")
	case <-time.After(50 * time.Millisecond):
	}

	// Make sure the timer is NOT running.
	acc.mu.RLock()
	etmr = acc.etmr
	acc.mu.RUnlock()
	if etmr != nil {
		t.Fatalf("Expected event timer for acc conns to NOT be running after reaching zero local clients")
	}
}
func TestSystemAccountConnectionLimits(t *testing.T) {
sa, optsA, sb, optsB, _ := runTrustedCluster(t)
defer sa.Shutdown()
@@ -912,6 +1017,13 @@ func TestServerEventStatsZ(t *testing.T) {
if m.Server.ID != sa.ID() {
t.Fatalf("Did not match IDs")
}
if m.Server.Cluster != "TEST CLUSTER 22" {
t.Fatalf("Did not match cluster name")
}
if m.Server.Version != VERSION {
t.Fatalf("Did not match server version")
}
if m.Stats.Connections != 1 {
t.Fatalf("Did not match connections of 1, got %d", m.Stats.Connections)
}

View File

@@ -298,6 +298,9 @@ func nextServerOpts(opts *Options) *Options {
nopts.Port = -1
nopts.Cluster.Port = -1
nopts.HTTPPort = -1
if nopts.Gateway.Name != "" {
nopts.Gateway.Port = -1
}
return &nopts
}