Files
nats-server/test/service_latency_test.go
Ivan Kozlovic b69ffe244e Fixed some tests
Code change:
- Do not start the processMirrorMsgs and processSourceMsgs go routine
if the server has been detected to be shutdown. This would otherwise
leave some go routine running at the end of some tests.
- Pass the fch and qch to the consumerFileStore's flushLoop otherwise
in some tests this routine could be left running.

Tests changes:
- Added missing defer NATS connection close
- Added missing defer server shutdown

Signed-off-by: Ivan Kozlovic <ivan@synadia.com>
2022-09-08 11:28:23 -06:00

1922 lines
54 KiB
Go

// Copyright 2019-2021 The NATS Authors
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package test
import (
"encoding/json"
"fmt"
"math/rand"
"net/http"
"os"
"path/filepath"
"strings"
"sync"
"sync/atomic"
"testing"
"time"
"github.com/nats-io/jwt/v2"
"github.com/nats-io/nats-server/v2/server"
"github.com/nats-io/nats.go"
"github.com/nats-io/nkeys"
)
// supercluster bundles the clusters created for a test together with the
// test they were created for.
// Used to setup superclusters for tests.
type supercluster struct {
// t is the test the supercluster was created for.
t *testing.T
// clusters lists the member clusters in the order they were created.
clusters []*cluster
}
// shutdown calls shutdownCluster on every cluster of the supercluster.
// Calling it on a nil receiver is a no-op.
func (sc *supercluster) shutdown() {
	if sc != nil {
		for i := range sc.clusters {
			shutdownCluster(sc.clusters[i])
		}
	}
}
// Alphabet and sizing used when generating random cluster names.
const (
	digits = "0123456789ABCDEFGHIJKLMNOPQRSTUVWXYZ"
	base   = 36
	cnlen  = 8
)

