diff --git a/server/parser.go b/server/parser.go index ee5ed3a6..25ed6d09 100644 --- a/server/parser.go +++ b/server/parser.go @@ -503,6 +503,11 @@ func (c *client) parse(buf []byte) error { return err } c.drop, c.as, c.state = 0, i+1, MSG_PAYLOAD + + // jump ahead with the index. If this overruns + // what is left we fall out and process split + // buffer. + i = c.as + c.pa.size - 1 default: if c.argBuf != nil { c.argBuf = append(c.argBuf, b) @@ -661,6 +666,7 @@ func (c *client) parse(buf []byte) error { // We need to clone the pubArg if it is still referencing the // read buffer and we are not able to process the msg. if c.argBuf == nil { + // Works also for MSG_ARG, when message comes from ROUTE. c.clonePubArg() } diff --git a/server/parser_test.go b/server/parser_test.go index c513f5a8..b1691486 100644 --- a/server/parser_test.go +++ b/server/parser_test.go @@ -340,8 +340,6 @@ func TestParseMsgSpace(t *testing.T) { } func TestShouldFail(t *testing.T) { - c := dummyClient() - wrongProtos := []string{ "xxx", "Px", "PIx", "PINx", " PING", @@ -359,17 +357,17 @@ func TestShouldFail(t *testing.T) { "Ix", "INx", "INFx", "INFO \r\n", } for _, proto := range wrongProtos { - c.state = OP_START + c := dummyClient() if err := c.parse([]byte(proto)); err == nil { t.Fatalf("Should have received a parse error for: %v", proto) } } // Special case for MSG, type needs to not be client. - c.typ = ROUTER wrongProtos = []string{"Mx", "MSx", "MSGx", "MSG \r\n"} for _, proto := range wrongProtos { - c.state = OP_START + c := dummyClient() + c.typ = ROUTER if err := c.parse([]byte(proto)); err == nil { t.Fatalf("Should have received a parse error for: %v", proto) } diff --git a/server/split_test.go b/server/split_test.go index 243c54b5..e4cf72c4 100644 --- a/server/split_test.go +++ b/server/split_test.go @@ -345,6 +345,57 @@ func TestSplitDanglingArgBuf(t *testing.T) { if c.argBuf != nil { t.Fatalf("Expected c.argBuf to be nil: %q\n", c.argBuf) } + + // MSG (the client has to be a ROUTE) + c = &client{subs: make(map[string]*subscription), typ: ROUTER} + msgop := []byte("MSG foo RSID:2:1 5\r\nhello\r\n") + c.parse(msgop[:5]) + c.parse(msgop[5:10]) + if c.argBuf == nil { + t.Fatal("Expected a non-nil argBuf") + } + if string(c.argBuf) != "foo RS" { + t.Fatalf("Expected argBuf to be \"foo 1 \", got %q", string(c.argBuf)) + } + c.parse(msgop[10:]) + if c.argBuf != nil { + t.Fatalf("Expected argBuf to be nil: %q", c.argBuf) + } + if c.msgBuf != nil { + t.Fatalf("Expected msgBuf to be nil: %q", c.msgBuf) + } + + c.state = OP_START + // Parse up-to somewhere in the middle of the payload. + // Verify that we have saved the MSG_ARG info + c.parse(msgop[:23]) + if c.argBuf == nil { + t.Fatal("Expected a non-nil argBuf") + } + if string(c.pa.subject) != "foo" { + t.Fatalf("Expected subject to be \"foo\", got %q", c.pa.subject) + } + if string(c.pa.reply) != "" { + t.Fatalf("Expected reply to be \"\", got %q", c.pa.reply) + } + if string(c.pa.sid) != "RSID:2:1" { + t.Fatalf("Expected sid to \"RSID:2:1\", got %q", c.pa.sid) + } + if c.pa.size != 5 { + t.Fatalf("Expected sid to 5, got %v", c.pa.size) + } + // msg buffer should be + if c.msgBuf == nil || string(c.msgBuf) != "hel" { + t.Fatalf("Expected msgBuf to be \"hel\", got %q", c.msgBuf) + } + c.parse(msgop[23:]) + // At the end, we should have cleaned-up both arg and msg buffers. + if c.argBuf != nil { + t.Fatalf("Expected argBuf to be nil: %q", c.argBuf) + } + if c.msgBuf != nil { + t.Fatalf("Expected msgBuf to be nil: %q", c.msgBuf) + } } func TestSplitMsgArg(t *testing.T) { diff --git a/test/bench_test.go b/test/bench_test.go index 16faaaeb..1d728d54 100644 --- a/test/bench_test.go +++ b/test/bench_test.go @@ -337,3 +337,60 @@ func Benchmark__PubEightQueueSub(b *testing.B) { c.Close() s.Shutdown() } + +func routePubSub(b *testing.B, size int) { + b.StopTimer() + + s1, o1 := RunServerWithConfig("./configs/srv_a.conf") + defer s1.Shutdown() + s2, o2 := RunServerWithConfig("./configs/srv_b.conf") + defer s2.Shutdown() + + sub := createClientConn(b, o1.Host, o1.Port) + doDefaultConnect(b, sub) + sendProto(b, sub, "SUB foo 1\r\n") + flushConnection(b, sub) + + payload := sizedString(size) + + pub := createClientConn(b, o2.Host, o2.Port) + doDefaultConnect(b, pub) + bw := bufio.NewWriterSize(pub, defaultSendBufSize) + + ch := make(chan bool) + sendOp := []byte(fmt.Sprintf("PUB foo %d\r\n%s\r\n", len(payload), payload)) + expected := len(fmt.Sprintf("MSG foo 1 %d\r\n%s\r\n", len(payload), payload)) * b.N + go drainConnection(b, sub, ch, expected) + b.StartTimer() + + for i := 0; i < b.N; i++ { + _, err := bw.Write(sendOp) + if err != nil { + b.Fatalf("Received error on PUB write: %v\n", err) + } + + } + err := bw.Flush() + if err != nil { + b.Errorf("Received error on FLUSH write: %v\n", err) + } + + // Wait for connection to be drained + <-ch + + b.StopTimer() + pub.Close() + sub.Close() +} + +func Benchmark_RoutePubSub_NoPayload(b *testing.B) { + routePubSub(b, 2) +} + +func Benchmark_RoutePubSub_1K(b *testing.B) { + routePubSub(b, 1024) +} + +func Benchmark_RoutePubSub_100K(b *testing.B) { + routePubSub(b, 100*1024) +}