diff --git a/server/client.go b/server/client.go index d2e794c9..0041c4f2 100644 --- a/server/client.go +++ b/server/client.go @@ -116,12 +116,14 @@ type client struct { in readCache pcd map[*client]struct{} atmr *time.Timer - ptmr *time.Timer - pout int + ping pinfo msgb [msgScratchSize]byte last time.Time parseState + rtt time.Duration + rttStart time.Time + route *route debug bool trace bool @@ -129,6 +131,12 @@ type client struct { flags clientFlag // Compact booleans into a single field. Size will be increased when needed. } +// Struct for PING initiation from the server. +type pinfo struct { + tmr *time.Timer + out int +} + // outbound holds pending data for a socket. type outbound struct { p []byte // Primary write buffer @@ -818,7 +826,8 @@ func (c *client) sendPong() { // Assume the lock is held upon entry. func (c *client) sendPing() { - c.pout++ + c.rttStart = time.Now() + c.ping.out++ c.traceOutOp("PING", nil) c.sendProto([]byte("PING\r\n"), true) } @@ -898,7 +907,8 @@ func (c *client) processPing() { func (c *client) processPong() { c.traceInOp("PONG", nil) c.mu.Lock() - c.pout = 0 + c.ping.out = 0 + c.rtt = time.Since(c.rttStart) c.mu.Unlock() } @@ -1483,7 +1493,7 @@ func (c *client) pubPermissionViolation(subject []byte) { func (c *client) processPingTimer() { c.mu.Lock() defer c.mu.Unlock() - c.ptmr = nil + c.ping.tmr = nil // Check if connection is still opened if c.nc == nil { return @@ -1492,15 +1502,21 @@ func (c *client) processPingTimer() { c.Debugf("%s Ping Timer", c.typeString()) // Check for violation - if c.pout+1 > c.srv.getOpts().MaxPingsOut { + if c.ping.out+1 > c.srv.getOpts().MaxPingsOut { c.Debugf("Stale Client Connection - Closing") c.sendProto([]byte(fmt.Sprintf("-ERR '%s'\r\n", "Stale Connection")), true) c.clearConnection() return } - // Send PING - c.sendPing() + // If we have had activity within the PingInterval no + // need to send a ping. + if delta := time.Since(c.last); delta < c.srv.getOpts().PingInterval { + c.Debugf("Delaying PING due to activity %v ago", delta.Round(time.Second)) + } else { + // Send PING + c.sendPing() + } // Reset to fire again. c.setPingTimer() @@ -1512,16 +1528,16 @@ func (c *client) setPingTimer() { return } d := c.srv.getOpts().PingInterval - c.ptmr = time.AfterFunc(d, c.processPingTimer) + c.ping.tmr = time.AfterFunc(d, c.processPingTimer) } // Lock should be held func (c *client) clearPingTimer() { - if c.ptmr == nil { + if c.ping.tmr == nil { return } - c.ptmr.Stop() - c.ptmr = nil + c.ping.tmr.Stop() + c.ping.tmr = nil } // Lock should be held diff --git a/server/monitor.go b/server/monitor.go index 4d096151..6c4a659f 100644 --- a/server/monitor.go +++ b/server/monitor.go @@ -73,6 +73,7 @@ type ConnInfo struct { Port int `json:"port"` Start time.Time `json:"start"` LastActivity time.Time `json:"last_activity"` + RTT string `json:"rtt,omitempty"` Uptime string `json:"uptime"` Idle string `json:"idle"` Pending int `json:"pending_bytes"` @@ -218,6 +219,7 @@ func (s *Server) Connz(opts *ConnzOptions) (*Connz, error) { ci.LastActivity = client.last ci.Uptime = myUptime(c.Now.Sub(client.start)) ci.Idle = myUptime(c.Now.Sub(client.last)) + ci.RTT = client.getRTT() ci.OutMsgs = client.outMsgs ci.OutBytes = client.outBytes ci.NumSubs = uint32(len(client.subs)) @@ -289,6 +291,25 @@ func (s *Server) Connz(opts *ConnzOptions) (*Connz, error) { return c, nil } +// Assume lock is held +func (c *client) getRTT() string { + if c.rtt == 0 { + // If a real client, go ahead and send ping now to get a value + // for RTT. For tests and telnet, etc skip. + if c.flags.isSet(connectReceived) && c.opts.Lang != "" { + c.sendPing() + } + return "" + } + var rtt time.Duration + if c.rtt > time.Microsecond && c.rtt < time.Millisecond { + rtt = c.rtt.Truncate(time.Microsecond) + } else { + rtt = c.rtt.Truncate(time.Millisecond) + } + return fmt.Sprintf("%v", rtt) +} + func decodeInt(w http.ResponseWriter, r *http.Request, param string) (int, error) { str := r.URL.Query().Get(param) if str == "" { diff --git a/server/monitor_test.go b/server/monitor_test.go index 9757d8b7..3abb5884 100644 --- a/server/monitor_test.go +++ b/server/monitor_test.go @@ -309,19 +309,22 @@ func TestConnz(t *testing.T) { t.Fatalf("Expected OutBytes of 1, got %v\n", ci.OutBytes) } if ci.Start.IsZero() { - t.Fatalf("Expected Start to be valid\n") + t.Fatal("Expected Start to be valid\n") } if ci.Uptime == "" { - t.Fatalf("Expected Uptime to be valid\n") + t.Fatal("Expected Uptime to be valid\n") } if ci.LastActivity.IsZero() { - t.Fatalf("Expected LastActivity to be valid\n") + t.Fatal("Expected LastActivity to be valid\n") } if ci.LastActivity.UnixNano() < ci.Start.UnixNano() { t.Fatalf("Expected LastActivity [%v] to be > Start [%v]\n", ci.LastActivity, ci.Start) } if ci.Idle == "" { - t.Fatalf("Expected Idle to be valid\n") + t.Fatal("Expected Idle to be valid\n") + } + if ci.RTT != "" { + t.Fatal("Expected RTT to NOT be set for new connection\n") } } @@ -387,6 +390,60 @@ func ensureServerActivityRecorded(t *testing.T, nc *nats.Conn) { } } +func TestConnzRTT(t *testing.T) { + s := runMonitorServer() + defer s.Shutdown() + + url := fmt.Sprintf("http://localhost:%d/", s.MonitorAddr().Port) + + testRTT := func(mode int) { + // Test with connections. + nc := createClientConnSubscribeAndPublish(t, s) + defer nc.Close() + + c := pollConz(t, s, mode, url+"connz", nil) + + if c.NumConns != 1 { + t.Fatalf("Expected 1 connection, got %d\n", c.NumConns) + } + + // Send a server side PING to record RTT + s.mu.Lock() + ci := c.Conns[0] + sc := s.clients[ci.Cid] + if sc == nil { + t.Fatalf("Error looking up client %v\n", ci.Cid) + } + s.mu.Unlock() + sc.mu.Lock() + sc.sendPing() + sc.mu.Unlock() + + // Wait for client to respond with PONG + time.Sleep(20 * time.Millisecond) + + // Repoll for updated information. + c = pollConz(t, s, mode, url+"connz", nil) + ci = c.Conns[0] + + rtt, err := time.ParseDuration(ci.RTT) + if err != nil { + t.Fatalf("Could not parse RTT properly, %v", err) + } + if rtt <= 0 { + t.Fatal("Expected RTT to be valid and non-zero\n") + } + if rtt > 5*time.Millisecond || rtt < 100*time.Nanosecond { + t.Fatalf("Invalid RTT of %s\n", ci.RTT) + } + } + + for mode := 0; mode < 2; mode++ { + testRTT(mode) + waitForClientConnCount(t, s, 0) + } +} + func TestConnzLastActivity(t *testing.T) { s := runMonitorServer() defer s.Shutdown() diff --git a/server/parser_test.go b/server/parser_test.go index 7b7d9e03..95631b08 100644 --- a/server/parser_test.go +++ b/server/parser_test.go @@ -104,15 +104,15 @@ func TestParsePong(t *testing.T) { if err != nil || c.state != OP_START { t.Fatalf("Unexpected: %d : %v\n", c.state, err) } - if c.pout != 0 { - t.Fatalf("Unexpected pout value: %d vs 0\n", c.pout) + if c.ping.out != 0 { + t.Fatalf("Unexpected ping.out value: %d vs 0\n", c.ping.out) } err = c.parse(pong) if err != nil || c.state != OP_START { t.Fatalf("Unexpected: %d : %v\n", c.state, err) } - if c.pout != 0 { - t.Fatalf("Unexpected pout value: %d vs 0\n", c.pout) + if c.ping.out != 0 { + t.Fatalf("Unexpected ping.out value: %d vs 0\n", c.ping.out) } // Should tolerate spaces pong = []byte("PONG \r") @@ -126,20 +126,20 @@ func TestParsePong(t *testing.T) { if err != nil || c.state != OP_START { t.Fatalf("Unexpected: %d : %v\n", c.state, err) } - if c.pout != 0 { - t.Fatalf("Unexpected pout value: %d vs 0\n", c.pout) + if c.ping.out != 0 { + t.Fatalf("Unexpected ping.out value: %d vs 0\n", c.ping.out) } // Should be adjusting c.pout (Pings Outstanding): reset to 0 c.state = OP_START - c.pout = 10 + c.ping.out = 10 pong = []byte("PONG\r\n") err = c.parse(pong) if err != nil || c.state != OP_START { t.Fatalf("Unexpected: %d : %v\n", c.state, err) } - if c.pout != 0 { - t.Fatalf("Unexpected pout: %d vs 0\n", c.pout) + if c.ping.out != 0 { + t.Fatalf("Unexpected ping.out: %d vs 0\n", c.ping.out) } } diff --git a/test/monitor_test.go b/test/monitor_test.go index ca38aa11..9bf4bf6f 100644 --- a/test/monitor_test.go +++ b/test/monitor_test.go @@ -102,7 +102,7 @@ func TestNoMonitorPort(t *testing.T) { // testEndpointDataRace tests a monitoring endpoint for data races by polling // while client code acts to ensure statistics are updated. It is designed to -// run under the -race flag to catch violations. The caller must start the +// run under the -race flag to catch violations. The caller must start the // NATS server. func testEndpointDataRace(endpoint string, t *testing.T) { var doneWg sync.WaitGroup @@ -653,8 +653,6 @@ func TestHTTPHost(t *testing.T) { // Create a connection to test ConnInfo func createClientConnSubscribeAndPublish(t *testing.T) net.Conn { cl := createClientConn(t, "localhost", CLIENT_PORT) - - sendCommand(t, cl) send, expect := setupConn(t, cl) expectMsgs := expectMsgsCommand(t, expect)