// randClusterName returns a random name of cnlen characters, each drawn
// from digits by repeatedly taking the base-36 remainder of one random value.
func randClusterName() string {
	name := make([]byte, cnlen)
	for i, rn := 0, rand.Int63(); i < cnlen; i, rn = i+1, rn/base {
		name[i] = digits[rn%base]
	}
	return string(name)
}
// createSuperCluster builds numClusters clusters of numServersPer servers
// each. Every new cluster is created with a random name and is handed the
// clusters created before it.
func createSuperCluster(t *testing.T, numServersPer, numClusters int) *supercluster {
	sc := &supercluster{t: t, clusters: []*cluster{}}
	for n := 0; n < numClusters; n++ {
		// Pick cluster name and setup default accounts.
		next := createClusterEx(t, true, 5*time.Millisecond, true, randClusterName(), numServersPer, sc.clusters...)
		sc.clusters = append(sc.clusters, next)
	}
	return sc
}
// setResponseThreshold sets maxTime as the response threshold of the
// "ngs.usage.*" service export of account FOO on every server of every
// cluster. Any lookup or update error fails the test.
func (sc *supercluster) setResponseThreshold(t *testing.T, maxTime time.Duration) {
	t.Helper()
	for _, c := range sc.clusters {
		for _, s := range c.servers {
			foo, err := s.LookupAccount("FOO")
			if err != nil {
				t.Fatalf("Error looking up account 'FOO': %v", err)
			}
			if err := foo.SetServiceExportResponseThreshold("ngs.usage.*", maxTime); err != nil {
				// Report the underlying error so a failure is diagnosable.
				t.Fatalf("Error setting response threshold: %v", err)
			}
		}
	}
}
// setImportShare enables service import sharing on account BAR for the
// "ngs.usage.bar" import from account FOO, on every server of every cluster.
func (sc *supercluster) setImportShare(t *testing.T) {
	t.Helper()
	for ci := range sc.clusters {
		for _, srv := range sc.clusters[ci].servers {
			fooAcc, fooErr := srv.LookupAccount("FOO")
			if fooErr != nil {
				t.Fatalf("Error looking up account 'FOO': %v", fooErr)
			}
			barAcc, barErr := srv.LookupAccount("BAR")
			if barErr != nil {
				t.Fatalf("Error looking up account 'BAR': %v", barErr)
			}
			shareErr := barAcc.SetServiceImportSharing(fooAcc, "ngs.usage.bar", true)
			if shareErr != nil {
				t.Fatalf("Error setting import sharing: %v", shareErr)
			}
		}
	}
}
// setupLatencyTracking configures every server of every cluster so that
// account FOO exports the "ngs.usage.*" service with latency tracking
// published on "results" (p is passed through as the sampling value), and
// account BAR imports it as "ngs.usage" mapped to "ngs.usage.bar".
func (sc *supercluster) setupLatencyTracking(t *testing.T, p int) {
	t.Helper()
	for _, cl := range sc.clusters {
		for _, srv := range cl.servers {
			fooAcc, err := srv.LookupAccount("FOO")
			if err != nil {
				t.Fatalf("Error looking up account 'FOO': %v", err)
			}
			barAcc, err := srv.LookupAccount("BAR")
			if err != nil {
				t.Fatalf("Error looking up account 'BAR': %v", err)
			}

			err = fooAcc.AddServiceExport("ngs.usage.*", nil)
			if err != nil {
				t.Fatalf("Error adding service export to 'FOO': %v", err)
			}
			err = fooAcc.TrackServiceExportWithSampling("ngs.usage.*", "results", p)
			if err != nil {
				t.Fatalf("Error adding latency tracking to 'FOO': %v", err)
			}
			err = barAcc.AddServiceImport(fooAcc, "ngs.usage", "ngs.usage.bar")
			if err != nil {
				t.Fatalf("Error adding service import to 'ngs.usage': %v", err)
			}
		}
	}
}
// removeLatencyTracking calls UnTrackServiceExport for "ngs.usage.*" on
// account FOO of every server in every cluster.
func (sc *supercluster) removeLatencyTracking(t *testing.T) {
	t.Helper()
	for ci := range sc.clusters {
		servers := sc.clusters[ci].servers
		for si := range servers {
			acc, lookupErr := servers[si].LookupAccount("FOO")
			if lookupErr != nil {
				t.Fatalf("Error looking up account 'FOO': %v", lookupErr)
			}
			acc.UnTrackServiceExport("ngs.usage.*")
		}
	}
}
// totalSubs returns the sum of totalSubs() over all member clusters.
func (sc *supercluster) totalSubs() int {
	var sum int
	for i := range sc.clusters {
		sum += sc.clusters[i].totalSubs()
	}
	return sum
}
// clientConnectWithName connects to the server described by opts as the given
// user (password "pass"), setting appname as the connection name. A connect
// error fails the test.
func clientConnectWithName(t *testing.T, opts *server.Options, user, appname string) *nats.Conn {
	t.Helper()
	nc, err := nats.Connect(
		fmt.Sprintf("nats://%s:pass@%s:%d", user, opts.Host, opts.Port),
		nats.Name(appname),
	)
	if err != nil {
		t.Fatalf("Error on connect: %v", err)
	}
	return nc
}
// clientConnect connects as the given user with an empty connection name.
func clientConnect(t *testing.T, opts *server.Options, user string) *nats.Conn {
	t.Helper()
	const noAppName = ""
	return clientConnectWithName(t, opts, user, noAppName)
}
// clientConnectOldRequest connects as the given user (password "pass") with
// the nats.UseOldRequestStyle option. A connect error fails the test.
func clientConnectOldRequest(t *testing.T, opts *server.Options, user string) *nats.Conn {
	t.Helper()
	nc, err := nats.Connect(
		fmt.Sprintf("nats://%s:pass@%s:%d", user, opts.Host, opts.Port),
		nats.UseOldRequestStyle(),
	)
	if err != nil {
		t.Fatalf("Error on connect: %v", err)
	}
	return nc
}
// checkServiceLatency validates a successful latency measurement.
//
// sl is the decoded metric, start is the time the caller took just before
// sending the request, and serviceTime is how long the responder slept.
// The test fails if the status is not 200, the request start is more than
// 20ms after start, the service latency is well below serviceTime, either
// RTT is zero, or the individual latencies do not add up to the total.
func checkServiceLatency(t *testing.T, sl server.ServiceLatency, start time.Time, serviceTime time.Duration) {
	t.Helper()

	if sl.Status != 200 {
		t.Fatalf("Bad status received, wanted 200 got %d", sl.Status)
	}

	serviceTime = serviceTime.Round(time.Millisecond)

	startDelta := sl.RequestStart.Sub(start)
	// Original test was 5ms, but got GitHub Action failure with "Bad start delta 5.033929ms",
	// and Travis will get something like: "Bad start delta 15.046059ms", so be more generous.
	if startDelta > 20*time.Millisecond {
		t.Fatalf("Bad start delta %v", startDelta)
	}
	// The RTT measured during tests is only an estimate, so add the responder
	// RTT back in and allow a 20% margin below the expected service time.
	if (sl.ServiceLatency + sl.Responder.RTT) < time.Duration(float64(serviceTime)*0.8) {
		t.Fatalf("Bad service latency: %v (%v)", sl.ServiceLatency, serviceTime)
	}
	if sl.TotalLatency < sl.ServiceLatency {
		t.Fatalf("Bad total latency: %v (%v)", sl.TotalLatency, sl.ServiceLatency)
	}
	// We should have NATS latency here that is non-zero with real clients.
	if sl.Requestor.RTT == 0 {
		t.Fatalf("Expected non-zero NATS Requestor latency")
	}
	if sl.Responder.RTT == 0 {
		t.Fatalf("Expected non-zero NATS Responder latency")
	}
	// Make sure they add up
	got := sl.TotalLatency
	expected := sl.ServiceLatency + sl.NATSTotalTime()
	if got != expected {
		t.Fatalf("Numbers do not add up: %+v,\ngot: %v\nexpected: %v", sl, got, expected)
	}
}
// extendedCheck verifies that the client info lc carries the expected shared
// details: user eUser, a non-empty host, an ID between 1 and 20, and, when
// non-empty, application name appName and server name eServer.
// A nil lc fails the test instead of panicking.
func extendedCheck(t *testing.T, lc *server.ClientInfo, eUser, appName, eServer string) {
	t.Helper()
	if lc == nil {
		t.Fatalf("Expected client info, got nil")
	}
	if lc.User != eUser {
		t.Fatalf("Expected user of %q, got %q", eUser, lc.User)
	}
	if appName != "" && appName != lc.Name {
		t.Fatalf("Expected appname of %q, got %q\n", appName, lc.Name)
	}
	if lc.Host == "" {
		t.Fatalf("Expected non-empty IP")
	}
	// NOTE(review): the upper bound presumably reflects how few connections
	// these tests open per server - confirm before reusing elsewhere.
	if lc.ID < 1 || lc.ID > 20 {
		t.Fatalf("Expected a ID in range, got %d", lc.ID)
	}
	if eServer != "" && eServer != lc.Server {
		t.Fatalf("Expected server of %q, got %q", eServer, lc.Server)
	}
}
// noShareCheck verifies that none of the identifying details (name, user,
// host, ID, server) are set in lc. The first populated field, checked in that
// order, fails the test.
func noShareCheck(t *testing.T, lc *server.ClientInfo) {
	t.Helper()
	switch {
	case lc.Name != "":
		t.Fatalf("appname should not have been shared, got %q", lc.Name)
	case lc.User != "":
		t.Fatalf("user should not have been shared, got %q", lc.User)
	case lc.Host != "":
		t.Fatalf("client ip should not have been shared, got %q", lc.Host)
	case lc.ID != 0:
		t.Fatalf("client id should not have been shared, got %d", lc.ID)
	case lc.Server != "":
		t.Fatalf("client' server should not have been shared, got %q", lc.Server)
	}
}
// TestServiceLatencySingleServerConnect checks the latency metric published
// when the requestor and the responder are connected to the same server, for
// both the default and the old request style.
func TestServiceLatencySingleServerConnect(t *testing.T) {
	sc := createSuperCluster(t, 3, 2)
	defer sc.shutdown()

	// Now add in new service export to FOO and have bar import that with tracking enabled.
	sc.setupLatencyTracking(t, 100)

	// Now we can setup and test, do single node only first.
	// This is the service provider.
	nc := clientConnectWithName(t, sc.clusters[0].opts[0], "foo", "service22")
	defer nc.Close()

	// The service listener.
	serviceTime := 25 * time.Millisecond
	if _, err := nc.Subscribe("ngs.usage.*", func(msg *nats.Msg) {
		time.Sleep(serviceTime)
		msg.Respond([]byte("22 msgs"))
	}); err != nil {
		t.Fatalf("Error on subscribe: %v", err)
	}

	// Listen for metrics
	rsub, err := nc.SubscribeSync("results")
	if err != nil {
		t.Fatalf("Error on subscribe: %v", err)
	}

	// Requestor
	nc2 := clientConnect(t, sc.clusters[0].opts[0], "bar")
	defer nc2.Close()

	// Send the request.
	start := time.Now()
	if _, err := nc2.Request("ngs.usage", []byte("1h"), time.Second); err != nil {
		t.Fatalf("Expected a response")
	}

	// Fail the test, rather than dereference a nil message, if no metric arrives.
	var sl server.ServiceLatency
	rmsg, err := rsub.NextMsg(time.Second)
	if err != nil {
		t.Fatalf("Error getting latency measurement: %v", err)
	}
	if err := json.Unmarshal(rmsg.Data, &sl); err != nil {
		t.Fatalf("Error decoding latency measurement: %v", err)
	}

	checkServiceLatency(t, sl, start, serviceTime)

	rs := sc.clusters[0].servers[0]
	extendedCheck(t, sl.Responder, "foo", "service22", rs.Name())
	// Normally requestor's don't share
	noShareCheck(t, sl.Requestor)

	// Now make sure normal use case works with old request style.
	nc3 := clientConnectOldRequest(t, sc.clusters[0].opts[0], "bar")
	defer nc3.Close()

	start = time.Now()
	if _, err := nc3.Request("ngs.usage", []byte("1h"), time.Second); err != nil {
		t.Fatalf("Expected a response")
	}
	nc3.Close()

	// NOTE(review): no new metric is read from rsub here, so the checks below
	// re-validate the measurement of the first request - confirm whether the
	// metric for the old-style request was meant to be fetched and verified.
	checkServiceLatency(t, sl, start, serviceTime)
	extendedCheck(t, sl.Responder, "foo", "service22", rs.Name())
	// Normally requestor's don't share
	noShareCheck(t, sl.Requestor)
}
// If a client has a longer RTT than the effective RTT for NATS + responder
// the requestor RTT will be marked as 0. This can happen quite often with
// utility programs that are far away from a cluster like NGS but the service
// response time has a shorter RTT.
//
// The requestor connects through a slow proxy, and the test verifies that the
// reported requestor RTT and total latency are still at least the proxy RTT.
func TestServiceLatencyClientRTTSlowerVsServiceRTT(t *testing.T) {
	sc := createSuperCluster(t, 2, 2)
	defer sc.shutdown()

	// Now add in new service export to FOO and have BAR import that with tracking enabled.
	sc.setupLatencyTracking(t, 100)

	nc := clientConnect(t, sc.clusters[0].opts[0], "foo")
	defer nc.Close()

	// The service listener. Mostly instant response.
	if _, err := nc.Subscribe("ngs.usage.*", func(msg *nats.Msg) {
		time.Sleep(time.Millisecond)
		msg.Respond([]byte("22 msgs"))
	}); err != nil {
		t.Fatalf("Error on subscribe: %v", err)
	}

	// Listen for metrics
	rsub, err := nc.SubscribeSync("results")
	if err != nil {
		t.Fatalf("Error on subscribe: %v", err)
	}
	nc.Flush()

	// Requestor and processing. cindex/sindex select the cluster and server
	// the slow requestor connects to.
	requestAndCheck := func(cindex, sindex int) {
		t.Helper()

		sopts := sc.clusters[cindex].opts[sindex]

		if nmsgs, _, err := rsub.Pending(); err != nil || nmsgs != 0 {
			t.Fatalf("Did not expect any latency results, got %d", nmsgs)
		}

		rtt := 10 * time.Millisecond
		bw := 1024 * 1024
		sp := newSlowProxy(rtt+5*time.Millisecond, bw, bw, sopts)
		defer sp.stop()

		nc2 := clientConnect(t, sp.opts(), "bar")
		defer nc2.Close()

		start := time.Now()
		nc2.Flush()
		// Check rtt for slow proxy
		if d := time.Since(start); d < rtt {
			t.Fatalf("Expected an rtt of at least %v, got %v", rtt, d)
		}

		// Send the request.
		_, err := nc2.Request("ngs.usage", []byte("1h"), time.Second)
		if err != nil {
			t.Fatalf("Expected a response")
		}

		var sl server.ServiceLatency
		rmsg, err := rsub.NextMsg(time.Second)
		if err != nil || rmsg == nil {
			t.Fatalf("Did not receive latency results")
		}
		if err := json.Unmarshal(rmsg.Data, &sl); err != nil {
			t.Fatalf("Error decoding latency results: %v", err)
		}

		if sl.Status != 200 {
			t.Fatalf("Expected a status 200 Ok, got [%d] %q", sl.Status, sl.Error)
		}
		// We want to test here that when the client requestor RTT is larger than the response time
		// we still deliver a requestor value > 0.
		// Now check that it is close to rtt.
		if sl.Requestor.RTT < rtt {
			t.Fatalf("Expected requestor latency to be > %v, got %v", rtt, sl.Requestor.RTT)
		}
		if sl.TotalLatency < rtt {
			t.Fatalf("Expected total latency to be > %v, got %v", rtt, sl.TotalLatency)
		}

		// The responder always lives on the first server of the first cluster.
		rs := sc.clusters[0].servers[0]
		extendedCheck(t, sl.Responder, "foo", "", rs.Name())
		// Normally requestor's don't share
		noShareCheck(t, sl.Requestor)

		// Check for trailing duplicates..
		rmsg, err = rsub.NextMsg(100 * time.Millisecond)
		if err == nil {
			t.Fatalf("Duplicate metric result, %q", rmsg.Data)
		}
	}

	// Check same server.
	requestAndCheck(0, 0)
	// Check from remote server across GW.
	requestAndCheck(1, 1)
	// Same cluster but different server
	requestAndCheck(0, 1)
}
// connRTT estimates the round trip time of nc by timing five consecutive
// Flush calls and returning their average.
func connRTT(nc *nats.Conn) time.Duration {
	// Do 5x to flatten
	const samples = 5
	var elapsed time.Duration
	for n := samples; n > 0; n-- {
		began := time.Now()
		nc.Flush()
		elapsed += time.Since(began)
	}
	return elapsed / samples
}
// TestServiceLatencyRemoteConnect checks latency metrics when the requestor
// is on a different server of the same cluster and when it is in another
// cluster, then verifies that no metric is published once tracking is removed.
func TestServiceLatencyRemoteConnect(t *testing.T) {
	sc := createSuperCluster(t, 3, 2)
	defer sc.shutdown()

	// Now add in new service export to FOO and have bar import that with tracking enabled.
	sc.setupLatencyTracking(t, 100)

	// Now we can setup and test, do single node only first.
	// This is the service provider.
	nc := clientConnect(t, sc.clusters[0].opts[0], "foo")
	defer nc.Close()

	subsBefore := int(sc.clusters[0].servers[0].NumSubscriptions())

	// The service listener.
	serviceTime := 25 * time.Millisecond
	if _, err := nc.Subscribe("ngs.usage.*", func(msg *nats.Msg) {
		time.Sleep(serviceTime)
		msg.Respond([]byte("22 msgs"))
	}); err != nil {
		t.Fatalf("Error on subscribe: %v", err)
	}

	// Listen for metrics
	rsub, err := nc.SubscribeSync("results")
	if err != nil {
		t.Fatalf("Error on subscribe: %v", err)
	}
	nc.Flush()

	// Both subscriptions above must be known to the cluster before requesting.
	if err := checkExpectedSubs(subsBefore+2, sc.clusters[0].servers...); err != nil {
		t.Fatal(err.Error())
	}

	// Same Cluster Requestor
	nc2 := clientConnect(t, sc.clusters[0].opts[2], "bar")
	defer nc2.Close()

	// Send the request.
	start := time.Now()
	_, err = nc2.Request("ngs.usage", []byte("1h"), time.Second)
	if err != nil {
		t.Fatalf("Expected a response")
	}

	var sl server.ServiceLatency
	rmsg, err := rsub.NextMsg(time.Second)
	if err != nil {
		t.Fatalf("Error getting latency measurement: %v", err)
	}
	if err := json.Unmarshal(rmsg.Data, &sl); err != nil {
		t.Fatalf("Error decoding latency measurement: %v", err)
	}
	checkServiceLatency(t, sl, start, serviceTime)
	rs := sc.clusters[0].servers[0]
	extendedCheck(t, sl.Responder, "foo", "", rs.Name())
	// Normally requestor's don't share
	noShareCheck(t, sl.Requestor)

	// Lastly here, we need to make sure we are properly tracking the extra hops.
	// We will make sure that NATS latency is close to what we see from the outside in terms of RTT.
	if crtt := connRTT(nc) + connRTT(nc2); sl.NATSTotalTime() < crtt {
		t.Fatalf("Not tracking second measurement for NATS latency across servers: %v vs %v", sl.NATSTotalTime(), crtt)
	}

	// Gateway Requestor
	nc2 = clientConnect(t, sc.clusters[1].opts[1], "bar")
	defer nc2.Close()

	// Send the request.
	start = time.Now()
	_, err = nc2.Request("ngs.usage", []byte("1h"), time.Second)
	if err != nil {
		t.Fatalf("Expected a response")
	}

	rmsg, err = rsub.NextMsg(time.Second)
	if err != nil {
		t.Fatalf("Error getting latency measurement: %v", err)
	}
	// Decode into a zeroed value so that fields omitted from this metric
	// cannot silently keep values from the previous measurement.
	sl = server.ServiceLatency{}
	if err := json.Unmarshal(rmsg.Data, &sl); err != nil {
		t.Fatalf("Error decoding latency measurement: %v", err)
	}
	checkServiceLatency(t, sl, start, serviceTime)
	extendedCheck(t, sl.Responder, "foo", "", rs.Name())
	// Normally requestor's don't share
	noShareCheck(t, sl.Requestor)

	// Lastly here, we need to make sure we are properly tracking the extra hops.
	// We will make sure that NATS latency is close to what we see from the outside in terms of RTT.
	if crtt := connRTT(nc) + connRTT(nc2); sl.NATSTotalTime() < crtt {
		t.Fatalf("Not tracking second measurement for NATS latency across servers: %v vs %v", sl.NATSTotalTime(), crtt)
	}

	// Now turn off and make sure we no longer receive updates.
	sc.removeLatencyTracking(t)
	_, err = nc2.Request("ngs.usage", []byte("1h"), time.Second)
	if err != nil {
		t.Fatalf("Expected a response")
	}

	_, err = rsub.NextMsg(100 * time.Millisecond)
	if err == nil {
		t.Fatalf("Did not expect to receive a latency metric")
	}
}
// TestServiceLatencySampling sends 1000 requests with tracking configured at
// a sampling value of 50 and expects the number of metrics received to be
// within 10% of the requests sent around the halfway mark.
func TestServiceLatencySampling(t *testing.T) {
	const (
		samplingPct = 50
		toSend      = 1000
	)

	sc := createSuperCluster(t, 3, 2)
	defer sc.shutdown()

	// Now add in new service export to FOO and have bar import that with tracking enabled.
	sc.setupLatencyTracking(t, samplingPct)

	// Now we can setup and test, do single node only first.
	// This is the service provider.
	nc := clientConnect(t, sc.clusters[0].opts[0], "foo")
	defer nc.Close()

	// The service listener.
	nc.Subscribe("ngs.usage.*", func(req *nats.Msg) {
		req.Respond([]byte("22 msgs"))
	})

	// Count metrics as they arrive.
	var received int32
	nc.Subscribe("results", func(*nats.Msg) {
		atomic.AddInt32(&received, 1)
	})

	// Same Cluster Requestor
	nc2 := clientConnect(t, sc.clusters[0].opts[2], "bar")
	defer nc2.Close()

	for sent := 0; sent < toSend; sent++ {
		nc2.Request("ngs.usage", []byte("1h"), time.Second)
	}
	// Wait for results to flow in.
	time.Sleep(100 * time.Millisecond)

	mid, delta := toSend/2, toSend/10 // delta is 10%
	got := int(atomic.LoadInt32(&received))
	if got < mid-delta || got > mid+delta {
		t.Fatalf("Sampling number incorrect: %d vs %d", mid, got)
	}
}
// TestServiceLatencyNoSubsLeak issues 100 requests, each from a fresh
// connection, and verifies that neither subscriptions nor service imports
// are left behind afterwards.
func TestServiceLatencyNoSubsLeak(t *testing.T) {
	sc := createSuperCluster(t, 3, 3)
	defer sc.shutdown()

	// Now add in new service export to FOO and have bar import that with tracking enabled.
	sc.setupLatencyTracking(t, 100)

	nc := clientConnectWithName(t, sc.clusters[0].opts[1], "foo", "dlc22")
	defer nc.Close()

	// The service listener.
	if _, err := nc.Subscribe("ngs.usage.*", func(msg *nats.Msg) {
		msg.Respond([]byte("22 msgs"))
	}); err != nil {
		t.Fatalf("Error on subscribe: %v", err)
	}
	nc.Flush()
	// Propagation of sub through super cluster.
	time.Sleep(100 * time.Millisecond)

	startSubs := sc.totalSubs()

	fooAcc, err := sc.clusters[1].servers[1].LookupAccount("FOO")
	if err != nil {
		t.Fatalf("Error looking up account 'FOO': %v", err)
	}
	startNumSis := fooAcc.NumServiceImports()

	// Each request runs in its own function so the deferred Close fires at the
	// end of the iteration (also on failure) instead of piling up 100 defers
	// until the test returns.
	requestOnce := func() {
		nc := clientConnect(t, sc.clusters[1].opts[1], "bar")
		defer nc.Close()
		if _, err := nc.Request("ngs.usage", []byte("1h"), time.Second); err != nil {
			t.Fatalf("Error on request: %v", err)
		}
	}
	for i := 0; i < 100; i++ {
		requestOnce()
	}

	// We are adding 3 here for the wildcard response subject for service replies.
	// we only have one but it will show in three places.
	startSubs += 3

	checkFor(t, time.Second, 50*time.Millisecond, func() error {
		if numSubs := sc.totalSubs(); numSubs != startSubs {
			return fmt.Errorf("Leaked %d subs", numSubs-startSubs)
		}
		return nil
	})

	// Now also check to make sure the service imports created for the request go away as well.
	checkFor(t, time.Second, 50*time.Millisecond, func() error {
		if numSis := fooAcc.NumServiceImports(); numSis != startNumSis {
			return fmt.Errorf("Leaked %d service imports", numSis-startNumSis)
		}
		return nil
	})
}
// TestServiceLatencyWithName verifies, on a single server, that the
// responder's connection name is reported in the latency metric.
func TestServiceLatencyWithName(t *testing.T) {
	sc := createSuperCluster(t, 1, 1)
	defer sc.shutdown()

	// Now add in new service export to FOO and have bar import that with tracking enabled.
	sc.setupLatencyTracking(t, 100)

	opts := sc.clusters[0].opts[0]

	nc := clientConnectWithName(t, opts, "foo", "dlc22")
	defer nc.Close()

	// The service listener.
	if _, err := nc.Subscribe("ngs.usage.*", func(msg *nats.Msg) {
		msg.Respond([]byte("22 msgs"))
	}); err != nil {
		t.Fatalf("Error on subscribe: %v", err)
	}

	// Listen for metrics
	rsub, err := nc.SubscribeSync("results")
	if err != nil {
		t.Fatalf("Error on subscribe: %v", err)
	}
	nc.Flush()

	nc2 := clientConnect(t, opts, "bar")
	defer nc2.Close()
	if _, err := nc2.Request("ngs.usage", []byte("1h"), time.Second); err != nil {
		t.Fatalf("Error on request: %v", err)
	}

	var sl server.ServiceLatency
	rmsg, err := rsub.NextMsg(time.Second)
	if err != nil {
		t.Fatalf("Error getting message: %v", err)
	}
	if err := json.Unmarshal(rmsg.Data, &sl); err != nil {
		t.Fatalf("Error decoding latency measurement: %v", err)
	}

	// Make sure we have AppName set.
	rs := sc.clusters[0].servers[0]
	extendedCheck(t, sl.Responder, "foo", "dlc22", rs.Name())
	// Normally requestor's don't share
	noShareCheck(t, sl.Requestor)
}
// TestServiceLatencyWithNameMultiServer verifies that the responder's
// connection name is reported in the latency metric when the requestor is
// connected to a different cluster.
func TestServiceLatencyWithNameMultiServer(t *testing.T) {
	sc := createSuperCluster(t, 3, 2)
	defer sc.shutdown()

	// Now add in new service export to FOO and have bar import that with tracking enabled.
	sc.setupLatencyTracking(t, 100)

	nc := clientConnectWithName(t, sc.clusters[0].opts[1], "foo", "dlc22")
	defer nc.Close()

	// The service listener.
	if _, err := nc.Subscribe("ngs.usage.*", func(msg *nats.Msg) {
		msg.Respond([]byte("22 msgs"))
	}); err != nil {
		t.Fatalf("Error on subscribe: %v", err)
	}

	// Listen for metrics
	rsub, err := nc.SubscribeSync("results")
	if err != nil {
		t.Fatalf("Error on subscribe: %v", err)
	}
	nc.Flush()

	nc2 := clientConnect(t, sc.clusters[1].opts[1], "bar")
	defer nc2.Close()
	if _, err := nc2.Request("ngs.usage", []byte("1h"), time.Second); err != nil {
		t.Fatalf("Error on request: %v", err)
	}

	// Fail the test, rather than dereference a nil message, if no metric arrives.
	var sl server.ServiceLatency
	rmsg, err := rsub.NextMsg(time.Second)
	if err != nil {
		t.Fatalf("Error getting message: %v", err)
	}
	if err := json.Unmarshal(rmsg.Data, &sl); err != nil {
		t.Fatalf("Error decoding latency measurement: %v", err)
	}

	// Make sure we have AppName set.
	rs := sc.clusters[0].servers[1]
	extendedCheck(t, sl.Responder, "foo", "dlc22", rs.Name())
	// Normally requestor's don't share
	noShareCheck(t, sl.Requestor)
}
// createAccountWithJWT creates a new account key pair and returns the account
// JWT signed with the operator key derived from oSeed, the account key pair,
// and the account claims the JWT was encoded from. Any key or encoding error
// fails the test.
func createAccountWithJWT(t *testing.T) (string, nkeys.KeyPair, *jwt.AccountClaims) {
	t.Helper()
	okp, err := nkeys.FromSeed(oSeed)
	if err != nil {
		t.Fatalf("Error loading operator key pair: %v", err)
	}
	akp, err := nkeys.CreateAccount()
	if err != nil {
		t.Fatalf("Error creating account key pair: %v", err)
	}
	pub, err := akp.PublicKey()
	if err != nil {
		t.Fatalf("Error getting account public key: %v", err)
	}
	nac := jwt.NewAccountClaims(pub)
	// Named ajwt so the imported jwt package is not shadowed.
	ajwt, err := nac.Encode(okp)
	if err != nil {
		t.Fatalf("Error generating account JWT: %v", err)
	}
	return ajwt, akp, nac
}
// TestServiceLatencyWithJWT runs two routed servers configured with
// JWT-based accounts and verifies that latency tracking can be switched on
// and off by updating the service account's claims.
func TestServiceLatencyWithJWT(t *testing.T) {
	okp, err := nkeys.FromSeed(oSeed)
	if err != nil {
		t.Fatalf("Error loading operator key pair: %v", err)
	}

	// publicKey fails the test if the public key cannot be extracted.
	publicKey := func(kp nkeys.KeyPair) string {
		t.Helper()
		pub, err := kp.PublicKey()
		if err != nil {
			t.Fatalf("Error getting public key: %v", err)
		}
		return pub
	}

	// Create three accounts, system, service and normal account.
	sysJWT, sysKP, _ := createAccountWithJWT(t)
	sysPub := publicKey(sysKP)

	_, svcKP, svcAcc := createAccountWithJWT(t)
	svcPub := publicKey(svcKP)

	// Add in the service export with latency tracking here.
	serviceExport := &jwt.Export{Subject: "req.*", Type: jwt.Service}
	svcAcc.Exports.Add(serviceExport)
	svcJWT, err := svcAcc.Encode(okp)
	if err != nil {
		t.Fatalf("Error generating account JWT: %v", err)
	}

	_, accKP, accAcc := createAccountWithJWT(t)
	accPub := publicKey(accKP)

	// Add in the import.
	serviceImport := &jwt.Import{Account: svcPub, Subject: "request", To: "req.echo", Type: jwt.Service}
	accAcc.Imports.Add(serviceImport)
	accJWT, err := accAcc.Encode(okp)
	if err != nil {
		t.Fatalf("Error generating account JWT: %v", err)
	}

	cf := `
listen: 127.0.0.1:-1
cluster {
name: "A"
listen: 127.0.0.1:-1
authorization {
timeout: 2.2
} %s
}
operator = "./configs/nkeys/op.jwt"
system_account = "%s"
resolver = MEMORY
resolver_preload = {
%s : "%s"
%s : "%s"
%s : "%s"
}
`
	contents := strings.Replace(fmt.Sprintf(cf, "", sysPub, sysPub, sysJWT, svcPub, svcJWT, accPub, accJWT), "\n\t", "\n", -1)
	conf := createConfFile(t, []byte(contents))
	defer removeFile(t, conf)

	s, opts := RunServerWithConfig(conf)
	defer s.Shutdown()

	// Create a new server and route to main one.
	routeStr := fmt.Sprintf("\n\t\troutes = [nats-route://%s:%d]", opts.Cluster.Host, opts.Cluster.Port)
	contents2 := strings.Replace(fmt.Sprintf(cf, routeStr, sysPub, sysPub, sysJWT, svcPub, svcJWT, accPub, accJWT), "\n\t", "\n", -1)

	conf2 := createConfFile(t, []byte(contents2))
	defer removeFile(t, conf2)

	s2, opts2 := RunServerWithConfig(conf2)
	defer s2.Shutdown()

	checkClusterFormed(t, s, s2)

	// Create service provider.
	url := fmt.Sprintf("nats://%s:%d", opts.Host, opts.Port)
	copt, pubUser := createUserCredsOption(t, s, svcKP)
	nc, err := nats.Connect(url, copt, nats.Name("fooService"))
	if err != nil {
		t.Fatalf("Error on connect: %v", err)
	}
	defer nc.Close()

	// The service listener.
	serviceTime := 25 * time.Millisecond
	if _, err := nc.Subscribe("req.echo", func(msg *nats.Msg) {
		time.Sleep(serviceTime)
		msg.Respond(msg.Data)
	}); err != nil {
		t.Fatalf("Error on subscribe: %v", err)
	}

	// Listen for metrics
	rsub, err := nc.SubscribeSync("results")
	if err != nil {
		t.Fatalf("Error on subscribe: %v", err)
	}
	nc.Flush()

	// Create second client and send request from this one.
	url2 := fmt.Sprintf("nats://%s:%d/", opts2.Host, opts2.Port)
	nc2, err := nats.Connect(url2, createUserCreds(t, s2, accKP))
	if err != nil {
		t.Fatalf("Error creating client: %v\n", err)
	}
	defer nc2.Close()

	// Send the request.
	_, err = nc2.Request("request", []byte("hello"), time.Second)
	if err != nil {
		t.Fatalf("Expected a response")
	}

	// We should not receive latency at this time.
	_, err = rsub.NextMsg(100 * time.Millisecond)
	if err == nil {
		t.Fatalf("Did not expect to receive a latency metric")
	}

	// Now turn it on..
	// updateAccount pushes the current svcAcc claims to both servers.
	updateAccount := func() {
		t.Helper()
		for _, srv := range []*server.Server{s, s2} {
			svcAccount, err := srv.LookupAccount(svcPub)
			if err != nil {
				t.Fatalf("Could not lookup service account from server %+v", srv)
			}
			srv.UpdateAccountClaims(svcAccount, svcAcc)
		}
	}
	serviceExport.Latency = &jwt.ServiceLatency{Sampling: 100, Results: "results"}
	updateAccount()

	// Grab the service responder's user.

	// Send the request.
	start := time.Now()
	_, err = nc2.Request("request", []byte("hello"), time.Second)
	if err != nil {
		t.Fatalf("Expected a response")
	}

	var sl server.ServiceLatency
	rmsg, err := rsub.NextMsg(time.Second)
	if err != nil || rmsg == nil {
		t.Fatalf("Did not receive a latency metric")
	}
	if err := json.Unmarshal(rmsg.Data, &sl); err != nil {
		t.Fatalf("Error decoding latency metric: %v", err)
	}
	checkServiceLatency(t, sl, start, serviceTime)
	extendedCheck(t, sl.Responder, pubUser, "fooService", s.Name())
	// Normally requestor's don't share
	noShareCheck(t, sl.Requestor)

	// Now we will remove tracking. Do this by simulating a JWT update.
	serviceExport.Latency = nil
	updateAccount()

	// Now we should not get any tracking data.
	_, err = nc2.Request("request", []byte("hello"), time.Second)
	if err != nil {
		t.Fatalf("Expected a response")
	}
	_, err = rsub.NextMsg(100 * time.Millisecond)
	if err == nil {
		t.Fatalf("Did not expect to receive a latency metric")
	}
}
// TestServiceLatencyAdjustNegativeLatencyValues sends 50 requests to an
// instantly responding service on the same server and verifies that no
// metric reports a negative service latency.
func TestServiceLatencyAdjustNegativeLatencyValues(t *testing.T) {
	const totalSamples = 50

	sc := createSuperCluster(t, 3, 2)
	defer sc.shutdown()

	// Now add in new service export to FOO and have bar import
	// that with tracking enabled.
	sc.setupLatencyTracking(t, 100)

	// Now we can setup and test, do single node only first.
	// This is the service provider.
	nc := clientConnect(t, sc.clusters[0].opts[0], "foo")
	defer nc.Close()

	// The service listener.
	nc.Subscribe("ngs.usage.*", func(req *nats.Msg) {
		req.Respond([]byte("22 msgs"))
	})

	// Listen for metrics
	rsub, subErr := nc.SubscribeSync("results")
	if subErr != nil {
		t.Fatal(subErr)
	}
	nc.Flush()

	// Requestor
	nc2 := clientConnect(t, sc.clusters[0].opts[0], "bar")
	defer nc2.Close()

	// Send all the requests first.
	for sent := 0; sent < totalSamples; sent++ {
		_, reqErr := nc2.Request("ngs.usage", []byte("1h"), time.Second)
		if reqErr != nil {
			t.Fatalf("Expected a response")
		}
	}

	// Then drain one metric per request.
	var sl server.ServiceLatency
	for i := 0; i < totalSamples; i++ {
		rmsg, err := rsub.NextMsg(time.Second)
		if err != nil {
			t.Fatalf("Expected to receive latency metric: %d, %s", i, err)
		}
		if decErr := json.Unmarshal(rmsg.Data, &sl); decErr != nil {
			t.Errorf("Unexpected error processing latency metric: %s", decErr)
		}
		if sl.ServiceLatency < 0 {
			t.Fatalf("Unexpected negative latency value: %v", sl)
		}
	}
}
// TestServiceLatencyRemoteConnectAdjustNegativeValues sends requests from a
// same-cluster requestor and from a requestor in another cluster, and
// verifies that no metric reports a negative service or system latency.
func TestServiceLatencyRemoteConnectAdjustNegativeValues(t *testing.T) {
	const totalSamples = 20

	sc := createSuperCluster(t, 3, 2)
	defer sc.shutdown()

	// Now add in new service export to FOO and have bar import that with tracking enabled.
	sc.setupLatencyTracking(t, 100)

	// Now we can setup and test, do single node only first.
	// This is the service provider.
	nc := clientConnect(t, sc.clusters[0].opts[0], "foo")
	defer nc.Close()

	// The service listener.
	nc.Subscribe("ngs.usage.*", func(req *nats.Msg) {
		req.Respond([]byte("22 msgs"))
	})

	// Listen for metrics
	rsub, subErr := nc.SubscribeSync("results")
	if subErr != nil {
		t.Fatal(subErr)
	}
	nc.Flush()

	// Same Cluster Requestor
	nc2 := clientConnect(t, sc.clusters[0].opts[2], "bar")
	defer nc2.Close()

	// Gateway Requestor
	nc3 := clientConnect(t, sc.clusters[1].opts[1], "bar")
	defer nc3.Close()

	// Send a few initial requests to ensure interest is propagated
	// both for cluster and gateway requestors.
	checkFor(t, 3*time.Second, time.Second, func() error {
		_, clusterErr := nc2.Request("ngs.usage", []byte("1h"), time.Second)
		_, gatewayErr := nc3.Request("ngs.usage", []byte("1h"), time.Second)
		if clusterErr == nil && gatewayErr == nil {
			return nil
		}
		return fmt.Errorf("Timed out waiting for super cluster to be ready")
	})

	// Send the requests: all cluster ones first, then all gateway ones.
	for _, requestor := range []*nats.Conn{nc2, nc3} {
		for n := 0; n < totalSamples; n++ {
			if _, err := requestor.Request("ngs.usage", []byte("1h"), time.Second); err != nil {
				t.Fatalf("Expected a response")
			}
		}
	}

	// One metric is expected per request sent above.
	var sl server.ServiceLatency
	for i := 0; i < 2*totalSamples; i++ {
		rmsg, err := rsub.NextMsg(time.Second)
		if err != nil {
			t.Fatalf("Expected to receive latency metric: %d, %s", i, err)
		}
		if decErr := json.Unmarshal(rmsg.Data, &sl); decErr != nil {
			t.Errorf("Unexpected error processing latency metric: %s", decErr)
		}
		if sl.ServiceLatency < 0 {
			t.Fatalf("Unexpected negative service latency value: %v", sl)
		}
		if sl.SystemLatency < 0 {
			t.Fatalf("Unexpected negative system latency value: %v", sl)
		}
	}
}
// TestServiceLatencyFailureReportingSingleServer verifies, on a single
// server, the status reported in the latency metric for failed requests:
// 400 for a request without a reply subject, 503 when there is no responder,
// 504 when the responder exceeds the response threshold, and 408 when an
// old-style requestor times out before the response.
func TestServiceLatencyFailureReportingSingleServer(t *testing.T) {
	sc := createSuperCluster(t, 1, 1)
	defer sc.shutdown()

	// Now add in new service export to FOO and have bar import that with tracking enabled.
	sc.setupLatencyTracking(t, 100)
	sc.setResponseThreshold(t, 20*time.Millisecond)

	nc := clientConnect(t, sc.clusters[0].opts[0], "foo")
	defer nc.Close()

	// Listen for metrics
	rsub, subErr := nc.SubscribeSync("results")
	if subErr != nil {
		t.Fatal(subErr)
	}
	nc.Flush()

	// nextMetric waits up to a second for the next metric and decodes it.
	nextMetric := func() *server.ServiceLatency {
		t.Helper()
		rmsg, err := rsub.NextMsg(time.Second)
		if err != nil {
			t.Fatalf("Expected to receive latency metric: %v", err)
		}
		sl := new(server.ServiceLatency)
		if err = json.Unmarshal(rmsg.Data, sl); err != nil {
			t.Errorf("Unexpected error processing latency metric: %s", err)
		}
		return sl
	}

	// Same server
	nc2 := clientConnect(t, sc.clusters[0].opts[0], "bar")
	defer nc2.Close()

	// Test a request with no reply subject
	nc2.Publish("ngs.usage", []byte("1h"))
	if status := nextMetric().Status; status != 400 {
		t.Fatalf("Expected to get a bad request status [400], got %d", status)
	}

	// Proper request, but no responders.
	nc2.Request("ngs.usage", []byte("1h"), 20*time.Millisecond)
	if status := nextMetric().Status; status != 503 {
		t.Fatalf("Expected to get a service unavailable status [503], got %d", status)
	}

	// The service listener. Make it slow. 20ms is respThreshold, so take 2X
	slowSub, _ := nc.Subscribe("ngs.usage.bar", func(req *nats.Msg) {
		time.Sleep(40 * time.Millisecond)
		req.Respond([]byte("22 msgs"))
	})
	nc.Flush()

	nc2.Request("ngs.usage", []byte("1h"), 20*time.Millisecond)
	if status := nextMetric().Status; status != 504 {
		t.Fatalf("Expected to get a service timeout status [504], got %d", status)
	}

	// Make sure we do not get duplicates.
	if dup, err := rsub.NextMsg(50 * time.Millisecond); err == nil {
		t.Fatalf("Unexpected second response metric: %q\n", dup.Data)
	}

	// Now setup a responder that will respond under the threshold.
	slowSub.Unsubscribe()
	nc.Subscribe("ngs.usage.bar", func(req *nats.Msg) {
		time.Sleep(5 * time.Millisecond)
		req.Respond([]byte("22 msgs"))
	})
	nc.Flush()
	time.Sleep(100 * time.Millisecond)

	// Now create a responder using old request and we will do a short timeout.
	nc3 := clientConnectOldRequest(t, sc.clusters[0].opts[0], "bar")
	defer nc3.Close()

	nc3.Request("ngs.usage", []byte("1h"), time.Millisecond)
	if status := nextMetric().Status; status != 408 {
		t.Fatalf("Expected to get a request timeout status [408], got %d", status)
	}
}
// TestServiceLatencyFailureReportingMultipleServers verifies the 400, 503
// and 504 failure statuses in latency metrics when the requestor is on the
// same server as the metrics listener, on another server of the same
// cluster, and in a different cluster.
func TestServiceLatencyFailureReportingMultipleServers(t *testing.T) {
	sc := createSuperCluster(t, 3, 3)
	defer sc.shutdown()

	// Now add in new service export to FOO and have bar import that with tracking enabled.
	sc.setupLatencyTracking(t, 100)
	sc.setResponseThreshold(t, 10*time.Millisecond)

	nc := clientConnect(t, sc.clusters[0].opts[0], "foo")
	defer nc.Close()

	// Listen for metrics
	rsub, err := nc.SubscribeSync("results")
	if err != nil {
		t.Fatal(err)
	}
	nc.Flush()

	// getMetricResult waits up to a second for the next metric and decodes it.
	getMetricResult := func() *server.ServiceLatency {
		t.Helper()
		rmsg, err := rsub.NextMsg(time.Second)
		if err != nil {
			t.Fatalf("Expected to receive latency metric: %v", err)
		}
		var sl server.ServiceLatency
		if err = json.Unmarshal(rmsg.Data, &sl); err != nil {
			t.Errorf("Unexpected error processing latency metric: %s", err)
		}
		return &sl
	}

	// runCase executes one scenario in its own function scope so that the
	// deferred cleanup runs when the scenario ends (including on failure)
	// rather than accumulating until the whole test returns.
	runCase := func(ci, si int, desc string) {
		// Select the server to send request from.
		nc2 := clientConnect(t, sc.clusters[ci].opts[si], "bar")
		defer nc2.Close()

		// Test a request with no reply subject
		nc2.Publish("ngs.usage", []byte("1h"))
		sl := getMetricResult()
		if sl.Status != 400 {
			t.Fatalf("Test %q, Expected to get a bad request status [400], got %d", desc, sl.Status)
		}

		// We wait here for the gateways to report no interest b/c optimistic mode.
		time.Sleep(50 * time.Millisecond)

		// Proper request, but no responders.
		nc2.Request("ngs.usage", []byte("1h"), 10*time.Millisecond)
		sl = getMetricResult()
		if sl.Status != 503 {
			t.Fatalf("Test %q, Expected to get a service unavailable status [503], got %d", desc, sl.Status)
		}

		// The service listener. Make it slow. 10ms is respThreshold, so make 3X
		sub, err := nc.Subscribe("ngs.usage.bar", func(msg *nats.Msg) {
			time.Sleep(30 * time.Millisecond)
			msg.Respond([]byte("22 msgs"))
		})
		if err != nil {
			t.Fatalf("Test %q, Error on subscribe: %v", desc, err)
		}
		defer sub.Unsubscribe()
		nc.Flush()
		// Wait to propagate.
		time.Sleep(200 * time.Millisecond)

		nc2.Request("ngs.usage", []byte("1h"), 10*time.Millisecond)
		sl = getMetricResult()
		if sl.Status != 504 {
			t.Fatalf("Test %q, Expected to get a service timeout status [504], got %d", desc, sl.Status)
		}

		// Clean up subscriber and requestor
		nc2.Close()
		sub.Unsubscribe()
		nc.Flush()
		// Wait to propagate.
		time.Sleep(200 * time.Millisecond)
	}

	cases := []struct {
		ci, si int
		desc   string
	}{
		{0, 0, "same server"},
		{0, 1, "same cluster, different server"},
		{1, 1, "different cluster"},
	}

	for _, cs := range cases {
		runCase(cs.ci, cs.si, cs.desc)
	}
}
// To test a bug rip@nats.io is seeing.
//
// TestServiceLatencyOldRequestStyleSingleServer runs a single server where
// account SVC exports "svc.echo" with 100% latency sampling published on
// "latency.svc", and account CLIENT imports it as "SVC" without sharing.
// The requestor connects with nats.UseOldRequestStyle(). The resulting
// latency sample is handed to checkServiceLatency, extendedCheck (responder)
// and noShareCheck (requestor).
func TestServiceLatencyOldRequestStyleSingleServer(t *testing.T) {
	conf := createConfFile(t, []byte(`
listen: 127.0.0.1:-1
accounts: {
SVC: {
users: [ {user: svc, password: pass} ]
exports: [ {
service: "svc.echo"
accounts: [CLIENT]
latency: {
sampling: 100%
subject: latency.svc
}
} ]
},
CLIENT: {
users: [{user: client, password: pass} ]
imports: [ {service: {account: SVC, subject: svc.echo}, to: SVC} ]
},
SYS: { users: [{user: admin, password: pass}] }
}
system_account: SYS
`))
	defer removeFile(t, conf)

	srv, opts := RunServerWithConfig(conf)
	defer srv.Shutdown()

	// Responder
	nc, err := nats.Connect(fmt.Sprintf("nats://svc:pass@%s:%d", opts.Host, opts.Port))
	if err != nil {
		t.Fatalf("Error on connect: %v", err)
	}
	defer nc.Close()

	// Listen for metrics
	rsub, err := nc.SubscribeSync("latency.svc")
	if err != nil {
		t.Fatalf("Error on subscribe: %v", err)
	}

	// Requestor
	nc2, err := nats.Connect(fmt.Sprintf("nats://client:pass@%s:%d", opts.Host, opts.Port), nats.UseOldRequestStyle())
	if err != nil {
		t.Fatalf("Error on connect: %v", err)
	}
	defer nc2.Close()

	// Setup responder
	serviceTime := 25 * time.Millisecond
	sub, err := nc.Subscribe("svc.echo", func(msg *nats.Msg) {
		time.Sleep(serviceTime)
		msg.Respond([]byte("world"))
	})
	if err != nil {
		t.Fatalf("Error on subscribe: %v", err)
	}
	nc.Flush()
	defer sub.Unsubscribe()

	// Send a request
	start := time.Now()
	if _, err := nc2.Request("SVC", []byte("1h"), time.Second); err != nil {
		t.Fatalf("Expected a response")
	}

	// Fail cleanly if no sample shows up; dereferencing a nil message
	// would otherwise panic the whole test binary.
	rmsg, err := rsub.NextMsg(time.Second)
	if err != nil {
		t.Fatalf("Expected to receive latency metric: %v", err)
	}
	var sl server.ServiceLatency
	if err := json.Unmarshal(rmsg.Data, &sl); err != nil {
		t.Fatalf("Unexpected error processing latency metric: %s", err)
	}

	checkServiceLatency(t, sl, start, serviceTime)
	extendedCheck(t, sl.Responder, "svc", "", srv.Name())
	noShareCheck(t, sl.Requestor)
}
// To test a bug wally@nats.io is seeing.
//
// TestServiceAndStreamStackOverflow sets up account STATIC exporting both a
// full wildcard stream and the service "my.service", and account DYN
// importing both. It then publishes requests from DYN and verifies how many
// reach the STATIC subscriber for a plain subscription, a queue subscription,
// and a queue subscription while DYN also holds a matching queue subscription.
func TestServiceAndStreamStackOverflow(t *testing.T) {
	conf := createConfFile(t, []byte(`
accounts {
STATIC {
users = [ { user: "static", pass: "foo" } ]
exports [
{ stream: > }
{ service: my.service }
]
}
DYN {
users = [ { user: "foo", pass: "bar" } ]
imports [
{ stream { subject: >, account: STATIC } }
{ service { subject: my.service, account: STATIC } }
]
}
}
`))
	defer removeFile(t, conf)

	srv, opts := RunServerWithConfig(conf)
	defer srv.Shutdown()

	// Responder (just request sub)
	nc, err := nats.Connect(fmt.Sprintf("nats://static:foo@%s:%d", opts.Host, opts.Port))
	if err != nil {
		t.Fatalf("Error on connect: %v", err)
	}
	defer nc.Close()

	sub, err := nc.SubscribeSync("my.service")
	if err != nil {
		t.Fatalf("Error on subscribe: %v", err)
	}
	nc.Flush()

	// Requestor
	nc2, err := nats.Connect(fmt.Sprintf("nats://foo:bar@%s:%d", opts.Host, opts.Port))
	if err != nil {
		t.Fatalf("Error on connect: %v", err)
	}
	defer nc2.Close()

	// Send a single request.
	if err := nc2.PublishRequest("my.service", "foo", []byte("hi")); err != nil {
		t.Fatalf("Error on publish: %v", err)
	}
	checkFor(t, time.Second, 10*time.Millisecond, func() error {
		nm, _, err := sub.Pending()
		if err != nil {
			// Report the real failure rather than a misleading count.
			return fmt.Errorf("Error getting pending: %v", err)
		}
		if nm != 1 {
			return fmt.Errorf("Expected one request, got %d", nm)
		}
		return nil
	})

	// Make sure works for queue subscribers as well.
	if err := sub.Unsubscribe(); err != nil {
		t.Fatalf("Error on unsubscribe: %v", err)
	}
	sub, err = nc.QueueSubscribeSync("my.service", "prod")
	if err != nil {
		t.Fatalf("Error on subscribe: %v", err)
	}
	nc.Flush()

	// Send a single request.
	if err := nc2.PublishRequest("my.service", "foo", []byte("hi")); err != nil {
		t.Fatalf("Error on publish: %v", err)
	}
	checkFor(t, time.Second, 10*time.Millisecond, func() error {
		nm, _, err := sub.Pending()
		if err != nil {
			return fmt.Errorf("Error getting pending: %v", err)
		}
		if nm != 1 {
			return fmt.Errorf("Expected one request, got %d", nm)
		}
		return nil
	})

	// Now create an interest in the stream from nc2. that is a queue subscriber.
	sub2, err := nc2.QueueSubscribeSync("my.service", "prod")
	if err != nil {
		t.Fatalf("Error on subscribe: %v", err)
	}
	defer sub2.Unsubscribe()
	nc2.Flush()

	// Send a single request.
	if err := nc2.PublishRequest("my.service", "foo", []byte("hi")); err != nil {
		t.Fatalf("Error on publish: %v", err)
	}
	time.Sleep(10 * time.Millisecond)
	// The queue subscription on nc already holds one message from the
	// previous step, so it should now have two pending.
	checkFor(t, time.Second, 10*time.Millisecond, func() error {
		nm, _, err := sub.Pending()
		if err != nil {
			return fmt.Errorf("Error getting pending: %v", err)
		}
		if nm != 2 {
			return fmt.Errorf("Expected two requests, got %d", nm)
		}
		return nil
	})
}
// TestServiceLatencyRequestorSharesDetailedInfo verifies that latency metrics
// carry the proper detailed requestor information when sharing is allowed.
// It runs the 400 / 503 / 504 scenarios from three requestor locations in a
// 3x3 supercluster.
func TestServiceLatencyRequestorSharesDetailedInfo(t *testing.T) {
	super := createSuperCluster(t, 3, 3)
	defer super.shutdown()

	// Add the new service export to FOO, imported by bar with tracking
	// enabled, then tighten the response threshold and turn on sharing.
	super.setupLatencyTracking(t, 100)
	super.setResponseThreshold(t, 10*time.Millisecond)
	super.setImportShare(t)

	svcConn := clientConnect(t, super.clusters[0].opts[0], "foo")
	defer svcConn.Close()

	// Metrics are collected from the "results" subject.
	metricsSub, err := svcConn.SubscribeSync("results")
	if err != nil {
		t.Fatal(err)
	}
	svcConn.Flush()

	// nextMetric waits up to a second for the next latency sample and decodes it.
	nextMetric := func() *server.ServiceLatency {
		t.Helper()
		m, err := metricsSub.NextMsg(time.Second)
		if err != nil {
			t.Fatalf("Expected to receive latency metric: %v", err)
		}
		sample := new(server.ServiceLatency)
		err = json.Unmarshal(m.Data, sample)
		if err != nil {
			t.Errorf("Unexpected error processing latency metric: %s", err)
		}
		return sample
	}

	scenarios := []struct {
		ci, si int
		desc   string
	}{
		{0, 0, "same server"},
		{0, 1, "same cluster, different server"},
		{1, 1, "different cluster"},
	}

	for i := range scenarios {
		sc := scenarios[i]

		// Pick the server the request originates from.
		reqSrv := super.clusters[sc.ci].servers[sc.si]
		reqConn := clientConnect(t, super.clusters[sc.ci].opts[sc.si], "bar")
		defer reqConn.Close()

		// A publish without a reply subject.
		reqConn.Publish("ngs.usage", []byte("1h"))
		metric := nextMetric()
		if metric.Status != 400 {
			t.Fatalf("Test %q, Expected to get a bad request status [400], got %d", sc.desc, metric.Status)
		}
		extendedCheck(t, metric.Requestor, "bar", "", reqSrv.Name())

		// Give the gateways time to report no interest (optimistic mode).
		time.Sleep(50 * time.Millisecond)

		// A well-formed request while nobody is responding.
		reqConn.Request("ngs.usage", []byte("1h"), 10*time.Millisecond)
		metric = nextMetric()
		if metric.Status != 503 {
			t.Fatalf("Test %q, Expected to get a service unavailable status [503], got %d", sc.desc, metric.Status)
		}
		extendedCheck(t, metric.Requestor, "bar", "", reqSrv.Name())

		// A deliberately slow responder: 25ms against the 10ms response threshold.
		slowSub, _ := svcConn.Subscribe("ngs.usage.bar", func(req *nats.Msg) {
			time.Sleep(25 * time.Millisecond)
			req.Respond([]byte("22 msgs"))
		})
		defer slowSub.Unsubscribe()
		svcConn.Flush()

		// Let the subscription propagate.
		time.Sleep(200 * time.Millisecond)

		reqConn.Request("ngs.usage", []byte("1h"), 10*time.Millisecond)
		metric = nextMetric()
		if metric.Status != 504 {
			t.Fatalf("Test %q, Expected to get a service timeout status [504], got %d", sc.desc, metric.Status)
		}
		extendedCheck(t, metric.Requestor, "bar", "", reqSrv.Name())

		// Tear down this scenario's requestor and responder before the next one.
		reqConn.Close()
		slowSub.Unsubscribe()
		svcConn.Flush()

		// Let the teardown propagate.
		time.Sleep(200 * time.Millisecond)
	}
}
// TestServiceLatencyRequestorSharesConfig starts a single server whose CLIENT
// account imports the SVC "svc.echo" service with share:true, issues a
// request and runs extendedCheck on both the responder and requestor parts of
// the latency sample. It then rewrites the config without share:true, reloads
// the server, issues another request and runs noShareCheck on the requestor.
func TestServiceLatencyRequestorSharesConfig(t *testing.T) {
	conf := createConfFile(t, []byte(`
listen: 127.0.0.1:-1
accounts: {
SVC: {
users: [ {user: svc, password: pass} ]
exports: [ {
service: "svc.echo"
accounts: [CLIENT]
latency: {
sampling: 100%
subject: latency.svc
}
} ]
},
CLIENT: {
users: [{user: client, password: pass} ]
imports: [ {service: {account: SVC, subject: svc.echo}, to: SVC, share:true} ]
},
SYS: { users: [{user: admin, password: pass}] }
}
system_account: SYS
`))
	defer removeFile(t, conf)

	srv, opts := RunServerWithConfig(conf)
	defer srv.Shutdown()

	// Responder
	nc, err := nats.Connect(fmt.Sprintf("nats://svc:pass@%s:%d", opts.Host, opts.Port))
	if err != nil {
		t.Fatalf("Error on connect: %v", err)
	}
	defer nc.Close()

	// Listen for metrics
	rsub, err := nc.SubscribeSync("latency.svc")
	if err != nil {
		t.Fatalf("Error on subscribe: %v", err)
	}

	// Requestor
	nc2, err := nats.Connect(fmt.Sprintf("nats://client:pass@%s:%d", opts.Host, opts.Port), nats.UseOldRequestStyle())
	if err != nil {
		t.Fatalf("Error on connect: %v", err)
	}
	defer nc2.Close()

	// Setup responder
	serviceTime := 25 * time.Millisecond
	sub, err := nc.Subscribe("svc.echo", func(msg *nats.Msg) {
		time.Sleep(serviceTime)
		msg.Respond([]byte("world"))
	})
	if err != nil {
		t.Fatalf("Error on subscribe: %v", err)
	}
	nc.Flush()
	defer sub.Unsubscribe()

	// Send a request
	start := time.Now()
	if _, err := nc2.Request("SVC", []byte("1h"), time.Second); err != nil {
		t.Fatalf("Expected a response")
	}

	// Fail cleanly on a missing sample instead of dereferencing a nil message.
	rmsg, err := rsub.NextMsg(time.Second)
	if err != nil {
		t.Fatalf("Expected to receive latency metric: %v", err)
	}
	var sl server.ServiceLatency
	if err := json.Unmarshal(rmsg.Data, &sl); err != nil {
		t.Fatalf("Unexpected error processing latency metric: %s", err)
	}

	checkServiceLatency(t, sl, start, serviceTime)
	extendedCheck(t, sl.Responder, "svc", "", srv.Name())
	extendedCheck(t, sl.Requestor, "client", "", srv.Name())

	// Check reload: same config, but the import no longer sets share:true.
	newConf := []byte(`
listen: 127.0.0.1:-1
accounts: {
SVC: {
users: [ {user: svc, password: pass} ]
exports: [ {
service: "svc.echo"
accounts: [CLIENT]
latency: {
sampling: 100%
subject: latency.svc
}
} ]
},
CLIENT: {
users: [{user: client, password: pass} ]
imports: [ {service: {account: SVC, subject: svc.echo}, to: SVC} ]
},
SYS: { users: [{user: admin, password: pass}] }
}
system_account: SYS
`)
	if err := os.WriteFile(conf, newConf, 0600); err != nil {
		t.Fatalf("Error rewriting server's config file: %v", err)
	}
	if err := srv.Reload(); err != nil {
		t.Fatalf("Error on server reload: %v", err)
	}

	if _, err = nc2.Request("SVC", []byte("1h"), time.Second); err != nil {
		t.Fatalf("Expected a response")
	}

	rmsg, err = rsub.NextMsg(time.Second)
	if err != nil {
		t.Fatalf("Expected to receive latency metric after reload: %v", err)
	}
	var sl2 server.ServiceLatency
	if err := json.Unmarshal(rmsg.Data, &sl2); err != nil {
		t.Fatalf("Unexpected error processing latency metric: %s", err)
	}
	noShareCheck(t, sl2.Requestor)
}
// TestServiceLatencyLossTest assures that behavior with respect to requests
// timing out (and samples being reordered) is as expected.
//
// Two requests share one reply subject. The responder never answers the first
// request ("msg2"); instead it triggers a second request ("msg1") 250ms later
// and answers that one. The latency subscriber expects the first sample to be
// a 200 and the second a 504, and the requestor expects exactly one reply.
func TestServiceLatencyLossTest(t *testing.T) {
	conf := createConfFile(t, []byte(`
listen: 127.0.0.1:-1
accounts: {
SVC: {
users: [ {user: svc, password: pass} ]
exports: [ {
service: "svc.echo"
threshold: "500ms"
accounts: [CLIENT]
latency: {
sampling: headers
subject: latency.svc
}
} ]
},
CLIENT: {
users: [{user: client, password: pass} ]
imports: [ {service: {account: SVC, subject: svc.echo}, to: SVC, share:true} ]
},
SYS: { users: [{user: admin, password: pass}] }
}
system_account: SYS
`))
	defer removeFile(t, conf)

	srv, opts := RunServerWithConfig(conf)
	defer srv.Shutdown()

	// Responder connection
	ncr, err := nats.Connect(fmt.Sprintf("nats://svc:pass@%s:%d", opts.Host, opts.Port), nats.Name("responder"))
	if err != nil {
		t.Fatalf("Error on connect: %v", err)
	}
	defer ncr.Close()

	// Latency (metrics) connection
	ncl, err := nats.Connect(fmt.Sprintf("nats://svc:pass@%s:%d", opts.Host, opts.Port), nats.Name("latency"))
	if err != nil {
		t.Fatalf("Error on connect: %v", err)
	}
	defer ncl.Close()

	// Table of expected state for which message.
	// This also codifies that the first message, in response to second request is ok.
	// Second message, in response to first request times out.
	expectedState := map[int]int{1: http.StatusOK, 2: http.StatusGatewayTimeout}
	msgCnt := 0
	start := time.Now().Add(250 * time.Millisecond)
	var latErr []error

	// Listen for metrics. msgCnt, start and latErr are only touched by this
	// callback until wg.Wait() returns in the test goroutine.
	var wg sync.WaitGroup
	wg.Add(2)
	rsub, err := ncl.Subscribe("latency.svc", func(rmsg *nats.Msg) {
		defer wg.Done()
		var sl server.ServiceLatency
		if err := json.Unmarshal(rmsg.Data, &sl); err != nil {
			latErr = append(latErr, fmt.Errorf("Unexpected error processing latency metric: %v", err))
		}
		msgCnt++
		if want := expectedState[msgCnt]; want != sl.Status {
			latErr = append(latErr, fmt.Errorf("Expected different status for msg #%d: %d != %d", msgCnt, want, sl.Status))
		}
		if msgCnt > 1 {
			if start.Before(sl.RequestStart) {
				latErr = append(latErr, fmt.Errorf("start times should indicate reordering %v : %v", start, sl.RequestStart))
			}
		}
		start = sl.RequestStart
		// NOTE(review): requests are published with an X-B3-Sampled header, not
		// Uber-Trace-Id, and this records an error only when the header EQUALS
		// "msg-<n>". As written it looks like it may never fire; confirm the
		// intended check before changing it.
		if strings.EqualFold(sl.RequestHeader.Get("Uber-Trace-Id"), fmt.Sprintf("msg-%d", msgCnt)) {
			latErr = append(latErr, fmt.Errorf("no header present"))
		}
	})
	if err != nil {
		t.Fatalf("Error on subscribe: %v", err)
	}
	defer rsub.Unsubscribe()

	// Setup requestor
	nc2, err := nats.Connect(fmt.Sprintf("nats://client:pass@%s:%d", opts.Host, opts.Port),
		nats.UseOldRequestStyle(), nats.Name("requestor"))
	if err != nil {
		t.Fatalf("Error on connect: %v", err)
	}
	defer nc2.Close()

	respCnt := int64(0)
	reply := nc2.NewRespInbox()
	repSub, err := nc2.Subscribe(reply, func(msg *nats.Msg) {
		atomic.AddInt64(&respCnt, 1)
	})
	if err != nil {
		t.Fatalf("Error on subscribe: %v", err)
	}
	defer repSub.Unsubscribe()
	nc2.Flush()

	// use dedicated send that publishes requests using same reply subject.
	// It returns the error instead of failing the test itself because it is
	// also called from a goroutine, where t.Fatalf must not be used.
	send := func(msg string) error {
		return nc2.PublishMsg(&nats.Msg{Subject: "SVC", Data: []byte(msg), Reply: reply,
			Header: nats.Header{"X-B3-Sampled": []string{"1"}}})
	}

	// Setup responder that skips responding and triggers next request OR responds
	sub, err := ncr.Subscribe("svc.echo", func(msg *nats.Msg) {
		if string(msg.Data) != "msg2" {
			msg.Respond([]byte("response"))
			return
		}
		wg.Add(1)
		go func() { // second request (use go routine to not block in responders callback)
			defer wg.Done()
			time.Sleep(250 * time.Millisecond)
			// will cause the first latency measurement
			if err := send("msg1"); err != nil {
				// Errorf, unlike Fatalf, is safe outside the test goroutine.
				t.Errorf("Expected a response got: %v", err)
			}
		}()
	})
	if err != nil {
		t.Fatalf("Error on subscribe: %v", err)
	}
	ncr.Flush()
	ncl.Flush()
	nc2.Flush()
	defer sub.Unsubscribe()

	// Send first request, which is expected to timeout
	if err := send("msg2"); err != nil {
		t.Fatalf("Expected a response got: %v", err)
	}

	// wait till we got enough responses
	wg.Wait()
	if len(latErr) > 0 {
		t.Fatalf("Got errors %v", latErr)
	}
	if atomic.LoadInt64(&respCnt) != 1 {
		t.Fatalf("Expected only one message")
	}
}
// TestServiceLatencyHeaderTriggered runs one subtest per table entry. Each
// entry pairs a share setting for the CLIENT import with a set of request
// headers, against an export configured with "sampling: headers". Every
// subtest checks the latency sample for a successful request (200), a request
// with no responder (503) and a publish with no reply subject (400).
func TestServiceLatencyHeaderTriggered(t *testing.T) {
	// receiveAndTest pulls the next latency sample from rsub, fails the test
	// if none arrives within a second, and validates its status, the
	// responder/requestor details and the recorded request headers.
	receiveAndTest := func(t *testing.T, rsub *nats.Subscription, shared bool, header nats.Header, status int, srvName string) server.ServiceLatency {
		t.Helper()
		var sl server.ServiceLatency
		rmsg, err := rsub.NextMsg(time.Second)
		if err != nil {
			t.Fatalf("Expected message: %v", err)
		}
		if err := json.Unmarshal(rmsg.Data, &sl); err != nil {
			t.Fatalf("Unexpected error processing latency metric: %s", err)
		}
		if sl.Status != status {
			t.Fatalf("Expected different status %d != %d", status, sl.Status)
		}
		if status == http.StatusOK {
			extendedCheck(t, sl.Responder, "svc", "", srvName)
		}
		if shared {
			extendedCheck(t, sl.Requestor, "client", "", srvName)
		} else {
			noShareCheck(t, sl.Requestor)
		}
		// The unrelated "Some-Other" header must not show up in the sample...
		if v := sl.RequestHeader.Get("Some-Other"); v != "" {
			t.Fatalf("Expected header to be gone")
		}
		// ...while every header from the table entry must be present.
		for k, value := range header {
			if v := sl.RequestHeader.Get(k); v != value[0] {
				t.Fatalf("Expected header %q to be set", k)
			}
		}
		return sl
	}

	for _, tc := range []struct {
		shared bool
		header nats.Header
	}{
		{true, nats.Header{"Uber-Trace-Id": []string{"479fefe9525eddb:479fefe9525eddb:0:1"}}},
		{true, nats.Header{"X-B3-Sampled": []string{"1"}}},
		{true, nats.Header{"X-B3-TraceId": []string{"80f198ee56343ba864fe8b2a57d3eff7"}}},
		{true, nats.Header{"B3": []string{"80f198ee56343ba864fe8b2a57d3eff7-e457b5a2e4d86bd1-1-05e3ac9a4f6e3b90"}}},
		{true, nats.Header{"Traceparent": []string{"00-4bf92f3577b34da6a3ce929d0e0e4736-00f067aa0ba902b7-01"}}},
		{false, nats.Header{"Uber-Trace-Id": []string{"479fefe9525eddb:479fefe9525eddb:0:1"}}},
		{false, nats.Header{"X-B3-Sampled": []string{"1"}}},
		{false, nats.Header{"X-B3-TraceId": []string{"80f198ee56343ba864fe8b2a57d3eff7"}}},
		{false, nats.Header{"B3": []string{"80f198ee56343ba864fe8b2a57d3eff7-e457b5a2e4d86bd1-1-05e3ac9a4f6e3b90"}}},
		{false, nats.Header{"Traceparent": []string{"00-4bf92f3577b34da6a3ce929d0e0e4736-00f067aa0ba902b7-01"}}},
		{false, nats.Header{
			"X-B3-TraceId":      []string{"80f198ee56343ba864fe8b2a57d3eff7"},
			"X-B3-ParentSpanId": []string{"05e3ac9a4f6e3b90"},
			"X-B3-SpanId":       []string{"e457b5a2e4d86bd1"},
			"X-B3-Sampled":      []string{"1"},
		}},
		{false, nats.Header{
			"X-B3-TraceId":      []string{"80f198ee56343ba864fe8b2a57d3eff7"},
			"X-B3-ParentSpanId": []string{"05e3ac9a4f6e3b90"},
			"X-B3-SpanId":       []string{"e457b5a2e4d86bd1"},
		}},
		{false, nats.Header{
			"Uber-Trace-Id": []string{"479fefe9525eddb:479fefe9525eddb:0:1"},
			"Uberctx-X":     []string{"foo"},
		}},
		{false, nats.Header{
			"Traceparent": []string{"00-4bf92f3577b34da6a3ce929d0e0e4736-00f067aa0ba902b7-01"},
			"Tracestate":  []string{"rojo=00f067aa0ba902b7,congo=t61rcWkgMzE"},
		}},
	} {
		t.Run(fmt.Sprintf("%s_%t_%s", t.Name(), tc.shared, tc.header), func(t *testing.T) {
			conf := createConfFile(t, []byte(fmt.Sprintf(`
listen: 127.0.0.1:-1
accounts: {
SVC: {
users: [ {user: svc, password: pass} ]
exports: [ {
service: "svc.echo"
accounts: [CLIENT]
latency: {
sampling: headers
subject: latency.svc
}
}]
},
CLIENT: {
users: [{user: client, password: pass} ]
imports: [ {service: {account: SVC, subject: svc.echo}, to: SVC, share:%t} ]
},
SYS: { users: [{user: admin, password: pass}] }
}
system_account: SYS
`, tc.shared)))
			defer removeFile(t, conf)

			srv, opts := RunServerWithConfig(conf)
			defer srv.Shutdown()

			// Responder
			nc, err := nats.Connect(fmt.Sprintf("nats://svc:pass@%s:%d", opts.Host, opts.Port))
			if err != nil {
				t.Fatalf("Error on connect: %v", err)
			}
			defer nc.Close()

			// Listen for metrics
			rsub, err := nc.SubscribeSync("latency.svc")
			if err != nil {
				t.Fatalf("Error on subscribe: %v", err)
			}
			defer rsub.Unsubscribe()

			// Setup responder
			serviceTime := 25 * time.Millisecond
			sub, err := nc.Subscribe("svc.echo", func(msg *nats.Msg) {
				time.Sleep(serviceTime)
				msg.Respond([]byte("world"))
			})
			if err != nil {
				t.Fatalf("Error on subscribe: %v", err)
			}
			nc.Flush()
			defer sub.Unsubscribe()

			// Setup requestor
			nc2, err := nats.Connect(fmt.Sprintf("nats://client:pass@%s:%d", opts.Host, opts.Port), nats.UseOldRequestStyle())
			if err != nil {
				t.Fatalf("Error on connect: %v", err)
			}
			defer nc2.Close()

			// Send a request carrying the table entry's headers plus one
			// unrelated header.
			start := time.Now()
			msg := &nats.Msg{
				Subject: "SVC",
				Data:    []byte("1h"),
				Header:  make(nats.Header),
			}
			for k, vals := range tc.header {
				for _, val := range vals {
					msg.Header.Add(k, val)
				}
			}
			msg.Header.Add("Some-Other", "value")
			if _, err := nc2.RequestMsg(msg, 50*time.Millisecond); err != nil {
				t.Fatalf("Expected a response")
			}
			sl := receiveAndTest(t, rsub, tc.shared, tc.header, http.StatusOK, srv.Name())
			checkServiceLatency(t, sl, start, serviceTime)

			// shut down responder to test various error scenarios
			sub.Unsubscribe()
			nc.Flush()

			// Send a request without responder
			if _, err := nc2.RequestMsg(msg, 50*time.Millisecond); err == nil {
				t.Fatalf("Expected no response")
			}
			receiveAndTest(t, rsub, tc.shared, tc.header, http.StatusServiceUnavailable, srv.Name())

			// send a message without a reply subject
			msg.Reply = ""
			if err := nc2.PublishMsg(msg); err != nil {
				t.Fatalf("Expected no error got %v", err)
			}
			receiveAndTest(t, rsub, tc.shared, tc.header, http.StatusBadRequest, srv.Name())
		})
	}
}
// From a report by rip@nats.io on simple latency reporting missing in 2 server cluster setup.
//
// TestServiceLatencyMissingResults clusters two servers (s1, s2) that include
// the same accounts file. The responder and the latency listener connect to
// s1 as account "weather"; the requestor connects to s2 as account "one"
// using the old request style. The test expects a latency sample with status
// 200 and non-zero service, system and total latencies.
func TestServiceLatencyMissingResults(t *testing.T) {
	accConf := createConfFile(t, []byte(`
accounts {
one: {
users = [ {user: one, password: password} ]
imports = [ {service: {account: weather, subject: service.weather.requests.>}, to: service.weather.>, share: true} ]
}
weather: {
users = [ {user: weather, password: password} ]
exports = [ {
service: service.weather.requests.>
accounts: [one]
latency: { sampling: 100%, subject: service.weather.latency }
} ]
}
}
`))
	defer removeFile(t, accConf)

	s1Conf := createConfFile(t, []byte(fmt.Sprintf(`
listen: 127.0.0.1:-1
server_name: s1
cluster { port: -1 }
include %q
`, filepath.Base(accConf))))
	defer removeFile(t, s1Conf)

	s1, opts1 := RunServerWithConfig(s1Conf)
	defer s1.Shutdown()

	s2Conf := createConfFile(t, []byte(fmt.Sprintf(`
listen: 127.0.0.1:-1
server_name: s2
cluster {
port: -1
routes = [ nats-route://127.0.0.1:%d ]
}
include %q
`, opts1.Cluster.Port, filepath.Base(accConf))))
	defer removeFile(t, s2Conf)

	s2, opts2 := RunServerWithConfig(s2Conf)
	defer s2.Shutdown()

	checkClusterFormed(t, s1, s2)

	nc1, err := nats.Connect(fmt.Sprintf("nats://%s:%s@%s:%d", "weather", "password", opts1.Host, opts1.Port))
	if err != nil {
		t.Fatalf("Error on connect: %v", err)
	}
	defer nc1.Close()

	// Create responder
	sub, err := nc1.Subscribe("service.weather.requests.>", func(msg *nats.Msg) {
		time.Sleep(25 * time.Millisecond)
		msg.Respond([]byte("sunny!"))
	})
	if err != nil {
		t.Fatalf("Error on subscribe: %v", err)
	}
	defer sub.Unsubscribe()

	// Create sync listener for latency.
	latSubj := "service.weather.latency"
	lsub, err := nc1.SubscribeSync(latSubj)
	if err != nil {
		t.Fatalf("Error on subscribe: %v", err)
	}
	defer lsub.Unsubscribe()
	nc1.Flush()

	// Make sure the subscription propagates to s2 server.
	checkSubInterest(t, s2, "weather", latSubj, time.Second)

	// Create requestor on s2.
	nc2, err := nats.Connect(fmt.Sprintf("nats://%s:%s@%s:%d", "one", "password", opts2.Host, opts2.Port), nats.UseOldRequestStyle())
	if err != nil {
		t.Fatalf("Error on connect: %v", err)
	}
	defer nc2.Close()

	if _, err := nc2.Request("service.weather.los_angeles", nil, time.Second); err != nil {
		t.Fatalf("Expected a response, got %v", err)
	}

	lr, err := lsub.NextMsg(time.Second)
	if err != nil {
		t.Fatalf("Expected a latency result, got %v", err)
	}

	// Make sure we reported ok and have valid results for service, system and total.
	var sl server.ServiceLatency
	if err := json.Unmarshal(lr.Data, &sl); err != nil {
		t.Fatalf("Unexpected error processing latency metric: %s", err)
	}
	if sl.Status != 200 {
		t.Fatalf("Expected a 200 status, got %d\n", sl.Status)
	}
	if sl.ServiceLatency == 0 || sl.SystemLatency == 0 || sl.TotalLatency == 0 {
		t.Fatalf("Received invalid tracking measurements, %d %d %d", sl.ServiceLatency, sl.SystemLatency, sl.TotalLatency)
	}
}
// TestServiceLatencyDoubleResponse reproduces a previously observed bug: the
// responder answers the same request twice. The test only asserts that the
// requestor gets a reply; the metric read and the final sleep discard their
// results.
func TestServiceLatencyDoubleResponse(t *testing.T) {
	// One cluster made of three servers.
	super := createSuperCluster(t, 3, 1)
	defer super.shutdown()

	// Add the new service export to FOO and have bar import it with tracking enabled.
	super.setupLatencyTracking(t, 100)

	// Service side, connected to the first server.
	responder := clientConnectWithName(t, super.clusters[0].opts[0], "foo", "service22")
	defer responder.Close()

	// The handler replies twice to every request it receives.
	svcSub, _ := responder.Subscribe("ngs.usage.*", func(req *nats.Msg) {
		req.Respond([]byte("22 msgs"))
		req.Respond([]byte("boom"))
	})
	responder.Flush()
	defer svcSub.Unsubscribe()

	// Metrics listener.
	metricsSub, _ := responder.SubscribeSync("latency.svc")

	// Client side, connected to the third server.
	requestor := clientConnect(t, super.clusters[0].opts[2], "bar")
	defer requestor.Close()

	// Issue a single request; a reply must come back.
	_, err := requestor.Request("ngs.usage", []byte("1h"), time.Second)
	if err != nil {
		t.Fatalf("Expected a response")
	}

	// Wait up to a second for a metric, ignoring the outcome, then wait another second.
	_, _ = metricsSub.NextMsg(time.Second)
	time.Sleep(time.Second)
}