mirror of
https://github.com/gogrlx/nats-server.git
synced 2026-04-02 03:38:42 -07:00
Merge pull request #2140 from nats-io/econsumer
Make sure that ephemerals are cleaned up across gateways.
This commit is contained in:
@@ -41,7 +41,7 @@ var (
|
||||
|
||||
const (
|
||||
// VERSION is the current version for the server.
|
||||
VERSION = "2.2.2-beta.10"
|
||||
VERSION = "2.2.2-beta.11"
|
||||
|
||||
// PROTO is the currently supported protocol.
|
||||
// 0 was the original
|
||||
|
||||
@@ -944,6 +944,11 @@ func (o *consumer) updateDeliveryInterest(localInterest bool) bool {
|
||||
}
|
||||
o.active = interest
|
||||
|
||||
// If the delete timer has already been set do not clear here and return.
|
||||
if o.dtmr != nil && !o.isDurable() && !interest {
|
||||
return true
|
||||
}
|
||||
|
||||
// Stop and clear the delete timer always.
|
||||
stopAndClearTimer(&o.dtmr)
|
||||
|
||||
|
||||
@@ -5015,13 +5015,9 @@ func TestJetStreamClusterSuperClusterEphemeralCleanup(t *testing.T) {
|
||||
}
|
||||
cons := mset.getConsumers()[0]
|
||||
cons.mu.Lock()
|
||||
cons.dthresh = 10 * time.Millisecond
|
||||
cons.dthresh = 1250 * time.Millisecond
|
||||
active := cons.active
|
||||
dtimerSet := cons.dtmr != nil
|
||||
gwtimerSet := cons.gwdtmr != nil
|
||||
if gwtimerSet {
|
||||
cons.gwdtmr.Reset(cons.dthresh)
|
||||
}
|
||||
deliver := cons.cfg.DeliverSubject
|
||||
cons.mu.Unlock()
|
||||
|
||||
@@ -5043,17 +5039,13 @@ func TestJetStreamClusterSuperClusterEphemeralCleanup(t *testing.T) {
|
||||
|
||||
// Now check that the stream S(n) is really removed and that
|
||||
// the consumer is gone for stream TEST(n).
|
||||
checkFor(t, 2*time.Second, 15*time.Millisecond, func() error {
|
||||
checkFor(t, 5*time.Second, 25*time.Millisecond, func() error {
|
||||
// First, make sure that stream S(n) has disappeared.
|
||||
if _, err := js2.StreamInfo(test.sourceName); err == nil {
|
||||
return fmt.Errorf("Stream %q should no longer exist", test.sourceName)
|
||||
}
|
||||
si, err := js.StreamInfo(test.streamName)
|
||||
if err != nil {
|
||||
return fmt.Errorf("Could not get stream info: %v", err)
|
||||
}
|
||||
if si.State.Consumers != 0 {
|
||||
return fmt.Errorf("Expected %q stream to have 0 consumer, got %v", test.streamName, si.State.Consumers)
|
||||
if ndc := mset.numDirectConsumers(); ndc != 0 {
|
||||
return fmt.Errorf("Expected %q stream to have 0 consumers, got %v", test.streamName, ndc)
|
||||
}
|
||||
return nil
|
||||
})
|
||||
|
||||
@@ -3017,6 +3017,21 @@ func (mset *stream) state() StreamState {
|
||||
return mset.stateWithDetail(false)
|
||||
}
|
||||
|
||||
func (mset *stream) numDirectConsumers() (num int) {
|
||||
mset.mu.RLock()
|
||||
defer mset.mu.RUnlock()
|
||||
|
||||
// Consumers that are direct are not recorded at the store level.
|
||||
for _, o := range mset.consumers {
|
||||
o.mu.RLock()
|
||||
if o.cfg.Direct {
|
||||
num++
|
||||
}
|
||||
o.mu.RUnlock()
|
||||
}
|
||||
return num
|
||||
}
|
||||
|
||||
func (mset *stream) stateWithDetail(details bool) StreamState {
|
||||
mset.mu.RLock()
|
||||
c, store := mset.client, mset.store
|
||||
|
||||
Reference in New Issue
Block a user