From 94f143ccce8e5cb9946fd18c803a2fd5266e25f9 Mon Sep 17 00:00:00 2001
From: Derek Collison
Date: Wed, 11 Sep 2019 16:28:53 -0700
Subject: [PATCH 1/2] Latency tracking updates.

Will now break out the internal NATS latency to show requestor client RTT,
responder client RTT, and any internal latency caused by hopping between
servers, etc.

Signed-off-by: Derek Collison
---
 server/accounts.go           | 52 +++++++++++++++++++++++++++---------
 server/client.go             | 37 ++++++++++++-------------
 server/events.go             |  6 ++---
 server/gateway.go            |  2 +-
 test/service_latency_test.go | 19 +++++++------
 5 files changed, 71 insertions(+), 45 deletions(-)

diff --git a/server/accounts.go b/server/accounts.go
index 97556b1e..da6df407 100644
--- a/server/accounts.go
+++ b/server/accounts.go
@@ -486,12 +486,13 @@ func (a *Account) IsExportService(service string) bool {
 // IsExportServiceTracking will indicate if given publish subject is an export service with tracking enabled.
 func (a *Account) IsExportServiceTracking(service string) bool {
 	a.mu.RLock()
-	defer a.mu.RUnlock()
 	ea, ok := a.exports.services[service]
 	if ok && ea == nil {
+		a.mu.RUnlock()
 		return false
 	}
 	if ok && ea != nil && ea.latency != nil {
+		a.mu.RUnlock()
 		return true
 	}
 	// FIXME(dlc) - Might want to cache this is in the hot path checking for
@@ -499,23 +500,51 @@ func (a *Account) IsExportServiceTracking(service string) bool {
 	tokens := strings.Split(service, tsep)
 	for subj, ea := range a.exports.services {
 		if isSubsetMatch(tokens, subj) && ea != nil && ea.latency != nil {
+			a.mu.RUnlock()
 			return true
 		}
 	}
+	a.mu.RUnlock()
 	return false
 }
 
+// NATSLatency represents the internal NATS latencies, including RTTs to clients.
+type NATSLatency struct {
+	Requestor time.Duration `json:"request_rtt"`
+	Responder time.Duration `json:"responder_rtt"`
+	System    time.Duration `json:"system_latency"`
+}
+
+// TotalTime is a helper function that totals the NATS latencies.
+func (nl *NATSLatency) TotalTime() time.Duration {
+	return nl.Requestor + nl.Responder + nl.System
+}
+
 // ServiceLatency is the JSON message sent out in response to latency tracking for
 // exported services.
 type ServiceLatency struct {
 	AppName        string        `json:"app_name,omitempty"`
 	RequestStart   time.Time     `json:"request_start"`
 	ServiceLatency time.Duration `json:"service_latency"`
-	NATSLatency    time.Duration `json:"nats_latency"`
+	NATSLatency    NATSLatency   `json:"nats_latency"`
 	TotalLatency   time.Duration `json:"total_latency"`
 }
 
-// Used for transporting remote laytency measurements.
+// Merge function to merge m1 and m2 (requestor and responder) measurements
+// when there are two samples. This happens when the requestor and responder
+// are on different servers.
+//
+// m2 ServiceLatency is correct, so use that.
+// m1 TotalLatency is correct, so use that.
+// Will use those to back into NATS latency.
+func (m1 *ServiceLatency) merge(m2 *ServiceLatency) {
+	m1.AppName = m2.AppName
+	m1.NATSLatency.System = m1.ServiceLatency - (m2.ServiceLatency + m2.NATSLatency.Responder)
+	m1.ServiceLatency = m2.ServiceLatency
+	m1.NATSLatency.Responder = m2.NATSLatency.Responder
+}
+
+// Used for transporting remote latency measurements.
 type remoteLatency struct {
 	Account string         `json:"account"`
 	ReqId   string         `json:"req_id"`
@@ -532,7 +561,6 @@ func (a *Account) sendTrackingLatency(si *serviceImport, requestor, responder *c
 
 	var (
 		reqClientRTT  = requestor.getRTTValue()
-		natsRTT       = reqClientRTT
 		respClientRTT time.Duration
 		appName       string
 	)
@@ -541,7 +569,6 @@ func (a *Account) sendTrackingLatency(si *serviceImport, requestor, responder *c
 
 	if responder != nil && responder.kind == CLIENT {
 		respClientRTT = responder.getRTTValue()
-		natsRTT += respClientRTT
 		appName = responder.GetName()
 	}
 
@@ -552,8 +579,12 @@ func (a *Account) sendTrackingLatency(si *serviceImport, requestor, responder *c
 		AppName:        appName,
 		RequestStart:   reqStart,
 		ServiceLatency: serviceRTT - respClientRTT,
-		NATSLatency:    natsRTT,
-		TotalLatency:   reqClientRTT + serviceRTT,
+		NATSLatency: NATSLatency{
+			Requestor: reqClientRTT,
+			Responder: respClientRTT,
+			System:    0,
+		},
+		TotalLatency: reqClientRTT + serviceRTT,
 	}
 
 	// If we are expecting a remote measurement, store our sl here.
@@ -564,11 +595,8 @@ func (a *Account) sendTrackingLatency(si *serviceImport, requestor, responder *c
 	if expectRemoteM2 {
 		si.acc.mu.Lock()
 		if si.m1 != nil {
-			m2 := si.m1
-			m1 := &sl
-			m1.AppName = m2.AppName
-			m1.ServiceLatency = m2.ServiceLatency
-			m1.NATSLatency = m1.TotalLatency - m1.ServiceLatency
+			m1, m2 := &sl, si.m1
+			m1.merge(m2)
 			si.acc.mu.Unlock()
 			a.srv.sendInternalAccountMsg(a, si.latency.subject, m1)
 			return true
diff --git a/server/client.go b/server/client.go
index 3ec26f5c..fb317957 100644
--- a/server/client.go
+++ b/server/client.go
@@ -2124,7 +2124,7 @@ var needFlush = struct{}{}
 
 // deliverMsg will deliver a message to a matching subscription and its underlying client.
 // We process all connection/client types. mh is the part that will be protocol/client specific.
-func (c *client) deliverMsg(sub *subscription, mh, msg []byte) bool {
+func (c *client) deliverMsg(sub *subscription, subject, mh, msg []byte) bool {
 	if sub.client == nil {
 		return false
 	}
@@ -2139,7 +2139,7 @@ func (c *client) deliverMsg(sub *subscription, mh, msg []byte) bool {
 
 	// Check if we have a subscribe deny clause. This will trigger us to check the subject
 	// for a match against the denied subjects.
-	if client.mperms != nil && client.checkDenySub(string(c.pa.subject)) {
+	if client.mperms != nil && client.checkDenySub(string(subject)) {
 		client.mu.Unlock()
 		return false
 	}
@@ -2204,7 +2204,7 @@ func (c *client) deliverMsg(sub *subscription, mh, msg []byte) bool {
 	if client.kind == SYSTEM {
 		s := client.srv
 		client.mu.Unlock()
-		s.deliverInternalMsg(sub, c.pa.subject, c.pa.reply, msg[:msgSize])
+		s.deliverInternalMsg(sub, subject, c.pa.reply, msg[:msgSize])
 		return true
 	}
 
@@ -2224,18 +2224,16 @@ func (c *client) deliverMsg(sub *subscription, mh, msg []byte) bool {
 	// Do a fast check here to see if we should be tracking this from a latency
 	// perspective. This will be for a request being received for an exported service.
 	// This needs to be from a non-client (otherwise tracking happens at requestor).
-	if c.kind != CLIENT && client.kind == CLIENT && len(c.pa.reply) > minReplyLen {
+	if client.kind == CLIENT && len(c.pa.reply) > minReplyLen {
+		// If we do not have a registered RTT, queue that up now.
+		if client.rtt == 0 && client.flags.isSet(connectReceived) {
+			client.sendPing()
+		}
 		// FIXME(dlc) - We may need to optimize this.
-		if client.acc.IsExportServiceTracking(string(c.pa.subject)) {
-			// If we do not have a registered RTT queue that up now.
-			if client.rtt == 0 && c.flags.isSet(connectReceived) {
-				client.sendPing()
-			}
-			// We will have tagged this with a suffix ('.T') if we are tracking. This is
-			// needed from sampling. Not all will be tracked.
-			if isTrackedReply(c.pa.reply) {
-				client.trackRemoteReply(string(c.pa.reply))
-			}
+		// We will have tagged this with a suffix ('.T') if we are tracking. This is
+		// needed from sampling. Not all will be tracked.
+		if c.kind != CLIENT && client.acc.IsExportServiceTracking(string(subject)) && isTrackedReply(c.pa.reply) {
+			client.trackRemoteReply(string(c.pa.reply))
 		}
 	}
 
@@ -2531,8 +2529,8 @@ func (c *client) processInboundClientMsg(msg []byte) {
 			// Fill this in and send it off to the other side.
 			sl.AppName = c.opts.Name
 			sl.ServiceLatency = time.Since(sl.RequestStart) - rtt
-			sl.NATSLatency = rtt
-			sl.TotalLatency = sl.ServiceLatency + sl.NATSLatency
+			sl.NATSLatency.Responder = rtt
+			sl.TotalLatency = sl.ServiceLatency + rtt
 			lsub := remoteLatencySubjectForResponse(c.pa.subject)
 			c.srv.sendInternalAccountMsg(nil, lsub, &rl) // Send to SYS account
 		}
@@ -2624,7 +2622,6 @@ func (c *client) checkForImportServices(acc *Account, msg []byte) {
 		// We have a service import that we are tracking but have not established RTT
 		c.sendRTTPing()
 	}
-
 	// If this is a client or leaf connection and we are in gateway mode,
 	// we need to send RS+ to our local cluster and possibly to inbound
 	// GW connections for which we are in interest-only mode.
@@ -2757,7 +2754,7 @@ func (c *client) processMsgResults(acc *Account, r *SublistResult, msg, subject,
 		}
 		// Normal delivery
 		mh := c.msgHeader(msgh[:si], sub, reply)
-		c.deliverMsg(sub, mh, msg)
+		c.deliverMsg(sub, subject, mh, msg)
 	}
 
 	// Set these up to optionally filter based on the queue lists.
@@ -2844,7 +2841,7 @@ func (c *client) processMsgResults(acc *Account, r *SublistResult, msg, subject,
 			}
 
 			mh := c.msgHeader(msgh[:si], sub, reply)
-			if c.deliverMsg(sub, mh, msg) {
+			if c.deliverMsg(sub, subject, mh, msg) {
 				// Clear rsub
 				rsub = nil
 				if flags&pmrCollectQueueNames != 0 {
@@ -2908,7 +2905,7 @@ sendToRoutesOrLeafs:
 		}
 		mh = append(mh, c.pa.szb...)
 		mh = append(mh, _CRLF_...)
-		c.deliverMsg(rt.sub, mh, msg)
+		c.deliverMsg(rt.sub, subject, mh, msg)
 	}
 	return queues
 }
diff --git a/server/events.go b/server/events.go
index eb5f69d6..968e342a 100644
--- a/server/events.go
+++ b/server/events.go
@@ -1033,7 +1033,7 @@ func (s *Server) remoteLatencyUpdate(sub *subscription, subject, _ string, msg [
 	lsub := si.latency.subject
 	acc.mu.RUnlock()
 
-	// So we have no processed the response tracking measurement yet.
+	// So we have not processed the response tracking measurement yet.
 	if m1 == nil {
 		acc.mu.Lock()
 		// Double check since could have slipped in.
@@ -1052,9 +1052,7 @@ func (s *Server) remoteLatencyUpdate(sub *subscription, subject, _ string, msg [
 	// M2 ServiceLatency is correct, so use that.
 	// M1 TotalLatency is correct, so use that.
 	// Will use those to back into NATS latency.
-	m1.AppName = m2.AppName
-	m1.ServiceLatency = m2.ServiceLatency
-	m1.NATSLatency = m1.TotalLatency - m1.ServiceLatency
+	m1.merge(&m2)
 
 	// Make sure we remove the entry here.
 	si.acc.removeServiceImport(si.from)
diff --git a/server/gateway.go b/server/gateway.go
index 9d0f3a4e..603269b6 100644
--- a/server/gateway.go
+++ b/server/gateway.go
@@ -2356,7 +2356,7 @@ func (c *client) sendMsgToGateways(acc *Account, msg, subject, reply []byte, qgr
 		sub.nm, sub.max = 0, 0
 		sub.client = gwc
 		sub.subject = c.pa.subject
-		c.deliverMsg(sub, mh, msg)
+		c.deliverMsg(sub, c.pa.subject, mh, msg)
 	}
 	// Done with subscription, put back to pool. We don't need
 	// to reset content since we explicitly set when using it.
diff --git a/test/service_latency_test.go b/test/service_latency_test.go
index 7f5b2339..08f02182 100644
--- a/test/service_latency_test.go
+++ b/test/service_latency_test.go
@@ -115,15 +115,18 @@ func checkServiceLatency(t *testing.T, sl server.ServiceLatency, start time.Time
 	if sl.TotalLatency < sl.ServiceLatency {
 		t.Fatalf("Bad total latency: %v", sl.ServiceLatency)
 	}
+
 	// We should have NATS latency here that is non-zero with real clients.
-	if sl.NATSLatency == 0 {
-		t.Fatalf("Expected non-zero NATS latency")
+	if sl.NATSLatency.Requestor == 0 {
+		t.Fatalf("Expected non-zero NATS Requestor latency")
+	}
+	if sl.NATSLatency.Responder == 0 {
+		t.Fatalf("Expected non-zero NATS Responder latency")
 	}
 	// Make sure they add up
-	if sl.TotalLatency != sl.ServiceLatency+sl.NATSLatency {
+	if sl.TotalLatency != sl.ServiceLatency+sl.NATSLatency.TotalTime() {
 		t.Fatalf("Numbers do not add up: %+v", sl)
 	}
-
 }
 
 func TestServiceLatencySingleServerConnect(t *testing.T) {
@@ -220,8 +223,8 @@ func TestServiceLatencyRemoteConnect(t *testing.T) {
 
 	// Lastly here, we need to make sure we are properly tracking the extra hops.
 	// We will make sure that NATS latency is close to what we see from the outside in terms of RTT.
-	if crtt := connRTT(nc) + connRTT(nc2); sl.NATSLatency < crtt {
-		t.Fatalf("Not tracking second measurement for NATS latency across servers: %v vs %v", sl.NATSLatency, crtt)
+	if crtt := connRTT(nc) + connRTT(nc2); sl.NATSLatency.TotalTime() < crtt {
+		t.Fatalf("Not tracking second measurement for NATS latency across servers: %v vs %v", sl.NATSLatency.TotalTime(), crtt)
 	}
 
 	// Gateway Requestor
@@ -244,8 +247,8 @@ func TestServiceLatencyRemoteConnect(t *testing.T) {
 
 	// Lastly here, we need to make sure we are properly tracking the extra hops.
 	// We will make sure that NATS latency is close to what we see from the outside in terms of RTT.
-	if crtt := connRTT(nc) + connRTT(nc2); sl.NATSLatency < crtt {
-		t.Fatalf("Not tracking second measurement for NATS latency across servers: %v vs %v", sl.NATSLatency, crtt)
+	if crtt := connRTT(nc) + connRTT(nc2); sl.NATSLatency.TotalTime() < crtt {
+		t.Fatalf("Not tracking second measurement for NATS latency across servers: %v vs %v", sl.NATSLatency.TotalTime(), crtt)
 	}
 }

From 25d5cb337d988bf392005d5080361849d01ee1f1 Mon Sep 17 00:00:00 2001
From: Derek Collison
Date: Wed, 11 Sep 2019 17:30:01 -0700
Subject: [PATCH 2/2] Make json tags consistent

Signed-off-by: Derek Collison
---
 server/accounts.go | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/server/accounts.go b/server/accounts.go
index da6df407..437b548c 100644
--- a/server/accounts.go
+++ b/server/accounts.go
@@ -510,7 +510,7 @@ func (a *Account) IsExportServiceTracking(service string) bool {
 
 // NATSLatency represents the internal NATS latencies, including RTTs to clients.
 type NATSLatency struct {
-	Requestor time.Duration `json:"request_rtt"`
+	Requestor time.Duration `json:"requestor_rtt"`
 	Responder time.Duration `json:"responder_rtt"`
 	System    time.Duration `json:"system_latency"`
 }
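The two-sample merge in accounts.go is the subtle piece of this change, so here is a minimal, self-contained Go sketch of the same arithmetic with invented sample durations (the 0.5ms/1ms/8ms/11ms figures are illustrative only, and AppName is omitted). m1 is the requestor-side measurement, whose TotalLatency is correct but whose ServiceLatency still contains the responder RTT plus any server-to-server hop time; m2 is the responder-side measurement, whose ServiceLatency and responder RTT are correct. merge backs the hop time out into NATSLatency.System.

package main

import (
	"fmt"
	"time"
)

// NATSLatency mirrors the struct added in server/accounts.go.
type NATSLatency struct {
	Requestor time.Duration `json:"requestor_rtt"`
	Responder time.Duration `json:"responder_rtt"`
	System    time.Duration `json:"system_latency"`
}

// TotalTime totals the NATS latencies, as in the patch.
func (nl *NATSLatency) TotalTime() time.Duration {
	return nl.Requestor + nl.Responder + nl.System
}

// ServiceLatency is trimmed to the fields that merge touches.
type ServiceLatency struct {
	ServiceLatency time.Duration
	NATSLatency    NATSLatency
	TotalLatency   time.Duration
}

// merge is the same back-out computation as in the patch: m1 is the
// requestor-side sample, m2 the responder-side sample.
func (m1 *ServiceLatency) merge(m2 *ServiceLatency) {
	m1.NATSLatency.System = m1.ServiceLatency - (m2.ServiceLatency + m2.NATSLatency.Responder)
	m1.ServiceLatency = m2.ServiceLatency
	m1.NATSLatency.Responder = m2.NATSLatency.Responder
}

func main() {
	// Requestor-side sample: from this server's view, everything past the
	// requestor RTT looks like service time.
	m1 := &ServiceLatency{
		ServiceLatency: 11 * time.Millisecond, // responder RTT + real service time + hops
		NATSLatency:    NATSLatency{Requestor: 500 * time.Microsecond},
		TotalLatency:   11*time.Millisecond + 500*time.Microsecond,
	}
	// Responder-side sample: real service time and responder RTT.
	m2 := &ServiceLatency{
		ServiceLatency: 8 * time.Millisecond,
		NATSLatency:    NATSLatency{Responder: time.Millisecond},
	}
	m1.merge(m2)

	fmt.Println(m1.NATSLatency.System) // 2ms of server-to-server hop latency
	// The invariant asserted by checkServiceLatency holds after the merge.
	fmt.Println(m1.TotalLatency == m1.ServiceLatency+m1.NATSLatency.TotalTime()) // true
}

Running this prints 2ms for the system latency (11ms - (8ms + 1ms)), and the checkServiceLatency invariant TotalLatency == ServiceLatency + NATSLatency.TotalTime() holds after the merge.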