mirror of
https://github.com/gogrlx/nats-server.git
synced 2026-04-02 03:38:42 -07:00
Add fan-in/out benchmarks (#4660)
Benchmarks for NATS core fan-in and fan-out pattern workloads.

Signed-off-by: Reuben Ninan <reuben@nats.io>
This commit is contained in:
@@ -19,6 +19,8 @@ import (
|
||||
"errors"
|
||||
"fmt"
|
||||
"os"
|
||||
"strconv"
|
||||
"strings"
|
||||
"sync"
|
||||
"testing"
|
||||
"time"
|
||||
@@ -116,7 +118,7 @@ func BenchmarkCoreTLSFanOut(b *testing.B) {
|
||||
|
||||
// Custom error handler that ignores ErrSlowConsumer.
|
||||
// Lots of them are expected in this benchmark which indiscriminately publishes at a rate higher
|
||||
// than what the server can fan-out to subscribers.
|
||||
// than what the server can relay to subscribers.
|
||||
ignoreSlowConsumerErrorHandler := func(conn *nats.Conn, s *nats.Subscription, err error) {
|
||||
if errors.Is(err, nats.ErrSlowConsumer) {
|
||||
// Swallow this error
|
||||
@@ -249,3 +251,317 @@ func BenchmarkCoreTLSFanOut(b *testing.B) {
|
||||
)
|
||||
}
|
||||
}
|
||||
|
||||
func BenchmarkCoreFanOut(b *testing.B) {
|
||||
const (
|
||||
subject = "test-subject"
|
||||
maxPendingMessages = 25
|
||||
maxPendingBytes = 15 * 1024 * 1024 // 15MiB
|
||||
)
|
||||
|
||||
messageSizeCases := []int64{
|
||||
100, // 100B
|
||||
1024, // 1KiB
|
||||
10240, // 10KiB
|
||||
512 * 1024, // 512KiB
|
||||
}
|
||||
numSubsCases := []int{
|
||||
3,
|
||||
5,
|
||||
10,
|
||||
}
|
||||
|
||||
// Custom error handler that ignores ErrSlowConsumer.
|
||||
// Lots of them are expected in this benchmark which indiscriminately publishes at a rate higher
|
||||
// than what the server can relay to subscribers.
|
||||
ignoreSlowConsumerErrorHandler := func(conn *nats.Conn, s *nats.Subscription, err error) {
|
||||
if errors.Is(err, nats.ErrSlowConsumer) {
|
||||
// Swallow this error
|
||||
} else {
|
||||
_, _ = fmt.Fprintf(os.Stderr, "Warning: %s\n", err)
|
||||
}
|
||||
}
|
||||
|
||||
for _, messageSize := range messageSizeCases {
|
||||
b.Run(
|
||||
fmt.Sprintf("msgSz=%db", messageSize),
|
||||
func(b *testing.B) {
|
||||
for _, numSubs := range numSubsCases {
|
||||
b.Run(
|
||||
fmt.Sprintf("subs=%d", numSubs),
|
||||
func(b *testing.B) {
|
||||
// Start server
|
||||
defaultOpts := DefaultOptions()
|
||||
server := RunServer(defaultOpts)
|
||||
defer server.Shutdown()
|
||||
|
||||
opts := []nats.Option{
|
||||
nats.MaxReconnects(-1),
|
||||
nats.ReconnectWait(0),
|
||||
nats.ErrorHandler(ignoreSlowConsumerErrorHandler),
|
||||
}
|
||||
|
||||
clientUrl := server.ClientURL()
|
||||
|
||||
// Count of messages received for by each subscriber
|
||||
counters := make([]int, numSubs)
|
||||
|
||||
// Wait group for subscribers to signal they received b.N messages
|
||||
var wg sync.WaitGroup
|
||||
wg.Add(numSubs)
|
||||
|
||||
// Create subscribers
|
||||
for i := 0; i < numSubs; i++ {
|
||||
subIndex := i
|
||||
ncSub, err := nats.Connect(clientUrl, opts...)
|
||||
if err != nil {
|
||||
b.Fatal(err)
|
||||
}
|
||||
defer ncSub.Close()
|
||||
sub, err := ncSub.Subscribe(subject, func(msg *nats.Msg) {
|
||||
counters[subIndex] += 1
|
||||
if counters[subIndex] == b.N {
|
||||
wg.Done()
|
||||
}
|
||||
})
|
||||
if err != nil {
|
||||
b.Fatalf("failed to subscribe: %s", err)
|
||||
}
|
||||
err = sub.SetPendingLimits(maxPendingMessages, maxPendingBytes)
|
||||
if err != nil {
|
||||
b.Fatalf("failed to set pending limits: %s", err)
|
||||
}
|
||||
defer sub.Unsubscribe()
|
||||
if err != nil {
|
||||
b.Fatal(err)
|
||||
}
|
||||
}
|
||||
|
||||
// publisher
|
||||
ncPub, err := nats.Connect(clientUrl, opts...)
|
||||
if err != nil {
|
||||
b.Fatal(err)
|
||||
}
|
||||
defer ncPub.Close()
|
||||
|
||||
var errorCount = 0
|
||||
|
||||
// random bytes as payload
|
||||
messageData := make([]byte, messageSize)
|
||||
rand.Read(messageData)
|
||||
|
||||
quitCh := make(chan bool, 1)
|
||||
|
||||
publish := func() {
|
||||
for {
|
||||
select {
|
||||
case <-quitCh:
|
||||
return
|
||||
default:
|
||||
// continue publishing
|
||||
}
|
||||
|
||||
err := ncPub.Publish(subject, messageData)
|
||||
if err != nil {
|
||||
errorCount += 1
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// Set bytes per operation
|
||||
b.SetBytes(messageSize)
|
||||
// Start the clock
|
||||
b.ResetTimer()
|
||||
// Start publishing as fast as the server allows
|
||||
go publish()
|
||||
// Wait for all subscribers to have delivered b.N messages
|
||||
wg.Wait()
|
||||
// Stop the clock
|
||||
b.StopTimer()
|
||||
|
||||
// Stop publisher
|
||||
quitCh <- true
|
||||
|
||||
b.ReportMetric(float64(errorCount), "errors")
|
||||
},
|
||||
)
|
||||
}
|
||||
},
|
||||
)
|
||||
}
|
||||
}
|
||||
|
||||
func BenchmarkCoreFanIn(b *testing.B) {
|
||||
const (
|
||||
subjectBaseName = "test-subject"
|
||||
numPubs = 5
|
||||
)
|
||||
|
||||
messageSizeCases := []int64{
|
||||
100, // 100B
|
||||
1024, // 1KiB
|
||||
10240, // 10KiB
|
||||
512 * 1024, // 512KiB
|
||||
}
|
||||
numPubsCases := []int{
|
||||
3,
|
||||
5,
|
||||
10,
|
||||
}
|
||||
|
||||
// Custom error handler that ignores ErrSlowConsumer.
|
||||
// Lots of them are expected in this benchmark which indiscriminately publishes at a rate higher
|
||||
// than what the server can relay to subscribers.
|
||||
ignoreSlowConsumerErrorHandler := func(conn *nats.Conn, s *nats.Subscription, err error) {
|
||||
if errors.Is(err, nats.ErrSlowConsumer) {
|
||||
// Swallow this error
|
||||
} else {
|
||||
_, _ = fmt.Fprintf(os.Stderr, "Warning: %s\n", err)
|
||||
}
|
||||
}
|
||||
|
||||
for _, messageSize := range messageSizeCases {
|
||||
b.Run(
|
||||
fmt.Sprintf("msgSz=%db", messageSize),
|
||||
func(b *testing.B) {
|
||||
for _, numPubs := range numPubsCases {
|
||||
b.Run(
|
||||
fmt.Sprintf("pubs=%d", numPubs),
|
||||
func(b *testing.B) {
|
||||
|
||||
// Start server
|
||||
defaultOpts := DefaultOptions()
|
||||
server := RunServer(defaultOpts)
|
||||
defer server.Shutdown()
|
||||
|
||||
opts := []nats.Option{
|
||||
nats.MaxReconnects(-1),
|
||||
nats.ReconnectWait(0),
|
||||
nats.ErrorHandler(ignoreSlowConsumerErrorHandler),
|
||||
}
|
||||
|
||||
clientUrl := server.ClientURL()
|
||||
|
||||
// start subscriber
|
||||
ncSub, err := nats.Connect(clientUrl, opts...)
|
||||
if err != nil {
|
||||
b.Fatal(err)
|
||||
}
|
||||
defer ncSub.Close()
|
||||
|
||||
// track publishing errors
|
||||
errors := make([]int, numPubs)
|
||||
// track messages received by each publisher
|
||||
counters := make([]int, numPubs)
|
||||
// quit signals for each publisher
|
||||
quitChs := make([]chan bool, numPubs)
|
||||
for i := range quitChs {
|
||||
quitChs[i] = make(chan bool, 1)
|
||||
}
|
||||
|
||||
// TODO rename
|
||||
completedPublishersCount := 0
|
||||
|
||||
var benchCompleteWg sync.WaitGroup
|
||||
benchCompleteWg.Add(1)
|
||||
|
||||
ncSub.Subscribe(fmt.Sprintf("%s.*", subjectBaseName), func(msg *nats.Msg) {
|
||||
// split subject by "." and get the publisher id
|
||||
pubIdx, err := strconv.Atoi(strings.Split(msg.Subject, ".")[1])
|
||||
if err != nil {
|
||||
b.Fatal(err)
|
||||
}
|
||||
|
||||
counters[pubIdx] += 1
|
||||
if counters[pubIdx] == b.N {
|
||||
completedPublishersCount++
|
||||
if completedPublishersCount == numPubs {
|
||||
benchCompleteWg.Done()
|
||||
}
|
||||
}
|
||||
})
|
||||
|
||||
// random bytes as payload
|
||||
messageData := make([]byte, messageSize)
|
||||
rand.Read(messageData)
|
||||
|
||||
var publishersReadyWg sync.WaitGroup
|
||||
// waits for all publishers sub-routines and for main thread to be ready
|
||||
publishersReadyWg.Add(numPubs + 1)
|
||||
|
||||
// wait group to ensure all publishers have been torn down
|
||||
var finishedPublishersWg sync.WaitGroup
|
||||
finishedPublishersWg.Add(numPubs)
|
||||
|
||||
// create N publishers
|
||||
for i := 0; i < numPubs; i++ {
|
||||
// create publisher connection
|
||||
ncPub, err := nats.Connect(clientUrl, opts...)
|
||||
if err != nil {
|
||||
b.Fatal(err)
|
||||
}
|
||||
defer ncPub.Close()
|
||||
|
||||
go func(pubId int) {
|
||||
// signal that this publisher has been torn down
|
||||
defer finishedPublishersWg.Done()
|
||||
|
||||
subject := fmt.Sprintf("%s.%d", subjectBaseName, pubId)
|
||||
|
||||
// publisher successfully initialized
|
||||
publishersReadyWg.Done()
|
||||
|
||||
// wait till all other publishers are ready to start workload
|
||||
publishersReadyWg.Wait()
|
||||
|
||||
// publish until quitCh is closed
|
||||
for {
|
||||
select {
|
||||
case <-quitChs[pubId]:
|
||||
return
|
||||
default:
|
||||
// continue publishing
|
||||
}
|
||||
err := ncPub.Publish(subject, messageData)
|
||||
if err != nil {
|
||||
errors[pubId] += 1
|
||||
}
|
||||
}
|
||||
}(i)
|
||||
}
|
||||
|
||||
// Set bytes per operation
|
||||
b.SetBytes(messageSize)
|
||||
// Main thread is ready
|
||||
publishersReadyWg.Done()
|
||||
// Wait till publishers are ready
|
||||
publishersReadyWg.Wait()
|
||||
|
||||
// Start the clock
|
||||
b.ResetTimer()
|
||||
// wait till termination cond reached
|
||||
benchCompleteWg.Wait()
|
||||
// Stop the clock
|
||||
b.StopTimer()
|
||||
|
||||
// send quit signal to all publishers
|
||||
for pubIdx := range quitChs {
|
||||
quitChs[pubIdx] <- true
|
||||
}
|
||||
// Wait for all publishers to shutdown
|
||||
finishedPublishersWg.Wait()
|
||||
|
||||
// sum errors from all publishers
|
||||
totalErrors := 0
|
||||
for _, err := range errors {
|
||||
totalErrors += err
|
||||
}
|
||||
|
||||
// report errors
|
||||
b.ReportMetric(float64(totalErrors), "errors")
|
||||
|
||||
})
|
||||
}
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user