From 3e3ac69eb16b4cd15686e1f4bc55c2d156b97a87 Mon Sep 17 00:00:00 2001 From: Derek Collison Date: Tue, 20 Nov 2012 15:01:49 -0800 Subject: [PATCH] Added higher fanout queue sub benchmark --- test/bench_test.go | 37 ++++++++++++++++++++++++++++++++++++- 1 file changed, 36 insertions(+), 1 deletion(-) diff --git a/test/bench_test.go b/test/bench_test.go index d71ad31a..fc9038b5 100644 --- a/test/bench_test.go +++ b/test/bench_test.go @@ -165,7 +165,7 @@ func BenchmarkPubSubMultipleConnections(b *testing.B) { s.stopServer() } -func BenchmarkPubQueueSub(b *testing.B) { +func BenchmarkPubTwoQueueSub(b *testing.B) { b.StopTimer() s = startServer(b, PERF_PORT, "") c := createClientConn(b, "localhost", PERF_PORT) @@ -198,5 +198,40 @@ func BenchmarkPubQueueSub(b *testing.B) { s.stopServer() } +func BenchmarkPubFourQueueSub(b *testing.B) { + b.StopTimer() + s = startServer(b, PERF_PORT, "") + c := createClientConn(b, "localhost", PERF_PORT) + doDefaultConnect(b, c) + sendProto(b, c, "SUB foo group1 1\r\n") + sendProto(b, c, "SUB foo group1 2\r\n") + sendProto(b, c, "SUB foo group1 3\r\n") + sendProto(b, c, "SUB foo group1 4\r\n") + bw := bufio.NewWriterSize(c, defaultSendBufSize) + sendOp := []byte(fmt.Sprintf("PUB foo 2\r\nok\r\n")) + ch := make(chan bool) + expected := len("MSG foo 1 2\r\nok\r\n") * b.N + go drainConnection(b, c, ch, expected) + b.StartTimer() + + for i := 0; i < b.N; i++ { + _, err := bw.Write(sendOp) + if err != nil { + b.Errorf("Received error on PUB write: %v\n", err) + } + } + err := bw.Flush() + if err != nil { + b.Errorf("Received error on FLUSH write: %v\n", err) + } + + // Wait for connection to be drained + <-ch + + b.StopTimer() + c.Close() + s.stopServer() +} +