mirror of
https://github.com/gogrlx/nats-server.git
synced 2026-04-02 03:38:42 -07:00
1921 lines
54 KiB
Go
1921 lines
54 KiB
Go
// Copyright 2019-2021 The NATS Authors
|
|
// Licensed under the Apache License, Version 2.0 (the "License");
|
|
// you may not use this file except in compliance with the License.
|
|
// You may obtain a copy of the License at
|
|
//
|
|
// http://www.apache.org/licenses/LICENSE-2.0
|
|
//
|
|
// Unless required by applicable law or agreed to in writing, software
|
|
// distributed under the License is distributed on an "AS IS" BASIS,
|
|
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
|
// See the License for the specific language governing permissions and
|
|
// limitations under the License.
|
|
|
|
package test
|
|
|
|
import (
|
|
"encoding/json"
|
|
"fmt"
|
|
"math/rand"
|
|
"net/http"
|
|
"os"
|
|
"path/filepath"
|
|
"strings"
|
|
"sync"
|
|
"sync/atomic"
|
|
"testing"
|
|
"time"
|
|
|
|
"github.com/nats-io/jwt/v2"
|
|
"github.com/nats-io/nats-server/v2/server"
|
|
"github.com/nats-io/nats.go"
|
|
"github.com/nats-io/nkeys"
|
|
)
|
|
|
|
// Used to setup superclusters for tests.
|
|
type supercluster struct {
|
|
t *testing.T
|
|
clusters []*cluster
|
|
}
|
|
|
|
func (sc *supercluster) shutdown() {
|
|
if sc == nil {
|
|
return
|
|
}
|
|
for _, c := range sc.clusters {
|
|
shutdownCluster(c)
|
|
}
|
|
}
|
|
|
|
const digits = "0123456789ABCDEFGHIJKLMNOPQRSTUVWXYZ"
|
|
const base = 36
|
|
const cnlen = 8
|
|
|
|
func randClusterName() string {
|
|
var name []byte
|
|
rn := rand.Int63()
|
|
for i := 0; i < cnlen; i++ {
|
|
name = append(name, digits[rn%base])
|
|
rn /= base
|
|
}
|
|
return string(name[:cnlen])
|
|
}
|
|
|
|
func createSuperCluster(t *testing.T, numServersPer, numClusters int) *supercluster {
|
|
clusters := []*cluster{}
|
|
|
|
for i := 0; i < numClusters; i++ {
|
|
// Pick cluster name and setup default accounts.
|
|
c := createClusterEx(t, true, 5*time.Millisecond, true, randClusterName(), numServersPer, clusters...)
|
|
clusters = append(clusters, c)
|
|
}
|
|
return &supercluster{t, clusters}
|
|
}
|
|
|
|
func (sc *supercluster) setResponseThreshold(t *testing.T, maxTime time.Duration) {
|
|
t.Helper()
|
|
for _, c := range sc.clusters {
|
|
for _, s := range c.servers {
|
|
foo, err := s.LookupAccount("FOO")
|
|
if err != nil {
|
|
t.Fatalf("Error looking up account 'FOO': %v", err)
|
|
}
|
|
if err := foo.SetServiceExportResponseThreshold("ngs.usage.*", maxTime); err != nil {
|
|
t.Fatalf("Error setting response threshold")
|
|
}
|
|
}
|
|
}
|
|
}
|
|
|
|
func (sc *supercluster) setImportShare(t *testing.T) {
|
|
t.Helper()
|
|
for _, c := range sc.clusters {
|
|
for _, s := range c.servers {
|
|
foo, err := s.LookupAccount("FOO")
|
|
if err != nil {
|
|
t.Fatalf("Error looking up account 'FOO': %v", err)
|
|
}
|
|
bar, err := s.LookupAccount("BAR")
|
|
if err != nil {
|
|
t.Fatalf("Error looking up account 'BAR': %v", err)
|
|
}
|
|
if err := bar.SetServiceImportSharing(foo, "ngs.usage.bar", true); err != nil {
|
|
t.Fatalf("Error setting import sharing: %v", err)
|
|
}
|
|
}
|
|
}
|
|
}
|
|
|
|
func (sc *supercluster) setupLatencyTracking(t *testing.T, p int) {
|
|
t.Helper()
|
|
for _, c := range sc.clusters {
|
|
for _, s := range c.servers {
|
|
foo, err := s.LookupAccount("FOO")
|
|
if err != nil {
|
|
t.Fatalf("Error looking up account 'FOO': %v", err)
|
|
}
|
|
bar, err := s.LookupAccount("BAR")
|
|
if err != nil {
|
|
t.Fatalf("Error looking up account 'BAR': %v", err)
|
|
}
|
|
if err := foo.AddServiceExport("ngs.usage.*", nil); err != nil {
|
|
t.Fatalf("Error adding service export to 'FOO': %v", err)
|
|
}
|
|
if err := foo.TrackServiceExportWithSampling("ngs.usage.*", "results", p); err != nil {
|
|
t.Fatalf("Error adding latency tracking to 'FOO': %v", err)
|
|
}
|
|
if err := bar.AddServiceImport(foo, "ngs.usage", "ngs.usage.bar"); err != nil {
|
|
t.Fatalf("Error adding service import to 'ngs.usage': %v", err)
|
|
}
|
|
}
|
|
}
|
|
}
|
|
|
|
func (sc *supercluster) removeLatencyTracking(t *testing.T) {
|
|
t.Helper()
|
|
for _, c := range sc.clusters {
|
|
for _, s := range c.servers {
|
|
foo, err := s.LookupAccount("FOO")
|
|
if err != nil {
|
|
t.Fatalf("Error looking up account 'FOO': %v", err)
|
|
}
|
|
foo.UnTrackServiceExport("ngs.usage.*")
|
|
}
|
|
}
|
|
}
|
|
|
|
func (sc *supercluster) totalSubs() int {
|
|
totalSubs := 0
|
|
for _, c := range sc.clusters {
|
|
totalSubs += c.totalSubs()
|
|
}
|
|
return totalSubs
|
|
}
|
|
|
|
func clientConnectWithName(t *testing.T, opts *server.Options, user, appname string) *nats.Conn {
|
|
t.Helper()
|
|
url := fmt.Sprintf("nats://%s:pass@%s:%d", user, opts.Host, opts.Port)
|
|
nc, err := nats.Connect(url, nats.Name(appname))
|
|
if err != nil {
|
|
t.Fatalf("Error on connect: %v", err)
|
|
}
|
|
return nc
|
|
}
|
|
|
|
func clientConnect(t *testing.T, opts *server.Options, user string) *nats.Conn {
|
|
t.Helper()
|
|
return clientConnectWithName(t, opts, user, "")
|
|
}
|
|
|
|
func clientConnectOldRequest(t *testing.T, opts *server.Options, user string) *nats.Conn {
|
|
t.Helper()
|
|
url := fmt.Sprintf("nats://%s:pass@%s:%d", user, opts.Host, opts.Port)
|
|
nc, err := nats.Connect(url, nats.UseOldRequestStyle())
|
|
if err != nil {
|
|
t.Fatalf("Error on connect: %v", err)
|
|
}
|
|
return nc
|
|
}
|
|
|
|
func checkServiceLatency(t *testing.T, sl server.ServiceLatency, start time.Time, serviceTime time.Duration) {
|
|
t.Helper()
|
|
|
|
if sl.Status != 200 {
|
|
t.Fatalf("Bad status received, wanted 200 got %d", sl.Status)
|
|
}
|
|
|
|
serviceTime = serviceTime.Round(time.Millisecond)
|
|
|
|
startDelta := sl.RequestStart.Sub(start)
|
|
// Original test was 5ms, but got GitHub Action failure with "Bad start delta 5.033929ms",
|
|
// and Travis will get something like: "Bad start delta 15.046059ms", so be more generous.
|
|
if startDelta > 20*time.Millisecond {
|
|
t.Fatalf("Bad start delta %v", startDelta)
|
|
}
|
|
// Since RTT during tests is estimate we remove from calculation.
|
|
if (sl.ServiceLatency + sl.Responder.RTT) < time.Duration(float64(serviceTime)*0.8) {
|
|
t.Fatalf("Bad service latency: %v (%v)", sl.ServiceLatency, serviceTime)
|
|
}
|
|
if sl.TotalLatency < sl.ServiceLatency {
|
|
t.Fatalf("Bad total latency: %v (%v)", sl.TotalLatency, sl.ServiceLatency)
|
|
}
|
|
// We should have NATS latency here that is non-zero with real clients.
|
|
if sl.Requestor.RTT == 0 {
|
|
t.Fatalf("Expected non-zero NATS Requestor latency")
|
|
}
|
|
if sl.Responder.RTT == 0 {
|
|
t.Fatalf("Expected non-zero NATS Requestor latency")
|
|
}
|
|
|
|
// Make sure they add up
|
|
got := sl.TotalLatency
|
|
expected := sl.ServiceLatency + sl.NATSTotalTime()
|
|
if got != expected {
|
|
t.Fatalf("Numbers do not add up: %+v,\ngot: %v\nexpected: %v", sl, got, expected)
|
|
}
|
|
}
|
|
|
|
func extendedCheck(t *testing.T, lc *server.ClientInfo, eUser, appName, eServer string) {
|
|
t.Helper()
|
|
if lc.User != eUser {
|
|
t.Fatalf("Expected user of %q, got %q", eUser, lc.User)
|
|
}
|
|
if appName != "" && appName != lc.Name {
|
|
t.Fatalf("Expected appname of %q, got %q\n", appName, lc.Name)
|
|
}
|
|
if lc.Host == "" {
|
|
t.Fatalf("Expected non-empty IP")
|
|
}
|
|
if lc.ID < 1 || lc.ID > 20 {
|
|
t.Fatalf("Expected a ID in range, got %d", lc.ID)
|
|
}
|
|
if eServer != "" && eServer != lc.Server {
|
|
t.Fatalf("Expected server of %q, got %q", eServer, lc.Server)
|
|
}
|
|
}
|
|
|
|
func noShareCheck(t *testing.T, lc *server.ClientInfo) {
|
|
t.Helper()
|
|
if lc.Name != "" {
|
|
t.Fatalf("appname should not have been shared, got %q", lc.Name)
|
|
}
|
|
if lc.User != "" {
|
|
t.Fatalf("user should not have been shared, got %q", lc.User)
|
|
}
|
|
if lc.Host != "" {
|
|
t.Fatalf("client ip should not have been shared, got %q", lc.Host)
|
|
}
|
|
if lc.ID != 0 {
|
|
t.Fatalf("client id should not have been shared, got %d", lc.ID)
|
|
}
|
|
if lc.Server != "" {
|
|
t.Fatalf("client' server should not have been shared, got %q", lc.Server)
|
|
}
|
|
}
|
|
|
|
func TestServiceLatencySingleServerConnect(t *testing.T) {
|
|
sc := createSuperCluster(t, 3, 2)
|
|
defer sc.shutdown()
|
|
|
|
// Now add in new service export to FOO and have bar import that with tracking enabled.
|
|
sc.setupLatencyTracking(t, 100)
|
|
|
|
// Now we can setup and test, do single node only first.
|
|
// This is the service provider.
|
|
nc := clientConnectWithName(t, sc.clusters[0].opts[0], "foo", "service22")
|
|
defer nc.Close()
|
|
|
|
// The service listener.
|
|
serviceTime := 25 * time.Millisecond
|
|
nc.Subscribe("ngs.usage.*", func(msg *nats.Msg) {
|
|
time.Sleep(serviceTime)
|
|
msg.Respond([]byte("22 msgs"))
|
|
})
|
|
|
|
// Listen for metrics
|
|
rsub, _ := nc.SubscribeSync("results")
|
|
|
|
// Requestor
|
|
nc2 := clientConnect(t, sc.clusters[0].opts[0], "bar")
|
|
defer nc2.Close()
|
|
|
|
// Send the request.
|
|
start := time.Now()
|
|
if _, err := nc2.Request("ngs.usage", []byte("1h"), time.Second); err != nil {
|
|
t.Fatalf("Expected a response")
|
|
}
|
|
|
|
var sl server.ServiceLatency
|
|
rmsg, _ := rsub.NextMsg(time.Second)
|
|
json.Unmarshal(rmsg.Data, &sl)
|
|
|
|
checkServiceLatency(t, sl, start, serviceTime)
|
|
|
|
rs := sc.clusters[0].servers[0]
|
|
extendedCheck(t, sl.Responder, "foo", "service22", rs.Name())
|
|
// Normally requestor's don't share
|
|
noShareCheck(t, sl.Requestor)
|
|
|
|
// Now make sure normal use case works with old request style.
|
|
nc3 := clientConnectOldRequest(t, sc.clusters[0].opts[0], "bar")
|
|
defer nc3.Close()
|
|
|
|
start = time.Now()
|
|
if _, err := nc3.Request("ngs.usage", []byte("1h"), time.Second); err != nil {
|
|
t.Fatalf("Expected a response")
|
|
}
|
|
nc3.Close()
|
|
|
|
checkServiceLatency(t, sl, start, serviceTime)
|
|
extendedCheck(t, sl.Responder, "foo", "service22", rs.Name())
|
|
// Normally requestor's don't share
|
|
noShareCheck(t, sl.Requestor)
|
|
}
|
|
|
|
// If a client has a longer RTT that the effective RTT for NATS + responder
|
|
// the requestor RTT will be marked as 0. This can happen quite often with
|
|
// utility programs that are far away from a cluster like NGS but the service
|
|
// response time has a shorter RTT.
|
|
func TestServiceLatencyClientRTTSlowerVsServiceRTT(t *testing.T) {
|
|
sc := createSuperCluster(t, 2, 2)
|
|
defer sc.shutdown()
|
|
|
|
// Now add in new service export to FOO and have BAR import that with tracking enabled.
|
|
sc.setupLatencyTracking(t, 100)
|
|
|
|
nc := clientConnect(t, sc.clusters[0].opts[0], "foo")
|
|
defer nc.Close()
|
|
|
|
// The service listener. Mostly instant response.
|
|
nc.Subscribe("ngs.usage.*", func(msg *nats.Msg) {
|
|
time.Sleep(time.Millisecond)
|
|
msg.Respond([]byte("22 msgs"))
|
|
})
|
|
|
|
// Listen for metrics
|
|
rsub, _ := nc.SubscribeSync("results")
|
|
nc.Flush()
|
|
|
|
// Requestor and processing
|
|
requestAndCheck := func(cindex, sindex int) {
|
|
t.Helper()
|
|
sopts := sc.clusters[cindex].opts[sindex]
|
|
|
|
if nmsgs, _, err := rsub.Pending(); err != nil || nmsgs != 0 {
|
|
t.Fatalf("Did not expect any latency results, got %d", nmsgs)
|
|
}
|
|
|
|
rtt := 10 * time.Millisecond
|
|
bw := 1024 * 1024
|
|
sp := newSlowProxy(rtt+5*time.Millisecond, bw, bw, sopts)
|
|
defer sp.stop()
|
|
|
|
nc2 := clientConnect(t, sp.opts(), "bar")
|
|
defer nc2.Close()
|
|
|
|
start := time.Now()
|
|
nc2.Flush()
|
|
// Check rtt for slow proxy
|
|
if d := time.Since(start); d < rtt {
|
|
t.Fatalf("Expected an rtt of at least %v, got %v", rtt, d)
|
|
}
|
|
|
|
// Send the request.
|
|
_, err := nc2.Request("ngs.usage", []byte("1h"), time.Second)
|
|
if err != nil {
|
|
t.Fatalf("Expected a response")
|
|
}
|
|
|
|
var sl server.ServiceLatency
|
|
rmsg, err := rsub.NextMsg(time.Second)
|
|
if err != nil || rmsg == nil {
|
|
t.Fatalf("Did not receive latency results")
|
|
}
|
|
json.Unmarshal(rmsg.Data, &sl)
|
|
|
|
if sl.Status != 200 {
|
|
t.Fatalf("Expected a status 200 Ok, got [%d] %q", sl.Status, sl.Error)
|
|
}
|
|
// We want to test here that when the client requestor RTT is larger then the response time
|
|
// we still deliver a requestor value > 0.
|
|
// Now check that it is close to rtt.
|
|
if sl.Requestor.RTT < rtt {
|
|
t.Fatalf("Expected requestor latency to be > %v, got %v", rtt, sl.Requestor.RTT)
|
|
}
|
|
if sl.TotalLatency < rtt {
|
|
t.Fatalf("Expected total latency to be > %v, got %v", rtt, sl.TotalLatency)
|
|
}
|
|
|
|
rs := sc.clusters[0].servers[0]
|
|
extendedCheck(t, sl.Responder, "foo", "", rs.Name())
|
|
// Normally requestor's don't share
|
|
noShareCheck(t, sl.Requestor)
|
|
|
|
// Check for trailing duplicates..
|
|
rmsg, err = rsub.NextMsg(100 * time.Millisecond)
|
|
if err == nil {
|
|
t.Fatalf("Duplicate metric result, %q", rmsg.Data)
|
|
}
|
|
}
|
|
|
|
// Check same server.
|
|
requestAndCheck(0, 0)
|
|
// Check from remote server across GW.
|
|
requestAndCheck(1, 1)
|
|
// Same cluster but different server
|
|
requestAndCheck(0, 1)
|
|
}
|
|
|
|
func connRTT(nc *nats.Conn) time.Duration {
|
|
// Do 5x to flatten
|
|
total := time.Duration(0)
|
|
for i := 0; i < 5; i++ {
|
|
start := time.Now()
|
|
nc.Flush()
|
|
total += time.Since(start)
|
|
}
|
|
return total / 5
|
|
}
|
|
|
|
func TestServiceLatencyRemoteConnect(t *testing.T) {
|
|
sc := createSuperCluster(t, 3, 2)
|
|
defer sc.shutdown()
|
|
|
|
// Now add in new service export to FOO and have bar import that with tracking enabled.
|
|
sc.setupLatencyTracking(t, 100)
|
|
|
|
// Now we can setup and test, do single node only first.
|
|
// This is the service provider.
|
|
nc := clientConnect(t, sc.clusters[0].opts[0], "foo")
|
|
defer nc.Close()
|
|
|
|
subsBefore := int(sc.clusters[0].servers[0].NumSubscriptions())
|
|
|
|
// The service listener.
|
|
serviceTime := 25 * time.Millisecond
|
|
nc.Subscribe("ngs.usage.*", func(msg *nats.Msg) {
|
|
time.Sleep(serviceTime)
|
|
msg.Respond([]byte("22 msgs"))
|
|
})
|
|
|
|
// Listen for metrics
|
|
rsub, _ := nc.SubscribeSync("results")
|
|
nc.Flush()
|
|
|
|
if err := checkExpectedSubs(subsBefore+2, sc.clusters[0].servers...); err != nil {
|
|
t.Fatal(err.Error())
|
|
}
|
|
|
|
// Same Cluster Requestor
|
|
nc2 := clientConnect(t, sc.clusters[0].opts[2], "bar")
|
|
defer nc2.Close()
|
|
|
|
// Send the request.
|
|
start := time.Now()
|
|
_, err := nc2.Request("ngs.usage", []byte("1h"), time.Second)
|
|
if err != nil {
|
|
t.Fatalf("Expected a response")
|
|
}
|
|
|
|
var sl server.ServiceLatency
|
|
rmsg, err := rsub.NextMsg(time.Second)
|
|
if err != nil {
|
|
t.Fatalf("Error getting latency measurement: %v", err)
|
|
}
|
|
json.Unmarshal(rmsg.Data, &sl)
|
|
checkServiceLatency(t, sl, start, serviceTime)
|
|
rs := sc.clusters[0].servers[0]
|
|
extendedCheck(t, sl.Responder, "foo", "", rs.Name())
|
|
// Normally requestor's don't share
|
|
noShareCheck(t, sl.Requestor)
|
|
|
|
// Lastly here, we need to make sure we are properly tracking the extra hops.
|
|
// We will make sure that NATS latency is close to what we see from the outside in terms of RTT.
|
|
if crtt := connRTT(nc) + connRTT(nc2); sl.NATSTotalTime() < crtt {
|
|
t.Fatalf("Not tracking second measurement for NATS latency across servers: %v vs %v", sl.NATSTotalTime(), crtt)
|
|
}
|
|
|
|
// Gateway Requestor
|
|
nc2 = clientConnect(t, sc.clusters[1].opts[1], "bar")
|
|
defer nc2.Close()
|
|
|
|
// Send the request.
|
|
start = time.Now()
|
|
_, err = nc2.Request("ngs.usage", []byte("1h"), time.Second)
|
|
if err != nil {
|
|
t.Fatalf("Expected a response")
|
|
}
|
|
|
|
rmsg, err = rsub.NextMsg(time.Second)
|
|
if err != nil {
|
|
t.Fatalf("Error getting latency measurement: %v", err)
|
|
}
|
|
json.Unmarshal(rmsg.Data, &sl)
|
|
checkServiceLatency(t, sl, start, serviceTime)
|
|
extendedCheck(t, sl.Responder, "foo", "", rs.Name())
|
|
// Normally requestor's don't share
|
|
noShareCheck(t, sl.Requestor)
|
|
|
|
// Lastly here, we need to make sure we are properly tracking the extra hops.
|
|
// We will make sure that NATS latency is close to what we see from the outside in terms of RTT.
|
|
if crtt := connRTT(nc) + connRTT(nc2); sl.NATSTotalTime() < crtt {
|
|
t.Fatalf("Not tracking second measurement for NATS latency across servers: %v vs %v", sl.NATSTotalTime(), crtt)
|
|
}
|
|
|
|
// Now turn off and make sure we no longer receive updates.
|
|
sc.removeLatencyTracking(t)
|
|
_, err = nc2.Request("ngs.usage", []byte("1h"), time.Second)
|
|
if err != nil {
|
|
t.Fatalf("Expected a response")
|
|
}
|
|
|
|
_, err = rsub.NextMsg(100 * time.Millisecond)
|
|
if err == nil {
|
|
t.Fatalf("Did not expect to receive a latency metric")
|
|
}
|
|
}
|
|
|
|
func TestServiceLatencySampling(t *testing.T) {
|
|
sc := createSuperCluster(t, 3, 2)
|
|
defer sc.shutdown()
|
|
|
|
// Now add in new service export to FOO and have bar import that with tracking enabled.
|
|
sc.setupLatencyTracking(t, 50)
|
|
|
|
// Now we can setup and test, do single node only first.
|
|
// This is the service provider.
|
|
nc := clientConnect(t, sc.clusters[0].opts[0], "foo")
|
|
defer nc.Close()
|
|
|
|
// The service listener.
|
|
nc.Subscribe("ngs.usage.*", func(msg *nats.Msg) {
|
|
msg.Respond([]byte("22 msgs"))
|
|
})
|
|
|
|
// Listen for metrics
|
|
received := int32(0)
|
|
|
|
nc.Subscribe("results", func(msg *nats.Msg) {
|
|
atomic.AddInt32(&received, 1)
|
|
})
|
|
|
|
// Same Cluster Requestor
|
|
nc2 := clientConnect(t, sc.clusters[0].opts[2], "bar")
|
|
defer nc2.Close()
|
|
|
|
toSend := 1000
|
|
for i := 0; i < toSend; i++ {
|
|
nc2.Request("ngs.usage", []byte("1h"), time.Second)
|
|
}
|
|
// Wait for results to flow in.
|
|
time.Sleep(100 * time.Millisecond)
|
|
|
|
mid := toSend / 2
|
|
delta := toSend / 10 // 10%
|
|
got := int(atomic.LoadInt32(&received))
|
|
|
|
if got > mid+delta || got < mid-delta {
|
|
t.Fatalf("Sampling number incorrect: %d vs %d", mid, got)
|
|
}
|
|
}
|
|
|
|
func TestServiceLatencyNoSubsLeak(t *testing.T) {
|
|
sc := createSuperCluster(t, 3, 3)
|
|
defer sc.shutdown()
|
|
|
|
// Now add in new service export to FOO and have bar import that with tracking enabled.
|
|
sc.setupLatencyTracking(t, 100)
|
|
|
|
nc := clientConnectWithName(t, sc.clusters[0].opts[1], "foo", "dlc22")
|
|
defer nc.Close()
|
|
|
|
// The service listener.
|
|
nc.Subscribe("ngs.usage.*", func(msg *nats.Msg) {
|
|
msg.Respond([]byte("22 msgs"))
|
|
})
|
|
nc.Flush()
|
|
// Propagation of sub through super cluster.
|
|
time.Sleep(100 * time.Millisecond)
|
|
|
|
startSubs := sc.totalSubs()
|
|
|
|
fooAcc, _ := sc.clusters[1].servers[1].LookupAccount("FOO")
|
|
startNumSis := fooAcc.NumServiceImports()
|
|
|
|
for i := 0; i < 100; i++ {
|
|
nc := clientConnect(t, sc.clusters[1].opts[1], "bar")
|
|
if _, err := nc.Request("ngs.usage", []byte("1h"), time.Second); err != nil {
|
|
t.Fatalf("Error on request: %v", err)
|
|
}
|
|
nc.Close()
|
|
}
|
|
|
|
// We are adding 3 here for the wildcard response subject for service replies.
|
|
// we only have one but it will show in three places.
|
|
startSubs += 3
|
|
|
|
checkFor(t, time.Second, 50*time.Millisecond, func() error {
|
|
if numSubs := sc.totalSubs(); numSubs != startSubs {
|
|
return fmt.Errorf("Leaked %d subs", numSubs-startSubs)
|
|
}
|
|
return nil
|
|
})
|
|
|
|
// Now also check to make sure the service imports created for the request go away as well.
|
|
checkFor(t, time.Second, 50*time.Millisecond, func() error {
|
|
if numSis := fooAcc.NumServiceImports(); numSis != startNumSis {
|
|
return fmt.Errorf("Leaked %d service imports", numSis-startNumSis)
|
|
}
|
|
return nil
|
|
})
|
|
}
|
|
|
|
func TestServiceLatencyWithName(t *testing.T) {
|
|
sc := createSuperCluster(t, 1, 1)
|
|
defer sc.shutdown()
|
|
|
|
// Now add in new service export to FOO and have bar import that with tracking enabled.
|
|
sc.setupLatencyTracking(t, 100)
|
|
|
|
opts := sc.clusters[0].opts[0]
|
|
|
|
nc := clientConnectWithName(t, opts, "foo", "dlc22")
|
|
defer nc.Close()
|
|
|
|
// The service listener.
|
|
nc.Subscribe("ngs.usage.*", func(msg *nats.Msg) {
|
|
msg.Respond([]byte("22 msgs"))
|
|
})
|
|
|
|
// Listen for metrics
|
|
rsub, _ := nc.SubscribeSync("results")
|
|
nc.Flush()
|
|
|
|
nc2 := clientConnect(t, opts, "bar")
|
|
defer nc2.Close()
|
|
nc2.Request("ngs.usage", []byte("1h"), time.Second)
|
|
|
|
var sl server.ServiceLatency
|
|
rmsg, err := rsub.NextMsg(time.Second)
|
|
if err != nil {
|
|
t.Fatalf("Error getting message: %v", err)
|
|
}
|
|
json.Unmarshal(rmsg.Data, &sl)
|
|
|
|
// Make sure we have AppName set.
|
|
rs := sc.clusters[0].servers[0]
|
|
extendedCheck(t, sl.Responder, "foo", "dlc22", rs.Name())
|
|
// Normally requestor's don't share
|
|
noShareCheck(t, sl.Requestor)
|
|
}
|
|
|
|
func TestServiceLatencyWithNameMultiServer(t *testing.T) {
|
|
sc := createSuperCluster(t, 3, 2)
|
|
defer sc.shutdown()
|
|
|
|
// Now add in new service export to FOO and have bar import that with tracking enabled.
|
|
sc.setupLatencyTracking(t, 100)
|
|
|
|
nc := clientConnectWithName(t, sc.clusters[0].opts[1], "foo", "dlc22")
|
|
defer nc.Close()
|
|
|
|
// The service listener.
|
|
nc.Subscribe("ngs.usage.*", func(msg *nats.Msg) {
|
|
msg.Respond([]byte("22 msgs"))
|
|
})
|
|
|
|
// Listen for metrics
|
|
rsub, _ := nc.SubscribeSync("results")
|
|
nc.Flush()
|
|
|
|
nc2 := clientConnect(t, sc.clusters[1].opts[1], "bar")
|
|
defer nc2.Close()
|
|
nc2.Request("ngs.usage", []byte("1h"), time.Second)
|
|
|
|
var sl server.ServiceLatency
|
|
rmsg, _ := rsub.NextMsg(time.Second)
|
|
json.Unmarshal(rmsg.Data, &sl)
|
|
|
|
// Make sure we have AppName set.
|
|
rs := sc.clusters[0].servers[1]
|
|
extendedCheck(t, sl.Responder, "foo", "dlc22", rs.Name())
|
|
// Normally requestor's don't share
|
|
noShareCheck(t, sl.Requestor)
|
|
}
|
|
|
|
func createAccountWithJWT(t *testing.T) (string, nkeys.KeyPair, *jwt.AccountClaims) {
|
|
t.Helper()
|
|
okp, _ := nkeys.FromSeed(oSeed)
|
|
akp, _ := nkeys.CreateAccount()
|
|
pub, _ := akp.PublicKey()
|
|
nac := jwt.NewAccountClaims(pub)
|
|
jwt, _ := nac.Encode(okp)
|
|
return jwt, akp, nac
|
|
}
|
|
|
|
func TestServiceLatencyWithJWT(t *testing.T) {
|
|
okp, _ := nkeys.FromSeed(oSeed)
|
|
|
|
// Create three accounts, system, service and normal account.
|
|
sysJWT, sysKP, _ := createAccountWithJWT(t)
|
|
sysPub, _ := sysKP.PublicKey()
|
|
|
|
_, svcKP, svcAcc := createAccountWithJWT(t)
|
|
svcPub, _ := svcKP.PublicKey()
|
|
|
|
// Add in the service export with latency tracking here.
|
|
serviceExport := &jwt.Export{Subject: "req.*", Type: jwt.Service}
|
|
svcAcc.Exports.Add(serviceExport)
|
|
svcJWT, err := svcAcc.Encode(okp)
|
|
if err != nil {
|
|
t.Fatalf("Error generating account JWT: %v", err)
|
|
}
|
|
|
|
_, accKP, accAcc := createAccountWithJWT(t)
|
|
accPub, _ := accKP.PublicKey()
|
|
|
|
// Add in the import.
|
|
serviceImport := &jwt.Import{Account: svcPub, Subject: "request", To: "req.echo", Type: jwt.Service}
|
|
accAcc.Imports.Add(serviceImport)
|
|
accJWT, err := accAcc.Encode(okp)
|
|
if err != nil {
|
|
t.Fatalf("Error generating account JWT: %v", err)
|
|
}
|
|
|
|
cf := `
|
|
listen: 127.0.0.1:-1
|
|
cluster {
|
|
name: "A"
|
|
listen: 127.0.0.1:-1
|
|
authorization {
|
|
timeout: 2.2
|
|
} %s
|
|
}
|
|
|
|
operator = "./configs/nkeys/op.jwt"
|
|
system_account = "%s"
|
|
|
|
resolver = MEMORY
|
|
resolver_preload = {
|
|
%s : "%s"
|
|
%s : "%s"
|
|
%s : "%s"
|
|
}
|
|
`
|
|
contents := strings.Replace(fmt.Sprintf(cf, "", sysPub, sysPub, sysJWT, svcPub, svcJWT, accPub, accJWT), "\n\t", "\n", -1)
|
|
conf := createConfFile(t, []byte(contents))
|
|
defer removeFile(t, conf)
|
|
|
|
s, opts := RunServerWithConfig(conf)
|
|
defer s.Shutdown()
|
|
|
|
// Create a new server and route to main one.
|
|
routeStr := fmt.Sprintf("\n\t\troutes = [nats-route://%s:%d]", opts.Cluster.Host, opts.Cluster.Port)
|
|
contents2 := strings.Replace(fmt.Sprintf(cf, routeStr, sysPub, sysPub, sysJWT, svcPub, svcJWT, accPub, accJWT), "\n\t", "\n", -1)
|
|
|
|
conf2 := createConfFile(t, []byte(contents2))
|
|
defer removeFile(t, conf2)
|
|
|
|
s2, opts2 := RunServerWithConfig(conf2)
|
|
defer s2.Shutdown()
|
|
|
|
checkClusterFormed(t, s, s2)
|
|
|
|
// Create service provider.
|
|
url := fmt.Sprintf("nats://%s:%d", opts.Host, opts.Port)
|
|
copt, pubUser := createUserCredsOption(t, s, svcKP)
|
|
nc, err := nats.Connect(url, copt, nats.Name("fooService"))
|
|
if err != nil {
|
|
t.Fatalf("Error on connect: %v", err)
|
|
}
|
|
defer nc.Close()
|
|
|
|
// The service listener.
|
|
serviceTime := 25 * time.Millisecond
|
|
nc.Subscribe("req.echo", func(msg *nats.Msg) {
|
|
time.Sleep(serviceTime)
|
|
msg.Respond(msg.Data)
|
|
})
|
|
|
|
// Listen for metrics
|
|
rsub, _ := nc.SubscribeSync("results")
|
|
nc.Flush()
|
|
|
|
// Create second client and send request from this one.
|
|
url2 := fmt.Sprintf("nats://%s:%d/", opts2.Host, opts2.Port)
|
|
nc2, err := nats.Connect(url2, createUserCreds(t, s2, accKP))
|
|
if err != nil {
|
|
t.Fatalf("Error creating client: %v\n", err)
|
|
}
|
|
defer nc2.Close()
|
|
|
|
// Send the request.
|
|
_, err = nc2.Request("request", []byte("hello"), time.Second)
|
|
if err != nil {
|
|
t.Fatalf("Expected a response")
|
|
}
|
|
|
|
// We should not receive latency at this time.
|
|
_, err = rsub.NextMsg(100 * time.Millisecond)
|
|
if err == nil {
|
|
t.Fatalf("Did not expect to receive a latency metric")
|
|
}
|
|
|
|
// Now turn it on..
|
|
updateAccount := func() {
|
|
t.Helper()
|
|
for _, s := range []*server.Server{s, s2} {
|
|
svcAccount, err := s.LookupAccount(svcPub)
|
|
if err != nil {
|
|
t.Fatalf("Could not lookup service account from server %+v", s)
|
|
}
|
|
s.UpdateAccountClaims(svcAccount, svcAcc)
|
|
}
|
|
}
|
|
serviceExport.Latency = &jwt.ServiceLatency{Sampling: 100, Results: "results"}
|
|
updateAccount()
|
|
|
|
// Grab the service responder's user.
|
|
|
|
// Send the request.
|
|
start := time.Now()
|
|
_, err = nc2.Request("request", []byte("hello"), time.Second)
|
|
if err != nil {
|
|
t.Fatalf("Expected a response")
|
|
}
|
|
|
|
var sl server.ServiceLatency
|
|
rmsg, err := rsub.NextMsg(time.Second)
|
|
if err != nil || rmsg == nil {
|
|
t.Fatalf("Did not receive a latency metric")
|
|
}
|
|
json.Unmarshal(rmsg.Data, &sl)
|
|
checkServiceLatency(t, sl, start, serviceTime)
|
|
extendedCheck(t, sl.Responder, pubUser, "fooService", s.Name())
|
|
// Normally requestor's don't share
|
|
noShareCheck(t, sl.Requestor)
|
|
|
|
// Now we will remove tracking. Do this by simulating a JWT update.
|
|
serviceExport.Latency = nil
|
|
updateAccount()
|
|
|
|
// Now we should not get any tracking data.
|
|
_, err = nc2.Request("request", []byte("hello"), time.Second)
|
|
if err != nil {
|
|
t.Fatalf("Expected a response")
|
|
}
|
|
_, err = rsub.NextMsg(100 * time.Millisecond)
|
|
if err == nil {
|
|
t.Fatalf("Did not expect to receive a latency metric")
|
|
}
|
|
}
|
|
|
|
func TestServiceLatencyAdjustNegativeLatencyValues(t *testing.T) {
|
|
sc := createSuperCluster(t, 3, 2)
|
|
defer sc.shutdown()
|
|
|
|
// Now add in new service export to FOO and have bar import
|
|
// that with tracking enabled.
|
|
sc.setupLatencyTracking(t, 100)
|
|
|
|
// Now we can setup and test, do single node only first.
|
|
// This is the service provider.
|
|
nc := clientConnect(t, sc.clusters[0].opts[0], "foo")
|
|
defer nc.Close()
|
|
|
|
// The service listener.
|
|
nc.Subscribe("ngs.usage.*", func(msg *nats.Msg) {
|
|
msg.Respond([]byte("22 msgs"))
|
|
})
|
|
|
|
// Listen for metrics
|
|
rsub, err := nc.SubscribeSync("results")
|
|
if err != nil {
|
|
t.Fatal(err)
|
|
}
|
|
nc.Flush()
|
|
|
|
// Requestor
|
|
nc2 := clientConnect(t, sc.clusters[0].opts[0], "bar")
|
|
defer nc2.Close()
|
|
|
|
// Send the request.
|
|
totalSamples := 50
|
|
for i := 0; i < totalSamples; i++ {
|
|
if _, err := nc2.Request("ngs.usage", []byte("1h"), time.Second); err != nil {
|
|
t.Fatalf("Expected a response")
|
|
}
|
|
}
|
|
|
|
var sl server.ServiceLatency
|
|
for i := 0; i < totalSamples; i++ {
|
|
rmsg, err := rsub.NextMsg(time.Second)
|
|
if err != nil {
|
|
t.Fatalf("Expected to receive latency metric: %d, %s", i, err)
|
|
}
|
|
if err := json.Unmarshal(rmsg.Data, &sl); err != nil {
|
|
t.Errorf("Unexpected error processing latency metric: %s", err)
|
|
}
|
|
if sl.ServiceLatency < 0 {
|
|
t.Fatalf("Unexpected negative latency value: %v", sl)
|
|
}
|
|
}
|
|
}
|
|
|
|
func TestServiceLatencyRemoteConnectAdjustNegativeValues(t *testing.T) {
|
|
sc := createSuperCluster(t, 3, 2)
|
|
defer sc.shutdown()
|
|
|
|
// Now add in new service export to FOO and have bar import that with tracking enabled.
|
|
sc.setupLatencyTracking(t, 100)
|
|
|
|
// Now we can setup and test, do single node only first.
|
|
// This is the service provider.
|
|
nc := clientConnect(t, sc.clusters[0].opts[0], "foo")
|
|
defer nc.Close()
|
|
|
|
// The service listener.
|
|
nc.Subscribe("ngs.usage.*", func(msg *nats.Msg) {
|
|
msg.Respond([]byte("22 msgs"))
|
|
})
|
|
|
|
// Listen for metrics
|
|
rsub, err := nc.SubscribeSync("results")
|
|
if err != nil {
|
|
t.Fatal(err)
|
|
}
|
|
nc.Flush()
|
|
|
|
// Same Cluster Requestor
|
|
nc2 := clientConnect(t, sc.clusters[0].opts[2], "bar")
|
|
defer nc2.Close()
|
|
|
|
// Gateway Requestor
|
|
nc3 := clientConnect(t, sc.clusters[1].opts[1], "bar")
|
|
defer nc3.Close()
|
|
|
|
// Send a few initial requests to ensure interest is propagated
|
|
// both for cluster and gateway requestors.
|
|
checkFor(t, 3*time.Second, time.Second, func() error {
|
|
_, err1 := nc2.Request("ngs.usage", []byte("1h"), time.Second)
|
|
_, err2 := nc3.Request("ngs.usage", []byte("1h"), time.Second)
|
|
|
|
if err1 != nil || err2 != nil {
|
|
return fmt.Errorf("Timed out waiting for super cluster to be ready")
|
|
}
|
|
return nil
|
|
})
|
|
|
|
// Send the request.
|
|
totalSamples := 20
|
|
for i := 0; i < totalSamples; i++ {
|
|
if _, err := nc2.Request("ngs.usage", []byte("1h"), time.Second); err != nil {
|
|
t.Fatalf("Expected a response")
|
|
}
|
|
}
|
|
|
|
for i := 0; i < totalSamples; i++ {
|
|
if _, err := nc3.Request("ngs.usage", []byte("1h"), time.Second); err != nil {
|
|
t.Fatalf("Expected a response")
|
|
}
|
|
}
|
|
|
|
var sl server.ServiceLatency
|
|
for i := 0; i < totalSamples*2; i++ {
|
|
rmsg, err := rsub.NextMsg(time.Second)
|
|
if err != nil {
|
|
t.Fatalf("Expected to receive latency metric: %d, %s", i, err)
|
|
}
|
|
if err = json.Unmarshal(rmsg.Data, &sl); err != nil {
|
|
t.Errorf("Unexpected error processing latency metric: %s", err)
|
|
}
|
|
if sl.ServiceLatency < 0 {
|
|
t.Fatalf("Unexpected negative service latency value: %v", sl)
|
|
}
|
|
if sl.SystemLatency < 0 {
|
|
t.Fatalf("Unexpected negative system latency value: %v", sl)
|
|
}
|
|
}
|
|
}
|
|
|
|
func TestServiceLatencyFailureReportingSingleServer(t *testing.T) {
|
|
sc := createSuperCluster(t, 1, 1)
|
|
defer sc.shutdown()
|
|
|
|
// Now add in new service export to FOO and have bar import that with tracking enabled.
|
|
sc.setupLatencyTracking(t, 100)
|
|
sc.setResponseThreshold(t, 20*time.Millisecond)
|
|
|
|
nc := clientConnect(t, sc.clusters[0].opts[0], "foo")
|
|
defer nc.Close()
|
|
|
|
// Listen for metrics
|
|
rsub, err := nc.SubscribeSync("results")
|
|
if err != nil {
|
|
t.Fatal(err)
|
|
}
|
|
nc.Flush()
|
|
|
|
getMetricResult := func() *server.ServiceLatency {
|
|
t.Helper()
|
|
rmsg, err := rsub.NextMsg(time.Second)
|
|
if err != nil {
|
|
t.Fatalf("Expected to receive latency metric: %v", err)
|
|
}
|
|
var sl server.ServiceLatency
|
|
if err = json.Unmarshal(rmsg.Data, &sl); err != nil {
|
|
t.Errorf("Unexpected error processing latency metric: %s", err)
|
|
}
|
|
return &sl
|
|
}
|
|
|
|
// Same server
|
|
nc2 := clientConnect(t, sc.clusters[0].opts[0], "bar")
|
|
defer nc2.Close()
|
|
|
|
// Test a request with no reply subject
|
|
nc2.Publish("ngs.usage", []byte("1h"))
|
|
sl := getMetricResult()
|
|
|
|
if sl.Status != 400 {
|
|
t.Fatalf("Expected to get a bad request status [400], got %d", sl.Status)
|
|
}
|
|
|
|
// Proper request, but no responders.
|
|
nc2.Request("ngs.usage", []byte("1h"), 20*time.Millisecond)
|
|
sl = getMetricResult()
|
|
if sl.Status != 503 {
|
|
t.Fatalf("Expected to get a service unavailable status [503], got %d", sl.Status)
|
|
}
|
|
|
|
// The service listener. Make it slow. 20ms is respThreshold, so take 2X
|
|
sub, _ := nc.Subscribe("ngs.usage.bar", func(msg *nats.Msg) {
|
|
time.Sleep(40 * time.Millisecond)
|
|
msg.Respond([]byte("22 msgs"))
|
|
})
|
|
nc.Flush()
|
|
|
|
nc2.Request("ngs.usage", []byte("1h"), 20*time.Millisecond)
|
|
sl = getMetricResult()
|
|
if sl.Status != 504 {
|
|
t.Fatalf("Expected to get a service timeout status [504], got %d", sl.Status)
|
|
}
|
|
|
|
// Make sure we do not get duplicates.
|
|
if rmsg, err := rsub.NextMsg(50 * time.Millisecond); err == nil {
|
|
t.Fatalf("Unexpected second response metric: %q\n", rmsg.Data)
|
|
}
|
|
|
|
// Now setup a responder that will respond under the threshold.
|
|
sub.Unsubscribe()
|
|
nc.Subscribe("ngs.usage.bar", func(msg *nats.Msg) {
|
|
time.Sleep(5 * time.Millisecond)
|
|
msg.Respond([]byte("22 msgs"))
|
|
})
|
|
nc.Flush()
|
|
time.Sleep(100 * time.Millisecond)
|
|
|
|
// Now create a responder using old request and we will do a short timeout.
|
|
nc3 := clientConnectOldRequest(t, sc.clusters[0].opts[0], "bar")
|
|
defer nc3.Close()
|
|
|
|
nc3.Request("ngs.usage", []byte("1h"), time.Millisecond)
|
|
sl = getMetricResult()
|
|
if sl.Status != 408 {
|
|
t.Fatalf("Expected to get a request timeout status [408], got %d", sl.Status)
|
|
}
|
|
}
|
|
|
|
func TestServiceLatencyFailureReportingMultipleServers(t *testing.T) {
|
|
sc := createSuperCluster(t, 3, 3)
|
|
defer sc.shutdown()
|
|
|
|
// Now add in new service export to FOO and have bar import that with tracking enabled.
|
|
sc.setupLatencyTracking(t, 100)
|
|
sc.setResponseThreshold(t, 10*time.Millisecond)
|
|
|
|
nc := clientConnect(t, sc.clusters[0].opts[0], "foo")
|
|
defer nc.Close()
|
|
|
|
// Listen for metrics
|
|
rsub, err := nc.SubscribeSync("results")
|
|
if err != nil {
|
|
t.Fatal(err)
|
|
}
|
|
nc.Flush()
|
|
|
|
getMetricResult := func() *server.ServiceLatency {
|
|
t.Helper()
|
|
rmsg, err := rsub.NextMsg(time.Second)
|
|
if err != nil {
|
|
t.Fatalf("Expected to receive latency metric: %v", err)
|
|
}
|
|
var sl server.ServiceLatency
|
|
if err = json.Unmarshal(rmsg.Data, &sl); err != nil {
|
|
t.Errorf("Unexpected error processing latency metric: %s", err)
|
|
}
|
|
return &sl
|
|
}
|
|
|
|
cases := []struct {
|
|
ci, si int
|
|
desc string
|
|
}{
|
|
{0, 0, "same server"},
|
|
{0, 1, "same cluster, different server"},
|
|
{1, 1, "different cluster"},
|
|
}
|
|
|
|
for _, cs := range cases {
|
|
// Select the server to send request from.
|
|
nc2 := clientConnect(t, sc.clusters[cs.ci].opts[cs.si], "bar")
|
|
defer nc2.Close()
|
|
|
|
// Test a request with no reply subject
|
|
nc2.Publish("ngs.usage", []byte("1h"))
|
|
sl := getMetricResult()
|
|
if sl.Status != 400 {
|
|
t.Fatalf("Test %q, Expected to get a bad request status [400], got %d", cs.desc, sl.Status)
|
|
}
|
|
|
|
// We wait here for the gateways to report no interest b/c optimistic mode.
|
|
time.Sleep(50 * time.Millisecond)
|
|
|
|
// Proper request, but no responders.
|
|
nc2.Request("ngs.usage", []byte("1h"), 10*time.Millisecond)
|
|
sl = getMetricResult()
|
|
if sl.Status != 503 {
|
|
t.Fatalf("Test %q, Expected to get a service unavailable status [503], got %d", cs.desc, sl.Status)
|
|
}
|
|
|
|
// The service listener. Make it slow. 10ms is respThreshold, so make 3X
|
|
sub, _ := nc.Subscribe("ngs.usage.bar", func(msg *nats.Msg) {
|
|
time.Sleep(30 * time.Millisecond)
|
|
msg.Respond([]byte("22 msgs"))
|
|
})
|
|
defer sub.Unsubscribe()
|
|
nc.Flush()
|
|
// Wait to propagate.
|
|
time.Sleep(200 * time.Millisecond)
|
|
|
|
nc2.Request("ngs.usage", []byte("1h"), 10*time.Millisecond)
|
|
sl = getMetricResult()
|
|
if sl.Status != 504 {
|
|
t.Fatalf("Test %q, Expected to get a service timeout status [504], got %d", cs.desc, sl.Status)
|
|
}
|
|
|
|
// Clean up subscriber and requestor
|
|
nc2.Close()
|
|
sub.Unsubscribe()
|
|
nc.Flush()
|
|
// Wait to propagate.
|
|
time.Sleep(200 * time.Millisecond)
|
|
}
|
|
}
|
|
|
|
// To test a bug rip@nats.io is seeing.
|
|
func TestServiceLatencyOldRequestStyleSingleServer(t *testing.T) {
|
|
conf := createConfFile(t, []byte(`
|
|
listen: 127.0.0.1:-1
|
|
accounts: {
|
|
SVC: {
|
|
users: [ {user: svc, password: pass} ]
|
|
exports: [ {
|
|
service: "svc.echo"
|
|
accounts: [CLIENT]
|
|
latency: {
|
|
sampling: 100%
|
|
subject: latency.svc
|
|
}
|
|
} ]
|
|
},
|
|
CLIENT: {
|
|
users: [{user: client, password: pass} ]
|
|
imports: [ {service: {account: SVC, subject: svc.echo}, to: SVC} ]
|
|
},
|
|
SYS: { users: [{user: admin, password: pass}] }
|
|
}
|
|
|
|
system_account: SYS
|
|
`))
|
|
defer removeFile(t, conf)
|
|
|
|
srv, opts := RunServerWithConfig(conf)
|
|
defer srv.Shutdown()
|
|
|
|
// Responder
|
|
nc, err := nats.Connect(fmt.Sprintf("nats://svc:pass@%s:%d", opts.Host, opts.Port))
|
|
if err != nil {
|
|
t.Fatalf("Error on connect: %v", err)
|
|
}
|
|
defer nc.Close()
|
|
|
|
// Listen for metrics
|
|
rsub, _ := nc.SubscribeSync("latency.svc")
|
|
|
|
// Requestor
|
|
nc2, err := nats.Connect(fmt.Sprintf("nats://client:pass@%s:%d", opts.Host, opts.Port), nats.UseOldRequestStyle())
|
|
if err != nil {
|
|
t.Fatalf("Error on connect: %v", err)
|
|
}
|
|
defer nc2.Close()
|
|
|
|
// Setup responder
|
|
serviceTime := 25 * time.Millisecond
|
|
sub, _ := nc.Subscribe("svc.echo", func(msg *nats.Msg) {
|
|
time.Sleep(serviceTime)
|
|
msg.Respond([]byte("world"))
|
|
})
|
|
nc.Flush()
|
|
defer sub.Unsubscribe()
|
|
|
|
// Send a request
|
|
start := time.Now()
|
|
if _, err := nc2.Request("SVC", []byte("1h"), time.Second); err != nil {
|
|
t.Fatalf("Expected a response")
|
|
}
|
|
|
|
var sl server.ServiceLatency
|
|
rmsg, _ := rsub.NextMsg(time.Second)
|
|
json.Unmarshal(rmsg.Data, &sl)
|
|
|
|
checkServiceLatency(t, sl, start, serviceTime)
|
|
extendedCheck(t, sl.Responder, "svc", "", srv.Name())
|
|
noShareCheck(t, sl.Requestor)
|
|
}
|
|
|
|
// To test a bug wally@nats.io is seeing.
|
|
func TestServiceAndStreamStackOverflow(t *testing.T) {
|
|
conf := createConfFile(t, []byte(`
|
|
accounts {
|
|
STATIC {
|
|
users = [ { user: "static", pass: "foo" } ]
|
|
exports [
|
|
{ stream: > }
|
|
{ service: my.service }
|
|
]
|
|
}
|
|
DYN {
|
|
users = [ { user: "foo", pass: "bar" } ]
|
|
imports [
|
|
{ stream { subject: >, account: STATIC } }
|
|
{ service { subject: my.service, account: STATIC } }
|
|
]
|
|
}
|
|
}
|
|
`))
|
|
defer removeFile(t, conf)
|
|
|
|
srv, opts := RunServerWithConfig(conf)
|
|
defer srv.Shutdown()
|
|
|
|
// Responder (just request sub)
|
|
nc, err := nats.Connect(fmt.Sprintf("nats://static:foo@%s:%d", opts.Host, opts.Port))
|
|
if err != nil {
|
|
t.Fatalf("Error on connect: %v", err)
|
|
}
|
|
defer nc.Close()
|
|
|
|
sub, _ := nc.SubscribeSync("my.service")
|
|
nc.Flush()
|
|
|
|
// Requestor
|
|
nc2, err := nats.Connect(fmt.Sprintf("nats://foo:bar@%s:%d", opts.Host, opts.Port))
|
|
if err != nil {
|
|
t.Fatalf("Error on connect: %v", err)
|
|
}
|
|
defer nc2.Close()
|
|
|
|
// Send a single request.
|
|
nc2.PublishRequest("my.service", "foo", []byte("hi"))
|
|
checkFor(t, time.Second, 10*time.Millisecond, func() error {
|
|
if nm, _, err := sub.Pending(); err != nil || nm != 1 {
|
|
return fmt.Errorf("Expected one request, got %d", nm)
|
|
}
|
|
return nil
|
|
})
|
|
|
|
// Make sure works for queue subscribers as well.
|
|
sub.Unsubscribe()
|
|
sub, _ = nc.QueueSubscribeSync("my.service", "prod")
|
|
nc.Flush()
|
|
|
|
// Send a single request.
|
|
nc2.PublishRequest("my.service", "foo", []byte("hi"))
|
|
checkFor(t, time.Second, 10*time.Millisecond, func() error {
|
|
if nm, _, err := sub.Pending(); err != nil || nm != 1 {
|
|
return fmt.Errorf("Expected one request, got %d", nm)
|
|
}
|
|
return nil
|
|
})
|
|
|
|
// Now create an interest in the stream from nc2. that is a queue subscriber.
|
|
sub2, _ := nc2.QueueSubscribeSync("my.service", "prod")
|
|
defer sub2.Unsubscribe()
|
|
nc2.Flush()
|
|
|
|
// Send a single request.
|
|
nc2.PublishRequest("my.service", "foo", []byte("hi"))
|
|
time.Sleep(10 * time.Millisecond)
|
|
checkFor(t, time.Second, 10*time.Millisecond, func() error {
|
|
if nm, _, err := sub.Pending(); err != nil || nm != 2 {
|
|
return fmt.Errorf("Expected two requests, got %d", nm)
|
|
}
|
|
return nil
|
|
})
|
|
}
|
|
|
|
// Check we get the proper detailed information for the requestor when allowed.
|
|
func TestServiceLatencyRequestorSharesDetailedInfo(t *testing.T) {
|
|
sc := createSuperCluster(t, 3, 3)
|
|
defer sc.shutdown()
|
|
|
|
// Now add in new service export to FOO and have bar import that with tracking enabled.
|
|
sc.setupLatencyTracking(t, 100)
|
|
sc.setResponseThreshold(t, 10*time.Millisecond)
|
|
sc.setImportShare(t)
|
|
|
|
nc := clientConnect(t, sc.clusters[0].opts[0], "foo")
|
|
defer nc.Close()
|
|
|
|
// Listen for metrics
|
|
rsub, err := nc.SubscribeSync("results")
|
|
if err != nil {
|
|
t.Fatal(err)
|
|
}
|
|
nc.Flush()
|
|
|
|
getMetricResult := func() *server.ServiceLatency {
|
|
t.Helper()
|
|
rmsg, err := rsub.NextMsg(time.Second)
|
|
if err != nil {
|
|
t.Fatalf("Expected to receive latency metric: %v", err)
|
|
}
|
|
var sl server.ServiceLatency
|
|
if err = json.Unmarshal(rmsg.Data, &sl); err != nil {
|
|
t.Errorf("Unexpected error processing latency metric: %s", err)
|
|
}
|
|
return &sl
|
|
}
|
|
|
|
cases := []struct {
|
|
ci, si int
|
|
desc string
|
|
}{
|
|
{0, 0, "same server"},
|
|
{0, 1, "same cluster, different server"},
|
|
{1, 1, "different cluster"},
|
|
}
|
|
|
|
for _, cs := range cases {
|
|
// Select the server to send request from.
|
|
nc2 := clientConnect(t, sc.clusters[cs.ci].opts[cs.si], "bar")
|
|
defer nc2.Close()
|
|
|
|
rs := sc.clusters[cs.ci].servers[cs.si]
|
|
|
|
// Test a request with no reply subject
|
|
nc2.Publish("ngs.usage", []byte("1h"))
|
|
sl := getMetricResult()
|
|
if sl.Status != 400 {
|
|
t.Fatalf("Test %q, Expected to get a bad request status [400], got %d", cs.desc, sl.Status)
|
|
}
|
|
extendedCheck(t, sl.Requestor, "bar", "", rs.Name())
|
|
|
|
// We wait here for the gateways to report no interest b/c optimistic mode.
|
|
time.Sleep(50 * time.Millisecond)
|
|
|
|
// Proper request, but no responders.
|
|
nc2.Request("ngs.usage", []byte("1h"), 10*time.Millisecond)
|
|
sl = getMetricResult()
|
|
if sl.Status != 503 {
|
|
t.Fatalf("Test %q, Expected to get a service unavailable status [503], got %d", cs.desc, sl.Status)
|
|
}
|
|
extendedCheck(t, sl.Requestor, "bar", "", rs.Name())
|
|
|
|
// The service listener. Make it slow. 10ms is respThreshold, so take 2.5X
|
|
sub, _ := nc.Subscribe("ngs.usage.bar", func(msg *nats.Msg) {
|
|
time.Sleep(25 * time.Millisecond)
|
|
msg.Respond([]byte("22 msgs"))
|
|
})
|
|
defer sub.Unsubscribe()
|
|
nc.Flush()
|
|
// Wait to propagate.
|
|
time.Sleep(200 * time.Millisecond)
|
|
|
|
nc2.Request("ngs.usage", []byte("1h"), 10*time.Millisecond)
|
|
sl = getMetricResult()
|
|
if sl.Status != 504 {
|
|
t.Fatalf("Test %q, Expected to get a service timeout status [504], got %d", cs.desc, sl.Status)
|
|
}
|
|
extendedCheck(t, sl.Requestor, "bar", "", rs.Name())
|
|
|
|
// Clean up subscriber and requestor
|
|
nc2.Close()
|
|
sub.Unsubscribe()
|
|
nc.Flush()
|
|
// Wait to propagate.
|
|
time.Sleep(200 * time.Millisecond)
|
|
}
|
|
}
|
|
|
|
func TestServiceLatencyRequestorSharesConfig(t *testing.T) {
|
|
conf := createConfFile(t, []byte(`
|
|
listen: 127.0.0.1:-1
|
|
accounts: {
|
|
SVC: {
|
|
users: [ {user: svc, password: pass} ]
|
|
exports: [ {
|
|
service: "svc.echo"
|
|
accounts: [CLIENT]
|
|
latency: {
|
|
sampling: 100%
|
|
subject: latency.svc
|
|
}
|
|
} ]
|
|
},
|
|
CLIENT: {
|
|
users: [{user: client, password: pass} ]
|
|
imports: [ {service: {account: SVC, subject: svc.echo}, to: SVC, share:true} ]
|
|
},
|
|
SYS: { users: [{user: admin, password: pass}] }
|
|
}
|
|
|
|
system_account: SYS
|
|
`))
|
|
defer removeFile(t, conf)
|
|
|
|
srv, opts := RunServerWithConfig(conf)
|
|
defer srv.Shutdown()
|
|
|
|
// Responder
|
|
nc, err := nats.Connect(fmt.Sprintf("nats://svc:pass@%s:%d", opts.Host, opts.Port))
|
|
if err != nil {
|
|
t.Fatalf("Error on connect: %v", err)
|
|
}
|
|
defer nc.Close()
|
|
|
|
// Listen for metrics
|
|
rsub, _ := nc.SubscribeSync("latency.svc")
|
|
|
|
// Requestor
|
|
nc2, err := nats.Connect(fmt.Sprintf("nats://client:pass@%s:%d", opts.Host, opts.Port), nats.UseOldRequestStyle())
|
|
if err != nil {
|
|
t.Fatalf("Error on connect: %v", err)
|
|
}
|
|
defer nc2.Close()
|
|
|
|
// Setup responder
|
|
serviceTime := 25 * time.Millisecond
|
|
sub, _ := nc.Subscribe("svc.echo", func(msg *nats.Msg) {
|
|
time.Sleep(serviceTime)
|
|
msg.Respond([]byte("world"))
|
|
})
|
|
nc.Flush()
|
|
defer sub.Unsubscribe()
|
|
|
|
// Send a request
|
|
start := time.Now()
|
|
if _, err := nc2.Request("SVC", []byte("1h"), time.Second); err != nil {
|
|
t.Fatalf("Expected a response")
|
|
}
|
|
|
|
var sl server.ServiceLatency
|
|
rmsg, _ := rsub.NextMsg(time.Second)
|
|
json.Unmarshal(rmsg.Data, &sl)
|
|
|
|
checkServiceLatency(t, sl, start, serviceTime)
|
|
extendedCheck(t, sl.Responder, "svc", "", srv.Name())
|
|
extendedCheck(t, sl.Requestor, "client", "", srv.Name())
|
|
|
|
// Check reload.
|
|
newConf := []byte(`
|
|
listen: 127.0.0.1:-1
|
|
accounts: {
|
|
SVC: {
|
|
users: [ {user: svc, password: pass} ]
|
|
exports: [ {
|
|
service: "svc.echo"
|
|
accounts: [CLIENT]
|
|
latency: {
|
|
sampling: 100%
|
|
subject: latency.svc
|
|
}
|
|
} ]
|
|
},
|
|
CLIENT: {
|
|
users: [{user: client, password: pass} ]
|
|
imports: [ {service: {account: SVC, subject: svc.echo}, to: SVC} ]
|
|
},
|
|
SYS: { users: [{user: admin, password: pass}] }
|
|
}
|
|
|
|
system_account: SYS
|
|
`)
|
|
if err := os.WriteFile(conf, newConf, 0600); err != nil {
|
|
t.Fatalf("Error rewriting server's config file: %v", err)
|
|
}
|
|
if err := srv.Reload(); err != nil {
|
|
t.Fatalf("Error on server reload: %v", err)
|
|
}
|
|
|
|
if _, err = nc2.Request("SVC", []byte("1h"), time.Second); err != nil {
|
|
t.Fatalf("Expected a response")
|
|
}
|
|
|
|
var sl2 server.ServiceLatency
|
|
rmsg, _ = rsub.NextMsg(time.Second)
|
|
json.Unmarshal(rmsg.Data, &sl2)
|
|
noShareCheck(t, sl2.Requestor)
|
|
}
|
|
|
|
func TestServiceLatencyLossTest(t *testing.T) {
|
|
// assure that behavior with respect to requests timing out (and samples being reordered) is as expected.
|
|
conf := createConfFile(t, []byte(`
|
|
listen: 127.0.0.1:-1
|
|
accounts: {
|
|
SVC: {
|
|
users: [ {user: svc, password: pass} ]
|
|
exports: [ {
|
|
service: "svc.echo"
|
|
threshold: "500ms"
|
|
accounts: [CLIENT]
|
|
latency: {
|
|
sampling: headers
|
|
subject: latency.svc
|
|
}
|
|
} ]
|
|
},
|
|
CLIENT: {
|
|
users: [{user: client, password: pass} ]
|
|
imports: [ {service: {account: SVC, subject: svc.echo}, to: SVC, share:true} ]
|
|
},
|
|
SYS: { users: [{user: admin, password: pass}] }
|
|
}
|
|
system_account: SYS
|
|
`))
|
|
defer removeFile(t, conf)
|
|
srv, opts := RunServerWithConfig(conf)
|
|
defer srv.Shutdown()
|
|
|
|
// Responder connection
|
|
ncr, err := nats.Connect(fmt.Sprintf("nats://svc:pass@%s:%d", opts.Host, opts.Port), nats.Name("responder"))
|
|
if err != nil {
|
|
t.Fatalf("Error on connect: %v", err)
|
|
}
|
|
defer ncr.Close()
|
|
|
|
ncl, err := nats.Connect(fmt.Sprintf("nats://svc:pass@%s:%d", opts.Host, opts.Port), nats.Name("latency"))
|
|
if err != nil {
|
|
t.Fatalf("Error on connect: %v", err)
|
|
}
|
|
defer ncl.Close()
|
|
// Table of expected state for which message.
|
|
// This also codifies that the first message, in respsonse to second request is ok.
|
|
// Second message, in response to first request times out.
|
|
expectedState := map[int]int{1: http.StatusOK, 2: http.StatusGatewayTimeout}
|
|
msgCnt := 0
|
|
start := time.Now().Add(250 * time.Millisecond)
|
|
|
|
var latErr []error
|
|
// Listen for metrics
|
|
wg := sync.WaitGroup{}
|
|
wg.Add(2)
|
|
rsub, _ := ncl.Subscribe("latency.svc", func(rmsg *nats.Msg) {
|
|
defer wg.Done()
|
|
var sl server.ServiceLatency
|
|
json.Unmarshal(rmsg.Data, &sl)
|
|
msgCnt++
|
|
if want := expectedState[msgCnt]; want != sl.Status {
|
|
latErr = append(latErr, fmt.Errorf("Expected different status for msg #%d: %d != %d", msgCnt, want, sl.Status))
|
|
}
|
|
if msgCnt > 1 {
|
|
if start.Before(sl.RequestStart) {
|
|
latErr = append(latErr, fmt.Errorf("start times should indicate reordering %v : %v", start, sl.RequestStart))
|
|
}
|
|
}
|
|
start = sl.RequestStart
|
|
if strings.EqualFold(sl.RequestHeader.Get("Uber-Trace-Id"), fmt.Sprintf("msg-%d", msgCnt)) {
|
|
latErr = append(latErr, fmt.Errorf("no header present"))
|
|
}
|
|
})
|
|
defer rsub.Unsubscribe()
|
|
// Setup requestor
|
|
nc2, err := nats.Connect(fmt.Sprintf("nats://client:pass@%s:%d", opts.Host, opts.Port),
|
|
nats.UseOldRequestStyle(), nats.Name("requestor"))
|
|
if err != nil {
|
|
t.Fatalf("Error on connect: %v", err)
|
|
}
|
|
defer nc2.Close()
|
|
|
|
respCnt := int64(0)
|
|
reply := nc2.NewRespInbox()
|
|
repSub, _ := nc2.Subscribe(reply, func(msg *nats.Msg) {
|
|
atomic.AddInt64(&respCnt, 1)
|
|
})
|
|
defer repSub.Unsubscribe()
|
|
nc2.Flush()
|
|
// use dedicated send that publishes requests using same reply subject
|
|
send := func(msg string) {
|
|
if err := nc2.PublishMsg(&nats.Msg{Subject: "SVC", Data: []byte(msg), Reply: reply,
|
|
Header: nats.Header{"X-B3-Sampled": []string{"1"}}}); err != nil {
|
|
t.Fatalf("Expected a response got: %v", err)
|
|
}
|
|
}
|
|
// Setup responder that skips responding and triggers next request OR responds
|
|
sub, _ := ncr.Subscribe("svc.echo", func(msg *nats.Msg) {
|
|
if string(msg.Data) != "msg2" {
|
|
msg.Respond([]byte("response"))
|
|
} else {
|
|
wg.Add(1)
|
|
go func() { // second request (use go routine to not block in responders callback)
|
|
defer wg.Done()
|
|
time.Sleep(250 * time.Millisecond)
|
|
send("msg1") // will cause the first latency measurement
|
|
}()
|
|
}
|
|
})
|
|
ncr.Flush()
|
|
ncl.Flush()
|
|
nc2.Flush()
|
|
defer sub.Unsubscribe()
|
|
// Send first request, which is expected to timeout
|
|
send("msg2")
|
|
// wait till we got enough responses
|
|
wg.Wait()
|
|
if len(latErr) > 0 {
|
|
t.Fatalf("Got errors %v", latErr)
|
|
}
|
|
if atomic.LoadInt64(&respCnt) != 1 {
|
|
t.Fatalf("Expected only one message")
|
|
}
|
|
}
|
|
|
|
func TestServiceLatencyHeaderTriggered(t *testing.T) {
|
|
receiveAndTest := func(t *testing.T, rsub *nats.Subscription, shared bool, header nats.Header, status int, srvName string) server.ServiceLatency {
|
|
t.Helper()
|
|
var sl server.ServiceLatency
|
|
rmsg, _ := rsub.NextMsg(time.Second)
|
|
if rmsg == nil {
|
|
t.Fatal("Expected message")
|
|
return sl
|
|
}
|
|
json.Unmarshal(rmsg.Data, &sl)
|
|
if sl.Status != status {
|
|
t.Fatalf("Expected different status %d != %d", status, sl.Status)
|
|
}
|
|
if status == http.StatusOK {
|
|
extendedCheck(t, sl.Responder, "svc", "", srvName)
|
|
}
|
|
if shared {
|
|
extendedCheck(t, sl.Requestor, "client", "", srvName)
|
|
} else {
|
|
noShareCheck(t, sl.Requestor)
|
|
}
|
|
// header are always included
|
|
if v := sl.RequestHeader.Get("Some-Other"); v != "" {
|
|
t.Fatalf("Expected header to be gone")
|
|
}
|
|
for k, value := range header {
|
|
if v := sl.RequestHeader.Get(k); v != value[0] {
|
|
t.Fatalf("Expected header %q to be set", k)
|
|
}
|
|
}
|
|
return sl
|
|
}
|
|
for _, v := range []struct {
|
|
shared bool
|
|
header nats.Header
|
|
}{
|
|
{true, nats.Header{"Uber-Trace-Id": []string{"479fefe9525eddb:479fefe9525eddb:0:1"}}},
|
|
{true, nats.Header{"X-B3-Sampled": []string{"1"}}},
|
|
{true, nats.Header{"X-B3-TraceId": []string{"80f198ee56343ba864fe8b2a57d3eff7"}}},
|
|
{true, nats.Header{"B3": []string{"80f198ee56343ba864fe8b2a57d3eff7-e457b5a2e4d86bd1-1-05e3ac9a4f6e3b90"}}},
|
|
{true, nats.Header{"Traceparent": []string{"00-4bf92f3577b34da6a3ce929d0e0e4736-00f067aa0ba902b7-01"}}},
|
|
{false, nats.Header{"Uber-Trace-Id": []string{"479fefe9525eddb:479fefe9525eddb:0:1"}}},
|
|
{false, nats.Header{"X-B3-Sampled": []string{"1"}}},
|
|
{false, nats.Header{"X-B3-TraceId": []string{"80f198ee56343ba864fe8b2a57d3eff7"}}},
|
|
{false, nats.Header{"B3": []string{"80f198ee56343ba864fe8b2a57d3eff7-e457b5a2e4d86bd1-1-05e3ac9a4f6e3b90"}}},
|
|
{false, nats.Header{"Traceparent": []string{"00-4bf92f3577b34da6a3ce929d0e0e4736-00f067aa0ba902b7-01"}}},
|
|
{false, nats.Header{
|
|
"X-B3-TraceId": []string{"80f198ee56343ba864fe8b2a57d3eff7"},
|
|
"X-B3-ParentSpanId": []string{"05e3ac9a4f6e3b90"},
|
|
"X-B3-SpanId": []string{"e457b5a2e4d86bd1"},
|
|
"X-B3-Sampled": []string{"1"},
|
|
}},
|
|
{false, nats.Header{
|
|
"X-B3-TraceId": []string{"80f198ee56343ba864fe8b2a57d3eff7"},
|
|
"X-B3-ParentSpanId": []string{"05e3ac9a4f6e3b90"},
|
|
"X-B3-SpanId": []string{"e457b5a2e4d86bd1"},
|
|
}},
|
|
{false, nats.Header{
|
|
"Uber-Trace-Id": []string{"479fefe9525eddb:479fefe9525eddb:0:1"},
|
|
"Uberctx-X": []string{"foo"},
|
|
}},
|
|
{false, nats.Header{
|
|
"Traceparent": []string{"00-4bf92f3577b34da6a3ce929d0e0e4736-00f067aa0ba902b7-01"},
|
|
"Tracestate": []string{"rojo=00f067aa0ba902b7,congo=t61rcWkgMzE"},
|
|
}},
|
|
} {
|
|
t.Run(fmt.Sprintf("%s_%t_%s", t.Name(), v.shared, v.header), func(t *testing.T) {
|
|
conf := createConfFile(t, []byte(fmt.Sprintf(`
|
|
listen: 127.0.0.1:-1
|
|
accounts: {
|
|
SVC: {
|
|
users: [ {user: svc, password: pass} ]
|
|
exports: [ {
|
|
service: "svc.echo"
|
|
accounts: [CLIENT]
|
|
latency: {
|
|
sampling: headers
|
|
subject: latency.svc
|
|
}
|
|
}]
|
|
},
|
|
CLIENT: {
|
|
users: [{user: client, password: pass} ]
|
|
imports: [ {service: {account: SVC, subject: svc.echo}, to: SVC, share:%t} ]
|
|
},
|
|
SYS: { users: [{user: admin, password: pass}] }
|
|
}
|
|
|
|
system_account: SYS
|
|
`, v.shared)))
|
|
defer removeFile(t, conf)
|
|
srv, opts := RunServerWithConfig(conf)
|
|
defer srv.Shutdown()
|
|
|
|
// Responder
|
|
nc, err := nats.Connect(fmt.Sprintf("nats://svc:pass@%s:%d", opts.Host, opts.Port))
|
|
if err != nil {
|
|
t.Fatalf("Error on connect: %v", err)
|
|
}
|
|
defer nc.Close()
|
|
|
|
// Listen for metrics
|
|
rsub, _ := nc.SubscribeSync("latency.svc")
|
|
defer rsub.Unsubscribe()
|
|
|
|
// Setup responder
|
|
serviceTime := 25 * time.Millisecond
|
|
sub, _ := nc.Subscribe("svc.echo", func(msg *nats.Msg) {
|
|
time.Sleep(serviceTime)
|
|
msg.Respond([]byte("world"))
|
|
})
|
|
nc.Flush()
|
|
defer sub.Unsubscribe()
|
|
|
|
// Setup requestor
|
|
nc2, err := nats.Connect(fmt.Sprintf("nats://client:pass@%s:%d", opts.Host, opts.Port), nats.UseOldRequestStyle())
|
|
if err != nil {
|
|
t.Fatalf("Error on connect: %v", err)
|
|
}
|
|
defer nc2.Close()
|
|
|
|
// Send a request
|
|
start := time.Now()
|
|
msg := &nats.Msg{
|
|
Subject: "SVC",
|
|
Data: []byte("1h"),
|
|
Header: make(nats.Header),
|
|
}
|
|
for k, v := range v.header {
|
|
for _, val := range v {
|
|
msg.Header.Add(k, val)
|
|
}
|
|
}
|
|
msg.Header.Add("Some-Other", "value")
|
|
if _, err := nc2.RequestMsg(msg, 50*time.Millisecond); err != nil {
|
|
t.Fatalf("Expected a response")
|
|
}
|
|
sl := receiveAndTest(t, rsub, v.shared, v.header, http.StatusOK, srv.Name())
|
|
checkServiceLatency(t, sl, start, serviceTime)
|
|
// shut down responder to test various error scenarios
|
|
sub.Unsubscribe()
|
|
nc.Flush()
|
|
// Send a request without responder
|
|
if _, err := nc2.RequestMsg(msg, 50*time.Millisecond); err == nil {
|
|
t.Fatalf("Expected no response")
|
|
}
|
|
receiveAndTest(t, rsub, v.shared, v.header, http.StatusServiceUnavailable, srv.Name())
|
|
|
|
// send a message without a response
|
|
msg.Reply = ""
|
|
if err := nc2.PublishMsg(msg); err != nil {
|
|
t.Fatalf("Expected no error got %v", err)
|
|
}
|
|
receiveAndTest(t, rsub, v.shared, v.header, http.StatusBadRequest, srv.Name())
|
|
})
|
|
}
|
|
}
|
|
|
|
// From a report by rip@nats.io on simple latency reporting missing in 2 server cluster setup.
|
|
func TestServiceLatencyMissingResults(t *testing.T) {
|
|
accConf := createConfFile(t, []byte(`
|
|
accounts {
|
|
one: {
|
|
users = [ {user: one, password: password} ]
|
|
imports = [ {service: {account: weather, subject: service.weather.requests.>}, to: service.weather.>, share: true} ]
|
|
}
|
|
weather: {
|
|
users = [ {user: weather, password: password} ]
|
|
exports = [ {
|
|
service: service.weather.requests.>
|
|
accounts: [one]
|
|
latency: { sampling: 100%, subject: service.weather.latency }
|
|
} ]
|
|
}
|
|
}
|
|
`))
|
|
defer removeFile(t, accConf)
|
|
|
|
s1Conf := createConfFile(t, []byte(fmt.Sprintf(`
|
|
listen: 127.0.0.1:-1
|
|
server_name: s1
|
|
cluster { port: -1 }
|
|
include %q
|
|
`, filepath.Base(accConf))))
|
|
defer removeFile(t, s1Conf)
|
|
|
|
s1, opts1 := RunServerWithConfig(s1Conf)
|
|
defer s1.Shutdown()
|
|
|
|
s2Conf := createConfFile(t, []byte(fmt.Sprintf(`
|
|
listen: 127.0.0.1:-1
|
|
server_name: s2
|
|
cluster {
|
|
port: -1
|
|
routes = [ nats-route://127.0.0.1:%d ]
|
|
}
|
|
include %q
|
|
`, opts1.Cluster.Port, filepath.Base(accConf))))
|
|
defer removeFile(t, s2Conf)
|
|
|
|
s2, opts2 := RunServerWithConfig(s2Conf)
|
|
defer s2.Shutdown()
|
|
|
|
checkClusterFormed(t, s1, s2)
|
|
|
|
nc1, err := nats.Connect(fmt.Sprintf("nats://%s:%s@%s:%d", "weather", "password", opts1.Host, opts1.Port))
|
|
if err != nil {
|
|
t.Fatalf("Error on connect: %v", err)
|
|
}
|
|
defer nc1.Close()
|
|
|
|
// Create responder
|
|
sub, _ := nc1.Subscribe("service.weather.requests.>", func(msg *nats.Msg) {
|
|
time.Sleep(25 * time.Millisecond)
|
|
msg.Respond([]byte("sunny!"))
|
|
})
|
|
defer sub.Unsubscribe()
|
|
|
|
// Create sync listener for latency.
|
|
latSubj := "service.weather.latency"
|
|
lsub, _ := nc1.SubscribeSync(latSubj)
|
|
defer lsub.Unsubscribe()
|
|
nc1.Flush()
|
|
|
|
// Make sure the subscription propagates to s2 server.
|
|
checkSubInterest(t, s2, "weather", latSubj, time.Second)
|
|
|
|
// Create requestor on s2.
|
|
nc2, err := nats.Connect(fmt.Sprintf("nats://%s:%s@%s:%d", "one", "password", opts2.Host, opts2.Port), nats.UseOldRequestStyle())
|
|
if err != nil {
|
|
t.Fatalf("Error on connect: %v", err)
|
|
}
|
|
defer nc2.Close()
|
|
|
|
nc2.Request("service.weather.los_angeles", nil, time.Second)
|
|
|
|
lr, err := lsub.NextMsg(time.Second)
|
|
if err != nil {
|
|
t.Fatalf("Expected a latency result, got %v", err)
|
|
}
|
|
// Make sure we reported ok and have valid results for service, system and total.
|
|
var sl server.ServiceLatency
|
|
json.Unmarshal(lr.Data, &sl)
|
|
|
|
if sl.Status != 200 {
|
|
t.Fatalf("Expected a 200 status, got %d\n", sl.Status)
|
|
}
|
|
if sl.ServiceLatency == 0 || sl.SystemLatency == 0 || sl.TotalLatency == 0 {
|
|
t.Fatalf("Received invalid tracking measurements, %d %d %d", sl.ServiceLatency, sl.SystemLatency, sl.TotalLatency)
|
|
}
|
|
}
|
|
|
|
// To test a bug I was seeing.
|
|
func TestServiceLatencyDoubleResponse(t *testing.T) {
|
|
sc := createSuperCluster(t, 3, 1)
|
|
defer sc.shutdown()
|
|
|
|
// Now add in new service export to FOO and have bar import that with tracking enabled.
|
|
sc.setupLatencyTracking(t, 100)
|
|
|
|
// Responder
|
|
nc := clientConnectWithName(t, sc.clusters[0].opts[0], "foo", "service22")
|
|
defer nc.Close()
|
|
|
|
// Setup responder
|
|
sub, _ := nc.Subscribe("ngs.usage.*", func(msg *nats.Msg) {
|
|
msg.Respond([]byte("22 msgs"))
|
|
msg.Respond([]byte("boom"))
|
|
})
|
|
nc.Flush()
|
|
defer sub.Unsubscribe()
|
|
|
|
// Listen for metrics
|
|
rsub, _ := nc.SubscribeSync("latency.svc")
|
|
|
|
// Requestor
|
|
nc2 := clientConnect(t, sc.clusters[0].opts[2], "bar")
|
|
defer nc2.Close()
|
|
|
|
// Send a request
|
|
if _, err := nc2.Request("ngs.usage", []byte("1h"), time.Second); err != nil {
|
|
t.Fatalf("Expected a response")
|
|
}
|
|
|
|
rsub.NextMsg(time.Second)
|
|
time.Sleep(time.Second)
|
|
}
|