Improve setup of JS Consume benchmark

Handle error condition during stream setup that was resulting in failed 
runs.
This commit is contained in:
Marco Primi
2023-09-26 10:18:43 -07:00
parent 4c17eeb79e
commit 03aa44dc3d

View File

@@ -29,11 +29,12 @@ import (
func BenchmarkJetStreamConsume(b *testing.B) {
const (
verbose = false
streamName = "S"
subject = "s"
seed = 12345
publishTimeout = 30 * time.Second
verbose = false
streamName = "S"
subject = "s"
seed = 12345
publishTimeout = 30 * time.Second
PublishBatchSize = 10000
)
runSyncPushConsumer := func(b *testing.B, js nats.JetStreamContext, streamName, subject string) (int, int, int) {
@@ -347,24 +348,25 @@ func BenchmarkJetStreamConsume(b *testing.B) {
rng := rand.New(rand.NewSource(int64(seed)))
message := make([]byte, bc.messageSize)
publishedCount := 0
for publishedCount < b.N {
// Publish b.N messages to the stream (in batches)
for i := 1; i <= b.N; i++ {
rng.Read(message)
_, err := js.PublishAsync(subject, message)
if err != nil {
continue
} else {
publishedCount++
b.Fatalf("Failed to publish: %s", err)
}
}
select {
case <-js.PublishAsyncComplete():
if verbose {
b.Logf("Published %d messages", b.N)
// Limit outstanding published messages to PublishBatchSize
if i%PublishBatchSize == 0 || i == b.N {
select {
case <-js.PublishAsyncComplete():
if verbose {
b.Logf("Published %d/%d messages", i, b.N)
}
case <-time.After(publishTimeout):
b.Fatalf("Publish timed out")
}
}
case <-time.After(publishTimeout):
b.Fatalf("Publish timed out")
}
// Discard time spent during setup