mirror of
https://github.com/gogrlx/nats-server.git
synced 2026-04-15 18:50:41 -07:00
Add additional fields to client info for latency
Signed-off-by: Derek Collison <derek@nats.io>
This commit is contained in:
@@ -724,7 +724,10 @@ func (a *Account) IsExportServiceTracking(service string) bool {
|
||||
}
|
||||
|
||||
// ServiceLatency is the JSON message sent out in response to latency tracking for
|
||||
// an accounts exported services.
|
||||
// an accounts exported services. Additional client info is available in requestor
|
||||
// and responder. Note that for a requestor, the only information shared by default
|
||||
// is the RTT used to calculate the total latency. The requestor's account can
|
||||
// designate to share the additional information in the service import.
|
||||
type ServiceLatency struct {
|
||||
Status int `json:"status"`
|
||||
Error string `json:"description,omitempty"`
|
||||
@@ -741,12 +744,16 @@ type ServiceLatency struct {
|
||||
// to calculate the total latency. The requestor's account can designate to share
|
||||
// the additional information in the service import.
|
||||
type LatencyClient struct {
|
||||
User string `json:"user,omitempty"`
|
||||
Name string `json:"name,omitempty"`
|
||||
RTT time.Duration `json:"rtt"`
|
||||
IP string `json:"ip"`
|
||||
CID uint64 `json:"cid"`
|
||||
Server string `json:"server"`
|
||||
Account string `json:"acc"`
|
||||
RTT time.Duration `json:"rtt"`
|
||||
Start time.Time `json:"start,omitempty"`
|
||||
User string `json:"user,omitempty"`
|
||||
Name string `json:"name,omitempty"`
|
||||
Lang string `json:"lang,omitempty"`
|
||||
Version string `json:"ver,omitempty"`
|
||||
IP string `json:"ip,omitempty"`
|
||||
CID uint64 `json:"cid,omitempty"`
|
||||
Server string `json:"server,omitempty"`
|
||||
}
|
||||
|
||||
// NATSTotalTime is a helper function that totals the NATS latencies.
|
||||
@@ -803,7 +810,7 @@ func (a *Account) sendBadRequestTrackingLatency(si *serviceImport, requestor *cl
|
||||
sl := &ServiceLatency{
|
||||
Status: 400,
|
||||
Error: "Bad Request",
|
||||
Requestor: requestor.getLatencyInfo(si.share),
|
||||
Requestor: requestor.getClientInfo(si.share),
|
||||
}
|
||||
sl.RequestStart = time.Now().Add(-sl.Requestor.RTT).UTC()
|
||||
a.sendLatencyResult(si, sl)
|
||||
@@ -817,7 +824,7 @@ func (a *Account) sendReplyInterestLostTrackLatency(si *serviceImport) {
|
||||
Error: "Request Timeout",
|
||||
}
|
||||
if si.rc != nil {
|
||||
sl.Requestor = si.rc.getLatencyInfo(si.share)
|
||||
sl.Requestor = si.rc.getClientInfo(si.share)
|
||||
}
|
||||
sl.RequestStart = time.Unix(0, si.ts-int64(sl.Requestor.RTT)).UTC()
|
||||
a.sendLatencyResult(si, sl)
|
||||
@@ -826,7 +833,7 @@ func (a *Account) sendReplyInterestLostTrackLatency(si *serviceImport) {
|
||||
func (a *Account) sendBackendErrorTrackingLatency(si *serviceImport, reason rsiReason) {
|
||||
sl := &ServiceLatency{}
|
||||
if si.rc != nil {
|
||||
sl.Requestor = si.rc.getLatencyInfo(si.share)
|
||||
sl.Requestor = si.rc.getClientInfo(si.share)
|
||||
}
|
||||
sl.RequestStart = time.Unix(0, si.ts-int64(sl.Requestor.RTT)).UTC()
|
||||
if reason == rsiNoDelivery {
|
||||
@@ -854,8 +861,8 @@ func (a *Account) sendTrackingLatency(si *serviceImport, responder *client) bool
|
||||
|
||||
sl := &ServiceLatency{
|
||||
Status: 200,
|
||||
Requestor: requestor.getLatencyInfo(si.share),
|
||||
Responder: responder.getLatencyInfo(true),
|
||||
Requestor: requestor.getClientInfo(si.share),
|
||||
Responder: responder.getClientInfo(true),
|
||||
}
|
||||
sl.RequestStart = time.Unix(0, si.ts-int64(sl.Requestor.RTT)).UTC()
|
||||
sl.ServiceLatency = serviceRTT - sl.Responder.RTT
|
||||
|
||||
@@ -1388,7 +1388,6 @@ func (c *client) processConnect(arg []byte) error {
|
||||
// Estimate RTT to start.
|
||||
if c.kind == CLIENT {
|
||||
c.rtt = computeRTT(c.start)
|
||||
|
||||
if c.srv != nil {
|
||||
c.clearPingTimer()
|
||||
c.srv.setFirstPingTimer(c)
|
||||
@@ -2873,7 +2872,7 @@ func (c *client) processInboundClientMsg(msg []byte) bool {
|
||||
sl := &rl.M2
|
||||
// Fill this in and send it off to the other side.
|
||||
sl.Status = 200
|
||||
sl.Responder = c.getLatencyInfo(true)
|
||||
sl.Responder = c.getClientInfo(true)
|
||||
sl.ServiceLatency = time.Since(sl.RequestStart) - sl.Responder.RTT
|
||||
sl.TotalLatency = sl.ServiceLatency + sl.Responder.RTT
|
||||
sanitizeLatencyMetric(sl)
|
||||
@@ -2969,7 +2968,7 @@ func (c *client) handleGWReplyMap(msg []byte) bool {
|
||||
sl := &rl.M2
|
||||
// Fill this in and send it off to the other side.
|
||||
sl.Status = 200
|
||||
sl.Responder = c.getLatencyInfo(true)
|
||||
sl.Responder = c.getClientInfo(true)
|
||||
sl.ServiceLatency = time.Since(sl.RequestStart) - sl.Responder.RTT
|
||||
sl.TotalLatency = sl.ServiceLatency + sl.Responder.RTT
|
||||
sanitizeLatencyMetric(sl)
|
||||
@@ -3043,7 +3042,8 @@ func (c *client) processServiceImport(si *serviceImport, acc *Account, msg []byt
|
||||
si.acc.sendBadRequestTrackingLatency(si, c)
|
||||
}
|
||||
|
||||
// Send tracking info here if we are tracking this request/response.
|
||||
// Send tracking info here if we are tracking this response.
|
||||
// This is always a response.
|
||||
var didSendTL bool
|
||||
if si.tracking {
|
||||
didSendTL = acc.sendTrackingLatency(si, c)
|
||||
@@ -3973,20 +3973,28 @@ func (c *client) pruneClosedSubFromPerAccountCache() {
|
||||
}
|
||||
}
|
||||
|
||||
// Grabs the information for latency reporting.
|
||||
func (c *client) getLatencyInfo(detailed bool) LatencyClient {
|
||||
// Grabs the information for this client.
|
||||
func (c *client) getClientInfo(detailed bool) LatencyClient {
|
||||
var lc LatencyClient
|
||||
if c == nil || c.kind != CLIENT {
|
||||
return lc
|
||||
}
|
||||
// Server name. Defaults to server ID if not set explicitly.
|
||||
sn := c.srv.Name()
|
||||
|
||||
c.mu.Lock()
|
||||
// Defaults for all are RTT and Account.
|
||||
lc.Account = accForClient(c)
|
||||
lc.RTT = c.rtt
|
||||
// Detailed is opt in.
|
||||
if detailed {
|
||||
lc.Name = c.opts.Name
|
||||
lc.User = c.getRawAuthUser()
|
||||
lc.Start = c.start.UTC()
|
||||
lc.IP = c.host
|
||||
lc.CID = c.cid
|
||||
lc.Name = c.opts.Name
|
||||
lc.User = c.getRawAuthUser()
|
||||
lc.Lang = c.opts.Lang
|
||||
lc.Version = c.opts.Version
|
||||
lc.Server = sn
|
||||
}
|
||||
c.mu.Unlock()
|
||||
|
||||
@@ -1,4 +1,4 @@
|
||||
// Copyright 2018-2019 The NATS Authors
|
||||
// Copyright 2018-2020 The NATS Authors
|
||||
// Licensed under the Apache License, Version 2.0 (the "License");
|
||||
// you may not use this file except in compliance with the License.
|
||||
// You may obtain a copy of the License at
|
||||
@@ -157,6 +157,7 @@ type ClientInfo struct {
|
||||
Lang string `json:"lang,omitempty"`
|
||||
Version string `json:"ver,omitempty"`
|
||||
RTT string `json:"rtt,omitempty"`
|
||||
Server string `json:"server,omitempty"`
|
||||
Stop *time.Time `json:"stop,omitempty"`
|
||||
}
|
||||
|
||||
@@ -1021,7 +1022,7 @@ func (s *Server) accountConnectEvent(c *client) {
|
||||
Host: c.host,
|
||||
ID: c.cid,
|
||||
Account: accForClient(c),
|
||||
User: nameForClient(c),
|
||||
User: c.getRawAuthUser(),
|
||||
Name: c.opts.Name,
|
||||
Lang: c.opts.Lang,
|
||||
Version: c.opts.Version,
|
||||
@@ -1063,7 +1064,7 @@ func (s *Server) accountDisconnectEvent(c *client, now time.Time, reason string)
|
||||
Host: c.host,
|
||||
ID: c.cid,
|
||||
Account: accForClient(c),
|
||||
User: nameForClient(c),
|
||||
User: c.getRawAuthUser(),
|
||||
Name: c.opts.Name,
|
||||
Lang: c.opts.Lang,
|
||||
Version: c.opts.Version,
|
||||
@@ -1105,7 +1106,7 @@ func (s *Server) sendAuthErrorEvent(c *client) {
|
||||
Host: c.host,
|
||||
ID: c.cid,
|
||||
Account: accForClient(c),
|
||||
User: nameForClient(c),
|
||||
User: c.getRawAuthUser(),
|
||||
Name: c.opts.Name,
|
||||
Lang: c.opts.Lang,
|
||||
Version: c.opts.Version,
|
||||
@@ -1242,10 +1243,7 @@ func (s *Server) remoteLatencyUpdate(sub *subscription, _ *client, subject, _ st
|
||||
}
|
||||
}
|
||||
|
||||
// Calculate the correct latency given M1 and M2.
|
||||
// M2 ServiceLatency is correct, so use that.
|
||||
// M1 TotalLatency is correct, so use that.
|
||||
// Will use those to back into NATS latency.
|
||||
// Calculate the correct latencies given M1 and M2.
|
||||
m1.merge(&m2)
|
||||
|
||||
// Clear the requesting client since we send the result here.
|
||||
@@ -1469,14 +1467,6 @@ func (s *Server) nsubsRequest(sub *subscription, _ *client, subject, reply strin
|
||||
s.sendInternalMsgLocked(reply, _EMPTY_, nil, nsubs)
|
||||
}
|
||||
|
||||
// Helper to grab name for a client.
|
||||
func nameForClient(c *client) string {
|
||||
if c.user != nil {
|
||||
return c.user.Nkey
|
||||
}
|
||||
return "N/A"
|
||||
}
|
||||
|
||||
// Helper to grab account name for a client.
|
||||
func accForClient(c *client) string {
|
||||
if c.acc != nil {
|
||||
|
||||
Reference in New Issue
Block a user