diff --git a/server/core_benchmarks_test.go b/server/core_benchmarks_test.go
index 101d2038..1ecb594e 100644
--- a/server/core_benchmarks_test.go
+++ b/server/core_benchmarks_test.go
@@ -15,7 +15,11 @@ package server
 
 import (
 	"crypto/rand"
+	"crypto/tls"
+	"errors"
 	"fmt"
+	"os"
+	"sync"
 	"testing"
 	"time"
 
@@ -87,3 +91,161 @@ func BenchmarkCoreRequestReply(b *testing.B) {
 		})
 	}
 }
+
+func BenchmarkCoreTLSFanOut(b *testing.B) {
+	const (
+		subject            = "test-subject"
+		configsBasePath    = "./configs/tls"
+		maxPendingMessages = 25
+		maxPendingBytes    = 15 * 1024 * 1024 // 15MiB
+	)
+
+	keyTypeCases := []string{
+		"none",
+		"ed25519",
+		"rsa-1024",
+		"rsa-2048",
+		"rsa-4096",
+	}
+	messageSizeCases := []int64{
+		512 * 1024, // 512KiB
+	}
+	numSubsCases := []int{
+		5,
+	}
+
+	// Custom error handler that ignores ErrSlowConsumer.
+	// Many are expected in this benchmark, which indiscriminately publishes at a rate higher
+	// than the server can fan out to subscribers.
+	ignoreSlowConsumerErrorHandler := func(conn *nats.Conn, s *nats.Subscription, err error) {
+		if errors.Is(err, nats.ErrSlowConsumer) {
+			// Swallow this error
+		} else {
+			_, _ = fmt.Fprintf(os.Stderr, "Warning: %s\n", err)
+		}
+	}
+
+	for _, keyType := range keyTypeCases {
+
+		b.Run(
+			fmt.Sprintf("keyType=%s", keyType),
+			func(b *testing.B) {
+
+				for _, messageSize := range messageSizeCases {
+					b.Run(
+						fmt.Sprintf("msgSz=%db", messageSize),
+						func(b *testing.B) {
+
+							for _, numSubs := range numSubsCases {
+								b.Run(
+									fmt.Sprintf("subs=%d", numSubs),
+									func(b *testing.B) {
+										// Start server
+										configPath := fmt.Sprintf("%s/tls-%s.conf", configsBasePath, keyType)
+										server, _ := RunServerWithConfig(configPath)
+										defer server.Shutdown()
+
+										opts := []nats.Option{
+											nats.MaxReconnects(-1),
+											nats.ReconnectWait(0),
+											nats.ErrorHandler(ignoreSlowConsumerErrorHandler),
+										}
+
+										if keyType != "none" {
+											opts = append(opts, nats.Secure(&tls.Config{
+												InsecureSkipVerify: true,
+											}))
+										}
+
+										clientUrl := server.ClientURL()
+
+										// Count of messages received by each subscriber
+										counters := make([]int, numSubs)
+
+										// Wait group for subscribers to signal they received b.N messages
+										var wg sync.WaitGroup
+										wg.Add(numSubs)
+
+										// Create subscribers
+										for i := 0; i < numSubs; i++ {
+											subIndex := i
+											ncSub, err := nats.Connect(clientUrl, opts...)
+											if err != nil {
+												b.Fatal(err)
+											}
+											defer ncSub.Close()
+											sub, err := ncSub.Subscribe(subject, func(msg *nats.Msg) {
+												counters[subIndex]++
+												if counters[subIndex] == b.N {
+													wg.Done()
+												}
+											})
+											if err != nil {
+												b.Fatalf("failed to subscribe: %s", err)
+											}
+											err = sub.SetPendingLimits(maxPendingMessages, maxPendingBytes)
+											if err != nil {
+												b.Fatalf("failed to set pending limits: %s", err)
+											}
+											defer sub.Unsubscribe()
+										}
+
+										// Publisher connection
+										ncPub, err := nats.Connect(clientUrl, opts...)
+										if err != nil {
+											b.Fatal(err)
+										}
+										defer ncPub.Close()
+
+										errorCount := 0
+
+										// Random bytes as payload
+										messageData := make([]byte, messageSize)
+										rand.Read(messageData)
+
+										quitCh := make(chan bool, 1)
+										donePublishing := make(chan struct{})
+
+										publish := func() {
+											defer close(donePublishing)
+											for {
+												select {
+												case <-quitCh:
+													return
+												default:
+													// Continue publishing
+												}
+
+												err := ncPub.Publish(subject, messageData)
+												if err != nil {
+													errorCount++
+												}
+											}
+										}
+
+										// Set bytes per operation
+										b.SetBytes(messageSize)
+										// Start the clock
+										b.ResetTimer()
+										// Start publishing as fast as the server allows
+										go publish()
+										// Wait for all subscribers to have received b.N messages
+										wg.Wait()
+										// Stop the clock
+										b.StopTimer()
+
+										// Stop the publisher and wait for it to exit before reading errorCount
+										quitCh <- true
+										<-donePublishing
+
+										b.ReportMetric(float64(errorCount), "errors")
+									},
+								)
+							}
+						},
+					)
+				}
+			},
+		)
+	}
+}
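
Note: the benchmark loads its server configs from ./configs/tls/tls-<keyType>.conf, and those files are not part of this diff. As a rough sketch only (the certificate and key file paths below are hypothetical placeholders, not taken from the repo), each non-"none" config would look something like:

    # Hypothetical sketch of configs/tls/tls-ed25519.conf; cert/key paths are placeholders
    port: -1
    tls {
      cert_file: "./configs/tls/certs/ed25519/server-cert.pem"  # hypothetical path
      key_file:  "./configs/tls/certs/ed25519/server-key.pem"   # hypothetical path
    }

tls-none.conf would simply omit the tls block, providing the plaintext baseline that the keyType=none case measures against.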
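To run only this benchmark, the usual Go invocation applies, e.g. go test ./server -run '^$' -bench 'BenchmarkCoreTLSFanOut'. Note that the reported "errors" metric is the raw count of failed Publish calls over the whole run rather than a per-operation figure, so it mainly indicates how much publisher-side backpressure each key type produced.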