Merge pull request #1136 from nats-io/svc-latency-values

Adjust to zero negative latency values
This commit is contained in:
Waldemar Quevedo
2019-09-20 11:39:33 -05:00
committed by GitHub
3 changed files with 158 additions and 8 deletions

View File

@@ -682,6 +682,21 @@ func (m1 *ServiceLatency) merge(m2 *ServiceLatency) {
m1.NATSLatency.System = m1.ServiceLatency - (m2.ServiceLatency + m2.NATSLatency.Responder)
m1.ServiceLatency = m2.ServiceLatency
m1.NATSLatency.Responder = m2.NATSLatency.Responder
sanitizeLatencyMetric(m1)
}
// sanitizeLatencyMetric clamps latency values that can come out
// negative in some edge conditions: client RTT is estimated for
// both requestor and responder, so backing the measurements out of
// those estimates can slightly undershoot zero. Negative values are
// never meaningful, so they are floored at zero here.
func sanitizeLatencyMetric(sl *ServiceLatency) {
	if sl.NATSLatency.System < 0 {
		sl.NATSLatency.System = 0
	}
	if sl.ServiceLatency < 0 {
		sl.ServiceLatency = 0
	}
}
// Used for transporting remote latency measurements.
@@ -726,6 +741,7 @@ func (a *Account) sendTrackingLatency(si *serviceImport, requestor, responder *c
},
TotalLatency: reqClientRTT + serviceRTT,
}
sanitizeLatencyMetric(&sl)
// If we are expecting a remote measurement, store our sl here.
// We need to account for the race between this and us receiving the

View File

@@ -2537,6 +2537,8 @@ func (c *client) processInboundClientMsg(msg []byte) {
sl.ServiceLatency = time.Since(sl.RequestStart) - rtt
sl.NATSLatency.Responder = rtt
sl.TotalLatency = sl.ServiceLatency + rtt
sanitizeLatencyMetric(sl)
lsub := remoteLatencySubjectForResponse(c.pa.subject)
c.srv.sendInternalAccountMsg(nil, lsub, &rl) // Send to SYS account
}

View File

