diff --git a/server/accounts.go b/server/accounts.go index fa7421fe..38a82363 100644 --- a/server/accounts.go +++ b/server/accounts.go @@ -113,6 +113,7 @@ type serviceImport struct { response bool invalid bool tracking bool + share bool } // This is used to record when we create a mapping for implicit service @@ -722,30 +723,37 @@ func (a *Account) IsExportServiceTracking(service string) bool { return false } -// NATSLatency represents the internal NATS latencies, including RTTs to clients. -type NATSLatency struct { - Requestor time.Duration `json:"req"` - Responder time.Duration `json:"resp"` - System time.Duration `json:"sys"` -} - -// TotalTime is a helper function that totals the NATS latencies. -func (nl *NATSLatency) TotalTime() time.Duration { - return nl.Requestor + nl.Responder + nl.System -} - // ServiceLatency is the JSON message sent out in response to latency tracking for -// exported services. +// an accounts exported services. type ServiceLatency struct { Status int `json:"status"` Error string `json:"description,omitempty"` - AppName string `json:"app,omitempty"` + Requestor LatencyClient `json:"requestor,omitempty"` + Responder LatencyClient `json:"responder,omitempty"` RequestStart time.Time `json:"start"` - ServiceLatency time.Duration `json:"svc"` - NATSLatency NATSLatency `json:"nats"` + ServiceLatency time.Duration `json:"service"` + SystemLatency time.Duration `json:"system"` TotalLatency time.Duration `json:"total"` } +// LatencyClient is the JSON message structure assigned to requestors and responders. +// Note that for a requestor, the only information shared by default is the RTT used +// to calculate the total latency. The requestor's account can designate to share +// the additional information in the service import. +type LatencyClient struct { + User string `json:"user,omitempty"` + Name string `json:"name,omitempty"` + RTT time.Duration `json:"rtt"` + IP string `json:"ip"` + CID uint64 `json:"cid"` + Server string `json:"server"` +} + +// NATSTotalTime is a helper function that totals the NATS latencies. +func (nl *ServiceLatency) NATSTotalTime() time.Duration { + return nl.Requestor.RTT + nl.Responder.RTT + nl.SystemLatency +} + // Merge function to merge m1 and m2 (requestor and responder) measurements // when there are two samples. This happens when the requestor and responder // are on different servers. @@ -754,10 +762,9 @@ type ServiceLatency struct { // m1 TotalLatency is correct, so use that. // Will use those to back into NATS latency. func (m1 *ServiceLatency) merge(m2 *ServiceLatency) { - m1.AppName = m2.AppName - m1.NATSLatency.System = m1.ServiceLatency - (m2.ServiceLatency + m2.NATSLatency.Responder) + m1.SystemLatency = m1.ServiceLatency - (m2.ServiceLatency + m2.Responder.RTT) m1.ServiceLatency = m2.ServiceLatency - m1.NATSLatency.Responder = m2.NATSLatency.Responder + m1.Responder = m2.Responder sanitizeLatencyMetric(m1) } @@ -770,8 +777,8 @@ func sanitizeLatencyMetric(sl *ServiceLatency) { if sl.ServiceLatency < 0 { sl.ServiceLatency = 0 } - if sl.NATSLatency.System < 0 { - sl.NATSLatency.System = 0 + if sl.SystemLatency < 0 { + sl.SystemLatency = 0 } } @@ -794,44 +801,34 @@ func (a *Account) sendLatencyResult(si *serviceImport, sl *ServiceLatency) { // Used to send a bad request metric when we do not have a reply subject func (a *Account) sendBadRequestTrackingLatency(si *serviceImport, requestor *client) { sl := &ServiceLatency{ - Status: 400, - Error: "Bad Request", - RequestStart: time.Now().Add(-requestor.getRTTValue()).UTC(), + Status: 400, + Error: "Bad Request", + Requestor: requestor.getLatencyInfo(si.share), } + sl.RequestStart = time.Now().Add(-sl.Requestor.RTT).UTC() a.sendLatencyResult(si, sl) } // Used to send a latency result when the requestor interest was lost before the // response could be delivered. func (a *Account) sendReplyInterestLostTrackLatency(si *serviceImport) { - var reqClientRTT time.Duration - if si.rc != nil { - reqClientRTT = si.rc.getRTTValue() - } - reqStart := time.Unix(0, si.ts-int64(reqClientRTT)) sl := &ServiceLatency{ - Status: 408, - Error: "Request Timeout", - RequestStart: reqStart.UTC(), - NATSLatency: NATSLatency{ - Requestor: reqClientRTT, - }, + Status: 408, + Error: "Request Timeout", } + if si.rc != nil { + sl.Requestor = si.rc.getLatencyInfo(si.share) + } + sl.RequestStart = time.Unix(0, si.ts-int64(sl.Requestor.RTT)).UTC() a.sendLatencyResult(si, sl) } func (a *Account) sendBackendErrorTrackingLatency(si *serviceImport, reason rsiReason) { - var reqClientRTT time.Duration + sl := &ServiceLatency{} if si.rc != nil { - reqClientRTT = si.rc.getRTTValue() - } - reqStart := time.Unix(0, si.ts-int64(reqClientRTT)) - sl := &ServiceLatency{ - RequestStart: reqStart.UTC(), - NATSLatency: NATSLatency{ - Requestor: reqClientRTT, - }, + sl.Requestor = si.rc.getLatencyInfo(si.share) } + sl.RequestStart = time.Unix(0, si.ts-int64(sl.Requestor.RTT)).UTC() if reason == rsiNoDelivery { sl.Status = 503 sl.Error = "Service Unavailable" @@ -853,37 +850,20 @@ func (a *Account) sendTrackingLatency(si *serviceImport, responder *client) bool ts := time.Now() serviceRTT := time.Duration(ts.UnixNano() - si.ts) + requestor := si.rc - var requestor = si.rc - var reqClientRTT = requestor.getRTTValue() - var respClientRTT time.Duration - var appName string - - if responder != nil && responder.kind == CLIENT { - respClientRTT = responder.getRTTValue() - appName = responder.GetName() - } - - // We will estimate time when request left the requestor by time we received - // and the client RTT for the requestor. - reqStart := time.Unix(0, si.ts-int64(reqClientRTT)) sl := &ServiceLatency{ - Status: 200, - AppName: appName, - RequestStart: reqStart.UTC(), - ServiceLatency: serviceRTT - respClientRTT, - NATSLatency: NATSLatency{ - Requestor: reqClientRTT, - Responder: respClientRTT, - System: 0, - }, - TotalLatency: reqClientRTT + serviceRTT, + Status: 200, + Requestor: requestor.getLatencyInfo(si.share), + Responder: responder.getLatencyInfo(true), } - if respClientRTT > 0 { - sl.NATSLatency.System = time.Since(ts) - sl.TotalLatency += sl.NATSLatency.System + sl.RequestStart = time.Unix(0, si.ts-int64(sl.Requestor.RTT)).UTC() + sl.ServiceLatency = serviceRTT - sl.Responder.RTT + sl.TotalLatency = sl.Requestor.RTT + serviceRTT + if sl.Responder.RTT > 0 { + sl.SystemLatency = time.Since(ts) + sl.TotalLatency += sl.SystemLatency } - sanitizeLatencyMetric(sl) // If we are expecting a remote measurement, store our sl here. @@ -963,6 +943,23 @@ func (a *Account) AddServiceImportWithClaim(destination *Account, from, to strin return err } +// SetServiceImportSharing will allow sharing of information about requests with the export account. +// Used for service latency tracking at the moment. +func (a *Account) SetServiceImportSharing(destination *Account, to string, allow bool) error { + a.mu.Lock() + defer a.mu.Unlock() + if a.isClaimAccount() { + return fmt.Errorf("claim based accounts can not be updated directly") + } + for _, si := range a.imports.services { + if si.acc == destination && si.to == to { + si.share = allow + return nil + } + } + return fmt.Errorf("service import not found") +} + // AddServiceImport will add a route to an account to send published messages / requests // to the destination account. From is the local subject to map, To is the // subject that will appear on the destination account. Destination will need @@ -1209,7 +1206,7 @@ func (a *Account) addServiceImport(dest *Account, from, to string, claim *jwt.Im } hasWC := subjectHasWildcard(from) - si := &serviceImport{dest, claim, se, nil, from, to, "", 0, rt, lat, nil, nil, hasWC, false, false, false} + si := &serviceImport{dest, claim, se, nil, from, to, "", 0, rt, lat, nil, nil, hasWC, false, false, false, false} a.imports.services[from] = si a.mu.Unlock() @@ -1491,6 +1488,9 @@ func (a *Account) ServiceExportResponseThreshold(export string) (time.Duration, func (a *Account) SetServiceExportResponseThreshold(export string, maxTime time.Duration) error { a.mu.Lock() defer a.mu.Unlock() + if a.isClaimAccount() { + return fmt.Errorf("claim based accounts can not be updated directly") + } lrt := a.lowestServiceExportResponseTime() se := a.getServiceExport(export) if se == nil { @@ -1512,9 +1512,8 @@ func (a *Account) addRespServiceImport(dest *Account, to string, osi *serviceImp rt := osi.rt // dest is the requestor's account. a is the service responder with the export. - se := osi.se // Marked as internal here, that is how we distinguish. - si := &serviceImport{dest, nil, se, nil, nrr, to, osi.to, 0, rt, nil, nil, nil, false, true, false, false} + si := &serviceImport{dest, nil, osi.se, nil, nrr, to, osi.to, 0, rt, nil, nil, nil, false, true, false, false, osi.share} if a.exports.responses == nil { a.exports.responses = make(map[string]*serviceImport) @@ -1523,7 +1522,7 @@ func (a *Account) addRespServiceImport(dest *Account, to string, osi *serviceImp // Always grab time and make sure response threshold timer is running. si.ts = time.Now().UnixNano() - se.setResponseThresholdTimer() + osi.se.setResponseThresholdTimer() if rt == Singleton && tracking { si.latency = osi.latency @@ -2106,6 +2105,12 @@ func (s *Server) AccountResolver() AccountResolver { return ar } +// isClaimAccount returns if this account is backed by a JWT claim. +// Lock should be held. +func (a *Account) isClaimAccount() bool { + return a.claimJWT != "" +} + // updateAccountClaims will update an existing account with new claims. // This will replace any exports or imports previously defined. // Lock MUST NOT be held upon entry. diff --git a/server/auth.go b/server/auth.go index a5857250..d76327d9 100644 --- a/server/auth.go +++ b/server/auth.go @@ -459,6 +459,8 @@ func (s *Server) processClientOrLeafAuthentication(c *client) bool { if err := c.RegisterNkeyUser(nkey); err != nil { return false } + // Hold onto the user's public key. + c.pubKey = juc.Subject // Generate an event if we have a system account. s.accountConnectEvent(c) @@ -511,7 +513,7 @@ func (s *Server) processClientOrLeafAuthentication(c *client) bool { if c.kind == CLIENT { if opts.Authorization != "" { - return comparePasswords(opts.Authorization, c.opts.Authorization) + return comparePasswords(opts.Authorization, c.opts.Token) } else if opts.Username != "" { if opts.Username != c.opts.Username { return false diff --git a/server/client.go b/server/client.go index df1114f7..b226a027 100644 --- a/server/client.go +++ b/server/client.go @@ -199,6 +199,7 @@ type client struct { opts clientOpts start time.Time nonce []byte + pubKey string nc net.Conn ncs string out outbound @@ -416,22 +417,22 @@ func (s *subscription) isClosed() bool { } type clientOpts struct { - Echo bool `json:"echo"` - Verbose bool `json:"verbose"` - Pedantic bool `json:"pedantic"` - TLSRequired bool `json:"tls_required"` - Nkey string `json:"nkey,omitempty"` - JWT string `json:"jwt,omitempty"` - Sig string `json:"sig,omitempty"` - Authorization string `json:"auth_token,omitempty"` - Username string `json:"user,omitempty"` - Password string `json:"pass,omitempty"` - Name string `json:"name"` - Lang string `json:"lang"` - Version string `json:"version"` - Protocol int `json:"protocol"` - Account string `json:"account,omitempty"` - AccountNew bool `json:"new_account,omitempty"` + Echo bool `json:"echo"` + Verbose bool `json:"verbose"` + Pedantic bool `json:"pedantic"` + TLSRequired bool `json:"tls_required"` + Nkey string `json:"nkey,omitempty"` + JWT string `json:"jwt,omitempty"` + Sig string `json:"sig,omitempty"` + Token string `json:"auth_token,omitempty"` + Username string `json:"user,omitempty"` + Password string `json:"pass,omitempty"` + Name string `json:"name"` + Lang string `json:"lang"` + Version string `json:"version"` + Protocol int `json:"protocol"` + Account string `json:"account,omitempty"` + AccountNew bool `json:"new_account,omitempty"` // Routes only Import *SubjectPermission `json:"import,omitempty"` @@ -2866,18 +2867,18 @@ func (c *client) processInboundClientMsg(msg []byte) bool { if rl != nil { delete(c.rrTracking.rmap, string(c.pa.subject)) } - rtt := c.rtt c.mu.Unlock() + if rl != nil { sl := &rl.M2 // Fill this in and send it off to the other side. - sl.AppName = c.opts.Name - sl.ServiceLatency = time.Since(sl.RequestStart) - rtt - sl.NATSLatency.Responder = rtt - sl.TotalLatency = sl.ServiceLatency + rtt + sl.Status = 200 + sl.Responder = c.getLatencyInfo(true) + sl.ServiceLatency = time.Since(sl.RequestStart) - sl.Responder.RTT + sl.TotalLatency = sl.ServiceLatency + sl.Responder.RTT sanitizeLatencyMetric(sl) lsub := remoteLatencySubjectForResponse(c.pa.subject) - c.srv.sendInternalAccountMsg(nil, lsub, &rl) // Send to SYS account + c.srv.sendInternalAccountMsg(nil, lsub, rl) // Send to SYS account } } @@ -2955,28 +2956,25 @@ func (c *client) handleGWReplyMap(msg []byte) bool { c.pa.subject = []byte(rm.ms) var rl *remoteLatency - var rtt time.Duration if c.rrTracking != nil { rl = c.rrTracking.rmap[string(c.pa.subject)] if rl != nil { delete(c.rrTracking.rmap, string(c.pa.subject)) } - rtt = c.rtt } c.mu.Unlock() if rl != nil { sl := &rl.M2 // Fill this in and send it off to the other side. - sl.AppName = c.opts.Name - sl.ServiceLatency = time.Since(sl.RequestStart) - rtt - sl.NATSLatency.Responder = rtt - sl.TotalLatency = sl.ServiceLatency + rtt + sl.Status = 200 + sl.Responder = c.getLatencyInfo(true) + sl.ServiceLatency = time.Since(sl.RequestStart) - sl.Responder.RTT + sl.TotalLatency = sl.ServiceLatency + sl.Responder.RTT sanitizeLatencyMetric(sl) - lsub := remoteLatencySubjectForResponse(c.pa.subject) - c.srv.sendInternalAccountMsg(nil, lsub, &rl) // Send to SYS account + c.srv.sendInternalAccountMsg(nil, lsub, rl) // Send to SYS account } // Check for leaf nodes @@ -3040,6 +3038,17 @@ func (c *client) processServiceImport(si *serviceImport, acc *Account, msg []byt to = string(c.pa.subject) } + // Check to see if this was a bad request with no reply and we were supposed to be tracking. + if !si.response && si.latency != nil && len(c.pa.reply) == 0 && shouldSample(si.latency) { + si.acc.sendBadRequestTrackingLatency(si, c) + } + + // Send tracking info here if we are tracking this request/response. + var didSendTL bool + if si.tracking { + didSendTL = acc.sendTrackingLatency(si, c) + } + // FIXME(dlc) - Do L1 cache trick like normal client? rr := si.acc.sl.Match(to) @@ -3079,17 +3088,10 @@ func (c *client) processServiceImport(si *serviceImport, acc *Account, msg []byt // We will remove if we did not deliver, or if we are a response service import and we are // a singleton, or we have an EOF message. shouldRemove := !didDeliver || (si.response && (si.rt == Singleton || len(msg) == LEN_CR_LF)) - - // Calculate tracking info here if we are tracking this request/response. - if si.tracking { - shouldRemove = acc.sendTrackingLatency(si, c) + // If we are tracking and we did not actually send the latency info we need to suppress the removal. + if si.tracking && !didSendTL { + shouldRemove = false } - - // Check to see if this was a bad request with no reply and we were supposed to be tracking. - if !si.response && si.latency != nil && len(c.pa.reply) == 0 && shouldSample(si.latency) { - si.acc.sendBadRequestTrackingLatency(si, c) - } - // If we are streamed or chunked we need to update our timestamp to avoid cleanup. if si.rt != Singleton && didDeliver { acc.mu.Lock() @@ -3696,7 +3698,7 @@ func (c *client) teardownConn() { if c.srv != nil { if c.kind == ROUTER || c.kind == GATEWAY { c.Noticef("%s connection closed", c.typeString()) - } else { // Client, System, Jetstream and Leafnode connections. + } else { // Client, System, Jetstream, Account and Leafnode connections. c.Debugf("%s connection closed", c.typeString()) } } @@ -3971,13 +3973,53 @@ func (c *client) pruneClosedSubFromPerAccountCache() { } } +// Grabs the information for latency reporting. +func (c *client) getLatencyInfo(detailed bool) LatencyClient { + var lc LatencyClient + if c == nil || c.kind != CLIENT { + return lc + } + sn := c.srv.Name() + c.mu.Lock() + lc.RTT = c.rtt + if detailed { + lc.Name = c.opts.Name + lc.User = c.getRawAuthUser() + lc.IP = c.host + lc.CID = c.cid + lc.Server = sn + } + c.mu.Unlock() + return lc +} + +// getRAwAuthUser returns the raw auth user for the client. +// Lock should be held. +func (c *client) getRawAuthUser() string { + switch { + case c.opts.Nkey != "": + return c.opts.Nkey + case c.opts.Username != "": + return c.opts.Username + case c.opts.JWT != "": + return c.pubKey + case c.opts.Token != "": + return c.opts.Token + default: + return "" + } +} + // getAuthUser returns the auth user for the client. +// Lock should be held. func (c *client) getAuthUser() string { switch { case c.opts.Nkey != "": return fmt.Sprintf("Nkey %q", c.opts.Nkey) case c.opts.Username != "": return fmt.Sprintf("User %q", c.opts.Username) + case c.opts.JWT != "": + return fmt.Sprintf("JWT User %q", c.pubKey) default: return `User "N/A"` } diff --git a/server/client_test.go b/server/client_test.go index 5379cd59..dc9dee70 100644 --- a/server/client_test.go +++ b/server/client_test.go @@ -256,7 +256,7 @@ func TestClientConnect(t *testing.T) { t.Fatalf("Expected state of OP_START vs %d\n", c.state) } - if !reflect.DeepEqual(c.opts, clientOpts{Echo: true, Verbose: true, Pedantic: true, Authorization: "YZZ222", Name: "router"}) { + if !reflect.DeepEqual(c.opts, clientOpts{Echo: true, Verbose: true, Pedantic: true, Token: "YZZ222", Name: "router"}) { t.Fatalf("Did not parse connect options correctly: %+v\n", c.opts) } } diff --git a/server/events.go b/server/events.go index e955f211..f94377fb 100644 --- a/server/events.go +++ b/server/events.go @@ -1223,6 +1223,7 @@ func (s *Server) remoteLatencyUpdate(sub *subscription, _ *client, subject, _ st } m1 := si.m1 m2 := rl.M2 + lsub := si.latency.subject acc.mu.RUnlock() @@ -1255,7 +1256,7 @@ func (s *Server) remoteLatencyUpdate(sub *subscription, _ *client, subject, _ st // Make sure we remove the entry here. acc.removeServiceImport(si.from) // Send the metrics - s.sendInternalAccountMsg(acc, lsub, &m1) + s.sendInternalAccountMsg(acc, lsub, m1) } // This is used for all inbox replies so that we do not send supercluster wide interest diff --git a/server/opts.go b/server/opts.go index 17eb1809..eaca5cee 100644 --- a/server/opts.go +++ b/server/opts.go @@ -1676,10 +1676,11 @@ type importStream struct { } type importService struct { - acc *Account - an string - sub string - to string + acc *Account + an string + sub string + to string + share bool } // Checks if an account name is reserved. @@ -1925,6 +1926,11 @@ func parseAccounts(v interface{}, opts *Options, errors *[]error, warnings *[]er *errors = append(*errors, &configErr{tk, msg}) continue } + if err := service.acc.SetServiceImportSharing(ta, service.sub, service.share); err != nil { + msg := fmt.Sprintf("Error setting service import sharing %q: %v", service.sub, err) + *errors = append(*errors, &configErr{tk, msg}) + continue + } } return nil @@ -2298,6 +2304,7 @@ func parseImportStreamOrService(v interface{}, errors, warnings *[]error) (*impo curStream *importStream curService *importService pre, to string + share bool lt token ) defer convertPanicToErrorList(<, errors) @@ -2366,6 +2373,7 @@ func parseImportStreamOrService(v interface{}, errors, warnings *[]error) (*impo } else { curService.to = subject } + curService.share = share case "prefix": pre = mv.(string) if curStream != nil { @@ -2376,6 +2384,11 @@ func parseImportStreamOrService(v interface{}, errors, warnings *[]error) (*impo if curService != nil { curService.to = to } + case "share": + share = mv.(bool) + if curService != nil { + curService.share = share + } default: if !tk.IsUsedVariable() { err := &unknownConfigFieldErr{ diff --git a/test/operator_test.go b/test/operator_test.go index d813faad..a69e7b3c 100644 --- a/test/operator_test.go +++ b/test/operator_test.go @@ -178,6 +178,12 @@ func createAccount(t *testing.T, s *server.Server) (*server.Account, nkeys.KeyPa } func createUserCreds(t *testing.T, s *server.Server, akp nkeys.KeyPair) nats.Option { + t.Helper() + opt, _ := createUserCredsOption(t, s, akp) + return opt +} + +func createUserCredsOption(t *testing.T, s *server.Server, akp nkeys.KeyPair) (nats.Option, string) { t.Helper() kp, _ := nkeys.CreateUser() pub, _ := kp.PublicKey() @@ -193,7 +199,7 @@ func createUserCreds(t *testing.T, s *server.Server, akp nkeys.KeyPair) nats.Opt sig, _ := kp.Sign(nonce) return sig, nil } - return nats.UserJWT(userCB, sigCB) + return nats.UserJWT(userCB, sigCB), pub } func TestOperatorServer(t *testing.T) { diff --git a/test/service_latency_test.go b/test/service_latency_test.go index 084c9791..fdd39a77 100644 --- a/test/service_latency_test.go +++ b/test/service_latency_test.go @@ -16,6 +16,7 @@ package test import ( "encoding/json" "fmt" + "io/ioutil" "math/rand" "os" "strings" @@ -81,6 +82,25 @@ func (sc *supercluster) setResponseThreshold(t *testing.T, maxTime time.Duration } } +func (sc *supercluster) setImportShare(t *testing.T) { + t.Helper() + for _, c := range sc.clusters { + for _, s := range c.servers { + foo, err := s.LookupAccount("FOO") + if err != nil { + t.Fatalf("Error looking up account 'FOO': %v", err) + } + bar, err := s.LookupAccount("BAR") + if err != nil { + t.Fatalf("Error looking up account 'BAR': %v", err) + } + if err := bar.SetServiceImportSharing(foo, "ngs.usage.bar", true); err != nil { + t.Fatalf("Error setting import sharing: %v", err) + } + } + } +} + func (sc *supercluster) setupLatencyTracking(t *testing.T, p int) { t.Helper() for _, c := range sc.clusters { @@ -155,6 +175,10 @@ func clientConnectOldRequest(t *testing.T, opts *server.Options, user string) *n func checkServiceLatency(t *testing.T, sl server.ServiceLatency, start time.Time, serviceTime time.Duration) { t.Helper() + if sl.Status != 200 { + t.Fatalf("Bad status received, wanted 200 got %d", sl.Status) + } + serviceTime = serviceTime.Round(time.Millisecond) startDelta := sl.RequestStart.Sub(start) @@ -169,21 +193,59 @@ func checkServiceLatency(t *testing.T, sl server.ServiceLatency, start time.Time } // We should have NATS latency here that is non-zero with real clients. - if sl.NATSLatency.Requestor == 0 { + if sl.Requestor.RTT == 0 { t.Fatalf("Expected non-zero NATS Requestor latency") } - if sl.NATSLatency.Responder == 0 { + if sl.Responder.RTT == 0 { t.Fatalf("Expected non-zero NATS Requestor latency") } // Make sure they add up got := sl.TotalLatency - expected := sl.ServiceLatency + sl.NATSLatency.TotalTime() + expected := sl.ServiceLatency + sl.NATSTotalTime() if got != expected { t.Fatalf("Numbers do not add up: %+v,\ngot: %v\nexpected: %v", sl, got, expected) } } +func extendedCheck(t *testing.T, lc *server.LatencyClient, eUser, appName, eServer string) { + t.Helper() + if lc.User != eUser { + t.Fatalf("Expected user of %q, got %q", eUser, lc.User) + } + if appName != "" && appName != lc.Name { + t.Fatalf("Expected appname of %q, got %q\n", appName, lc.Name) + } + if lc.IP == "" { + t.Fatalf("Expected non-empty IP") + } + if lc.CID < 1 || lc.CID > 20 { + t.Fatalf("Expected a CID in range, got %d", lc.CID) + } + if eServer != "" && eServer != lc.Server { + t.Fatalf("Expected server of %q, got %q", eServer, lc.Server) + } +} + +func noShareCheck(t *testing.T, lc *server.LatencyClient) { + t.Helper() + if lc.Name != "" { + t.Fatalf("appname should not have been shared, got %q", lc.Name) + } + if lc.User != "" { + t.Fatalf("user should not have been shared, got %q", lc.User) + } + if lc.IP != "" { + t.Fatalf("client ip should not have been shared, got %q", lc.IP) + } + if lc.CID != 0 { + t.Fatalf("client id should not have been shared, got %d", lc.CID) + } + if lc.Server != "" { + t.Fatalf("client' server should not have been shared, got %q", lc.Server) + } +} + func TestServiceLatencySingleServerConnect(t *testing.T) { sc := createSuperCluster(t, 3, 2) defer sc.shutdown() @@ -193,7 +255,7 @@ func TestServiceLatencySingleServerConnect(t *testing.T) { // Now we can setup and test, do single node only first. // This is the service provider. - nc := clientConnect(t, sc.clusters[0].opts[0], "foo") + nc := clientConnectWithName(t, sc.clusters[0].opts[0], "foo", "service22") defer nc.Close() // The service listener. @@ -212,8 +274,7 @@ func TestServiceLatencySingleServerConnect(t *testing.T) { // Send the request. start := time.Now() - _, err := nc2.Request("ngs.usage", []byte("1h"), time.Second) - if err != nil { + if _, err := nc2.Request("ngs.usage", []byte("1h"), time.Second); err != nil { t.Fatalf("Expected a response") } @@ -222,6 +283,26 @@ func TestServiceLatencySingleServerConnect(t *testing.T) { json.Unmarshal(rmsg.Data, &sl) checkServiceLatency(t, sl, start, serviceTime) + + rs := sc.clusters[0].servers[0] + extendedCheck(t, &sl.Responder, "foo", "service22", rs.Name()) + // Normally requestor's don't share + noShareCheck(t, &sl.Requestor) + + // Now make sure normal use case works with old request style. + nc3 := clientConnectOldRequest(t, sc.clusters[0].opts[0], "bar") + defer nc3.Close() + + start = time.Now() + if _, err := nc3.Request("ngs.usage", []byte("1h"), time.Second); err != nil { + t.Fatalf("Expected a response") + } + nc3.Close() + + checkServiceLatency(t, sl, start, serviceTime) + extendedCheck(t, &sl.Responder, "foo", "service22", rs.Name()) + // Normally requestor's don't share + noShareCheck(t, &sl.Requestor) } // If a client has a longer RTT that the effective RTT for NATS + responder @@ -249,8 +330,9 @@ func TestServiceLatencyClientRTTSlowerVsServiceRTT(t *testing.T) { nc.Flush() // Requestor and processing - requestAndCheck := func(sopts *server.Options) { + requestAndCheck := func(cindex, sindex int) { t.Helper() + sopts := sc.clusters[cindex].opts[sindex] if nmsgs, _, err := rsub.Pending(); err != nil || nmsgs != 0 { t.Fatalf("Did not expect any latency results, got %d", nmsgs) @@ -290,12 +372,18 @@ func TestServiceLatencyClientRTTSlowerVsServiceRTT(t *testing.T) { // We want to test here that when the client requestor RTT is larger then the response time // we still deliver a requestor value > 0. // Now check that it is close to rtt. - if sl.NATSLatency.Requestor < rtt { - t.Fatalf("Expected requestor latency to be > %v, got %v", rtt, sl.NATSLatency.Requestor) + if sl.Requestor.RTT < rtt { + t.Fatalf("Expected requestor latency to be > %v, got %v", rtt, sl.Requestor.RTT) } if sl.TotalLatency < rtt { t.Fatalf("Expected total latency to be > %v, got %v", rtt, sl.TotalLatency) } + + rs := sc.clusters[0].servers[0] + extendedCheck(t, &sl.Responder, "foo", "", rs.Name()) + // Normally requestor's don't share + noShareCheck(t, &sl.Requestor) + // Check for trailing duplicates.. rmsg, err = rsub.NextMsg(100 * time.Millisecond) if err == nil { @@ -304,11 +392,11 @@ func TestServiceLatencyClientRTTSlowerVsServiceRTT(t *testing.T) { } // Check same server. - requestAndCheck(sc.clusters[0].opts[0]) + requestAndCheck(0, 0) // Check from remote server across GW. - requestAndCheck(sc.clusters[1].opts[1]) + requestAndCheck(1, 1) // Same cluster but different server - requestAndCheck(sc.clusters[0].opts[1]) + requestAndCheck(0, 1) } func connRTT(nc *nats.Conn) time.Duration { @@ -362,11 +450,15 @@ func TestServiceLatencyRemoteConnect(t *testing.T) { } json.Unmarshal(rmsg.Data, &sl) checkServiceLatency(t, sl, start, serviceTime) + rs := sc.clusters[0].servers[0] + extendedCheck(t, &sl.Responder, "foo", "", rs.Name()) + // Normally requestor's don't share + noShareCheck(t, &sl.Requestor) // Lastly here, we need to make sure we are properly tracking the extra hops. // We will make sure that NATS latency is close to what we see from the outside in terms of RTT. - if crtt := connRTT(nc) + connRTT(nc2); sl.NATSLatency.TotalTime() < crtt { - t.Fatalf("Not tracking second measurement for NATS latency across servers: %v vs %v", sl.NATSLatency.TotalTime(), crtt) + if crtt := connRTT(nc) + connRTT(nc2); sl.NATSTotalTime() < crtt { + t.Fatalf("Not tracking second measurement for NATS latency across servers: %v vs %v", sl.NATSTotalTime(), crtt) } // Gateway Requestor @@ -386,11 +478,14 @@ func TestServiceLatencyRemoteConnect(t *testing.T) { } json.Unmarshal(rmsg.Data, &sl) checkServiceLatency(t, sl, start, serviceTime) + extendedCheck(t, &sl.Responder, "foo", "", rs.Name()) + // Normally requestor's don't share + noShareCheck(t, &sl.Requestor) // Lastly here, we need to make sure we are properly tracking the extra hops. // We will make sure that NATS latency is close to what we see from the outside in terms of RTT. - if crtt := connRTT(nc) + connRTT(nc2); sl.NATSLatency.TotalTime() < crtt { - t.Fatalf("Not tracking second measurement for NATS latency across servers: %v vs %v", sl.NATSLatency.TotalTime(), crtt) + if crtt := connRTT(nc) + connRTT(nc2); sl.NATSTotalTime() < crtt { + t.Fatalf("Not tracking second measurement for NATS latency across servers: %v vs %v", sl.NATSTotalTime(), crtt) } // Now turn off and make sure we no longer receive updates. @@ -530,9 +625,10 @@ func TestServiceLatencyWithName(t *testing.T) { json.Unmarshal(rmsg.Data, &sl) // Make sure we have AppName set. - if sl.AppName != "dlc22" { - t.Fatalf("Expected to have AppName set correctly, %q vs %q", "dlc22", sl.AppName) - } + rs := sc.clusters[0].servers[0] + extendedCheck(t, &sl.Responder, "foo", "dlc22", rs.Name()) + // Normally requestor's don't share + noShareCheck(t, &sl.Requestor) } func TestServiceLatencyWithNameMultiServer(t *testing.T) { @@ -563,9 +659,10 @@ func TestServiceLatencyWithNameMultiServer(t *testing.T) { json.Unmarshal(rmsg.Data, &sl) // Make sure we have AppName set. - if sl.AppName != "dlc22" { - t.Fatalf("Expected to have AppName set correctly, %q vs %q", "dlc22", sl.AppName) - } + rs := sc.clusters[0].servers[1] + extendedCheck(t, &sl.Responder, "foo", "dlc22", rs.Name()) + // Normally requestor's don't share + noShareCheck(t, &sl.Requestor) } func TestServiceLatencyWithQueueSubscribersAndNames(t *testing.T) { @@ -623,7 +720,7 @@ func TestServiceLatencyWithQueueSubscribersAndNames(t *testing.T) { var sl server.ServiceLatency json.Unmarshal(msg.Data, &sl) rlock.Lock() - results[sl.AppName] += sl.ServiceLatency + results[sl.Responder.Name] += sl.ServiceLatency rlock.Unlock() if r := atomic.AddInt32(&received, 1); r >= toSend { ch <- true @@ -730,7 +827,8 @@ func TestServiceLatencyWithJWT(t *testing.T) { // Create service provider. url := fmt.Sprintf("nats://%s:%d", opts.Host, opts.Port) - nc, err := nats.Connect(url, createUserCreds(t, s, svcKP)) + copt, pubUser := createUserCredsOption(t, s, svcKP) + nc, err := nats.Connect(url, copt, nats.Name("fooService")) if err != nil { t.Fatalf("Error on connect: %v", err) } @@ -781,6 +879,8 @@ func TestServiceLatencyWithJWT(t *testing.T) { serviceExport.Latency = &jwt.ServiceLatency{Sampling: 100, Results: "results"} updateAccount() + // Grab the service responder's user. + // Send the request. start := time.Now() _, err = nc2.Request("request", []byte("hello"), time.Second) @@ -795,6 +895,9 @@ func TestServiceLatencyWithJWT(t *testing.T) { } json.Unmarshal(rmsg.Data, &sl) checkServiceLatency(t, sl, start, serviceTime) + extendedCheck(t, &sl.Responder, pubUser, "fooService", s.Name()) + // Normally requestor's don't share + noShareCheck(t, &sl.Requestor) // Now we will remove tracking. Do this by simulating a JWT update. serviceExport.Latency = nil @@ -933,7 +1036,7 @@ func TestServiceLatencyRemoteConnectAdjustNegativeValues(t *testing.T) { if sl.ServiceLatency < 0 { t.Fatalf("Unexpected negative service latency value: %v", sl) } - if sl.NATSLatency.System < 0 { + if sl.SystemLatency < 0 { t.Fatalf("Unexpected negative system latency value: %v", sl) } } @@ -977,6 +1080,7 @@ func TestServiceLatencyFailureReportingSingleServer(t *testing.T) { // Test a request with no reply subject nc2.Publish("ngs.usage", []byte("1h")) sl := getMetricResult() + if sl.Status != 400 { t.Fatalf("Expected to get a bad request status [400], got %d", sl.Status) } @@ -1109,3 +1213,276 @@ func TestServiceLatencyFailureReportingMultipleServers(t *testing.T) { time.Sleep(100 * time.Millisecond) } } + +// To test a bug rip@nats.io is seeing. +func TestServiceLatencyOldRequestStyleSingleServer(t *testing.T) { + conf := createConfFile(t, []byte(` + listen: 127.0.0.1:-1 + accounts: { + SVC: { + users: [ {user: svc, password: pass} ] + exports: [ { + service: "svc.echo" + accounts: [CLIENT] + latency: { + sampling: 100% + subject: latency.svc + } + } ] + }, + CLIENT: { + users: [{user: client, password: pass} ] + imports: [ {service: {account: SVC, subject: svc.echo}, to: SVC} ] + }, + SYS: { users: [{user: admin, password: pass}] } + } + + system_account: SYS + `)) + defer os.Remove(conf) + + srv, opts := RunServerWithConfig(conf) + defer srv.Shutdown() + + // Responder + nc, err := nats.Connect(fmt.Sprintf("nats://svc:pass@%s:%d", opts.Host, opts.Port)) + if err != nil { + t.Fatalf("Error on connect: %v", err) + } + defer nc.Close() + + // Listen for metrics + rsub, _ := nc.SubscribeSync("latency.svc") + + // Requestor + nc2, err := nats.Connect(fmt.Sprintf("nats://client:pass@%s:%d", opts.Host, opts.Port), nats.UseOldRequestStyle()) + if err != nil { + t.Fatalf("Error on connect: %v", err) + } + defer nc2.Close() + + // Setup responder + serviceTime := 25 * time.Millisecond + sub, _ := nc.Subscribe("svc.echo", func(msg *nats.Msg) { + time.Sleep(serviceTime) + msg.Respond([]byte("world")) + }) + nc.Flush() + defer sub.Unsubscribe() + + // Send a request + start := time.Now() + if _, err := nc2.Request("SVC", []byte("1h"), time.Second); err != nil { + t.Fatalf("Expected a response") + } + + var sl server.ServiceLatency + rmsg, _ := rsub.NextMsg(time.Second) + json.Unmarshal(rmsg.Data, &sl) + + checkServiceLatency(t, sl, start, serviceTime) + extendedCheck(t, &sl.Responder, "svc", "", srv.Name()) + noShareCheck(t, &sl.Requestor) +} + +// Check we get the proper detailed information for the requestor when allowed. +func TestServiceLatencyRequestorSharesDetailedInfo(t *testing.T) { + sc := createSuperCluster(t, 3, 3) + defer sc.shutdown() + + // Now add in new service export to FOO and have bar import that with tracking enabled. + sc.setupLatencyTracking(t, 100) + sc.setResponseThreshold(t, 10*time.Millisecond) + sc.setImportShare(t) + + nc := clientConnect(t, sc.clusters[0].opts[0], "foo") + defer nc.Close() + + // Listen for metrics + rsub, err := nc.SubscribeSync("results") + if err != nil { + t.Fatal(err) + } + nc.Flush() + + getMetricResult := func() *server.ServiceLatency { + t.Helper() + rmsg, err := rsub.NextMsg(time.Second) + if err != nil { + t.Fatalf("Expected to receive latency metric: %v", err) + } + var sl server.ServiceLatency + if err = json.Unmarshal(rmsg.Data, &sl); err != nil { + t.Errorf("Unexpected error processing latency metric: %s", err) + } + return &sl + } + + cases := []struct { + ci, si int + desc string + }{ + {0, 0, "same server"}, + {0, 1, "same cluster, different server"}, + {1, 1, "different cluster"}, + } + + for _, cs := range cases { + // Select the server to send request from. + nc2 := clientConnect(t, sc.clusters[cs.ci].opts[cs.si], "bar") + defer nc2.Close() + + rs := sc.clusters[cs.ci].servers[cs.si] + + // Test a request with no reply subject + nc2.Publish("ngs.usage", []byte("1h")) + sl := getMetricResult() + if sl.Status != 400 { + t.Fatalf("Test %q, Expected to get a bad request status [400], got %d", cs.desc, sl.Status) + } + extendedCheck(t, &sl.Requestor, "bar", "", rs.Name()) + + // Proper request, but no responders. + nc2.Request("ngs.usage", []byte("1h"), 20*time.Millisecond) + sl = getMetricResult() + if sl.Status != 503 { + t.Fatalf("Test %q, Expected to get a service unavailable status [503], got %d", cs.desc, sl.Status) + } + extendedCheck(t, &sl.Requestor, "bar", "", rs.Name()) + + // The service listener. Make it slow. 10ms is respThreshold, so take 2X + sub, _ := nc.Subscribe("ngs.usage.bar", func(msg *nats.Msg) { + time.Sleep(20 * time.Millisecond) + msg.Respond([]byte("22 msgs")) + }) + defer sub.Unsubscribe() + nc.Flush() + // Wait to propagate. + time.Sleep(100 * time.Millisecond) + + nc2.Request("ngs.usage", []byte("1h"), 20*time.Millisecond) + sl = getMetricResult() + if sl.Status != 504 { + t.Fatalf("Test %q, Expected to get a service timeout status [504], got %d", cs.desc, sl.Status) + } + extendedCheck(t, &sl.Requestor, "bar", "", rs.Name()) + + // Clean up subscriber and requestor + nc2.Close() + sub.Unsubscribe() + nc.Flush() + // Wait to propagate. + time.Sleep(100 * time.Millisecond) + } +} + +func TestServiceLatencyRequestorSharesConfig(t *testing.T) { + conf := createConfFile(t, []byte(` + listen: 127.0.0.1:-1 + accounts: { + SVC: { + users: [ {user: svc, password: pass} ] + exports: [ { + service: "svc.echo" + accounts: [CLIENT] + latency: { + sampling: 100% + subject: latency.svc + } + } ] + }, + CLIENT: { + users: [{user: client, password: pass} ] + imports: [ {service: {account: SVC, subject: svc.echo}, to: SVC, share:true} ] + }, + SYS: { users: [{user: admin, password: pass}] } + } + + system_account: SYS + `)) + defer os.Remove(conf) + + srv, opts := RunServerWithConfig(conf) + defer srv.Shutdown() + + // Responder + nc, err := nats.Connect(fmt.Sprintf("nats://svc:pass@%s:%d", opts.Host, opts.Port)) + if err != nil { + t.Fatalf("Error on connect: %v", err) + } + defer nc.Close() + + // Listen for metrics + rsub, _ := nc.SubscribeSync("latency.svc") + + // Requestor + nc2, err := nats.Connect(fmt.Sprintf("nats://client:pass@%s:%d", opts.Host, opts.Port), nats.UseOldRequestStyle()) + if err != nil { + t.Fatalf("Error on connect: %v", err) + } + defer nc2.Close() + + // Setup responder + serviceTime := 25 * time.Millisecond + sub, _ := nc.Subscribe("svc.echo", func(msg *nats.Msg) { + time.Sleep(serviceTime) + msg.Respond([]byte("world")) + }) + nc.Flush() + defer sub.Unsubscribe() + + // Send a request + start := time.Now() + if _, err := nc2.Request("SVC", []byte("1h"), time.Second); err != nil { + t.Fatalf("Expected a response") + } + + var sl server.ServiceLatency + rmsg, _ := rsub.NextMsg(time.Second) + json.Unmarshal(rmsg.Data, &sl) + + checkServiceLatency(t, sl, start, serviceTime) + extendedCheck(t, &sl.Responder, "svc", "", srv.Name()) + extendedCheck(t, &sl.Requestor, "client", "", srv.Name()) + + // Check reload. + newConf := []byte(` + listen: 127.0.0.1:-1 + accounts: { + SVC: { + users: [ {user: svc, password: pass} ] + exports: [ { + service: "svc.echo" + accounts: [CLIENT] + latency: { + sampling: 100% + subject: latency.svc + } + } ] + }, + CLIENT: { + users: [{user: client, password: pass} ] + imports: [ {service: {account: SVC, subject: svc.echo}, to: SVC} ] + }, + SYS: { users: [{user: admin, password: pass}] } + } + + system_account: SYS + `) + if err := ioutil.WriteFile(conf, newConf, 0600); err != nil { + t.Fatalf("Error rewriting server's config file: %v", err) + } + if err := srv.Reload(); err != nil { + t.Fatalf("Error on server reload: %v", err) + } + + start = time.Now() + if _, err = nc2.Request("SVC", []byte("1h"), time.Second); err != nil { + t.Fatalf("Expected a response") + } + + var sl2 server.ServiceLatency + rmsg, _ = rsub.NextMsg(time.Second) + json.Unmarshal(rmsg.Data, &sl2) + noShareCheck(t, &sl2.Requestor) +}