mirror of
https://github.com/gogrlx/nats-server.git
synced 2026-04-02 03:38:42 -07:00
Miscellaneous JetStream benchmark improvements (#4595)
- [ ] Link to issue, e.g. `Resolves #NNN` - [ ] Documentation added (if applicable) - [ ] Tests added - [x] Branch rebased on top of current main (`git pull --rebase origin main`) - [ ] Changes squashed to a single commit (described [here](http://gitready.com/advanced/2009/02/10/squashing-commits-with-rebase.html)) - [ ] Build is green in Travis CI - [x] You have certified that the contribution is your original work and that you license the work to the project under the [Apache 2 license](https://github.com/nats-io/nats-server/blob/main/LICENSE) Resolves # ### Changes proposed in this pull request: Miscellaneous fixes and improvements to server JetStream benchmarks. Reviewers: notice the PR is broken down in 5 commit, each one is trivial to review individually, but they can be definitely squashed before merging for easier cherry-picking.
This commit is contained in:
@@ -29,11 +29,12 @@ import (
|
||||
func BenchmarkJetStreamConsume(b *testing.B) {
|
||||
|
||||
const (
|
||||
verbose = false
|
||||
streamName = "S"
|
||||
subject = "s"
|
||||
seed = 12345
|
||||
publishTimeout = 30 * time.Second
|
||||
verbose = false
|
||||
streamName = "S"
|
||||
subject = "s"
|
||||
seed = 12345
|
||||
publishTimeout = 30 * time.Second
|
||||
PublishBatchSize = 10000
|
||||
)
|
||||
|
||||
runSyncPushConsumer := func(b *testing.B, js nats.JetStreamContext, streamName, subject string) (int, int, int) {
|
||||
@@ -81,7 +82,6 @@ func BenchmarkJetStreamConsume(b *testing.B) {
|
||||
|
||||
uniqueConsumed++
|
||||
bitset.set(index, true)
|
||||
b.SetBytes(int64(len(msg.Data)))
|
||||
|
||||
if verbose && uniqueConsumed%1000 == 0 {
|
||||
b.Logf("Consumed: %d/%d", bitset.count(), b.N)
|
||||
@@ -127,7 +127,6 @@ func BenchmarkJetStreamConsume(b *testing.B) {
|
||||
|
||||
uniqueConsumed++
|
||||
bitset.set(index, true)
|
||||
b.SetBytes(int64(len(msg.Data)))
|
||||
|
||||
if uniqueConsumed == b.N {
|
||||
msg.Sub.Unsubscribe()
|
||||
@@ -223,7 +222,6 @@ func BenchmarkJetStreamConsume(b *testing.B) {
|
||||
|
||||
uniqueConsumed++
|
||||
bitset.set(index, true)
|
||||
b.SetBytes(int64(len(msg.Data)))
|
||||
|
||||
if uniqueConsumed == b.N {
|
||||
msg.Sub.Unsubscribe()
|
||||
@@ -307,23 +305,9 @@ func BenchmarkJetStreamConsume(b *testing.B) {
|
||||
if verbose {
|
||||
b.Logf("Setting up %d nodes", bc.clusterSize)
|
||||
}
|
||||
var (
|
||||
connectURL string
|
||||
cl *cluster
|
||||
)
|
||||
if bc.clusterSize == 1 {
|
||||
s := RunBasicJetStreamServer(b)
|
||||
defer s.Shutdown()
|
||||
connectURL = s.ClientURL()
|
||||
} else {
|
||||
cl = createJetStreamClusterExplicit(b, "BENCH_PUB", bc.clusterSize)
|
||||
defer cl.shutdown()
|
||||
cl.waitOnClusterReadyWithNumPeers(bc.clusterSize)
|
||||
cl.waitOnLeader()
|
||||
connectURL = cl.leader().ClientURL()
|
||||
}
|
||||
|
||||
nc, js := jsClientConnectURL(b, connectURL)
|
||||
cl, _, shutdown, nc, js := startJSClusterAndConnect(b, bc.clusterSize)
|
||||
defer shutdown()
|
||||
defer nc.Close()
|
||||
|
||||
if verbose {
|
||||
@@ -338,34 +322,38 @@ func BenchmarkJetStreamConsume(b *testing.B) {
|
||||
b.Fatalf("Error creating stream: %v", err)
|
||||
}
|
||||
|
||||
// cluster_size > 1, connect to stream leader
|
||||
if cl != nil {
|
||||
connectURL = cl.streamLeader("$G", streamName).ClientURL()
|
||||
// If replicated resource, connect to stream leader for lower variability
|
||||
if bc.replicas > 1 {
|
||||
connectURL := cl.streamLeader("$G", streamName).ClientURL()
|
||||
nc.Close()
|
||||
_, js = jsClientConnectURL(b, connectURL)
|
||||
}
|
||||
|
||||
rng := rand.New(rand.NewSource(int64(seed)))
|
||||
message := make([]byte, bc.messageSize)
|
||||
publishedCount := 0
|
||||
for publishedCount < b.N {
|
||||
|
||||
// Publish b.N messages to the stream (in batches)
|
||||
for i := 1; i <= b.N; i++ {
|
||||
rng.Read(message)
|
||||
_, err := js.PublishAsync(subject, message)
|
||||
if err != nil {
|
||||
continue
|
||||
} else {
|
||||
publishedCount++
|
||||
b.Fatalf("Failed to publish: %s", err)
|
||||
}
|
||||
// Limit outstanding published messages to PublishBatchSize
|
||||
if i%PublishBatchSize == 0 || i == b.N {
|
||||
select {
|
||||
case <-js.PublishAsyncComplete():
|
||||
if verbose {
|
||||
b.Logf("Published %d/%d messages", i, b.N)
|
||||
}
|
||||
case <-time.After(publishTimeout):
|
||||
b.Fatalf("Publish timed out")
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
select {
|
||||
case <-js.PublishAsyncComplete():
|
||||
if verbose {
|
||||
b.Logf("Published %d messages", b.N)
|
||||
}
|
||||
case <-time.After(publishTimeout):
|
||||
b.Fatalf("Publish timed out")
|
||||
}
|
||||
// Set size of each operation, for throughput calculation
|
||||
b.SetBytes(int64(bc.messageSize))
|
||||
|
||||
// Discard time spent during setup
|
||||
// Consumer may reset again further in
|
||||
@@ -437,7 +425,6 @@ func BenchmarkJetStreamPublish(b *testing.B) {
|
||||
errors++
|
||||
} else {
|
||||
published++
|
||||
b.SetBytes(int64(messageSize))
|
||||
}
|
||||
|
||||
if verbose && i%1000 == 0 {
|
||||
@@ -454,61 +441,54 @@ func BenchmarkJetStreamPublish(b *testing.B) {
|
||||
const publishCompleteMaxWait = 30 * time.Second
|
||||
rng := rand.New(rand.NewSource(int64(seed)))
|
||||
message := make([]byte, messageSize)
|
||||
pending := make([]nats.PubAckFuture, 0, asyncWindow)
|
||||
|
||||
published, errors := 0, 0
|
||||
|
||||
b.ResetTimer()
|
||||
|
||||
for i := 1; i <= b.N; i++ {
|
||||
rng.Read(message) // TODO may skip this?
|
||||
subject := subjects[rng.Intn(len(subjects))]
|
||||
pubAckFuture, err := js.PublishAsync(subject, message)
|
||||
if err != nil {
|
||||
errors++
|
||||
continue
|
||||
}
|
||||
pending = append(pending, pubAckFuture)
|
||||
for published < b.N {
|
||||
|
||||
// Regularly trim the list of pending
|
||||
if i%asyncWindow == 0 {
|
||||
newPending := make([]nats.PubAckFuture, 0, asyncWindow)
|
||||
for _, pubAckFuture := range pending {
|
||||
select {
|
||||
case <-pubAckFuture.Ok():
|
||||
published++
|
||||
b.SetBytes(int64(messageSize))
|
||||
case <-pubAckFuture.Err():
|
||||
errors++
|
||||
default:
|
||||
// This pubAck is still pending, keep it
|
||||
newPending = append(newPending, pubAckFuture)
|
||||
}
|
||||
// Normally publish a full batch (of size `asyncWindow`)
|
||||
publishBatchSize := asyncWindow
|
||||
// Unless fewer are left to complete the benchmark
|
||||
if b.N-published < asyncWindow {
|
||||
publishBatchSize = b.N - published
|
||||
}
|
||||
|
||||
pending := make([]nats.PubAckFuture, 0, publishBatchSize)
|
||||
|
||||
for i := 0; i < publishBatchSize; i++ {
|
||||
rng.Read(message) // TODO may skip this?
|
||||
subject := subjects[rng.Intn(len(subjects))]
|
||||
pubAckFuture, err := js.PublishAsync(subject, message)
|
||||
if err != nil {
|
||||
errors++
|
||||
continue
|
||||
}
|
||||
pending = newPending
|
||||
pending = append(pending, pubAckFuture)
|
||||
}
|
||||
|
||||
if verbose && i%1000 == 0 {
|
||||
b.Logf("Published %d/%d, %d errors", i, b.N, errors)
|
||||
}
|
||||
}
|
||||
|
||||
// All published, wait for completed
|
||||
select {
|
||||
case <-js.PublishAsyncComplete():
|
||||
case <-time.After(publishCompleteMaxWait):
|
||||
b.Fatalf("Publish timed out")
|
||||
}
|
||||
|
||||
// Clear whatever is left pending
|
||||
for _, pubAckFuture := range pending {
|
||||
// All in this batch published, wait for completed
|
||||
select {
|
||||
case <-pubAckFuture.Ok():
|
||||
published++
|
||||
b.SetBytes(int64(messageSize))
|
||||
case <-pubAckFuture.Err():
|
||||
errors++
|
||||
default:
|
||||
b.Fatalf("PubAck is still pending after publish completed")
|
||||
case <-js.PublishAsyncComplete():
|
||||
case <-time.After(publishCompleteMaxWait):
|
||||
b.Fatalf("Publish timed out")
|
||||
}
|
||||
|
||||
// Verify one by one if they were published successfully
|
||||
for _, pubAckFuture := range pending {
|
||||
select {
|
||||
case <-pubAckFuture.Ok():
|
||||
published++
|
||||
case <-pubAckFuture.Err():
|
||||
errors++
|
||||
default:
|
||||
b.Fatalf("PubAck is still pending after publish completed")
|
||||
}
|
||||
}
|
||||
|
||||
if verbose {
|
||||
b.Logf("Published %d/%d", published, b.N)
|
||||
}
|
||||
}
|
||||
|
||||
@@ -569,11 +549,6 @@ func BenchmarkJetStreamPublish(b *testing.B) {
|
||||
b.Run(
|
||||
name,
|
||||
func(b *testing.B) {
|
||||
// Skip short runs, benchmark gets re-executed with a larger N
|
||||
if b.N < bc.minMessages {
|
||||
b.ResetTimer()
|
||||
return
|
||||
}
|
||||
|
||||
subjects := make([]string, bc.numSubjects)
|
||||
for i := 0; i < bc.numSubjects; i++ {
|
||||
@@ -587,27 +562,9 @@ func BenchmarkJetStreamPublish(b *testing.B) {
|
||||
if verbose {
|
||||
b.Logf("Setting up %d nodes", bc.clusterSize)
|
||||
}
|
||||
var (
|
||||
connectURL string
|
||||
cl *cluster
|
||||
)
|
||||
|
||||
if bc.clusterSize == 1 {
|
||||
s := RunBasicJetStreamServer(b)
|
||||
defer s.Shutdown()
|
||||
connectURL = s.ClientURL()
|
||||
} else {
|
||||
cl = createJetStreamClusterExplicit(b, "BENCH_PUB", bc.clusterSize)
|
||||
defer cl.shutdown()
|
||||
cl.waitOnClusterReadyWithNumPeers(bc.clusterSize)
|
||||
cl.waitOnLeader()
|
||||
connectURL = cl.leader().ClientURL()
|
||||
}
|
||||
|
||||
nc, err := nats.Connect(connectURL)
|
||||
if err != nil {
|
||||
b.Fatalf("Failed to create client: %v", err)
|
||||
}
|
||||
cl, _, shutdown, nc, _ := startJSClusterAndConnect(b, bc.clusterSize)
|
||||
defer shutdown()
|
||||
defer nc.Close()
|
||||
|
||||
jsOpts := []nats.JSOpt{
|
||||
@@ -635,9 +592,9 @@ func BenchmarkJetStreamPublish(b *testing.B) {
|
||||
b.Fatalf("Error creating stream: %v", err)
|
||||
}
|
||||
|
||||
// cluster_size > 1, connect to stream leader
|
||||
if cl != nil {
|
||||
connectURL = cl.streamLeader("$G", streamName).ClientURL()
|
||||
// If replicated resource, connect to stream leader for lower variability
|
||||
if bc.replicas > 1 {
|
||||
connectURL := cl.streamLeader("$G", streamName).ClientURL()
|
||||
nc.Close()
|
||||
nc, err = nats.Connect(connectURL)
|
||||
if err != nil {
|
||||
@@ -648,13 +605,14 @@ func BenchmarkJetStreamPublish(b *testing.B) {
|
||||
if err != nil {
|
||||
b.Fatalf("Unexpected error getting JetStream context for stream leader: %v", err)
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
if verbose {
|
||||
b.Logf("Running %v publisher with message size: %dB", pc.pType, bc.messageSize)
|
||||
}
|
||||
|
||||
b.SetBytes(int64(bc.messageSize))
|
||||
|
||||
// Benchmark starts here
|
||||
b.ResetTimer()
|
||||
|
||||
@@ -747,47 +705,6 @@ func BenchmarkJetStreamInterestStreamWithLimit(b *testing.B) {
|
||||
},
|
||||
}
|
||||
|
||||
// Helper: Stand up in-process single node or cluster
|
||||
setupCluster := func(b *testing.B, clusterSize int) (string, func()) {
|
||||
var connectURL string
|
||||
var shutdownFunc func()
|
||||
|
||||
if clusterSize == 1 {
|
||||
s := RunBasicJetStreamServer(b)
|
||||
shutdownFunc = s.Shutdown
|
||||
connectURL = s.ClientURL()
|
||||
} else {
|
||||
cl := createJetStreamClusterExplicit(b, "BENCH_PUB", clusterSize)
|
||||
shutdownFunc = cl.shutdown
|
||||
cl.waitOnClusterReadyWithNumPeers(clusterSize)
|
||||
cl.waitOnLeader()
|
||||
connectURL = cl.leader().ClientURL()
|
||||
}
|
||||
|
||||
return connectURL, shutdownFunc
|
||||
}
|
||||
|
||||
// Helper: Create the stream
|
||||
setupStream := func(b *testing.B, connectURL string, streamConfig *nats.StreamConfig) {
|
||||
// Connect
|
||||
nc, err := nats.Connect(connectURL)
|
||||
if err != nil {
|
||||
b.Fatalf("Failed to create client: %v", err)
|
||||
}
|
||||
defer nc.Close()
|
||||
|
||||
jsOpts := []nats.JSOpt{}
|
||||
|
||||
js, err := nc.JetStream(jsOpts...)
|
||||
if err != nil {
|
||||
b.Fatalf("Unexpected error getting JetStream context: %v", err)
|
||||
}
|
||||
|
||||
if _, err := js.AddStream(streamConfig); err != nil {
|
||||
b.Fatalf("Error creating stream: %v", err)
|
||||
}
|
||||
}
|
||||
|
||||
// Context shared by publishers routines
|
||||
type PublishersContext = struct {
|
||||
readyWg sync.WaitGroup
|
||||
@@ -873,12 +790,6 @@ func BenchmarkJetStreamInterestStreamWithLimit(b *testing.B) {
|
||||
b.Run(
|
||||
limitDescription,
|
||||
func(b *testing.B) {
|
||||
// Stop timer during setup
|
||||
b.StopTimer()
|
||||
b.ResetTimer()
|
||||
|
||||
// Set per-iteration bytes to calculate throughput (a.k.a. speed)
|
||||
b.SetBytes(messageSize)
|
||||
|
||||
// Print benchmark parameters
|
||||
if verbose {
|
||||
@@ -892,8 +803,9 @@ func BenchmarkJetStreamInterestStreamWithLimit(b *testing.B) {
|
||||
}
|
||||
|
||||
// Setup server or cluster
|
||||
connectURL, shutdownFunc := setupCluster(b, benchmarkCase.clusterSize)
|
||||
defer shutdownFunc()
|
||||
cl, ls, shutdown, nc, js := startJSClusterAndConnect(b, benchmarkCase.clusterSize)
|
||||
defer shutdown()
|
||||
defer nc.Close()
|
||||
|
||||
// Common stream configuration
|
||||
streamConfig := &nats.StreamConfig{
|
||||
@@ -906,8 +818,11 @@ func BenchmarkJetStreamInterestStreamWithLimit(b *testing.B) {
|
||||
}
|
||||
// Configure stream limit
|
||||
limitConfigFunc(streamConfig)
|
||||
|
||||
// Create stream
|
||||
setupStream(b, connectURL, streamConfig)
|
||||
if _, err := js.AddStream(streamConfig); err != nil {
|
||||
b.Fatalf("Error creating stream: %v", err)
|
||||
}
|
||||
|
||||
// Set up publishers shared context
|
||||
var pubCtx PublishersContext
|
||||
@@ -918,6 +833,12 @@ func BenchmarkJetStreamInterestStreamWithLimit(b *testing.B) {
|
||||
pubCtx.lock.Lock()
|
||||
pubCtx.messagesLeft = b.N
|
||||
|
||||
connectURL := ls.ClientURL()
|
||||
// If replicated resource, connect to stream leader for lower variability
|
||||
if benchmarkCase.replicas > 1 {
|
||||
connectURL = cl.streamLeader("$G", "S").ClientURL()
|
||||
}
|
||||
|
||||
// Spawn publishers routines, each with its own connection and JS context
|
||||
for i := 0; i < numPublishers; i++ {
|
||||
nc, err := nats.Connect(connectURL)
|
||||
@@ -935,8 +856,11 @@ func BenchmarkJetStreamInterestStreamWithLimit(b *testing.B) {
|
||||
// Wait for all publishers to be ready
|
||||
pubCtx.readyWg.Wait()
|
||||
|
||||
// Set size of each operation, for throughput calculation
|
||||
b.SetBytes(messageSize)
|
||||
|
||||
// Benchmark starts here
|
||||
b.StartTimer()
|
||||
b.ResetTimer()
|
||||
|
||||
// Unblock the publishers
|
||||
pubCtx.lock.Unlock()
|
||||
@@ -971,7 +895,6 @@ func BenchmarkJetStreamKV(b *testing.B) {
|
||||
kvName = "BUCKET"
|
||||
keyPrefix = "K_"
|
||||
seed = 12345
|
||||
minOps = 1_000
|
||||
)
|
||||
|
||||
runKVGet := func(b *testing.B, kv nats.KeyValue, keys []string) int {
|
||||
@@ -982,14 +905,12 @@ func BenchmarkJetStreamKV(b *testing.B) {
|
||||
|
||||
for i := 1; i <= b.N; i++ {
|
||||
key := keys[rng.Intn(len(keys))]
|
||||
kve, err := kv.Get(key)
|
||||
_, err := kv.Get(key)
|
||||
if err != nil {
|
||||
errors++
|
||||
continue
|
||||
}
|
||||
|
||||
b.SetBytes(int64(len(kve.Value())))
|
||||
|
||||
if verbose && i%1000 == 0 {
|
||||
b.Logf("Completed %d/%d Get ops", i, b.N)
|
||||
}
|
||||
@@ -1015,8 +936,6 @@ func BenchmarkJetStreamKV(b *testing.B) {
|
||||
continue
|
||||
}
|
||||
|
||||
b.SetBytes(int64(valueSize))
|
||||
|
||||
if verbose && i%1000 == 0 {
|
||||
b.Logf("Completed %d/%d Put ops", i, b.N)
|
||||
}
|
||||
@@ -1049,8 +968,6 @@ func BenchmarkJetStreamKV(b *testing.B) {
|
||||
continue
|
||||
}
|
||||
|
||||
b.SetBytes(int64(valueSize))
|
||||
|
||||
if verbose && i%1000 == 0 {
|
||||
b.Logf("Completed %d/%d Update ops", i, b.N)
|
||||
}
|
||||
@@ -1104,11 +1021,6 @@ func BenchmarkJetStreamKV(b *testing.B) {
|
||||
b.Run(
|
||||
wName,
|
||||
func(b *testing.B) {
|
||||
// Skip short runs, benchmark gets re-executed with a larger N
|
||||
if b.N < minOps {
|
||||
b.ResetTimer()
|
||||
return
|
||||
}
|
||||
|
||||
if verbose {
|
||||
b.Logf("Running %s workload %s with %d messages", wName, bName, b.N)
|
||||
@@ -1117,24 +1029,6 @@ func BenchmarkJetStreamKV(b *testing.B) {
|
||||
if verbose {
|
||||
b.Logf("Setting up %d nodes", bc.clusterSize)
|
||||
}
|
||||
var (
|
||||
connectURL string
|
||||
cl *cluster
|
||||
)
|
||||
if bc.clusterSize == 1 {
|
||||
s := RunBasicJetStreamServer(b)
|
||||
defer s.Shutdown()
|
||||
connectURL = s.ClientURL()
|
||||
} else {
|
||||
cl = createJetStreamClusterExplicit(b, "BENCH_KV", bc.clusterSize)
|
||||
defer cl.shutdown()
|
||||
cl.waitOnClusterReadyWithNumPeers(bc.clusterSize)
|
||||
cl.waitOnLeader()
|
||||
connectURL = cl.leader().ClientURL()
|
||||
}
|
||||
|
||||
nc, js := jsClientConnectURL(b, connectURL)
|
||||
defer nc.Close()
|
||||
|
||||
// Pre-generate all keys
|
||||
keys := make([]string, 0, bc.numKeys)
|
||||
@@ -1143,6 +1037,11 @@ func BenchmarkJetStreamKV(b *testing.B) {
|
||||
keys = append(keys, key)
|
||||
}
|
||||
|
||||
// Setup server or cluster
|
||||
cl, _, shutdown, nc, js := startJSClusterAndConnect(b, bc.clusterSize)
|
||||
defer shutdown()
|
||||
defer nc.Close()
|
||||
|
||||
// Create bucket
|
||||
if verbose {
|
||||
b.Logf("Creating KV %s with R=%d", kvName, bc.replicas)
|
||||
@@ -1167,18 +1066,22 @@ func BenchmarkJetStreamKV(b *testing.B) {
|
||||
}
|
||||
}
|
||||
|
||||
// if cluster_size > 1, connect to stream leader of bucket
|
||||
if cl != nil {
|
||||
// If replicated resource, connect to stream leader for lower variability
|
||||
if bc.replicas > 1 {
|
||||
nc.Close()
|
||||
|
||||
connectURL = cl.streamLeader("$G", fmt.Sprintf("KV_%s", kvName)).ClientURL()
|
||||
_, js = jsClientConnectURL(b, connectURL)
|
||||
connectURL := cl.streamLeader("$G", fmt.Sprintf("KV_%s", kvName)).ClientURL()
|
||||
nc, js = jsClientConnectURL(b, connectURL)
|
||||
defer nc.Close()
|
||||
}
|
||||
|
||||
kv, err = js.KeyValue(kv.Bucket())
|
||||
if err != nil {
|
||||
b.Fatalf("Error binding to KV: %v", err)
|
||||
}
|
||||
|
||||
// Set size of each operation, for throughput calculation
|
||||
b.SetBytes(int64(bc.valueSize))
|
||||
|
||||
// Discard time spent during setup
|
||||
// May reset again further in
|
||||
b.ResetTimer()
|
||||
@@ -1285,7 +1188,6 @@ func BenchmarkJetStreamObjStore(b *testing.B) {
|
||||
minObjSz int
|
||||
maxObjSz int
|
||||
}{
|
||||
// TODO remove duplicates and fix comments
|
||||
{nats.MemoryStorage, 100, 1024, 102400}, // mem storage, 100 objects sized (1KB-100KB)
|
||||
{nats.MemoryStorage, 100, 102400, 1048576}, // mem storage, 100 objects sized (100KB-1MB)
|
||||
{nats.MemoryStorage, 1000, 10240, 102400}, // mem storage, 1k objects of various size (10KB - 100KB)
|
||||
@@ -1294,7 +1196,6 @@ func BenchmarkJetStreamObjStore(b *testing.B) {
|
||||
{nats.FileStorage, 100, 102400, 1048576}, // file storage, 100 objects sized (100KB-1MB)
|
||||
{nats.FileStorage, 100, 1048576, 10485760}, // file storage, 100 objects sized (1MB-10MB)
|
||||
{nats.FileStorage, 10, 10485760, 104857600}, // file storage, 10 objects sized (10MB-100MB)
|
||||
|
||||
}
|
||||
|
||||
var (
|
||||
@@ -1327,23 +1228,10 @@ func BenchmarkJetStreamObjStore(b *testing.B) {
|
||||
if verbose {
|
||||
b.Logf("Setting up %d nodes", replicas)
|
||||
}
|
||||
var (
|
||||
connectURL string
|
||||
cl *cluster
|
||||
)
|
||||
if clusterSize == 1 {
|
||||
s := RunBasicJetStreamServer(b)
|
||||
defer s.Shutdown()
|
||||
connectURL = s.ClientURL()
|
||||
} else {
|
||||
cl = createJetStreamClusterExplicit(b, "BENCH_OBJ_STORE", clusterSize)
|
||||
defer cl.shutdown()
|
||||
cl.waitOnClusterReadyWithNumPeers(replicas)
|
||||
cl.waitOnLeader()
|
||||
// connect to leader and not replicas
|
||||
connectURL = cl.leader().ClientURL()
|
||||
}
|
||||
nc, js := jsClientConnectURL(b, connectURL)
|
||||
|
||||
// Setup server or cluster
|
||||
cl, _, shutdown, nc, js := startJSClusterAndConnect(b, clusterSize)
|
||||
defer shutdown()
|
||||
defer nc.Close()
|
||||
|
||||
// Initialize object store
|
||||
@@ -1360,10 +1248,10 @@ func BenchmarkJetStreamObjStore(b *testing.B) {
|
||||
b.Fatalf("Error creating ObjectStore: %v", err)
|
||||
}
|
||||
|
||||
// if cluster_size > 1, connect to stream leader
|
||||
if cl != nil {
|
||||
// If replicated resource, connect to stream leader for lower variability
|
||||
if clusterSize > 1 {
|
||||
nc.Close()
|
||||
connectURL = cl.streamLeader("$G", fmt.Sprintf("OBJ_%s", objStoreName)).ClientURL()
|
||||
connectURL := cl.streamLeader("$G", fmt.Sprintf("OBJ_%s", objStoreName)).ClientURL()
|
||||
nc, js := jsClientConnectURL(b, connectURL)
|
||||
defer nc.Close()
|
||||
objStore, err = js.ObjectStore(objStoreName)
|
||||
@@ -1401,9 +1289,41 @@ func BenchmarkJetStreamObjStore(b *testing.B) {
|
||||
}
|
||||
},
|
||||
)
|
||||
|
||||
}
|
||||
},
|
||||
)
|
||||
}
|
||||
}
|
||||
|
||||
// Helper function to stand up a JS-enabled single server or cluster
|
||||
func startJSClusterAndConnect(b *testing.B, clusterSize int) (c *cluster, s *Server, shutdown func(), nc *nats.Conn, js nats.JetStreamContext) {
|
||||
b.Helper()
|
||||
var err error
|
||||
|
||||
if clusterSize == 1 {
|
||||
s = RunBasicJetStreamServer(b)
|
||||
shutdown = func() {
|
||||
s.Shutdown()
|
||||
}
|
||||
} else {
|
||||
c = createJetStreamClusterExplicit(b, "BENCH_PUB", clusterSize)
|
||||
c.waitOnClusterReadyWithNumPeers(clusterSize)
|
||||
c.waitOnLeader()
|
||||
s = c.leader()
|
||||
shutdown = func() {
|
||||
c.shutdown()
|
||||
}
|
||||
}
|
||||
|
||||
nc, err = nats.Connect(s.ClientURL())
|
||||
if err != nil {
|
||||
b.Fatalf("failed to connect: %s", err)
|
||||
}
|
||||
|
||||
js, err = nc.JetStream()
|
||||
if err != nil {
|
||||
b.Fatalf("failed to init jetstream: %s", err)
|
||||
}
|
||||
|
||||
return c, s, shutdown, nc, js
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user