mirror of
https://github.com/gogrlx/nats-server.git
synced 2026-04-16 19:14:41 -07:00
Improve test for consumer with inactivity threshold
Signed-off-by: Tomasz Pietrek <tomasz@nats.io>
This commit is contained in:
@@ -2693,11 +2693,9 @@ func TestJetStreamClusterInterestPolicyEphemeral(t *testing.T) {
|
||||
subject string
|
||||
durable string
|
||||
name string
|
||||
policy nats.RetentionPolicy
|
||||
}{
|
||||
{testName: "LimitsWithName", name: "eph", subject: "limeph", stream: "LIMIT_EPH", policy: nats.LimitsPolicy},
|
||||
{testName: "InterestWithDurable", durable: "eph", subject: "intdur", stream: "INT_DUR", policy: nats.InterestPolicy},
|
||||
{testName: "InterestWithName", name: "eph", subject: "inteph", stream: "INT_EPH", policy: nats.InterestPolicy},
|
||||
{testName: "InterestWithDurable", durable: "eph", subject: "intdur", stream: "INT_DUR"},
|
||||
{testName: "InterestWithName", name: "eph", subject: "inteph", stream: "INT_EPH"},
|
||||
} {
|
||||
t.Run(test.testName, func(t *testing.T) {
|
||||
var err error
|
||||
@@ -2708,7 +2706,7 @@ func TestJetStreamClusterInterestPolicyEphemeral(t *testing.T) {
|
||||
_, err = js.AddStream(&nats.StreamConfig{
|
||||
Name: test.stream,
|
||||
Subjects: []string{test.subject},
|
||||
Retention: test.policy,
|
||||
Retention: nats.LimitsPolicy,
|
||||
Replicas: 3,
|
||||
})
|
||||
require_NoError(t, err)
|
||||
@@ -2732,7 +2730,7 @@ func TestJetStreamClusterInterestPolicyEphemeral(t *testing.T) {
|
||||
const msgs = 10_000
|
||||
done, count := make(chan bool), 0
|
||||
|
||||
_, err = js.Subscribe(_EMPTY_, func(msg *nats.Msg) {
|
||||
sub, err := js.Subscribe(_EMPTY_, func(msg *nats.Msg) {
|
||||
require_NoError(t, msg.Ack())
|
||||
count++
|
||||
if count >= msgs {
|
||||
@@ -2767,6 +2765,17 @@ func TestJetStreamClusterInterestPolicyEphemeral(t *testing.T) {
|
||||
t.Fatalf("Expected to be able to retrieve consumer: %v", err)
|
||||
}
|
||||
require_True(t, info.Delivered.Stream == msgs)
|
||||
|
||||
// Stop the subscription and remove the interest.
|
||||
err = sub.Unsubscribe()
|
||||
require_NoError(t, err)
|
||||
|
||||
// Now wait for interest inactivity threshold to kick in.
|
||||
time.Sleep(3 * inactiveThreshold / 2)
|
||||
|
||||
// Check if the consumer has been removed.
|
||||
_, err = js.ConsumerInfo(test.stream, name)
|
||||
require_Error(t, err, nats.ErrConsumerNotFound)
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user