diff --git a/test/bench_test.go b/test/bench_test.go index c20a9226..9747c904 100644 --- a/test/bench_test.go +++ b/test/bench_test.go @@ -400,3 +400,66 @@ func Benchmark___RoutedPubSub_1K(b *testing.B) { func Benchmark_RoutedPubSub_100K(b *testing.B) { routePubSub(b, 100*1024) } + +func routeQueue(b *testing.B, numQueueSubs, size int) { + b.StopTimer() + + s1, o1 := RunServerWithConfig("./configs/srv_a.conf") + defer s1.Shutdown() + s2, o2 := RunServerWithConfig("./configs/srv_b.conf") + defer s2.Shutdown() + + sub := createClientConn(b, o1.Host, o1.Port) + doDefaultConnect(b, sub) + for i := 0; i < numQueueSubs; i++ { + sendProto(b, sub, fmt.Sprintf("SUB foo bar %c\r\n", byte(33+i))) + } + flushConnection(b, sub) + + payload := sizedString(size) + + pub := createClientConn(b, o2.Host, o2.Port) + doDefaultConnect(b, pub) + bw := bufio.NewWriterSize(pub, defaultSendBufSize) + + ch := make(chan bool) + sendOp := []byte(fmt.Sprintf("PUB foo %d\r\n%s\r\n", len(payload), payload)) + expected := len(fmt.Sprintf("MSG foo x %d\r\n%s\r\n", len(payload), payload)) * b.N + go drainConnection(b, sub, ch, expected) + b.StartTimer() + + for i := 0; i < b.N; i++ { + _, err := bw.Write(sendOp) + if err != nil { + b.Fatalf("Received error on PUB write: %v\n", err) + } + + } + err := bw.Flush() + if err != nil { + b.Errorf("Received error on FLUSH write: %v\n", err) + } + + // Wait for connection to be drained + <-ch + + b.StopTimer() + pub.Close() + sub.Close() +} + +func Benchmark___Routed2QueueSub(b *testing.B) { + routeQueue(b, 2, 2) +} + +func Benchmark___Routed4QueueSub(b *testing.B) { + routeQueue(b, 4, 2) +} + +func Benchmark___Routed8QueueSub(b *testing.B) { + routeQueue(b, 8, 2) +} + +func Benchmark__Routed16QueueSub(b *testing.B) { + routeQueue(b, 16, 2) +}