Merge pull request #639 from nats-io/add_bench_for_routed_queue

Added benchmark for routed queues
This commit is contained in:
Ivan Kozlovic
2018-03-12 15:44:43 -06:00
committed by GitHub

View File

@@ -400,3 +400,66 @@ func Benchmark___RoutedPubSub_1K(b *testing.B) {
func Benchmark_RoutedPubSub_100K(b *testing.B) {
routePubSub(b, 100*1024)
}
func routeQueue(b *testing.B, numQueueSubs, size int) {
b.StopTimer()
s1, o1 := RunServerWithConfig("./configs/srv_a.conf")
defer s1.Shutdown()
s2, o2 := RunServerWithConfig("./configs/srv_b.conf")
defer s2.Shutdown()
sub := createClientConn(b, o1.Host, o1.Port)
doDefaultConnect(b, sub)
for i := 0; i < numQueueSubs; i++ {
sendProto(b, sub, fmt.Sprintf("SUB foo bar %c\r\n", byte(33+i)))
}
flushConnection(b, sub)
payload := sizedString(size)
pub := createClientConn(b, o2.Host, o2.Port)
doDefaultConnect(b, pub)
bw := bufio.NewWriterSize(pub, defaultSendBufSize)
ch := make(chan bool)
sendOp := []byte(fmt.Sprintf("PUB foo %d\r\n%s\r\n", len(payload), payload))
expected := len(fmt.Sprintf("MSG foo x %d\r\n%s\r\n", len(payload), payload)) * b.N
go drainConnection(b, sub, ch, expected)
b.StartTimer()
for i := 0; i < b.N; i++ {
_, err := bw.Write(sendOp)
if err != nil {
b.Fatalf("Received error on PUB write: %v\n", err)
}
}
err := bw.Flush()
if err != nil {
b.Errorf("Received error on FLUSH write: %v\n", err)
}
// Wait for connection to be drained
<-ch
b.StopTimer()
pub.Close()
sub.Close()
}
func Benchmark___Routed2QueueSub(b *testing.B) {
routeQueue(b, 2, 2)
}
func Benchmark___Routed4QueueSub(b *testing.B) {
routeQueue(b, 4, 2)
}
func Benchmark___Routed8QueueSub(b *testing.B) {
routeQueue(b, 8, 2)
}
func Benchmark__Routed16QueueSub(b *testing.B) {
routeQueue(b, 16, 2)
}