diff --git a/server/configs/tls/tls-ed25519.conf b/server/configs/tls/tls-ed25519.conf index 2f10f7dd..dcb8fd94 100644 --- a/server/configs/tls/tls-ed25519.conf +++ b/server/configs/tls/tls-ed25519.conf @@ -1,6 +1,6 @@ # Simple TLS (ed25519) config file -listen: 127.0.0.1:4443 +listen: 127.0.0.1:-1 tls { cert_file: "./configs/certs/tls/benchmark-server-cert-ed25519.pem" diff --git a/server/configs/tls/tls-none.conf b/server/configs/tls/tls-none.conf index c6cae62d..042bf4e0 100644 --- a/server/configs/tls/tls-none.conf +++ b/server/configs/tls/tls-none.conf @@ -1,4 +1,4 @@ # Simple config file -listen: 127.0.0.1:4443 +listen: 127.0.0.1:-1 diff --git a/server/configs/tls/tls-rsa-1024.conf b/server/configs/tls/tls-rsa-1024.conf index a5032fda..fb3aaa41 100644 --- a/server/configs/tls/tls-rsa-1024.conf +++ b/server/configs/tls/tls-rsa-1024.conf @@ -1,6 +1,6 @@ # Simple TLS (rsa-1024) config file -listen: 127.0.0.1:4443 +listen: 127.0.0.1:-1 tls { cert_file: "./configs/certs/tls/benchmark-server-cert-rsa-1024.pem" diff --git a/server/configs/tls/tls-rsa-2048.conf b/server/configs/tls/tls-rsa-2048.conf index 9e33b7d8..08f54a25 100644 --- a/server/configs/tls/tls-rsa-2048.conf +++ b/server/configs/tls/tls-rsa-2048.conf @@ -1,6 +1,6 @@ # Simple TLS (rsa-2048) config file -listen: 127.0.0.1:4443 +listen: 127.0.0.1:-1 tls { cert_file: "./configs/certs/tls/benchmark-server-cert-rsa-2048.pem" diff --git a/server/configs/tls/tls-rsa-4096.conf b/server/configs/tls/tls-rsa-4096.conf index a64ede8f..68ad841b 100644 --- a/server/configs/tls/tls-rsa-4096.conf +++ b/server/configs/tls/tls-rsa-4096.conf @@ -1,6 +1,6 @@ # Simple TLS (rsa-4096) config file -listen: 127.0.0.1:4443 +listen: 127.0.0.1:-1 tls { cert_file: "./configs/certs/tls/benchmark-server-cert-rsa-4096.pem" diff --git a/server/core_benchmarks_test.go b/server/core_benchmarks_test.go index 560cd149..1ecb594e 100644 --- a/server/core_benchmarks_test.go +++ b/server/core_benchmarks_test.go @@ -16,102 +16,236 @@ package server import ( "crypto/rand" "crypto/tls" + "errors" "fmt" + "os" + "sync" "testing" "time" "github.com/nats-io/nats.go" ) -func BenchmarkRequestReplyOverEncryptedConnection(b *testing.B) { +func BenchmarkCoreRequestReply(b *testing.B) { const ( - subject = "test-subject" - configsBasePath = "./configs/tls" + subject = "test-subject" ) - // default TLS client connection options - defaultOpts := []nats.Option{} - - keyTypes := []string{ - "none", - "ed25519", - "rsa-1024", - "rsa-2048", - "rsa-4096", - } - payloadSzs := []int64{ + messageSizes := []int64{ 1024, // 1kb 4096, // 4kb 40960, // 40kb 409600, // 400kb } - for _, keyType := range keyTypes { - schemeConfig := fmt.Sprintf("%s/tls-%s.conf", configsBasePath, keyType) - b.Run(fmt.Sprintf("keyType=%s", keyType), func(b *testing.B) { - for _, payloadSz := range payloadSzs { - b.Run(fmt.Sprintf("payloadSz=%db", payloadSz), func(b *testing.B) { + for _, messageSize := range messageSizes { + b.Run(fmt.Sprintf("msgSz=%db", messageSize), func(b *testing.B) { - // run server with tls scheme - server, _ := RunServerWithConfig(schemeConfig) - opts := defaultOpts - defer server.Shutdown() + // Start server + serverOpts := DefaultOptions() + server := RunServer(serverOpts) + defer server.Shutdown() - if keyType != "none" { - opts = append(opts, nats.Secure(&tls.Config{ - InsecureSkipVerify: true, - })) - } + clientUrl := server.ClientURL() - // default client url - clientUrl := server.ClientURL() - - // subscriber - ncSub, err := nats.Connect(clientUrl, opts...) - if err != nil { - b.Fatal(err) - } - defer ncSub.Close() - sub, err := ncSub.Subscribe(subject, func(msg *nats.Msg) { - // Responder echoes the request payload as-is - msg.Respond(msg.Data) - }) - defer sub.Unsubscribe() - if err != nil { - b.Fatal(err) - } - - // publisher - ncPub, err := nats.Connect(clientUrl, opts...) - if err != nil { - b.Fatal(err) - } - defer ncPub.Close() - - var errors = 0 - - // random bytes as payload - b.SetBytes(payloadSz) - payload := make([]byte, payloadSz) - rand.Read(payload) - - // start benchmark - b.ResetTimer() - - for i := 0; i < b.N; i++ { - _, err := ncPub.Request(subject, payload, time.Second) - if err != nil { - errors++ - } - } - - // stop benchmark - b.StopTimer() - - b.ReportMetric(float64(errors), "errors") - }) + // Create "echo" subscriber + ncSub, err := nats.Connect(clientUrl) + if err != nil { + b.Fatal(err) } + defer ncSub.Close() + sub, err := ncSub.Subscribe(subject, func(msg *nats.Msg) { + // Responder echoes the request payload as-is + msg.Respond(msg.Data) + }) + defer sub.Unsubscribe() + if err != nil { + b.Fatal(err) + } + + // Create publisher + ncPub, err := nats.Connect(clientUrl) + if err != nil { + b.Fatal(err) + } + defer ncPub.Close() + + var errors = 0 + + // Create message (reused for all requests) + messageData := make([]byte, messageSize) + b.SetBytes(messageSize) + rand.Read(messageData) + + // Benchmark + b.ResetTimer() + for i := 0; i < b.N; i++ { + _, err := ncPub.Request(subject, messageData, time.Second) + if err != nil { + errors++ + } + } + b.StopTimer() + + b.ReportMetric(float64(errors), "errors") }) } - +} + +func BenchmarkCoreTLSFanOut(b *testing.B) { + const ( + subject = "test-subject" + configsBasePath = "./configs/tls" + maxPendingMessages = 25 + maxPendingBytes = 15 * 1024 * 1024 // 15MiB + ) + + keyTypeCases := []string{ + "none", + "ed25519", + "rsa-1024", + "rsa-2048", + "rsa-4096", + } + messageSizeCases := []int64{ + 512 * 1024, // 512Kib + } + numSubsCases := []int{ + 5, + } + + // Custom error handler that ignores ErrSlowConsumer. + // Lots of them are expected in this benchmark which indiscriminately publishes at a rate higher + // than what the server can fan-out to subscribers. + ignoreSlowConsumerErrorHandler := func(conn *nats.Conn, s *nats.Subscription, err error) { + if errors.Is(err, nats.ErrSlowConsumer) { + // Swallow this error + } else { + _, _ = fmt.Fprintf(os.Stderr, "Warning: %s\n", err) + } + } + + for _, keyType := range keyTypeCases { + + b.Run( + fmt.Sprintf("keyType=%s", keyType), + func(b *testing.B) { + + for _, messageSize := range messageSizeCases { + b.Run( + fmt.Sprintf("msgSz=%db", messageSize), + func(b *testing.B) { + + for _, numSubs := range numSubsCases { + b.Run( + fmt.Sprintf("subs=%d", numSubs), + func(b *testing.B) { + // Start server + configPath := fmt.Sprintf("%s/tls-%s.conf", configsBasePath, keyType) + server, _ := RunServerWithConfig(configPath) + defer server.Shutdown() + + opts := []nats.Option{ + nats.MaxReconnects(-1), + nats.ReconnectWait(0), + nats.ErrorHandler(ignoreSlowConsumerErrorHandler), + } + + if keyType != "none" { + opts = append(opts, nats.Secure(&tls.Config{ + InsecureSkipVerify: true, + })) + } + + clientUrl := server.ClientURL() + + // Count of messages received for by each subscriber + counters := make([]int, numSubs) + + // Wait group for subscribers to signal they received b.N messages + var wg sync.WaitGroup + wg.Add(numSubs) + + // Create subscribers + for i := 0; i < numSubs; i++ { + subIndex := i + ncSub, err := nats.Connect(clientUrl, opts...) + if err != nil { + b.Fatal(err) + } + defer ncSub.Close() + sub, err := ncSub.Subscribe(subject, func(msg *nats.Msg) { + counters[subIndex] += 1 + if counters[subIndex] == b.N { + wg.Done() + } + }) + if err != nil { + b.Fatalf("failed to subscribe: %s", err) + } + err = sub.SetPendingLimits(maxPendingMessages, maxPendingBytes) + if err != nil { + b.Fatalf("failed to set pending limits: %s", err) + } + defer sub.Unsubscribe() + if err != nil { + b.Fatal(err) + } + } + + // publisher + ncPub, err := nats.Connect(clientUrl, opts...) + if err != nil { + b.Fatal(err) + } + defer ncPub.Close() + + var errorCount = 0 + + // random bytes as payload + messageData := make([]byte, messageSize) + rand.Read(messageData) + + quitCh := make(chan bool, 1) + + publish := func() { + for { + select { + case <-quitCh: + return + default: + // continue publishing + } + + err := ncPub.Publish(subject, messageData) + if err != nil { + errorCount += 1 + } + } + } + + // Set bytes per operation + b.SetBytes(messageSize) + // Start the clock + b.ResetTimer() + // Start publishing as fast as the server allows + go publish() + // Wait for all subscribers to have delivered b.N messages + wg.Wait() + // Stop the clock + b.StopTimer() + + // Stop publisher + quitCh <- true + + b.ReportMetric(float64(errorCount), "errors") + }, + ) + } + }, + ) + } + }, + ) + } }