Refine core and TLS benchmarks (#4475)

- [ ] Link to issue, e.g. `Resolves #NNN`
 - [ ] Documentation added (if applicable)
 - [x] Tests added
- [x] Branch rebased on top of current main (`git pull --rebase origin
main`)
- [ ] Changes squashed to a single commit (described
[here](http://gitready.com/advanced/2009/02/10/squashing-commits-with-rebase.html))
 - [ ] Build is green in Travis CI
- [x] You have certified that the contribution is your original work and
that you license the work to the project under the [Apache 2
license](https://github.com/nats-io/nats-server/blob/main/LICENSE)

Related: https://github.com/nats-io/nats-server/pull/4399

### Changes proposed in this pull request:

- Avoid binding to a hardcoded port, as it may conflict if the port is already in use
- Drop TLS from the Core Request/Response benchmark: it was not showing much
difference because not enough data goes through for TLS to become a dominant cost
- Add a different core benchmark that exercises TLS by pushing through
lots of data
This commit is contained in:
Derek Collison
2023-09-01 13:58:27 -07:00
committed by GitHub
6 changed files with 215 additions and 81 deletions

View File

@@ -1,6 +1,6 @@
# Simple TLS (ed25519) config file
listen: 127.0.0.1:4443
listen: 127.0.0.1:-1
tls {
cert_file: "./configs/certs/tls/benchmark-server-cert-ed25519.pem"

View File

@@ -1,4 +1,4 @@
# Simple config file
listen: 127.0.0.1:4443
listen: 127.0.0.1:-1

View File

@@ -1,6 +1,6 @@
# Simple TLS (rsa-1024) config file
listen: 127.0.0.1:4443
listen: 127.0.0.1:-1
tls {
cert_file: "./configs/certs/tls/benchmark-server-cert-rsa-1024.pem"

View File

@@ -1,6 +1,6 @@
# Simple TLS (rsa-2048) config file
listen: 127.0.0.1:4443
listen: 127.0.0.1:-1
tls {
cert_file: "./configs/certs/tls/benchmark-server-cert-rsa-2048.pem"

View File

@@ -1,6 +1,6 @@
# Simple TLS (rsa-4096) config file
listen: 127.0.0.1:4443
listen: 127.0.0.1:-1
tls {
cert_file: "./configs/certs/tls/benchmark-server-cert-rsa-4096.pem"

View File

@@ -16,102 +16,236 @@ package server
import (
"crypto/rand"
"crypto/tls"
"errors"
"fmt"
"os"
"sync"
"testing"
"time"
"github.com/nats-io/nats.go"
)
func BenchmarkRequestReplyOverEncryptedConnection(b *testing.B) {
func BenchmarkCoreRequestReply(b *testing.B) {
const (
subject = "test-subject"
configsBasePath = "./configs/tls"
subject = "test-subject"
)
// default TLS client connection options
defaultOpts := []nats.Option{}
keyTypes := []string{
"none",
"ed25519",
"rsa-1024",
"rsa-2048",
"rsa-4096",
}
payloadSzs := []int64{
messageSizes := []int64{
1024, // 1kb
4096, // 4kb
40960, // 40kb
409600, // 400kb
}
for _, keyType := range keyTypes {
schemeConfig := fmt.Sprintf("%s/tls-%s.conf", configsBasePath, keyType)
b.Run(fmt.Sprintf("keyType=%s", keyType), func(b *testing.B) {
for _, payloadSz := range payloadSzs {
b.Run(fmt.Sprintf("payloadSz=%db", payloadSz), func(b *testing.B) {
for _, messageSize := range messageSizes {
b.Run(fmt.Sprintf("msgSz=%db", messageSize), func(b *testing.B) {
// run server with tls scheme
server, _ := RunServerWithConfig(schemeConfig)
opts := defaultOpts
defer server.Shutdown()
// Start server
serverOpts := DefaultOptions()
server := RunServer(serverOpts)
defer server.Shutdown()
if keyType != "none" {
opts = append(opts, nats.Secure(&tls.Config{
InsecureSkipVerify: true,
}))
}
clientUrl := server.ClientURL()
// default client url
clientUrl := server.ClientURL()
// subscriber
ncSub, err := nats.Connect(clientUrl, opts...)
if err != nil {
b.Fatal(err)
}
defer ncSub.Close()
sub, err := ncSub.Subscribe(subject, func(msg *nats.Msg) {
// Responder echoes the request payload as-is
msg.Respond(msg.Data)
})
defer sub.Unsubscribe()
if err != nil {
b.Fatal(err)
}
// publisher
ncPub, err := nats.Connect(clientUrl, opts...)
if err != nil {
b.Fatal(err)
}
defer ncPub.Close()
var errors = 0
// random bytes as payload
b.SetBytes(payloadSz)
payload := make([]byte, payloadSz)
rand.Read(payload)
// start benchmark
b.ResetTimer()
for i := 0; i < b.N; i++ {
_, err := ncPub.Request(subject, payload, time.Second)
if err != nil {
errors++
}
}
// stop benchmark
b.StopTimer()
b.ReportMetric(float64(errors), "errors")
})
// Create "echo" subscriber
ncSub, err := nats.Connect(clientUrl)
if err != nil {
b.Fatal(err)
}
defer ncSub.Close()
sub, err := ncSub.Subscribe(subject, func(msg *nats.Msg) {
// Responder echoes the request payload as-is
msg.Respond(msg.Data)
})
defer sub.Unsubscribe()
if err != nil {
b.Fatal(err)
}
// Create publisher
ncPub, err := nats.Connect(clientUrl)
if err != nil {
b.Fatal(err)
}
defer ncPub.Close()
var errors = 0
// Create message (reused for all requests)
messageData := make([]byte, messageSize)
b.SetBytes(messageSize)
rand.Read(messageData)
// Benchmark
b.ResetTimer()
for i := 0; i < b.N; i++ {
_, err := ncPub.Request(subject, messageData, time.Second)
if err != nil {
errors++
}
}
b.StopTimer()
b.ReportMetric(float64(errors), "errors")
})
}
}
// BenchmarkCoreTLSFanOut measures fan-out of large messages from one publisher
// to several subscribers, for a plaintext server ("none") and for servers
// configured with each of the listed TLS key types.
//
// Sub-benchmarks are nested as keyType=<k>/msgSz=<n>b/subs=<s>. In each, a
// background goroutine publishes continuously while the timer runs, and the
// timer is stopped once every subscriber has counted b.N delivered messages.
// Publish errors do not abort the run; they are counted and reported through
// the "errors" metric.
func BenchmarkCoreTLSFanOut(b *testing.B) {
	const (
		subject            = "test-subject"
		configsBasePath    = "./configs/tls"
		maxPendingMessages = 25
		maxPendingBytes    = 15 * 1024 * 1024 // 15MiB
	)

	keyTypeCases := []string{
		"none",
		"ed25519",
		"rsa-1024",
		"rsa-2048",
		"rsa-4096",
	}

	messageSizeCases := []int64{
		512 * 1024, // 512 KiB
	}

	numSubsCases := []int{
		5,
	}

	// Custom error handler that ignores ErrSlowConsumer.
	// Lots of them are expected in this benchmark, which indiscriminately
	// publishes at a rate higher than what the server can fan out to subscribers.
	ignoreSlowConsumerErrorHandler := func(conn *nats.Conn, s *nats.Subscription, err error) {
		if errors.Is(err, nats.ErrSlowConsumer) {
			// Swallow this error
			return
		}
		_, _ = fmt.Fprintf(os.Stderr, "Warning: %s\n", err)
	}

	for _, keyType := range keyTypeCases {
		b.Run(fmt.Sprintf("keyType=%s", keyType), func(b *testing.B) {
			for _, messageSize := range messageSizeCases {
				b.Run(fmt.Sprintf("msgSz=%db", messageSize), func(b *testing.B) {
					for _, numSubs := range numSubsCases {
						b.Run(fmt.Sprintf("subs=%d", numSubs), func(b *testing.B) {
							// Start server
							configPath := fmt.Sprintf("%s/tls-%s.conf", configsBasePath, keyType)
							server, _ := RunServerWithConfig(configPath)
							defer server.Shutdown()

							opts := []nats.Option{
								nats.MaxReconnects(-1),
								nats.ReconnectWait(0),
								nats.ErrorHandler(ignoreSlowConsumerErrorHandler),
							}

							// Only the TLS-enabled configurations need a secure client.
							if keyType != "none" {
								opts = append(opts, nats.Secure(&tls.Config{
									InsecureSkipVerify: true,
								}))
							}

							clientUrl := server.ClientURL()

							// Count of messages received by each subscriber
							counters := make([]int, numSubs)

							// Wait group for subscribers to signal they received b.N messages
							var wg sync.WaitGroup
							wg.Add(numSubs)

							// Create subscribers
							for i := 0; i < numSubs; i++ {
								subIndex := i
								ncSub, err := nats.Connect(clientUrl, opts...)
								if err != nil {
									b.Fatal(err)
								}
								defer ncSub.Close()

								sub, err := ncSub.Subscribe(subject, func(msg *nats.Msg) {
									counters[subIndex]++
									// Signal exactly once: the counter keeps growing past
									// b.N because the publisher does not stop at b.N.
									if counters[subIndex] == b.N {
										wg.Done()
									}
								})
								if err != nil {
									b.Fatalf("failed to subscribe: %s", err)
								}
								defer sub.Unsubscribe()

								if err := sub.SetPendingLimits(maxPendingMessages, maxPendingBytes); err != nil {
									b.Fatalf("failed to set pending limits: %s", err)
								}
							}

							// Create publisher
							ncPub, err := nats.Connect(clientUrl, opts...)
							if err != nil {
								b.Fatal(err)
							}
							defer ncPub.Close()

							// Written only by the publisher goroutine; read by this
							// goroutine only after publisherDone is closed.
							var errorCount int

							// Random bytes as payload (reused for all messages)
							messageData := make([]byte, messageSize)
							if _, err := rand.Read(messageData); err != nil {
								b.Fatal(err)
							}

							quitCh := make(chan struct{})
							publisherDone := make(chan struct{})

							publish := func() {
								defer close(publisherDone)
								for {
									select {
									case <-quitCh:
										return
									default:
										// continue publishing
									}

									if err := ncPub.Publish(subject, messageData); err != nil {
										errorCount++
									}
								}
							}

							// Set bytes per operation
							b.SetBytes(messageSize)
							// Start the clock
							b.ResetTimer()
							// Start publishing as fast as the server allows
							go publish()
							// Wait for all subscribers to have delivered b.N messages
							wg.Wait()
							// Stop the clock
							b.StopTimer()

							// Stop the publisher and wait for it to exit, so errorCount
							// is no longer being written when it is read below and the
							// goroutine does not outlive the publisher connection.
							close(quitCh)
							<-publisherDone

							b.ReportMetric(float64(errorCount), "errors")
						})
					}
				})
			}
		})
	}
}