diff --git a/server/consumer.go b/server/consumer.go index 5dc19aa6..111c3dc4 100644 --- a/server/consumer.go +++ b/server/consumer.go @@ -211,6 +211,7 @@ var ( AckTerm = []byte("+TERM") ) +// Calculate accurate replicas for the consumer config with the parent stream config. func (consCfg ConsumerConfig) replicas(strCfg *StreamConfig) int { if consCfg.Replicas == 0 { if !isDurableConsumer(&consCfg) && strCfg.Retention == LimitsPolicy { @@ -827,9 +828,9 @@ func (mset *stream) addConsumerWithAssignment(config *ConsumerConfig, oname stri o.updateInactiveThreshold(&o.cfg) if o.isPushMode() { - if !o.isDurable() { - // Check if we are not durable that the delivery subject has interest. - // Check in place here for interest. Will setup properly in setLeader. + // Check if we are running only 1 replica and that the delivery subject has interest. + // Check in place here for interest. Will setup properly in setLeader. + if config.replicas(&mset.cfg) == 1 { r := o.acc.sl.Match(o.cfg.DeliverSubject) if !o.hasDeliveryInterest(len(r.psubs)+len(r.qsubs) > 0) { // Let the interest come to us eventually, but setup delete timer. @@ -856,10 +857,7 @@ func (mset *stream) addConsumerWithAssignment(config *ConsumerConfig, oname stri } // This is always true in single server mode. - o.mu.RLock() - isLdr := o.isLeader() - o.mu.RUnlock() - if isLdr { + if o.IsLeader() { // Send advisory. var suppress bool if !s.standAloneMode() && ca == nil { @@ -1053,6 +1051,7 @@ func (o *consumer) setLeader(isLeader bool) { if o.active = <-o.inch; o.active { o.checkQueueInterest() } + // Check gateways in case they are enabled. if s.gateway.enabled { if !o.active { diff --git a/server/jetstream_cluster_3_test.go b/server/jetstream_cluster_3_test.go index 31f06f08..26f55be9 100644 --- a/server/jetstream_cluster_3_test.go +++ b/server/jetstream_cluster_3_test.go @@ -2682,3 +2682,91 @@ func TestJetStreamClusterUpdateConsumerShouldNotForceDeleteOnRestart(t *testing. _, err = js.ConsumerInfo("TEST", "D") require_NoError(t, err) } + +func TestJetStreamClusterInterestPolicyEphemeral(t *testing.T) { + c := createJetStreamClusterExplicit(t, "R3S", 3) + defer c.shutdown() + + for _, test := range []struct { + testName string + stream string + subject string + durable string + name string + policy nats.RetentionPolicy + }{ + {testName: "LimitsWithName", name: "eph", subject: "limeph", stream: "LIMIT_EPH", policy: nats.LimitsPolicy}, + {testName: "InterestWithDurable", durable: "eph", subject: "intdur", stream: "INT_DUR", policy: nats.InterestPolicy}, + {testName: "InterestWithName", name: "eph", subject: "inteph", stream: "INT_EPH", policy: nats.InterestPolicy}, + } { + t.Run(test.testName, func(t *testing.T) { + var err error + + nc, js := jsClientConnect(t, c.randomServer()) + defer nc.Close() + + _, err = js.AddStream(&nats.StreamConfig{ + Name: test.stream, + Subjects: []string{test.subject}, + Retention: test.policy, + Replicas: 3, + }) + require_NoError(t, err) + + const inactiveThreshold = time.Second + + _, err = js.AddConsumer(test.stream, &nats.ConsumerConfig{ + DeliverSubject: nats.NewInbox(), + AckPolicy: nats.AckExplicitPolicy, + InactiveThreshold: inactiveThreshold, + Durable: test.durable, + Name: test.name, + }) + require_NoError(t, err) + + name := test.durable + if test.durable == _EMPTY_ { + name = test.name + } + + const msgs = 10_000 + done, count := make(chan bool), 0 + + _, err = js.Subscribe(_EMPTY_, func(msg *nats.Msg) { + require_NoError(t, msg.Ack()) + count++ + if count >= msgs { + done <- true + } + }, nats.Bind(test.stream, name), nats.ManualAck()) + require_NoError(t, err) + + // This happens only if we start publishing messages after consumer was created. + go func() { + for i := 0; i < msgs; i++ { + js.PublishAsync(test.subject, []byte("DATA")) + } + select { + case <-js.PublishAsyncComplete(): + case <-time.After(5 * inactiveThreshold): + } + }() + + // Wait for inactive threshold to expire and all messages to be published and received + // Bug is we clean up active consumers when we should not. + time.Sleep(3 * inactiveThreshold / 2) + + select { + case <-done: + case <-time.After(5 * time.Second): + t.Fatalf("Did not receive done signal") + } + + info, err := js.ConsumerInfo(test.stream, name) + if err != nil { + t.Fatalf("Expected to be able to retrieve consumer: %v", err) + } + require_True(t, info.Delivered.Stream == msgs) + }) + } +}