Merge pull request #3884 from nats-io/fix-consumer-cleanup

[FIXED] Named push consumers with inactive thresholds deleted when still active.
This commit is contained in:
Derek Collison
2023-02-19 08:51:37 -07:00
committed by GitHub
2 changed files with 94 additions and 7 deletions

View File

@@ -211,6 +211,7 @@ var (
AckTerm = []byte("+TERM")
)
// Calculate accurate replicas for the consumer config with the parent stream config.
func (consCfg ConsumerConfig) replicas(strCfg *StreamConfig) int {
if consCfg.Replicas == 0 {
if !isDurableConsumer(&consCfg) && strCfg.Retention == LimitsPolicy {
@@ -827,9 +828,9 @@ func (mset *stream) addConsumerWithAssignment(config *ConsumerConfig, oname stri
o.updateInactiveThreshold(&o.cfg)
if o.isPushMode() {
if !o.isDurable() {
// Check if we are not durable that the delivery subject has interest.
// Check in place here for interest. Will setup properly in setLeader.
// Check if we are running only 1 replica and that the delivery subject has interest.
// Check in place here for interest. Will setup properly in setLeader.
if config.replicas(&mset.cfg) == 1 {
r := o.acc.sl.Match(o.cfg.DeliverSubject)
if !o.hasDeliveryInterest(len(r.psubs)+len(r.qsubs) > 0) {
// Let the interest come to us eventually, but setup delete timer.
@@ -856,10 +857,7 @@ func (mset *stream) addConsumerWithAssignment(config *ConsumerConfig, oname stri
}
// This is always true in single server mode.
o.mu.RLock()
isLdr := o.isLeader()
o.mu.RUnlock()
if isLdr {
if o.IsLeader() {
// Send advisory.
var suppress bool
if !s.standAloneMode() && ca == nil {
@@ -1053,6 +1051,7 @@ func (o *consumer) setLeader(isLeader bool) {
if o.active = <-o.inch; o.active {
o.checkQueueInterest()
}
// Check gateways in case they are enabled.
if s.gateway.enabled {
if !o.active {

View File

@@ -2682,3 +2682,91 @@ func TestJetStreamClusterUpdateConsumerShouldNotForceDeleteOnRestart(t *testing.
_, err = js.ConsumerInfo("TEST", "D")
require_NoError(t, err)
}
func TestJetStreamClusterInterestPolicyEphemeral(t *testing.T) {
c := createJetStreamClusterExplicit(t, "R3S", 3)
defer c.shutdown()
for _, test := range []struct {
testName string
stream string
subject string
durable string
name string
policy nats.RetentionPolicy
}{
{testName: "LimitsWithName", name: "eph", subject: "limeph", stream: "LIMIT_EPH", policy: nats.LimitsPolicy},
{testName: "InterestWithDurable", durable: "eph", subject: "intdur", stream: "INT_DUR", policy: nats.InterestPolicy},
{testName: "InterestWithName", name: "eph", subject: "inteph", stream: "INT_EPH", policy: nats.InterestPolicy},
} {
t.Run(test.testName, func(t *testing.T) {
var err error
nc, js := jsClientConnect(t, c.randomServer())
defer nc.Close()
_, err = js.AddStream(&nats.StreamConfig{
Name: test.stream,
Subjects: []string{test.subject},
Retention: test.policy,
Replicas: 3,
})
require_NoError(t, err)
const inactiveThreshold = time.Second
_, err = js.AddConsumer(test.stream, &nats.ConsumerConfig{
DeliverSubject: nats.NewInbox(),
AckPolicy: nats.AckExplicitPolicy,
InactiveThreshold: inactiveThreshold,
Durable: test.durable,
Name: test.name,
})
require_NoError(t, err)
name := test.durable
if test.durable == _EMPTY_ {
name = test.name
}
const msgs = 10_000
done, count := make(chan bool), 0
_, err = js.Subscribe(_EMPTY_, func(msg *nats.Msg) {
require_NoError(t, msg.Ack())
count++
if count >= msgs {
done <- true
}
}, nats.Bind(test.stream, name), nats.ManualAck())
require_NoError(t, err)
// This happens only if we start publishing messages after consumer was created.
go func() {
for i := 0; i < msgs; i++ {
js.PublishAsync(test.subject, []byte("DATA"))
}
select {
case <-js.PublishAsyncComplete():
case <-time.After(5 * inactiveThreshold):
}
}()
// Wait for inactive threshold to expire and all messages to be published and received
// Bug is we clean up active consumers when we should not.
time.Sleep(3 * inactiveThreshold / 2)
select {
case <-done:
case <-time.After(5 * time.Second):
t.Fatalf("Did not receive done signal")
}
info, err := js.ConsumerInfo(test.stream, name)
if err != nil {
t.Fatalf("Expected to be able to retrieve consumer: %v", err)
}
require_True(t, info.Delivered.Stream == msgs)
})
}
}