// Copyright 2019-2020 The NATS Authors
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package test

import (
	"encoding/json"
	"fmt"
	"io/ioutil"
	"math/rand"
	"net/http"
	"path"
	"strings"
	"sync"
	"sync/atomic"
	"testing"
	"time"

	"github.com/nats-io/jwt/v2"
	"github.com/nats-io/nats-server/v2/server"
	"github.com/nats-io/nats.go"
	"github.com/nats-io/nkeys"
)

// Used to set up superclusters for tests.
type supercluster struct {
	t        *testing.T
	clusters []*cluster
}

func (sc *supercluster) shutdown() {
	if sc == nil {
		return
	}
	for _, c := range sc.clusters {
		shutdownCluster(c)
	}
}

const digits = "0123456789ABCDEFGHIJKLMNOPQRSTUVWXYZ"
const base = 36
const cnlen = 8

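// randClusterName returns a random cluster name of cnlen characters drawn from the base-36 digit set.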
func randClusterName() string {
	var name []byte
	rn := rand.Int63()
	for i := 0; i < cnlen; i++ {
		name = append(name, digits[rn%base])
		rn /= base
	}
	return string(name[:cnlen])
}

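// createSuperCluster creates numClusters clusters of numServersPer servers each,
// linking each new cluster to the previously created ones.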
func createSuperCluster(t *testing.T, numServersPer, numClusters int) *supercluster {
	clusters := []*cluster{}

	for i := 0; i < numClusters; i++ {
		// Pick cluster name and set up default accounts.
		c := createClusterEx(t, true, 5*time.Millisecond, true, randClusterName(), numServersPer, clusters...)
		clusters = append(clusters, c)
	}
	return &supercluster{t, clusters}
}

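// setResponseThreshold sets the response threshold on the "ngs.usage.*" service export
// of the FOO account on every server.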
func (sc *supercluster) setResponseThreshold(t *testing.T, maxTime time.Duration) {
	t.Helper()
	for _, c := range sc.clusters {
		for _, s := range c.servers {
			foo, err := s.LookupAccount("FOO")
			if err != nil {
				t.Fatalf("Error looking up account 'FOO': %v", err)
			}
			if err := foo.SetServiceExportResponseThreshold("ngs.usage.*", maxTime); err != nil {
				t.Fatalf("Error setting response threshold: %v", err)
			}
		}
	}
}

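// setImportShare enables sharing of detailed requestor information on BAR's
// service import of "ngs.usage.bar" from FOO.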
func (sc *supercluster) setImportShare(t *testing.T) {
	t.Helper()
	for _, c := range sc.clusters {
		for _, s := range c.servers {
			foo, err := s.LookupAccount("FOO")
			if err != nil {
				t.Fatalf("Error looking up account 'FOO': %v", err)
			}
			bar, err := s.LookupAccount("BAR")
			if err != nil {
				t.Fatalf("Error looking up account 'BAR': %v", err)
			}
			if err := bar.SetServiceImportSharing(foo, "ngs.usage.bar", true); err != nil {
				t.Fatalf("Error setting import sharing: %v", err)
			}
		}
	}
}

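// setupLatencyTracking exports "ngs.usage.*" from FOO with latency sampling at p percent
// (results published on "results") and imports it into BAR as "ngs.usage".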
func (sc *supercluster) setupLatencyTracking(t *testing.T, p int) {
	t.Helper()
	for _, c := range sc.clusters {
		for _, s := range c.servers {
			foo, err := s.LookupAccount("FOO")
			if err != nil {
				t.Fatalf("Error looking up account 'FOO': %v", err)
			}
			bar, err := s.LookupAccount("BAR")
			if err != nil {
				t.Fatalf("Error looking up account 'BAR': %v", err)
			}
			if err := foo.AddServiceExport("ngs.usage.*", nil); err != nil {
				t.Fatalf("Error adding service export to 'FOO': %v", err)
			}
			if err := foo.TrackServiceExportWithSampling("ngs.usage.*", "results", p); err != nil {
				t.Fatalf("Error adding latency tracking to 'FOO': %v", err)
			}
			if err := bar.AddServiceImport(foo, "ngs.usage", "ngs.usage.bar"); err != nil {
				t.Fatalf("Error adding service import to 'ngs.usage': %v", err)
			}
		}
	}
}

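// removeLatencyTracking turns off latency tracking for the "ngs.usage.*" export of FOO on every server.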
func (sc *supercluster) removeLatencyTracking(t *testing.T) {
	t.Helper()
	for _, c := range sc.clusters {
		for _, s := range c.servers {
			foo, err := s.LookupAccount("FOO")
			if err != nil {
				t.Fatalf("Error looking up account 'FOO': %v", err)
			}
			foo.UnTrackServiceExport("ngs.usage.*")
		}
	}
}

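// totalSubs returns the total number of subscriptions across all clusters.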
func (sc *supercluster) totalSubs() int {
	totalSubs := 0
	for _, c := range sc.clusters {
		totalSubs += c.totalSubs()
	}
	return totalSubs
}

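// clientConnectWithName connects as the given user (password "pass") with an
// optional connection name, failing the test on error.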
func clientConnectWithName(t *testing.T, opts *server.Options, user, appname string) *nats.Conn {
	t.Helper()
	url := fmt.Sprintf("nats://%s:pass@%s:%d", user, opts.Host, opts.Port)
	nc, err := nats.Connect(url, nats.Name(appname))
	if err != nil {
		t.Fatalf("Error on connect: %v", err)
	}
	return nc
}

func clientConnect(t *testing.T, opts *server.Options, user string) *nats.Conn {
	t.Helper()
	return clientConnectWithName(t, opts, user, "")
}

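// clientConnectOldRequest is like clientConnect but uses the old (non-muxed) request style.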
func clientConnectOldRequest(t *testing.T, opts *server.Options, user string) *nats.Conn {
	t.Helper()
	url := fmt.Sprintf("nats://%s:pass@%s:%d", user, opts.Host, opts.Port)
	nc, err := nats.Connect(url, nats.UseOldRequestStyle())
	if err != nil {
		t.Fatalf("Error on connect: %v", err)
	}
	return nc
}

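// checkServiceLatency validates a latency measurement: the status, the request start delta,
// the latency components, and that the totals add up.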
func checkServiceLatency(t *testing.T, sl server.ServiceLatency, start time.Time, serviceTime time.Duration) {
	t.Helper()

	if sl.Status != 200 {
		t.Fatalf("Bad status received, wanted 200 got %d", sl.Status)
	}

	serviceTime = serviceTime.Round(time.Millisecond)

	startDelta := sl.RequestStart.Sub(start)
	// The original bound was 5ms, but GitHub Actions failed with "Bad start delta 5.033929ms"
	// and Travis with something like "Bad start delta 15.046059ms", so be more generous.
	if startDelta > 20*time.Millisecond {
		t.Fatalf("Bad start delta %v", startDelta)
	}
	// Since RTT during tests is an estimate, we remove it from the calculation.
	if (sl.ServiceLatency + sl.Responder.RTT) < time.Duration(float64(serviceTime)*0.8) {
		t.Fatalf("Bad service latency: %v (%v)", sl.ServiceLatency, serviceTime)
	}
	if sl.TotalLatency < sl.ServiceLatency {
		t.Fatalf("Bad total latency: %v (%v)", sl.TotalLatency, sl.ServiceLatency)
	}
	// We should have NATS latency here that is non-zero with real clients.
	if sl.Requestor.RTT == 0 {
		t.Fatalf("Expected non-zero NATS Requestor latency")
	}
	if sl.Responder.RTT == 0 {
		t.Fatalf("Expected non-zero NATS Responder latency")
	}

	// Make sure they add up.
	got := sl.TotalLatency
	expected := sl.ServiceLatency + sl.NATSTotalTime()
	if got != expected {
		t.Fatalf("Numbers do not add up: %+v,\ngot: %v\nexpected: %v", sl, got, expected)
	}
}

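// extendedCheck verifies that detailed client info (user, app name, host, ID, and server)
// was shared in a latency sample.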
func extendedCheck(t *testing.T, lc *server.ClientInfo, eUser, appName, eServer string) {
	t.Helper()
	if lc.User != eUser {
		t.Fatalf("Expected user of %q, got %q", eUser, lc.User)
	}
	if appName != "" && appName != lc.Name {
		t.Fatalf("Expected appname of %q, got %q\n", appName, lc.Name)
	}
	if lc.Host == "" {
		t.Fatalf("Expected non-empty IP")
	}
	if lc.ID < 1 || lc.ID > 20 {
		t.Fatalf("Expected an ID in range, got %d", lc.ID)
	}
	if eServer != "" && eServer != lc.Server {
		t.Fatalf("Expected server of %q, got %q", eServer, lc.Server)
	}
}

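// noShareCheck verifies that no detailed client info leaked into a latency sample.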
func noShareCheck(t *testing.T, lc *server.ClientInfo) {
	t.Helper()
	if lc.Name != "" {
		t.Fatalf("appname should not have been shared, got %q", lc.Name)
	}
	if lc.User != "" {
		t.Fatalf("user should not have been shared, got %q", lc.User)
	}
	if lc.Host != "" {
		t.Fatalf("client ip should not have been shared, got %q", lc.Host)
	}
	if lc.ID != 0 {
		t.Fatalf("client id should not have been shared, got %d", lc.ID)
	}
	if lc.Server != "" {
		t.Fatalf("client's server should not have been shared, got %q", lc.Server)
	}
}

func TestServiceLatencySingleServerConnect(t *testing.T) {
	sc := createSuperCluster(t, 3, 2)
	defer sc.shutdown()

	// Now add in new service export to FOO and have BAR import that with tracking enabled.
	sc.setupLatencyTracking(t, 100)

	// Now we can set up and test, do single node only first.
	// This is the service provider.
	nc := clientConnectWithName(t, sc.clusters[0].opts[0], "foo", "service22")
	defer nc.Close()

	// The service listener.
	serviceTime := 25 * time.Millisecond
	nc.Subscribe("ngs.usage.*", func(msg *nats.Msg) {
		time.Sleep(serviceTime)
		msg.Respond([]byte("22 msgs"))
	})

	// Listen for metrics.
	rsub, _ := nc.SubscribeSync("results")

	// Requestor
	nc2 := clientConnect(t, sc.clusters[0].opts[0], "bar")
	defer nc2.Close()

	// Send the request.
	start := time.Now()
	if _, err := nc2.Request("ngs.usage", []byte("1h"), time.Second); err != nil {
		t.Fatalf("Expected a response")
	}

	var sl server.ServiceLatency
	rmsg, _ := rsub.NextMsg(time.Second)
	json.Unmarshal(rmsg.Data, &sl)

	checkServiceLatency(t, sl, start, serviceTime)

	rs := sc.clusters[0].servers[0]
	extendedCheck(t, sl.Responder, "foo", "service22", rs.Name())
	// Normally requestors don't share.
	noShareCheck(t, sl.Requestor)

	// Now make sure the normal use case works with the old request style.
	nc3 := clientConnectOldRequest(t, sc.clusters[0].opts[0], "bar")
	defer nc3.Close()

	start = time.Now()
	if _, err := nc3.Request("ngs.usage", []byte("1h"), time.Second); err != nil {
		t.Fatalf("Expected a response")
	}
	nc3.Close()

	checkServiceLatency(t, sl, start, serviceTime)
	extendedCheck(t, sl.Responder, "foo", "service22", rs.Name())
	// Normally requestors don't share.
	noShareCheck(t, sl.Requestor)
}

// If a client has a longer RTT than the effective RTT for NATS + responder,
// the requestor RTT will be marked as 0. This can happen quite often with
// utility programs that are far away from a cluster like NGS but where the
// service response time has a shorter RTT.
func TestServiceLatencyClientRTTSlowerVsServiceRTT(t *testing.T) {
	sc := createSuperCluster(t, 2, 2)
	defer sc.shutdown()

	// Now add in new service export to FOO and have BAR import that with tracking enabled.
	sc.setupLatencyTracking(t, 100)

	nc := clientConnect(t, sc.clusters[0].opts[0], "foo")
	defer nc.Close()

	// The service listener. Mostly instant response.
	nc.Subscribe("ngs.usage.*", func(msg *nats.Msg) {
		time.Sleep(time.Millisecond)
		msg.Respond([]byte("22 msgs"))
	})

	// Listen for metrics.
	rsub, _ := nc.SubscribeSync("results")
	nc.Flush()

	// Requestor and processing.
	requestAndCheck := func(cindex, sindex int) {
		t.Helper()
		sopts := sc.clusters[cindex].opts[sindex]

		if nmsgs, _, err := rsub.Pending(); err != nil || nmsgs != 0 {
			t.Fatalf("Did not expect any latency results, got %d", nmsgs)
		}

		rtt := 10 * time.Millisecond
		bw := 1024 * 1024
		sp := newSlowProxy(rtt+5*time.Millisecond, bw, bw, sopts)
		defer sp.stop()

		nc2 := clientConnect(t, sp.opts(), "bar")
		defer nc2.Close()

		start := time.Now()
		nc2.Flush()
		// Check rtt for the slow proxy.
		if d := time.Since(start); d < rtt {
			t.Fatalf("Expected an rtt of at least %v, got %v", rtt, d)
		}

		// Send the request.
		start = time.Now()
		_, err := nc2.Request("ngs.usage", []byte("1h"), time.Second)
		if err != nil {
			t.Fatalf("Expected a response")
		}

		var sl server.ServiceLatency
		rmsg, err := rsub.NextMsg(time.Second)
		if err != nil || rmsg == nil {
			t.Fatalf("Did not receive latency results")
		}
		json.Unmarshal(rmsg.Data, &sl)

		if sl.Status != 200 {
			t.Fatalf("Expected a status 200 Ok, got [%d] %q", sl.Status, sl.Error)
		}
		// We want to test here that when the client requestor RTT is larger than the response time
		// we still deliver a requestor value > 0.
		// Now check that it is close to rtt.
		if sl.Requestor.RTT < rtt {
			t.Fatalf("Expected requestor latency to be > %v, got %v", rtt, sl.Requestor.RTT)
		}
		if sl.TotalLatency < rtt {
			t.Fatalf("Expected total latency to be > %v, got %v", rtt, sl.TotalLatency)
		}

		rs := sc.clusters[0].servers[0]
		extendedCheck(t, sl.Responder, "foo", "", rs.Name())
		// Normally requestors don't share.
		noShareCheck(t, sl.Requestor)

		// Check for trailing duplicates.
		rmsg, err = rsub.NextMsg(100 * time.Millisecond)
		if err == nil {
			t.Fatalf("Duplicate metric result, %q", rmsg.Data)
		}
	}

	// Check same server.
	requestAndCheck(0, 0)
	// Check from remote server across a gateway.
	requestAndCheck(1, 1)
	// Same cluster but different server.
	requestAndCheck(0, 1)
}

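// connRTT estimates a connection's round-trip time by averaging five flushes.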
func connRTT(nc *nats.Conn) time.Duration {
	// Do 5x to flatten.
	total := time.Duration(0)
	for i := 0; i < 5; i++ {
		start := time.Now()
		nc.Flush()
		total += time.Since(start)
	}
	return total / 5
}

func TestServiceLatencyRemoteConnect(t *testing.T) {
	sc := createSuperCluster(t, 3, 2)
	defer sc.shutdown()

	// Now add in new service export to FOO and have BAR import that with tracking enabled.
	sc.setupLatencyTracking(t, 100)

	// Now we can set up and test, do single node only first.
	// This is the service provider.
	nc := clientConnect(t, sc.clusters[0].opts[0], "foo")
	defer nc.Close()

	subsBefore := int(sc.clusters[0].servers[0].NumSubscriptions())

	// The service listener.
	serviceTime := 25 * time.Millisecond
	nc.Subscribe("ngs.usage.*", func(msg *nats.Msg) {
		time.Sleep(serviceTime)
		msg.Respond([]byte("22 msgs"))
	})

	// Listen for metrics.
	rsub, _ := nc.SubscribeSync("results")
	nc.Flush()

	if err := checkExpectedSubs(subsBefore+2, sc.clusters[0].servers...); err != nil {
		t.Fatal(err.Error())
	}

	// Same cluster requestor.
	nc2 := clientConnect(t, sc.clusters[0].opts[2], "bar")
	defer nc2.Close()

	// Send the request.
	start := time.Now()
	_, err := nc2.Request("ngs.usage", []byte("1h"), time.Second)
	if err != nil {
		t.Fatalf("Expected a response")
	}

	var sl server.ServiceLatency
	rmsg, err := rsub.NextMsg(time.Second)
	if err != nil {
		t.Fatalf("Error getting latency measurement: %v", err)
	}
	json.Unmarshal(rmsg.Data, &sl)
	checkServiceLatency(t, sl, start, serviceTime)
	rs := sc.clusters[0].servers[0]
	extendedCheck(t, sl.Responder, "foo", "", rs.Name())
	// Normally requestors don't share.
	noShareCheck(t, sl.Requestor)

	// Lastly here, we need to make sure we are properly tracking the extra hops.
	// We will make sure that NATS latency is close to what we see from the outside in terms of RTT.
	if crtt := connRTT(nc) + connRTT(nc2); sl.NATSTotalTime() < crtt {
		t.Fatalf("Not tracking second measurement for NATS latency across servers: %v vs %v", sl.NATSTotalTime(), crtt)
	}

	// Gateway requestor.
	nc2 = clientConnect(t, sc.clusters[1].opts[1], "bar")
	defer nc2.Close()

	// Send the request.
	start = time.Now()
	_, err = nc2.Request("ngs.usage", []byte("1h"), time.Second)
	if err != nil {
		t.Fatalf("Expected a response")
	}

	rmsg, err = rsub.NextMsg(time.Second)
	if err != nil {
		t.Fatalf("Error getting latency measurement: %v", err)
	}
	json.Unmarshal(rmsg.Data, &sl)
	checkServiceLatency(t, sl, start, serviceTime)
	extendedCheck(t, sl.Responder, "foo", "", rs.Name())
	// Normally requestors don't share.
	noShareCheck(t, sl.Requestor)

	// Lastly here, we need to make sure we are properly tracking the extra hops.
	// We will make sure that NATS latency is close to what we see from the outside in terms of RTT.
	if crtt := connRTT(nc) + connRTT(nc2); sl.NATSTotalTime() < crtt {
		t.Fatalf("Not tracking second measurement for NATS latency across servers: %v vs %v", sl.NATSTotalTime(), crtt)
	}

	// Now turn it off and make sure we no longer receive updates.
	sc.removeLatencyTracking(t)
	_, err = nc2.Request("ngs.usage", []byte("1h"), time.Second)
	if err != nil {
		t.Fatalf("Expected a response")
	}

	_, err = rsub.NextMsg(100 * time.Millisecond)
	if err == nil {
		t.Fatalf("Did not expect to receive a latency metric")
	}
}

func TestServiceLatencySampling(t *testing.T) {
	sc := createSuperCluster(t, 3, 2)
	defer sc.shutdown()

	// Now add in new service export to FOO and have BAR import that with 50% sampling enabled.
	sc.setupLatencyTracking(t, 50)

	// Now we can set up and test, do single node only first.
	// This is the service provider.
	nc := clientConnect(t, sc.clusters[0].opts[0], "foo")
	defer nc.Close()

	// The service listener.
	nc.Subscribe("ngs.usage.*", func(msg *nats.Msg) {
		msg.Respond([]byte("22 msgs"))
	})

	// Listen for metrics.
	received := int32(0)

	nc.Subscribe("results", func(msg *nats.Msg) {
		atomic.AddInt32(&received, 1)
	})

	// Same cluster requestor.
	nc2 := clientConnect(t, sc.clusters[0].opts[2], "bar")
	defer nc2.Close()

	toSend := 1000
	for i := 0; i < toSend; i++ {
		nc2.Request("ngs.usage", []byte("1h"), time.Second)
	}
	// Wait for results to flow in.
	time.Sleep(100 * time.Millisecond)

	mid := toSend / 2
	delta := toSend / 10 // 10%
	got := int(atomic.LoadInt32(&received))

	if got > mid+delta || got < mid-delta {
		t.Fatalf("Sampling number incorrect: expected around %d, got %d", mid, got)
	}
}

func TestServiceLatencyNoSubsLeak(t *testing.T) {
	sc := createSuperCluster(t, 3, 3)
	defer sc.shutdown()

	// Now add in new service export to FOO and have BAR import that with tracking enabled.
	sc.setupLatencyTracking(t, 100)

	nc := clientConnectWithName(t, sc.clusters[0].opts[1], "foo", "dlc22")
	defer nc.Close()

	// The service listener.
	nc.Subscribe("ngs.usage.*", func(msg *nats.Msg) {
		msg.Respond([]byte("22 msgs"))
	})
	nc.Flush()
	// Propagation of the sub through the supercluster.
	time.Sleep(100 * time.Millisecond)

	startSubs := sc.totalSubs()

	fooAcc, _ := sc.clusters[1].servers[1].LookupAccount("FOO")
	startNumSis := fooAcc.NumServiceImports()

	for i := 0; i < 100; i++ {
		nc := clientConnect(t, sc.clusters[1].opts[1], "bar")
		if _, err := nc.Request("ngs.usage", []byte("1h"), time.Second); err != nil {
			t.Fatalf("Error on request: %v", err)
		}
		nc.Close()
	}

	// We are adding 3 here for the wildcard response subject for service replies.
	// We only have one, but it will show in three places.
	startSubs += 3

	checkFor(t, time.Second, 50*time.Millisecond, func() error {
		if numSubs := sc.totalSubs(); numSubs != startSubs {
			return fmt.Errorf("Leaked %d subs", numSubs-startSubs)
		}
		return nil
	})

	// Now also check to make sure the service imports created for the requests go away as well.
	checkFor(t, time.Second, 50*time.Millisecond, func() error {
		if numSis := fooAcc.NumServiceImports(); numSis != startNumSis {
			return fmt.Errorf("Leaked %d service imports", numSis-startNumSis)
		}
		return nil
	})
}

func TestServiceLatencyWithName(t *testing.T) {
	sc := createSuperCluster(t, 1, 1)
	defer sc.shutdown()

	// Now add in new service export to FOO and have BAR import that with tracking enabled.
	sc.setupLatencyTracking(t, 100)

	opts := sc.clusters[0].opts[0]

	nc := clientConnectWithName(t, opts, "foo", "dlc22")
	defer nc.Close()

	// The service listener.
	nc.Subscribe("ngs.usage.*", func(msg *nats.Msg) {
		msg.Respond([]byte("22 msgs"))
	})

	// Listen for metrics.
	rsub, _ := nc.SubscribeSync("results")
	nc.Flush()

	nc2 := clientConnect(t, opts, "bar")
	defer nc2.Close()
	nc2.Request("ngs.usage", []byte("1h"), time.Second)

	var sl server.ServiceLatency
	rmsg, err := rsub.NextMsg(time.Second)
	if err != nil {
		t.Fatalf("Error getting message: %v", err)
	}
	json.Unmarshal(rmsg.Data, &sl)

	// Make sure we have AppName set.
	rs := sc.clusters[0].servers[0]
	extendedCheck(t, sl.Responder, "foo", "dlc22", rs.Name())
	// Normally requestors don't share.
	noShareCheck(t, sl.Requestor)
}

func TestServiceLatencyWithNameMultiServer(t *testing.T) {
	sc := createSuperCluster(t, 3, 2)
	defer sc.shutdown()

	// Now add in new service export to FOO and have BAR import that with tracking enabled.
	sc.setupLatencyTracking(t, 100)

	nc := clientConnectWithName(t, sc.clusters[0].opts[1], "foo", "dlc22")
	defer nc.Close()

	// The service listener.
	nc.Subscribe("ngs.usage.*", func(msg *nats.Msg) {
		msg.Respond([]byte("22 msgs"))
	})

	// Listen for metrics.
	rsub, _ := nc.SubscribeSync("results")
	nc.Flush()

	nc2 := clientConnect(t, sc.clusters[1].opts[1], "bar")
	defer nc2.Close()
	nc2.Request("ngs.usage", []byte("1h"), time.Second)

	var sl server.ServiceLatency
	rmsg, _ := rsub.NextMsg(time.Second)
	json.Unmarshal(rmsg.Data, &sl)

	// Make sure we have AppName set.
	rs := sc.clusters[0].servers[1]
	extendedCheck(t, sl.Responder, "foo", "dlc22", rs.Name())
	// Normally requestors don't share.
	noShareCheck(t, sl.Requestor)
}

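// createAccountWithJWT creates a new account signed by the test operator key and
// returns its encoded JWT, key pair, and claims.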
func createAccountWithJWT(t *testing.T) (string, nkeys.KeyPair, *jwt.AccountClaims) {
	t.Helper()
	okp, _ := nkeys.FromSeed(oSeed)
	akp, _ := nkeys.CreateAccount()
	pub, _ := akp.PublicKey()
	nac := jwt.NewAccountClaims(pub)
	jwt, _ := nac.Encode(okp)
	return jwt, akp, nac
}

func TestServiceLatencyWithJWT(t *testing.T) {
	okp, _ := nkeys.FromSeed(oSeed)

	// Create three accounts: system, service, and a normal account.
	sysJWT, sysKP, _ := createAccountWithJWT(t)
	sysPub, _ := sysKP.PublicKey()

	_, svcKP, svcAcc := createAccountWithJWT(t)
	svcPub, _ := svcKP.PublicKey()

	// Add in the service export with latency tracking here.
	serviceExport := &jwt.Export{Subject: "req.*", Type: jwt.Service}
	svcAcc.Exports.Add(serviceExport)
	svcJWT, err := svcAcc.Encode(okp)
	if err != nil {
		t.Fatalf("Error generating account JWT: %v", err)
	}

	_, accKP, accAcc := createAccountWithJWT(t)
	accPub, _ := accKP.PublicKey()

	// Add in the import.
	serviceImport := &jwt.Import{Account: svcPub, Subject: "request", To: "req.echo", Type: jwt.Service}
	accAcc.Imports.Add(serviceImport)
	accJWT, err := accAcc.Encode(okp)
	if err != nil {
		t.Fatalf("Error generating account JWT: %v", err)
	}

	cf := `
	listen: 127.0.0.1:-1
	cluster {
		name: "A"
		listen: 127.0.0.1:-1
		authorization {
			timeout: 2.2
		} %s
	}

	operator = "./configs/nkeys/op.jwt"
	system_account = "%s"

	resolver = MEMORY
	resolver_preload = {
		%s : "%s"
		%s : "%s"
		%s : "%s"
	}
	`
	contents := strings.Replace(fmt.Sprintf(cf, "", sysPub, sysPub, sysJWT, svcPub, svcJWT, accPub, accJWT), "\n\t", "\n", -1)
	conf := createConfFile(t, []byte(contents))
	defer removeFile(t, conf)

	s, opts := RunServerWithConfig(conf)
	defer s.Shutdown()

	// Create a new server and route it to the main one.
	routeStr := fmt.Sprintf("\n\t\troutes = [nats-route://%s:%d]", opts.Cluster.Host, opts.Cluster.Port)
	contents2 := strings.Replace(fmt.Sprintf(cf, routeStr, sysPub, sysPub, sysJWT, svcPub, svcJWT, accPub, accJWT), "\n\t", "\n", -1)

	conf2 := createConfFile(t, []byte(contents2))
	defer removeFile(t, conf2)

	s2, opts2 := RunServerWithConfig(conf2)
	defer s2.Shutdown()

	checkClusterFormed(t, s, s2)

	// Create service provider.
	url := fmt.Sprintf("nats://%s:%d", opts.Host, opts.Port)
	copt, pubUser := createUserCredsOption(t, s, svcKP)
	nc, err := nats.Connect(url, copt, nats.Name("fooService"))
	if err != nil {
		t.Fatalf("Error on connect: %v", err)
	}
	defer nc.Close()

	// The service listener.
	serviceTime := 25 * time.Millisecond
	nc.Subscribe("req.echo", func(msg *nats.Msg) {
		time.Sleep(serviceTime)
		msg.Respond(msg.Data)
	})

	// Listen for metrics.
	rsub, _ := nc.SubscribeSync("results")
	nc.Flush()

	// Create a second client and send the request from this one.
	url2 := fmt.Sprintf("nats://%s:%d/", opts2.Host, opts2.Port)
	nc2, err := nats.Connect(url2, createUserCreds(t, s2, accKP))
	if err != nil {
		t.Fatalf("Error creating client: %v\n", err)
	}
	defer nc2.Close()

	// Send the request.
	_, err = nc2.Request("request", []byte("hello"), time.Second)
	if err != nil {
		t.Fatalf("Expected a response")
	}

	// We should not receive latency at this time.
	_, err = rsub.NextMsg(100 * time.Millisecond)
	if err == nil {
		t.Fatalf("Did not expect to receive a latency metric")
	}

	// Now turn it on.
	updateAccount := func() {
		t.Helper()
		for _, s := range []*server.Server{s, s2} {
			svcAccount, err := s.LookupAccount(svcPub)
			if err != nil {
				t.Fatalf("Could not lookup service account from server %+v", s)
			}
			s.UpdateAccountClaims(svcAccount, svcAcc)
		}
	}
	serviceExport.Latency = &jwt.ServiceLatency{Sampling: 100, Results: "results"}
	updateAccount()

	// Send the request; the service responder's user was grabbed above via createUserCredsOption.
	start := time.Now()
	_, err = nc2.Request("request", []byte("hello"), time.Second)
	if err != nil {
		t.Fatalf("Expected a response")
	}

	var sl server.ServiceLatency
	rmsg, err := rsub.NextMsg(time.Second)
	if err != nil || rmsg == nil {
		t.Fatalf("Did not receive a latency metric")
	}
	json.Unmarshal(rmsg.Data, &sl)
	checkServiceLatency(t, sl, start, serviceTime)
	extendedCheck(t, sl.Responder, pubUser, "fooService", s.Name())
	// Normally requestors don't share.
	noShareCheck(t, sl.Requestor)

	// Now we will remove tracking. Do this by simulating a JWT update.
	serviceExport.Latency = nil
	updateAccount()

	// Now we should not get any tracking data.
	_, err = nc2.Request("request", []byte("hello"), time.Second)
	if err != nil {
		t.Fatalf("Expected a response")
	}
	_, err = rsub.NextMsg(100 * time.Millisecond)
	if err == nil {
		t.Fatalf("Did not expect to receive a latency metric")
	}
}

func TestServiceLatencyAdjustNegativeLatencyValues(t *testing.T) {
	sc := createSuperCluster(t, 3, 2)
	defer sc.shutdown()

	// Now add in new service export to FOO and have BAR import
	// that with tracking enabled.
	sc.setupLatencyTracking(t, 100)

	// Now we can set up and test, do single node only first.
	// This is the service provider.
	nc := clientConnect(t, sc.clusters[0].opts[0], "foo")
	defer nc.Close()

	// The service listener.
	nc.Subscribe("ngs.usage.*", func(msg *nats.Msg) {
		msg.Respond([]byte("22 msgs"))
	})

	// Listen for metrics.
	rsub, err := nc.SubscribeSync("results")
	if err != nil {
		t.Fatal(err)
	}
	nc.Flush()

	// Requestor
	nc2 := clientConnect(t, sc.clusters[0].opts[0], "bar")
	defer nc2.Close()

	// Send the requests.
	totalSamples := 50
	for i := 0; i < totalSamples; i++ {
		if _, err := nc2.Request("ngs.usage", []byte("1h"), time.Second); err != nil {
			t.Fatalf("Expected a response")
		}
	}

	var sl server.ServiceLatency
	for i := 0; i < totalSamples; i++ {
		rmsg, err := rsub.NextMsg(time.Second)
		if err != nil {
			t.Fatalf("Expected to receive latency metric: %d, %s", i, err)
		}
		if err := json.Unmarshal(rmsg.Data, &sl); err != nil {
			t.Errorf("Unexpected error processing latency metric: %s", err)
		}
		if sl.ServiceLatency < 0 {
			t.Fatalf("Unexpected negative latency value: %v", sl)
		}
	}
}

func TestServiceLatencyRemoteConnectAdjustNegativeValues(t *testing.T) {
	sc := createSuperCluster(t, 3, 2)
	defer sc.shutdown()

	// Now add in new service export to FOO and have BAR import that with tracking enabled.
	sc.setupLatencyTracking(t, 100)

	// Now we can set up and test, do single node only first.
	// This is the service provider.
	nc := clientConnect(t, sc.clusters[0].opts[0], "foo")
	defer nc.Close()

	// The service listener.
	nc.Subscribe("ngs.usage.*", func(msg *nats.Msg) {
		msg.Respond([]byte("22 msgs"))
	})

	// Listen for metrics.
	rsub, err := nc.SubscribeSync("results")
	if err != nil {
		t.Fatal(err)
	}
	nc.Flush()

	// Same cluster requestor.
	nc2 := clientConnect(t, sc.clusters[0].opts[2], "bar")
	defer nc2.Close()

	// Gateway requestor.
	nc3 := clientConnect(t, sc.clusters[1].opts[1], "bar")
	defer nc3.Close()

	// Send a few initial requests to ensure interest is propagated
	// both for cluster and gateway requestors.
	checkFor(t, 3*time.Second, time.Second, func() error {
		_, err1 := nc2.Request("ngs.usage", []byte("1h"), time.Second)
		_, err2 := nc3.Request("ngs.usage", []byte("1h"), time.Second)

		if err1 != nil || err2 != nil {
			return fmt.Errorf("Timed out waiting for supercluster to be ready")
		}
		return nil
	})

	// Send the requests.
	totalSamples := 20
	for i := 0; i < totalSamples; i++ {
		if _, err := nc2.Request("ngs.usage", []byte("1h"), time.Second); err != nil {
			t.Fatalf("Expected a response")
		}
	}

	for i := 0; i < totalSamples; i++ {
		if _, err := nc3.Request("ngs.usage", []byte("1h"), time.Second); err != nil {
			t.Fatalf("Expected a response")
		}
	}

	var sl server.ServiceLatency
	for i := 0; i < totalSamples*2; i++ {
		rmsg, err := rsub.NextMsg(time.Second)
		if err != nil {
			t.Fatalf("Expected to receive latency metric: %d, %s", i, err)
		}
		if err = json.Unmarshal(rmsg.Data, &sl); err != nil {
			t.Errorf("Unexpected error processing latency metric: %s", err)
		}
		if sl.ServiceLatency < 0 {
			t.Fatalf("Unexpected negative service latency value: %v", sl)
		}
		if sl.SystemLatency < 0 {
			t.Fatalf("Unexpected negative system latency value: %v", sl)
		}
	}
}

func TestServiceLatencyFailureReportingSingleServer(t *testing.T) {
	sc := createSuperCluster(t, 1, 1)
	defer sc.shutdown()

	// Now add in new service export to FOO and have BAR import that with tracking enabled.
	sc.setupLatencyTracking(t, 100)
	sc.setResponseThreshold(t, 20*time.Millisecond)

	nc := clientConnect(t, sc.clusters[0].opts[0], "foo")
	defer nc.Close()

	// Listen for metrics.
	rsub, err := nc.SubscribeSync("results")
	if err != nil {
		t.Fatal(err)
	}
	nc.Flush()

	getMetricResult := func() *server.ServiceLatency {
		t.Helper()
		rmsg, err := rsub.NextMsg(time.Second)
		if err != nil {
			t.Fatalf("Expected to receive latency metric: %v", err)
		}
		var sl server.ServiceLatency
		if err = json.Unmarshal(rmsg.Data, &sl); err != nil {
			t.Errorf("Unexpected error processing latency metric: %s", err)
		}
		return &sl
	}

	// Same server.
	nc2 := clientConnect(t, sc.clusters[0].opts[0], "bar")
	defer nc2.Close()

	// Test a request with no reply subject.
	nc2.Publish("ngs.usage", []byte("1h"))
	sl := getMetricResult()

	if sl.Status != 400 {
		t.Fatalf("Expected to get a bad request status [400], got %d", sl.Status)
	}

	// Proper request, but no responders.
	nc2.Request("ngs.usage", []byte("1h"), 20*time.Millisecond)
	sl = getMetricResult()
	if sl.Status != 503 {
		t.Fatalf("Expected to get a service unavailable status [503], got %d", sl.Status)
	}

	// The service listener. Make it slow. 20ms is respThreshold, so take 2X.
	sub, _ := nc.Subscribe("ngs.usage.bar", func(msg *nats.Msg) {
		time.Sleep(40 * time.Millisecond)
		msg.Respond([]byte("22 msgs"))
	})
	nc.Flush()

	nc2.Request("ngs.usage", []byte("1h"), 20*time.Millisecond)
	sl = getMetricResult()
	if sl.Status != 504 {
		t.Fatalf("Expected to get a service timeout status [504], got %d", sl.Status)
	}

	// Make sure we do not get duplicates.
	if rmsg, err := rsub.NextMsg(50 * time.Millisecond); err == nil {
		t.Fatalf("Unexpected second response metric: %q\n", rmsg.Data)
	}

	// Now set up a responder that will respond under the threshold.
	sub.Unsubscribe()
	nc.Subscribe("ngs.usage.bar", func(msg *nats.Msg) {
		time.Sleep(5 * time.Millisecond)
		msg.Respond([]byte("22 msgs"))
	})
	nc.Flush()
	time.Sleep(100 * time.Millisecond)

	// Now create a requestor using the old request style and do a short timeout.
	nc3 := clientConnectOldRequest(t, sc.clusters[0].opts[0], "bar")
	defer nc3.Close()

	nc3.Request("ngs.usage", []byte("1h"), time.Millisecond)
	sl = getMetricResult()
	if sl.Status != 408 {
		t.Fatalf("Expected to get a request timeout status [408], got %d", sl.Status)
	}
}

func TestServiceLatencyFailureReportingMultipleServers(t *testing.T) {
	sc := createSuperCluster(t, 3, 3)
	defer sc.shutdown()

	// Now add in new service export to FOO and have BAR import that with tracking enabled.
	sc.setupLatencyTracking(t, 100)
	sc.setResponseThreshold(t, 10*time.Millisecond)

	nc := clientConnect(t, sc.clusters[0].opts[0], "foo")
	defer nc.Close()

	// Listen for metrics.
	rsub, err := nc.SubscribeSync("results")
	if err != nil {
		t.Fatal(err)
	}
	nc.Flush()

	getMetricResult := func() *server.ServiceLatency {
		t.Helper()
		rmsg, err := rsub.NextMsg(time.Second)
		if err != nil {
			t.Fatalf("Expected to receive latency metric: %v", err)
		}
		var sl server.ServiceLatency
		if err = json.Unmarshal(rmsg.Data, &sl); err != nil {
			t.Errorf("Unexpected error processing latency metric: %s", err)
		}
		return &sl
	}

	cases := []struct {
		ci, si int
		desc   string
	}{
		{0, 0, "same server"},
		{0, 1, "same cluster, different server"},
		{1, 1, "different cluster"},
	}

	for _, cs := range cases {
		// Select the server to send the request from.
		nc2 := clientConnect(t, sc.clusters[cs.ci].opts[cs.si], "bar")
		defer nc2.Close()

		// Test a request with no reply subject.
		nc2.Publish("ngs.usage", []byte("1h"))
		sl := getMetricResult()
		if sl.Status != 400 {
			t.Fatalf("Test %q, Expected to get a bad request status [400], got %d", cs.desc, sl.Status)
		}

		// We wait here for the gateways to report no interest b/c of optimistic mode.
		time.Sleep(50 * time.Millisecond)

		// Proper request, but no responders.
		nc2.Request("ngs.usage", []byte("1h"), 10*time.Millisecond)
		sl = getMetricResult()
		if sl.Status != 503 {
			t.Fatalf("Test %q, Expected to get a service unavailable status [503], got %d", cs.desc, sl.Status)
		}

		// The service listener. Make it slow. 10ms is respThreshold, so make it 3X.
		sub, _ := nc.Subscribe("ngs.usage.bar", func(msg *nats.Msg) {
			time.Sleep(30 * time.Millisecond)
			msg.Respond([]byte("22 msgs"))
		})
		defer sub.Unsubscribe()
		nc.Flush()
		// Wait to propagate.
		time.Sleep(200 * time.Millisecond)

		nc2.Request("ngs.usage", []byte("1h"), 10*time.Millisecond)
		sl = getMetricResult()
		if sl.Status != 504 {
			t.Fatalf("Test %q, Expected to get a service timeout status [504], got %d", cs.desc, sl.Status)
		}

		// Clean up subscriber and requestor.
		nc2.Close()
		sub.Unsubscribe()
		nc.Flush()
		// Wait to propagate.
		time.Sleep(200 * time.Millisecond)
	}
}

// To test a bug rip@nats.io is seeing.
func TestServiceLatencyOldRequestStyleSingleServer(t *testing.T) {
	conf := createConfFile(t, []byte(`
	listen: 127.0.0.1:-1
	accounts: {
		SVC: {
			users: [ {user: svc, password: pass} ]
			exports: [ {
				service: "svc.echo"
				accounts: [CLIENT]
				latency: {
					sampling: 100%
					subject: latency.svc
				}
			} ]
		},
		CLIENT: {
			users: [{user: client, password: pass} ]
			imports: [ {service: {account: SVC, subject: svc.echo}, to: SVC} ]
		},
		SYS: { users: [{user: admin, password: pass}] }
	}

	system_account: SYS
	`))
	defer removeFile(t, conf)

	srv, opts := RunServerWithConfig(conf)
	defer srv.Shutdown()

	// Responder
	nc, err := nats.Connect(fmt.Sprintf("nats://svc:pass@%s:%d", opts.Host, opts.Port))
	if err != nil {
		t.Fatalf("Error on connect: %v", err)
	}
	defer nc.Close()

	// Listen for metrics.
	rsub, _ := nc.SubscribeSync("latency.svc")

	// Requestor
	nc2, err := nats.Connect(fmt.Sprintf("nats://client:pass@%s:%d", opts.Host, opts.Port), nats.UseOldRequestStyle())
	if err != nil {
		t.Fatalf("Error on connect: %v", err)
	}
	defer nc2.Close()

	// Setup responder.
	serviceTime := 25 * time.Millisecond
	sub, _ := nc.Subscribe("svc.echo", func(msg *nats.Msg) {
		time.Sleep(serviceTime)
		msg.Respond([]byte("world"))
	})
	nc.Flush()
	defer sub.Unsubscribe()

	// Send a request.
	start := time.Now()
	if _, err := nc2.Request("SVC", []byte("1h"), time.Second); err != nil {
		t.Fatalf("Expected a response")
	}

	var sl server.ServiceLatency
	rmsg, _ := rsub.NextMsg(time.Second)
	json.Unmarshal(rmsg.Data, &sl)

	checkServiceLatency(t, sl, start, serviceTime)
	extendedCheck(t, sl.Responder, "svc", "", srv.Name())
	noShareCheck(t, sl.Requestor)
}

// To test a bug wally@nats.io is seeing.
func TestServiceAndStreamStackOverflow(t *testing.T) {
	conf := createConfFile(t, []byte(`
	accounts {
		STATIC {
			users = [ { user: "static", pass: "foo" } ]
			exports [
				{ stream: > }
				{ service: my.service }
			]
		}
		DYN {
			users = [ { user: "foo", pass: "bar" } ]
			imports [
				{ stream { subject: >, account: STATIC } }
				{ service { subject: my.service, account: STATIC } }
			]
		}
	}
	`))
	defer removeFile(t, conf)

	srv, opts := RunServerWithConfig(conf)
	defer srv.Shutdown()

	// Responder (just a request sub).
	nc, err := nats.Connect(fmt.Sprintf("nats://static:foo@%s:%d", opts.Host, opts.Port))
	if err != nil {
		t.Fatalf("Error on connect: %v", err)
	}
	defer nc.Close()

	sub, _ := nc.SubscribeSync("my.service")
	nc.Flush()

	// Requestor
	nc2, err := nats.Connect(fmt.Sprintf("nats://foo:bar@%s:%d", opts.Host, opts.Port))
	if err != nil {
		t.Fatalf("Error on connect: %v", err)
	}
	defer nc2.Close()

	// Send a single request.
	nc2.PublishRequest("my.service", "foo", []byte("hi"))
	checkFor(t, time.Second, 10*time.Millisecond, func() error {
		if nm, _, err := sub.Pending(); err != nil || nm != 1 {
			return fmt.Errorf("Expected one request, got %d", nm)
		}
		return nil
	})

	// Make sure it works for queue subscribers as well.
	sub.Unsubscribe()
	sub, _ = nc.QueueSubscribeSync("my.service", "prod")
	nc.Flush()

	// Send a single request.
	nc2.PublishRequest("my.service", "foo", []byte("hi"))
	checkFor(t, time.Second, 10*time.Millisecond, func() error {
		if nm, _, err := sub.Pending(); err != nil || nm != 1 {
			return fmt.Errorf("Expected one request, got %d", nm)
		}
		return nil
	})

	// Now create an interest in the stream from nc2 that is a queue subscriber.
	sub2, _ := nc2.QueueSubscribeSync("my.service", "prod")
	defer sub2.Unsubscribe()
	nc2.Flush()

	// Send a single request.
	nc2.PublishRequest("my.service", "foo", []byte("hi"))
	time.Sleep(10 * time.Millisecond)
	checkFor(t, time.Second, 10*time.Millisecond, func() error {
		if nm, _, err := sub.Pending(); err != nil || nm != 2 {
			return fmt.Errorf("Expected two requests, got %d", nm)
		}
		return nil
	})
}

// Check that we get the proper detailed information for the requestor when allowed.
func TestServiceLatencyRequestorSharesDetailedInfo(t *testing.T) {
	sc := createSuperCluster(t, 3, 3)
	defer sc.shutdown()

	// Now add in new service export to FOO and have BAR import that with tracking enabled.
	sc.setupLatencyTracking(t, 100)
	sc.setResponseThreshold(t, 10*time.Millisecond)
	sc.setImportShare(t)

	nc := clientConnect(t, sc.clusters[0].opts[0], "foo")
	defer nc.Close()

	// Listen for metrics.
	rsub, err := nc.SubscribeSync("results")
	if err != nil {
		t.Fatal(err)
	}
	nc.Flush()

	getMetricResult := func() *server.ServiceLatency {
		t.Helper()
		rmsg, err := rsub.NextMsg(time.Second)
		if err != nil {
			t.Fatalf("Expected to receive latency metric: %v", err)
		}
		var sl server.ServiceLatency
		if err = json.Unmarshal(rmsg.Data, &sl); err != nil {
			t.Errorf("Unexpected error processing latency metric: %s", err)
		}
		return &sl
	}

	cases := []struct {
		ci, si int
		desc   string
	}{
		{0, 0, "same server"},
		{0, 1, "same cluster, different server"},
		{1, 1, "different cluster"},
	}

	for _, cs := range cases {
		// Select the server to send the request from.
		nc2 := clientConnect(t, sc.clusters[cs.ci].opts[cs.si], "bar")
		defer nc2.Close()

		rs := sc.clusters[cs.ci].servers[cs.si]

		// Test a request with no reply subject.
		nc2.Publish("ngs.usage", []byte("1h"))
		sl := getMetricResult()
		if sl.Status != 400 {
			t.Fatalf("Test %q, Expected to get a bad request status [400], got %d", cs.desc, sl.Status)
		}
		extendedCheck(t, sl.Requestor, "bar", "", rs.Name())

		// We wait here for the gateways to report no interest b/c of optimistic mode.
		time.Sleep(50 * time.Millisecond)

		// Proper request, but no responders.
		nc2.Request("ngs.usage", []byte("1h"), 10*time.Millisecond)
		sl = getMetricResult()
		if sl.Status != 503 {
			t.Fatalf("Test %q, Expected to get a service unavailable status [503], got %d", cs.desc, sl.Status)
		}
		extendedCheck(t, sl.Requestor, "bar", "", rs.Name())

		// The service listener. Make it slow. 10ms is respThreshold, so take 2.5X.
		sub, _ := nc.Subscribe("ngs.usage.bar", func(msg *nats.Msg) {
			time.Sleep(25 * time.Millisecond)
			msg.Respond([]byte("22 msgs"))
		})
		defer sub.Unsubscribe()
		nc.Flush()
		// Wait to propagate.
		time.Sleep(200 * time.Millisecond)

		nc2.Request("ngs.usage", []byte("1h"), 10*time.Millisecond)
		sl = getMetricResult()
		if sl.Status != 504 {
			t.Fatalf("Test %q, Expected to get a service timeout status [504], got %d", cs.desc, sl.Status)
		}
		extendedCheck(t, sl.Requestor, "bar", "", rs.Name())

		// Clean up subscriber and requestor.
		nc2.Close()
		sub.Unsubscribe()
		nc.Flush()
		// Wait to propagate.
		time.Sleep(200 * time.Millisecond)
	}
}

func TestServiceLatencyRequestorSharesConfig(t *testing.T) {
	conf := createConfFile(t, []byte(`
	listen: 127.0.0.1:-1
	accounts: {
		SVC: {
			users: [ {user: svc, password: pass} ]
			exports: [ {
				service: "svc.echo"
				accounts: [CLIENT]
				latency: {
					sampling: 100%
					subject: latency.svc
				}
			} ]
		},
		CLIENT: {
			users: [{user: client, password: pass} ]
			imports: [ {service: {account: SVC, subject: svc.echo}, to: SVC, share: true} ]
		},
		SYS: { users: [{user: admin, password: pass}] }
	}

	system_account: SYS
	`))
	defer removeFile(t, conf)

	srv, opts := RunServerWithConfig(conf)
	defer srv.Shutdown()

	// Responder
	nc, err := nats.Connect(fmt.Sprintf("nats://svc:pass@%s:%d", opts.Host, opts.Port))
	if err != nil {
		t.Fatalf("Error on connect: %v", err)
	}
	defer nc.Close()

	// Listen for metrics.
	rsub, _ := nc.SubscribeSync("latency.svc")

	// Requestor
	nc2, err := nats.Connect(fmt.Sprintf("nats://client:pass@%s:%d", opts.Host, opts.Port), nats.UseOldRequestStyle())
	if err != nil {
		t.Fatalf("Error on connect: %v", err)
	}
	defer nc2.Close()

	// Setup responder.
	serviceTime := 25 * time.Millisecond
	sub, _ := nc.Subscribe("svc.echo", func(msg *nats.Msg) {
		time.Sleep(serviceTime)
		msg.Respond([]byte("world"))
	})
	nc.Flush()
	defer sub.Unsubscribe()

	// Send a request.
	start := time.Now()
	if _, err := nc2.Request("SVC", []byte("1h"), time.Second); err != nil {
		t.Fatalf("Expected a response")
	}

	var sl server.ServiceLatency
	rmsg, _ := rsub.NextMsg(time.Second)
	json.Unmarshal(rmsg.Data, &sl)

	checkServiceLatency(t, sl, start, serviceTime)
	extendedCheck(t, sl.Responder, "svc", "", srv.Name())
	extendedCheck(t, sl.Requestor, "client", "", srv.Name())

	// Check reload. The new configuration drops share: true from the import.
	newConf := []byte(`
	listen: 127.0.0.1:-1
	accounts: {
		SVC: {
			users: [ {user: svc, password: pass} ]
			exports: [ {
				service: "svc.echo"
				accounts: [CLIENT]
				latency: {
					sampling: 100%
					subject: latency.svc
				}
			} ]
		},
		CLIENT: {
			users: [{user: client, password: pass} ]
			imports: [ {service: {account: SVC, subject: svc.echo}, to: SVC} ]
		},
		SYS: { users: [{user: admin, password: pass}] }
	}

	system_account: SYS
	`)
	if err := ioutil.WriteFile(conf, newConf, 0600); err != nil {
		t.Fatalf("Error rewriting server's config file: %v", err)
	}
	if err := srv.Reload(); err != nil {
		t.Fatalf("Error on server reload: %v", err)
	}

	start = time.Now()
	if _, err = nc2.Request("SVC", []byte("1h"), time.Second); err != nil {
		t.Fatalf("Expected a response")
	}

	var sl2 server.ServiceLatency
	rmsg, _ = rsub.NextMsg(time.Second)
	json.Unmarshal(rmsg.Data, &sl2)
	noShareCheck(t, sl2.Requestor)
}

func TestServiceLatencyLossTest(t *testing.T) {
	// Ensure that behavior with respect to requests timing out (and samples being reordered) is as expected.
	conf := createConfFile(t, []byte(`
	listen: 127.0.0.1:-1
	accounts: {
		SVC: {
			users: [ {user: svc, password: pass} ]
			exports: [ {
				service: "svc.echo"
				threshold: "500ms"
				accounts: [CLIENT]
				latency: {
					sampling: headers
					subject: latency.svc
				}
			} ]
		},
		CLIENT: {
			users: [{user: client, password: pass} ]
			imports: [ {service: {account: SVC, subject: svc.echo}, to: SVC, share: true} ]
		},
		SYS: { users: [{user: admin, password: pass}] }
	}
	system_account: SYS
	`))
	defer removeFile(t, conf)
	srv, opts := RunServerWithConfig(conf)
	defer srv.Shutdown()

	// Responder connection.
	ncr, err := nats.Connect(fmt.Sprintf("nats://svc:pass@%s:%d", opts.Host, opts.Port), nats.Name("responder"))
	if err != nil {
		t.Fatalf("Error on connect: %v", err)
	}
	defer ncr.Close()

	ncl, err := nats.Connect(fmt.Sprintf("nats://svc:pass@%s:%d", opts.Host, opts.Port), nats.Name("latency"))
	if err != nil {
		t.Fatalf("Error on connect: %v", err)
	}
	defer ncl.Close()
	// Table of expected state for each message.
	// This also codifies that the first message, in response to the second request, is OK.
	// The second message, in response to the first request, times out.
	expectedState := map[int]int{1: http.StatusOK, 2: http.StatusGatewayTimeout}
	msgCnt := 0
	start := time.Now().Add(250 * time.Millisecond)

	var latErr []error
	// Listen for metrics.
	wg := sync.WaitGroup{}
	wg.Add(2)
	rsub, _ := ncl.Subscribe("latency.svc", func(rmsg *nats.Msg) {
		defer wg.Done()
		var sl server.ServiceLatency
		json.Unmarshal(rmsg.Data, &sl)
		msgCnt++
		if want := expectedState[msgCnt]; want != sl.Status {
			latErr = append(latErr, fmt.Errorf("Expected different status for msg #%d: %d != %d", msgCnt, want, sl.Status))
		}
		if msgCnt > 1 {
			if start.Before(sl.RequestStart) {
				latErr = append(latErr, fmt.Errorf("start times should indicate reordering %v : %v", start, sl.RequestStart))
			}
		}
		start = sl.RequestStart
		if strings.EqualFold(sl.RequestHeader.Get("Uber-Trace-Id"), fmt.Sprintf("msg-%d", msgCnt)) {
			latErr = append(latErr, fmt.Errorf("no header present"))
		}
	})
	defer rsub.Unsubscribe()
	// Setup requestor.
	nc2, err := nats.Connect(fmt.Sprintf("nats://client:pass@%s:%d", opts.Host, opts.Port),
		nats.UseOldRequestStyle(), nats.Name("requestor"))
	if err != nil {
		t.Fatalf("Error on connect: %v", err)
	}
	defer nc2.Close()

	respCnt := int64(0)
	reply := nc2.NewRespInbox()
	repSub, _ := nc2.Subscribe(reply, func(msg *nats.Msg) {
		atomic.AddInt64(&respCnt, 1)
	})
	defer repSub.Unsubscribe()
	nc2.Flush()
	// Use a dedicated send that publishes requests using the same reply subject.
	send := func(msg string) {
		if err := nc2.PublishMsg(&nats.Msg{Subject: "SVC", Data: []byte(msg), Reply: reply,
			Header: http.Header{"X-B3-Sampled": []string{"1"}}}); err != nil {
			t.Fatalf("Expected a response got: %v", err)
		}
	}
	// Setup responder that either skips responding and triggers the next request OR responds.
	sub, _ := ncr.Subscribe("svc.echo", func(msg *nats.Msg) {
		if string(msg.Data) != "msg2" {
			msg.Respond([]byte("response"))
		} else {
			wg.Add(1)
			go func() { // Second request (use a go routine to not block in the responder's callback).
				defer wg.Done()
				time.Sleep(250 * time.Millisecond)
				send("msg1") // Will cause the first latency measurement.
			}()
		}
	})
	ncr.Flush()
	ncl.Flush()
	nc2.Flush()
	defer sub.Unsubscribe()
	// Send the first request, which is expected to time out.
	send("msg2")
	// Wait till we got enough responses.
	wg.Wait()
	if len(latErr) > 0 {
		t.Fatalf("Got errors %v", latErr)
	}
	if atomic.LoadInt64(&respCnt) != 1 {
		t.Fatalf("Expected only one message")
	}
}

func TestServiceLatencyHeaderTriggered(t *testing.T) {
	receiveAndTest := func(t *testing.T, rsub *nats.Subscription, shared bool, header http.Header, status int, srvName string) server.ServiceLatency {
		t.Helper()
		var sl server.ServiceLatency
		rmsg, _ := rsub.NextMsg(time.Second)
		if rmsg == nil {
			t.Fatal("Expected message")
			return sl
		}
		json.Unmarshal(rmsg.Data, &sl)
		if sl.Status != status {
			t.Fatalf("Expected different status %d != %d", status, sl.Status)
		}
		if status == http.StatusOK {
			extendedCheck(t, sl.Responder, "svc", "", srvName)
		}
		if shared {
			extendedCheck(t, sl.Requestor, "client", "", srvName)
		} else {
			noShareCheck(t, sl.Requestor)
		}
		// Tracing headers are always included; unrelated ones are not.
		if v := sl.RequestHeader.Get("Some-Other"); v != "" {
			t.Fatalf("Expected header to be gone")
		}
		for k, value := range header {
			if v := sl.RequestHeader.Get(k); v != value[0] {
				t.Fatalf("Expected header set")
			}
		}
		return sl
	}
	for _, v := range []struct {
		shared bool
		header http.Header
	}{
		{true, http.Header{"Uber-Trace-Id": []string{"479fefe9525eddb:479fefe9525eddb:0:1"}}},
		{true, http.Header{"X-B3-Sampled": []string{"1"}}},
		{true, http.Header{"X-B3-TraceId": []string{"80f198ee56343ba864fe8b2a57d3eff7"}}},
		{true, http.Header{"B3": []string{"80f198ee56343ba864fe8b2a57d3eff7-e457b5a2e4d86bd1-1-05e3ac9a4f6e3b90"}}},
		{true, http.Header{"Traceparent": []string{"00-4bf92f3577b34da6a3ce929d0e0e4736-00f067aa0ba902b7-01"}}},
		{false, http.Header{"Uber-Trace-Id": []string{"479fefe9525eddb:479fefe9525eddb:0:1"}}},
		{false, http.Header{"X-B3-Sampled": []string{"1"}}},
		{false, http.Header{"X-B3-TraceId": []string{"80f198ee56343ba864fe8b2a57d3eff7"}}},
		{false, http.Header{"B3": []string{"80f198ee56343ba864fe8b2a57d3eff7-e457b5a2e4d86bd1-1-05e3ac9a4f6e3b90"}}},
		{false, http.Header{"Traceparent": []string{"00-4bf92f3577b34da6a3ce929d0e0e4736-00f067aa0ba902b7-01"}}},
		{false, http.Header{
			"X-B3-TraceId":      []string{"80f198ee56343ba864fe8b2a57d3eff7"},
			"X-B3-ParentSpanId": []string{"05e3ac9a4f6e3b90"},
			"X-B3-SpanId":       []string{"e457b5a2e4d86bd1"},
			"X-B3-Sampled":      []string{"1"},
		}},
		{false, http.Header{
			"X-B3-TraceId":      []string{"80f198ee56343ba864fe8b2a57d3eff7"},
			"X-B3-ParentSpanId": []string{"05e3ac9a4f6e3b90"},
			"X-B3-SpanId":       []string{"e457b5a2e4d86bd1"},
		}},
		{false, http.Header{
			"Uber-Trace-Id": []string{"479fefe9525eddb:479fefe9525eddb:0:1"},
			"Uberctx-X":     []string{"foo"},
		}},
		{false, http.Header{
			"Traceparent": []string{"00-4bf92f3577b34da6a3ce929d0e0e4736-00f067aa0ba902b7-01"},
			"Tracestate":  []string{"rojo=00f067aa0ba902b7,congo=t61rcWkgMzE"},
		}},
	} {
		t.Run(fmt.Sprintf("%s_%t_%s", t.Name(), v.shared, v.header), func(t *testing.T) {
			conf := createConfFile(t, []byte(fmt.Sprintf(`
			listen: 127.0.0.1:-1
			accounts: {
				SVC: {
					users: [ {user: svc, password: pass} ]
					exports: [ {
						service: "svc.echo"
						accounts: [CLIENT]
						latency: {
							sampling: headers
							subject: latency.svc
						}
					}]
				},
				CLIENT: {
					users: [{user: client, password: pass} ]
					imports: [ {service: {account: SVC, subject: svc.echo}, to: SVC, share:%t} ]
				},
				SYS: { users: [{user: admin, password: pass}] }
			}

			system_account: SYS
			`, v.shared)))
			defer removeFile(t, conf)
			srv, opts := RunServerWithConfig(conf)
			defer srv.Shutdown()

			// Responder
			nc, err := nats.Connect(fmt.Sprintf("nats://svc:pass@%s:%d", opts.Host, opts.Port))
			if err != nil {
				t.Fatalf("Error on connect: %v", err)
			}
			defer nc.Close()

			// Listen for metrics.
			rsub, _ := nc.SubscribeSync("latency.svc")
			defer rsub.Unsubscribe()

			// Setup responder.
			serviceTime := 25 * time.Millisecond
			sub, _ := nc.Subscribe("svc.echo", func(msg *nats.Msg) {
				time.Sleep(serviceTime)
				msg.Respond([]byte("world"))
			})
			nc.Flush()
			defer sub.Unsubscribe()

			// Setup requestor.
			nc2, err := nats.Connect(fmt.Sprintf("nats://client:pass@%s:%d", opts.Host, opts.Port), nats.UseOldRequestStyle())
			if err != nil {
				t.Fatalf("Error on connect: %v", err)
			}
			defer nc2.Close()

			// Send a request.
			start := time.Now()
			msg := &nats.Msg{
				Subject: "SVC",
				Data:    []byte("1h"),
				Header:  v.header.Clone(),
			}
			msg.Header.Add("Some-Other", "value")
			if _, err := nc2.RequestMsg(msg, 50*time.Millisecond); err != nil {
				t.Fatalf("Expected a response")
			}
			sl := receiveAndTest(t, rsub, v.shared, v.header, http.StatusOK, srv.Name())
			checkServiceLatency(t, sl, start, serviceTime)
			// Shut down the responder to test various error scenarios.
			sub.Unsubscribe()
			nc.Flush()
			// Send a request without a responder.
			if _, err := nc2.RequestMsg(msg, 50*time.Millisecond); err == nil {
				t.Fatalf("Expected no response")
			}
			receiveAndTest(t, rsub, v.shared, v.header, http.StatusServiceUnavailable, srv.Name())

			// Send a message without a reply subject.
			msg.Reply = ""
			if err := nc2.PublishMsg(msg); err != nil {
				t.Fatalf("Expected no error got %v", err)
			}
			receiveAndTest(t, rsub, v.shared, v.header, http.StatusBadRequest, srv.Name())
		})
	}
}

// From a report by rip@nats.io on simple latency reporting missing in a two-server cluster setup.
func TestServiceLatencyMissingResults(t *testing.T) {
	accConf := createConfFile(t, []byte(`
	accounts {
		one: {
			users = [ {user: one, password: password} ]
			imports = [ {service: {account: weather, subject: service.weather.requests.>}, to: service.weather.>, share: true} ]
		}
		weather: {
			users = [ {user: weather, password: password} ]
			exports = [ {
				service: service.weather.requests.>
				accounts: [one]
				latency: { sampling: 100%, subject: service.weather.latency }
			} ]
		}
	}
	`))
	defer removeFile(t, accConf)

	s1Conf := createConfFile(t, []byte(fmt.Sprintf(`
	listen: 127.0.0.1:-1
	server_name: s1
	cluster { port: -1 }
	include %q
	`, path.Base(accConf))))
	defer removeFile(t, s1Conf)

	s1, opts1 := RunServerWithConfig(s1Conf)
	defer s1.Shutdown()

	s2Conf := createConfFile(t, []byte(fmt.Sprintf(`
	listen: 127.0.0.1:-1
	server_name: s2
	cluster {
		port: -1
		routes = [ nats-route://127.0.0.1:%d ]
	}
	include %q
	`, opts1.Cluster.Port, path.Base(accConf))))
	defer removeFile(t, s2Conf)

	s2, opts2 := RunServerWithConfig(s2Conf)
	defer s2.Shutdown()

	checkClusterFormed(t, s1, s2)

	nc1, err := nats.Connect(fmt.Sprintf("nats://%s:%s@%s:%d", "weather", "password", opts1.Host, opts1.Port))
	if err != nil {
		t.Fatalf("Error on connect: %v", err)
	}
	defer nc1.Close()

	// Create responder.
	sub, _ := nc1.Subscribe("service.weather.requests.>", func(msg *nats.Msg) {
		time.Sleep(25 * time.Millisecond)
		msg.Respond([]byte("sunny!"))
	})
	defer sub.Unsubscribe()

	// Create sync listener for latency.
	lsub, _ := nc1.SubscribeSync("service.weather.latency")
	defer lsub.Unsubscribe()
	nc1.Flush()

	// Create requestor on s2.
	nc2, err := nats.Connect(fmt.Sprintf("nats://%s:%s@%s:%d", "one", "password", opts2.Host, opts2.Port), nats.UseOldRequestStyle())
	if err != nil {
		t.Fatalf("Error on connect: %v", err)
	}
	defer nc2.Close()

	nc2.Request("service.weather.los_angeles", nil, time.Second)

	lr, err := lsub.NextMsg(time.Second)
	if err != nil {
		t.Fatalf("Expected a latency result, got %v", err)
	}
	// Make sure we reported OK and have valid results for service, system, and total latency.
	var sl server.ServiceLatency
	json.Unmarshal(lr.Data, &sl)

	if sl.Status != 200 {
		t.Fatalf("Expected a 200 status, got %d\n", sl.Status)
	}
	if sl.ServiceLatency == 0 || sl.SystemLatency == 0 || sl.TotalLatency == 0 {
		t.Fatalf("Received invalid tracking measurements, %d %d %d", sl.ServiceLatency, sl.SystemLatency, sl.TotalLatency)
	}
}