From c83d7f88517543dadfd5bb80c1301e6d78a90b8d Mon Sep 17 00:00:00 2001 From: Derek Collison Date: Fri, 7 Dec 2018 06:02:34 -0800 Subject: [PATCH] Support server ping for statusz Signed-off-by: Derek Collison --- server/const.go | 2 +- server/events.go | 72 ++++++++++++++++++++++++++++++++++--------- server/events_test.go | 41 ++++++++++++++++++++++-- 3 files changed, 97 insertions(+), 18 deletions(-) diff --git a/server/const.go b/server/const.go index ebb765aa..5a69a55b 100644 --- a/server/const.go +++ b/server/const.go @@ -40,7 +40,7 @@ var ( const ( // VERSION is the current version for the server. - VERSION = "2.0.0-beta.2" + VERSION = "2.0.0-beta.4" // PROTO is the currently supported protocol. // 0 was the original diff --git a/server/events.go b/server/events.go index df2a72ef..614c7a28 100644 --- a/server/events.go +++ b/server/events.go @@ -16,6 +16,7 @@ package server import ( "encoding/json" "fmt" + "math/rand" "strconv" "strings" "sync" @@ -26,15 +27,16 @@ import ( ) const ( - connectEventSubj = "$SYS.ACCOUNT.%s.CONNECT" - disconnectEventSubj = "$SYS.ACCOUNT.%s.DISCONNECT" - accConnsReqSubj = "$SYS.REQ.ACCOUNT.%s.CONNS" - accUpdateEventSubj = "$SYS.ACCOUNT.%s.CLAIMS.UPDATE" - connsRespSubj = "$SYS._INBOX_.%s" - accConnsEventSubj = "$SYS.SERVER.ACCOUNT.%s.CONNS" - shutdownEventSubj = "$SYS.SERVER.%s.SHUTDOWN" - serverStatsSubj = "$SYS.SERVER.%s.STATSZ" - serverStatsReqSubj = "$SYS.REQ.SERVER.%s.STATSZ" + connectEventSubj = "$SYS.ACCOUNT.%s.CONNECT" + disconnectEventSubj = "$SYS.ACCOUNT.%s.DISCONNECT" + accConnsReqSubj = "$SYS.REQ.ACCOUNT.%s.CONNS" + accUpdateEventSubj = "$SYS.ACCOUNT.%s.CLAIMS.UPDATE" + connsRespSubj = "$SYS._INBOX_.%s" + accConnsEventSubj = "$SYS.SERVER.ACCOUNT.%s.CONNS" + shutdownEventSubj = "$SYS.SERVER.%s.SHUTDOWN" + serverStatsSubj = "$SYS.SERVER.%s.STATSZ" + serverStatsReqSubj = "$SYS.REQ.SERVER.%s.STATSZ" + serverStatsPingReqSubj = "$SYS.REQ.SERVER.PING" shutdownEventTokens = 4 serverSubjectIndex = 2 @@ -98,11 +100,12 @@ type 
accNumConnsReq struct { } type ServerInfo struct { - Host string `json:"host"` - ID string `json:"id"` - Cluster string `json:"cluster,omitempty"` - Version string `json:"ver"` - Seq uint64 `json:"seq"` + Host string `json:"host"` + ID string `json:"id"` + Cluster string `json:"cluster,omitempty"` + Version string `json:"ver"` + Seq uint64 `json:"seq"` + Time time.Time `json:"time"` } // ClientInfo is detailed information about the client forming a connection. @@ -203,6 +206,7 @@ func (s *Server) internalSendLoop(wg *sync.WaitGroup) { pm.si.ID = id pm.si.Seq = seq pm.si.Version = VERSION + pm.si.Time = time.Now() } var b []byte if pm.msg != nil { @@ -265,6 +269,16 @@ func (s *Server) eventsRunning() bool { return s.running && s.eventsEnabled() } +// EventsEnabled will report if the server has internal events enabled via +// a defined system account. +func (s *Server) EventsEnabled() bool { + s.mu.Lock() + defer s.mu.Unlock() + return s.eventsEnabled() +} + +// eventsEnabled will report if events are enabled. +// Lock should be held. func (s *Server) eventsEnabled() bool { return s.sys != nil && s.sys.client != nil && s.sys.account != nil } @@ -420,11 +434,15 @@ func (s *Server) initEventTracking() { s.Errorf("Error setting up internal tracking: %v", err) } - // Listen for requests for our statz + // Listen for requests for our statsz. subject = fmt.Sprintf(serverStatsReqSubj, s.info.ID) if _, err := s.sysSubscribe(subject, s.statszReq); err != nil { s.Errorf("Error setting up internal tracking: %v", err) } + // Listen for ping messages that will be sent to all servers for statsz. + if _, err := s.sysSubscribe(serverStatsPingReqSubj, s.statszPing); err != nil { + s.Errorf("Error setting up internal tracking: %v", err) + } } // accountClaimUpdate will receive claim updates for accounts. @@ -549,6 +567,30 @@ func (s *Server) connsRequest(sub *subscription, subject, reply string, msg []by } } +// random back off interval for broadcast statsz ping requests. 
+const randomBackoff = 250 * time.Millisecond + +// statszPing will handle global requests for our server statsz. This will be a +// broadcast msg so we want to be mindful of that. We will do a random backoff and +// process in a separate go routine. +func (s *Server) statszPing(sub *subscription, subject, reply string, msg []byte) { + if !s.EventsEnabled() || reply == _EMPTY_ { + return + } + s.startGoRoutine(func() { + defer s.grWG.Done() + delay := time.Duration(rand.Intn(int(randomBackoff))) + select { + case <-time.After(delay): + s.mu.Lock() + defer s.mu.Unlock() + s.sendStatsz(reply) + case <-s.quitCh: + return + } + }) +} + // statszReq is a request for us to respond with current statz. func (s *Server) statszReq(sub *subscription, subject, reply string, msg []byte) { s.mu.Lock() diff --git a/server/events_test.go b/server/events_test.go index a18355d3..3918485a 100644 --- a/server/events_test.go +++ b/server/events_test.go @@ -963,7 +963,7 @@ func TestSystemAccountWithGateways(t *testing.T) { nca.Flush() // If this tests fails with wrong number after 10 seconds we may have // added a new inititial subscription for the eventing system. 
- checkExpectedSubs(t, 7, sa) + checkExpectedSubs(t, 8, sa) // Create a client on B and see if we receive the event urlb := fmt.Sprintf("nats://%s:%d", ob.Host, ob.Port) @@ -988,7 +988,7 @@ func TestSystemAccountWithGateways(t *testing.T) { t.Fatalf("Expected %q for account, got %q", accName, account) } } -func TestServerEventStatsZ(t *testing.T) { +func TestServerEventsStatsZ(t *testing.T) { sa, optsA, sb, _, akp := runTrustedCluster(t) defer sa.Shutdown() defer sb.Shutdown() @@ -1107,3 +1107,40 @@ func TestServerEventStatsZ(t *testing.T) { t.Fatalf("Expected a route, but got %d", lr) } } + +func TestServerEventsPingStatsZ(t *testing.T) { + sa, _, sb, optsB, akp := runTrustedCluster(t) + defer sa.Shutdown() + defer sb.Shutdown() + + url := fmt.Sprintf("nats://%s:%d", optsB.Host, optsB.Port) + nc, err := nats.Connect(url, createUserCreds(t, sb, akp)) + if err != nil { + t.Fatalf("Error on connect: %v", err) + } + defer nc.Close() + + reply := nc.NewRespInbox() + sub, _ := nc.SubscribeSync(reply) + + nc.PublishRequest(serverStatsPingReqSubj, reply, nil) + + // Make sure it's a statsz + m := ServerStatsMsg{} + + // Receive both manually. + msg, err := sub.NextMsg(randomBackoff * 2) + if err != nil { + t.Fatalf("Error receiving msg: %v", err) + } + if err := json.Unmarshal(msg.Data, &m); err != nil { + t.Fatalf("Error unmarshalling the statz json: %v", err) + } + msg, err = sub.NextMsg(randomBackoff) + if err != nil { + t.Fatalf("Error receiving msg: %v", err) + } + if err := json.Unmarshal(msg.Data, &m); err != nil { + t.Fatalf("Error unmarshalling the statz json: %v", err) + } +}