Add in support for inactivity thresholds for durable consumers.

Signed-off-by: Derek Collison <derek@nats.io>
This commit is contained in:
Derek Collison
2022-06-14 06:51:00 -07:00
parent a830015703
commit 4c8110c3ff
3 changed files with 103 additions and 35 deletions

View File

@@ -745,14 +745,13 @@ func (mset *stream) addConsumerWithAssignment(config *ConsumerConfig, oname stri
o.ackSubj = fmt.Sprintf("%s.*.*.*.*.*", pre)
o.nextMsgSubj = fmt.Sprintf(JSApiRequestNextT, mn, o.name)
// If not durable determine the inactive threshold.
if !o.isDurable() {
if o.cfg.InactiveThreshold != 0 {
o.dthresh = o.cfg.InactiveThreshold
} else {
// Add in 1 sec of jitter above and beyond the default of 5s.
o.dthresh = JsDeleteWaitTimeDefault + time.Duration(rand.Int63n(1000))*time.Millisecond
}
// If the user has set the inactive threshold, set that up here.
if o.cfg.InactiveThreshold > 0 {
o.dthresh = o.cfg.InactiveThreshold
} else if !o.isDurable() {
// Ephemerals will always have inactive thresholds.
// Add in 1 sec of jitter above and beyond the default of 5s.
o.dthresh = JsDeleteWaitTimeDefault + time.Duration(rand.Int63n(1000))*time.Millisecond
}
if o.isPushMode() {
@@ -953,11 +952,11 @@ func (o *consumer) setLeader(isLeader bool) {
stopAndClearTimer(&o.gwdtmr)
o.gwdtmr = time.AfterFunc(time.Second, func() { o.watchGWinterest() })
}
} else if !o.isDurable() {
// Ephemeral pull consumer. We run the dtmr all the time for this one.
if o.dtmr != nil {
stopAndClearTimer(&o.dtmr)
}
}
if o.dthresh > 0 && (o.isPullMode() || !o.active) {
// Pull consumer. We run the dtmr all the time for this one.
stopAndClearTimer(&o.dtmr)
o.dtmr = time.AfterFunc(o.dthresh, func() { o.deleteNotActive() })
}
@@ -1200,9 +1199,9 @@ func (o *consumer) updateDeliveryInterest(localInterest bool) bool {
// Stop and clear the delete timer always.
stopAndClearTimer(&o.dtmr)
// If we do not have interest anymore and we are not durable start
// If we do not have interest anymore and have a delete threshold set, then set
// a timer to delete us. We wait for a bit in case of server reconnect.
if !o.isDurable() && !interest {
if !interest && o.dthresh > 0 {
o.dtmr = time.AfterFunc(o.dthresh, func() { o.deleteNotActive() })
return true
}
@@ -4017,25 +4016,6 @@ func validFilteredSubject(filteredSubject string, subjects []string) bool {
return false
}
// setInActiveDeleteThreshold sets the delete threshold for how long to wait
// before deleting an inactive ephemeral consumer.
func (o *consumer) setInActiveDeleteThreshold(dthresh time.Duration) error {
o.mu.Lock()
defer o.mu.Unlock()
if o.isDurable() {
return fmt.Errorf("consumer is not ephemeral")
}
deleteWasRunning := o.dtmr != nil
stopAndClearTimer(&o.dtmr)
// Do not add jitter if set via here.
o.dthresh = dthresh
if deleteWasRunning {
o.dtmr = time.AfterFunc(o.dthresh, func() { o.deleteNotActive() })
}
return nil
}
// switchToEphemeral is called on startup when recovering ephemerals.
func (o *consumer) switchToEphemeral() {
o.mu.Lock()

View File

@@ -1345,3 +1345,19 @@ func addStream(t *testing.T, nc *nats.Conn, cfg *StreamConfig) *StreamInfo {
}
return resp.StreamInfo
}
// setInActiveDeleteThreshold sets the delete threshold for how long to wait
// before deleting an inactive consumer.
func (o *consumer) setInActiveDeleteThreshold(dthresh time.Duration) error {
o.mu.Lock()
defer o.mu.Unlock()
deleteWasRunning := o.dtmr != nil
stopAndClearTimer(&o.dtmr)
// Do not add jitter if set via here.
o.dthresh = dthresh
if deleteWasRunning {
o.dtmr = time.AfterFunc(o.dthresh, func() { o.deleteNotActive() })
}
return nil
}

View File

@@ -5372,7 +5372,6 @@ func TestJetStreamConsumerInactiveNoDeadlock(t *testing.T) {
// the internal sendq.
sub.Unsubscribe()
nc.Flush()
})
}
}
@@ -17839,6 +17838,79 @@ func TestJetStreamDirectMsgGet(t *testing.T) {
require_True(t, m.Header.Get("Description") == "Message Not Found")
}
func TestJetStreamConsumerInactiveThreshold(t *testing.T) {
s := RunBasicJetStreamServer()
if config := s.JetStreamConfig(); config != nil {
defer removeDir(t, config.StoreDir)
}
defer s.Shutdown()
nc, js := jsClientConnect(t, s)
defer nc.Close()
_, err := js.AddStream(&nats.StreamConfig{Name: "TEST", Subjects: []string{"foo"}})
require_NoError(t, err)
for i := 0; i < 10; i++ {
js.PublishAsync("foo", []byte("ok"))
}
select {
case <-js.PublishAsyncComplete():
case <-time.After(5 * time.Second):
t.Fatalf("Did not receive completion signal")
}
waitOnCleanup := func(ci *nats.ConsumerInfo) {
t.Helper()
checkFor(t, 2*time.Second, 50*time.Millisecond, func() error {
_, err := js.ConsumerInfo(ci.Stream, ci.Name)
if err == nil {
return fmt.Errorf("Consumer still present")
}
return nil
})
}
// Test to make sure inactive threshold is enforced for all types.
// Ephemeral and Durable, both push and pull.
// Ephemeral Push (no bind to deliver subject)
ci, err := js.AddConsumer("TEST", &nats.ConsumerConfig{
DeliverSubject: "_no_bind_",
InactiveThreshold: 50 * time.Millisecond,
})
require_NoError(t, err)
waitOnCleanup(ci)
// Ephemeral Pull
ci, err = js.AddConsumer("TEST", &nats.ConsumerConfig{
AckPolicy: nats.AckExplicitPolicy,
InactiveThreshold: 50 * time.Millisecond,
})
require_NoError(t, err)
waitOnCleanup(ci)
// Support InactiveThresholds for Durables as well.
// Durable Push (no bind to deliver subject)
ci, err = js.AddConsumer("TEST", &nats.ConsumerConfig{
Durable: "d1",
DeliverSubject: "_no_bind_",
InactiveThreshold: 50 * time.Millisecond,
})
require_NoError(t, err)
waitOnCleanup(ci)
// Durable Pull
ci, err = js.AddConsumer("TEST", &nats.ConsumerConfig{
Durable: "d2",
AckPolicy: nats.AckExplicitPolicy,
InactiveThreshold: 50 * time.Millisecond,
})
require_NoError(t, err)
waitOnCleanup(ci)
}
///////////////////////////////////////////////////////////////////////////
// Simple JetStream Benchmarks
///////////////////////////////////////////////////////////////////////////