mirror of
https://github.com/gogrlx/nats-server.git
synced 2026-04-02 03:38:42 -07:00
Add in pub and pub/sub benchmarks
This commit is contained in:
169
test/bench_test.go
Normal file
169
test/bench_test.go
Normal file
@@ -0,0 +1,169 @@
|
||||
// Copyright 2012 Apcera Inc. All rights reserved.
|
||||
|
||||
package test
|
||||
|
||||
import (
|
||||
"bufio"
|
||||
"fmt"
|
||||
"net"
|
||||
"testing"
|
||||
"time"
|
||||
)
|
||||
|
||||
const PERF_PORT=8422
|
||||
|
||||
const defaultRecBufSize = 32768
|
||||
const defaultSendBufSize = 16384
|
||||
|
||||
func flushConnection(b *testing.B, c net.Conn, buf []byte) {
|
||||
c.Write([]byte("PING\r\n"))
|
||||
c.SetReadDeadline(time.Now().Add(50 * time.Millisecond))
|
||||
n, err := c.Read(buf)
|
||||
if err != nil {
|
||||
b.Fatalf("Failed read: %v\n", err)
|
||||
}
|
||||
if n != 6 && buf[0] != 'P' {
|
||||
b.Fatalf("Failed read of PONG: %s\n", buf)
|
||||
}
|
||||
}
|
||||
|
||||
func benchPub(b *testing.B, subject, payload string) {
|
||||
b.StopTimer()
|
||||
s = startServer(b, PERF_PORT, "")
|
||||
c := createClientConn(b, "localhost", PERF_PORT)
|
||||
doDefaultConnect(b, c)
|
||||
bw := bufio.NewWriterSize(c, defaultSendBufSize)
|
||||
sendOp := []byte(fmt.Sprintf("PUB %s %d\r\n%s\r\n", subject, len(payload), payload))
|
||||
b.SetBytes(int64(len(sendOp)))
|
||||
buf := make([]byte, 1024)
|
||||
b.StartTimer()
|
||||
for i := 0; i < b.N; i++ {
|
||||
bw.Write(sendOp)
|
||||
}
|
||||
bw.Flush()
|
||||
flushConnection(b, c, buf)
|
||||
b.StopTimer()
|
||||
c.Close()
|
||||
s.stopServer()
|
||||
}
|
||||
|
||||
func BenchmarkPubNoPayload(b *testing.B) {
|
||||
benchPub(b, "a", "")
|
||||
}
|
||||
|
||||
func BenchmarkPubMinPayload(b *testing.B) {
|
||||
benchPub(b, "a", "b")
|
||||
}
|
||||
|
||||
func BenchmarkPubTinyPayload(b *testing.B) {
|
||||
benchPub(b, "foo", "ok")
|
||||
}
|
||||
|
||||
func BenchmarkPubSmallPayload(b *testing.B) {
|
||||
benchPub(b, "foo", "hello world")
|
||||
}
|
||||
|
||||
func BenchmarkPubMedPayload(b *testing.B) {
|
||||
benchPub(b, "foo", "The quick brown fox jumps over the lazy dog")
|
||||
}
|
||||
|
||||
func BenchmarkPubLrgPayload(b *testing.B) {
|
||||
b.StopTimer()
|
||||
var p string
|
||||
for i := 0 ; i < 200 ; i++ {
|
||||
p = p + "hello world "
|
||||
}
|
||||
benchPub(b, "foo", p)
|
||||
}
|
||||
|
||||
func drainConnection(b *testing.B, c net.Conn, ch chan bool, expected int) {
|
||||
buf := make([]byte, defaultRecBufSize)
|
||||
bytes := 0
|
||||
|
||||
for {
|
||||
c.SetReadDeadline(time.Now().Add(100 * time.Millisecond))
|
||||
n, err := c.Read(buf)
|
||||
if err != nil {
|
||||
b.Errorf("Error on read: %v\n", err)
|
||||
break
|
||||
}
|
||||
bytes += n
|
||||
if bytes >= expected {
|
||||
break
|
||||
}
|
||||
}
|
||||
if bytes != expected {
|
||||
b.Errorf("Did not receive all bytes: %d vs %d\n", bytes, expected)
|
||||
}
|
||||
ch <- true
|
||||
}
|
||||
|
||||
func BenchmarkPubSub(b *testing.B) {
|
||||
b.StopTimer()
|
||||
s = startServer(b, PERF_PORT, "")
|
||||
c := createClientConn(b, "localhost", PERF_PORT)
|
||||
doDefaultConnect(b, c)
|
||||
sendProto(b, c, "SUB foo 1\r\n")
|
||||
bw := bufio.NewWriterSize(c, defaultSendBufSize)
|
||||
sendOp := []byte(fmt.Sprintf("PUB foo 2\r\nok\r\n"))
|
||||
ch := make(chan bool)
|
||||
expected := len("MSG foo 1 2\r\nok\r\n") * b.N
|
||||
go drainConnection(b, c, ch, expected)
|
||||
b.StartTimer()
|
||||
|
||||
for i := 0; i < b.N; i++ {
|
||||
_, err := bw.Write(sendOp)
|
||||
if err != nil {
|
||||
b.Errorf("Received error on PUB write: %v\n", err)
|
||||
}
|
||||
}
|
||||
err := bw.Flush()
|
||||
if err != nil {
|
||||
b.Errorf("Received error on FLUSH write: %v\n", err)
|
||||
}
|
||||
|
||||
// Wait for connection to be drained
|
||||
<-ch
|
||||
|
||||
b.StopTimer()
|
||||
c.Close()
|
||||
s.stopServer()
|
||||
}
|
||||
|
||||
func BenchmarkPubSubMultipleConnections(b *testing.B) {
|
||||
b.StopTimer()
|
||||
s = startServer(b, PERF_PORT, "")
|
||||
c := createClientConn(b, "localhost", PERF_PORT)
|
||||
doDefaultConnect(b, c)
|
||||
bw := bufio.NewWriterSize(c, defaultSendBufSize)
|
||||
|
||||
c2 := createClientConn(b, "localhost", PERF_PORT)
|
||||
doDefaultConnect(b, c2)
|
||||
sendProto(b, c2, "SUB foo 1\r\n")
|
||||
|
||||
sendOp := []byte(fmt.Sprintf("PUB foo 2\r\nok\r\n"))
|
||||
ch := make(chan bool)
|
||||
|
||||
expected := len("MSG foo 1 2\r\nok\r\n") * b.N
|
||||
go drainConnection(b, c2, ch, expected)
|
||||
|
||||
b.StartTimer()
|
||||
for i := 0; i < b.N; i++ {
|
||||
bw.Write(sendOp)
|
||||
}
|
||||
err := bw.Flush()
|
||||
if err != nil {
|
||||
b.Errorf("Received error on FLUSH write: %v\n", err)
|
||||
}
|
||||
|
||||
// Wait for connection to be drained
|
||||
<-ch
|
||||
|
||||
b.StopTimer()
|
||||
c.Close()
|
||||
c2.Close()
|
||||
s.stopServer()
|
||||
}
|
||||
|
||||
|
||||
|
||||
Reference in New Issue
Block a user