From e1080966019dfa0edb0227c9b162e3ded59fd947 Mon Sep 17 00:00:00 2001 From: Marco Primi Date: Tue, 26 Sep 2023 10:20:02 -0700 Subject: [PATCH] Improve JS asynchronous publish benchmark Simplify logic and make sure no more than `asyncWindow` messages are ever in-flight --- server/jetstream_benchmark_test.go | 86 ++++++++++++++---------------- 1 file changed, 40 insertions(+), 46 deletions(-) diff --git a/server/jetstream_benchmark_test.go b/server/jetstream_benchmark_test.go index ec1bd76f..9fee6011 100644 --- a/server/jetstream_benchmark_test.go +++ b/server/jetstream_benchmark_test.go @@ -456,61 +456,55 @@ func BenchmarkJetStreamPublish(b *testing.B) { const publishCompleteMaxWait = 30 * time.Second rng := rand.New(rand.NewSource(int64(seed))) message := make([]byte, messageSize) - pending := make([]nats.PubAckFuture, 0, asyncWindow) + published, errors := 0, 0 + b.SetBytes(int64(messageSize)) b.ResetTimer() - for i := 1; i <= b.N; i++ { - rng.Read(message) // TODO may skip this? - subject := subjects[rng.Intn(len(subjects))] - pubAckFuture, err := js.PublishAsync(subject, message) - if err != nil { - errors++ - continue - } - pending = append(pending, pubAckFuture) + for published < b.N { - // Regularly trim the list of pending - if i%asyncWindow == 0 { - newPending := make([]nats.PubAckFuture, 0, asyncWindow) - for _, pubAckFuture := range pending { - select { - case <-pubAckFuture.Ok(): - published++ - b.SetBytes(int64(messageSize)) - case <-pubAckFuture.Err(): - errors++ - default: - // This pubAck is still pending, keep it - newPending = append(newPending, pubAckFuture) - } + // Normally publish a full batch (of size `asyncWindow`) + publishBatchSize := asyncWindow + // Unless fewer are left to complete the benchmark + if b.N-published < asyncWindow { + publishBatchSize = b.N - published + } + + pending := make([]nats.PubAckFuture, 0, publishBatchSize) + + for i := 0; i < publishBatchSize; i++ { + rng.Read(message) // TODO may skip this? + subject := subjects[rng.Intn(len(subjects))] + pubAckFuture, err := js.PublishAsync(subject, message) + if err != nil { + errors++ + continue } - pending = newPending + pending = append(pending, pubAckFuture) } - if verbose && i%1000 == 0 { - b.Logf("Published %d/%d, %d errors", i, b.N, errors) - } - } - - // All published, wait for completed - select { - case <-js.PublishAsyncComplete(): - case <-time.After(publishCompleteMaxWait): - b.Fatalf("Publish timed out") - } - - // Clear whatever is left pending - for _, pubAckFuture := range pending { + // All in this batch published, wait for completed select { - case <-pubAckFuture.Ok(): - published++ - b.SetBytes(int64(messageSize)) - case <-pubAckFuture.Err(): - errors++ - default: - b.Fatalf("PubAck is still pending after publish completed") + case <-js.PublishAsyncComplete(): + case <-time.After(publishCompleteMaxWait): + b.Fatalf("Publish timed out") + } + + // Verify one by one if they were published successfully + for _, pubAckFuture := range pending { + select { + case <-pubAckFuture.Ok(): + published++ + case <-pubAckFuture.Err(): + errors++ + default: + b.Fatalf("PubAck is still pending after publish completed") + } + } + + if verbose { + b.Logf("Published %d/%d", published, b.N) } }