mirror of
https://github.com/gogrlx/nats-server.git
synced 2026-04-17 11:24:44 -07:00
Merge pull request #1175 from nats-io/fix_1174
[FIXED] Server should not send RTT PING before sending initial PONG
This commit is contained in:
@@ -71,6 +71,12 @@ const (
|
||||
shortsToShrink = 2 // Trigger to shrink dynamic buffers
|
||||
maxFlushPending = 10 // Max fsps to have in order to wait for writeLoop
|
||||
readLoopReport = 2 * time.Second
|
||||
|
||||
// Server should not send a PING (for RTT) before the first PONG has
|
||||
// been sent to the client. However, in case some client libs don't
|
||||
// send CONNECT+PING, cap the maximum time before server can send
|
||||
// the RTT PING.
|
||||
maxNoRTTPingBeforeFirstPong = 2 * time.Second
|
||||
)
|
||||
|
||||
var readLoopReportThreshold = readLoopReport
|
||||
@@ -1525,12 +1531,29 @@ func (c *client) sendPong() {
|
||||
}
|
||||
|
||||
// Used to kick off a RTT measurement for latency tracking.
|
||||
func (c *client) sendRTTPing() {
|
||||
func (c *client) sendRTTPing() bool {
|
||||
c.mu.Lock()
|
||||
if c.flags.isSet(connectReceived) {
|
||||
c.sendPing()
|
||||
}
|
||||
sent := c.sendRTTPingLocked()
|
||||
c.mu.Unlock()
|
||||
return sent
|
||||
}
|
||||
|
||||
// Used to kick off a RTT measurement for latency tracking.
|
||||
// This is normally called only when the caller has checked that
|
||||
// the c.rtt is 0 and wants to force an update by sending a PING.
|
||||
// Client lock held on entry.
|
||||
func (c *client) sendRTTPingLocked() bool {
|
||||
// Most client libs send a CONNECT+PING and wait for a PONG from the
|
||||
// server. So if firstPongSent flag is set, it is ok for server to
|
||||
// send the PING. But in case we have client libs that don't do that,
|
||||
// allow the send of the PING if more than 2 secs have elapsed since
|
||||
// the client TCP connection was accepted.
|
||||
if !c.flags.isSet(clearConnection) &&
|
||||
(c.flags.isSet(firstPongSent) || time.Since(c.start) > maxNoRTTPingBeforeFirstPong) {
|
||||
c.sendPing()
|
||||
return true
|
||||
}
|
||||
return false
|
||||
}
|
||||
|
||||
// Assume the lock is held upon entry.
|
||||
@@ -2322,8 +2345,8 @@ func (c *client) deliverMsg(sub *subscription, subject, mh, msg []byte) bool {
|
||||
// This needs to be from a non-client (otherwise tracking happens at requestor).
|
||||
if client.kind == CLIENT && len(c.pa.reply) > minReplyLen {
|
||||
// If we do not have a registered RTT queue that up now.
|
||||
if client.rtt == 0 && client.flags.isSet(connectReceived) {
|
||||
client.sendPing()
|
||||
if client.rtt == 0 {
|
||||
client.sendRTTPingLocked()
|
||||
}
|
||||
// FIXME(dlc) - We may need to optimize this.
|
||||
// We will have tagged this with a suffix ('.T') if we are tracking. This is
|
||||
|
||||
@@ -1632,3 +1632,66 @@ func TestResponsePermissions(t *testing.T) {
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
func TestPingNotSentTooSoon(t *testing.T) {
|
||||
opts := DefaultOptions()
|
||||
s := RunServer(opts)
|
||||
defer s.Shutdown()
|
||||
|
||||
doneCh := make(chan bool, 1)
|
||||
wg := sync.WaitGroup{}
|
||||
wg.Add(1)
|
||||
go func() {
|
||||
defer wg.Done()
|
||||
for {
|
||||
s.Connz(nil)
|
||||
select {
|
||||
case <-doneCh:
|
||||
return
|
||||
case <-time.After(time.Millisecond):
|
||||
}
|
||||
}
|
||||
}()
|
||||
|
||||
for i := 0; i < 100; i++ {
|
||||
nc, err := nats.Connect(s.ClientURL())
|
||||
if err != nil {
|
||||
t.Fatalf("Error on connect: %v", err)
|
||||
}
|
||||
nc.Close()
|
||||
}
|
||||
close(doneCh)
|
||||
wg.Wait()
|
||||
|
||||
c, br, _ := newClientForServer(s)
|
||||
connectOp := []byte("CONNECT {\"user\":\"ivan\",\"pass\":\"bar\"}\r\n")
|
||||
c.parse(connectOp)
|
||||
|
||||
// Since client has not send PING, having server try to send RTT ping
|
||||
// to client should not do anything
|
||||
if c.sendRTTPing() {
|
||||
t.Fatalf("RTT ping should not have been sent")
|
||||
}
|
||||
// Speed up detection of time elapsed by moving the c.start to more than
|
||||
// 2 secs in the past.
|
||||
c.mu.Lock()
|
||||
c.start = time.Unix(0, c.start.UnixNano()-int64(maxNoRTTPingBeforeFirstPong+time.Second))
|
||||
c.mu.Unlock()
|
||||
|
||||
errCh := make(chan error, 1)
|
||||
go func() {
|
||||
l, _ := br.ReadString('\n')
|
||||
if l != "PING\r\n" {
|
||||
errCh <- fmt.Errorf("expected to get PING, got %s", l)
|
||||
return
|
||||
}
|
||||
errCh <- nil
|
||||
}()
|
||||
if !c.sendRTTPing() {
|
||||
t.Fatalf("RTT ping should have been sent")
|
||||
}
|
||||
wg.Wait()
|
||||
if e := <-errCh; e != nil {
|
||||
t.Fatal(e.Error())
|
||||
}
|
||||
}
|
||||
|
||||
@@ -448,8 +448,8 @@ func (c *client) getRTT() string {
|
||||
if c.rtt == 0 {
|
||||
// If a real client, go ahead and send ping now to get a value
|
||||
// for RTT. For tests and telnet, or if client is closing, etc skip.
|
||||
if !c.flags.isSet(clearConnection) && c.flags.isSet(connectReceived) && c.opts.Lang != "" {
|
||||
c.sendPing()
|
||||
if c.opts.Lang != "" {
|
||||
c.sendRTTPingLocked()
|
||||
}
|
||||
return ""
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user