From b5521053e663dd37a58714290455af09f826ce49 Mon Sep 17 00:00:00 2001 From: Derek Collison Date: Wed, 21 Apr 2021 08:47:55 -0700 Subject: [PATCH 1/3] Make sure to clean up ephemerals across a Gateway. Also report direct consumers in num consumers in stream info. Signed-off-by: Derek Collison --- server/consumer.go | 5 +++++ server/jetstream_cluster_test.go | 10 +++------- server/stream.go | 16 +++++++++++++++- 3 files changed, 23 insertions(+), 8 deletions(-) diff --git a/server/consumer.go b/server/consumer.go index 6511fddd..d03250be 100644 --- a/server/consumer.go +++ b/server/consumer.go @@ -944,6 +944,11 @@ func (o *consumer) updateDeliveryInterest(localInterest bool) bool { } o.active = interest + // If the delete timer has already been set do not clear here and return. + if o.dtmr != nil && !o.isDurable() && !interest { + return true + } + // Stop and clear the delete timer always. stopAndClearTimer(&o.dtmr) diff --git a/server/jetstream_cluster_test.go b/server/jetstream_cluster_test.go index b6350657..072337b8 100644 --- a/server/jetstream_cluster_test.go +++ b/server/jetstream_cluster_test.go @@ -5015,13 +5015,9 @@ func TestJetStreamClusterSuperClusterEphemeralCleanup(t *testing.T) { } cons := mset.getConsumers()[0] cons.mu.Lock() - cons.dthresh = 10 * time.Millisecond + cons.dthresh = 1250 * time.Millisecond active := cons.active dtimerSet := cons.dtmr != nil - gwtimerSet := cons.gwdtmr != nil - if gwtimerSet { - cons.gwdtmr.Reset(cons.dthresh) - } deliver := cons.cfg.DeliverSubject cons.mu.Unlock() @@ -5043,7 +5039,7 @@ func TestJetStreamClusterSuperClusterEphemeralCleanup(t *testing.T) { // Now check that the stream S(n) is really removed and that // the consumer is gone for stream TEST(n). - checkFor(t, 2*time.Second, 15*time.Millisecond, func() error { + checkFor(t, 5*time.Second, 25*time.Millisecond, func() error { // First, make sure that stream S(n) has disappeared. if _, err := js2.StreamInfo(test.sourceName); err == nil { return fmt.Errorf("Stream %q should no longer exist", test.sourceName) @@ -5053,7 +5049,7 @@ func TestJetStreamClusterSuperClusterEphemeralCleanup(t *testing.T) { return fmt.Errorf("Could not get stream info: %v", err) } if si.State.Consumers != 0 { - return fmt.Errorf("Expected %q stream to have 0 consumer, got %v", test.streamName, si.State.Consumers) + return fmt.Errorf("Expected %q stream to have 0 consumers, got %v", test.streamName, si.State.Consumers) } return nil }) diff --git a/server/stream.go b/server/stream.go index b90e4cfd..15a6f694 100644 --- a/server/stream.go +++ b/server/stream.go @@ -3017,9 +3017,22 @@ func (mset *stream) state() StreamState { return mset.stateWithDetail(false) } +// Lock should be held +func (mset *stream) numDirectConsumers() (num int) { + // Consumers that are direct are not recorded at the store level. + for _, o := range mset.consumers { + o.mu.RLock() + if o.cfg.Direct { + num++ + } + o.mu.RUnlock() + } + return num +} + func (mset *stream) stateWithDetail(details bool) StreamState { mset.mu.RLock() - c, store := mset.client, mset.store + c, store, ndc := mset.client, mset.store, mset.numDirectConsumers() mset.mu.RUnlock() if c == nil || store == nil { return StreamState{} @@ -3029,6 +3042,7 @@ func (mset *stream) stateWithDetail(details bool) StreamState { if !details { state.Deleted = nil } + state.Consumers += ndc return state } From cc776302a0ff98ee7921ff114fe95f17d6293698 Mon Sep 17 00:00:00 2001 From: Derek Collison Date: Wed, 21 Apr 2021 08:48:49 -0700 Subject: [PATCH 2/3] 2.2.2-beta.11 Signed-off-by: Derek Collison --- server/const.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/server/const.go b/server/const.go index 56030699..0031b6b9 100644 --- a/server/const.go +++ b/server/const.go @@ -41,7 +41,7 @@ var ( const ( // VERSION is the current version for the server. - VERSION = "2.2.2-beta.10" + VERSION = "2.2.2-beta.11" // PROTO is the currently supported protocol. // 0 was the original From a301d3a892137f0ad1d6df29fa36ba0d55da6919 Mon Sep 17 00:00:00 2001 From: Derek Collison Date: Wed, 21 Apr 2021 09:40:04 -0700 Subject: [PATCH 3/3] Check direct consumers directly, stream state back to previous behavior Signed-off-by: Derek Collison --- server/jetstream_cluster_test.go | 8 ++------ server/stream.go | 7 ++++--- 2 files changed, 6 insertions(+), 9 deletions(-) diff --git a/server/jetstream_cluster_test.go b/server/jetstream_cluster_test.go index 072337b8..2c53982d 100644 --- a/server/jetstream_cluster_test.go +++ b/server/jetstream_cluster_test.go @@ -5044,12 +5044,8 @@ func TestJetStreamClusterSuperClusterEphemeralCleanup(t *testing.T) { if _, err := js2.StreamInfo(test.sourceName); err == nil { return fmt.Errorf("Stream %q should no longer exist", test.sourceName) } - si, err := js.StreamInfo(test.streamName) - if err != nil { - return fmt.Errorf("Could not get stream info: %v", err) - } - if si.State.Consumers != 0 { - return fmt.Errorf("Expected %q stream to have 0 consumers, got %v", test.streamName, si.State.Consumers) + if ndc := mset.numDirectConsumers(); ndc != 0 { + return fmt.Errorf("Expected %q stream to have 0 consumers, got %v", test.streamName, ndc) } return nil }) diff --git a/server/stream.go b/server/stream.go index 15a6f694..f2ee192d 100644 --- a/server/stream.go +++ b/server/stream.go @@ -3017,8 +3017,10 @@ func (mset *stream) state() StreamState { return mset.stateWithDetail(false) } -// Lock should be held func (mset *stream) numDirectConsumers() (num int) { + mset.mu.RLock() + defer mset.mu.RUnlock() + // Consumers that are direct are not recorded at the store level. for _, o := range mset.consumers { o.mu.RLock() @@ -3032,7 +3034,7 @@ func (mset *stream) numDirectConsumers() (num int) { func (mset *stream) stateWithDetail(details bool) StreamState { mset.mu.RLock() - c, store, ndc := mset.client, mset.store, mset.numDirectConsumers() + c, store := mset.client, mset.store mset.mu.RUnlock() if c == nil || store == nil { return StreamState{} @@ -3042,7 +3044,6 @@ func (mset *stream) stateWithDetail(details bool) StreamState { if !details { state.Deleted = nil } - state.Consumers += ndc return state }