From a7f1bca53464044eb65d44efba4c144763f4e5f3 Mon Sep 17 00:00:00 2001 From: Derek Collison Date: Tue, 28 Apr 2020 16:24:18 -0700 Subject: [PATCH] Additional service latency upgrades. We now share more information about the responder and the requestor. The requestor information by default is not shared, but can be when declaring the import. Also fixed bug for error handling on old request style requests that would always result on a 408 response. Signed-off-by: Derek Collison --- server/accounts.go | 155 +++++++------ server/auth.go | 4 +- server/client.go | 124 ++++++---- server/client_test.go | 2 +- server/events.go | 3 +- server/opts.go | 21 +- test/operator_test.go | 8 +- test/service_latency_test.go | 427 +++++++++++++++++++++++++++++++++-- 8 files changed, 595 insertions(+), 149 deletions(-) diff --git a/server/accounts.go b/server/accounts.go index fa7421fe..38a82363 100644 --- a/server/accounts.go +++ b/server/accounts.go @@ -113,6 +113,7 @@ type serviceImport struct { response bool invalid bool tracking bool + share bool } // This is used to record when we create a mapping for implicit service @@ -722,30 +723,37 @@ func (a *Account) IsExportServiceTracking(service string) bool { return false } -// NATSLatency represents the internal NATS latencies, including RTTs to clients. -type NATSLatency struct { - Requestor time.Duration `json:"req"` - Responder time.Duration `json:"resp"` - System time.Duration `json:"sys"` -} - -// TotalTime is a helper function that totals the NATS latencies. -func (nl *NATSLatency) TotalTime() time.Duration { - return nl.Requestor + nl.Responder + nl.System -} - // ServiceLatency is the JSON message sent out in response to latency tracking for -// exported services. +// an accounts exported services. type ServiceLatency struct { Status int `json:"status"` Error string `json:"description,omitempty"` - AppName string `json:"app,omitempty"` + Requestor LatencyClient `json:"requestor,omitempty"` + Responder LatencyClient `json:"responder,omitempty"` RequestStart time.Time `json:"start"` - ServiceLatency time.Duration `json:"svc"` - NATSLatency NATSLatency `json:"nats"` + ServiceLatency time.Duration `json:"service"` + SystemLatency time.Duration `json:"system"` TotalLatency time.Duration `json:"total"` } +// LatencyClient is the JSON message structure assigned to requestors and responders. +// Note that for a requestor, the only information shared by default is the RTT used +// to calculate the total latency. The requestor's account can designate to share +// the additional information in the service import. +type LatencyClient struct { + User string `json:"user,omitempty"` + Name string `json:"name,omitempty"` + RTT time.Duration `json:"rtt"` + IP string `json:"ip"` + CID uint64 `json:"cid"` + Server string `json:"server"` +} + +// NATSTotalTime is a helper function that totals the NATS latencies. +func (nl *ServiceLatency) NATSTotalTime() time.Duration { + return nl.Requestor.RTT + nl.Responder.RTT + nl.SystemLatency +} + // Merge function to merge m1 and m2 (requestor and responder) measurements // when there are two samples. This happens when the requestor and responder // are on different servers. @@ -754,10 +762,9 @@ type ServiceLatency struct { // m1 TotalLatency is correct, so use that. // Will use those to back into NATS latency. func (m1 *ServiceLatency) merge(m2 *ServiceLatency) { - m1.AppName = m2.AppName - m1.NATSLatency.System = m1.ServiceLatency - (m2.ServiceLatency + m2.NATSLatency.Responder) + m1.SystemLatency = m1.ServiceLatency - (m2.ServiceLatency + m2.Responder.RTT) m1.ServiceLatency = m2.ServiceLatency - m1.NATSLatency.Responder = m2.NATSLatency.Responder + m1.Responder = m2.Responder sanitizeLatencyMetric(m1) } @@ -770,8 +777,8 @@ func sanitizeLatencyMetric(sl *ServiceLatency) { if sl.ServiceLatency < 0 { sl.ServiceLatency = 0 } - if sl.NATSLatency.System < 0 { - sl.NATSLatency.System = 0 + if sl.SystemLatency < 0 { + sl.SystemLatency = 0 } } @@ -794,44 +801,34 @@ func (a *Account) sendLatencyResult(si *serviceImport, sl *ServiceLatency) { // Used to send a bad request metric when we do not have a reply subject func (a *Account) sendBadRequestTrackingLatency(si *serviceImport, requestor *client) { sl := &ServiceLatency{ - Status: 400, - Error: "Bad Request", - RequestStart: time.Now().Add(-requestor.getRTTValue()).UTC(), + Status: 400, + Error: "Bad Request", + Requestor: requestor.getLatencyInfo(si.share), } + sl.RequestStart = time.Now().Add(-sl.Requestor.RTT).UTC() a.sendLatencyResult(si, sl) } // Used to send a latency result when the requestor interest was lost before the // response could be delivered. func (a *Account) sendReplyInterestLostTrackLatency(si *serviceImport) { - var reqClientRTT time.Duration - if si.rc != nil { - reqClientRTT = si.rc.getRTTValue() - } - reqStart := time.Unix(0, si.ts-int64(reqClientRTT)) sl := &ServiceLatency{ - Status: 408, - Error: "Request Timeout", - RequestStart: reqStart.UTC(), - NATSLatency: NATSLatency{ - Requestor: reqClientRTT, - }, + Status: 408, + Error: "Request Timeout", } + if si.rc != nil { + sl.Requestor = si.rc.getLatencyInfo(si.share) + } + sl.RequestStart = time.Unix(0, si.ts-int64(sl.Requestor.RTT)).UTC() a.sendLatencyResult(si, sl) } func (a *Account) sendBackendErrorTrackingLatency(si *serviceImport, reason rsiReason) { - var reqClientRTT time.Duration + sl := &ServiceLatency{} if si.rc != nil { - reqClientRTT = si.rc.getRTTValue() - } - reqStart := time.Unix(0, si.ts-int64(reqClientRTT)) - sl := &ServiceLatency{ - RequestStart: reqStart.UTC(), - NATSLatency: NATSLatency{ - Requestor: reqClientRTT, - }, + sl.Requestor = si.rc.getLatencyInfo(si.share) } + sl.RequestStart = time.Unix(0, si.ts-int64(sl.Requestor.RTT)).UTC() if reason == rsiNoDelivery { sl.Status = 503 sl.Error = "Service Unavailable" @@ -853,37 +850,20 @@ func (a *Account) sendTrackingLatency(si *serviceImport, responder *client) bool ts := time.Now() serviceRTT := time.Duration(ts.UnixNano() - si.ts) + requestor := si.rc - var requestor = si.rc - var reqClientRTT = requestor.getRTTValue() - var respClientRTT time.Duration - var appName string - - if responder != nil && responder.kind == CLIENT { - respClientRTT = responder.getRTTValue() - appName = responder.GetName() - } - - // We will estimate time when request left the requestor by time we received - // and the client RTT for the requestor. - reqStart := time.Unix(0, si.ts-int64(reqClientRTT)) sl := &ServiceLatency{ - Status: 200, - AppName: appName, - RequestStart: reqStart.UTC(), - ServiceLatency: serviceRTT - respClientRTT, - NATSLatency: NATSLatency{ - Requestor: reqClientRTT, - Responder: respClientRTT, - System: 0, - }, - TotalLatency: reqClientRTT + serviceRTT, + Status: 200, + Requestor: requestor.getLatencyInfo(si.share), + Responder: responder.getLatencyInfo(true), } - if respClientRTT > 0 { - sl.NATSLatency.System = time.Since(ts) - sl.TotalLatency += sl.NATSLatency.System + sl.RequestStart = time.Unix(0, si.ts-int64(sl.Requestor.RTT)).UTC() + sl.ServiceLatency = serviceRTT - sl.Responder.RTT + sl.TotalLatency = sl.Requestor.RTT + serviceRTT + if sl.Responder.RTT > 0 { + sl.SystemLatency = time.Since(ts) + sl.TotalLatency += sl.SystemLatency } - sanitizeLatencyMetric(sl) // If we are expecting a remote measurement, store our sl here. @@ -963,6 +943,23 @@ func (a *Account) AddServiceImportWithClaim(destination *Account, from, to strin return err } +// SetServiceImportSharing will allow sharing of information about requests with the export account. +// Used for service latency tracking at the moment. +func (a *Account) SetServiceImportSharing(destination *Account, to string, allow bool) error { + a.mu.Lock() + defer a.mu.Unlock() + if a.isClaimAccount() { + return fmt.Errorf("claim based accounts can not be updated directly") + } + for _, si := range a.imports.services { + if si.acc == destination && si.to == to { + si.share = allow + return nil + } + } + return fmt.Errorf("service import not found") +} + // AddServiceImport will add a route to an account to send published messages / requests // to the destination account. From is the local subject to map, To is the // subject that will appear on the destination account. Destination will need @@ -1209,7 +1206,7 @@ func (a *Account) addServiceImport(dest *Account, from, to string, claim *jwt.Im } hasWC := subjectHasWildcard(from) - si := &serviceImport{dest, claim, se, nil, from, to, "", 0, rt, lat, nil, nil, hasWC, false, false, false} + si := &serviceImport{dest, claim, se, nil, from, to, "", 0, rt, lat, nil, nil, hasWC, false, false, false, false} a.imports.services[from] = si a.mu.Unlock() @@ -1491,6 +1488,9 @@ func (a *Account) ServiceExportResponseThreshold(export string) (time.Duration, func (a *Account) SetServiceExportResponseThreshold(export string, maxTime time.Duration) error { a.mu.Lock() defer a.mu.Unlock() + if a.isClaimAccount() { + return fmt.Errorf("claim based accounts can not be updated directly") + } lrt := a.lowestServiceExportResponseTime() se := a.getServiceExport(export) if se == nil { @@ -1512,9 +1512,8 @@ func (a *Account) addRespServiceImport(dest *Account, to string, osi *serviceImp rt := osi.rt // dest is the requestor's account. a is the service responder with the export. - se := osi.se // Marked as internal here, that is how we distinguish. - si := &serviceImport{dest, nil, se, nil, nrr, to, osi.to, 0, rt, nil, nil, nil, false, true, false, false} + si := &serviceImport{dest, nil, osi.se, nil, nrr, to, osi.to, 0, rt, nil, nil, nil, false, true, false, false, osi.share} if a.exports.responses == nil { a.exports.responses = make(map[string]*serviceImport) @@ -1523,7 +1522,7 @@ func (a *Account) addRespServiceImport(dest *Account, to string, osi *serviceImp // Always grab time and make sure response threshold timer is running. si.ts = time.Now().UnixNano() - se.setResponseThresholdTimer() + osi.se.setResponseThresholdTimer() if rt == Singleton && tracking { si.latency = osi.latency @@ -2106,6 +2105,12 @@ func (s *Server) AccountResolver() AccountResolver { return ar } +// isClaimAccount returns if this account is backed by a JWT claim. +// Lock should be held. +func (a *Account) isClaimAccount() bool { + return a.claimJWT != "" +} + // updateAccountClaims will update an existing account with new claims. // This will replace any exports or imports previously defined. // Lock MUST NOT be held upon entry. diff --git a/server/auth.go b/server/auth.go index a5857250..d76327d9 100644 --- a/server/auth.go +++ b/server/auth.go @@ -459,6 +459,8 @@ func (s *Server) processClientOrLeafAuthentication(c *client) bool { if err := c.RegisterNkeyUser(nkey); err != nil { return false } + // Hold onto the user's public key. + c.pubKey = juc.Subject // Generate an event if we have a system account. s.accountConnectEvent(c) @@ -511,7 +513,7 @@ func (s *Server) processClientOrLeafAuthentication(c *client) bool { if c.kind == CLIENT { if opts.Authorization != "" { - return comparePasswords(opts.Authorization, c.opts.Authorization) + return comparePasswords(opts.Authorization, c.opts.Token) } else if opts.Username != "" { if opts.Username != c.opts.Username { return false diff --git a/server/client.go b/server/client.go index df1114f7..b226a027 100644 --- a/server/client.go +++ b/server/client.go @@ -199,6 +199,7 @@ type client struct { opts clientOpts start time.Time nonce []byte + pubKey string nc net.Conn ncs string out outbound @@ -416,22 +417,22 @@ func (s *subscription) isClosed() bool { } type clientOpts struct { - Echo bool `json:"echo"` - Verbose bool `json:"verbose"` - Pedantic bool `json:"pedantic"` - TLSRequired bool `json:"tls_required"` - Nkey string `json:"nkey,omitempty"` - JWT string `json:"jwt,omitempty"` - Sig string `json:"sig,omitempty"` - Authorization string `json:"auth_token,omitempty"` - Username string `json:"user,omitempty"` - Password string `json:"pass,omitempty"` - Name string `json:"name"` - Lang string `json:"lang"` - Version string `json:"version"` - Protocol int `json:"protocol"` - Account string `json:"account,omitempty"` - AccountNew bool `json:"new_account,omitempty"` + Echo bool `json:"echo"` + Verbose bool `json:"verbose"` + Pedantic bool `json:"pedantic"` + TLSRequired bool `json:"tls_required"` + Nkey string `json:"nkey,omitempty"` + JWT string `json:"jwt,omitempty"` + Sig string `json:"sig,omitempty"` + Token string `json:"auth_token,omitempty"` + Username string `json:"user,omitempty"` + Password string `json:"pass,omitempty"` + Name string `json:"name"` + Lang string `json:"lang"` + Version string `json:"version"` + Protocol int `json:"protocol"` + Account string `json:"account,omitempty"` + AccountNew bool `json:"new_account,omitempty"` // Routes only Import *SubjectPermission `json:"import,omitempty"` @@ -2866,18 +2867,18 @@ func (c *client) processInboundClientMsg(msg []byte) bool { if rl != nil { delete(c.rrTracking.rmap, string(c.pa.subject)) } - rtt := c.rtt c.mu.Unlock() + if rl != nil { sl := &rl.M2 // Fill this in and send it off to the other side. - sl.AppName = c.opts.Name - sl.ServiceLatency = time.Since(sl.RequestStart) - rtt - sl.NATSLatency.Responder = rtt - sl.TotalLatency = sl.ServiceLatency + rtt + sl.Status = 200 + sl.Responder = c.getLatencyInfo(true) + sl.ServiceLatency = time.Since(sl.RequestStart) - sl.Responder.RTT + sl.TotalLatency = sl.ServiceLatency + sl.Responder.RTT sanitizeLatencyMetric(sl) lsub := remoteLatencySubjectForResponse(c.pa.subject) - c.srv.sendInternalAccountMsg(nil, lsub, &rl) // Send to SYS account + c.srv.sendInternalAccountMsg(nil, lsub, rl) // Send to SYS account } } @@ -2955,28 +2956,25 @@ func (c *client) handleGWReplyMap(msg []byte) bool { c.pa.subject = []byte(rm.ms) var rl *remoteLatency - var rtt time.Duration if c.rrTracking != nil { rl = c.rrTracking.rmap[string(c.pa.subject)] if rl != nil { delete(c.rrTracking.rmap, string(c.pa.subject)) } - rtt = c.rtt } c.mu.Unlock() if rl != nil { sl := &rl.M2 // Fill this in and send it off to the other side. - sl.AppName = c.opts.Name - sl.ServiceLatency = time.Since(sl.RequestStart) - rtt - sl.NATSLatency.Responder = rtt - sl.TotalLatency = sl.ServiceLatency + rtt + sl.Status = 200 + sl.Responder = c.getLatencyInfo(true) + sl.ServiceLatency = time.Since(sl.RequestStart) - sl.Responder.RTT + sl.TotalLatency = sl.ServiceLatency + sl.Responder.RTT sanitizeLatencyMetric(sl) - lsub := remoteLatencySubjectForResponse(c.pa.subject) - c.srv.sendInternalAccountMsg(nil, lsub, &rl) // Send to SYS account + c.srv.sendInternalAccountMsg(nil, lsub, rl) // Send to SYS account } // Check for leaf nodes @@ -3040,6 +3038,17 @@ func (c *client) processServiceImport(si *serviceImport, acc *Account, msg []byt to = string(c.pa.subject) } + // Check to see if this was a bad request with no reply and we were supposed to be tracking. + if !si.response && si.latency != nil && len(c.pa.reply) == 0 && shouldSample(si.latency) { + si.acc.sendBadRequestTrackingLatency(si, c) + } + + // Send tracking info here if we are tracking this request/response. + var didSendTL bool + if si.tracking { + didSendTL = acc.sendTrackingLatency(si, c) + } + // FIXME(dlc) - Do L1 cache trick like normal client? rr := si.acc.sl.Match(to) @@ -3079,17 +3088,10 @@ func (c *client) processServiceImport(si *serviceImport, acc *Account, msg []byt // We will remove if we did not deliver, or if we are a response service import and we are // a singleton, or we have an EOF message. shouldRemove := !didDeliver || (si.response && (si.rt == Singleton || len(msg) == LEN_CR_LF)) - - // Calculate tracking info here if we are tracking this request/response. - if si.tracking { - shouldRemove = acc.sendTrackingLatency(si, c) + // If we are tracking and we did not actually send the latency info we need to suppress the removal. + if si.tracking && !didSendTL { + shouldRemove = false } - - // Check to see if this was a bad request with no reply and we were supposed to be tracking. - if !si.response && si.latency != nil && len(c.pa.reply) == 0 && shouldSample(si.latency) { - si.acc.sendBadRequestTrackingLatency(si, c) - } - // If we are streamed or chunked we need to update our timestamp to avoid cleanup. if si.rt != Singleton && didDeliver { acc.mu.Lock() @@ -3696,7 +3698,7 @@ func (c *client) teardownConn() { if c.srv != nil { if c.kind == ROUTER || c.kind == GATEWAY { c.Noticef("%s connection closed", c.typeString()) - } else { // Client, System, Jetstream and Leafnode connections. + } else { // Client, System, Jetstream, Account and Leafnode connections. c.Debugf("%s connection closed", c.typeString()) } } @@ -3971,13 +3973,53 @@ func (c *client) pruneClosedSubFromPerAccountCache() { } } +// Grabs the information for latency reporting. +func (c *client) getLatencyInfo(detailed bool) LatencyClient { + var lc LatencyClient + if c == nil || c.kind != CLIENT { + return lc + } + sn := c.srv.Name() + c.mu.Lock() + lc.RTT = c.rtt + if detailed { + lc.Name = c.opts.Name + lc.User = c.getRawAuthUser() + lc.IP = c.host + lc.CID = c.cid + lc.Server = sn + } + c.mu.Unlock() + return lc +} + +// getRAwAuthUser returns the raw auth user for the client. +// Lock should be held. +func (c *client) getRawAuthUser() string { + switch { + case c.opts.Nkey != "": + return c.opts.Nkey + case c.opts.Username != "": + return c.opts.Username + case c.opts.JWT != "": + return c.pubKey + case c.opts.Token != "": + return c.opts.Token + default: + return "" + } +} + // getAuthUser returns the auth user for the client. +// Lock should be held. func (c *client) getAuthUser() string { switch { case c.opts.Nkey != "": return fmt.Sprintf("Nkey %q", c.opts.Nkey) case c.opts.Username != "": return fmt.Sprintf("User %q", c.opts.Username) + case c.opts.JWT != "": + return fmt.Sprintf("JWT User %q", c.pubKey) default: return `User "N/A"` } diff --git a/server/client_test.go b/server/client_test.go index 5379cd59..dc9dee70 100644 --- a/server/client_test.go +++ b/server/client_test.go @@ -256,7 +256,7 @@ func TestClientConnect(t *testing.T) { t.Fatalf("Expected state of OP_START vs %d\n", c.state) } - if !reflect.DeepEqual(c.opts, clientOpts{Echo: true, Verbose: true, Pedantic: true, Authorization: "YZZ222", Name: "router"}) { + if !reflect.DeepEqual(c.opts, clientOpts{Echo: true, Verbose: true, Pedantic: true, Token: "YZZ222", Name: "router"}) { t.Fatalf("Did not parse connect options correctly: %+v\n", c.opts) } } diff --git a/server/events.go b/server/events.go index e955f211..f94377fb 100644 --- a/server/events.go +++ b/server/events.go @@ -1223,6 +1223,7 @@ func (s *Server) remoteLatencyUpdate(sub *subscription, _ *client, subject, _ st } m1 := si.m1 m2 := rl.M2 + lsub := si.latency.subject acc.mu.RUnlock() @@ -1255,7 +1256,7 @@ func (s *Server) remoteLatencyUpdate(sub *subscription, _ *client, subject, _ st // Make sure we remove the entry here. acc.removeServiceImport(si.from) // Send the metrics - s.sendInternalAccountMsg(acc, lsub, &m1) + s.sendInternalAccountMsg(acc, lsub, m1) } // This is used for all inbox replies so that we do not send supercluster wide interest diff --git a/server/opts.go b/server/opts.go index 17eb1809..eaca5cee 100644 --- a/server/opts.go +++ b/server/opts.go @@ -1676,10 +1676,11 @@ type importStream struct { } type importService struct { - acc *Account - an string - sub string - to string + acc *Account + an string + sub string + to string + share bool } // Checks if an account name is reserved. @@ -1925,6 +1926,11 @@ func parseAccounts(v interface{}, opts *Options, errors *[]error, warnings *[]er *errors = append(*errors, &configErr{tk, msg}) continue } + if err := service.acc.SetServiceImportSharing(ta, service.sub, service.share); err != nil { + msg := fmt.Sprintf("Error setting service import sharing %q: %v", service.sub, err) + *errors = append(*errors, &configErr{tk, msg}) + continue + } } return nil @@ -2298,6 +2304,7 @@ func parseImportStreamOrService(v interface{}, errors, warnings *[]error) (*impo curStream *importStream curService *importService pre, to string + share bool lt token ) defer convertPanicToErrorList(<, errors) @@ -2366,6 +2373,7 @@ func parseImportStreamOrService(v interface{}, errors, warnings *[]error) (*impo } else { curService.to = subject } + curService.share = share case "prefix": pre = mv.(string) if curStream != nil { @@ -2376,6 +2384,11 @@ func parseImportStreamOrService(v interface{}, errors, warnings *[]error) (*impo if curService != nil { curService.to = to } + case "share": + share = mv.(bool) + if curService != nil { + curService.share = share + } default: if !tk.IsUsedVariable() { err := &unknownConfigFieldErr{ diff --git a/test/operator_test.go b/test/operator_test.go index d813faad..a69e7b3c 100644 --- a/test/operator_test.go +++ b/test/operator_test.go @@ -178,6 +178,12 @@ func createAccount(t *testing.T, s *server.Server) (*server.Account, nkeys.KeyPa } func createUserCreds(t *testing.T, s *server.Server, akp nkeys.KeyPair) nats.Option { + t.Helper() + opt, _ := createUserCredsOption(t, s, akp) + return opt +} + +func createUserCredsOption(t *testing.T, s *server.Server, akp nkeys.KeyPair) (nats.Option, string) { t.Helper() kp, _ := nkeys.CreateUser() pub, _ := kp.PublicKey() @@ -193,7 +199,7 @@ func createUserCreds(t *testing.T, s *server.Server, akp nkeys.KeyPair) nats.Opt sig, _ := kp.Sign(nonce) return sig, nil } - return nats.UserJWT(userCB, sigCB) + return nats.UserJWT(userCB, sigCB), pub } func TestOperatorServer(t *testing.T) { diff --git a/test/service_latency_test.go b/test/service_latency_test.go index 084c9791..fdd39a77 100644 --- a/test/service_latency_test.go +++ b/test/service_latency_test.go @@ -16,6 +16,7 @@ package test import ( "encoding/json" "fmt" + "io/ioutil" "math/rand" "os" "strings" @@ -81,6 +82,25 @@ func (sc *supercluster) setResponseThreshold(t *testing.T, maxTime time.Duration } } +func (sc *supercluster) setImportShare(t *testing.T) { + t.Helper() + for _, c := range sc.clusters { + for _, s := range c.servers { + foo, err := s.LookupAccount("FOO") + if err != nil { + t.Fatalf("Error looking up account 'FOO': %v", err) + } + bar, err := s.LookupAccount("BAR") + if err != nil { + t.Fatalf("Error looking up account 'BAR': %v", err) + } + if err := bar.SetServiceImportSharing(foo, "ngs.usage.bar", true); err != nil { + t.Fatalf("Error setting import sharing: %v", err) + } + } + } +} + func (sc *supercluster) setupLatencyTracking(t *testing.T, p int) { t.Helper() for _, c := range sc.clusters { @@ -155,6 +175,10 @@ func clientConnectOldRequest(t *testing.T, opts *server.Options, user string) *n func checkServiceLatency(t *testing.T, sl server.ServiceLatency, start time.Time, serviceTime time.Duration) { t.Helper() + if sl.Status != 200 { + t.Fatalf("Bad status received, wanted 200 got %d", sl.Status) + } + serviceTime = serviceTime.Round(time.Millisecond) startDelta := sl.RequestStart.Sub(start) @@ -169,21 +193,59 @@ func checkServiceLatency(t *testing.T, sl server.ServiceLatency, start time.Time } // We should have NATS latency here that is non-zero with real clients. - if sl.NATSLatency.Requestor == 0 { + if sl.Requestor.RTT == 0 { t.Fatalf("Expected non-zero NATS Requestor latency") } - if sl.NATSLatency.Responder == 0 { + if sl.Responder.RTT == 0 { t.Fatalf("Expected non-zero NATS Requestor latency") } // Make sure they add up got := sl.TotalLatency - expected := sl.ServiceLatency + sl.NATSLatency.TotalTime() + expected := sl.ServiceLatency + sl.NATSTotalTime() if got != expected { t.Fatalf("Numbers do not add up: %+v,\ngot: %v\nexpected: %v", sl, got, expected) } } +func extendedCheck(t *testing.T, lc *server.LatencyClient, eUser, appName, eServer string) { + t.Helper() + if lc.User != eUser { + t.Fatalf("Expected user of %q, got %q", eUser, lc.User) + } + if appName != "" && appName != lc.Name { + t.Fatalf("Expected appname of %q, got %q\n", appName, lc.Name) + } + if lc.IP == "" { + t.Fatalf("Expected non-empty IP") + } + if lc.CID < 1 || lc.CID > 20 { + t.Fatalf("Expected a CID in range, got %d", lc.CID) + } + if eServer != "" && eServer != lc.Server { + t.Fatalf("Expected server of %q, got %q", eServer, lc.Server) + } +} + +func noShareCheck(t *testing.T, lc *server.LatencyClient) { + t.Helper() + if lc.Name != "" { + t.Fatalf("appname should not have been shared, got %q", lc.Name) + } + if lc.User != "" { + t.Fatalf("user should not have been shared, got %q", lc.User) + } + if lc.IP != "" { + t.Fatalf("client ip should not have been shared, got %q", lc.IP) + } + if lc.CID != 0 { + t.Fatalf("client id should not have been shared, got %d", lc.CID) + } + if lc.Server != "" { + t.Fatalf("client' server should not have been shared, got %q", lc.Server) + } +} + func TestServiceLatencySingleServerConnect(t *testing.T) { sc := createSuperCluster(t, 3, 2) defer sc.shutdown() @@ -193,7 +255,7 @@ func TestServiceLatencySingleServerConnect(t *testing.T) { // Now we can setup and test, do single node only first. // This is the service provider. - nc := clientConnect(t, sc.clusters[0].opts[0], "foo") + nc := clientConnectWithName(t, sc.clusters[0].opts[0], "foo", "service22") defer nc.Close() // The service listener. @@ -212,8 +274,7 @@ func TestServiceLatencySingleServerConnect(t *testing.T) { // Send the request. start := time.Now() - _, err := nc2.Request("ngs.usage", []byte("1h"), time.Second) - if err != nil { + if _, err := nc2.Request("ngs.usage", []byte("1h"), time.Second); err != nil { t.Fatalf("Expected a response") } @@ -222,6 +283,26 @@ func TestServiceLatencySingleServerConnect(t *testing.T) { json.Unmarshal(rmsg.Data, &sl) checkServiceLatency(t, sl, start, serviceTime) + + rs := sc.clusters[0].servers[0] + extendedCheck(t, &sl.Responder, "foo", "service22", rs.Name()) + // Normally requestor's don't share + noShareCheck(t, &sl.Requestor) + + // Now make sure normal use case works with old request style. + nc3 := clientConnectOldRequest(t, sc.clusters[0].opts[0], "bar") + defer nc3.Close() + + start = time.Now() + if _, err := nc3.Request("ngs.usage", []byte("1h"), time.Second); err != nil { + t.Fatalf("Expected a response") + } + nc3.Close() + + checkServiceLatency(t, sl, start, serviceTime) + extendedCheck(t, &sl.Responder, "foo", "service22", rs.Name()) + // Normally requestor's don't share + noShareCheck(t, &sl.Requestor) } // If a client has a longer RTT that the effective RTT for NATS + responder @@ -249,8 +330,9 @@ func TestServiceLatencyClientRTTSlowerVsServiceRTT(t *testing.T) { nc.Flush() // Requestor and processing - requestAndCheck := func(sopts *server.Options) { + requestAndCheck := func(cindex, sindex int) { t.Helper() + sopts := sc.clusters[cindex].opts[sindex] if nmsgs, _, err := rsub.Pending(); err != nil || nmsgs != 0 { t.Fatalf("Did not expect any latency results, got %d", nmsgs) @@ -290,12 +372,18 @@ func TestServiceLatencyClientRTTSlowerVsServiceRTT(t *testing.T) { // We want to test here that when the client requestor RTT is larger then the response time // we still deliver a requestor value > 0. // Now check that it is close to rtt. - if sl.NATSLatency.Requestor < rtt { - t.Fatalf("Expected requestor latency to be > %v, got %v", rtt, sl.NATSLatency.Requestor) + if sl.Requestor.RTT < rtt { + t.Fatalf("Expected requestor latency to be > %v, got %v", rtt, sl.Requestor.RTT) } if sl.TotalLatency < rtt { t.Fatalf("Expected total latency to be > %v, got %v", rtt, sl.TotalLatency) } + + rs := sc.clusters[0].servers[0] + extendedCheck(t, &sl.Responder, "foo", "", rs.Name()) + // Normally requestor's don't share + noShareCheck(t, &sl.Requestor) + // Check for trailing duplicates.. rmsg, err = rsub.NextMsg(100 * time.Millisecond) if err == nil { @@ -304,11 +392,11 @@ func TestServiceLatencyClientRTTSlowerVsServiceRTT(t *testing.T) { } // Check same server. - requestAndCheck(sc.clusters[0].opts[0]) + requestAndCheck(0, 0) // Check from remote server across GW. - requestAndCheck(sc.clusters[1].opts[1]) + requestAndCheck(1, 1) // Same cluster but different server - requestAndCheck(sc.clusters[0].opts[1]) + requestAndCheck(0, 1) } func connRTT(nc *nats.Conn) time.Duration { @@ -362,11 +450,15 @@ func TestServiceLatencyRemoteConnect(t *testing.T) { } json.Unmarshal(rmsg.Data, &sl) checkServiceLatency(t, sl, start, serviceTime) + rs := sc.clusters[0].servers[0] + extendedCheck(t, &sl.Responder, "foo", "", rs.Name()) + // Normally requestor's don't share + noShareCheck(t, &sl.Requestor) // Lastly here, we need to make sure we are properly tracking the extra hops. // We will make sure that NATS latency is close to what we see from the outside in terms of RTT. - if crtt := connRTT(nc) + connRTT(nc2); sl.NATSLatency.TotalTime() < crtt { - t.Fatalf("Not tracking second measurement for NATS latency across servers: %v vs %v", sl.NATSLatency.TotalTime(), crtt) + if crtt := connRTT(nc) + connRTT(nc2); sl.NATSTotalTime() < crtt { + t.Fatalf("Not tracking second measurement for NATS latency across servers: %v vs %v", sl.NATSTotalTime(), crtt) } // Gateway Requestor @@ -386,11 +478,14 @@ func TestServiceLatencyRemoteConnect(t *testing.T) { } json.Unmarshal(rmsg.Data, &sl) checkServiceLatency(t, sl, start, serviceTime) + extendedCheck(t, &sl.Responder, "foo", "", rs.Name()) + // Normally requestor's don't share + noShareCheck(t, &sl.Requestor) // Lastly here, we need to make sure we are properly tracking the extra hops. // We will make sure that NATS latency is close to what we see from the outside in terms of RTT. - if crtt := connRTT(nc) + connRTT(nc2); sl.NATSLatency.TotalTime() < crtt { - t.Fatalf("Not tracking second measurement for NATS latency across servers: %v vs %v", sl.NATSLatency.TotalTime(), crtt) + if crtt := connRTT(nc) + connRTT(nc2); sl.NATSTotalTime() < crtt { + t.Fatalf("Not tracking second measurement for NATS latency across servers: %v vs %v", sl.NATSTotalTime(), crtt) } // Now turn off and make sure we no longer receive updates. @@ -530,9 +625,10 @@ func TestServiceLatencyWithName(t *testing.T) { json.Unmarshal(rmsg.Data, &sl) // Make sure we have AppName set. - if sl.AppName != "dlc22" { - t.Fatalf("Expected to have AppName set correctly, %q vs %q", "dlc22", sl.AppName) - } + rs := sc.clusters[0].servers[0] + extendedCheck(t, &sl.Responder, "foo", "dlc22", rs.Name()) + // Normally requestor's don't share + noShareCheck(t, &sl.Requestor) } func TestServiceLatencyWithNameMultiServer(t *testing.T) { @@ -563,9 +659,10 @@ func TestServiceLatencyWithNameMultiServer(t *testing.T) { json.Unmarshal(rmsg.Data, &sl) // Make sure we have AppName set. - if sl.AppName != "dlc22" { - t.Fatalf("Expected to have AppName set correctly, %q vs %q", "dlc22", sl.AppName) - } + rs := sc.clusters[0].servers[1] + extendedCheck(t, &sl.Responder, "foo", "dlc22", rs.Name()) + // Normally requestor's don't share + noShareCheck(t, &sl.Requestor) } func TestServiceLatencyWithQueueSubscribersAndNames(t *testing.T) { @@ -623,7 +720,7 @@ func TestServiceLatencyWithQueueSubscribersAndNames(t *testing.T) { var sl server.ServiceLatency json.Unmarshal(msg.Data, &sl) rlock.Lock() - results[sl.AppName] += sl.ServiceLatency + results[sl.Responder.Name] += sl.ServiceLatency rlock.Unlock() if r := atomic.AddInt32(&received, 1); r >= toSend { ch <- true @@ -730,7 +827,8 @@ func TestServiceLatencyWithJWT(t *testing.T) { // Create service provider. url := fmt.Sprintf("nats://%s:%d", opts.Host, opts.Port) - nc, err := nats.Connect(url, createUserCreds(t, s, svcKP)) + copt, pubUser := createUserCredsOption(t, s, svcKP) + nc, err := nats.Connect(url, copt, nats.Name("fooService")) if err != nil { t.Fatalf("Error on connect: %v", err) } @@ -781,6 +879,8 @@ func TestServiceLatencyWithJWT(t *testing.T) { serviceExport.Latency = &jwt.ServiceLatency{Sampling: 100, Results: "results"} updateAccount() + // Grab the service responder's user. + // Send the request. start := time.Now() _, err = nc2.Request("request", []byte("hello"), time.Second) @@ -795,6 +895,9 @@ func TestServiceLatencyWithJWT(t *testing.T) { } json.Unmarshal(rmsg.Data, &sl) checkServiceLatency(t, sl, start, serviceTime) + extendedCheck(t, &sl.Responder, pubUser, "fooService", s.Name()) + // Normally requestor's don't share + noShareCheck(t, &sl.Requestor) // Now we will remove tracking. Do this by simulating a JWT update. serviceExport.Latency = nil @@ -933,7 +1036,7 @@ func TestServiceLatencyRemoteConnectAdjustNegativeValues(t *testing.T) { if sl.ServiceLatency < 0 { t.Fatalf("Unexpected negative service latency value: %v", sl) } - if sl.NATSLatency.System < 0 { + if sl.SystemLatency < 0 { t.Fatalf("Unexpected negative system latency value: %v", sl) } } @@ -977,6 +1080,7 @@ func TestServiceLatencyFailureReportingSingleServer(t *testing.T) { // Test a request with no reply subject nc2.Publish("ngs.usage", []byte("1h")) sl := getMetricResult() + if sl.Status != 400 { t.Fatalf("Expected to get a bad request status [400], got %d", sl.Status) } @@ -1109,3 +1213,276 @@ func TestServiceLatencyFailureReportingMultipleServers(t *testing.T) { time.Sleep(100 * time.Millisecond) } } + +// To test a bug rip@nats.io is seeing. +func TestServiceLatencyOldRequestStyleSingleServer(t *testing.T) { + conf := createConfFile(t, []byte(` + listen: 127.0.0.1:-1 + accounts: { + SVC: { + users: [ {user: svc, password: pass} ] + exports: [ { + service: "svc.echo" + accounts: [CLIENT] + latency: { + sampling: 100% + subject: latency.svc + } + } ] + }, + CLIENT: { + users: [{user: client, password: pass} ] + imports: [ {service: {account: SVC, subject: svc.echo}, to: SVC} ] + }, + SYS: { users: [{user: admin, password: pass}] } + } + + system_account: SYS + `)) + defer os.Remove(conf) + + srv, opts := RunServerWithConfig(conf) + defer srv.Shutdown() + + // Responder + nc, err := nats.Connect(fmt.Sprintf("nats://svc:pass@%s:%d", opts.Host, opts.Port)) + if err != nil { + t.Fatalf("Error on connect: %v", err) + } + defer nc.Close() + + // Listen for metrics + rsub, _ := nc.SubscribeSync("latency.svc") + + // Requestor + nc2, err := nats.Connect(fmt.Sprintf("nats://client:pass@%s:%d", opts.Host, opts.Port), nats.UseOldRequestStyle()) + if err != nil { + t.Fatalf("Error on connect: %v", err) + } + defer nc2.Close() + + // Setup responder + serviceTime := 25 * time.Millisecond + sub, _ := nc.Subscribe("svc.echo", func(msg *nats.Msg) { + time.Sleep(serviceTime) + msg.Respond([]byte("world")) + }) + nc.Flush() + defer sub.Unsubscribe() + + // Send a request + start := time.Now() + if _, err := nc2.Request("SVC", []byte("1h"), time.Second); err != nil { + t.Fatalf("Expected a response") + } + + var sl server.ServiceLatency + rmsg, _ := rsub.NextMsg(time.Second) + json.Unmarshal(rmsg.Data, &sl) + + checkServiceLatency(t, sl, start, serviceTime) + extendedCheck(t, &sl.Responder, "svc", "", srv.Name()) + noShareCheck(t, &sl.Requestor) +} + +// Check we get the proper detailed information for the requestor when allowed. +func TestServiceLatencyRequestorSharesDetailedInfo(t *testing.T) { + sc := createSuperCluster(t, 3, 3) + defer sc.shutdown() + + // Now add in new service export to FOO and have bar import that with tracking enabled. + sc.setupLatencyTracking(t, 100) + sc.setResponseThreshold(t, 10*time.Millisecond) + sc.setImportShare(t) + + nc := clientConnect(t, sc.clusters[0].opts[0], "foo") + defer nc.Close() + + // Listen for metrics + rsub, err := nc.SubscribeSync("results") + if err != nil { + t.Fatal(err) + } + nc.Flush() + + getMetricResult := func() *server.ServiceLatency { + t.Helper() + rmsg, err := rsub.NextMsg(time.Second) + if err != nil { + t.Fatalf("Expected to receive latency metric: %v", err) + } + var sl server.ServiceLatency + if err = json.Unmarshal(rmsg.Data, &sl); err != nil { + t.Errorf("Unexpected error processing latency metric: %s", err) + } + return &sl + } + + cases := []struct { + ci, si int + desc string + }{ + {0, 0, "same server"}, + {0, 1, "same cluster, different server"}, + {1, 1, "different cluster"}, + } + + for _, cs := range cases { + // Select the server to send request from. + nc2 := clientConnect(t, sc.clusters[cs.ci].opts[cs.si], "bar") + defer nc2.Close() + + rs := sc.clusters[cs.ci].servers[cs.si] + + // Test a request with no reply subject + nc2.Publish("ngs.usage", []byte("1h")) + sl := getMetricResult() + if sl.Status != 400 { + t.Fatalf("Test %q, Expected to get a bad request status [400], got %d", cs.desc, sl.Status) + } + extendedCheck(t, &sl.Requestor, "bar", "", rs.Name()) + + // Proper request, but no responders. + nc2.Request("ngs.usage", []byte("1h"), 20*time.Millisecond) + sl = getMetricResult() + if sl.Status != 503 { + t.Fatalf("Test %q, Expected to get a service unavailable status [503], got %d", cs.desc, sl.Status) + } + extendedCheck(t, &sl.Requestor, "bar", "", rs.Name()) + + // The service listener. Make it slow. 10ms is respThreshold, so take 2X + sub, _ := nc.Subscribe("ngs.usage.bar", func(msg *nats.Msg) { + time.Sleep(20 * time.Millisecond) + msg.Respond([]byte("22 msgs")) + }) + defer sub.Unsubscribe() + nc.Flush() + // Wait to propagate. + time.Sleep(100 * time.Millisecond) + + nc2.Request("ngs.usage", []byte("1h"), 20*time.Millisecond) + sl = getMetricResult() + if sl.Status != 504 { + t.Fatalf("Test %q, Expected to get a service timeout status [504], got %d", cs.desc, sl.Status) + } + extendedCheck(t, &sl.Requestor, "bar", "", rs.Name()) + + // Clean up subscriber and requestor + nc2.Close() + sub.Unsubscribe() + nc.Flush() + // Wait to propagate. + time.Sleep(100 * time.Millisecond) + } +} + +func TestServiceLatencyRequestorSharesConfig(t *testing.T) { + conf := createConfFile(t, []byte(` + listen: 127.0.0.1:-1 + accounts: { + SVC: { + users: [ {user: svc, password: pass} ] + exports: [ { + service: "svc.echo" + accounts: [CLIENT] + latency: { + sampling: 100% + subject: latency.svc + } + } ] + }, + CLIENT: { + users: [{user: client, password: pass} ] + imports: [ {service: {account: SVC, subject: svc.echo}, to: SVC, share:true} ] + }, + SYS: { users: [{user: admin, password: pass}] } + } + + system_account: SYS + `)) + defer os.Remove(conf) + + srv, opts := RunServerWithConfig(conf) + defer srv.Shutdown() + + // Responder + nc, err := nats.Connect(fmt.Sprintf("nats://svc:pass@%s:%d", opts.Host, opts.Port)) + if err != nil { + t.Fatalf("Error on connect: %v", err) + } + defer nc.Close() + + // Listen for metrics + rsub, _ := nc.SubscribeSync("latency.svc") + + // Requestor + nc2, err := nats.Connect(fmt.Sprintf("nats://client:pass@%s:%d", opts.Host, opts.Port), nats.UseOldRequestStyle()) + if err != nil { + t.Fatalf("Error on connect: %v", err) + } + defer nc2.Close() + + // Setup responder + serviceTime := 25 * time.Millisecond + sub, _ := nc.Subscribe("svc.echo", func(msg *nats.Msg) { + time.Sleep(serviceTime) + msg.Respond([]byte("world")) + }) + nc.Flush() + defer sub.Unsubscribe() + + // Send a request + start := time.Now() + if _, err := nc2.Request("SVC", []byte("1h"), time.Second); err != nil { + t.Fatalf("Expected a response") + } + + var sl server.ServiceLatency + rmsg, _ := rsub.NextMsg(time.Second) + json.Unmarshal(rmsg.Data, &sl) + + checkServiceLatency(t, sl, start, serviceTime) + extendedCheck(t, &sl.Responder, "svc", "", srv.Name()) + extendedCheck(t, &sl.Requestor, "client", "", srv.Name()) + + // Check reload. + newConf := []byte(` + listen: 127.0.0.1:-1 + accounts: { + SVC: { + users: [ {user: svc, password: pass} ] + exports: [ { + service: "svc.echo" + accounts: [CLIENT] + latency: { + sampling: 100% + subject: latency.svc + } + } ] + }, + CLIENT: { + users: [{user: client, password: pass} ] + imports: [ {service: {account: SVC, subject: svc.echo}, to: SVC} ] + }, + SYS: { users: [{user: admin, password: pass}] } + } + + system_account: SYS + `) + if err := ioutil.WriteFile(conf, newConf, 0600); err != nil { + t.Fatalf("Error rewriting server's config file: %v", err) + } + if err := srv.Reload(); err != nil { + t.Fatalf("Error on server reload: %v", err) + } + + start = time.Now() + if _, err = nc2.Request("SVC", []byte("1h"), time.Second); err != nil { + t.Fatalf("Expected a response") + } + + var sl2 server.ServiceLatency + rmsg, _ = rsub.NextMsg(time.Second) + json.Unmarshal(rmsg.Data, &sl2) + noShareCheck(t, &sl2.Requestor) +}