Additional service latency upgrades.

We now share more information about the responder and the requestor. By default the requestor's information is not shared, but sharing can be enabled when declaring the import.

Also fixed a bug in error handling for old-style requests that would always result in a 408 response.

Signed-off-by: Derek Collison <derek@nats.io>
This commit is contained in:
Derek Collison
2020-04-28 16:24:18 -07:00
parent b8c04c1abf
commit a7f1bca534
8 changed files with 595 additions and 149 deletions

View File

@@ -113,6 +113,7 @@ type serviceImport struct {
response bool
invalid bool
tracking bool
share bool
}
// This is used to record when we create a mapping for implicit service
@@ -722,30 +723,37 @@ func (a *Account) IsExportServiceTracking(service string) bool {
return false
}
// NATSLatency represents the internal NATS latencies, including RTTs to clients.
type NATSLatency struct {
    Requestor time.Duration `json:"req"`  // RTT between the server and the requesting client.
    Responder time.Duration `json:"resp"` // RTT between the server and the responding client.
    System    time.Duration `json:"sys"`  // Additional time attributed to the NATS system itself.
}
// TotalTime is a helper function that totals the NATS latencies.
func (nl *NATSLatency) TotalTime() time.Duration {
    return nl.Requestor + nl.Responder + nl.System
}
// ServiceLatency is the JSON message sent out in response to latency tracking for
// exported services.
// an accounts exported services.
type ServiceLatency struct {
Status int `json:"status"`
Error string `json:"description,omitempty"`
AppName string `json:"app,omitempty"`
Requestor LatencyClient `json:"requestor,omitempty"`
Responder LatencyClient `json:"responder,omitempty"`
RequestStart time.Time `json:"start"`
ServiceLatency time.Duration `json:"svc"`
NATSLatency NATSLatency `json:"nats"`
ServiceLatency time.Duration `json:"service"`
SystemLatency time.Duration `json:"system"`
TotalLatency time.Duration `json:"total"`
}
// LatencyClient is the JSON message structure assigned to requestors and responders.
// Note that for a requestor, the only information shared by default is the RTT used
// to calculate the total latency. The requestor's account can designate to share
// the additional information in the service import.
type LatencyClient struct {
    User   string        `json:"user,omitempty"` // Raw auth user: nkey, username, JWT subject or token.
    Name   string        `json:"name,omitempty"` // Client-provided connection name.
    RTT    time.Duration `json:"rtt"`            // Round trip time between the server and this client.
    IP     string        `json:"ip"`             // Client host as seen by the server.
    CID    uint64        `json:"cid"`            // Server-assigned client ID.
    Server string        `json:"server"`         // Name of the server the client is connected to.
}
// NATSTotalTime returns the total time attributable to NATS itself:
// both client RTTs plus any measured system latency.
func (nl *ServiceLatency) NATSTotalTime() time.Duration {
    total := nl.Requestor.RTT
    total += nl.Responder.RTT
    total += nl.SystemLatency
    return total
}
// Merge function to merge m1 and m2 (requestor and responder) measurements
// when there are two samples. This happens when the requestor and responder
// are on different servers.
@@ -754,10 +762,9 @@ type ServiceLatency struct {
// m1 TotalLatency is correct, so use that.
// Will use those to back into NATS latency.
func (m1 *ServiceLatency) merge(m2 *ServiceLatency) {
m1.AppName = m2.AppName
m1.NATSLatency.System = m1.ServiceLatency - (m2.ServiceLatency + m2.NATSLatency.Responder)
m1.SystemLatency = m1.ServiceLatency - (m2.ServiceLatency + m2.Responder.RTT)
m1.ServiceLatency = m2.ServiceLatency
m1.NATSLatency.Responder = m2.NATSLatency.Responder
m1.Responder = m2.Responder
sanitizeLatencyMetric(m1)
}
@@ -770,8 +777,8 @@ func sanitizeLatencyMetric(sl *ServiceLatency) {
if sl.ServiceLatency < 0 {
sl.ServiceLatency = 0
}
if sl.NATSLatency.System < 0 {
sl.NATSLatency.System = 0
if sl.SystemLatency < 0 {
sl.SystemLatency = 0
}
}
@@ -794,44 +801,34 @@ func (a *Account) sendLatencyResult(si *serviceImport, sl *ServiceLatency) {
// Used to send a bad request metric when we do not have a reply subject
func (a *Account) sendBadRequestTrackingLatency(si *serviceImport, requestor *client) {
sl := &ServiceLatency{
Status: 400,
Error: "Bad Request",
RequestStart: time.Now().Add(-requestor.getRTTValue()).UTC(),
Status: 400,
Error: "Bad Request",
Requestor: requestor.getLatencyInfo(si.share),
}
sl.RequestStart = time.Now().Add(-sl.Requestor.RTT).UTC()
a.sendLatencyResult(si, sl)
}
// Used to send a latency result when the requestor interest was lost before the
// response could be delivered.
func (a *Account) sendReplyInterestLostTrackLatency(si *serviceImport) {
var reqClientRTT time.Duration
if si.rc != nil {
reqClientRTT = si.rc.getRTTValue()
}
reqStart := time.Unix(0, si.ts-int64(reqClientRTT))
sl := &ServiceLatency{
Status: 408,
Error: "Request Timeout",
RequestStart: reqStart.UTC(),
NATSLatency: NATSLatency{
Requestor: reqClientRTT,
},
Status: 408,
Error: "Request Timeout",
}
if si.rc != nil {
sl.Requestor = si.rc.getLatencyInfo(si.share)
}
sl.RequestStart = time.Unix(0, si.ts-int64(sl.Requestor.RTT)).UTC()
a.sendLatencyResult(si, sl)
}
func (a *Account) sendBackendErrorTrackingLatency(si *serviceImport, reason rsiReason) {
var reqClientRTT time.Duration
sl := &ServiceLatency{}
if si.rc != nil {
reqClientRTT = si.rc.getRTTValue()
}
reqStart := time.Unix(0, si.ts-int64(reqClientRTT))
sl := &ServiceLatency{
RequestStart: reqStart.UTC(),
NATSLatency: NATSLatency{
Requestor: reqClientRTT,
},
sl.Requestor = si.rc.getLatencyInfo(si.share)
}
sl.RequestStart = time.Unix(0, si.ts-int64(sl.Requestor.RTT)).UTC()
if reason == rsiNoDelivery {
sl.Status = 503
sl.Error = "Service Unavailable"
@@ -853,37 +850,20 @@ func (a *Account) sendTrackingLatency(si *serviceImport, responder *client) bool
ts := time.Now()
serviceRTT := time.Duration(ts.UnixNano() - si.ts)
requestor := si.rc
var requestor = si.rc
var reqClientRTT = requestor.getRTTValue()
var respClientRTT time.Duration
var appName string
if responder != nil && responder.kind == CLIENT {
respClientRTT = responder.getRTTValue()
appName = responder.GetName()
}
// We will estimate time when request left the requestor by time we received
// and the client RTT for the requestor.
reqStart := time.Unix(0, si.ts-int64(reqClientRTT))
sl := &ServiceLatency{
Status: 200,
AppName: appName,
RequestStart: reqStart.UTC(),
ServiceLatency: serviceRTT - respClientRTT,
NATSLatency: NATSLatency{
Requestor: reqClientRTT,
Responder: respClientRTT,
System: 0,
},
TotalLatency: reqClientRTT + serviceRTT,
Status: 200,
Requestor: requestor.getLatencyInfo(si.share),
Responder: responder.getLatencyInfo(true),
}
if respClientRTT > 0 {
sl.NATSLatency.System = time.Since(ts)
sl.TotalLatency += sl.NATSLatency.System
sl.RequestStart = time.Unix(0, si.ts-int64(sl.Requestor.RTT)).UTC()
sl.ServiceLatency = serviceRTT - sl.Responder.RTT
sl.TotalLatency = sl.Requestor.RTT + serviceRTT
if sl.Responder.RTT > 0 {
sl.SystemLatency = time.Since(ts)
sl.TotalLatency += sl.SystemLatency
}
sanitizeLatencyMetric(sl)
// If we are expecting a remote measurement, store our sl here.
@@ -963,6 +943,23 @@ func (a *Account) AddServiceImportWithClaim(destination *Account, from, to strin
return err
}
// SetServiceImportSharing will allow sharing of information about requests with the export account.
// Used for service latency tracking at the moment.
func (a *Account) SetServiceImportSharing(destination *Account, to string, allow bool) error {
    a.mu.Lock()
    defer a.mu.Unlock()
    // Claim-backed accounts are only updated through their JWT.
    if a.isClaimAccount() {
        return fmt.Errorf("claim based accounts can not be updated directly")
    }
    // Locate the matching import and flip its share flag.
    for _, imp := range a.imports.services {
        if imp.acc != destination || imp.to != to {
            continue
        }
        imp.share = allow
        return nil
    }
    return fmt.Errorf("service import not found")
}
// AddServiceImport will add a route to an account to send published messages / requests
// to the destination account. From is the local subject to map, To is the
// subject that will appear on the destination account. Destination will need
@@ -1209,7 +1206,7 @@ func (a *Account) addServiceImport(dest *Account, from, to string, claim *jwt.Im
}
hasWC := subjectHasWildcard(from)
si := &serviceImport{dest, claim, se, nil, from, to, "", 0, rt, lat, nil, nil, hasWC, false, false, false}
si := &serviceImport{dest, claim, se, nil, from, to, "", 0, rt, lat, nil, nil, hasWC, false, false, false, false}
a.imports.services[from] = si
a.mu.Unlock()
@@ -1491,6 +1488,9 @@ func (a *Account) ServiceExportResponseThreshold(export string) (time.Duration,
func (a *Account) SetServiceExportResponseThreshold(export string, maxTime time.Duration) error {
a.mu.Lock()
defer a.mu.Unlock()
if a.isClaimAccount() {
return fmt.Errorf("claim based accounts can not be updated directly")
}
lrt := a.lowestServiceExportResponseTime()
se := a.getServiceExport(export)
if se == nil {
@@ -1512,9 +1512,8 @@ func (a *Account) addRespServiceImport(dest *Account, to string, osi *serviceImp
rt := osi.rt
// dest is the requestor's account. a is the service responder with the export.
se := osi.se
// Marked as internal here, that is how we distinguish.
si := &serviceImport{dest, nil, se, nil, nrr, to, osi.to, 0, rt, nil, nil, nil, false, true, false, false}
si := &serviceImport{dest, nil, osi.se, nil, nrr, to, osi.to, 0, rt, nil, nil, nil, false, true, false, false, osi.share}
if a.exports.responses == nil {
a.exports.responses = make(map[string]*serviceImport)
@@ -1523,7 +1522,7 @@ func (a *Account) addRespServiceImport(dest *Account, to string, osi *serviceImp
// Always grab time and make sure response threshold timer is running.
si.ts = time.Now().UnixNano()
se.setResponseThresholdTimer()
osi.se.setResponseThresholdTimer()
if rt == Singleton && tracking {
si.latency = osi.latency
@@ -2106,6 +2105,12 @@ func (s *Server) AccountResolver() AccountResolver {
return ar
}
// isClaimAccount reports whether this account is backed by a JWT claim.
// Lock should be held.
func (a *Account) isClaimAccount() bool {
    return len(a.claimJWT) > 0
}
// updateAccountClaims will update an existing account with new claims.
// This will replace any exports or imports previously defined.
// Lock MUST NOT be held upon entry.

View File

@@ -459,6 +459,8 @@ func (s *Server) processClientOrLeafAuthentication(c *client) bool {
if err := c.RegisterNkeyUser(nkey); err != nil {
return false
}
// Hold onto the user's public key.
c.pubKey = juc.Subject
// Generate an event if we have a system account.
s.accountConnectEvent(c)
@@ -511,7 +513,7 @@ func (s *Server) processClientOrLeafAuthentication(c *client) bool {
if c.kind == CLIENT {
if opts.Authorization != "" {
return comparePasswords(opts.Authorization, c.opts.Authorization)
return comparePasswords(opts.Authorization, c.opts.Token)
} else if opts.Username != "" {
if opts.Username != c.opts.Username {
return false

View File

@@ -199,6 +199,7 @@ type client struct {
opts clientOpts
start time.Time
nonce []byte
pubKey string
nc net.Conn
ncs string
out outbound
@@ -416,22 +417,22 @@ func (s *subscription) isClosed() bool {
}
type clientOpts struct {
Echo bool `json:"echo"`
Verbose bool `json:"verbose"`
Pedantic bool `json:"pedantic"`
TLSRequired bool `json:"tls_required"`
Nkey string `json:"nkey,omitempty"`
JWT string `json:"jwt,omitempty"`
Sig string `json:"sig,omitempty"`
Authorization string `json:"auth_token,omitempty"`
Username string `json:"user,omitempty"`
Password string `json:"pass,omitempty"`
Name string `json:"name"`
Lang string `json:"lang"`
Version string `json:"version"`
Protocol int `json:"protocol"`
Account string `json:"account,omitempty"`
AccountNew bool `json:"new_account,omitempty"`
Echo bool `json:"echo"`
Verbose bool `json:"verbose"`
Pedantic bool `json:"pedantic"`
TLSRequired bool `json:"tls_required"`
Nkey string `json:"nkey,omitempty"`
JWT string `json:"jwt,omitempty"`
Sig string `json:"sig,omitempty"`
Token string `json:"auth_token,omitempty"`
Username string `json:"user,omitempty"`
Password string `json:"pass,omitempty"`
Name string `json:"name"`
Lang string `json:"lang"`
Version string `json:"version"`
Protocol int `json:"protocol"`
Account string `json:"account,omitempty"`
AccountNew bool `json:"new_account,omitempty"`
// Routes only
Import *SubjectPermission `json:"import,omitempty"`
@@ -2866,18 +2867,18 @@ func (c *client) processInboundClientMsg(msg []byte) bool {
if rl != nil {
delete(c.rrTracking.rmap, string(c.pa.subject))
}
rtt := c.rtt
c.mu.Unlock()
if rl != nil {
sl := &rl.M2
// Fill this in and send it off to the other side.
sl.AppName = c.opts.Name
sl.ServiceLatency = time.Since(sl.RequestStart) - rtt
sl.NATSLatency.Responder = rtt
sl.TotalLatency = sl.ServiceLatency + rtt
sl.Status = 200
sl.Responder = c.getLatencyInfo(true)
sl.ServiceLatency = time.Since(sl.RequestStart) - sl.Responder.RTT
sl.TotalLatency = sl.ServiceLatency + sl.Responder.RTT
sanitizeLatencyMetric(sl)
lsub := remoteLatencySubjectForResponse(c.pa.subject)
c.srv.sendInternalAccountMsg(nil, lsub, &rl) // Send to SYS account
c.srv.sendInternalAccountMsg(nil, lsub, rl) // Send to SYS account
}
}
@@ -2955,28 +2956,25 @@ func (c *client) handleGWReplyMap(msg []byte) bool {
c.pa.subject = []byte(rm.ms)
var rl *remoteLatency
var rtt time.Duration
if c.rrTracking != nil {
rl = c.rrTracking.rmap[string(c.pa.subject)]
if rl != nil {
delete(c.rrTracking.rmap, string(c.pa.subject))
}
rtt = c.rtt
}
c.mu.Unlock()
if rl != nil {
sl := &rl.M2
// Fill this in and send it off to the other side.
sl.AppName = c.opts.Name
sl.ServiceLatency = time.Since(sl.RequestStart) - rtt
sl.NATSLatency.Responder = rtt
sl.TotalLatency = sl.ServiceLatency + rtt
sl.Status = 200
sl.Responder = c.getLatencyInfo(true)
sl.ServiceLatency = time.Since(sl.RequestStart) - sl.Responder.RTT
sl.TotalLatency = sl.ServiceLatency + sl.Responder.RTT
sanitizeLatencyMetric(sl)
lsub := remoteLatencySubjectForResponse(c.pa.subject)
c.srv.sendInternalAccountMsg(nil, lsub, &rl) // Send to SYS account
c.srv.sendInternalAccountMsg(nil, lsub, rl) // Send to SYS account
}
// Check for leaf nodes
@@ -3040,6 +3038,17 @@ func (c *client) processServiceImport(si *serviceImport, acc *Account, msg []byt
to = string(c.pa.subject)
}
// Check to see if this was a bad request with no reply and we were supposed to be tracking.
if !si.response && si.latency != nil && len(c.pa.reply) == 0 && shouldSample(si.latency) {
si.acc.sendBadRequestTrackingLatency(si, c)
}
// Send tracking info here if we are tracking this request/response.
var didSendTL bool
if si.tracking {
didSendTL = acc.sendTrackingLatency(si, c)
}
// FIXME(dlc) - Do L1 cache trick like normal client?
rr := si.acc.sl.Match(to)
@@ -3079,17 +3088,10 @@ func (c *client) processServiceImport(si *serviceImport, acc *Account, msg []byt
// We will remove if we did not deliver, or if we are a response service import and we are
// a singleton, or we have an EOF message.
shouldRemove := !didDeliver || (si.response && (si.rt == Singleton || len(msg) == LEN_CR_LF))
// Calculate tracking info here if we are tracking this request/response.
if si.tracking {
shouldRemove = acc.sendTrackingLatency(si, c)
// If we are tracking and we did not actually send the latency info we need to suppress the removal.
if si.tracking && !didSendTL {
shouldRemove = false
}
// Check to see if this was a bad request with no reply and we were supposed to be tracking.
if !si.response && si.latency != nil && len(c.pa.reply) == 0 && shouldSample(si.latency) {
si.acc.sendBadRequestTrackingLatency(si, c)
}
// If we are streamed or chunked we need to update our timestamp to avoid cleanup.
if si.rt != Singleton && didDeliver {
acc.mu.Lock()
@@ -3696,7 +3698,7 @@ func (c *client) teardownConn() {
if c.srv != nil {
if c.kind == ROUTER || c.kind == GATEWAY {
c.Noticef("%s connection closed", c.typeString())
} else { // Client, System, Jetstream and Leafnode connections.
} else { // Client, System, Jetstream, Account and Leafnode connections.
c.Debugf("%s connection closed", c.typeString())
}
}
@@ -3971,13 +3973,53 @@ func (c *client) pruneClosedSubFromPerAccountCache() {
}
}
// Grabs the information for latency reporting.
// The RTT is always filled in; the remaining identifying fields (user, name,
// ip, cid, server) are only populated when detailed is true.
func (c *client) getLatencyInfo(detailed bool) LatencyClient {
    var lc LatencyClient
    // Only real client connections are reported on; anything else
    // yields a zero-value LatencyClient.
    if c == nil || c.kind != CLIENT {
        return lc
    }
    // NOTE(review): server name is grabbed before taking the client lock,
    // presumably to avoid holding c.mu across a server call — confirm.
    sn := c.srv.Name()
    c.mu.Lock()
    lc.RTT = c.rtt
    if detailed {
        lc.Name = c.opts.Name
        lc.User = c.getRawAuthUser()
        lc.IP = c.host
        lc.CID = c.cid
        lc.Server = sn
    }
    c.mu.Unlock()
    return lc
}
// getRawAuthUser returns the raw auth user for the client.
// Lock should be held.
func (c *client) getRawAuthUser() string {
    switch {
    case c.opts.Nkey != "":
        // Nkey takes precedence over all other credential forms.
        return c.opts.Nkey
    case c.opts.Username != "":
        return c.opts.Username
    case c.opts.JWT != "":
        // For JWT users, report the public key captured during authentication.
        return c.pubKey
    case c.opts.Token != "":
        return c.opts.Token
    default:
        return ""
    }
}
// getAuthUser returns the auth user for the client.
// Lock should be held.
func (c *client) getAuthUser() string {
switch {
case c.opts.Nkey != "":
return fmt.Sprintf("Nkey %q", c.opts.Nkey)
case c.opts.Username != "":
return fmt.Sprintf("User %q", c.opts.Username)
case c.opts.JWT != "":
return fmt.Sprintf("JWT User %q", c.pubKey)
default:
return `User "N/A"`
}

View File

@@ -256,7 +256,7 @@ func TestClientConnect(t *testing.T) {
t.Fatalf("Expected state of OP_START vs %d\n", c.state)
}
if !reflect.DeepEqual(c.opts, clientOpts{Echo: true, Verbose: true, Pedantic: true, Authorization: "YZZ222", Name: "router"}) {
if !reflect.DeepEqual(c.opts, clientOpts{Echo: true, Verbose: true, Pedantic: true, Token: "YZZ222", Name: "router"}) {
t.Fatalf("Did not parse connect options correctly: %+v\n", c.opts)
}
}

View File

@@ -1223,6 +1223,7 @@ func (s *Server) remoteLatencyUpdate(sub *subscription, _ *client, subject, _ st
}
m1 := si.m1
m2 := rl.M2
lsub := si.latency.subject
acc.mu.RUnlock()
@@ -1255,7 +1256,7 @@ func (s *Server) remoteLatencyUpdate(sub *subscription, _ *client, subject, _ st
// Make sure we remove the entry here.
acc.removeServiceImport(si.from)
// Send the metrics
s.sendInternalAccountMsg(acc, lsub, &m1)
s.sendInternalAccountMsg(acc, lsub, m1)
}
// This is used for all inbox replies so that we do not send supercluster wide interest

View File

@@ -1676,10 +1676,11 @@ type importStream struct {
}
type importService struct {
acc *Account
an string
sub string
to string
acc *Account
an string
sub string
to string
share bool
}
// Checks if an account name is reserved.
@@ -1925,6 +1926,11 @@ func parseAccounts(v interface{}, opts *Options, errors *[]error, warnings *[]er
*errors = append(*errors, &configErr{tk, msg})
continue
}
if err := service.acc.SetServiceImportSharing(ta, service.sub, service.share); err != nil {
msg := fmt.Sprintf("Error setting service import sharing %q: %v", service.sub, err)
*errors = append(*errors, &configErr{tk, msg})
continue
}
}
return nil
@@ -2298,6 +2304,7 @@ func parseImportStreamOrService(v interface{}, errors, warnings *[]error) (*impo
curStream *importStream
curService *importService
pre, to string
share bool
lt token
)
defer convertPanicToErrorList(&lt, errors)
@@ -2366,6 +2373,7 @@ func parseImportStreamOrService(v interface{}, errors, warnings *[]error) (*impo
} else {
curService.to = subject
}
curService.share = share
case "prefix":
pre = mv.(string)
if curStream != nil {
@@ -2376,6 +2384,11 @@ func parseImportStreamOrService(v interface{}, errors, warnings *[]error) (*impo
if curService != nil {
curService.to = to
}
case "share":
share = mv.(bool)
if curService != nil {
curService.share = share
}
default:
if !tk.IsUsedVariable() {
err := &unknownConfigFieldErr{

View File

@@ -178,6 +178,12 @@ func createAccount(t *testing.T, s *server.Server) (*server.Account, nkeys.KeyPa
}
func createUserCreds(t *testing.T, s *server.Server, akp nkeys.KeyPair) nats.Option {
t.Helper()
opt, _ := createUserCredsOption(t, s, akp)
return opt
}
func createUserCredsOption(t *testing.T, s *server.Server, akp nkeys.KeyPair) (nats.Option, string) {
t.Helper()
kp, _ := nkeys.CreateUser()
pub, _ := kp.PublicKey()
@@ -193,7 +199,7 @@ func createUserCreds(t *testing.T, s *server.Server, akp nkeys.KeyPair) nats.Opt
sig, _ := kp.Sign(nonce)
return sig, nil
}
return nats.UserJWT(userCB, sigCB)
return nats.UserJWT(userCB, sigCB), pub
}
func TestOperatorServer(t *testing.T) {

View File

@@ -16,6 +16,7 @@ package test
import (
"encoding/json"
"fmt"
"io/ioutil"
"math/rand"
"os"
"strings"
@@ -81,6 +82,25 @@ func (sc *supercluster) setResponseThreshold(t *testing.T, maxTime time.Duration
}
}
// setImportShare enables requestor-info sharing on the "ngs.usage.bar"
// service import from account FOO into account BAR on every server in
// the supercluster.
func (sc *supercluster) setImportShare(t *testing.T) {
    t.Helper()
    for _, c := range sc.clusters {
        for _, s := range c.servers {
            foo, err := s.LookupAccount("FOO")
            if err != nil {
                t.Fatalf("Error looking up account 'FOO': %v", err)
            }
            bar, err := s.LookupAccount("BAR")
            if err != nil {
                t.Fatalf("Error looking up account 'BAR': %v", err)
            }
            // BAR holds the import of FOO's service; allow it to share details.
            if err := bar.SetServiceImportSharing(foo, "ngs.usage.bar", true); err != nil {
                t.Fatalf("Error setting import sharing: %v", err)
            }
        }
    }
}
func (sc *supercluster) setupLatencyTracking(t *testing.T, p int) {
t.Helper()
for _, c := range sc.clusters {
@@ -155,6 +175,10 @@ func clientConnectOldRequest(t *testing.T, opts *server.Options, user string) *n
func checkServiceLatency(t *testing.T, sl server.ServiceLatency, start time.Time, serviceTime time.Duration) {
t.Helper()
if sl.Status != 200 {
t.Fatalf("Bad status received, wanted 200 got %d", sl.Status)
}
serviceTime = serviceTime.Round(time.Millisecond)
startDelta := sl.RequestStart.Sub(start)
@@ -169,21 +193,59 @@ func checkServiceLatency(t *testing.T, sl server.ServiceLatency, start time.Time
}
// We should have NATS latency here that is non-zero with real clients.
if sl.NATSLatency.Requestor == 0 {
if sl.Requestor.RTT == 0 {
t.Fatalf("Expected non-zero NATS Requestor latency")
}
if sl.NATSLatency.Responder == 0 {
if sl.Responder.RTT == 0 {
t.Fatalf("Expected non-zero NATS Requestor latency")
}
// Make sure they add up
got := sl.TotalLatency
expected := sl.ServiceLatency + sl.NATSLatency.TotalTime()
expected := sl.ServiceLatency + sl.NATSTotalTime()
if got != expected {
t.Fatalf("Numbers do not add up: %+v,\ngot: %v\nexpected: %v", sl, got, expected)
}
}
// extendedCheck verifies the detailed latency client information that is
// present when sharing is enabled. An empty appName or eServer skips that
// particular comparison.
func extendedCheck(t *testing.T, lc *server.LatencyClient, eUser, appName, eServer string) {
    t.Helper()
    switch {
    case lc.User != eUser:
        t.Fatalf("Expected user of %q, got %q", eUser, lc.User)
    case appName != "" && appName != lc.Name:
        t.Fatalf("Expected appname of %q, got %q\n", appName, lc.Name)
    case lc.IP == "":
        t.Fatalf("Expected non-empty IP")
    case lc.CID < 1 || lc.CID > 20:
        // NOTE: assumes small test clusters where CIDs stay low.
        t.Fatalf("Expected a CID in range, got %d", lc.CID)
    case eServer != "" && eServer != lc.Server:
        t.Fatalf("Expected server of %q, got %q", eServer, lc.Server)
    }
}
// noShareCheck asserts that none of the optional client details leaked
// into the metric when sharing was not enabled for the requestor.
func noShareCheck(t *testing.T, lc *server.LatencyClient) {
    t.Helper()
    switch {
    case lc.Name != "":
        t.Fatalf("appname should not have been shared, got %q", lc.Name)
    case lc.User != "":
        t.Fatalf("user should not have been shared, got %q", lc.User)
    case lc.IP != "":
        t.Fatalf("client ip should not have been shared, got %q", lc.IP)
    case lc.CID != 0:
        t.Fatalf("client id should not have been shared, got %d", lc.CID)
    case lc.Server != "":
        t.Fatalf("client' server should not have been shared, got %q", lc.Server)
    }
}
func TestServiceLatencySingleServerConnect(t *testing.T) {
sc := createSuperCluster(t, 3, 2)
defer sc.shutdown()
@@ -193,7 +255,7 @@ func TestServiceLatencySingleServerConnect(t *testing.T) {
// Now we can setup and test, do single node only first.
// This is the service provider.
nc := clientConnect(t, sc.clusters[0].opts[0], "foo")
nc := clientConnectWithName(t, sc.clusters[0].opts[0], "foo", "service22")
defer nc.Close()
// The service listener.
@@ -212,8 +274,7 @@ func TestServiceLatencySingleServerConnect(t *testing.T) {
// Send the request.
start := time.Now()
_, err := nc2.Request("ngs.usage", []byte("1h"), time.Second)
if err != nil {
if _, err := nc2.Request("ngs.usage", []byte("1h"), time.Second); err != nil {
t.Fatalf("Expected a response")
}
@@ -222,6 +283,26 @@ func TestServiceLatencySingleServerConnect(t *testing.T) {
json.Unmarshal(rmsg.Data, &sl)
checkServiceLatency(t, sl, start, serviceTime)
rs := sc.clusters[0].servers[0]
extendedCheck(t, &sl.Responder, "foo", "service22", rs.Name())
// Normally requestor's don't share
noShareCheck(t, &sl.Requestor)
// Now make sure normal use case works with old request style.
nc3 := clientConnectOldRequest(t, sc.clusters[0].opts[0], "bar")
defer nc3.Close()
start = time.Now()
if _, err := nc3.Request("ngs.usage", []byte("1h"), time.Second); err != nil {
t.Fatalf("Expected a response")
}
nc3.Close()
checkServiceLatency(t, sl, start, serviceTime)
extendedCheck(t, &sl.Responder, "foo", "service22", rs.Name())
// Normally requestor's don't share
noShareCheck(t, &sl.Requestor)
}
// If a client has a longer RTT that the effective RTT for NATS + responder
@@ -249,8 +330,9 @@ func TestServiceLatencyClientRTTSlowerVsServiceRTT(t *testing.T) {
nc.Flush()
// Requestor and processing
requestAndCheck := func(sopts *server.Options) {
requestAndCheck := func(cindex, sindex int) {
t.Helper()
sopts := sc.clusters[cindex].opts[sindex]
if nmsgs, _, err := rsub.Pending(); err != nil || nmsgs != 0 {
t.Fatalf("Did not expect any latency results, got %d", nmsgs)
@@ -290,12 +372,18 @@ func TestServiceLatencyClientRTTSlowerVsServiceRTT(t *testing.T) {
// We want to test here that when the client requestor RTT is larger then the response time
// we still deliver a requestor value > 0.
// Now check that it is close to rtt.
if sl.NATSLatency.Requestor < rtt {
t.Fatalf("Expected requestor latency to be > %v, got %v", rtt, sl.NATSLatency.Requestor)
if sl.Requestor.RTT < rtt {
t.Fatalf("Expected requestor latency to be > %v, got %v", rtt, sl.Requestor.RTT)
}
if sl.TotalLatency < rtt {
t.Fatalf("Expected total latency to be > %v, got %v", rtt, sl.TotalLatency)
}
rs := sc.clusters[0].servers[0]
extendedCheck(t, &sl.Responder, "foo", "", rs.Name())
// Normally requestor's don't share
noShareCheck(t, &sl.Requestor)
// Check for trailing duplicates..
rmsg, err = rsub.NextMsg(100 * time.Millisecond)
if err == nil {
@@ -304,11 +392,11 @@ func TestServiceLatencyClientRTTSlowerVsServiceRTT(t *testing.T) {
}
// Check same server.
requestAndCheck(sc.clusters[0].opts[0])
requestAndCheck(0, 0)
// Check from remote server across GW.
requestAndCheck(sc.clusters[1].opts[1])
requestAndCheck(1, 1)
// Same cluster but different server
requestAndCheck(sc.clusters[0].opts[1])
requestAndCheck(0, 1)
}
func connRTT(nc *nats.Conn) time.Duration {
@@ -362,11 +450,15 @@ func TestServiceLatencyRemoteConnect(t *testing.T) {
}
json.Unmarshal(rmsg.Data, &sl)
checkServiceLatency(t, sl, start, serviceTime)
rs := sc.clusters[0].servers[0]
extendedCheck(t, &sl.Responder, "foo", "", rs.Name())
// Normally requestor's don't share
noShareCheck(t, &sl.Requestor)
// Lastly here, we need to make sure we are properly tracking the extra hops.
// We will make sure that NATS latency is close to what we see from the outside in terms of RTT.
if crtt := connRTT(nc) + connRTT(nc2); sl.NATSLatency.TotalTime() < crtt {
t.Fatalf("Not tracking second measurement for NATS latency across servers: %v vs %v", sl.NATSLatency.TotalTime(), crtt)
if crtt := connRTT(nc) + connRTT(nc2); sl.NATSTotalTime() < crtt {
t.Fatalf("Not tracking second measurement for NATS latency across servers: %v vs %v", sl.NATSTotalTime(), crtt)
}
// Gateway Requestor
@@ -386,11 +478,14 @@ func TestServiceLatencyRemoteConnect(t *testing.T) {
}
json.Unmarshal(rmsg.Data, &sl)
checkServiceLatency(t, sl, start, serviceTime)
extendedCheck(t, &sl.Responder, "foo", "", rs.Name())
// Normally requestor's don't share
noShareCheck(t, &sl.Requestor)
// Lastly here, we need to make sure we are properly tracking the extra hops.
// We will make sure that NATS latency is close to what we see from the outside in terms of RTT.
if crtt := connRTT(nc) + connRTT(nc2); sl.NATSLatency.TotalTime() < crtt {
t.Fatalf("Not tracking second measurement for NATS latency across servers: %v vs %v", sl.NATSLatency.TotalTime(), crtt)
if crtt := connRTT(nc) + connRTT(nc2); sl.NATSTotalTime() < crtt {
t.Fatalf("Not tracking second measurement for NATS latency across servers: %v vs %v", sl.NATSTotalTime(), crtt)
}
// Now turn off and make sure we no longer receive updates.
@@ -530,9 +625,10 @@ func TestServiceLatencyWithName(t *testing.T) {
json.Unmarshal(rmsg.Data, &sl)
// Make sure we have AppName set.
if sl.AppName != "dlc22" {
t.Fatalf("Expected to have AppName set correctly, %q vs %q", "dlc22", sl.AppName)
}
rs := sc.clusters[0].servers[0]
extendedCheck(t, &sl.Responder, "foo", "dlc22", rs.Name())
// Normally requestor's don't share
noShareCheck(t, &sl.Requestor)
}
func TestServiceLatencyWithNameMultiServer(t *testing.T) {
@@ -563,9 +659,10 @@ func TestServiceLatencyWithNameMultiServer(t *testing.T) {
json.Unmarshal(rmsg.Data, &sl)
// Make sure we have AppName set.
if sl.AppName != "dlc22" {
t.Fatalf("Expected to have AppName set correctly, %q vs %q", "dlc22", sl.AppName)
}
rs := sc.clusters[0].servers[1]
extendedCheck(t, &sl.Responder, "foo", "dlc22", rs.Name())
// Normally requestor's don't share
noShareCheck(t, &sl.Requestor)
}
func TestServiceLatencyWithQueueSubscribersAndNames(t *testing.T) {
@@ -623,7 +720,7 @@ func TestServiceLatencyWithQueueSubscribersAndNames(t *testing.T) {
var sl server.ServiceLatency
json.Unmarshal(msg.Data, &sl)
rlock.Lock()
results[sl.AppName] += sl.ServiceLatency
results[sl.Responder.Name] += sl.ServiceLatency
rlock.Unlock()
if r := atomic.AddInt32(&received, 1); r >= toSend {
ch <- true
@@ -730,7 +827,8 @@ func TestServiceLatencyWithJWT(t *testing.T) {
// Create service provider.
url := fmt.Sprintf("nats://%s:%d", opts.Host, opts.Port)
nc, err := nats.Connect(url, createUserCreds(t, s, svcKP))
copt, pubUser := createUserCredsOption(t, s, svcKP)
nc, err := nats.Connect(url, copt, nats.Name("fooService"))
if err != nil {
t.Fatalf("Error on connect: %v", err)
}
@@ -781,6 +879,8 @@ func TestServiceLatencyWithJWT(t *testing.T) {
serviceExport.Latency = &jwt.ServiceLatency{Sampling: 100, Results: "results"}
updateAccount()
// Grab the service responder's user.
// Send the request.
start := time.Now()
_, err = nc2.Request("request", []byte("hello"), time.Second)
@@ -795,6 +895,9 @@ func TestServiceLatencyWithJWT(t *testing.T) {
}
json.Unmarshal(rmsg.Data, &sl)
checkServiceLatency(t, sl, start, serviceTime)
extendedCheck(t, &sl.Responder, pubUser, "fooService", s.Name())
// Normally requestor's don't share
noShareCheck(t, &sl.Requestor)
// Now we will remove tracking. Do this by simulating a JWT update.
serviceExport.Latency = nil
@@ -933,7 +1036,7 @@ func TestServiceLatencyRemoteConnectAdjustNegativeValues(t *testing.T) {
if sl.ServiceLatency < 0 {
t.Fatalf("Unexpected negative service latency value: %v", sl)
}
if sl.NATSLatency.System < 0 {
if sl.SystemLatency < 0 {
t.Fatalf("Unexpected negative system latency value: %v", sl)
}
}
@@ -977,6 +1080,7 @@ func TestServiceLatencyFailureReportingSingleServer(t *testing.T) {
// Test a request with no reply subject
nc2.Publish("ngs.usage", []byte("1h"))
sl := getMetricResult()
if sl.Status != 400 {
t.Fatalf("Expected to get a bad request status [400], got %d", sl.Status)
}
@@ -1109,3 +1213,276 @@ func TestServiceLatencyFailureReportingMultipleServers(t *testing.T) {
time.Sleep(100 * time.Millisecond)
}
}
// To test a bug rip@nats.io is seeing with old style requests.
// A latency metric must still be produced for an old-style request, carry the
// responder's details, and keep the requestor's details private since sharing
// is not enabled on the import.
func TestServiceLatencyOldRequestStyleSingleServer(t *testing.T) {
    conf := createConfFile(t, []byte(`
    listen: 127.0.0.1:-1
    accounts: {
        SVC: {
            users: [ {user: svc, password: pass} ]
            exports: [ {
                service: "svc.echo"
                accounts: [CLIENT]
                latency: {
                    sampling: 100%
                    subject: latency.svc
                }
            } ]
        },
        CLIENT: {
            users: [{user: client, password: pass} ]
            imports: [ {service: {account: SVC, subject: svc.echo}, to: SVC} ]
        },
        SYS: { users: [{user: admin, password: pass}] }
    }
    system_account: SYS
    `))
    defer os.Remove(conf)

    srv, opts := RunServerWithConfig(conf)
    defer srv.Shutdown()

    // Responder connection.
    nc, err := nats.Connect(fmt.Sprintf("nats://svc:pass@%s:%d", opts.Host, opts.Port))
    if err != nil {
        t.Fatalf("Error on connect: %v", err)
    }
    defer nc.Close()

    // Listen for metrics.
    rsub, err := nc.SubscribeSync("latency.svc")
    if err != nil {
        t.Fatalf("Error on subscribe: %v", err)
    }

    // Requestor connection, forced onto the old request style.
    nc2, err := nats.Connect(fmt.Sprintf("nats://client:pass@%s:%d", opts.Host, opts.Port), nats.UseOldRequestStyle())
    if err != nil {
        t.Fatalf("Error on connect: %v", err)
    }
    defer nc2.Close()

    // Setup responder with a known service time so the metric is measurable.
    serviceTime := 25 * time.Millisecond
    sub, err := nc.Subscribe("svc.echo", func(msg *nats.Msg) {
        time.Sleep(serviceTime)
        msg.Respond([]byte("world"))
    })
    if err != nil {
        t.Fatalf("Error on subscribe: %v", err)
    }
    nc.Flush()
    defer sub.Unsubscribe()

    // Send a request.
    start := time.Now()
    if _, err := nc2.Request("SVC", []byte("1h"), time.Second); err != nil {
        t.Fatalf("Expected a response")
    }

    var sl server.ServiceLatency
    // Check the NextMsg error: on a timeout rmsg is nil and dereferencing
    // rmsg.Data below would panic instead of failing with a useful message.
    rmsg, err := rsub.NextMsg(time.Second)
    if err != nil {
        t.Fatalf("Expected to receive a latency metric: %v", err)
    }
    if err := json.Unmarshal(rmsg.Data, &sl); err != nil {
        t.Fatalf("Error unmarshalling metric: %v", err)
    }
    checkServiceLatency(t, sl, start, serviceTime)
    extendedCheck(t, &sl.Responder, "svc", "", srv.Name())
    noShareCheck(t, &sl.Requestor)
}
// Check we get the proper detailed information for the requestor when allowed.
func TestServiceLatencyRequestorSharesDetailedInfo(t *testing.T) {
sc := createSuperCluster(t, 3, 3)
defer sc.shutdown()
// Now add in new service export to FOO and have bar import that with tracking enabled.
sc.setupLatencyTracking(t, 100)
sc.setResponseThreshold(t, 10*time.Millisecond)
sc.setImportShare(t)
nc := clientConnect(t, sc.clusters[0].opts[0], "foo")
defer nc.Close()
// Listen for metrics
rsub, err := nc.SubscribeSync("results")
if err != nil {
t.Fatal(err)
}
nc.Flush()
getMetricResult := func() *server.ServiceLatency {
t.Helper()
rmsg, err := rsub.NextMsg(time.Second)
if err != nil {
t.Fatalf("Expected to receive latency metric: %v", err)
}
var sl server.ServiceLatency
if err = json.Unmarshal(rmsg.Data, &sl); err != nil {
t.Errorf("Unexpected error processing latency metric: %s", err)
}
return &sl
}
cases := []struct {
ci, si int
desc string
}{
{0, 0, "same server"},
{0, 1, "same cluster, different server"},
{1, 1, "different cluster"},
}
for _, cs := range cases {
// Select the server to send request from.
nc2 := clientConnect(t, sc.clusters[cs.ci].opts[cs.si], "bar")
defer nc2.Close()
rs := sc.clusters[cs.ci].servers[cs.si]
// Test a request with no reply subject
nc2.Publish("ngs.usage", []byte("1h"))
sl := getMetricResult()
if sl.Status != 400 {
t.Fatalf("Test %q, Expected to get a bad request status [400], got %d", cs.desc, sl.Status)
}
extendedCheck(t, &sl.Requestor, "bar", "", rs.Name())
// Proper request, but no responders.
nc2.Request("ngs.usage", []byte("1h"), 20*time.Millisecond)
sl = getMetricResult()
if sl.Status != 503 {
t.Fatalf("Test %q, Expected to get a service unavailable status [503], got %d", cs.desc, sl.Status)
}
extendedCheck(t, &sl.Requestor, "bar", "", rs.Name())
// The service listener. Make it slow. 10ms is respThreshold, so take 2X
sub, _ := nc.Subscribe("ngs.usage.bar", func(msg *nats.Msg) {
time.Sleep(20 * time.Millisecond)
msg.Respond([]byte("22 msgs"))
})
defer sub.Unsubscribe()
nc.Flush()
// Wait to propagate.
time.Sleep(100 * time.Millisecond)
nc2.Request("ngs.usage", []byte("1h"), 20*time.Millisecond)
sl = getMetricResult()
if sl.Status != 504 {
t.Fatalf("Test %q, Expected to get a service timeout status [504], got %d", cs.desc, sl.Status)
}
extendedCheck(t, &sl.Requestor, "bar", "", rs.Name())
// Clean up subscriber and requestor
nc2.Close()
sub.Unsubscribe()
nc.Flush()
// Wait to propagate.
time.Sleep(100 * time.Millisecond)
}
}
func TestServiceLatencyRequestorSharesConfig(t *testing.T) {
conf := createConfFile(t, []byte(`
listen: 127.0.0.1:-1
accounts: {
SVC: {
users: [ {user: svc, password: pass} ]
exports: [ {
service: "svc.echo"
accounts: [CLIENT]
latency: {
sampling: 100%
subject: latency.svc
}
} ]
},
CLIENT: {
users: [{user: client, password: pass} ]
imports: [ {service: {account: SVC, subject: svc.echo}, to: SVC, share:true} ]
},
SYS: { users: [{user: admin, password: pass}] }
}
system_account: SYS
`))
defer os.Remove(conf)
srv, opts := RunServerWithConfig(conf)
defer srv.Shutdown()
// Responder
nc, err := nats.Connect(fmt.Sprintf("nats://svc:pass@%s:%d", opts.Host, opts.Port))
if err != nil {
t.Fatalf("Error on connect: %v", err)
}
defer nc.Close()
// Listen for metrics
rsub, _ := nc.SubscribeSync("latency.svc")
// Requestor
nc2, err := nats.Connect(fmt.Sprintf("nats://client:pass@%s:%d", opts.Host, opts.Port), nats.UseOldRequestStyle())
if err != nil {
t.Fatalf("Error on connect: %v", err)
}
defer nc2.Close()
// Setup responder
serviceTime := 25 * time.Millisecond
sub, _ := nc.Subscribe("svc.echo", func(msg *nats.Msg) {
time.Sleep(serviceTime)
msg.Respond([]byte("world"))
})
nc.Flush()
defer sub.Unsubscribe()
// Send a request
start := time.Now()
if _, err := nc2.Request("SVC", []byte("1h"), time.Second); err != nil {
t.Fatalf("Expected a response")
}
var sl server.ServiceLatency
rmsg, _ := rsub.NextMsg(time.Second)
json.Unmarshal(rmsg.Data, &sl)
checkServiceLatency(t, sl, start, serviceTime)
extendedCheck(t, &sl.Responder, "svc", "", srv.Name())
extendedCheck(t, &sl.Requestor, "client", "", srv.Name())
// Check reload.
newConf := []byte(`
listen: 127.0.0.1:-1
accounts: {
SVC: {
users: [ {user: svc, password: pass} ]
exports: [ {
service: "svc.echo"
accounts: [CLIENT]
latency: {
sampling: 100%
subject: latency.svc
}
} ]
},
CLIENT: {
users: [{user: client, password: pass} ]
imports: [ {service: {account: SVC, subject: svc.echo}, to: SVC} ]
},
SYS: { users: [{user: admin, password: pass}] }
}
system_account: SYS
`)
if err := ioutil.WriteFile(conf, newConf, 0600); err != nil {
t.Fatalf("Error rewriting server's config file: %v", err)
}
if err := srv.Reload(); err != nil {
t.Fatalf("Error on server reload: %v", err)
}
start = time.Now()
if _, err = nc2.Request("SVC", []byte("1h"), time.Second); err != nil {
t.Fatalf("Expected a response")
}
var sl2 server.ServiceLatency
rmsg, _ = rsub.NextMsg(time.Second)
json.Unmarshal(rmsg.Data, &sl2)
noShareCheck(t, &sl2.Requestor)
}