diff --git a/test/bench_test.go b/test/bench_test.go index ea30ac55..4c991d55 100644 --- a/test/bench_test.go +++ b/test/bench_test.go @@ -1511,6 +1511,63 @@ func Benchmark_JetStreamPubAsyncAck(b *testing.B) { } } +func Benchmark____JetStreamSubNoAck(b *testing.B) { + if b.N < 10000 { + return + } + + s := RunBasicJetStreamServer() + defer s.Shutdown() + + mname := "foo" + mset, err := s.JetStreamAddMsgSet(s.GlobalAccount(), &server.MsgSetConfig{Name: mname}) + if err != nil { + b.Fatalf("Unexpected error adding message set: %v", err) + } + defer s.JetStreamDeleteMsgSet(mset) + + nc, err := nats.Connect(s.ClientURL(), nats.NoReconnect()) + if err != nil { + b.Fatalf("Failed to create client: %v", err) + } + defer nc.Close() + + // Queue up messages. + for i := 0; i < b.N; i++ { + nc.Publish(mname, []byte("Hello World!")) + } + nc.Flush() + + stats := mset.Stats() + if stats.Msgs != uint64(b.N) { + b.Fatalf("Expected %d messages, got %d", b.N, stats.Msgs) + } + + total := int32(b.N) + received := int32(0) + done := make(chan bool) + + deliverTo := "DM" + oname := "O" + + nc.Subscribe(deliverTo, func(m *nats.Msg) { + // We only are done when we receive all, we could check for gaps too. + if atomic.AddInt32(&received, 1) >= total { + done <- true + } + }) + nc.Flush() + + b.ResetTimer() + o, err := mset.AddObservable(&server.ObservableConfig{Delivery: deliverTo, Durable: oname, AckPolicy: server.AckNone, DeliverAll: true}) + if err != nil { + b.Fatalf("Expected no error with registered interest, got %v", err) + } + defer o.Delete() + <-done + b.StopTimer() +} + func benchJetStreamWorkersAndBatch(b *testing.B, numWorkers, batchSize int) { // Avoid running at too low of numbers since that chews up memory and GC. if b.N < numWorkers*batchSize {