Test ack metrics

Signed-off-by: Derek Collison <derek@nats.io>
This commit is contained in:
Derek Collison
2022-03-15 16:41:06 -07:00
parent 5c0d1999ff
commit 303bb93c18

View File

@@ -628,7 +628,7 @@ func TestJetStreamConsumerMaxDeliveries(t *testing.T) {
// Wait for redeliveries to pile up.
checkFor(t, 250*time.Millisecond, 10*time.Millisecond, func() error {
if nmsgs, _, _ := sub.Pending(); err != nil || nmsgs != maxDeliver {
if nmsgs, _, err := sub.Pending(); err != nil || nmsgs != maxDeliver {
return fmt.Errorf("Did not receive correct number of messages: %d vs %d", nmsgs, maxDeliver)
}
return nil
@@ -15197,6 +15197,90 @@ func TestJetStreamRestoreBadStream(t *testing.T) {
}
}
func TestJetStreamConsumerAckSampling(t *testing.T) {
s := RunBasicJetStreamServer()
if config := s.JetStreamConfig(); config != nil {
defer removeDir(t, config.StoreDir)
}
defer s.Shutdown()
nc, js := jsClientConnect(t, s)
defer nc.Close()
_, err := js.AddStream(&nats.StreamConfig{Name: "TEST", Subjects: []string{"foo"}})
require_NoError(t, err)
_, err = js.AddConsumer("TEST", &nats.ConsumerConfig{
Durable: "dlc",
AckPolicy: nats.AckExplicitPolicy,
FilterSubject: "foo",
SampleFrequency: "100%",
})
require_NoError(t, err)
sub, err := js.PullSubscribe("foo", "dlc")
require_NoError(t, err)
_, err = js.Publish("foo", []byte("Hello"))
require_NoError(t, err)
msub, err := nc.SubscribeSync("$JS.EVENT.METRIC.>")
require_NoError(t, err)
for _, m := range fetchMsgs(t, sub, 1, time.Second) {
err = m.AckSync()
require_NoError(t, err)
}
m, err := msub.NextMsg(time.Second)
require_NoError(t, err)
var am JSConsumerAckMetric
err = json.Unmarshal(m.Data, &am)
require_NoError(t, err)
if am.Stream != "TEST" || am.Consumer != "dlc" || am.ConsumerSeq != 1 {
t.Fatalf("Not a proper ack metric: %+v", am)
}
// Do less than 100%
_, err = js.AddConsumer("TEST", &nats.ConsumerConfig{
Durable: "alps",
AckPolicy: nats.AckExplicitPolicy,
FilterSubject: "foo",
SampleFrequency: "50%",
})
require_NoError(t, err)
asub, err := js.PullSubscribe("foo", "alps")
require_NoError(t, err)
for i := 0; i < 100; i++ {
_, err = js.Publish("foo", []byte("Hello"))
require_NoError(t, err)
}
mp := 0
for _, m := range fetchMsgs(t, asub, 100, time.Second) {
err = m.Ack()
require_NoError(t, err)
mp++
}
nc.Flush()
if mp != 100 {
t.Fatalf("Got only %d msgs out of %d", mp, 100)
}
nmsgs, _, err := msub.Pending()
require_NoError(t, err)
// Should be ~50
if nmsgs < 35 || nmsgs > 65 {
t.Fatalf("Expected about 50, got %d", nmsgs)
}
}
///////////////////////////////////////////////////////////////////////////
// Simple JetStream Benchmarks
///////////////////////////////////////////////////////////////////////////