Added large payload pub/sub benchmark

Signed-off-by: Derek Collison <derek@nats.io>
This commit is contained in:
Derek Collison
2018-05-23 16:59:02 -07:00
parent 25654a4632
commit 644376209b
5 changed files with 64 additions and 31 deletions

View File

@@ -352,6 +352,16 @@ func TestClientNoBodyPubSubWithReply(t *testing.T) {
}
}
func (c *client) parseFlushAndClose(op []byte) {
c.parse(op)
for cp := range c.pcd {
cp.mu.Lock()
cp.flushOutbound()
cp.mu.Unlock()
}
c.nc.Close()
}
func TestClientPubWithQueueSub(t *testing.T) {
_, c, cr := setupClient()
@@ -366,13 +376,7 @@ func TestClientPubWithQueueSub(t *testing.T) {
op = append(op, pubs...)
}
go func() {
c.parse(op)
for cp := range c.pcd {
cp.bw.Flush()
}
c.nc.Close()
}()
go c.parseFlushAndClose(op)
var n1, n2, received int
for ; ; received++ {
@@ -415,13 +419,7 @@ func TestClientUnSub(t *testing.T) {
op = append(op, unsub...)
op = append(op, pub...)
go func() {
c.parse(op)
for cp := range c.pcd {
cp.bw.Flush()
}
c.nc.Close()
}()
go c.parseFlushAndClose(op)
var received int
for ; ; received++ {
@@ -458,13 +456,7 @@ func TestClientUnSubMax(t *testing.T) {
op = append(op, pub...)
}
go func() {
c.parse(op)
for cp := range c.pcd {
cp.bw.Flush()
}
c.nc.Close()
}()
go c.parseFlushAndClose(op)
var received int
for ; ; received++ {

View File

@@ -1774,7 +1774,6 @@ func TestConfigReloadRotateFiles(t *testing.T) {
func runServerWithSymlinkConfig(t *testing.T, symlinkName, configName string) (*Server, *Options, string) {
opts, config := newOptionsWithSymlinkConfig(t, symlinkName, configName)
opts.NoLog = true
opts.NoSigs = true
return RunServer(opts), opts, config
}

View File

@@ -402,7 +402,7 @@ func TestProcessCommandLineArgs(t *testing.T) {
func TestWriteDeadline(t *testing.T) {
opts := DefaultOptions()
opts.WriteDeadline = 20 * time.Millisecond
opts.WriteDeadline = 1 * time.Millisecond
s := RunServer(opts)
defer s.Shutdown()
@@ -416,7 +416,7 @@ func TestWriteDeadline(t *testing.T) {
}
// Reduce socket buffer to increase reliability of getting
// write deadline errors.
c.(*net.TCPConn).SetReadBuffer(10)
c.(*net.TCPConn).SetReadBuffer(4)
url := fmt.Sprintf("nats://%s:%d", opts.Host, opts.Port)
sender, err := nats.Connect(url)
@@ -439,7 +439,10 @@ func TestWriteDeadline(t *testing.T) {
t.Fatalf("Flush should have returned sooner, took: %v", dur)
}
// Flush sender connection to ensure that all data has been sent.
sender.Flush()
if err := sender.Flush(); err != nil {
t.Fatalf("Error on flush: %v", err)
}
// At this point server should have closed connection c.
// On certain platforms, it may take more than one call before

View File

@@ -248,6 +248,45 @@ func Benchmark_____PubSubTwoConns(b *testing.B) {
s.Shutdown()
}
func Benchmark_PubSub512kTwoConns(b *testing.B) {
b.StopTimer()
s := runBenchServer()
c := createClientConn(b, "localhost", PERF_PORT)
doDefaultConnect(b, c)
bw := bufio.NewWriterSize(c, defaultSendBufSize)
c2 := createClientConn(b, "localhost", PERF_PORT)
doDefaultConnect(b, c2)
sendProto(b, c2, "SUB foo 1\r\n")
flushConnection(b, c2)
sz := 1024 * 512
payload := sizedString(sz)
sendOp := []byte(fmt.Sprintf("PUB foo %d\r\n%s\r\n", sz, payload))
ch := make(chan bool)
expected := len(fmt.Sprintf("MSG foo 1 %d\r\n%s\r\n", sz, payload)) * b.N
go drainConnection(b, c2, ch, expected)
b.StartTimer()
for i := 0; i < b.N; i++ {
bw.Write(sendOp)
}
err := bw.Flush()
if err != nil {
b.Errorf("Received error on FLUSH write: %v\n", err)
}
// Wait for connection to be drained
<-ch
b.StopTimer()
c.Close()
c2.Close()
s.Shutdown()
}
func Benchmark_____PubTwoQueueSub(b *testing.B) {
b.StopTimer()
s := runBenchServer()

View File

@@ -662,6 +662,7 @@ func TestRouteSendAsyncINFOToClients(t *testing.T) {
routeSend, routeExpect := setupRouteEx(t, rc, opts, routeID)
buf := routeExpect(infoRe)
info := server.Info{}
if err := json.Unmarshal(buf[4:], &info); err != nil {
stackFatalf(t, "Could not unmarshal route info: %v", err)
@@ -778,6 +779,7 @@ func TestRouteSendAsyncINFOToClients(t *testing.T) {
// Now stop the route and restart with an additional URL
rc.Close()
// On route disconnect, clients will receive an updated INFO
expectNothing(t, oldClient)
checkINFOReceived(newClient, newClientExpect, []string{clientURL})
@@ -895,10 +897,8 @@ func TestRouteSendAsyncINFOToClients(t *testing.T) {
// For this test, be explicit about listen spec.
opts.Host = "127.0.0.1"
opts.Port = 5242
for i := 0; i < 2; i++ {
if i == 1 {
opts.Cluster.NoAdvertise = true
}
f(opts)
}
f(opts)
opts.Cluster.NoAdvertise = true
f(opts)
}