mirror of
https://github.com/gogrlx/nats-server.git
synced 2026-04-02 03:38:42 -07:00
Added higher fanout queue sub benchmark
This commit is contained in:
@@ -165,7 +165,7 @@ func BenchmarkPubSubMultipleConnections(b *testing.B) {
|
||||
s.stopServer()
|
||||
}
|
||||
|
||||
func BenchmarkPubQueueSub(b *testing.B) {
|
||||
func BenchmarkPubTwoQueueSub(b *testing.B) {
|
||||
b.StopTimer()
|
||||
s = startServer(b, PERF_PORT, "")
|
||||
c := createClientConn(b, "localhost", PERF_PORT)
|
||||
@@ -198,5 +198,40 @@ func BenchmarkPubQueueSub(b *testing.B) {
|
||||
s.stopServer()
|
||||
}
|
||||
|
||||
func BenchmarkPubFourQueueSub(b *testing.B) {
|
||||
b.StopTimer()
|
||||
s = startServer(b, PERF_PORT, "")
|
||||
c := createClientConn(b, "localhost", PERF_PORT)
|
||||
doDefaultConnect(b, c)
|
||||
sendProto(b, c, "SUB foo group1 1\r\n")
|
||||
sendProto(b, c, "SUB foo group1 2\r\n")
|
||||
sendProto(b, c, "SUB foo group1 3\r\n")
|
||||
sendProto(b, c, "SUB foo group1 4\r\n")
|
||||
bw := bufio.NewWriterSize(c, defaultSendBufSize)
|
||||
sendOp := []byte(fmt.Sprintf("PUB foo 2\r\nok\r\n"))
|
||||
ch := make(chan bool)
|
||||
expected := len("MSG foo 1 2\r\nok\r\n") * b.N
|
||||
go drainConnection(b, c, ch, expected)
|
||||
b.StartTimer()
|
||||
|
||||
for i := 0; i < b.N; i++ {
|
||||
_, err := bw.Write(sendOp)
|
||||
if err != nil {
|
||||
b.Errorf("Received error on PUB write: %v\n", err)
|
||||
}
|
||||
}
|
||||
err := bw.Flush()
|
||||
if err != nil {
|
||||
b.Errorf("Received error on FLUSH write: %v\n", err)
|
||||
}
|
||||
|
||||
// Wait for connection to be drained
|
||||
<-ch
|
||||
|
||||
b.StopTimer()
|
||||
c.Close()
|
||||
s.stopServer()
|
||||
}
|
||||
|
||||
|
||||
|
||||
|
||||
Reference in New Issue
Block a user