@@ -142,9 +142,12 @@ func checkServiceLatency(t *testing.T, sl server.ServiceLatency, start time.Time
if sl.NATSLatency.Responder == 0 {
t.Fatalf("Expected non-zero NATS Requestor latency")
}
// Make sure they add up
if sl.TotalLatency != sl.ServiceLatency+sl.NATSLatency.TotalTime() {
t.Fatalf("Numbers do not add up: %+v", sl)
got := sl.TotalLatency
expected := sl.ServiceLatency + sl.NATSLatency.TotalTime()
if got != expected {
t.Fatalf("Numbers do not add up: %+v,\ngot: %v\nexpected: %v", sl, got, expected)
}
}
@@ -172,7 +175,7 @@ func TestServiceLatencySingleServerConnect(t *testing.T) {
// Requestor
nc2 := clientConnect(t, sc.clusters[0].opts[0], "bar")
defer nc.Close()
defer nc2.Close()
// Send the request.
start := time.Now()
@@ -223,7 +226,7 @@ func TestServiceLatencyRemoteConnect(t *testing.T) {
// Same Cluster Requestor
nc2 := clientConnect(t, sc.clusters[0].opts[2], "bar")
defer nc.Close()
defer nc2.Close()
// Send the request.
start := time.Now()
@@ -248,7 +251,7 @@ func TestServiceLatencyRemoteConnect(t *testing.T) {
// Gateway Requestor
nc2 = clientConnect(t, sc.clusters[1].opts[1], "bar")
defer nc.Close()
defer nc2.Close()
// Send the request.
start = time.Now()
@@ -309,7 +312,7 @@ func TestServiceLatencySampling(t *testing.T) {
// Same Cluster Requestor
nc2 := clientConnect(t, sc.clusters[0].opts[2], "bar")
defer nc.Close()
defer nc2.Close()
toSend := 1000
for i := 0; i < toSend; i++ {
@@ -348,7 +351,7 @@ func TestServiceLatencyWithName(t *testing.T) {
rsub, _ := nc.SubscribeSync("results")
nc2 := clientConnect(t, opts, "bar")
defer nc.Close()
defer nc2.Close()
nc2.Request("ngs.usage", []byte("1h"), time.Second)
var sl server.ServiceLatency
@@ -380,7 +383,7 @@ func TestServiceLatencyWithNameMultiServer(t *testing.T) {
rsub, _ := nc.SubscribeSync("results")
nc2 := clientConnect(t, sc.clusters[1].opts[1], "bar")
defer nc.Close()
defer nc2.Close()
nc2.Request("ngs.usage", []byte("1h"), time.Second)
var sl server.ServiceLatency
@@ -632,3 +635,132 @@ func TestServiceLatencyWithJWT(t *testing.T) {
t.Fatalf("Did not expect to receive a latency metric")
}
}
// TestServiceLatencyAdjustNegativeLatencyValues checks that latency
// samples collected on a single node never report a negative
// service latency value.
func TestServiceLatencyAdjustNegativeLatencyValues(t *testing.T) {
	sc := createSuperCluster(t, 3, 2)
	defer sc.shutdown()

	// Export the service from FOO and have BAR import it with
	// latency tracking enabled at 100% sampling.
	sc.setupLatencyTracking(t, 100)

	// Service provider connection; single node only for this test.
	nc := clientConnect(t, sc.clusters[0].opts[0], "foo")
	defer nc.Close()

	// The service listener.
	nc.Subscribe("ngs.usage.*", func(msg *nats.Msg) {
		msg.Respond([]byte("22 msgs"))
	})

	// Subscription that collects the latency metrics.
	rsub, err := nc.SubscribeSync("results")
	if err != nil {
		t.Fatal(err)
	}
	nc.Flush()

	// Requestor connection.
	nc2 := clientConnect(t, sc.clusters[0].opts[0], "bar")
	defer nc2.Close()

	// Issue the requests.
	const samples = 50
	for n := 0; n < samples; n++ {
		if _, err := nc2.Request("ngs.usage", []byte("1h"), time.Second); err != nil {
			t.Fatalf("Expected a response")
		}
	}

	// Every collected metric must carry a non-negative service latency.
	var sl server.ServiceLatency
	for n := 0; n < samples; n++ {
		m, err := rsub.NextMsg(time.Second)
		if err != nil {
			t.Fatalf("Expected to receive latency metric: %d, %s", n, err)
		}
		if err := json.Unmarshal(m.Data, &sl); err != nil {
			t.Errorf("Unexpected error processing latency metric: %s", err)
		}
		if sl.ServiceLatency < 0 {
			t.Fatalf("Unexpected negative latency value: %v", sl)
		}
	}
}
// TestServiceLatencyRemoteConnectAdjustNegativeValues checks that
// latency samples from both a same-cluster requestor and a gateway
// requestor never report negative service or system latency values.
func TestServiceLatencyRemoteConnectAdjustNegativeValues(t *testing.T) {
	sc := createSuperCluster(t, 3, 2)
	defer sc.shutdown()

	// Export the service from FOO and have BAR import it with tracking enabled.
	sc.setupLatencyTracking(t, 100)

	// Service provider connection.
	nc := clientConnect(t, sc.clusters[0].opts[0], "foo")
	defer nc.Close()

	// The service listener.
	nc.Subscribe("ngs.usage.*", func(msg *nats.Msg) {
		// time.Sleep(serviceTime)
		msg.Respond([]byte("22 msgs"))
	})

	// Subscription that collects the latency metrics.
	rsub, err := nc.SubscribeSync("results")
	if err != nil {
		t.Fatal(err)
	}
	nc.Flush()

	// Requestor in the same cluster.
	nc2 := clientConnect(t, sc.clusters[0].opts[2], "bar")
	defer nc2.Close()

	// Requestor across the gateway.
	nc3 := clientConnect(t, sc.clusters[1].opts[1], "bar")
	defer nc3.Close()

	// Send a few initial requests to ensure interest is propagated
	// both for cluster and gateway requestors.
	checkFor(t, 3*time.Second, time.Second, func() error {
		_, err1 := nc2.Request("ngs.usage", []byte("1h"), time.Second)
		_, err2 := nc3.Request("ngs.usage", []byte("1h"), time.Second)
		if err1 != nil || err2 != nil {
			return fmt.Errorf("Timed out waiting for super cluster to be ready")
		}
		return nil
	})

	// Issue the requests from both requestors.
	const samples = 20
	for n := 0; n < samples; n++ {
		if _, err := nc2.Request("ngs.usage", []byte("1h"), time.Second); err != nil {
			t.Fatalf("Expected a response")
		}
	}
	for n := 0; n < samples; n++ {
		if _, err := nc3.Request("ngs.usage", []byte("1h"), time.Second); err != nil {
			t.Fatalf("Expected a response")
		}
	}

	// None of the collected metrics may carry negative values.
	var sl server.ServiceLatency
	for n := 0; n < samples*2; n++ {
		m, err := rsub.NextMsg(time.Second)
		if err != nil {
			t.Fatalf("Expected to receive latency metric: %d, %s", n, err)
		}
		if err := json.Unmarshal(m.Data, &sl); err != nil {
			t.Errorf("Unexpected error processing latency metric: %s", err)
		}
		if sl.ServiceLatency < 0 {
			t.Fatalf("Unexpected negative service latency value: %v", sl)
		}
		if sl.NATSLatency.System < 0 {
			t.Fatalf("Unexpected negative system latency value: %v", sl)
		}
	}
}