mirror of
https://github.com/gogrlx/nats-server.git
synced 2026-04-02 03:38:42 -07:00
After speaking with Ivan we are taking a better approach for initial RTT.
Ivan had the idea of using the CONNECT to establish a first estimate of RTT without additional PING/PONGs. Signed-off-by: Derek Collison <derek@nats.io>
This commit is contained in:
@@ -707,14 +707,6 @@ type remoteLatency struct {
|
||||
M2 ServiceLatency `json:"m2"`
|
||||
}
|
||||
|
||||
// Used to hold a latency measurement while waiting for an RTT measurement from the requestor.
|
||||
type pendingLatency struct {
|
||||
acc *Account // Exporting/Reporting account
|
||||
si *serviceImport
|
||||
sl *ServiceLatency
|
||||
resp *client
|
||||
}
|
||||
|
||||
// sendTrackingLatency will send out the appropriate tracking information for the
|
||||
// service request/response latency. This is called when the requestor's server has
|
||||
// received the response.
|
||||
@@ -735,7 +727,7 @@ func (a *Account) sendTrackingLatency(si *serviceImport, requestor, responder *c
|
||||
// We will estimate time when request left the requestor by time we received
|
||||
// and the client RTT for the requestor.
|
||||
reqStart := time.Unix(0, si.ts-int64(reqClientRTT))
|
||||
sl := ServiceLatency{
|
||||
sl := &ServiceLatency{
|
||||
AppName: appName,
|
||||
RequestStart: reqStart,
|
||||
ServiceLatency: serviceRTT - respClientRTT,
|
||||
@@ -750,23 +742,9 @@ func (a *Account) sendTrackingLatency(si *serviceImport, requestor, responder *c
|
||||
sl.NATSLatency.System = time.Since(ts)
|
||||
sl.TotalLatency += sl.NATSLatency.System
|
||||
}
|
||||
// If we do not have a requestor RTT at this point we will wait for it to show up.
|
||||
if reqClientRTT == 0 {
|
||||
requestor.mu.Lock()
|
||||
if requestor.flags.isSet(firstPongSent) {
|
||||
requestor.holdPendingLatency(&pendingLatency{a, si, &sl, responder})
|
||||
requestor.mu.Unlock()
|
||||
return false
|
||||
}
|
||||
requestor.mu.Unlock()
|
||||
}
|
||||
|
||||
return a.flushTrackingLatency(&sl, si, responder)
|
||||
}
|
||||
|
||||
// We can attempt to flush out the latency metric. May pause if multiple measurements needed.
|
||||
func (a *Account) flushTrackingLatency(sl *ServiceLatency, si *serviceImport, responder *client) bool {
|
||||
sanitizeLatencyMetric(sl)
|
||||
|
||||
// If we are expecting a remote measurement, store our sl here.
|
||||
// We need to account for the race between this and us receiving the
|
||||
// remote measurement.
|
||||
|
||||
@@ -201,8 +201,6 @@ type client struct {
|
||||
rrTracking map[string]*remoteLatency
|
||||
rrMax int
|
||||
|
||||
pslms []*pendingLatency
|
||||
|
||||
route *route
|
||||
gw *gateway
|
||||
leaf *leaf
|
||||
@@ -829,7 +827,6 @@ func (c *client) readLoop() {
|
||||
}()
|
||||
|
||||
// Start read buffer.
|
||||
|
||||
b := make([]byte, c.in.rsz)
|
||||
|
||||
for {
|
||||
@@ -1239,7 +1236,10 @@ func (c *client) processConnect(arg []byte) error {
|
||||
return nil
|
||||
}
|
||||
c.last = time.Now()
|
||||
|
||||
// Estimate RTT to start.
|
||||
if c.kind == CLIENT {
|
||||
c.rtt = c.last.Sub(c.start)
|
||||
}
|
||||
kind := c.kind
|
||||
srv := c.srv
|
||||
|
||||
@@ -1662,10 +1662,6 @@ func (c *client) processPing() {
|
||||
func (c *client) processPong() {
|
||||
c.traceInOp("PONG", nil)
|
||||
c.mu.Lock()
|
||||
// If we have a zero rtt quickly check if we have any pending latency measurements.
|
||||
if c.rtt == 0 && len(c.pslms) > 0 {
|
||||
go c.flushPendingLatencies()
|
||||
}
|
||||
c.ping.out = 0
|
||||
c.rtt = time.Since(c.rttStart)
|
||||
srv := c.srv
|
||||
@@ -2402,49 +2398,6 @@ func (c *client) deliverMsg(sub *subscription, subject, mh, msg []byte) bool {
|
||||
return true
|
||||
}
|
||||
|
||||
// Will flush all of our pending latencies.
|
||||
// Should be called from a go routine (processPong), so no locks held.
|
||||
func (c *client) flushPendingLatencies() {
|
||||
var _pslms [32]*pendingLatency
|
||||
c.mu.Lock()
|
||||
reqClientRTT := c.rtt
|
||||
pslms := append(_pslms[:0], c.pslms...)
|
||||
c.pslms = nil
|
||||
c.mu.Unlock()
|
||||
|
||||
for _, pl := range pslms {
|
||||
// Fixup the service latency with requestor rtt.
|
||||
// Hold si account lock which protects changes to the sl itself.
|
||||
sl, si, a := pl.sl, pl.si, pl.acc
|
||||
if sl.NATSLatency.Responder == 0 && pl.resp != nil && pl.resp.kind == CLIENT {
|
||||
sl.NATSLatency.Responder = pl.resp.getRTTValue()
|
||||
}
|
||||
si.acc.mu.Lock()
|
||||
reqStart := time.Unix(0, si.ts-int64(reqClientRTT))
|
||||
sl.RequestStart = reqStart
|
||||
sl.NATSLatency.Requestor = reqClientRTT
|
||||
sl.TotalLatency += reqClientRTT
|
||||
si.acc.mu.Unlock()
|
||||
|
||||
if a.flushTrackingLatency(sl, si, pl.resp) {
|
||||
// Make sure we remove the entry here.
|
||||
si.acc.removeServiceImport(si.from)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// This will hold a pending latency waiting for requestor RTT.
|
||||
// Lock should be held.
|
||||
func (c *client) holdPendingLatency(pl *pendingLatency) {
|
||||
if len(c.pslms) == 0 {
|
||||
c.rrMax = c.acc.MaxAutoExpireResponseMaps()
|
||||
}
|
||||
c.pslms = append(c.pslms, pl)
|
||||
if len(c.pslms) > c.rrMax {
|
||||
go c.flushPendingLatencies()
|
||||
}
|
||||
}
|
||||
|
||||
// This will track a remote reply for an exported service that has requested
|
||||
// latency tracking.
|
||||
// Lock assumed to be held.
|
||||
@@ -3142,20 +3095,6 @@ func (c *client) processPingTimer() {
|
||||
c.setPingTimer()
|
||||
}
|
||||
|
||||
// Lock should be held
|
||||
// We randomize the first one by an offset up to 20%, e.g. 2m ~= max 24s.
|
||||
// This is because the clients by default are usually setting same interval
|
||||
// and we have a lot of cross ping/pongs between clients and servers.
|
||||
// We will now suppress the server ping/pong if we have received a client ping.
|
||||
func (c *client) setFirstPingTimer(pingInterval time.Duration) {
|
||||
if c.srv == nil {
|
||||
return
|
||||
}
|
||||
addDelay := rand.Int63n(int64(pingInterval / 5))
|
||||
d := pingInterval + time.Duration(addDelay)
|
||||
c.ping.tmr = time.AfterFunc(d, c.processPingTimer)
|
||||
}
|
||||
|
||||
// Lock should be held
|
||||
func (c *client) setPingTimer() {
|
||||
if c.srv == nil {
|
||||
|
||||
@@ -328,8 +328,10 @@ func TestConnz(t *testing.T) {
|
||||
if ci.Idle == "" {
|
||||
t.Fatal("Expected Idle to be valid\n")
|
||||
}
|
||||
if ci.RTT != "" {
|
||||
t.Fatal("Expected RTT to NOT be set for new connection\n")
|
||||
// This is a change: we now expect RTT to be set for connections when the
|
||||
// client sends a connect.
|
||||
if ci.RTT == "" {
|
||||
t.Fatal("Expected RTT to be set for new connection\n")
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
@@ -2382,10 +2382,8 @@ func TestExpandPath(t *testing.T) {
|
||||
defer os.Setenv("HOME", origHome)
|
||||
|
||||
cases := []struct {
|
||||
path string
|
||||
home string
|
||||
testEnv string
|
||||
|
||||
path string
|
||||
home string
|
||||
wantPath string
|
||||
wantErr bool
|
||||
}{
|
||||
|
||||
@@ -47,6 +47,9 @@ const (
|
||||
|
||||
// Interval for the first PING for non-client connections.
|
||||
firstPingInterval = time.Second
|
||||
|
||||
// This is for the first ping for client connections.
|
||||
firstClientPingInterval = 2 * time.Second
|
||||
)
|
||||
|
||||
// Make this a variable so that we can change during tests
|
||||
@@ -1699,7 +1702,7 @@ func (s *Server) createClient(conn net.Conn) *client {
|
||||
// Do final client initialization
|
||||
|
||||
// Set the First Ping timer.
|
||||
c.setFirstPingTimer(opts.PingInterval)
|
||||
s.setFirstPingTimer(c)
|
||||
|
||||
// Spin up the read loop.
|
||||
s.startGoRoutine(func() { c.readLoop() })
|
||||
@@ -2524,11 +2527,19 @@ func (s *Server) shouldReportConnectErr(firstConnect bool, attempts int) bool {
|
||||
// Invoked for client, route, leaf and gateway connections. Set the very first
|
||||
// PING to a lower interval to capture the initial RTT.
|
||||
// After that the PING interval will be set to the user defined value.
|
||||
// Client lock should be held.
|
||||
func (s *Server) setFirstPingTimer(c *client) {
|
||||
opts := s.getOpts()
|
||||
d := opts.PingInterval
|
||||
if d > firstPingInterval {
|
||||
d = firstPingInterval
|
||||
if c.kind != CLIENT {
|
||||
if d > firstPingInterval {
|
||||
d = firstPingInterval
|
||||
}
|
||||
} else if d > firstClientPingInterval {
|
||||
d = firstClientPingInterval
|
||||
}
|
||||
// We randomize the first one by an offset up to 20%, e.g. 2m ~= max 24s.
|
||||
addDelay := rand.Int63n(int64(d / 5))
|
||||
d += time.Duration(addDelay)
|
||||
c.ping.tmr = time.AfterFunc(d, c.processPingTimer)
|
||||
}
|
||||
|
||||
@@ -128,7 +128,7 @@ func checkServiceLatency(t *testing.T, sl server.ServiceLatency, start time.Time
|
||||
if startDelta > 5*time.Millisecond {
|
||||
t.Fatalf("Bad start delta %v", startDelta)
|
||||
}
|
||||
if sl.ServiceLatency < time.Duration(float64(serviceTime)*0.9) {
|
||||
if sl.ServiceLatency < time.Duration(float64(serviceTime)*0.8) {
|
||||
t.Fatalf("Bad service latency: %v (%v)", sl.ServiceLatency, serviceTime)
|
||||
}
|
||||
if sl.TotalLatency < sl.ServiceLatency {
|
||||
@@ -195,7 +195,7 @@ func TestServiceLatencySingleServerConnect(t *testing.T) {
|
||||
// the requestor RTT will be marked as 0. This can happen quite often with
|
||||
// utility programs that are far away from a cluster like NGS but the service
|
||||
// response time has a shorter RTT.
|
||||
func TestServiceLatencyClientRTTSlowerThanServiceRTT(t *testing.T) {
|
||||
func TestServiceLatencyClientRTTSlowerVsServiceRTT(t *testing.T) {
|
||||
sc := createSuperCluster(t, 2, 2)
|
||||
defer sc.shutdown()
|
||||
|
||||
@@ -207,12 +207,15 @@ func TestServiceLatencyClientRTTSlowerThanServiceRTT(t *testing.T) {
|
||||
|
||||
// The service listener. Responds after a 1ms delay.
|
||||
nc.Subscribe("ngs.usage.*", func(msg *nats.Msg) {
|
||||
time.Sleep(time.Millisecond)
|
||||
msg.Respond([]byte("22 msgs"))
|
||||
})
|
||||
|
||||
// Listen for metrics
|
||||
rsub, _ := nc.SubscribeSync("results")
|
||||
|
||||
nc.Flush()
|
||||
|
||||
// Requestor and processing
|
||||
requestAndCheck := func(sopts *server.Options) {
|
||||
rtt := 10 * time.Millisecond
|
||||
|
||||
@@ -76,7 +76,7 @@ func newSlowProxy(latency time.Duration, opts *server.Options) (*slowProxy, *ser
|
||||
func (sp *slowProxy) loop(latency time.Duration, r, w net.Conn) {
|
||||
delay := latency / 2
|
||||
for {
|
||||
var buf [64]byte
|
||||
var buf [1024]byte
|
||||
n, err := r.Read(buf[:])
|
||||
if err != nil {
|
||||
return
|
||||
|
||||
Reference in New Issue
Block a user