Fix for latency tracking bug.

The bug occurs when latency tracking is on, the requestor and responder are not connected to the same server, and the responder sends two responses for a single request.

Signed-off-by: Derek Collison <derek@nats.io>
This commit is contained in:
Derek Collison
2021-11-12 17:37:15 -08:00
parent 8ac1ca0e98
commit 65d098f526
3 changed files with 43 additions and 3 deletions

View File

@@ -1175,7 +1175,11 @@ func (m1 *ServiceLatency) NATSTotalTime() time.Duration {
// m1 TotalLatency is correct, so use that.
// Will use those to back into NATS latency.
func (m1 *ServiceLatency) merge(m2 *ServiceLatency) {
m1.SystemLatency = m1.ServiceLatency - (m2.ServiceLatency + m2.Responder.RTT)
rtt := time.Duration(0)
if m2.Responder != nil {
rtt = m2.Responder.RTT
}
m1.SystemLatency = m1.ServiceLatency - (m2.ServiceLatency + rtt)
m1.ServiceLatency = m2.ServiceLatency
m1.Responder = m2.Responder
sanitizeLatencyMetric(m1)

View File

@@ -3843,7 +3843,7 @@ func (c *client) processServiceImport(si *serviceImport, acc *Account, msg []byt
// Send tracking info here if we are tracking this response.
// This is always a response.
var didSendTL bool
if si.tracking {
if si.tracking && !si.didDeliver {
// Stamp that we attempted delivery.
si.didDeliver = true
didSendTL = acc.sendTrackingLatency(si, c)

View File

@@ -1,4 +1,4 @@
// Copyright 2019-2020 The NATS Authors
// Copyright 2019-2021 The NATS Authors
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
@@ -1884,3 +1884,39 @@ func TestServiceLatencyMissingResults(t *testing.T) {
t.Fatalf("Received invalid tracking measurements, %d %d %d", sl.ServiceLatency, sl.SystemLatency, sl.TotalLatency)
}
}
// TestServiceLatencyDoubleResponse is a regression test for service latency
// tracking when a responder sends two responses for a single request while
// the requestor and the responder are connected through different server
// options of the same cluster. The test makes no assertion on the metric
// itself; it only exercises the double-response path.
func TestServiceLatencyDoubleResponse(t *testing.T) {
	sc := createSuperCluster(t, 3, 1)
	defer sc.shutdown()

	// Now add in new service export to FOO and have bar import that with tracking enabled.
	sc.setupLatencyTracking(t, 100)

	// Responder
	nc := clientConnectWithName(t, sc.clusters[0].opts[0], "foo", "service22")
	defer nc.Close()

	// Setup responder. It deliberately answers the same request twice.
	sub, err := nc.Subscribe("ngs.usage.*", func(msg *nats.Msg) {
		msg.Respond([]byte("22 msgs"))
		msg.Respond([]byte("boom"))
	})
	if err != nil {
		t.Fatalf("Error subscribing responder: %v", err)
	}
	defer sub.Unsubscribe()
	if err := nc.Flush(); err != nil {
		t.Fatalf("Error flushing responder connection: %v", err)
	}

	// Listen for metrics
	rsub, err := nc.SubscribeSync("latency.svc")
	if err != nil {
		t.Fatalf("Error subscribing to latency metrics: %v", err)
	}

	// Requestor
	nc2 := clientConnect(t, sc.clusters[0].opts[2], "bar")
	defer nc2.Close()

	// Send a request
	if _, err := nc2.Request("ngs.usage", []byte("1h"), time.Second); err != nil {
		t.Fatalf("Expected a response, got error: %v", err)
	}

	// Best effort: wait for a latency metric but do not fail if none arrives.
	// NOTE(review): presumably the extra sleep gives the servers time to
	// process the second response, which is where the bug showed up — confirm.
	rsub.NextMsg(time.Second)
	time.Sleep(time.Second)
}