mirror of
https://github.com/gogrlx/nats-server.git
synced 2026-04-02 03:38:42 -07:00
Added proper processing of client PONGs
This commit is contained in:
@@ -186,6 +186,13 @@ func (c *client) processPing() {
|
||||
c.mu.Unlock()
|
||||
}
|
||||
|
||||
func (c *client) processPong() {
|
||||
c.traceOp("PONG", nil)
|
||||
c.mu.Lock()
|
||||
c.pout -= 1
|
||||
c.mu.Unlock()
|
||||
}
|
||||
|
||||
const argsLenMax = 3
|
||||
|
||||
func (c *client) processPub(arg []byte) error {
|
||||
|
||||
@@ -7,7 +7,7 @@ import (
|
||||
)
|
||||
|
||||
const (
|
||||
VERSION = "go-0.2.6.alpha.1"
|
||||
VERSION = "go-0.2.8.alpha.1"
|
||||
|
||||
DEFAULT_PORT = 4222
|
||||
DEFAULT_HOST = "0.0.0.0"
|
||||
|
||||
@@ -41,6 +41,9 @@ const (
|
||||
OP_PI
|
||||
OP_PIN
|
||||
OP_PING
|
||||
OP_PO
|
||||
OP_PON
|
||||
OP_PONG
|
||||
MSG_PAYLOAD
|
||||
MSG_END
|
||||
OP_S
|
||||
@@ -84,6 +87,8 @@ func (c *client) parse(buf []byte) error {
|
||||
c.state = OP_PU
|
||||
case 'I', 'i':
|
||||
c.state = OP_PI
|
||||
case 'O', 'o':
|
||||
c.state = OP_PO
|
||||
default:
|
||||
goto parseErr
|
||||
}
|
||||
@@ -275,6 +280,26 @@ func (c *client) parse(buf []byte) error {
|
||||
c.processPing()
|
||||
c.drop, c.state = 0, OP_START
|
||||
}
|
||||
case OP_PO:
|
||||
switch b {
|
||||
case 'N', 'n':
|
||||
c.state = OP_PON
|
||||
default:
|
||||
goto parseErr
|
||||
}
|
||||
case OP_PON:
|
||||
switch b {
|
||||
case 'G', 'g':
|
||||
c.state = OP_PONG
|
||||
default:
|
||||
goto parseErr
|
||||
}
|
||||
case OP_PONG:
|
||||
switch b {
|
||||
case '\n':
|
||||
c.processPong()
|
||||
c.drop, c.state = 0, OP_START
|
||||
}
|
||||
case OP_C:
|
||||
switch b {
|
||||
case 'O', 'o':
|
||||
|
||||
@@ -59,6 +59,66 @@ func TestParsePing(t *testing.T) {
|
||||
}
|
||||
}
|
||||
|
||||
func TestParsePong(t *testing.T) {
|
||||
c := dummyClient()
|
||||
if c.state != OP_START {
|
||||
t.Fatalf("Expected OP_START vs %d\n", c.state)
|
||||
}
|
||||
pong := []byte("PONG\r\n")
|
||||
err := c.parse(pong[:1])
|
||||
if err != nil || c.state != OP_P {
|
||||
t.Fatalf("Unexpected: %d : %v\n", c.state, err)
|
||||
}
|
||||
err = c.parse(pong[1:2])
|
||||
if err != nil || c.state != OP_PO {
|
||||
t.Fatalf("Unexpected: %d : %v\n", c.state, err)
|
||||
}
|
||||
err = c.parse(pong[2:3])
|
||||
if err != nil || c.state != OP_PON {
|
||||
t.Fatalf("Unexpected: %d : %v\n", c.state, err)
|
||||
}
|
||||
err = c.parse(pong[3:4])
|
||||
if err != nil || c.state != OP_PONG {
|
||||
t.Fatalf("Unexpected: %d : %v\n", c.state, err)
|
||||
}
|
||||
err = c.parse(pong[4:5])
|
||||
if err != nil || c.state != OP_PONG {
|
||||
t.Fatalf("Unexpected: %d : %v\n", c.state, err)
|
||||
}
|
||||
err = c.parse(pong[5:6])
|
||||
if err != nil || c.state != OP_START {
|
||||
t.Fatalf("Unexpected: %d : %v\n", c.state, err)
|
||||
}
|
||||
err = c.parse(pong)
|
||||
if err != nil || c.state != OP_START {
|
||||
t.Fatalf("Unexpected: %d : %v\n", c.state, err)
|
||||
}
|
||||
// Should tolerate spaces
|
||||
pong = []byte("PONG \r")
|
||||
err = c.parse(pong)
|
||||
if err != nil || c.state != OP_PONG {
|
||||
t.Fatalf("Unexpected: %d : %v\n", c.state, err)
|
||||
}
|
||||
c.state = OP_START
|
||||
pong = []byte("PONG \r \n")
|
||||
err = c.parse(pong)
|
||||
if err != nil || c.state != OP_START {
|
||||
t.Fatalf("Unexpected: %d : %v\n", c.state, err)
|
||||
}
|
||||
|
||||
// Should be adjusting c.pout, Pings Outstanding
|
||||
c.state = OP_START
|
||||
c.pout = 10
|
||||
pong = []byte("PONG\r\n")
|
||||
err = c.parse(pong)
|
||||
if err != nil || c.state != OP_START {
|
||||
t.Fatalf("Unexpected: %d : %v\n", c.state, err)
|
||||
}
|
||||
if c.pout != 9 {
|
||||
t.Fatalf("Unexpected pout: %d vs %d\n", c.pout, 9)
|
||||
}
|
||||
}
|
||||
|
||||
func TestParseConnect(t *testing.T) {
|
||||
c := dummyClient()
|
||||
connect := []byte("CONNECT {\"verbose\":false,\"pedantic\":true,\"ssl_required\":false}\r\n")
|
||||
|
||||
Reference in New Issue
Block a user