mirror of
https://github.com/gogrlx/nats-server.git
synced 2026-04-02 11:48:43 -07:00
@@ -116,12 +116,14 @@ type client struct {
|
||||
in readCache
|
||||
pcd map[*client]struct{}
|
||||
atmr *time.Timer
|
||||
ptmr *time.Timer
|
||||
pout int
|
||||
ping pinfo
|
||||
msgb [msgScratchSize]byte
|
||||
last time.Time
|
||||
parseState
|
||||
|
||||
rtt time.Duration
|
||||
rttStart time.Time
|
||||
|
||||
route *route
|
||||
debug bool
|
||||
trace bool
|
||||
@@ -129,6 +131,12 @@ type client struct {
|
||||
flags clientFlag // Compact booleans into a single field. Size will be increased when needed.
|
||||
}
|
||||
|
||||
// Struct for PING initiation from the server.
|
||||
type pinfo struct {
|
||||
tmr *time.Timer
|
||||
out int
|
||||
}
|
||||
|
||||
// outbound holds pending data for a socket.
|
||||
type outbound struct {
|
||||
p []byte // Primary write buffer
|
||||
@@ -818,7 +826,8 @@ func (c *client) sendPong() {
|
||||
|
||||
// Assume the lock is held upon entry.
|
||||
func (c *client) sendPing() {
|
||||
c.pout++
|
||||
c.rttStart = time.Now()
|
||||
c.ping.out++
|
||||
c.traceOutOp("PING", nil)
|
||||
c.sendProto([]byte("PING\r\n"), true)
|
||||
}
|
||||
@@ -898,7 +907,8 @@ func (c *client) processPing() {
|
||||
func (c *client) processPong() {
|
||||
c.traceInOp("PONG", nil)
|
||||
c.mu.Lock()
|
||||
c.pout = 0
|
||||
c.ping.out = 0
|
||||
c.rtt = time.Since(c.rttStart)
|
||||
c.mu.Unlock()
|
||||
}
|
||||
|
||||
@@ -1483,7 +1493,7 @@ func (c *client) pubPermissionViolation(subject []byte) {
|
||||
func (c *client) processPingTimer() {
|
||||
c.mu.Lock()
|
||||
defer c.mu.Unlock()
|
||||
c.ptmr = nil
|
||||
c.ping.tmr = nil
|
||||
// Check if connection is still opened
|
||||
if c.nc == nil {
|
||||
return
|
||||
@@ -1492,15 +1502,21 @@ func (c *client) processPingTimer() {
|
||||
c.Debugf("%s Ping Timer", c.typeString())
|
||||
|
||||
// Check for violation
|
||||
if c.pout+1 > c.srv.getOpts().MaxPingsOut {
|
||||
if c.ping.out+1 > c.srv.getOpts().MaxPingsOut {
|
||||
c.Debugf("Stale Client Connection - Closing")
|
||||
c.sendProto([]byte(fmt.Sprintf("-ERR '%s'\r\n", "Stale Connection")), true)
|
||||
c.clearConnection()
|
||||
return
|
||||
}
|
||||
|
||||
// Send PING
|
||||
c.sendPing()
|
||||
// If we have had activity within the PingInterval no
|
||||
// need to send a ping.
|
||||
if delta := time.Since(c.last); delta < c.srv.getOpts().PingInterval {
|
||||
c.Debugf("Delaying PING due to activity %v ago", delta.Round(time.Second))
|
||||
} else {
|
||||
// Send PING
|
||||
c.sendPing()
|
||||
}
|
||||
|
||||
// Reset to fire again.
|
||||
c.setPingTimer()
|
||||
@@ -1512,16 +1528,16 @@ func (c *client) setPingTimer() {
|
||||
return
|
||||
}
|
||||
d := c.srv.getOpts().PingInterval
|
||||
c.ptmr = time.AfterFunc(d, c.processPingTimer)
|
||||
c.ping.tmr = time.AfterFunc(d, c.processPingTimer)
|
||||
}
|
||||
|
||||
// Lock should be held
|
||||
func (c *client) clearPingTimer() {
|
||||
if c.ptmr == nil {
|
||||
if c.ping.tmr == nil {
|
||||
return
|
||||
}
|
||||
c.ptmr.Stop()
|
||||
c.ptmr = nil
|
||||
c.ping.tmr.Stop()
|
||||
c.ping.tmr = nil
|
||||
}
|
||||
|
||||
// Lock should be held
|
||||
|
||||
@@ -73,6 +73,7 @@ type ConnInfo struct {
|
||||
Port int `json:"port"`
|
||||
Start time.Time `json:"start"`
|
||||
LastActivity time.Time `json:"last_activity"`
|
||||
RTT string `json:"rtt,omitempty"`
|
||||
Uptime string `json:"uptime"`
|
||||
Idle string `json:"idle"`
|
||||
Pending int `json:"pending_bytes"`
|
||||
@@ -218,6 +219,7 @@ func (s *Server) Connz(opts *ConnzOptions) (*Connz, error) {
|
||||
ci.LastActivity = client.last
|
||||
ci.Uptime = myUptime(c.Now.Sub(client.start))
|
||||
ci.Idle = myUptime(c.Now.Sub(client.last))
|
||||
ci.RTT = client.getRTT()
|
||||
ci.OutMsgs = client.outMsgs
|
||||
ci.OutBytes = client.outBytes
|
||||
ci.NumSubs = uint32(len(client.subs))
|
||||
@@ -289,6 +291,25 @@ func (s *Server) Connz(opts *ConnzOptions) (*Connz, error) {
|
||||
return c, nil
|
||||
}
|
||||
|
||||
// Assume lock is held
|
||||
func (c *client) getRTT() string {
|
||||
if c.rtt == 0 {
|
||||
// If a real client, go ahead and send ping now to get a value
|
||||
// for RTT. For tests and telnet, etc skip.
|
||||
if c.flags.isSet(connectReceived) && c.opts.Lang != "" {
|
||||
c.sendPing()
|
||||
}
|
||||
return ""
|
||||
}
|
||||
var rtt time.Duration
|
||||
if c.rtt > time.Microsecond && c.rtt < time.Millisecond {
|
||||
rtt = c.rtt.Truncate(time.Microsecond)
|
||||
} else {
|
||||
rtt = c.rtt.Truncate(time.Millisecond)
|
||||
}
|
||||
return fmt.Sprintf("%v", rtt)
|
||||
}
|
||||
|
||||
func decodeInt(w http.ResponseWriter, r *http.Request, param string) (int, error) {
|
||||
str := r.URL.Query().Get(param)
|
||||
if str == "" {
|
||||
|
||||
@@ -309,19 +309,22 @@ func TestConnz(t *testing.T) {
|
||||
t.Fatalf("Expected OutBytes of 1, got %v\n", ci.OutBytes)
|
||||
}
|
||||
if ci.Start.IsZero() {
|
||||
t.Fatalf("Expected Start to be valid\n")
|
||||
t.Fatal("Expected Start to be valid\n")
|
||||
}
|
||||
if ci.Uptime == "" {
|
||||
t.Fatalf("Expected Uptime to be valid\n")
|
||||
t.Fatal("Expected Uptime to be valid\n")
|
||||
}
|
||||
if ci.LastActivity.IsZero() {
|
||||
t.Fatalf("Expected LastActivity to be valid\n")
|
||||
t.Fatal("Expected LastActivity to be valid\n")
|
||||
}
|
||||
if ci.LastActivity.UnixNano() < ci.Start.UnixNano() {
|
||||
t.Fatalf("Expected LastActivity [%v] to be > Start [%v]\n", ci.LastActivity, ci.Start)
|
||||
}
|
||||
if ci.Idle == "" {
|
||||
t.Fatalf("Expected Idle to be valid\n")
|
||||
t.Fatal("Expected Idle to be valid\n")
|
||||
}
|
||||
if ci.RTT != "" {
|
||||
t.Fatal("Expected RTT to NOT be set for new connection\n")
|
||||
}
|
||||
}
|
||||
|
||||
@@ -387,6 +390,60 @@ func ensureServerActivityRecorded(t *testing.T, nc *nats.Conn) {
|
||||
}
|
||||
}
|
||||
|
||||
func TestConnzRTT(t *testing.T) {
|
||||
s := runMonitorServer()
|
||||
defer s.Shutdown()
|
||||
|
||||
url := fmt.Sprintf("http://localhost:%d/", s.MonitorAddr().Port)
|
||||
|
||||
testRTT := func(mode int) {
|
||||
// Test with connections.
|
||||
nc := createClientConnSubscribeAndPublish(t, s)
|
||||
defer nc.Close()
|
||||
|
||||
c := pollConz(t, s, mode, url+"connz", nil)
|
||||
|
||||
if c.NumConns != 1 {
|
||||
t.Fatalf("Expected 1 connection, got %d\n", c.NumConns)
|
||||
}
|
||||
|
||||
// Send a server side PING to record RTT
|
||||
s.mu.Lock()
|
||||
ci := c.Conns[0]
|
||||
sc := s.clients[ci.Cid]
|
||||
if sc == nil {
|
||||
t.Fatalf("Error looking up client %v\n", ci.Cid)
|
||||
}
|
||||
s.mu.Unlock()
|
||||
sc.mu.Lock()
|
||||
sc.sendPing()
|
||||
sc.mu.Unlock()
|
||||
|
||||
// Wait for client to respond with PONG
|
||||
time.Sleep(20 * time.Millisecond)
|
||||
|
||||
// Repoll for updated information.
|
||||
c = pollConz(t, s, mode, url+"connz", nil)
|
||||
ci = c.Conns[0]
|
||||
|
||||
rtt, err := time.ParseDuration(ci.RTT)
|
||||
if err != nil {
|
||||
t.Fatalf("Could not parse RTT properly, %v", err)
|
||||
}
|
||||
if rtt <= 0 {
|
||||
t.Fatal("Expected RTT to be valid and non-zero\n")
|
||||
}
|
||||
if rtt > 5*time.Millisecond || rtt < 100*time.Nanosecond {
|
||||
t.Fatalf("Invalid RTT of %s\n", ci.RTT)
|
||||
}
|
||||
}
|
||||
|
||||
for mode := 0; mode < 2; mode++ {
|
||||
testRTT(mode)
|
||||
waitForClientConnCount(t, s, 0)
|
||||
}
|
||||
}
|
||||
|
||||
func TestConnzLastActivity(t *testing.T) {
|
||||
s := runMonitorServer()
|
||||
defer s.Shutdown()
|
||||
|
||||
@@ -104,15 +104,15 @@ func TestParsePong(t *testing.T) {
|
||||
if err != nil || c.state != OP_START {
|
||||
t.Fatalf("Unexpected: %d : %v\n", c.state, err)
|
||||
}
|
||||
if c.pout != 0 {
|
||||
t.Fatalf("Unexpected pout value: %d vs 0\n", c.pout)
|
||||
if c.ping.out != 0 {
|
||||
t.Fatalf("Unexpected ping.out value: %d vs 0\n", c.ping.out)
|
||||
}
|
||||
err = c.parse(pong)
|
||||
if err != nil || c.state != OP_START {
|
||||
t.Fatalf("Unexpected: %d : %v\n", c.state, err)
|
||||
}
|
||||
if c.pout != 0 {
|
||||
t.Fatalf("Unexpected pout value: %d vs 0\n", c.pout)
|
||||
if c.ping.out != 0 {
|
||||
t.Fatalf("Unexpected ping.out value: %d vs 0\n", c.ping.out)
|
||||
}
|
||||
// Should tolerate spaces
|
||||
pong = []byte("PONG \r")
|
||||
@@ -126,20 +126,20 @@ func TestParsePong(t *testing.T) {
|
||||
if err != nil || c.state != OP_START {
|
||||
t.Fatalf("Unexpected: %d : %v\n", c.state, err)
|
||||
}
|
||||
if c.pout != 0 {
|
||||
t.Fatalf("Unexpected pout value: %d vs 0\n", c.pout)
|
||||
if c.ping.out != 0 {
|
||||
t.Fatalf("Unexpected ping.out value: %d vs 0\n", c.ping.out)
|
||||
}
|
||||
|
||||
// Should be adjusting c.pout (Pings Outstanding): reset to 0
|
||||
c.state = OP_START
|
||||
c.pout = 10
|
||||
c.ping.out = 10
|
||||
pong = []byte("PONG\r\n")
|
||||
err = c.parse(pong)
|
||||
if err != nil || c.state != OP_START {
|
||||
t.Fatalf("Unexpected: %d : %v\n", c.state, err)
|
||||
}
|
||||
if c.pout != 0 {
|
||||
t.Fatalf("Unexpected pout: %d vs 0\n", c.pout)
|
||||
if c.ping.out != 0 {
|
||||
t.Fatalf("Unexpected ping.out: %d vs 0\n", c.ping.out)
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
@@ -102,7 +102,7 @@ func TestNoMonitorPort(t *testing.T) {
|
||||
|
||||
// testEndpointDataRace tests a monitoring endpoint for data races by polling
|
||||
// while client code acts to ensure statistics are updated. It is designed to
|
||||
// run under the -race flag to catch violations. The caller must start the
|
||||
// run under the -race flag to catch violations. The caller must start the
|
||||
// NATS server.
|
||||
func testEndpointDataRace(endpoint string, t *testing.T) {
|
||||
var doneWg sync.WaitGroup
|
||||
@@ -653,8 +653,6 @@ func TestHTTPHost(t *testing.T) {
|
||||
// Create a connection to test ConnInfo
|
||||
func createClientConnSubscribeAndPublish(t *testing.T) net.Conn {
|
||||
cl := createClientConn(t, "localhost", CLIENT_PORT)
|
||||
|
||||
sendCommand(t, cl)
|
||||
send, expect := setupConn(t, cl)
|
||||
expectMsgs := expectMsgsCommand(t, expect)
|
||||
|
||||
|
||||
Reference in New Issue
Block a user