Added push based observable no ack benchmark

Signed-off-by: Derek Collison <derek@nats.io>
This commit is contained in:
Derek Collison
2019-10-09 19:08:13 -07:00
parent 5d5d5cbd60
commit 4698292d4f

View File

@@ -1511,6 +1511,63 @@ func Benchmark_JetStreamPubAsyncAck(b *testing.B) {
}
}
func Benchmark____JetStreamSubNoAck(b *testing.B) {
if b.N < 10000 {
return
}
s := RunBasicJetStreamServer()
defer s.Shutdown()
mname := "foo"
mset, err := s.JetStreamAddMsgSet(s.GlobalAccount(), &server.MsgSetConfig{Name: mname})
if err != nil {
b.Fatalf("Unexpected error adding message set: %v", err)
}
defer s.JetStreamDeleteMsgSet(mset)
nc, err := nats.Connect(s.ClientURL(), nats.NoReconnect())
if err != nil {
b.Fatalf("Failed to create client: %v", err)
}
defer nc.Close()
// Queue up messages.
for i := 0; i < b.N; i++ {
nc.Publish(mname, []byte("Hello World!"))
}
nc.Flush()
stats := mset.Stats()
if stats.Msgs != uint64(b.N) {
b.Fatalf("Expected %d messages, got %d", b.N, stats.Msgs)
}
total := int32(b.N)
received := int32(0)
done := make(chan bool)
deliverTo := "DM"
oname := "O"
nc.Subscribe(deliverTo, func(m *nats.Msg) {
// We only are done when we receive all, we could check for gaps too.
if atomic.AddInt32(&received, 1) >= total {
done <- true
}
})
nc.Flush()
b.ResetTimer()
o, err := mset.AddObservable(&server.ObservableConfig{Delivery: deliverTo, Durable: oname, AckPolicy: server.AckNone, DeliverAll: true})
if err != nil {
b.Fatalf("Expected no error with registered interest, got %v", err)
}
defer o.Delete()
<-done
b.StopTimer()
}
func benchJetStreamWorkersAndBatch(b *testing.B, numWorkers, batchSize int) {
// Avoid running at too low of numbers since that chews up memory and GC.
if b.N < numWorkers*batchSize {