Merge pull request #316 from nats-io/improve_route_perf_big_msgs

[IMPROVED] Route performance for larger messages
This commit is contained in:
Derek Collison
2016-08-01 18:06:48 -07:00
committed by GitHub
4 changed files with 117 additions and 5 deletions

View File

@@ -503,6 +503,11 @@ func (c *client) parse(buf []byte) error {
return err
}
c.drop, c.as, c.state = 0, i+1, MSG_PAYLOAD
// jump ahead with the index. If this overruns
// what is left we fall out and process split
// buffer.
i = c.as + c.pa.size - 1
default:
if c.argBuf != nil {
c.argBuf = append(c.argBuf, b)
@@ -661,6 +666,7 @@ func (c *client) parse(buf []byte) error {
// We need to clone the pubArg if it is still referencing the
// read buffer and we are not able to process the msg.
if c.argBuf == nil {
// Works also for MSG_ARG, when message comes from ROUTE.
c.clonePubArg()
}

View File

@@ -340,8 +340,6 @@ func TestParseMsgSpace(t *testing.T) {
}
func TestShouldFail(t *testing.T) {
c := dummyClient()
wrongProtos := []string{
"xxx",
"Px", "PIx", "PINx", " PING",
@@ -359,17 +357,17 @@ func TestShouldFail(t *testing.T) {
"Ix", "INx", "INFx", "INFO \r\n",
}
for _, proto := range wrongProtos {
c.state = OP_START
c := dummyClient()
if err := c.parse([]byte(proto)); err == nil {
t.Fatalf("Should have received a parse error for: %v", proto)
}
}
// Special case for MSG, type needs to not be client.
c.typ = ROUTER
wrongProtos = []string{"Mx", "MSx", "MSGx", "MSG \r\n"}
for _, proto := range wrongProtos {
c.state = OP_START
c := dummyClient()
c.typ = ROUTER
if err := c.parse([]byte(proto)); err == nil {
t.Fatalf("Should have received a parse error for: %v", proto)
}

View File

@@ -345,6 +345,57 @@ func TestSplitDanglingArgBuf(t *testing.T) {
if c.argBuf != nil {
t.Fatalf("Expected c.argBuf to be nil: %q\n", c.argBuf)
}
// MSG (the client has to be a ROUTE)
c = &client{subs: make(map[string]*subscription), typ: ROUTER}
msgop := []byte("MSG foo RSID:2:1 5\r\nhello\r\n")
c.parse(msgop[:5])
c.parse(msgop[5:10])
if c.argBuf == nil {
t.Fatal("Expected a non-nil argBuf")
}
if string(c.argBuf) != "foo RS" {
t.Fatalf("Expected argBuf to be \"foo 1 \", got %q", string(c.argBuf))
}
c.parse(msgop[10:])
if c.argBuf != nil {
t.Fatalf("Expected argBuf to be nil: %q", c.argBuf)
}
if c.msgBuf != nil {
t.Fatalf("Expected msgBuf to be nil: %q", c.msgBuf)
}
c.state = OP_START
// Parse up-to somewhere in the middle of the payload.
// Verify that we have saved the MSG_ARG info
c.parse(msgop[:23])
if c.argBuf == nil {
t.Fatal("Expected a non-nil argBuf")
}
if string(c.pa.subject) != "foo" {
t.Fatalf("Expected subject to be \"foo\", got %q", c.pa.subject)
}
if string(c.pa.reply) != "" {
t.Fatalf("Expected reply to be \"\", got %q", c.pa.reply)
}
if string(c.pa.sid) != "RSID:2:1" {
t.Fatalf("Expected sid to \"RSID:2:1\", got %q", c.pa.sid)
}
if c.pa.size != 5 {
t.Fatalf("Expected sid to 5, got %v", c.pa.size)
}
// msg buffer should be
if c.msgBuf == nil || string(c.msgBuf) != "hel" {
t.Fatalf("Expected msgBuf to be \"hel\", got %q", c.msgBuf)
}
c.parse(msgop[23:])
// At the end, we should have cleaned-up both arg and msg buffers.
if c.argBuf != nil {
t.Fatalf("Expected argBuf to be nil: %q", c.argBuf)
}
if c.msgBuf != nil {
t.Fatalf("Expected msgBuf to be nil: %q", c.msgBuf)
}
}
func TestSplitMsgArg(t *testing.T) {

View File

@@ -337,3 +337,60 @@ func Benchmark__PubEightQueueSub(b *testing.B) {
c.Close()
s.Shutdown()
}
func routePubSub(b *testing.B, size int) {
b.StopTimer()
s1, o1 := RunServerWithConfig("./configs/srv_a.conf")
defer s1.Shutdown()
s2, o2 := RunServerWithConfig("./configs/srv_b.conf")
defer s2.Shutdown()
sub := createClientConn(b, o1.Host, o1.Port)
doDefaultConnect(b, sub)
sendProto(b, sub, "SUB foo 1\r\n")
flushConnection(b, sub)
payload := sizedString(size)
pub := createClientConn(b, o2.Host, o2.Port)
doDefaultConnect(b, pub)
bw := bufio.NewWriterSize(pub, defaultSendBufSize)
ch := make(chan bool)
sendOp := []byte(fmt.Sprintf("PUB foo %d\r\n%s\r\n", len(payload), payload))
expected := len(fmt.Sprintf("MSG foo 1 %d\r\n%s\r\n", len(payload), payload)) * b.N
go drainConnection(b, sub, ch, expected)
b.StartTimer()
for i := 0; i < b.N; i++ {
_, err := bw.Write(sendOp)
if err != nil {
b.Fatalf("Received error on PUB write: %v\n", err)
}
}
err := bw.Flush()
if err != nil {
b.Errorf("Received error on FLUSH write: %v\n", err)
}
// Wait for connection to be drained
<-ch
b.StopTimer()
pub.Close()
sub.Close()
}
func Benchmark_RoutePubSub_NoPayload(b *testing.B) {
routePubSub(b, 2)
}
func Benchmark_RoutePubSub_1K(b *testing.B) {
routePubSub(b, 1024)
}
func Benchmark_RoutePubSub_100K(b *testing.B) {
routePubSub(b, 100*1024)
}