mirror of
https://github.com/gogrlx/nats-server.git
synced 2026-04-17 03:24:40 -07:00
@@ -628,7 +628,7 @@ func TestJetStreamConsumerMaxDeliveries(t *testing.T) {
|
||||
|
||||
// Wait for redeliveries to pile up.
|
||||
checkFor(t, 250*time.Millisecond, 10*time.Millisecond, func() error {
|
||||
if nmsgs, _, _ := sub.Pending(); err != nil || nmsgs != maxDeliver {
|
||||
if nmsgs, _, err := sub.Pending(); err != nil || nmsgs != maxDeliver {
|
||||
return fmt.Errorf("Did not receive correct number of messages: %d vs %d", nmsgs, maxDeliver)
|
||||
}
|
||||
return nil
|
||||
@@ -15197,6 +15197,90 @@ func TestJetStreamRestoreBadStream(t *testing.T) {
|
||||
}
|
||||
}
|
||||
|
||||
func TestJetStreamConsumerAckSampling(t *testing.T) {
|
||||
s := RunBasicJetStreamServer()
|
||||
if config := s.JetStreamConfig(); config != nil {
|
||||
defer removeDir(t, config.StoreDir)
|
||||
}
|
||||
defer s.Shutdown()
|
||||
|
||||
nc, js := jsClientConnect(t, s)
|
||||
defer nc.Close()
|
||||
|
||||
_, err := js.AddStream(&nats.StreamConfig{Name: "TEST", Subjects: []string{"foo"}})
|
||||
require_NoError(t, err)
|
||||
|
||||
_, err = js.AddConsumer("TEST", &nats.ConsumerConfig{
|
||||
Durable: "dlc",
|
||||
AckPolicy: nats.AckExplicitPolicy,
|
||||
FilterSubject: "foo",
|
||||
SampleFrequency: "100%",
|
||||
})
|
||||
require_NoError(t, err)
|
||||
|
||||
sub, err := js.PullSubscribe("foo", "dlc")
|
||||
require_NoError(t, err)
|
||||
|
||||
_, err = js.Publish("foo", []byte("Hello"))
|
||||
require_NoError(t, err)
|
||||
|
||||
msub, err := nc.SubscribeSync("$JS.EVENT.METRIC.>")
|
||||
require_NoError(t, err)
|
||||
|
||||
for _, m := range fetchMsgs(t, sub, 1, time.Second) {
|
||||
err = m.AckSync()
|
||||
require_NoError(t, err)
|
||||
}
|
||||
|
||||
m, err := msub.NextMsg(time.Second)
|
||||
require_NoError(t, err)
|
||||
|
||||
var am JSConsumerAckMetric
|
||||
err = json.Unmarshal(m.Data, &am)
|
||||
require_NoError(t, err)
|
||||
|
||||
if am.Stream != "TEST" || am.Consumer != "dlc" || am.ConsumerSeq != 1 {
|
||||
t.Fatalf("Not a proper ack metric: %+v", am)
|
||||
}
|
||||
|
||||
// Do less than 100%
|
||||
_, err = js.AddConsumer("TEST", &nats.ConsumerConfig{
|
||||
Durable: "alps",
|
||||
AckPolicy: nats.AckExplicitPolicy,
|
||||
FilterSubject: "foo",
|
||||
SampleFrequency: "50%",
|
||||
})
|
||||
require_NoError(t, err)
|
||||
|
||||
asub, err := js.PullSubscribe("foo", "alps")
|
||||
require_NoError(t, err)
|
||||
|
||||
for i := 0; i < 100; i++ {
|
||||
_, err = js.Publish("foo", []byte("Hello"))
|
||||
require_NoError(t, err)
|
||||
}
|
||||
|
||||
mp := 0
|
||||
for _, m := range fetchMsgs(t, asub, 100, time.Second) {
|
||||
err = m.Ack()
|
||||
require_NoError(t, err)
|
||||
mp++
|
||||
}
|
||||
nc.Flush()
|
||||
|
||||
if mp != 100 {
|
||||
t.Fatalf("Got only %d msgs out of %d", mp, 100)
|
||||
}
|
||||
|
||||
nmsgs, _, err := msub.Pending()
|
||||
require_NoError(t, err)
|
||||
|
||||
// Should be ~50
|
||||
if nmsgs < 35 || nmsgs > 65 {
|
||||
t.Fatalf("Expected about 50, got %d", nmsgs)
|
||||
}
|
||||
}
|
||||
|
||||
///////////////////////////////////////////////////////////////////////////
|
||||
// Simple JetStream Benchmarks
|
||||
///////////////////////////////////////////////////////////////////////////
|
||||
|
||||
Reference in New Issue
Block a user