diff --git a/server/const.go b/server/const.go index 56030699..0031b6b9 100644 --- a/server/const.go +++ b/server/const.go @@ -41,7 +41,7 @@ var ( const ( // VERSION is the current version for the server. - VERSION = "2.2.2-beta.10" + VERSION = "2.2.2-beta.11" // PROTO is the currently supported protocol. // 0 was the original diff --git a/server/consumer.go b/server/consumer.go index 6511fddd..d03250be 100644 --- a/server/consumer.go +++ b/server/consumer.go @@ -944,6 +944,11 @@ func (o *consumer) updateDeliveryInterest(localInterest bool) bool { } o.active = interest + // If the delete timer has already been set do not clear here and return. + if o.dtmr != nil && !o.isDurable() && !interest { + return true + } + // Stop and clear the delete timer always. stopAndClearTimer(&o.dtmr) diff --git a/server/jetstream_cluster_test.go b/server/jetstream_cluster_test.go index b6350657..2c53982d 100644 --- a/server/jetstream_cluster_test.go +++ b/server/jetstream_cluster_test.go @@ -5015,13 +5015,9 @@ func TestJetStreamClusterSuperClusterEphemeralCleanup(t *testing.T) { } cons := mset.getConsumers()[0] cons.mu.Lock() - cons.dthresh = 10 * time.Millisecond + cons.dthresh = 1250 * time.Millisecond active := cons.active dtimerSet := cons.dtmr != nil - gwtimerSet := cons.gwdtmr != nil - if gwtimerSet { - cons.gwdtmr.Reset(cons.dthresh) - } deliver := cons.cfg.DeliverSubject cons.mu.Unlock() @@ -5043,17 +5039,13 @@ func TestJetStreamClusterSuperClusterEphemeralCleanup(t *testing.T) { // Now check that the stream S(n) is really removed and that // the consumer is gone for stream TEST(n). - checkFor(t, 2*time.Second, 15*time.Millisecond, func() error { + checkFor(t, 5*time.Second, 25*time.Millisecond, func() error { // First, make sure that stream S(n) has disappeared. if _, err := js2.StreamInfo(test.sourceName); err == nil { return fmt.Errorf("Stream %q should no longer exist", test.sourceName) } - si, err := js.StreamInfo(test.streamName) - if err != nil { - return fmt.Errorf("Could not get stream info: %v", err) - } - if si.State.Consumers != 0 { - return fmt.Errorf("Expected %q stream to have 0 consumer, got %v", test.streamName, si.State.Consumers) + if ndc := mset.numDirectConsumers(); ndc != 0 { + return fmt.Errorf("Expected %q stream to have 0 consumers, got %v", test.streamName, ndc) } return nil }) diff --git a/server/stream.go b/server/stream.go index b90e4cfd..f2ee192d 100644 --- a/server/stream.go +++ b/server/stream.go @@ -3017,6 +3017,21 @@ func (mset *stream) state() StreamState { return mset.stateWithDetail(false) } +func (mset *stream) numDirectConsumers() (num int) { + mset.mu.RLock() + defer mset.mu.RUnlock() + + // Consumers that are direct are not recorded at the store level. + for _, o := range mset.consumers { + o.mu.RLock() + if o.cfg.Direct { + num++ + } + o.mu.RUnlock() + } + return num +} + func (mset *stream) stateWithDetail(details bool) StreamState { mset.mu.RLock() c, store := mset.client, mset.store