mirror of
https://github.com/gogrlx/nats-server.git
synced 2026-04-16 11:04:42 -07:00
Merge pull request #2942 from boris-ilijic/js-con-sampling-issue-update-flow
Add failing test for updating JS Consumer with sampling option
This commit is contained in:
@@ -16079,6 +16079,62 @@ func TestJetStreamConsumerAckSampling(t *testing.T) {
|
||||
}
|
||||
}
|
||||
|
||||
func TestJetStreamConsumerAckSamplingSpecifiedUsingUpdateConsumer(t *testing.T) {
|
||||
t.Skip("sampling msg is not sent when sampling option is part of update flow")
|
||||
|
||||
s := RunBasicJetStreamServer()
|
||||
if config := s.JetStreamConfig(); config != nil {
|
||||
defer removeDir(t, config.StoreDir)
|
||||
}
|
||||
defer s.Shutdown()
|
||||
|
||||
nc, js := jsClientConnect(t, s)
|
||||
defer nc.Close()
|
||||
|
||||
_, err := js.AddStream(&nats.StreamConfig{Name: "TEST", Subjects: []string{"foo"}})
|
||||
require_NoError(t, err)
|
||||
|
||||
_, err = js.AddConsumer("TEST", &nats.ConsumerConfig{
|
||||
Durable: "dlc",
|
||||
AckPolicy: nats.AckExplicitPolicy,
|
||||
FilterSubject: "foo",
|
||||
})
|
||||
require_NoError(t, err)
|
||||
|
||||
_, err = js.UpdateConsumer("TEST", &nats.ConsumerConfig{
|
||||
Durable: "dlc",
|
||||
AckPolicy: nats.AckExplicitPolicy,
|
||||
FilterSubject: "foo",
|
||||
SampleFrequency: "100%",
|
||||
})
|
||||
require_NoError(t, err)
|
||||
|
||||
sub, err := js.PullSubscribe("foo", "dlc")
|
||||
require_NoError(t, err)
|
||||
|
||||
_, err = js.Publish("foo", []byte("Hello"))
|
||||
require_NoError(t, err)
|
||||
|
||||
msub, err := nc.SubscribeSync("$JS.EVENT.METRIC.>")
|
||||
require_NoError(t, err)
|
||||
|
||||
for _, m := range fetchMsgs(t, sub, 1, time.Second) {
|
||||
err = m.AckSync()
|
||||
require_NoError(t, err)
|
||||
}
|
||||
|
||||
m, err := msub.NextMsg(time.Second)
|
||||
require_NoError(t, err)
|
||||
|
||||
var am JSConsumerAckMetric
|
||||
err = json.Unmarshal(m.Data, &am)
|
||||
require_NoError(t, err)
|
||||
|
||||
if am.Stream != "TEST" || am.Consumer != "dlc" || am.ConsumerSeq != 1 {
|
||||
t.Fatalf("Not a proper ack metric: %+v", am)
|
||||
}
|
||||
}
|
||||
|
||||
func TestJetStreamRemoveExternalSource(t *testing.T) {
|
||||
ho := DefaultTestOptions
|
||||
ho.Port = 4000 //-1
|
||||
|
||||
Reference in New Issue
Block a user