Make sure to clean up ephemerals across a Gateway.

Also report direct consumers in num consumers in stream info.

Signed-off-by: Derek Collison <derek@nats.io>
This commit is contained in:
Derek Collison
2021-04-21 08:47:55 -07:00
parent a9607573b3
commit b5521053e6
3 changed files with 23 additions and 8 deletions

View File

@@ -944,6 +944,11 @@ func (o *consumer) updateDeliveryInterest(localInterest bool) bool {
}
o.active = interest
// If the delete timer has already been set do not clear here and return.
if o.dtmr != nil && !o.isDurable() && !interest {
return true
}
// Stop and clear the delete timer always.
stopAndClearTimer(&o.dtmr)

View File

@@ -5015,13 +5015,9 @@ func TestJetStreamClusterSuperClusterEphemeralCleanup(t *testing.T) {
}
cons := mset.getConsumers()[0]
cons.mu.Lock()
cons.dthresh = 10 * time.Millisecond
cons.dthresh = 1250 * time.Millisecond
active := cons.active
dtimerSet := cons.dtmr != nil
gwtimerSet := cons.gwdtmr != nil
if gwtimerSet {
cons.gwdtmr.Reset(cons.dthresh)
}
deliver := cons.cfg.DeliverSubject
cons.mu.Unlock()
@@ -5043,7 +5039,7 @@ func TestJetStreamClusterSuperClusterEphemeralCleanup(t *testing.T) {
// Now check that the stream S(n) is really removed and that
// the consumer is gone for stream TEST(n).
checkFor(t, 2*time.Second, 15*time.Millisecond, func() error {
checkFor(t, 5*time.Second, 25*time.Millisecond, func() error {
// First, make sure that stream S(n) has disappeared.
if _, err := js2.StreamInfo(test.sourceName); err == nil {
return fmt.Errorf("Stream %q should no longer exist", test.sourceName)
@@ -5053,7 +5049,7 @@ func TestJetStreamClusterSuperClusterEphemeralCleanup(t *testing.T) {
return fmt.Errorf("Could not get stream info: %v", err)
}
if si.State.Consumers != 0 {
return fmt.Errorf("Expected %q stream to have 0 consumer, got %v", test.streamName, si.State.Consumers)
return fmt.Errorf("Expected %q stream to have 0 consumers, got %v", test.streamName, si.State.Consumers)
}
return nil
})

View File

@@ -3017,9 +3017,22 @@ func (mset *stream) state() StreamState {
return mset.stateWithDetail(false)
}
// Lock should be held
func (mset *stream) numDirectConsumers() (num int) {
// Consumers that are direct are not recorded at the store level.
for _, o := range mset.consumers {
o.mu.RLock()
if o.cfg.Direct {
num++
}
o.mu.RUnlock()
}
return num
}
func (mset *stream) stateWithDetail(details bool) StreamState {
mset.mu.RLock()
c, store := mset.client, mset.store
c, store, ndc := mset.client, mset.store, mset.numDirectConsumers()
mset.mu.RUnlock()
if c == nil || store == nil {
return StreamState{}
@@ -3029,6 +3042,7 @@ func (mset *stream) stateWithDetail(details bool) StreamState {
if !details {
state.Deleted = nil
}
state.Consumers += ndc
return state
}