Improve JS asynchronous publish benchmark

Simplify logic and make sure no more than `asyncWindow` messages are 
ever in-flight
This commit is contained in:
Marco Primi
2023-09-26 10:20:02 -07:00
parent 03aa44dc3d
commit e108096601

View File

@@ -456,61 +456,55 @@ func BenchmarkJetStreamPublish(b *testing.B) {
const publishCompleteMaxWait = 30 * time.Second
rng := rand.New(rand.NewSource(int64(seed)))
message := make([]byte, messageSize)
pending := make([]nats.PubAckFuture, 0, asyncWindow)
published, errors := 0, 0
b.SetBytes(int64(messageSize))
b.ResetTimer()
for i := 1; i <= b.N; i++ {
rng.Read(message) // TODO may skip this?
subject := subjects[rng.Intn(len(subjects))]
pubAckFuture, err := js.PublishAsync(subject, message)
if err != nil {
errors++
continue
}
pending = append(pending, pubAckFuture)
for published < b.N {
// Regularly trim the list of pending
if i%asyncWindow == 0 {
newPending := make([]nats.PubAckFuture, 0, asyncWindow)
for _, pubAckFuture := range pending {
select {
case <-pubAckFuture.Ok():
published++
b.SetBytes(int64(messageSize))
case <-pubAckFuture.Err():
errors++
default:
// This pubAck is still pending, keep it
newPending = append(newPending, pubAckFuture)
}
// Normally publish a full batch (of size `asyncWindow`)
publishBatchSize := asyncWindow
// Unless fewer are left to complete the benchmark
if b.N-published < asyncWindow {
publishBatchSize = b.N - published
}
pending := make([]nats.PubAckFuture, 0, publishBatchSize)
for i := 0; i < publishBatchSize; i++ {
rng.Read(message) // TODO may skip this?
subject := subjects[rng.Intn(len(subjects))]
pubAckFuture, err := js.PublishAsync(subject, message)
if err != nil {
errors++
continue
}
pending = newPending
pending = append(pending, pubAckFuture)
}
if verbose && i%1000 == 0 {
b.Logf("Published %d/%d, %d errors", i, b.N, errors)
}
}
// All published, wait for completed
select {
case <-js.PublishAsyncComplete():
case <-time.After(publishCompleteMaxWait):
b.Fatalf("Publish timed out")
}
// Clear whatever is left pending
for _, pubAckFuture := range pending {
// All in this batch published, wait for completed
select {
case <-pubAckFuture.Ok():
published++
b.SetBytes(int64(messageSize))
case <-pubAckFuture.Err():
errors++
default:
b.Fatalf("PubAck is still pending after publish completed")
case <-js.PublishAsyncComplete():
case <-time.After(publishCompleteMaxWait):
b.Fatalf("Publish timed out")
}
// Verify one by one if they were published successfully
for _, pubAckFuture := range pending {
select {
case <-pubAckFuture.Ok():
published++
case <-pubAckFuture.Err():
errors++
default:
b.Fatalf("PubAck is still pending after publish completed")
}
}
if verbose {
b.Logf("Published %d/%d", published, b.N)
}
}