diff --git a/server/accounts.go b/server/accounts.go index d353b372..bf9aa9bc 100644 --- a/server/accounts.go +++ b/server/accounts.go @@ -707,14 +707,6 @@ type remoteLatency struct { M2 ServiceLatency `json:"m2"` } -// Used to hold for an RTT measurement from requestor. -type pendingLatency struct { - acc *Account // Exporting/Reporting account - si *serviceImport - sl *ServiceLatency - resp *client -} - // sendTrackingMessage will send out the appropriate tracking information for the // service request/response latency. This is called when the requestor's server has // received the response. @@ -735,7 +727,7 @@ func (a *Account) sendTrackingLatency(si *serviceImport, requestor, responder *c // We will estimate time when request left the requestor by time we received // and the client RTT for the requestor. reqStart := time.Unix(0, si.ts-int64(reqClientRTT)) - sl := ServiceLatency{ + sl := &ServiceLatency{ AppName: appName, RequestStart: reqStart, ServiceLatency: serviceRTT - respClientRTT, @@ -750,23 +742,9 @@ func (a *Account) sendTrackingLatency(si *serviceImport, requestor, responder *c sl.NATSLatency.System = time.Since(ts) sl.TotalLatency += sl.NATSLatency.System } - // If we do not have a requestor RTT at this point we will wait for it to show up. - if reqClientRTT == 0 { - requestor.mu.Lock() - if requestor.flags.isSet(firstPongSent) { - requestor.holdPendingLatency(&pendingLatency{a, si, &sl, responder}) - requestor.mu.Unlock() - return false - } - requestor.mu.Unlock() - } - return a.flushTrackingLatency(&sl, si, responder) -} - -// We can attempt to flush out the latency metric. May pause if multiple measurements needed. -func (a *Account) flushTrackingLatency(sl *ServiceLatency, si *serviceImport, responder *client) bool { sanitizeLatencyMetric(sl) + // If we are expecting a remote measurement, store our sl here. // We need to account for the race between this and us receiving the // remote measurement. diff --git a/server/client.go b/server/client.go index 27c60d05..6a4fbb92 100644 --- a/server/client.go +++ b/server/client.go @@ -201,8 +201,6 @@ type client struct { rrTracking map[string]*remoteLatency rrMax int - pslms []*pendingLatency - route *route gw *gateway leaf *leaf @@ -829,7 +827,6 @@ func (c *client) readLoop() { }() // Start read buffer. - b := make([]byte, c.in.rsz) for { @@ -1239,7 +1236,10 @@ func (c *client) processConnect(arg []byte) error { return nil } c.last = time.Now() - + // Estimate RTT to start. + if c.kind == CLIENT { + c.rtt = c.last.Sub(c.start) + } kind := c.kind srv := c.srv @@ -1662,10 +1662,6 @@ func (c *client) processPing() { func (c *client) processPong() { c.traceInOp("PONG", nil) c.mu.Lock() - // If we have a zero rtt quickly check if we have any pending latency measurements. - if c.rtt == 0 && len(c.pslms) > 0 { - go c.flushPendingLatencies() - } c.ping.out = 0 c.rtt = time.Since(c.rttStart) srv := c.srv @@ -2402,49 +2398,6 @@ func (c *client) deliverMsg(sub *subscription, subject, mh, msg []byte) bool { return true } -// Will flush all of our pending latencies. -// Should be called from a go routine (processPong), so no locks held. -func (c *client) flushPendingLatencies() { - var _pslms [32]*pendingLatency - c.mu.Lock() - reqClientRTT := c.rtt - pslms := append(_pslms[:0], c.pslms...) - c.pslms = nil - c.mu.Unlock() - - for _, pl := range pslms { - // Fixup the service latency with requestor rtt. - // Hold si account lock which protects changes to the sl itself. - sl, si, a := pl.sl, pl.si, pl.acc - if sl.NATSLatency.Responder == 0 && pl.resp != nil && pl.resp.kind == CLIENT { - sl.NATSLatency.Responder = pl.resp.getRTTValue() - } - si.acc.mu.Lock() - reqStart := time.Unix(0, si.ts-int64(reqClientRTT)) - sl.RequestStart = reqStart - sl.NATSLatency.Requestor = reqClientRTT - sl.TotalLatency += reqClientRTT - si.acc.mu.Unlock() - - if a.flushTrackingLatency(sl, si, pl.resp) { - // Make sure we remove the entry here. - si.acc.removeServiceImport(si.from) - } - } -} - -// This will hold a pending latency waiting for requestor RTT. -// Lock should be held. -func (c *client) holdPendingLatency(pl *pendingLatency) { - if len(c.pslms) == 0 { - c.rrMax = c.acc.MaxAutoExpireResponseMaps() - } - c.pslms = append(c.pslms, pl) - if len(c.pslms) > c.rrMax { - go c.flushPendingLatencies() - } -} - // This will track a remote reply for an exported service that has requested // latency tracking. // Lock assumed to be held. @@ -3142,20 +3095,6 @@ func (c *client) processPingTimer() { c.setPingTimer() } -// Lock should be held -// We randomize the first one by an offset up to 20%, e.g. 2m ~= max 24s. -// This is because the clients by default are usually setting same interval -// and we have alot of cross ping/pongs between clients and servers. -// We will now suppress the server ping/pong if we have received a client ping. -func (c *client) setFirstPingTimer(pingInterval time.Duration) { - if c.srv == nil { - return - } - addDelay := rand.Int63n(int64(pingInterval / 5)) - d := pingInterval + time.Duration(addDelay) - c.ping.tmr = time.AfterFunc(d, c.processPingTimer) -} - // Lock should be held func (c *client) setPingTimer() { if c.srv == nil { diff --git a/server/monitor_test.go b/server/monitor_test.go index a153095e..0c9bd154 100644 --- a/server/monitor_test.go +++ b/server/monitor_test.go @@ -328,8 +328,10 @@ func TestConnz(t *testing.T) { if ci.Idle == "" { t.Fatal("Expected Idle to be valid\n") } - if ci.RTT != "" { - t.Fatal("Expected RTT to NOT be set for new connection\n") + // This is a change, we now expect them to be set for connections when the + // client sends a connect. + if ci.RTT == "" { + t.Fatal("Expected RTT to be set for new connection\n") } } diff --git a/server/opts_test.go b/server/opts_test.go index cdf359e4..9ca98529 100644 --- a/server/opts_test.go +++ b/server/opts_test.go @@ -2382,10 +2382,8 @@ func TestExpandPath(t *testing.T) { defer os.Setenv("HOME", origHome) cases := []struct { - path string - home string - testEnv string - + path string + home string wantPath string wantErr bool }{ diff --git a/server/server.go b/server/server.go index c17d4da1..038e7424 100644 --- a/server/server.go +++ b/server/server.go @@ -47,6 +47,9 @@ const ( // Interval for the first PING for non client connections. firstPingInterval = time.Second + + // This is for the first ping for client connections. + firstClientPingInterval = 2 * time.Second ) // Make this a variable so that we can change during tests @@ -1699,7 +1702,7 @@ func (s *Server) createClient(conn net.Conn) *client { // Do final client initialization // Set the First Ping timer. - c.setFirstPingTimer(opts.PingInterval) + s.setFirstPingTimer(c) // Spin up the read loop. s.startGoRoutine(func() { c.readLoop() }) @@ -2524,11 +2527,19 @@ func (s *Server) shouldReportConnectErr(firstConnect bool, attempts int) bool { // Invoked for route, leaf and gateway connections. Set the very first // PING to a lower interval to capture the initial RTT. // After that the PING interval will be set to the user defined value. +// Client lock should be held. func (s *Server) setFirstPingTimer(c *client) { opts := s.getOpts() d := opts.PingInterval - if d > firstPingInterval { - d = firstPingInterval + if c.kind != CLIENT { + if d > firstPingInterval { + d = firstPingInterval + } + } else if d > firstClientPingInterval { + d = firstClientPingInterval } + // We randomize the first one by an offset up to 20%, e.g. 2m ~= max 24s. + addDelay := rand.Int63n(int64(d / 5)) + d += time.Duration(addDelay) c.ping.tmr = time.AfterFunc(d, c.processPingTimer) } diff --git a/test/service_latency_test.go b/test/service_latency_test.go index 581c9dad..5658f587 100644 --- a/test/service_latency_test.go +++ b/test/service_latency_test.go @@ -128,7 +128,7 @@ func checkServiceLatency(t *testing.T, sl server.ServiceLatency, start time.Time if startDelta > 5*time.Millisecond { t.Fatalf("Bad start delta %v", startDelta) } - if sl.ServiceLatency < time.Duration(float64(serviceTime)*0.9) { + if sl.ServiceLatency < time.Duration(float64(serviceTime)*0.8) { t.Fatalf("Bad service latency: %v (%v)", sl.ServiceLatency, serviceTime) } if sl.TotalLatency < sl.ServiceLatency { @@ -195,7 +195,7 @@ func TestServiceLatencySingleServerConnect(t *testing.T) { // the requestor RTT will be marked as 0. This can happen quite often with // utility programs that are far away from a cluster like NGS but the service // response time has a shorter RTT. -func TestServiceLatencyClientRTTSlowerThanServiceRTT(t *testing.T) { +func TestServiceLatencyClientRTTSlowerVsServiceRTT(t *testing.T) { sc := createSuperCluster(t, 2, 2) defer sc.shutdown() @@ -207,12 +207,15 @@ func TestServiceLatencyClientRTTSlowerThanServiceRTT(t *testing.T) { // The service listener. Instant response. nc.Subscribe("ngs.usage.*", func(msg *nats.Msg) { + time.Sleep(time.Millisecond) msg.Respond([]byte("22 msgs")) }) // Listen for metrics rsub, _ := nc.SubscribeSync("results") + nc.Flush() + // Requestor and processing requestAndCheck := func(sopts *server.Options) { rtt := 10 * time.Millisecond diff --git a/test/test_test.go b/test/test_test.go index b51b3462..81c99b9a 100644 --- a/test/test_test.go +++ b/test/test_test.go @@ -76,7 +76,7 @@ func newSlowProxy(latency time.Duration, opts *server.Options) (*slowProxy, *ser func (sp *slowProxy) loop(latency time.Duration, r, w net.Conn) { delay := latency / 2 for { - var buf [64]byte + var buf [1024]byte n, err := r.Read(buf[:]) if err != nil { return