diff --git a/server/accounts.go b/server/accounts.go index 095c6bed..0ac42de0 100644 --- a/server/accounts.go +++ b/server/accounts.go @@ -682,6 +682,21 @@ func (m1 *ServiceLatency) merge(m2 *ServiceLatency) { m1.NATSLatency.System = m1.ServiceLatency - (m2.ServiceLatency + m2.NATSLatency.Responder) m1.ServiceLatency = m2.ServiceLatency m1.NATSLatency.Responder = m2.NATSLatency.Responder + sanitizeLatencyMetric(m1) +} + +// sanitizeLatencyMetric adjusts latency metric values that could go +// negative in some edge conditions since we estimate client RTT +// for both requestor and responder. +// These numbers are never meant to be negative, it just could be +// how we back into the values based on estimated RTT. +func sanitizeLatencyMetric(sl *ServiceLatency) { + if sl.ServiceLatency < 0 { + sl.ServiceLatency = 0 + } + if sl.NATSLatency.System < 0 { + sl.NATSLatency.System = 0 + } } // Used for transporting remote latency measurements. @@ -726,6 +741,7 @@ func (a *Account) sendTrackingLatency(si *serviceImport, requestor, responder *c }, TotalLatency: reqClientRTT + serviceRTT, } + sanitizeLatencyMetric(&sl) // If we are expecting a remote measurement, store our sl here. // We need to account for the race between this and us receiving the diff --git a/server/client.go b/server/client.go index a968c23b..a2965e25 100644 --- a/server/client.go +++ b/server/client.go @@ -2537,6 +2537,8 @@ func (c *client) processInboundClientMsg(msg []byte) { sl.ServiceLatency = time.Since(sl.RequestStart) - rtt sl.NATSLatency.Responder = rtt sl.TotalLatency = sl.ServiceLatency + rtt + sanitizeLatencyMetric(sl) + lsub := remoteLatencySubjectForResponse(c.pa.subject) c.srv.sendInternalAccountMsg(nil, lsub, &rl) // Send to SYS account } diff --git a/test/service_latency_test.go b/test/service_latency_test.go index d8379227..c64147b7 100644 --- a/test/service_latency_test.go +++ b/test/service_latency_test.go @@ -142,9 +142,12 @@ func checkServiceLatency(t *testing.T, sl server.ServiceLatency, start time.Time if sl.NATSLatency.Responder == 0 { t.Fatalf("Expected non-zero NATS Requestor latency") } + // Make sure they add up - if sl.TotalLatency != sl.ServiceLatency+sl.NATSLatency.TotalTime() { - t.Fatalf("Numbers do not add up: %+v", sl) + got := sl.TotalLatency + expected := sl.ServiceLatency + sl.NATSLatency.TotalTime() + if got != expected { + t.Fatalf("Numbers do not add up: %+v,\ngot: %v\nexpected: %v", sl, got, expected) } } @@ -172,7 +175,7 @@ func TestServiceLatencySingleServerConnect(t *testing.T) { // Requestor nc2 := clientConnect(t, sc.clusters[0].opts[0], "bar") - defer nc.Close() + defer nc2.Close() // Send the request. start := time.Now() @@ -223,7 +226,7 @@ func TestServiceLatencyRemoteConnect(t *testing.T) { // Same Cluster Requestor nc2 := clientConnect(t, sc.clusters[0].opts[2], "bar") - defer nc.Close() + defer nc2.Close() // Send the request. start := time.Now() @@ -248,7 +251,7 @@ func TestServiceLatencyRemoteConnect(t *testing.T) { // Gateway Requestor nc2 = clientConnect(t, sc.clusters[1].opts[1], "bar") - defer nc.Close() + defer nc2.Close() // Send the request. start = time.Now() @@ -309,7 +312,7 @@ func TestServiceLatencySampling(t *testing.T) { // Same Cluster Requestor nc2 := clientConnect(t, sc.clusters[0].opts[2], "bar") - defer nc.Close() + defer nc2.Close() toSend := 1000 for i := 0; i < toSend; i++ { @@ -348,7 +351,7 @@ func TestServiceLatencyWithName(t *testing.T) { rsub, _ := nc.SubscribeSync("results") nc2 := clientConnect(t, opts, "bar") - defer nc.Close() + defer nc2.Close() nc2.Request("ngs.usage", []byte("1h"), time.Second) var sl server.ServiceLatency @@ -380,7 +383,7 @@ func TestServiceLatencyWithNameMultiServer(t *testing.T) { rsub, _ := nc.SubscribeSync("results") nc2 := clientConnect(t, sc.clusters[1].opts[1], "bar") - defer nc.Close() + defer nc2.Close() nc2.Request("ngs.usage", []byte("1h"), time.Second) var sl server.ServiceLatency @@ -632,3 +635,132 @@ func TestServiceLatencyWithJWT(t *testing.T) { t.Fatalf("Did not expect to receive a latency metric") } } + +func TestServiceLatencyAdjustNegativeLatencyValues(t *testing.T) { + sc := createSuperCluster(t, 3, 2) + defer sc.shutdown() + + // Now add in new service export to FOO and have bar import + // that with tracking enabled. + sc.setupLatencyTracking(t, 100) + + // Now we can setup and test, do single node only first. + // This is the service provider. + nc := clientConnect(t, sc.clusters[0].opts[0], "foo") + defer nc.Close() + + // The service listener. + nc.Subscribe("ngs.usage.*", func(msg *nats.Msg) { + msg.Respond([]byte("22 msgs")) + }) + + // Listen for metrics + rsub, err := nc.SubscribeSync("results") + if err != nil { + t.Fatal(err) + } + nc.Flush() + + // Requestor + nc2 := clientConnect(t, sc.clusters[0].opts[0], "bar") + defer nc2.Close() + + // Send the request. + totalSamples := 50 + for i := 0; i < totalSamples; i++ { + if _, err := nc2.Request("ngs.usage", []byte("1h"), time.Second); err != nil { + t.Fatalf("Expected a response") + } + } + + var sl server.ServiceLatency + for i := 0; i < totalSamples; i++ { + rmsg, err := rsub.NextMsg(time.Second) + if err != nil { + t.Fatalf("Expected to receive latency metric: %d, %s", i, err) + } + if err := json.Unmarshal(rmsg.Data, &sl); err != nil { + t.Errorf("Unexpected error processing latency metric: %s", err) + } + if sl.ServiceLatency < 0 { + t.Fatalf("Unexpected negative latency value: %v", sl) + } + } +} + +func TestServiceLatencyRemoteConnectAdjustNegativeValues(t *testing.T) { + sc := createSuperCluster(t, 3, 2) + defer sc.shutdown() + + // Now add in new service export to FOO and have bar import that with tracking enabled. + sc.setupLatencyTracking(t, 100) + + // Now we can setup and test, do single node only first. + // This is the service provider. + nc := clientConnect(t, sc.clusters[0].opts[0], "foo") + defer nc.Close() + + // The service listener. + nc.Subscribe("ngs.usage.*", func(msg *nats.Msg) { + // time.Sleep(serviceTime) + msg.Respond([]byte("22 msgs")) + }) + + // Listen for metrics + rsub, err := nc.SubscribeSync("results") + if err != nil { + t.Fatal(err) + } + nc.Flush() + + // Same Cluster Requestor + nc2 := clientConnect(t, sc.clusters[0].opts[2], "bar") + defer nc2.Close() + + // Gateway Requestor + nc3 := clientConnect(t, sc.clusters[1].opts[1], "bar") + defer nc3.Close() + + // Send a few initial requests to ensure interest is propagated + // both for cluster and gateway requestors. + checkFor(t, 3*time.Second, time.Second, func() error { + _, err1 := nc2.Request("ngs.usage", []byte("1h"), time.Second) + _, err2 := nc3.Request("ngs.usage", []byte("1h"), time.Second) + + if err1 != nil || err2 != nil { + return fmt.Errorf("Timed out waiting for super cluster to be ready") + } + return nil + }) + + // Send the request. + totalSamples := 20 + for i := 0; i < totalSamples; i++ { + if _, err := nc2.Request("ngs.usage", []byte("1h"), time.Second); err != nil { + t.Fatalf("Expected a response") + } + } + + for i := 0; i < totalSamples; i++ { + if _, err := nc3.Request("ngs.usage", []byte("1h"), time.Second); err != nil { + t.Fatalf("Expected a response") + } + } + + var sl server.ServiceLatency + for i := 0; i < totalSamples*2; i++ { + rmsg, err := rsub.NextMsg(time.Second) + if err != nil { + t.Fatalf("Expected to receive latency metric: %d, %s", i, err) + } + if err = json.Unmarshal(rmsg.Data, &sl); err != nil { + t.Errorf("Unexpected error processing latency metric: %s", err) + } + if sl.ServiceLatency < 0 { + t.Fatalf("Unexpected negative service latency value: %v", sl) + } + if sl.NATSLatency.System < 0 { + t.Fatalf("Unexpected negative system latency value: %v", sl) + } + } +}