From 911e7ef35dc19e69274b4f65b1f286cd7c962f1e Mon Sep 17 00:00:00 2001 From: Derek Collison Date: Wed, 29 Apr 2020 19:32:44 -0700 Subject: [PATCH] Add additional fields to client info for latency Signed-off-by: Derek Collison --- server/accounts.go | 31 +++++++++++++++++++------------ server/client.go | 24 ++++++++++++++++-------- server/events.go | 22 ++++++---------------- 3 files changed, 41 insertions(+), 36 deletions(-) diff --git a/server/accounts.go b/server/accounts.go index 38a82363..51587aeb 100644 --- a/server/accounts.go +++ b/server/accounts.go @@ -724,7 +724,10 @@ func (a *Account) IsExportServiceTracking(service string) bool { } // ServiceLatency is the JSON message sent out in response to latency tracking for -// an accounts exported services. +// an accounts exported services. Additional client info is available in requestor +// and responder. Note that for a requestor, the only information shared by default +// is the RTT used to calculate the total latency. The requestor's account can +// designate to share the additional information in the service import. type ServiceLatency struct { Status int `json:"status"` Error string `json:"description,omitempty"` @@ -741,12 +744,16 @@ type ServiceLatency struct { // to calculate the total latency. The requestor's account can designate to share // the additional information in the service import. type LatencyClient struct { - User string `json:"user,omitempty"` - Name string `json:"name,omitempty"` - RTT time.Duration `json:"rtt"` - IP string `json:"ip"` - CID uint64 `json:"cid"` - Server string `json:"server"` + Account string `json:"acc"` + RTT time.Duration `json:"rtt"` + Start time.Time `json:"start,omitempty"` + User string `json:"user,omitempty"` + Name string `json:"name,omitempty"` + Lang string `json:"lang,omitempty"` + Version string `json:"ver,omitempty"` + IP string `json:"ip,omitempty"` + CID uint64 `json:"cid,omitempty"` + Server string `json:"server,omitempty"` } // NATSTotalTime is a helper function that totals the NATS latencies. @@ -803,7 +810,7 @@ func (a *Account) sendBadRequestTrackingLatency(si *serviceImport, requestor *cl sl := &ServiceLatency{ Status: 400, Error: "Bad Request", - Requestor: requestor.getLatencyInfo(si.share), + Requestor: requestor.getClientInfo(si.share), } sl.RequestStart = time.Now().Add(-sl.Requestor.RTT).UTC() a.sendLatencyResult(si, sl) @@ -817,7 +824,7 @@ func (a *Account) sendReplyInterestLostTrackLatency(si *serviceImport) { Error: "Request Timeout", } if si.rc != nil { - sl.Requestor = si.rc.getLatencyInfo(si.share) + sl.Requestor = si.rc.getClientInfo(si.share) } sl.RequestStart = time.Unix(0, si.ts-int64(sl.Requestor.RTT)).UTC() a.sendLatencyResult(si, sl) @@ -826,7 +833,7 @@ func (a *Account) sendReplyInterestLostTrackLatency(si *serviceImport) { func (a *Account) sendBackendErrorTrackingLatency(si *serviceImport, reason rsiReason) { sl := &ServiceLatency{} if si.rc != nil { - sl.Requestor = si.rc.getLatencyInfo(si.share) + sl.Requestor = si.rc.getClientInfo(si.share) } sl.RequestStart = time.Unix(0, si.ts-int64(sl.Requestor.RTT)).UTC() if reason == rsiNoDelivery { @@ -854,8 +861,8 @@ func (a *Account) sendTrackingLatency(si *serviceImport, responder *client) bool sl := &ServiceLatency{ Status: 200, - Requestor: requestor.getLatencyInfo(si.share), - Responder: responder.getLatencyInfo(true), + Requestor: requestor.getClientInfo(si.share), + Responder: responder.getClientInfo(true), } sl.RequestStart = time.Unix(0, si.ts-int64(sl.Requestor.RTT)).UTC() sl.ServiceLatency = serviceRTT - sl.Responder.RTT diff --git a/server/client.go b/server/client.go index b226a027..17ea903a 100644 --- a/server/client.go +++ b/server/client.go @@ -1388,7 +1388,6 @@ func (c *client) processConnect(arg []byte) error { // Estimate RTT to start. if c.kind == CLIENT { c.rtt = computeRTT(c.start) - if c.srv != nil { c.clearPingTimer() c.srv.setFirstPingTimer(c) @@ -2873,7 +2872,7 @@ func (c *client) processInboundClientMsg(msg []byte) bool { sl := &rl.M2 // Fill this in and send it off to the other side. sl.Status = 200 - sl.Responder = c.getLatencyInfo(true) + sl.Responder = c.getClientInfo(true) sl.ServiceLatency = time.Since(sl.RequestStart) - sl.Responder.RTT sl.TotalLatency = sl.ServiceLatency + sl.Responder.RTT sanitizeLatencyMetric(sl) @@ -2969,7 +2968,7 @@ func (c *client) handleGWReplyMap(msg []byte) bool { sl := &rl.M2 // Fill this in and send it off to the other side. sl.Status = 200 - sl.Responder = c.getLatencyInfo(true) + sl.Responder = c.getClientInfo(true) sl.ServiceLatency = time.Since(sl.RequestStart) - sl.Responder.RTT sl.TotalLatency = sl.ServiceLatency + sl.Responder.RTT sanitizeLatencyMetric(sl) @@ -3043,7 +3042,8 @@ func (c *client) processServiceImport(si *serviceImport, acc *Account, msg []byt si.acc.sendBadRequestTrackingLatency(si, c) } - // Send tracking info here if we are tracking this request/response. + // Send tracking info here if we are tracking this response. + // This is always a response. var didSendTL bool if si.tracking { didSendTL = acc.sendTrackingLatency(si, c) @@ -3973,20 +3973,28 @@ func (c *client) pruneClosedSubFromPerAccountCache() { } } -// Grabs the information for latency reporting. -func (c *client) getLatencyInfo(detailed bool) LatencyClient { +// Grabs the information for this client. +func (c *client) getClientInfo(detailed bool) LatencyClient { var lc LatencyClient if c == nil || c.kind != CLIENT { return lc } + // Server name. Defaults to server ID if not set explicitly. sn := c.srv.Name() + c.mu.Lock() + // Defaults for all are RTT and Account. + lc.Account = accForClient(c) lc.RTT = c.rtt + // Detailed is opt in. if detailed { - lc.Name = c.opts.Name - lc.User = c.getRawAuthUser() + lc.Start = c.start.UTC() lc.IP = c.host lc.CID = c.cid + lc.Name = c.opts.Name + lc.User = c.getRawAuthUser() + lc.Lang = c.opts.Lang + lc.Version = c.opts.Version lc.Server = sn } c.mu.Unlock() diff --git a/server/events.go b/server/events.go index f94377fb..3126b641 100644 --- a/server/events.go +++ b/server/events.go @@ -1,4 +1,4 @@ -// Copyright 2018-2019 The NATS Authors +// Copyright 2018-2020 The NATS Authors // Licensed under the Apache License, Version 2.0 (the "License"); // you may not use this file except in compliance with the License. // You may obtain a copy of the License at @@ -157,6 +157,7 @@ type ClientInfo struct { Lang string `json:"lang,omitempty"` Version string `json:"ver,omitempty"` RTT string `json:"rtt,omitempty"` + Server string `json:"server,omitempty"` Stop *time.Time `json:"stop,omitempty"` } @@ -1021,7 +1022,7 @@ func (s *Server) accountConnectEvent(c *client) { Host: c.host, ID: c.cid, Account: accForClient(c), - User: nameForClient(c), + User: c.getRawAuthUser(), Name: c.opts.Name, Lang: c.opts.Lang, Version: c.opts.Version, @@ -1063,7 +1064,7 @@ func (s *Server) accountDisconnectEvent(c *client, now time.Time, reason string) Host: c.host, ID: c.cid, Account: accForClient(c), - User: nameForClient(c), + User: c.getRawAuthUser(), Name: c.opts.Name, Lang: c.opts.Lang, Version: c.opts.Version, @@ -1105,7 +1106,7 @@ func (s *Server) sendAuthErrorEvent(c *client) { Host: c.host, ID: c.cid, Account: accForClient(c), - User: nameForClient(c), + User: c.getRawAuthUser(), Name: c.opts.Name, Lang: c.opts.Lang, Version: c.opts.Version, @@ -1242,10 +1243,7 @@ func (s *Server) remoteLatencyUpdate(sub *subscription, _ *client, subject, _ st } } - // Calculate the correct latency given M1 and M2. - // M2 ServiceLatency is correct, so use that. - // M1 TotalLatency is correct, so use that. - // Will use those to back into NATS latency. + // Calculate the correct latencies given M1 and M2. m1.merge(&m2) // Clear the requesting client since we send the result here. @@ -1469,14 +1467,6 @@ func (s *Server) nsubsRequest(sub *subscription, _ *client, subject, reply strin s.sendInternalMsgLocked(reply, _EMPTY_, nil, nsubs) } -// Helper to grab name for a client. -func nameForClient(c *client) string { - if c.user != nil { - return c.user.Nkey - } - return "N/A" -} - // Helper to grab account name for a client. func accForClient(c *client) string { if c.acc != nil {