diff --git a/server/consumer.go b/server/consumer.go index 6511fddd..d03250be 100644 --- a/server/consumer.go +++ b/server/consumer.go @@ -944,6 +944,11 @@ func (o *consumer) updateDeliveryInterest(localInterest bool) bool { } o.active = interest + // If the delete timer has already been set do not clear here and return. + if o.dtmr != nil && !o.isDurable() && !interest { + return true + } + // Stop and clear the delete timer always. stopAndClearTimer(&o.dtmr) diff --git a/server/jetstream_cluster_test.go b/server/jetstream_cluster_test.go index b6350657..072337b8 100644 --- a/server/jetstream_cluster_test.go +++ b/server/jetstream_cluster_test.go @@ -5015,13 +5015,9 @@ func TestJetStreamClusterSuperClusterEphemeralCleanup(t *testing.T) { } cons := mset.getConsumers()[0] cons.mu.Lock() - cons.dthresh = 10 * time.Millisecond + cons.dthresh = 1250 * time.Millisecond active := cons.active dtimerSet := cons.dtmr != nil - gwtimerSet := cons.gwdtmr != nil - if gwtimerSet { - cons.gwdtmr.Reset(cons.dthresh) - } deliver := cons.cfg.DeliverSubject cons.mu.Unlock() @@ -5043,7 +5039,7 @@ func TestJetStreamClusterSuperClusterEphemeralCleanup(t *testing.T) { // Now check that the stream S(n) is really removed and that // the consumer is gone for stream TEST(n). - checkFor(t, 2*time.Second, 15*time.Millisecond, func() error { + checkFor(t, 5*time.Second, 25*time.Millisecond, func() error { // First, make sure that stream S(n) has disappeared. if _, err := js2.StreamInfo(test.sourceName); err == nil { return fmt.Errorf("Stream %q should no longer exist", test.sourceName) @@ -5053,7 +5049,7 @@ func TestJetStreamClusterSuperClusterEphemeralCleanup(t *testing.T) { return fmt.Errorf("Could not get stream info: %v", err) } if si.State.Consumers != 0 { - return fmt.Errorf("Expected %q stream to have 0 consumer, got %v", test.streamName, si.State.Consumers) + return fmt.Errorf("Expected %q stream to have 0 consumers, got %v", test.streamName, si.State.Consumers) } return nil }) diff --git a/server/stream.go b/server/stream.go index b90e4cfd..15a6f694 100644 --- a/server/stream.go +++ b/server/stream.go @@ -3017,9 +3017,22 @@ func (mset *stream) state() StreamState { return mset.stateWithDetail(false) } +// Lock should be held +func (mset *stream) numDirectConsumers() (num int) { + // Consumers that are direct are not recorded at the store level. + for _, o := range mset.consumers { + o.mu.RLock() + if o.cfg.Direct { + num++ + } + o.mu.RUnlock() + } + return num +} + func (mset *stream) stateWithDetail(details bool) StreamState { mset.mu.RLock() - c, store := mset.client, mset.store + c, store, ndc := mset.client, mset.store, mset.numDirectConsumers() mset.mu.RUnlock() if c == nil || store == nil { return StreamState{} @@ -3029,6 +3042,7 @@ func (mset *stream) stateWithDetail(details bool) StreamState { if !details { state.Deleted = nil } + state.Consumers += ndc return state }