mirror of
https://github.com/gogrlx/nats-server.git
synced 2026-04-17 03:24:40 -07:00
Wait on requestor RTT when tracking latency.
If a client RTT for a requestor is longer than a service RTT, the requestor latency was often zero. We now wait for the RTT (if zero) before sending out the metric. Signed-off-by: Derek Collison <derek@nats.io>
This commit is contained in:
@@ -707,21 +707,25 @@ type remoteLatency struct {
|
||||
M2 ServiceLatency `json:"m2"`
|
||||
}
|
||||
|
||||
// Used to hold for an RTT measurement from requestor.
|
||||
type pendingLatency struct {
|
||||
acc *Account // Exporting/Reporting account
|
||||
si *serviceImport
|
||||
sl *ServiceLatency
|
||||
resp *client
|
||||
}
|
||||
|
||||
// sendTrackingLatency will send out the appropriate tracking information for the
|
||||
// service request/response latency. This is called when the requestor's server has
|
||||
// received the response.
|
||||
// TODO(dlc) - holding locks for RTTs may be too much long term. Should revisit.
|
||||
func (a *Account) sendTrackingLatency(si *serviceImport, requestor, responder *client) bool {
|
||||
now := time.Now()
|
||||
serviceRTT := time.Duration(now.UnixNano() - si.ts)
|
||||
ts := time.Now()
|
||||
serviceRTT := time.Duration(ts.UnixNano() - si.ts)
|
||||
|
||||
var (
|
||||
reqClientRTT = requestor.getRTTValue()
|
||||
respClientRTT time.Duration
|
||||
appName string
|
||||
)
|
||||
|
||||
expectRemoteM2 := responder != nil && responder.kind != CLIENT
|
||||
var reqClientRTT = requestor.getRTTValue()
|
||||
var respClientRTT time.Duration
|
||||
var appName string
|
||||
|
||||
if responder != nil && responder.kind == CLIENT {
|
||||
respClientRTT = responder.getRTTValue()
|
||||
@@ -742,27 +746,46 @@ func (a *Account) sendTrackingLatency(si *serviceImport, requestor, responder *c
|
||||
},
|
||||
TotalLatency: reqClientRTT + serviceRTT,
|
||||
}
|
||||
sanitizeLatencyMetric(&sl)
|
||||
if respClientRTT > 0 {
|
||||
sl.NATSLatency.System = time.Since(ts)
|
||||
sl.TotalLatency += sl.NATSLatency.System
|
||||
}
|
||||
// If we do not have a requestor RTT at this point we will wait for it to show up.
|
||||
if reqClientRTT == 0 {
|
||||
requestor.mu.Lock()
|
||||
if requestor.flags.isSet(firstPongSent) {
|
||||
requestor.holdPendingLatency(&pendingLatency{a, si, &sl, responder})
|
||||
requestor.mu.Unlock()
|
||||
return false
|
||||
}
|
||||
requestor.mu.Unlock()
|
||||
}
|
||||
|
||||
return a.flushTrackingLatency(&sl, si, responder)
|
||||
}
|
||||
|
||||
// We can attempt to flush out the latency metric. May pause if multiple measurements needed.
|
||||
func (a *Account) flushTrackingLatency(sl *ServiceLatency, si *serviceImport, responder *client) bool {
|
||||
sanitizeLatencyMetric(sl)
|
||||
// If we are expecting a remote measurement, store our sl here.
|
||||
// We need to account for the race between this and us receiving the
|
||||
// remote measurement.
|
||||
// FIXME(dlc) - We need to clean these up but this should happen
|
||||
// already with the auto-expire logic.
|
||||
if expectRemoteM2 {
|
||||
if responder != nil && responder.kind != CLIENT {
|
||||
si.acc.mu.Lock()
|
||||
if si.m1 != nil {
|
||||
m1, m2 := &sl, si.m1
|
||||
m1, m2 := sl, si.m1
|
||||
m1.merge(m2)
|
||||
si.acc.mu.Unlock()
|
||||
a.srv.sendInternalAccountMsg(a, si.latency.subject, m1)
|
||||
return true
|
||||
}
|
||||
si.m1 = &sl
|
||||
si.m1 = sl
|
||||
si.acc.mu.Unlock()
|
||||
return false
|
||||
} else {
|
||||
a.srv.sendInternalAccountMsg(a, si.latency.subject, &sl)
|
||||
a.srv.sendInternalAccountMsg(a, si.latency.subject, sl)
|
||||
}
|
||||
return true
|
||||
}
|
||||
|
||||
@@ -1396,7 +1396,7 @@ func TestAccountRequestReplyTrackLatency(t *testing.T) {
|
||||
t.Fatalf("Error adding account service import to client bar: %v", err)
|
||||
}
|
||||
|
||||
// Now setup the resonder under cfoo and the listener for the results
|
||||
// Now setup the responder under cfoo and the listener for the results
|
||||
cfoo.parse([]byte("SUB track.service 1\r\nSUB results 2\r\n"))
|
||||
|
||||
readFooMsg := func() ([]byte, string) {
|
||||
|
||||
@@ -201,6 +201,8 @@ type client struct {
|
||||
rrTracking map[string]*remoteLatency
|
||||
rrMax int
|
||||
|
||||
pslms []*pendingLatency
|
||||
|
||||
route *route
|
||||
gw *gateway
|
||||
leaf *leaf
|
||||
@@ -1660,6 +1662,10 @@ func (c *client) processPing() {
|
||||
func (c *client) processPong() {
|
||||
c.traceInOp("PONG", nil)
|
||||
c.mu.Lock()
|
||||
// If we have a zero rtt quickly check if we have any pending latency measurements.
|
||||
if c.rtt == 0 && len(c.pslms) > 0 {
|
||||
go c.flushPendingLatencies()
|
||||
}
|
||||
c.ping.out = 0
|
||||
c.rtt = time.Since(c.rttStart)
|
||||
srv := c.srv
|
||||
@@ -2396,6 +2402,49 @@ func (c *client) deliverMsg(sub *subscription, subject, mh, msg []byte) bool {
|
||||
return true
|
||||
}
|
||||
|
||||
// Will flush all of our pending latencies.
|
||||
// Should be called from a go routine (processPong), so no locks held.
|
||||
func (c *client) flushPendingLatencies() {
|
||||
var _pslms [32]*pendingLatency
|
||||
c.mu.Lock()
|
||||
reqClientRTT := c.rtt
|
||||
pslms := append(_pslms[:0], c.pslms...)
|
||||
c.pslms = nil
|
||||
c.mu.Unlock()
|
||||
|
||||
for _, pl := range pslms {
|
||||
// Fixup the service latency with requestor rtt.
|
||||
// Hold si account lock which protects changes to the sl itself.
|
||||
sl, si, a := pl.sl, pl.si, pl.acc
|
||||
if sl.NATSLatency.Responder == 0 && pl.resp != nil && pl.resp.kind == CLIENT {
|
||||
sl.NATSLatency.Responder = pl.resp.getRTTValue()
|
||||
}
|
||||
si.acc.mu.Lock()
|
||||
reqStart := time.Unix(0, si.ts-int64(reqClientRTT))
|
||||
sl.RequestStart = reqStart
|
||||
sl.NATSLatency.Requestor = reqClientRTT
|
||||
sl.TotalLatency += reqClientRTT
|
||||
si.acc.mu.Unlock()
|
||||
|
||||
if a.flushTrackingLatency(sl, si, pl.resp) {
|
||||
// Make sure we remove the entry here.
|
||||
si.acc.removeServiceImport(si.from)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// This will hold a pending latency waiting for requestor RTT.
|
||||
// Lock should be held.
|
||||
func (c *client) holdPendingLatency(pl *pendingLatency) {
|
||||
if len(c.pslms) == 0 {
|
||||
c.rrMax = c.acc.MaxAutoExpireResponseMaps()
|
||||
}
|
||||
c.pslms = append(c.pslms, pl)
|
||||
if len(c.pslms) > c.rrMax {
|
||||
go c.flushPendingLatencies()
|
||||
}
|
||||
}
|
||||
|
||||
// This will track a remote reply for an exported service that has requested
|
||||
// latency tracking.
|
||||
// Lock assumed to be held.
|
||||
@@ -2740,7 +2789,7 @@ func (c *client) checkForImportServices(acc *Account, msg []byte) {
|
||||
if si.rt != Singleton {
|
||||
acc.addRespMapEntry(si.acc, string(c.pa.reply), string(nrr))
|
||||
} else if si.latency != nil && c.rtt == 0 {
|
||||
// We have a service import that we are tracking but have not established RTT
|
||||
// We have a service import that we are tracking but have not established RTT.
|
||||
c.sendRTTPing()
|
||||
}
|
||||
// If this is a client or leaf connection and we are in gateway mode,
|
||||
@@ -2787,7 +2836,6 @@ func (c *client) checkForImportServices(acc *Account, msg []byte) {
|
||||
if shouldRemove {
|
||||
acc.removeServiceImport(si.from)
|
||||
}
|
||||
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
@@ -252,8 +252,8 @@ func (s *Server) internalSendLoop(wg *sync.WaitGroup) {
|
||||
if pm.msg != nil {
|
||||
b, _ = json.MarshalIndent(pm.msg, _EMPTY_, " ")
|
||||
}
|
||||
// We can have an override for account here.
|
||||
c.mu.Lock()
|
||||
// We can have an override for account here.
|
||||
if pm.acc != nil {
|
||||
c.acc = pm.acc
|
||||
} else {
|
||||
@@ -1063,14 +1063,14 @@ func (s *Server) remoteLatencyUpdate(sub *subscription, _ *client, subject, _ st
|
||||
|
||||
// So we have not processed the response tracking measurement yet.
|
||||
if m1 == nil {
|
||||
acc.mu.Lock()
|
||||
si.acc.mu.Lock()
|
||||
// Double check since could have slipped in.
|
||||
m1 = si.m1
|
||||
if m1 == nil {
|
||||
// Store our value there for them to pick up.
|
||||
si.m1 = &m2
|
||||
}
|
||||
acc.mu.Unlock()
|
||||
si.acc.mu.Unlock()
|
||||
if m1 == nil {
|
||||
return
|
||||
}
|
||||
|
||||
@@ -191,6 +191,73 @@ func TestServiceLatencySingleServerConnect(t *testing.T) {
|
||||
checkServiceLatency(t, sl, start, serviceTime)
|
||||
}
|
||||
|
||||
// If a client has a longer RTT that the effective RTT for NATS + responder
|
||||
// the requestor RTT will be marked as 0. This can happen quite often with
|
||||
// utility programs that are far away from a cluster like NGS but the service
|
||||
// response time has a shorter RTT.
|
||||
func TestServiceLatencyClientRTTSlowerThanServiceRTT(t *testing.T) {
|
||||
sc := createSuperCluster(t, 2, 2)
|
||||
defer sc.shutdown()
|
||||
|
||||
// Now add in new service export to FOO and have bar import that with tracking enabled.
|
||||
sc.setupLatencyTracking(t, 100)
|
||||
|
||||
nc := clientConnect(t, sc.clusters[0].opts[0], "foo")
|
||||
defer nc.Close()
|
||||
|
||||
// The service listener. Instant response.
|
||||
nc.Subscribe("ngs.usage.*", func(msg *nats.Msg) {
|
||||
msg.Respond([]byte("22 msgs"))
|
||||
})
|
||||
|
||||
// Listen for metrics
|
||||
rsub, _ := nc.SubscribeSync("results")
|
||||
|
||||
// Requestor and processing
|
||||
requestAndCheck := func(sopts *server.Options) {
|
||||
rtt := 10 * time.Millisecond
|
||||
sp, opts := newSlowProxy(rtt, sopts)
|
||||
defer sp.Stop()
|
||||
|
||||
nc2 := clientConnect(t, opts, "bar")
|
||||
defer nc2.Close()
|
||||
|
||||
start := time.Now()
|
||||
nc2.Flush()
|
||||
if d := time.Since(start); d < rtt {
|
||||
t.Fatalf("Expected an rtt of at least %v, got %v", rtt, d)
|
||||
}
|
||||
|
||||
// Send the request.
|
||||
start = time.Now()
|
||||
_, err := nc2.Request("ngs.usage", []byte("1h"), time.Second)
|
||||
if err != nil {
|
||||
t.Fatalf("Expected a response")
|
||||
}
|
||||
|
||||
var sl server.ServiceLatency
|
||||
rmsg, _ := rsub.NextMsg(time.Second)
|
||||
json.Unmarshal(rmsg.Data, &sl)
|
||||
|
||||
// We want to test here that when the client requestor RTT is larger then the response time
|
||||
// we still deliver a requestor value > 0.
|
||||
// Now check that it is close to rtt.
|
||||
if sl.NATSLatency.Requestor < rtt {
|
||||
t.Fatalf("Expected requestor latency to be > %v, got %v", rtt, sl.NATSLatency.Requestor)
|
||||
}
|
||||
if sl.TotalLatency < rtt {
|
||||
t.Fatalf("Expected total latency to be > %v, got %v", rtt, sl.TotalLatency)
|
||||
}
|
||||
}
|
||||
|
||||
// Check same server.
|
||||
requestAndCheck(sc.clusters[0].opts[0])
|
||||
// Check from remote server across GW.
|
||||
requestAndCheck(sc.clusters[1].opts[1])
|
||||
// Same cluster but different server
|
||||
requestAndCheck(sc.clusters[0].opts[1])
|
||||
}
|
||||
|
||||
func connRTT(nc *nats.Conn) time.Duration {
|
||||
// Do 5x to flatten
|
||||
total := time.Duration(0)
|
||||
|
||||
@@ -15,10 +15,14 @@ package test
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
"net"
|
||||
"strconv"
|
||||
"strings"
|
||||
"sync"
|
||||
"testing"
|
||||
"time"
|
||||
|
||||
"github.com/nats-io/nats-server/v2/server"
|
||||
)
|
||||
|
||||
func checkFor(t *testing.T, totalWait, sleepDur time.Duration, f func() error) {
|
||||
@@ -37,6 +41,64 @@ func checkFor(t *testing.T, totalWait, sleepDur time.Duration, f func() error) {
|
||||
}
|
||||
}
|
||||
|
||||
// slowProxy is a really crude TCP proxy that works for introducing simple
// RTT delays.
type slowProxy struct {
	// Listener the proxy accepts its client connection on.
	listener net.Listener
	// Connections opened by the proxy, closed again by Stop.
	conns []net.Conn
}
|
||||
|
||||
func newSlowProxy(latency time.Duration, opts *server.Options) (*slowProxy, *server.Options) {
|
||||
saddr := net.JoinHostPort(opts.Host, strconv.Itoa(opts.Port))
|
||||
hp := net.JoinHostPort("127.0.0.1", "0")
|
||||
l, e := net.Listen("tcp", hp)
|
||||
if e != nil {
|
||||
panic(fmt.Sprintf("Error listening on port: %s, %q", hp, e))
|
||||
}
|
||||
port := l.Addr().(*net.TCPAddr).Port
|
||||
sp := &slowProxy{listener: l}
|
||||
go func() {
|
||||
client, err := l.Accept()
|
||||
if err != nil {
|
||||
return
|
||||
}
|
||||
server, err := net.DialTimeout("tcp", saddr, time.Second)
|
||||
if err != nil {
|
||||
panic("Can't connect to server")
|
||||
}
|
||||
sp.conns = append(sp.conns, client, server)
|
||||
go sp.loop(latency, client, server)
|
||||
go sp.loop(latency, server, client)
|
||||
}()
|
||||
sopts := &server.Options{Host: "127.0.0.1", Port: port}
|
||||
return sp, sopts
|
||||
}
|
||||
|
||||
func (sp *slowProxy) loop(latency time.Duration, r, w net.Conn) {
|
||||
delay := latency / 2
|
||||
for {
|
||||
var buf [64]byte
|
||||
n, err := r.Read(buf[:])
|
||||
if err != nil {
|
||||
return
|
||||
}
|
||||
time.Sleep(delay)
|
||||
if _, err = w.Write(buf[:n]); err != nil {
|
||||
return
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func (sp *slowProxy) Stop() {
|
||||
if sp.listener != nil {
|
||||
sp.listener.Close()
|
||||
sp.listener = nil
|
||||
for _, c := range sp.conns {
|
||||
c.Close()
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// Dummy Logger
|
||||
type dummyLogger struct {
|
||||
sync.Mutex
|
||||
msg string
|
||||
|
||||
Reference in New Issue
Block a user