mirror of
https://github.com/gogrlx/nats-server.git
synced 2026-04-16 19:14:41 -07:00
First pass latency tracking for exported services
Signed-off-by: Derek Collison <derek@nats.io>
This commit is contained in:
@@ -16,6 +16,7 @@ package server
|
||||
import (
|
||||
"fmt"
|
||||
"io/ioutil"
|
||||
"math/rand"
|
||||
"net/http"
|
||||
"net/url"
|
||||
"reflect"
|
||||
@@ -98,11 +99,14 @@ type serviceImport struct {
|
||||
claim *jwt.Import
|
||||
from string
|
||||
to string
|
||||
rt ServiceRespType
|
||||
ts int64
|
||||
rt ServiceRespType
|
||||
latency *serviceLatency
|
||||
m1 *ServiceLatency
|
||||
ae bool
|
||||
internal bool
|
||||
invalid bool
|
||||
tracking bool
|
||||
}
|
||||
|
||||
// This is used to record when we create a mapping for implicit service
|
||||
@@ -142,7 +146,15 @@ func (rt ServiceRespType) String() string {
|
||||
type exportAuth struct {
|
||||
tokenReq bool
|
||||
approved map[string]*Account
|
||||
// Only used for service types
|
||||
respType ServiceRespType
|
||||
latency *serviceLatency
|
||||
}
|
||||
|
||||
// serviceLatency is the latency tracking configuration attached to an
// exported service.
type serviceLatency struct {
	// sampling is the sampling rate. TrackServiceExportWithSampling only
	// stores values in the range 1-100.
	sampling int8
	// subject is the results subject that measurements are published to.
	subject string
}
|
||||
|
||||
// importMap tracks the imported streams and services.
|
||||
@@ -403,6 +415,173 @@ func (a *Account) AddServiceExportWithResponse(subject string, respType ServiceR
|
||||
return nil
|
||||
}
|
||||
|
||||
// TrackServiceExport will enable latency tracking of the named service.
|
||||
// Results will be published in this account to the given results subject.
|
||||
func (a *Account) TrackServiceExport(service, results string) error {
|
||||
return a.TrackServiceExportWithSampling(service, results, 100)
|
||||
}
|
||||
|
||||
// TrackServiceExportWithSampling will enable latency tracking of the named service for the given
|
||||
// sampling rate (1-100). Results will be published in this account to the given results subject.
|
||||
func (a *Account) TrackServiceExportWithSampling(service, results string, sampling int) error {
|
||||
if sampling < 1 || sampling > 100 {
|
||||
return ErrBadSampling
|
||||
}
|
||||
if !IsValidPublishSubject(results) {
|
||||
return ErrBadPublishSubject
|
||||
}
|
||||
// Don't loop back on outselves.
|
||||
if a.IsExportService(results) {
|
||||
return ErrBadPublishSubject
|
||||
}
|
||||
|
||||
if a.srv == nil || !a.srv.eventsEnabled() {
|
||||
return ErrNoSysAccount
|
||||
}
|
||||
|
||||
a.mu.Lock()
|
||||
defer a.mu.Unlock()
|
||||
|
||||
if a == nil {
|
||||
return ErrMissingAccount
|
||||
}
|
||||
if a.exports.services == nil {
|
||||
return ErrMissingService
|
||||
}
|
||||
ea, ok := a.exports.services[service]
|
||||
if !ok {
|
||||
return ErrMissingService
|
||||
}
|
||||
if ea == nil {
|
||||
ea = &exportAuth{}
|
||||
a.exports.services[service] = ea
|
||||
} else if ea.respType != Singleton {
|
||||
return ErrBadServiceType
|
||||
}
|
||||
ea.latency = &serviceLatency{
|
||||
sampling: int8(sampling),
|
||||
subject: results,
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
// IsExportService will indicate if this service exists. Will check wildcard scenarios.
|
||||
func (a *Account) IsExportService(service string) bool {
|
||||
a.mu.RLock()
|
||||
defer a.mu.RUnlock()
|
||||
_, ok := a.exports.services[service]
|
||||
if ok {
|
||||
return true
|
||||
}
|
||||
tokens := strings.Split(service, tsep)
|
||||
for subj := range a.exports.services {
|
||||
if isSubsetMatch(tokens, subj) {
|
||||
return true
|
||||
}
|
||||
}
|
||||
return false
|
||||
}
|
||||
|
||||
// IsExportServiceTracking will indicate if given publish subject is an export service with tracking enabled.
|
||||
func (a *Account) IsExportServiceTracking(service string) bool {
|
||||
a.mu.RLock()
|
||||
defer a.mu.RUnlock()
|
||||
ea, ok := a.exports.services[service]
|
||||
if ok && ea == nil {
|
||||
return false
|
||||
}
|
||||
if ok && ea != nil && ea.latency != nil {
|
||||
return true
|
||||
}
|
||||
// FIXME(dlc) - Might want to cache this is in the hot path checking for
|
||||
// latency tracking.
|
||||
tokens := strings.Split(service, tsep)
|
||||
for subj, ea := range a.exports.services {
|
||||
if isSubsetMatch(tokens, subj) && ea != nil && ea.latency != nil {
|
||||
return true
|
||||
}
|
||||
}
|
||||
return false
|
||||
}
|
||||
|
||||
// ServiceLatency is the JSON message sent out in response to latency tracking
// for exported services.
type ServiceLatency struct {
	// AppName is the name supplied by the responding client connection.
	// It is left out of the JSON when empty.
	AppName string `json:"app_name,omitempty"`
	// RequestStart is the estimated time the request left the requestor.
	RequestStart time.Time `json:"request_start"`
	// ServiceLatency is the time attributed to the service itself.
	ServiceLatency time.Duration `json:"service_latency"`
	// NATSLatency is the time attributed to the NATS system, derived from
	// client round trip times.
	NATSLatency time.Duration `json:"nats_latency"`
	// TotalLatency is the overall latency measured for the request.
	TotalLatency time.Duration `json:"total_latency"`
}
|
||||
|
||||
// Used for transporting remote laytency measurements.
|
||||
type remoteLatency struct {
|
||||
Account string `json:"account"`
|
||||
ReqId string `json:"req_id"`
|
||||
M2 ServiceLatency `json:"m2"`
|
||||
}
|
||||
|
||||
// sendTrackingMessage will send out the appropriate tracking information for the
|
||||
// service request/response latency. This is called when the requestor's server has
|
||||
// received the response.
|
||||
// TODO(dlc) - holding locks for RTTs may be too much long term. Should revisit.
|
||||
func (a *Account) sendTrackingLatency(si *serviceImport, requestor, responder *client) bool {
|
||||
now := time.Now()
|
||||
serviceRTT := time.Duration(now.UnixNano() - si.ts)
|
||||
|
||||
var (
|
||||
reqClientRTT = requestor.getRTTValue()
|
||||
natsRTT = reqClientRTT
|
||||
respClientRTT time.Duration
|
||||
appName string
|
||||
)
|
||||
|
||||
expectRemoteM2 := responder != nil && responder.kind != CLIENT
|
||||
|
||||
if responder != nil && responder.kind == CLIENT {
|
||||
respClientRTT = responder.getRTTValue()
|
||||
natsRTT += respClientRTT
|
||||
appName = responder.GetName()
|
||||
}
|
||||
|
||||
// We will estimate time when request left the requestor by time we received
|
||||
// and the client RTT for the requestor.
|
||||
reqStart := time.Unix(0, si.ts-int64(reqClientRTT))
|
||||
sl := ServiceLatency{
|
||||
AppName: appName,
|
||||
RequestStart: reqStart,
|
||||
ServiceLatency: serviceRTT - respClientRTT,
|
||||
NATSLatency: natsRTT,
|
||||
TotalLatency: reqClientRTT + serviceRTT,
|
||||
}
|
||||
|
||||
// If we are expecting a remote measurement, store our sl here.
|
||||
// We need to account for the race between this and us receiving the
|
||||
// remote measurement.
|
||||
// FIXME(dlc) - We need to clean these up but this should happen
|
||||
// already with the auto-expire logic.
|
||||
if expectRemoteM2 {
|
||||
si.acc.mu.Lock()
|
||||
if si.m1 != nil {
|
||||
m2 := si.m1
|
||||
m1 := &sl
|
||||
m1.AppName = m2.AppName
|
||||
m1.ServiceLatency = m2.ServiceLatency
|
||||
m1.NATSLatency = m1.TotalLatency - m1.ServiceLatency
|
||||
si.acc.mu.Unlock()
|
||||
a.srv.sendInternalAccountMsg(a, si.latency.subject, m1)
|
||||
return true
|
||||
}
|
||||
si.m1 = &sl
|
||||
si.acc.mu.Unlock()
|
||||
return false
|
||||
} else {
|
||||
a.srv.sendInternalAccountMsg(a, si.latency.subject, &sl)
|
||||
}
|
||||
return true
|
||||
}
|
||||
|
||||
// numServiceRoutes returns the number of service routes on this account.
|
||||
func (a *Account) numServiceRoutes() int {
|
||||
a.mu.RLock()
|
||||
@@ -582,9 +761,12 @@ func (a *Account) SetMaxResponseMaps(max int) {
|
||||
// AddServiceImport from above if responding to user input or config changes, etc.
|
||||
func (a *Account) addServiceImport(dest *Account, from, to string, claim *jwt.Import) *serviceImport {
|
||||
rt := Singleton
|
||||
var lat *serviceLatency
|
||||
|
||||
dest.mu.Lock()
|
||||
if ae := dest.exports.services[to]; ae != nil {
|
||||
rt = ae.respType
|
||||
if ea := dest.getExportAuth(to); ea != nil {
|
||||
rt = ea.respType
|
||||
lat = ea.latency
|
||||
}
|
||||
dest.mu.Unlock()
|
||||
|
||||
@@ -592,25 +774,40 @@ func (a *Account) addServiceImport(dest *Account, from, to string, claim *jwt.Im
|
||||
if a.imports.services == nil {
|
||||
a.imports.services = make(map[string]*serviceImport)
|
||||
}
|
||||
si := &serviceImport{dest, claim, from, to, rt, 0, false, false, false}
|
||||
si := &serviceImport{dest, claim, from, to, 0, rt, lat, nil, false, false, false, false}
|
||||
a.imports.services[from] = si
|
||||
a.mu.Unlock()
|
||||
|
||||
return si
|
||||
}
|
||||
|
||||
// Helper to detrmine when to sample.
|
||||
func shouldSample(l *serviceLatency) bool {
|
||||
if l == nil || l.sampling <= 0 {
|
||||
return false
|
||||
}
|
||||
if l.sampling >= 100 {
|
||||
return true
|
||||
}
|
||||
return rand.Int31n(100) <= int32(l.sampling)
|
||||
}
|
||||
|
||||
// This is for internal responses.
|
||||
func (a *Account) addResponseServiceImport(dest *Account, from, to string, rt ServiceRespType) *serviceImport {
|
||||
func (a *Account) addRespServiceImport(dest *Account, from, to string, rt ServiceRespType, lat *serviceLatency) *serviceImport {
|
||||
a.mu.Lock()
|
||||
if a.imports.services == nil {
|
||||
a.imports.services = make(map[string]*serviceImport)
|
||||
}
|
||||
ae := rt == Singleton
|
||||
si := &serviceImport{dest, nil, from, to, rt, 0, ae, true, false}
|
||||
si := &serviceImport{dest, nil, from, to, 0, rt, nil, nil, ae, true, false, false}
|
||||
a.imports.services[from] = si
|
||||
if ae {
|
||||
a.nae++
|
||||
si.ts = time.Now().Unix()
|
||||
si.ts = time.Now().UnixNano()
|
||||
if lat != nil {
|
||||
si.latency = lat
|
||||
si.tracking = true
|
||||
}
|
||||
if a.nae > a.maxnae && !a.pruning {
|
||||
a.pruning = true
|
||||
go a.pruneAutoExpireResponseMaps()
|
||||
@@ -648,14 +845,14 @@ func (a *Account) pruneAutoExpireResponseMaps() {
|
||||
}()
|
||||
|
||||
a.mu.RLock()
|
||||
ttl := int64(a.maxaettl/time.Second) + 1
|
||||
ttl := int64(a.maxaettl)
|
||||
a.mu.RUnlock()
|
||||
|
||||
for {
|
||||
sis := a.autoExpireResponseMaps()
|
||||
|
||||
// Check ttl items.
|
||||
now := time.Now().Unix()
|
||||
now := time.Now().UnixNano()
|
||||
for i, si := range sis {
|
||||
if now-si.ts >= ttl {
|
||||
a.removeServiceImport(si.from)
|
||||
@@ -798,6 +995,30 @@ func (a *Account) checkExportApproved(account *Account, subject string, imClaim
|
||||
return false
|
||||
}
|
||||
|
||||
// Helper function to get an exportAuth.
|
||||
// Lock should be held on entry.
|
||||
func (a *Account) getExportAuth(subj string) *exportAuth {
|
||||
ea, ok := a.exports.services[subj]
|
||||
// The export probably has a wildcard, so lookup that up.
|
||||
if !ok {
|
||||
ea = a.getWildcardExportAuth(subj)
|
||||
}
|
||||
return ea
|
||||
}
|
||||
|
||||
// This helper is used when trying to match an exportAuth record that is
|
||||
// represented by a wildcard.
|
||||
// Lock should be held on entry.
|
||||
func (a *Account) getWildcardExportAuth(to string) *exportAuth {
|
||||
tokens := strings.Split(to, tsep)
|
||||
for subj, ea := range a.exports.services {
|
||||
if isSubsetMatch(tokens, subj) {
|
||||
return ea
|
||||
}
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
// Will fetch the activation token for an import.
|
||||
func fetchActivation(url string) string {
|
||||
// FIXME(dlc) - Make configurable.
|
||||
|
||||
@@ -18,6 +18,7 @@ import (
|
||||
"encoding/json"
|
||||
"fmt"
|
||||
"os"
|
||||
"strconv"
|
||||
"strings"
|
||||
"testing"
|
||||
"time"
|
||||
@@ -1181,6 +1182,7 @@ func TestCrossAccountRequestReply(t *testing.T) {
|
||||
if err := cfoo.registerWithAccount(fooAcc); err != nil {
|
||||
t.Fatalf("Error registering client with 'foo' account: %v", err)
|
||||
}
|
||||
|
||||
cbar, crBar, _ := newClientForServer(s)
|
||||
defer cbar.nc.Close()
|
||||
|
||||
@@ -1270,6 +1272,140 @@ func TestCrossAccountRequestReply(t *testing.T) {
|
||||
}
|
||||
}
|
||||
|
||||
func TestAccountRequestReplyTrackLatency(t *testing.T) {
|
||||
s, fooAcc, barAcc := simpleAccountServer(t)
|
||||
defer s.Shutdown()
|
||||
|
||||
// Run server in Go routine. We need this one running for internal sending of msgs.
|
||||
go s.Start()
|
||||
// Wait for accept loop(s) to be started
|
||||
if !s.ReadyForConnections(10 * time.Second) {
|
||||
panic("Unable to start NATS Server in Go Routine")
|
||||
}
|
||||
|
||||
cfoo, crFoo, _ := newClientForServer(s)
|
||||
defer cfoo.nc.Close()
|
||||
|
||||
if err := cfoo.registerWithAccount(fooAcc); err != nil {
|
||||
t.Fatalf("Error registering client with 'foo' account: %v", err)
|
||||
}
|
||||
|
||||
cbar, crBar, _ := newClientForServer(s)
|
||||
defer cbar.nc.Close()
|
||||
|
||||
if err := cbar.registerWithAccount(barAcc); err != nil {
|
||||
t.Fatalf("Error registering client with 'bar' account: %v", err)
|
||||
}
|
||||
|
||||
// Add in the service export for the requests. Make it public.
|
||||
if err := fooAcc.AddServiceExport("track.service", nil); err != nil {
|
||||
t.Fatalf("Error adding account service export to client foo: %v", err)
|
||||
}
|
||||
|
||||
// Now let's add in tracking
|
||||
|
||||
// This looks ok but should fail because we have not set a system account needed for internal msgs.
|
||||
if err := fooAcc.TrackServiceExport("track.service", "results"); err != ErrNoSysAccount {
|
||||
t.Fatalf("Expected error enabling tracking latency without a system account")
|
||||
}
|
||||
|
||||
if err := s.SetSystemAccount(globalAccountName); err != nil {
|
||||
t.Fatalf("Error setting system account: %v", err)
|
||||
}
|
||||
|
||||
// First check we get an error if service does not exist.
|
||||
if err := fooAcc.TrackServiceExport("track.wrong", "results"); err != ErrMissingService {
|
||||
t.Fatalf("Expected error enabling tracking latency for wrong service")
|
||||
}
|
||||
// Check results should be a valid subject
|
||||
if err := fooAcc.TrackServiceExport("track.service", "results.*"); err != ErrBadPublishSubject {
|
||||
t.Fatalf("Expected error enabling tracking latency for bad results subject")
|
||||
}
|
||||
// Make sure we can not loop around on ourselves..
|
||||
if err := fooAcc.TrackServiceExport("track.service", "track.service"); err != ErrBadPublishSubject {
|
||||
t.Fatalf("Expected error enabling tracking latency for same subject")
|
||||
}
|
||||
// Check bad sampling
|
||||
if err := fooAcc.TrackServiceExportWithSampling("track.service", "results", -1); err != ErrBadSampling {
|
||||
t.Fatalf("Expected error enabling tracking latency for bad sampling")
|
||||
}
|
||||
if err := fooAcc.TrackServiceExportWithSampling("track.service", "results", 101); err != ErrBadSampling {
|
||||
t.Fatalf("Expected error enabling tracking latency for bad sampling")
|
||||
}
|
||||
|
||||
// Now let's add in tracking for real. This will be 100%
|
||||
if err := fooAcc.TrackServiceExport("track.service", "results"); err != nil {
|
||||
t.Fatalf("Error enabling tracking latency: %v", err)
|
||||
}
|
||||
|
||||
// Now add in the route mapping for request to be routed to the foo account.
|
||||
if err := barAcc.AddServiceImport(fooAcc, "req", "track.service"); err != nil {
|
||||
t.Fatalf("Error adding account service import to client bar: %v", err)
|
||||
}
|
||||
|
||||
// Now setup the resonder under cfoo and the listener for the results
|
||||
cfoo.parse([]byte("SUB track.service 1\r\nSUB results 2\r\n"))
|
||||
|
||||
readFooMsg := func() ([]byte, string) {
|
||||
t.Helper()
|
||||
l, err := crFoo.ReadString('\n')
|
||||
if err != nil {
|
||||
t.Fatalf("Error reading from client 'bar': %v", err)
|
||||
}
|
||||
mraw := msgPat.FindAllStringSubmatch(l, -1)
|
||||
if len(mraw) == 0 {
|
||||
t.Fatalf("No message received")
|
||||
}
|
||||
msg := mraw[0]
|
||||
msgSize, _ := strconv.Atoi(msg[LEN_INDEX])
|
||||
return grabPayload(crFoo, msgSize), msg[REPLY_INDEX]
|
||||
}
|
||||
|
||||
start := time.Now()
|
||||
|
||||
// Now send the request. Remember we expect the request on our local foo. We added the route
|
||||
// with that "from" and will map it to "test.request"
|
||||
go cbar.parseAndFlush([]byte("SUB resp 11\r\nPUB req resp 4\r\nhelp\r\n"))
|
||||
|
||||
// Now read the request from crFoo
|
||||
_, reply := readFooMsg()
|
||||
replyOp := fmt.Sprintf("PUB %s 2\r\n22\r\n", reply)
|
||||
|
||||
serviceTime := 25 * time.Millisecond
|
||||
|
||||
// We will wait a bit to check latency results
|
||||
go func() {
|
||||
time.Sleep(serviceTime)
|
||||
cfoo.parseAndFlush([]byte(replyOp))
|
||||
}()
|
||||
|
||||
// Now read the response from crBar
|
||||
_, err := crBar.ReadString('\n')
|
||||
if err != nil {
|
||||
t.Fatalf("Error reading from client 'bar': %v", err)
|
||||
}
|
||||
|
||||
// Now let's check that we got the sampling results
|
||||
rMsg, _ := readFooMsg()
|
||||
|
||||
// Unmarshal and check it.
|
||||
var sl ServiceLatency
|
||||
err = json.Unmarshal(rMsg, &sl)
|
||||
if err != nil {
|
||||
t.Fatalf("Could not parse latency json: %v\n", err)
|
||||
}
|
||||
startDelta := sl.RequestStart.Sub(start)
|
||||
if startDelta > 5*time.Millisecond {
|
||||
t.Fatalf("Bad start delta %v", startDelta)
|
||||
}
|
||||
if sl.ServiceLatency < serviceTime {
|
||||
t.Fatalf("Bad service latency: %v", sl.ServiceLatency)
|
||||
}
|
||||
if sl.TotalLatency < sl.ServiceLatency {
|
||||
t.Fatalf("Bad total latency: %v", sl.ServiceLatency)
|
||||
}
|
||||
}
|
||||
|
||||
func TestCrossAccountRequestReplyResponseMaps(t *testing.T) {
|
||||
s, fooAcc, barAcc := simpleAccountServer(t)
|
||||
defer s.Shutdown()
|
||||
@@ -1774,6 +1910,6 @@ func BenchmarkNewRouteReply(b *testing.B) {
|
||||
c, _, _ := newClientForServer(s)
|
||||
b.ResetTimer()
|
||||
for i := 0; i < b.N; i++ {
|
||||
c.newServiceReply()
|
||||
c.newServiceReply(false)
|
||||
}
|
||||
}
|
||||
|
||||
129
server/client.go
129
server/client.go
@@ -187,8 +187,9 @@ type client struct {
|
||||
last time.Time
|
||||
parseState
|
||||
|
||||
rtt time.Duration
|
||||
rttStart time.Time
|
||||
rtt time.Duration
|
||||
rttStart time.Time
|
||||
rrTracking map[string]*remoteLatency
|
||||
|
||||
route *route
|
||||
gw *gateway
|
||||
@@ -319,6 +320,15 @@ func (c *client) String() (id string) {
|
||||
return c.ncs
|
||||
}
|
||||
|
||||
// GetName returns the application supplied name for the connection.
|
||||
func (c *client) GetName() string {
|
||||
c.mu.Lock()
|
||||
name := c.opts.Name
|
||||
c.mu.Unlock()
|
||||
return name
|
||||
}
|
||||
|
||||
// GetOpts returns the client options provided by the application.
|
||||
func (c *client) GetOpts() *clientOpts {
|
||||
return &c.opts
|
||||
}
|
||||
@@ -1480,6 +1490,15 @@ func (c *client) sendPong() {
|
||||
c.sendProto([]byte("PONG\r\n"), true)
|
||||
}
|
||||
|
||||
// Used to kick off a RTT measurement for latency tracking.
|
||||
func (c *client) sendRTTPing() {
|
||||
c.mu.Lock()
|
||||
if c.flags.isSet(connectReceived) {
|
||||
c.sendPing()
|
||||
}
|
||||
c.mu.Unlock()
|
||||
}
|
||||
|
||||
// Assume the lock is held upon entry.
|
||||
func (c *client) sendPing() {
|
||||
c.rttStart = time.Now()
|
||||
@@ -2192,6 +2211,24 @@ func (c *client) deliverMsg(sub *subscription, mh, msg []byte) bool {
|
||||
return false
|
||||
}
|
||||
|
||||
// Do a fast check here to see if we should be tracking this from a latency
|
||||
// perspective. This will be for a request being received for an exported service.
|
||||
// This needs to be from a non-client (otherwise tracking happens at requestor).
|
||||
if c.kind != CLIENT && client.kind == CLIENT && len(c.pa.reply) > minReplyLen {
|
||||
// FIXME(dlc) - We may need to optimize this.
|
||||
if client.acc.IsExportServiceTracking(string(c.pa.subject)) {
|
||||
// If we do not have a registered RTT queue that up now.
|
||||
if client.rtt == 0 {
|
||||
client.sendPing()
|
||||
}
|
||||
// We will have tagged this with a suffix ('.T') if we are tracking. This is
|
||||
// needed from sampling. Not all will be tracked.
|
||||
if isTrackedReply(c.pa.reply) {
|
||||
client.trackRemoteReply(string(c.pa.reply))
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// Queue to outbound buffer
|
||||
client.queueOutbound(mh)
|
||||
client.queueOutbound(msg)
|
||||
@@ -2232,6 +2269,21 @@ func (c *client) deliverMsg(sub *subscription, mh, msg []byte) bool {
|
||||
return true
|
||||
}
|
||||
|
||||
// This will track a remote reply for an exported service that has requested
|
||||
// latency tracking.
|
||||
// Lock assumed to be held.
|
||||
func (c *client) trackRemoteReply(reply string) {
|
||||
if c.rrTracking == nil {
|
||||
c.rrTracking = make(map[string]*remoteLatency)
|
||||
}
|
||||
rl := remoteLatency{
|
||||
Account: c.acc.Name,
|
||||
ReqId: reply,
|
||||
}
|
||||
rl.M2.RequestStart = time.Now()
|
||||
c.rrTracking[reply] = &rl
|
||||
}
|
||||
|
||||
// pruneReplyPerms will remove any stale or expired entries
|
||||
// in our reply cache. We make sure to not check too often.
|
||||
func (c *client) pruneReplyPerms() {
|
||||
@@ -2341,27 +2393,43 @@ func (c *client) pubAllowedFullCheck(subject string, fullCheck bool) bool {
|
||||
// Constants used to build and recognize the reply subjects generated for
// service imports, which mimic client like replies.
const (
	// replyPrefix starts every generated service reply subject.
	replyPrefix = "_R_."
	// trackSuffix ends reply subjects that are tracked for latency.
	trackSuffix = ".T"
	// replyPrefixLen is the length of replyPrefix.
	replyPrefixLen = len(replyPrefix)
	// minReplyLen is the length of a generated reply without any tracking
	// information appended.
	minReplyLen = 15
	// digits is the alphabet used to encode the random part of a reply.
	digits = "0123456789ABCDEFGHIJKLMNOPQRSTUVWXYZabcdefghijklmnopqrstuvwxyz"
	// base is the number of characters in digits.
	base = 62
)
|
||||
|
||||
// newServiceReply is used when rewriting replies that cross account boundaries.
|
||||
// These will look like _R_.XXXXXXXX.
|
||||
func (c *client) newServiceReply() []byte {
|
||||
func (c *client) newServiceReply(tracking bool) []byte {
|
||||
// Check to see if we have our own rand yet. Global rand
|
||||
// has contention with lots of clients, etc.
|
||||
if c.in.prand == nil {
|
||||
c.in.prand = rand.New(rand.NewSource(time.Now().UnixNano()))
|
||||
}
|
||||
|
||||
var b = [15]byte{'_', 'R', '_', '.'}
|
||||
var b = [minReplyLen]byte{'_', 'R', '_', '.'}
|
||||
rn := c.in.prand.Int63()
|
||||
for i, l := replyPrefixLen, rn; i < len(b); i++ {
|
||||
b[i] = digits[l%base]
|
||||
l /= base
|
||||
}
|
||||
return b[:]
|
||||
reply := b[:]
|
||||
if tracking && c.srv.sys != nil {
|
||||
// Add in our tracking identifier. This allows the metrics to get back to only
|
||||
// this server without needless SUBS/UNSUBS.
|
||||
reply = append(reply, '.')
|
||||
reply = append(reply, c.srv.sys.shash...)
|
||||
reply = append(reply, '.', 'T')
|
||||
}
|
||||
return reply
|
||||
}
|
||||
|
||||
// isTrackedReply tests whether this is a tracked reply, meaning the subject is
// at least five bytes long and ends in ".T".
func isTrackedReply(reply []byte) bool {
	n := len(reply)
	if n < 5 {
		return false
	}
	return reply[n-2] == '.' && reply[n-1] == 'T'
}
|
||||
|
||||
// Test whether a reply subject is a service import reply.
|
||||
@@ -2420,6 +2488,28 @@ func (c *client) processInboundClientMsg(msg []byte) {
|
||||
c.checkForImportServices(c.acc, msg)
|
||||
}
|
||||
|
||||
// If we have an exported service and we are doing remote tracking, check this subject
|
||||
// to see if we need to report the latency.
|
||||
if c.acc.exports.services != nil && c.rrTracking != nil {
|
||||
c.mu.Lock()
|
||||
rl := c.rrTracking[string(c.pa.subject)]
|
||||
if rl != nil {
|
||||
delete(c.rrTracking, string(c.pa.subject))
|
||||
}
|
||||
rtt := c.rtt
|
||||
c.mu.Unlock()
|
||||
if rl != nil {
|
||||
sl := &rl.M2
|
||||
// Fill this in and send it off to the other side.
|
||||
sl.AppName = c.opts.Name
|
||||
sl.ServiceLatency = time.Since(sl.RequestStart) - rtt
|
||||
sl.NATSLatency = rtt
|
||||
sl.TotalLatency = sl.ServiceLatency + sl.NATSLatency
|
||||
lsub := remoteLatencySubjectForResponse(c.pa.subject)
|
||||
c.srv.sendInternalAccountMsg(nil, lsub, &rl) // Send to SYS account
|
||||
}
|
||||
}
|
||||
|
||||
// Match the subscriptions. We will use our own L1 map if
|
||||
// it's still valid, avoiding contention on the shared sublist.
|
||||
var r *SublistResult
|
||||
@@ -2489,17 +2579,22 @@ func (c *client) checkForImportServices(acc *Account, msg []byte) {
|
||||
// If we have been marked invalid simply return here.
|
||||
if si != nil && !invalid && si.acc != nil && si.acc.sl != nil {
|
||||
var nrr []byte
|
||||
if si.ae {
|
||||
acc.removeServiceImport(si.from)
|
||||
}
|
||||
if c.pa.reply != nil {
|
||||
var latency *serviceLatency
|
||||
var tracking bool
|
||||
if tracking = shouldSample(si.latency); tracking {
|
||||
latency = si.latency
|
||||
}
|
||||
// We want to remap this to provide anonymity.
|
||||
nrr = c.newServiceReply()
|
||||
si.acc.addResponseServiceImport(acc, string(nrr), string(c.pa.reply), si.rt)
|
||||
nrr = c.newServiceReply(tracking)
|
||||
si.acc.addRespServiceImport(acc, string(nrr), string(c.pa.reply), si.rt, latency)
|
||||
|
||||
// Track our responses for cleanup if not auto-expire.
|
||||
if si.rt != Singleton {
|
||||
acc.addRespMapEntry(si.acc, string(c.pa.reply), string(nrr))
|
||||
} else if si.latency != nil && c.rtt == 0 {
|
||||
// We have a service import that we are tracking but have not established RTT
|
||||
c.sendRTTPing()
|
||||
}
|
||||
|
||||
// If this is a client or leaf connection and we are in gateway mode,
|
||||
@@ -2533,6 +2628,20 @@ func (c *client) checkForImportServices(acc *Account, msg []byte) {
|
||||
} else {
|
||||
c.processMsgResults(si.acc, rr, msg, []byte(si.to), nrr, pmrNoFlag)
|
||||
}
|
||||
|
||||
shouldRemove := si.ae
|
||||
|
||||
// Calculate tracking info here if we are tracking this request/response.
|
||||
if si.tracking {
|
||||
if requesting := firstSubFromResult(rr); requesting != nil {
|
||||
shouldRemove = acc.sendTrackingLatency(si, requesting.client, c)
|
||||
}
|
||||
}
|
||||
|
||||
if shouldRemove {
|
||||
acc.removeServiceImport(si.from)
|
||||
}
|
||||
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
@@ -305,6 +305,13 @@ const (
|
||||
LEN_INDEX = 5
|
||||
)
|
||||
|
||||
// grabPayload reads a message payload of the expected size from cr, then
// consumes the rest of the line that follows it (the trailing CRLF).
//
// A single Read on a bufio.Reader may hand back fewer bytes than asked for, so
// reading continues until the payload is complete or the reader reports an
// error. On error the bytes read so far are returned, which means the result
// can be shorter than expected.
func grabPayload(cr *bufio.Reader, expected int) []byte {
	d := make([]byte, expected)
	n := 0
	for n < expected {
		m, err := cr.Read(d[n:])
		n += m
		if err != nil {
			break
		}
	}
	// Best effort, the error is deliberately ignored: the payload has already
	// been collected and callers only receive the bytes.
	cr.ReadString('\n')
	return d[:n]
}
|
||||
|
||||
func checkPayload(cr *bufio.Reader, expected []byte, t *testing.T) {
|
||||
t.Helper()
|
||||
// Read in payload
|
||||
|
||||
@@ -40,6 +40,9 @@ var (
|
||||
// ErrReservedPublishSubject represents an error condition when sending to a reserved subject, e.g. _SYS.>
|
||||
ErrReservedPublishSubject = errors.New("reserved internal subject")
|
||||
|
||||
// ErrBadPublishSubject represents an error condition for an invalid publish subject.
|
||||
ErrBadPublishSubject = errors.New("invalid publish subject")
|
||||
|
||||
// ErrBadClientProtocol signals a client requested an invalid client protocol.
|
||||
ErrBadClientProtocol = errors.New("invalid client protocol")
|
||||
|
||||
@@ -76,6 +79,15 @@ var (
|
||||
// ErrMissingAccount is returned when an account does not exist.
|
||||
ErrMissingAccount = errors.New("account missing")
|
||||
|
||||
// ErrMissingService is returned when an account does not have an exported service.
|
||||
ErrMissingService = errors.New("service missing")
|
||||
|
||||
// ErrBadServiceType is returned when latency tracking is being applied to non-singleton response types.
|
||||
ErrBadServiceType = errors.New("bad service response type")
|
||||
|
||||
// ErrBadSampling is returned when the sampling for latency tracking is not 1 >= sample <= 100.
|
||||
ErrBadSampling = errors.New("bad sampling percentage, should be 1-100")
|
||||
|
||||
// ErrAccountValidation is returned when an account has failed validation.
|
||||
ErrAccountValidation = errors.New("account validation failed")
|
||||
|
||||
|
||||
150
server/events.go
150
server/events.go
@@ -14,6 +14,8 @@
|
||||
package server
|
||||
|
||||
import (
|
||||
"bytes"
|
||||
"crypto/sha256"
|
||||
"encoding/json"
|
||||
"fmt"
|
||||
"strconv"
|
||||
@@ -39,6 +41,7 @@ const (
|
||||
serverStatsReqSubj = "$SYS.REQ.SERVER.%s.STATSZ"
|
||||
serverStatsPingReqSubj = "$SYS.REQ.SERVER.PING"
|
||||
leafNodeConnectEventSubj = "$SYS.ACCOUNT.%s.LEAFNODE.CONNECT"
|
||||
remoteLatencyEventSubj = "$SYS.LATENCY.M2.%s"
|
||||
|
||||
shutdownEventTokens = 4
|
||||
serverSubjectIndex = 2
|
||||
@@ -65,6 +68,7 @@ type internal struct {
|
||||
orphMax time.Duration
|
||||
chkOrph time.Duration
|
||||
statsz time.Duration
|
||||
shash string
|
||||
}
|
||||
|
||||
// ServerStatsMsg is sent periodically with stats updates.
|
||||
@@ -173,6 +177,7 @@ type DataStats struct {
|
||||
|
||||
// Used for internally queueing up messages that the server wants to send.
|
||||
type pubMsg struct {
|
||||
acc *Account
|
||||
sub string
|
||||
rply string
|
||||
si *ServerInfo
|
||||
@@ -197,6 +202,7 @@ func (s *Server) internalSendLoop(wg *sync.WaitGroup) {
|
||||
return
|
||||
}
|
||||
c := s.sys.client
|
||||
sysacc := s.sys.account
|
||||
sendq := s.sys.sendq
|
||||
id := s.info.ID
|
||||
host := s.info.Host
|
||||
@@ -207,16 +213,25 @@ func (s *Server) internalSendLoop(wg *sync.WaitGroup) {
|
||||
}
|
||||
s.mu.Unlock()
|
||||
|
||||
// Warn when internal send queue is backed up past 75%
|
||||
warnThresh := 3 * internalSendQLen / 4
|
||||
warnFreq := time.Second
|
||||
last := time.Now().Add(-warnFreq)
|
||||
|
||||
for s.eventsRunning() {
|
||||
// Setup information for next message
|
||||
seq := atomic.AddUint64(seqp, 1)
|
||||
if len(sendq) > warnThresh && time.Since(last) >= warnFreq {
|
||||
s.Warnf("Internal system send queue > 75%")
|
||||
last = time.Now()
|
||||
}
|
||||
|
||||
select {
|
||||
case pm := <-sendq:
|
||||
if pm.si != nil {
|
||||
pm.si.Host = host
|
||||
pm.si.Cluster = cluster
|
||||
pm.si.ID = id
|
||||
pm.si.Seq = seq
|
||||
pm.si.Seq = atomic.AddUint64(seqp, 1)
|
||||
pm.si.Version = VERSION
|
||||
pm.si.Time = time.Now()
|
||||
}
|
||||
@@ -224,11 +239,20 @@ func (s *Server) internalSendLoop(wg *sync.WaitGroup) {
|
||||
if pm.msg != nil {
|
||||
b, _ = json.MarshalIndent(pm.msg, _EMPTY_, " ")
|
||||
}
|
||||
// We can have an override for account here.
|
||||
c.mu.Lock()
|
||||
if pm.acc != nil {
|
||||
c.acc = pm.acc
|
||||
} else {
|
||||
c.acc = sysacc
|
||||
}
|
||||
// Prep internal structures needed to send message.
|
||||
c.pa.subject = []byte(pm.sub)
|
||||
c.pa.size = len(b)
|
||||
c.pa.szb = []byte(strconv.FormatInt(int64(len(b)), 10))
|
||||
c.pa.reply = []byte(pm.rply)
|
||||
c.mu.Unlock()
|
||||
|
||||
// Add in NL
|
||||
b = append(b, _CRLF_...)
|
||||
c.processInboundClientMsg(b)
|
||||
@@ -258,7 +282,29 @@ func (s *Server) sendShutdownEvent() {
|
||||
s.sys.subs = nil
|
||||
s.mu.Unlock()
|
||||
// Send to the internal queue and mark as last.
|
||||
sendq <- &pubMsg{subj, _EMPTY_, nil, nil, true}
|
||||
sendq <- &pubMsg{nil, subj, _EMPTY_, nil, nil, true}
|
||||
}
|
||||
|
||||
// Used to send an internal message to an arbitrary account.
|
||||
func (s *Server) sendInternalAccountMsg(a *Account, subject string, msg interface{}) error {
|
||||
s.mu.Lock()
|
||||
if s.sys == nil || s.sys.sendq == nil {
|
||||
s.mu.Unlock()
|
||||
return ErrNoSysAccount
|
||||
}
|
||||
sendq := s.sys.sendq
|
||||
// Don't hold lock while placing on the channel.
|
||||
s.mu.Unlock()
|
||||
sendq <- &pubMsg{a, subject, "", nil, msg, false}
|
||||
return nil
|
||||
}
|
||||
|
||||
// This will queue up a message to be sent.
|
||||
// Lock should not be held.
|
||||
func (s *Server) sendInternalMsgLocked(sub, rply string, si *ServerInfo, msg interface{}) {
|
||||
s.mu.Lock()
|
||||
s.sendInternalMsg(sub, rply, si, msg)
|
||||
s.mu.Unlock()
|
||||
}
|
||||
|
||||
// This will queue up a message to be sent.
|
||||
@@ -270,7 +316,7 @@ func (s *Server) sendInternalMsg(sub, rply string, si *ServerInfo, msg interface
|
||||
sendq := s.sys.sendq
|
||||
// Don't hold lock while placing on the channel.
|
||||
s.mu.Unlock()
|
||||
sendq <- &pubMsg{sub, rply, si, msg, false}
|
||||
sendq <- &pubMsg{nil, sub, rply, si, msg, false}
|
||||
s.mu.Lock()
|
||||
}
|
||||
|
||||
@@ -414,6 +460,9 @@ func (s *Server) startRemoteServerSweepTimer() {
|
||||
s.sys.sweeper = time.AfterFunc(s.sys.chkOrph, s.wrapChk(s.checkRemoteServers))
|
||||
}
|
||||
|
||||
// Length of our system hash used for server targeted messages.
|
||||
const sysHashLen = 4
|
||||
|
||||
// This will setup our system wide tracking subs.
|
||||
// For now we will setup one wildcard subscription to
|
||||
// monitor all accounts for changes in number of connections.
|
||||
@@ -424,6 +473,12 @@ func (s *Server) initEventTracking() {
|
||||
if !s.eventsEnabled() {
|
||||
return
|
||||
}
|
||||
// Create a system hash which we use for other servers to target us
|
||||
// specifically.
|
||||
sha := sha256.New()
|
||||
sha.Write([]byte(s.info.ID))
|
||||
s.sys.shash = fmt.Sprintf("%x", sha.Sum(nil))[:sysHashLen]
|
||||
|
||||
subject := fmt.Sprintf(accConnsEventSubj, "*")
|
||||
if _, err := s.sysSubscribe(subject, s.remoteConnsUpdate); err != nil {
|
||||
s.Errorf("Error setting up internal tracking: %v", err)
|
||||
@@ -463,6 +518,11 @@ func (s *Server) initEventTracking() {
|
||||
if _, err := s.sysSubscribe(subject, s.leafNodeConnected); err != nil {
|
||||
s.Errorf("Error setting up internal tracking: %v", err)
|
||||
}
|
||||
// For tracking remote latency measurements.
|
||||
subject = fmt.Sprintf(remoteLatencyEventSubj, s.sys.shash)
|
||||
if _, err := s.sysSubscribe(subject, s.remoteLatencyUpdate); err != nil {
|
||||
s.Errorf("Error setting up internal latency tracking: %v", err)
|
||||
}
|
||||
}
|
||||
|
||||
// accountClaimUpdate will receive claim updates for accounts.
|
||||
@@ -786,9 +846,7 @@ func (s *Server) accountConnectEvent(c *client) {
|
||||
}
|
||||
c.mu.Unlock()
|
||||
|
||||
s.mu.Lock()
|
||||
s.sendInternalMsg(subj, _EMPTY_, &m.Server, &m)
|
||||
s.mu.Unlock()
|
||||
s.sendInternalMsgLocked(subj, _EMPTY_, &m.Server, &m)
|
||||
}
|
||||
|
||||
// accountDisconnectEvent will send an account client disconnect event if there is interest.
|
||||
@@ -837,9 +895,7 @@ func (s *Server) accountDisconnectEvent(c *client, now time.Time, reason string)
|
||||
|
||||
subj := fmt.Sprintf(disconnectEventSubj, c.acc.Name)
|
||||
|
||||
s.mu.Lock()
|
||||
s.sendInternalMsg(subj, _EMPTY_, &m.Server, &m)
|
||||
s.mu.Unlock()
|
||||
s.sendInternalMsgLocked(subj, _EMPTY_, &m.Server, &m)
|
||||
}
|
||||
|
||||
func (s *Server) sendAuthErrorEvent(c *client) {
|
||||
@@ -880,7 +936,6 @@ func (s *Server) sendAuthErrorEvent(c *client) {
|
||||
subj := fmt.Sprintf(authErrorEventSubj, s.info.ID)
|
||||
s.sendInternalMsg(subj, _EMPTY_, &m.Server, &m)
|
||||
s.mu.Unlock()
|
||||
|
||||
}
|
||||
|
||||
// Internal message callback. If the msg is needed past the callback it is
|
||||
@@ -937,6 +992,76 @@ func (s *Server) sysUnsubscribe(sub *subscription) {
|
||||
c.unsubscribe(acc, sub, true, true)
|
||||
}
|
||||
|
||||
// This will generate the tracking subject for remote latency from the response subject.
|
||||
func remoteLatencySubjectForResponse(subject []byte) string {
|
||||
if !isTrackedReply(subject) {
|
||||
return ""
|
||||
}
|
||||
toks := bytes.Split(subject, []byte(tsep))
|
||||
// FIXME(dlc) - Sprintf may become a performance concern at some point.
|
||||
return fmt.Sprintf(remoteLatencyEventSubj, toks[len(toks)-2])
|
||||
}
|
||||
|
||||
// remoteLatencyUpdate is used to track remote latency measurements for tracking on exported services.
|
||||
func (s *Server) remoteLatencyUpdate(sub *subscription, subject, _ string, msg []byte) {
|
||||
if !s.eventsRunning() {
|
||||
return
|
||||
}
|
||||
rl := remoteLatency{}
|
||||
if err := json.Unmarshal(msg, &rl); err != nil {
|
||||
s.Errorf("Error unmarshalling remot elatency measurement: %v", err)
|
||||
return
|
||||
}
|
||||
// Now we need to look up the responseServiceImport associated with thsi measurement.
|
||||
acc, err := s.LookupAccount(rl.Account)
|
||||
if err != nil {
|
||||
s.Warnf("Could not lookup account %q for latency measurement", rl.Account)
|
||||
}
|
||||
// Now get the request id / reply. We need to see if we have a GW prefix and if so strip that off.
|
||||
reply := rl.ReqId
|
||||
if subjectStartsWithGatewayReplyPrefix([]byte(reply)) {
|
||||
reply = reply[gwReplyStart:]
|
||||
}
|
||||
acc.mu.RLock()
|
||||
si := acc.imports.services[reply]
|
||||
if si == nil {
|
||||
acc.mu.RUnlock()
|
||||
return
|
||||
}
|
||||
m1 := si.m1
|
||||
m2 := rl.M2
|
||||
lsub := si.latency.subject
|
||||
acc.mu.RUnlock()
|
||||
|
||||
// So we have no processed the response tracking measurement yet.
|
||||
if m1 == nil {
|
||||
acc.mu.Lock()
|
||||
// Double check since could have slipped in.
|
||||
m1 = si.m1
|
||||
if m1 == nil {
|
||||
// Store our value there for them to pick up.
|
||||
si.m1 = &m2
|
||||
}
|
||||
acc.mu.Unlock()
|
||||
if m1 == nil {
|
||||
return
|
||||
}
|
||||
}
|
||||
|
||||
// Calculate the correct latency given M1 and M2.
|
||||
// M2 ServiceLatency is correct, so use that.
|
||||
// M1 TotalLatency is correct, so use that.
|
||||
// Will use those to back into NATS latency.
|
||||
m1.AppName = m2.AppName
|
||||
m1.ServiceLatency = m2.ServiceLatency
|
||||
m1.NATSLatency = m1.TotalLatency - m1.ServiceLatency
|
||||
|
||||
// Make sure we remove the entry here.
|
||||
si.acc.removeServiceImport(si.from)
|
||||
// Send the metrics
|
||||
s.sendInternalAccountMsg(acc, lsub, &m1)
|
||||
}
|
||||
|
||||
// Helper to grab name for a client.
|
||||
func nameForClient(c *client) string {
|
||||
if c.user != nil {
|
||||
@@ -966,10 +1091,11 @@ func clearTimer(tp **time.Timer) {
|
||||
func (s *Server) wrapChk(f func()) func() {
|
||||
return func() {
|
||||
s.mu.Lock()
|
||||
defer s.mu.Unlock()
|
||||
if !s.eventsEnabled() {
|
||||
s.mu.Unlock()
|
||||
return
|
||||
}
|
||||
f()
|
||||
s.mu.Unlock()
|
||||
}
|
||||
}
|
||||
|
||||
@@ -1174,7 +1174,7 @@ func TestSystemAccountWithGateways(t *testing.T) {
|
||||
nca.Flush()
|
||||
// If this test fails with wrong number after 10 seconds we may have
|
||||
// added a new initial subscription for the eventing system.
|
||||
checkExpectedSubs(t, 9, sa)
|
||||
checkExpectedSubs(t, 10, sa)
|
||||
|
||||
// Create a client on B and see if we receive the event
|
||||
urlb := fmt.Sprintf("nats://%s:%d", ob.Host, ob.Port)
|
||||
|
||||
@@ -697,9 +697,12 @@ func (s *Server) SystemAccount() *Account {
|
||||
return nil
|
||||
}
|
||||
|
||||
// For internal sends.
|
||||
const internalSendQLen = 1024
|
||||
|
||||
// Assign a system account. Should only be called once.
|
||||
// This sets up a server to send and receive messages from inside
|
||||
// the server itself.
|
||||
// This sets up a server to send and receive messages from
|
||||
// inside the server itself.
|
||||
func (s *Server) setSystemAccount(acc *Account) error {
|
||||
if acc == nil {
|
||||
return ErrMissingAccount
|
||||
@@ -728,7 +731,7 @@ func (s *Server) setSystemAccount(acc *Account) error {
|
||||
sid: 1,
|
||||
servers: make(map[string]*serverUpdate),
|
||||
subs: make(map[string]msgHandler),
|
||||
sendq: make(chan *pubMsg, 128),
|
||||
sendq: make(chan *pubMsg, internalSendQLen),
|
||||
statsz: eventsHBInterval,
|
||||
orphMax: 5 * eventsHBInterval,
|
||||
chkOrph: 3 * eventsHBInterval,
|
||||
@@ -900,12 +903,12 @@ func (s *Server) fetchRawAccountClaims(name string) (string, error) {
|
||||
fetchTime := time.Since(start)
|
||||
s.mu.Lock()
|
||||
if fetchTime > time.Second {
|
||||
s.Warnf("Account [%s] fetch took %v\n", name, fetchTime)
|
||||
s.Warnf("Account [%s] fetch took %v", name, fetchTime)
|
||||
} else {
|
||||
s.Debugf("Account [%s] fetch took %v\n", name, fetchTime)
|
||||
s.Debugf("Account [%s] fetch took %v", name, fetchTime)
|
||||
}
|
||||
if err != nil {
|
||||
s.Warnf("Account fetch failed: %v\n", err)
|
||||
s.Warnf("Account fetch failed: %v", err)
|
||||
return "", err
|
||||
}
|
||||
return claimJWT, nil
|
||||
|
||||
@@ -802,6 +802,11 @@ func subjectIsLiteral(subject string) bool {
|
||||
return true
|
||||
}
|
||||
|
||||
// IsValidPublishSubject returns true if a subject is valid and a literal, false otherwise
|
||||
func IsValidPublishSubject(subject string) bool {
|
||||
return IsValidSubject(subject) && subjectIsLiteral(subject)
|
||||
}
|
||||
|
||||
// IsValidSubject returns true if a subject is valid, false otherwise
|
||||
func IsValidSubject(subject string) bool {
|
||||
if subject == "" {
|
||||
@@ -1058,3 +1063,17 @@ func (s *Sublist) collectAllSubs(l *level, subs *[]*subscription) {
|
||||
s.collectAllSubs(l.fwc.next, subs)
|
||||
}
|
||||
}
|
||||
|
||||
// Helper to get the first result sub.
|
||||
func firstSubFromResult(rr *SublistResult) *subscription {
|
||||
if rr == nil {
|
||||
return nil
|
||||
}
|
||||
if len(rr.psubs) > 0 {
|
||||
return rr.psubs[0]
|
||||
}
|
||||
if len(rr.qsubs) > 0 {
|
||||
return rr.qsubs[0][0]
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
@@ -579,14 +579,14 @@ func createClusterEx(t *testing.T, doAccounts bool, clusterName string, numServe
|
||||
return []*server.Account{server.NewAccount("$SYS")}, nil
|
||||
}
|
||||
|
||||
sys := server.NewAccount("$SYS")
|
||||
ngs := server.NewAccount("NGS")
|
||||
dlc := server.NewAccount("DLC")
|
||||
accounts := []*server.Account{
|
||||
server.NewAccount("$SYS"),
|
||||
server.NewAccount("FOO"),
|
||||
server.NewAccount("BAR"),
|
||||
ngs, dlc,
|
||||
}
|
||||
foo := server.NewAccount("FOO")
|
||||
bar := server.NewAccount("BAR")
|
||||
|
||||
accounts := []*server.Account{sys, foo, bar, ngs, dlc}
|
||||
|
||||
ngs.AddServiceExport("ngs.usage.*", nil)
|
||||
dlc.AddServiceImport(ngs, "ngs.usage", "ngs.usage.dlc")
|
||||
|
||||
@@ -594,6 +594,8 @@ func createClusterEx(t *testing.T, doAccounts bool, clusterName string, numServe
|
||||
users := []*server.User{
|
||||
&server.User{Username: "dlc", Password: "pass", Permissions: nil, Account: dlc},
|
||||
&server.User{Username: "ngs", Password: "pass", Permissions: nil, Account: ngs},
|
||||
&server.User{Username: "foo", Password: "pass", Permissions: nil, Account: foo},
|
||||
&server.User{Username: "bar", Password: "pass", Permissions: nil, Account: bar},
|
||||
}
|
||||
return accounts, users
|
||||
}
|
||||
@@ -614,6 +616,10 @@ func createClusterEx(t *testing.T, doAccounts bool, clusterName string, numServe
|
||||
gws = append(gws, &server.RemoteGatewayOpts{Name: c.name, URLs: []*url.URL{gwurl}})
|
||||
}
|
||||
|
||||
// Make the GWs form faster for the tests.
|
||||
server.SetGatewaysSolicitDelay(5 * time.Millisecond)
|
||||
defer server.ResetGatewaysSolicitDelay()
|
||||
|
||||
// Create seed first.
|
||||
o := testDefaultClusterOptionsForLeafNodes()
|
||||
o.Gateway.Name = clusterName
|
||||
|
||||
441
test/service_latency_test.go
Normal file
441
test/service_latency_test.go
Normal file
@@ -0,0 +1,441 @@
|
||||
// Copyright 2019 The NATS Authors
|
||||
// Licensed under the Apache License, Version 2.0 (the "License");
|
||||
// you may not use this file except in compliance with the License.
|
||||
// You may obtain a copy of the License at
|
||||
//
|
||||
// http://www.apache.org/licenses/LICENSE-2.0
|
||||
//
|
||||
// Unless required by applicable law or agreed to in writing, software
|
||||
// distributed under the License is distributed on an "AS IS" BASIS,
|
||||
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
// See the License for the specific language governing permissions and
|
||||
// limitations under the License.
|
||||
|
||||
package test
|
||||
|
||||
import (
|
||||
"encoding/json"
|
||||
"fmt"
|
||||
"math/rand"
|
||||
"sync"
|
||||
"sync/atomic"
|
||||
"testing"
|
||||
"time"
|
||||
|
||||
"github.com/nats-io/nats-server/v2/server"
|
||||
"github.com/nats-io/nats.go"
|
||||
)
|
||||
|
||||
// Used to setup superclusters for tests.
|
||||
type supercluster struct {
|
||||
clusters []*cluster
|
||||
}
|
||||
|
||||
func (sc *supercluster) shutdown() {
|
||||
for _, c := range sc.clusters {
|
||||
shutdownCluster(c)
|
||||
}
|
||||
}
|
||||
|
||||
// Parameters for generated cluster names.
const (
	// digits is the alphabet names are drawn from.
	digits = "0123456789ABCDEFGHIJKLMNOPQRSTUVWXYZ"
	// base is the radix used to pick characters from digits.
	base = 36
	// cnlen is the length of a generated cluster name.
	cnlen = 8
)

// randClusterName returns a random cluster name of cnlen characters taken
// from digits. A single random 63-bit value is consumed one base-36 digit at
// a time, least significant digit first.
func randClusterName() string {
	// The length is fixed, so use an array instead of growing a slice.
	var name [cnlen]byte
	rn := rand.Int63()
	for i := range name {
		name[i] = digits[rn%base]
		rn /= base
	}
	return string(name[:])
}
|
||||
|
||||
func createSuperCluster(t *testing.T, numServersPer, numClusters int) *supercluster {
|
||||
clusters := []*cluster{}
|
||||
|
||||
for i := 0; i < numClusters; i++ {
|
||||
// Pick cluster name and setup default accounts.
|
||||
c := createClusterEx(t, true, randClusterName(), numServersPer, clusters...)
|
||||
clusters = append(clusters, c)
|
||||
}
|
||||
return &supercluster{clusters}
|
||||
}
|
||||
|
||||
func (sc *supercluster) setupLatencyTracking(t *testing.T, p int) {
|
||||
t.Helper()
|
||||
for _, c := range sc.clusters {
|
||||
for _, s := range c.servers {
|
||||
foo, err := s.LookupAccount("FOO")
|
||||
if err != nil {
|
||||
t.Fatalf("Error looking up account 'FOO': %v", err)
|
||||
}
|
||||
bar, err := s.LookupAccount("BAR")
|
||||
if err != nil {
|
||||
t.Fatalf("Error looking up account 'BAR': %v", err)
|
||||
}
|
||||
if err := foo.AddServiceExport("ngs.usage.*", nil); err != nil {
|
||||
t.Fatalf("Error adding service export to 'FOO': %v", err)
|
||||
}
|
||||
if err := foo.TrackServiceExportWithSampling("ngs.usage.*", "results", p); err != nil {
|
||||
t.Fatalf("Error adding latency tracking to 'FOO': %v", err)
|
||||
}
|
||||
if err := bar.AddServiceImport(foo, "ngs.usage", "ngs.usage.bar"); err != nil {
|
||||
t.Fatalf("Error adding latency tracking to 'FOO': %v", err)
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func clientConnectWithName(t *testing.T, opts *server.Options, user, appname string) *nats.Conn {
|
||||
t.Helper()
|
||||
url := fmt.Sprintf("nats://%s:pass@%s:%d", user, opts.Host, opts.Port)
|
||||
nc, err := nats.Connect(url, nats.Name(appname))
|
||||
if err != nil {
|
||||
t.Fatalf("Error on connect: %v", err)
|
||||
}
|
||||
return nc
|
||||
}
|
||||
|
||||
func clientConnect(t *testing.T, opts *server.Options, user string) *nats.Conn {
|
||||
t.Helper()
|
||||
return clientConnectWithName(t, opts, user, "")
|
||||
}
|
||||
|
||||
func checkServiceLatency(t *testing.T, sl server.ServiceLatency, start time.Time, serviceTime time.Duration) {
|
||||
t.Helper()
|
||||
|
||||
startDelta := sl.RequestStart.Sub(start)
|
||||
if startDelta > 5*time.Millisecond {
|
||||
t.Fatalf("Bad start delta %v", startDelta)
|
||||
}
|
||||
if sl.ServiceLatency < serviceTime {
|
||||
t.Fatalf("Bad service latency: %v", sl.ServiceLatency)
|
||||
}
|
||||
if sl.TotalLatency < sl.ServiceLatency {
|
||||
t.Fatalf("Bad total latency: %v", sl.ServiceLatency)
|
||||
}
|
||||
// We should have NATS latency here that is non-zero with real clients.
|
||||
if sl.NATSLatency == 0 {
|
||||
t.Fatalf("Expected non-zero NATS latency")
|
||||
}
|
||||
// Make sure they add up
|
||||
if sl.TotalLatency != sl.ServiceLatency+sl.NATSLatency {
|
||||
t.Fatalf("Numbers do not add up: %+v", sl)
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
func TestServiceLatencySingleServerConnect(t *testing.T) {
|
||||
sc := createSuperCluster(t, 3, 2)
|
||||
defer sc.shutdown()
|
||||
|
||||
// Now add in new service export to FOO and have bar import that with tracking enabled.
|
||||
sc.setupLatencyTracking(t, 100)
|
||||
|
||||
// Now we can setup and test, do single node only first.
|
||||
// This is the service provider.
|
||||
nc := clientConnect(t, sc.clusters[0].opts[0], "foo")
|
||||
defer nc.Close()
|
||||
|
||||
// The service listener.
|
||||
serviceTime := 25 * time.Millisecond
|
||||
nc.Subscribe("ngs.usage.*", func(msg *nats.Msg) {
|
||||
time.Sleep(serviceTime)
|
||||
msg.Respond([]byte("22 msgs"))
|
||||
})
|
||||
|
||||
// Listen for metrics
|
||||
rsub, _ := nc.SubscribeSync("results")
|
||||
|
||||
// Requestor
|
||||
nc2 := clientConnect(t, sc.clusters[0].opts[0], "bar")
|
||||
defer nc.Close()
|
||||
|
||||
// Send the request.
|
||||
start := time.Now()
|
||||
_, err := nc2.Request("ngs.usage", []byte("1h"), time.Second)
|
||||
if err != nil {
|
||||
t.Fatalf("Expected a response")
|
||||
}
|
||||
|
||||
var sl server.ServiceLatency
|
||||
rmsg, _ := rsub.NextMsg(time.Second)
|
||||
json.Unmarshal(rmsg.Data, &sl)
|
||||
|
||||
checkServiceLatency(t, sl, start, serviceTime)
|
||||
}
|
||||
|
||||
func connRTT(nc *nats.Conn) time.Duration {
|
||||
// Do 5x to flatten
|
||||
total := time.Duration(0)
|
||||
for i := 0; i < 5; i++ {
|
||||
start := time.Now()
|
||||
nc.Flush()
|
||||
total += time.Since(start)
|
||||
}
|
||||
return total / 5
|
||||
}
|
||||
|
||||
func TestServiceLatencyRemoteConnect(t *testing.T) {
|
||||
sc := createSuperCluster(t, 3, 2)
|
||||
defer sc.shutdown()
|
||||
|
||||
// Now add in new service export to FOO and have bar import that with tracking enabled.
|
||||
sc.setupLatencyTracking(t, 100)
|
||||
|
||||
// Now we can setup and test, do single node only first.
|
||||
// This is the service provider.
|
||||
nc := clientConnect(t, sc.clusters[0].opts[0], "foo")
|
||||
defer nc.Close()
|
||||
|
||||
// The service listener.
|
||||
serviceTime := 25 * time.Millisecond
|
||||
nc.Subscribe("ngs.usage.*", func(msg *nats.Msg) {
|
||||
time.Sleep(serviceTime)
|
||||
msg.Respond([]byte("22 msgs"))
|
||||
})
|
||||
|
||||
// Listen for metrics
|
||||
rsub, _ := nc.SubscribeSync("results")
|
||||
|
||||
// Same Cluster Requestor
|
||||
nc2 := clientConnect(t, sc.clusters[0].opts[2], "bar")
|
||||
defer nc.Close()
|
||||
|
||||
// Send the request.
|
||||
start := time.Now()
|
||||
_, err := nc2.Request("ngs.usage", []byte("1h"), time.Second)
|
||||
if err != nil {
|
||||
t.Fatalf("Expected a response")
|
||||
}
|
||||
|
||||
var sl server.ServiceLatency
|
||||
rmsg, err := rsub.NextMsg(time.Second)
|
||||
if err != nil {
|
||||
t.Fatalf("Error getting latency measurement: %v", err)
|
||||
}
|
||||
json.Unmarshal(rmsg.Data, &sl)
|
||||
checkServiceLatency(t, sl, start, serviceTime)
|
||||
|
||||
// Lastly here, we need to make sure we are properly tracking the extra hops.
|
||||
// We will make sure that NATS latency is close to what we see from the outside in terms of RTT.
|
||||
if crtt := connRTT(nc) + connRTT(nc2); sl.NATSLatency < crtt {
|
||||
t.Fatalf("Not tracking second measurement for NATS latency across servers: %v vs %v", sl.NATSLatency, crtt)
|
||||
}
|
||||
|
||||
// Gateway Requestor
|
||||
nc2 = clientConnect(t, sc.clusters[1].opts[1], "bar")
|
||||
defer nc.Close()
|
||||
|
||||
// Send the request.
|
||||
start = time.Now()
|
||||
_, err = nc2.Request("ngs.usage", []byte("1h"), time.Second)
|
||||
if err != nil {
|
||||
t.Fatalf("Expected a response")
|
||||
}
|
||||
|
||||
rmsg, err = rsub.NextMsg(time.Second)
|
||||
if err != nil {
|
||||
t.Fatalf("Error getting latency measurement: %v", err)
|
||||
}
|
||||
json.Unmarshal(rmsg.Data, &sl)
|
||||
checkServiceLatency(t, sl, start, serviceTime)
|
||||
|
||||
// Lastly here, we need to make sure we are properly tracking the extra hops.
|
||||
// We will make sure that NATS latency is close to what we see from the outside in terms of RTT.
|
||||
if crtt := connRTT(nc) + connRTT(nc2); sl.NATSLatency < crtt {
|
||||
t.Fatalf("Not tracking second measurement for NATS latency across servers: %v vs %v", sl.NATSLatency, crtt)
|
||||
}
|
||||
}
|
||||
|
||||
func TestServiceLatencySampling(t *testing.T) {
|
||||
sc := createSuperCluster(t, 3, 2)
|
||||
defer sc.shutdown()
|
||||
|
||||
// Now add in new service export to FOO and have bar import that with tracking enabled.
|
||||
sc.setupLatencyTracking(t, 50)
|
||||
|
||||
// Now we can setup and test, do single node only first.
|
||||
// This is the service provider.
|
||||
nc := clientConnect(t, sc.clusters[0].opts[0], "foo")
|
||||
defer nc.Close()
|
||||
|
||||
// The service listener.
|
||||
nc.Subscribe("ngs.usage.*", func(msg *nats.Msg) {
|
||||
msg.Respond([]byte("22 msgs"))
|
||||
})
|
||||
|
||||
// Listen for metrics
|
||||
received := int32(0)
|
||||
|
||||
nc.Subscribe("results", func(msg *nats.Msg) {
|
||||
atomic.AddInt32(&received, 1)
|
||||
})
|
||||
|
||||
// Same Cluster Requestor
|
||||
nc2 := clientConnect(t, sc.clusters[0].opts[2], "bar")
|
||||
defer nc.Close()
|
||||
|
||||
toSend := 1000
|
||||
for i := 0; i < toSend; i++ {
|
||||
nc2.Request("ngs.usage", []byte("1h"), time.Second)
|
||||
}
|
||||
// Wait for results to flow in.
|
||||
time.Sleep(100 * time.Millisecond)
|
||||
|
||||
mid := toSend / 2
|
||||
delta := toSend / 10 // 10%
|
||||
got := int(atomic.LoadInt32(&received))
|
||||
|
||||
if got > mid+delta || got < mid-delta {
|
||||
t.Fatalf("Sampling number incorrect: %d vs %d", mid, got)
|
||||
}
|
||||
}
|
||||
|
||||
func TestServiceLatencyWithName(t *testing.T) {
|
||||
sc := createSuperCluster(t, 1, 1)
|
||||
defer sc.shutdown()
|
||||
|
||||
// Now add in new service export to FOO and have bar import that with tracking enabled.
|
||||
sc.setupLatencyTracking(t, 100)
|
||||
|
||||
opts := sc.clusters[0].opts[0]
|
||||
|
||||
nc := clientConnectWithName(t, opts, "foo", "dlc22")
|
||||
defer nc.Close()
|
||||
|
||||
// The service listener.
|
||||
nc.Subscribe("ngs.usage.*", func(msg *nats.Msg) {
|
||||
msg.Respond([]byte("22 msgs"))
|
||||
})
|
||||
|
||||
// Listen for metrics
|
||||
rsub, _ := nc.SubscribeSync("results")
|
||||
|
||||
nc2 := clientConnect(t, opts, "bar")
|
||||
defer nc.Close()
|
||||
nc2.Request("ngs.usage", []byte("1h"), time.Second)
|
||||
|
||||
var sl server.ServiceLatency
|
||||
rmsg, _ := rsub.NextMsg(time.Second)
|
||||
json.Unmarshal(rmsg.Data, &sl)
|
||||
|
||||
// Make sure we have AppName set.
|
||||
if sl.AppName != "dlc22" {
|
||||
t.Fatalf("Expected to have AppName set correctly, %q vs %q", "dlc22", sl.AppName)
|
||||
}
|
||||
}
|
||||
|
||||
func TestServiceLatencyWithNameMultiServer(t *testing.T) {
|
||||
sc := createSuperCluster(t, 3, 2)
|
||||
defer sc.shutdown()
|
||||
|
||||
// Now add in new service export to FOO and have bar import that with tracking enabled.
|
||||
sc.setupLatencyTracking(t, 100)
|
||||
|
||||
nc := clientConnectWithName(t, sc.clusters[0].opts[1], "foo", "dlc22")
|
||||
defer nc.Close()
|
||||
|
||||
// The service listener.
|
||||
nc.Subscribe("ngs.usage.*", func(msg *nats.Msg) {
|
||||
msg.Respond([]byte("22 msgs"))
|
||||
})
|
||||
|
||||
// Listen for metrics
|
||||
rsub, _ := nc.SubscribeSync("results")
|
||||
|
||||
nc2 := clientConnect(t, sc.clusters[1].opts[1], "bar")
|
||||
defer nc.Close()
|
||||
nc2.Request("ngs.usage", []byte("1h"), time.Second)
|
||||
|
||||
var sl server.ServiceLatency
|
||||
rmsg, _ := rsub.NextMsg(time.Second)
|
||||
json.Unmarshal(rmsg.Data, &sl)
|
||||
|
||||
// Make sure we have AppName set.
|
||||
if sl.AppName != "dlc22" {
|
||||
t.Fatalf("Expected to have AppName set correctly, %q vs %q", "dlc22", sl.AppName)
|
||||
}
|
||||
}
|
||||
|
||||
func TestServiceLatencyWithQueueSubscribersAndNames(t *testing.T) {
|
||||
numServers := 3
|
||||
numClusters := 3
|
||||
sc := createSuperCluster(t, numServers, numClusters)
|
||||
defer sc.shutdown()
|
||||
|
||||
// Now add in new service export to FOO and have bar import that with tracking enabled.
|
||||
sc.setupLatencyTracking(t, 100)
|
||||
|
||||
selectServer := func() *server.Options {
|
||||
si, ci := rand.Int63n(int64(numServers)), rand.Int63n(int64(numServers))
|
||||
return sc.clusters[ci].opts[si]
|
||||
}
|
||||
|
||||
sname := func(i int) string {
|
||||
return fmt.Sprintf("SERVICE-%d", i+1)
|
||||
}
|
||||
|
||||
numResponders := 5
|
||||
|
||||
// Create 10 queue subscribers for the service. Randomly select the server.
|
||||
for i := 0; i < numResponders; i++ {
|
||||
nc := clientConnectWithName(t, selectServer(), "foo", sname(i))
|
||||
nc.QueueSubscribe("ngs.usage.*", "SERVICE", func(msg *nats.Msg) {
|
||||
time.Sleep(time.Duration(rand.Int63n(10)) * time.Millisecond)
|
||||
msg.Respond([]byte("22 msgs"))
|
||||
})
|
||||
nc.Flush()
|
||||
}
|
||||
|
||||
doRequest := func() {
|
||||
nc := clientConnect(t, selectServer(), "bar")
|
||||
if _, err := nc.Request("ngs.usage", []byte("1h"), time.Second); err != nil {
|
||||
t.Fatalf("Failed to get request response: %v", err)
|
||||
}
|
||||
nc.Close()
|
||||
}
|
||||
|
||||
// To collect the metrics
|
||||
nc := clientConnect(t, sc.clusters[0].opts[0], "foo")
|
||||
defer nc.Close()
|
||||
|
||||
results := make(map[string]time.Duration)
|
||||
var rlock sync.Mutex
|
||||
ch := make(chan (bool))
|
||||
received := int32(0)
|
||||
toSend := int32(100)
|
||||
|
||||
// Capture the results.
|
||||
nc.Subscribe("results", func(msg *nats.Msg) {
|
||||
var sl server.ServiceLatency
|
||||
json.Unmarshal(msg.Data, &sl)
|
||||
rlock.Lock()
|
||||
results[sl.AppName] += sl.ServiceLatency
|
||||
rlock.Unlock()
|
||||
if r := atomic.AddInt32(&received, 1); r >= toSend {
|
||||
ch <- true
|
||||
}
|
||||
})
|
||||
nc.Flush()
|
||||
|
||||
// Send 100 requests from random locations.
|
||||
for i := 0; i < 100; i++ {
|
||||
doRequest()
|
||||
}
|
||||
|
||||
// Wait on all results.
|
||||
<-ch
|
||||
|
||||
rlock.Lock()
|
||||
defer rlock.Unlock()
|
||||
|
||||
// Make sure each total is generally over 10ms
|
||||
thresh := 10 * time.Millisecond
|
||||
for i := 0; i < numResponders; i++ {
|
||||
if rl := results[sname(i)]; rl < thresh {
|
||||
t.Fatalf("Total for %q is less then threshold: %v vs %v", sname(i), thresh, rl)
|
||||
}
|
||||
}
|
||||
}
|
||||
Reference in New Issue
Block a user