mirror of
https://github.com/gogrlx/nats-server.git
synced 2026-04-02 03:38:42 -07:00
committed by
reubenninan
parent
1528434431
commit
524c1f544a
@@ -19,6 +19,8 @@ import (
|
|||||||
"errors"
|
"errors"
|
||||||
"fmt"
|
"fmt"
|
||||||
"os"
|
"os"
|
||||||
|
"strconv"
|
||||||
|
"strings"
|
||||||
"sync"
|
"sync"
|
||||||
"testing"
|
"testing"
|
||||||
"time"
|
"time"
|
||||||
@@ -116,7 +118,7 @@ func BenchmarkCoreTLSFanOut(b *testing.B) {
|
|||||||
|
|
||||||
// Custom error handler that ignores ErrSlowConsumer.
|
// Custom error handler that ignores ErrSlowConsumer.
|
||||||
// Lots of them are expected in this benchmark which indiscriminately publishes at a rate higher
|
// Lots of them are expected in this benchmark which indiscriminately publishes at a rate higher
|
||||||
// than what the server can fan-out to subscribers.
|
// than what the server can relay to subscribers.
|
||||||
ignoreSlowConsumerErrorHandler := func(conn *nats.Conn, s *nats.Subscription, err error) {
|
ignoreSlowConsumerErrorHandler := func(conn *nats.Conn, s *nats.Subscription, err error) {
|
||||||
if errors.Is(err, nats.ErrSlowConsumer) {
|
if errors.Is(err, nats.ErrSlowConsumer) {
|
||||||
// Swallow this error
|
// Swallow this error
|
||||||
@@ -249,3 +251,317 @@ func BenchmarkCoreTLSFanOut(b *testing.B) {
|
|||||||
)
|
)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func BenchmarkCoreFanOut(b *testing.B) {
|
||||||
|
const (
|
||||||
|
subject = "test-subject"
|
||||||
|
maxPendingMessages = 25
|
||||||
|
maxPendingBytes = 15 * 1024 * 1024 // 15MiB
|
||||||
|
)
|
||||||
|
|
||||||
|
messageSizeCases := []int64{
|
||||||
|
100, // 100B
|
||||||
|
1024, // 1KiB
|
||||||
|
10240, // 10KiB
|
||||||
|
512 * 1024, // 512KiB
|
||||||
|
}
|
||||||
|
numSubsCases := []int{
|
||||||
|
3,
|
||||||
|
5,
|
||||||
|
10,
|
||||||
|
}
|
||||||
|
|
||||||
|
// Custom error handler that ignores ErrSlowConsumer.
|
||||||
|
// Lots of them are expected in this benchmark which indiscriminately publishes at a rate higher
|
||||||
|
// than what the server can relay to subscribers.
|
||||||
|
ignoreSlowConsumerErrorHandler := func(conn *nats.Conn, s *nats.Subscription, err error) {
|
||||||
|
if errors.Is(err, nats.ErrSlowConsumer) {
|
||||||
|
// Swallow this error
|
||||||
|
} else {
|
||||||
|
_, _ = fmt.Fprintf(os.Stderr, "Warning: %s\n", err)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
for _, messageSize := range messageSizeCases {
|
||||||
|
b.Run(
|
||||||
|
fmt.Sprintf("msgSz=%db", messageSize),
|
||||||
|
func(b *testing.B) {
|
||||||
|
for _, numSubs := range numSubsCases {
|
||||||
|
b.Run(
|
||||||
|
fmt.Sprintf("subs=%d", numSubs),
|
||||||
|
func(b *testing.B) {
|
||||||
|
// Start server
|
||||||
|
defaultOpts := DefaultOptions()
|
||||||
|
server := RunServer(defaultOpts)
|
||||||
|
defer server.Shutdown()
|
||||||
|
|
||||||
|
opts := []nats.Option{
|
||||||
|
nats.MaxReconnects(-1),
|
||||||
|
nats.ReconnectWait(0),
|
||||||
|
nats.ErrorHandler(ignoreSlowConsumerErrorHandler),
|
||||||
|
}
|
||||||
|
|
||||||
|
clientUrl := server.ClientURL()
|
||||||
|
|
||||||
|
// Count of messages received for by each subscriber
|
||||||
|
counters := make([]int, numSubs)
|
||||||
|
|
||||||
|
// Wait group for subscribers to signal they received b.N messages
|
||||||
|
var wg sync.WaitGroup
|
||||||
|
wg.Add(numSubs)
|
||||||
|
|
||||||
|
// Create subscribers
|
||||||
|
for i := 0; i < numSubs; i++ {
|
||||||
|
subIndex := i
|
||||||
|
ncSub, err := nats.Connect(clientUrl, opts...)
|
||||||
|
if err != nil {
|
||||||
|
b.Fatal(err)
|
||||||
|
}
|
||||||
|
defer ncSub.Close()
|
||||||
|
sub, err := ncSub.Subscribe(subject, func(msg *nats.Msg) {
|
||||||
|
counters[subIndex] += 1
|
||||||
|
if counters[subIndex] == b.N {
|
||||||
|
wg.Done()
|
||||||
|
}
|
||||||
|
})
|
||||||
|
if err != nil {
|
||||||
|
b.Fatalf("failed to subscribe: %s", err)
|
||||||
|
}
|
||||||
|
err = sub.SetPendingLimits(maxPendingMessages, maxPendingBytes)
|
||||||
|
if err != nil {
|
||||||
|
b.Fatalf("failed to set pending limits: %s", err)
|
||||||
|
}
|
||||||
|
defer sub.Unsubscribe()
|
||||||
|
if err != nil {
|
||||||
|
b.Fatal(err)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// publisher
|
||||||
|
ncPub, err := nats.Connect(clientUrl, opts...)
|
||||||
|
if err != nil {
|
||||||
|
b.Fatal(err)
|
||||||
|
}
|
||||||
|
defer ncPub.Close()
|
||||||
|
|
||||||
|
var errorCount = 0
|
||||||
|
|
||||||
|
// random bytes as payload
|
||||||
|
messageData := make([]byte, messageSize)
|
||||||
|
rand.Read(messageData)
|
||||||
|
|
||||||
|
quitCh := make(chan bool, 1)
|
||||||
|
|
||||||
|
publish := func() {
|
||||||
|
for {
|
||||||
|
select {
|
||||||
|
case <-quitCh:
|
||||||
|
return
|
||||||
|
default:
|
||||||
|
// continue publishing
|
||||||
|
}
|
||||||
|
|
||||||
|
err := ncPub.Publish(subject, messageData)
|
||||||
|
if err != nil {
|
||||||
|
errorCount += 1
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// Set bytes per operation
|
||||||
|
b.SetBytes(messageSize)
|
||||||
|
// Start the clock
|
||||||
|
b.ResetTimer()
|
||||||
|
// Start publishing as fast as the server allows
|
||||||
|
go publish()
|
||||||
|
// Wait for all subscribers to have delivered b.N messages
|
||||||
|
wg.Wait()
|
||||||
|
// Stop the clock
|
||||||
|
b.StopTimer()
|
||||||
|
|
||||||
|
// Stop publisher
|
||||||
|
quitCh <- true
|
||||||
|
|
||||||
|
b.ReportMetric(float64(errorCount), "errors")
|
||||||
|
},
|
||||||
|
)
|
||||||
|
}
|
||||||
|
},
|
||||||
|
)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func BenchmarkCoreFanIn(b *testing.B) {
|
||||||
|
const (
|
||||||
|
subjectBaseName = "test-subject"
|
||||||
|
numPubs = 5
|
||||||
|
)
|
||||||
|
|
||||||
|
messageSizeCases := []int64{
|
||||||
|
100, // 100B
|
||||||
|
1024, // 1KiB
|
||||||
|
10240, // 10KiB
|
||||||
|
512 * 1024, // 512KiB
|
||||||
|
}
|
||||||
|
numPubsCases := []int{
|
||||||
|
3,
|
||||||
|
5,
|
||||||
|
10,
|
||||||
|
}
|
||||||
|
|
||||||
|
// Custom error handler that ignores ErrSlowConsumer.
|
||||||
|
// Lots of them are expected in this benchmark which indiscriminately publishes at a rate higher
|
||||||
|
// than what the server can relay to subscribers.
|
||||||
|
ignoreSlowConsumerErrorHandler := func(conn *nats.Conn, s *nats.Subscription, err error) {
|
||||||
|
if errors.Is(err, nats.ErrSlowConsumer) {
|
||||||
|
// Swallow this error
|
||||||
|
} else {
|
||||||
|
_, _ = fmt.Fprintf(os.Stderr, "Warning: %s\n", err)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
for _, messageSize := range messageSizeCases {
|
||||||
|
b.Run(
|
||||||
|
fmt.Sprintf("msgSz=%db", messageSize),
|
||||||
|
func(b *testing.B) {
|
||||||
|
for _, numPubs := range numPubsCases {
|
||||||
|
b.Run(
|
||||||
|
fmt.Sprintf("pubs=%d", numPubs),
|
||||||
|
func(b *testing.B) {
|
||||||
|
|
||||||
|
// Start server
|
||||||
|
defaultOpts := DefaultOptions()
|
||||||
|
server := RunServer(defaultOpts)
|
||||||
|
defer server.Shutdown()
|
||||||
|
|
||||||
|
opts := []nats.Option{
|
||||||
|
nats.MaxReconnects(-1),
|
||||||
|
nats.ReconnectWait(0),
|
||||||
|
nats.ErrorHandler(ignoreSlowConsumerErrorHandler),
|
||||||
|
}
|
||||||
|
|
||||||
|
clientUrl := server.ClientURL()
|
||||||
|
|
||||||
|
// start subscriber
|
||||||
|
ncSub, err := nats.Connect(clientUrl, opts...)
|
||||||
|
if err != nil {
|
||||||
|
b.Fatal(err)
|
||||||
|
}
|
||||||
|
defer ncSub.Close()
|
||||||
|
|
||||||
|
// track publishing errors
|
||||||
|
errors := make([]int, numPubs)
|
||||||
|
// track messages received by each publisher
|
||||||
|
counters := make([]int, numPubs)
|
||||||
|
// quit signals for each publisher
|
||||||
|
quitChs := make([]chan bool, numPubs)
|
||||||
|
for i := range quitChs {
|
||||||
|
quitChs[i] = make(chan bool, 1)
|
||||||
|
}
|
||||||
|
|
||||||
|
// TODO rename
|
||||||
|
completedPublishersCount := 0
|
||||||
|
|
||||||
|
var benchCompleteWg sync.WaitGroup
|
||||||
|
benchCompleteWg.Add(1)
|
||||||
|
|
||||||
|
ncSub.Subscribe(fmt.Sprintf("%s.*", subjectBaseName), func(msg *nats.Msg) {
|
||||||
|
// split subject by "." and get the publisher id
|
||||||
|
pubIdx, err := strconv.Atoi(strings.Split(msg.Subject, ".")[1])
|
||||||
|
if err != nil {
|
||||||
|
b.Fatal(err)
|
||||||
|
}
|
||||||
|
|
||||||
|
counters[pubIdx] += 1
|
||||||
|
if counters[pubIdx] == b.N {
|
||||||
|
completedPublishersCount++
|
||||||
|
if completedPublishersCount == numPubs {
|
||||||
|
benchCompleteWg.Done()
|
||||||
|
}
|
||||||
|
}
|
||||||
|
})
|
||||||
|
|
||||||
|
// random bytes as payload
|
||||||
|
messageData := make([]byte, messageSize)
|
||||||
|
rand.Read(messageData)
|
||||||
|
|
||||||
|
var publishersReadyWg sync.WaitGroup
|
||||||
|
// waits for all publishers sub-routines and for main thread to be ready
|
||||||
|
publishersReadyWg.Add(numPubs + 1)
|
||||||
|
|
||||||
|
// wait group to ensure all publishers have been torn down
|
||||||
|
var finishedPublishersWg sync.WaitGroup
|
||||||
|
finishedPublishersWg.Add(numPubs)
|
||||||
|
|
||||||
|
// create N publishers
|
||||||
|
for i := 0; i < numPubs; i++ {
|
||||||
|
// create publisher connection
|
||||||
|
ncPub, err := nats.Connect(clientUrl, opts...)
|
||||||
|
if err != nil {
|
||||||
|
b.Fatal(err)
|
||||||
|
}
|
||||||
|
defer ncPub.Close()
|
||||||
|
|
||||||
|
go func(pubId int) {
|
||||||
|
// signal that this publisher has been torn down
|
||||||
|
defer finishedPublishersWg.Done()
|
||||||
|
|
||||||
|
subject := fmt.Sprintf("%s.%d", subjectBaseName, pubId)
|
||||||
|
|
||||||
|
// publisher successfully initialized
|
||||||
|
publishersReadyWg.Done()
|
||||||
|
|
||||||
|
// wait till all other publishers are ready to start workload
|
||||||
|
publishersReadyWg.Wait()
|
||||||
|
|
||||||
|
// publish until quitCh is closed
|
||||||
|
for {
|
||||||
|
select {
|
||||||
|
case <-quitChs[pubId]:
|
||||||
|
return
|
||||||
|
default:
|
||||||
|
// continue publishing
|
||||||
|
}
|
||||||
|
err := ncPub.Publish(subject, messageData)
|
||||||
|
if err != nil {
|
||||||
|
errors[pubId] += 1
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}(i)
|
||||||
|
}
|
||||||
|
|
||||||
|
// Set bytes per operation
|
||||||
|
b.SetBytes(messageSize)
|
||||||
|
// Main thread is ready
|
||||||
|
publishersReadyWg.Done()
|
||||||
|
// Wait till publishers are ready
|
||||||
|
publishersReadyWg.Wait()
|
||||||
|
|
||||||
|
// Start the clock
|
||||||
|
b.ResetTimer()
|
||||||
|
// wait till termination cond reached
|
||||||
|
benchCompleteWg.Wait()
|
||||||
|
// Stop the clock
|
||||||
|
b.StopTimer()
|
||||||
|
|
||||||
|
// send quit signal to all publishers
|
||||||
|
for pubIdx := range quitChs {
|
||||||
|
quitChs[pubIdx] <- true
|
||||||
|
}
|
||||||
|
// Wait for all publishers to shutdown
|
||||||
|
finishedPublishersWg.Wait()
|
||||||
|
|
||||||
|
// sum errors from all publishers
|
||||||
|
totalErrors := 0
|
||||||
|
for _, err := range errors {
|
||||||
|
totalErrors += err
|
||||||
|
}
|
||||||
|
|
||||||
|
// report errors
|
||||||
|
b.ReportMetric(float64(totalErrors), "errors")
|
||||||
|
|
||||||
|
})
|
||||||
|
}
|
||||||
|
})
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|||||||
Reference in New Issue
Block a user