Merge branch 'main' into dev

This commit is contained in:
Derek Collison
2022-11-16 12:12:33 -08:00
3 changed files with 111 additions and 16 deletions

View File

@@ -853,12 +853,19 @@ func (o *consumer) updateInactiveThreshold(cfg *ConsumerConfig) {
// Ephemerals will always have inactive thresholds.
if !o.isDurable() && cfg.InactiveThreshold <= 0 {
// Add in 1 sec of jitter above and beyond the default of 5s.
o.dthresh = JsDeleteWaitTimeDefault + time.Duration(rand.Int63n(1000))*time.Millisecond
o.dthresh = JsDeleteWaitTimeDefault + 100*time.Millisecond + time.Duration(rand.Int63n(900))*time.Millisecond
// Only stamp config with default sans jitter.
cfg.InactiveThreshold = JsDeleteWaitTimeDefault
} else if cfg.InactiveThreshold >= 0 {
} else if cfg.InactiveThreshold > 0 {
// Add in up to 1 sec of jitter if pull mode.
if o.isPullMode() {
o.dthresh = cfg.InactiveThreshold + 100*time.Millisecond + time.Duration(rand.Int63n(900))*time.Millisecond
} else {
o.dthresh = cfg.InactiveThreshold
}
} else if cfg.InactiveThreshold <= 0 {
// We accept InactiveThreshold be set to 0 (for durables)
o.dthresh = cfg.InactiveThreshold
o.dthresh = 0
}
}
@@ -1286,12 +1293,25 @@ func (o *consumer) deleteNotActive() {
return
}
} else {
// These need to keep firing so reset first.
if o.dtmr != nil {
o.dtmr.Reset(o.dthresh)
// Pull mode.
elapsed := time.Since(o.waiting.last)
if elapsed <= o.cfg.InactiveThreshold {
// These need to keep firing so reset but use delta.
if o.dtmr != nil {
o.dtmr.Reset(o.dthresh - elapsed)
} else {
o.dtmr = time.AfterFunc(o.dthresh-elapsed, func() { o.deleteNotActive() })
}
o.mu.Unlock()
return
}
// Check if we have had a request lately, or if we still have valid requests waiting.
if time.Since(o.waiting.last) <= o.dthresh || o.checkWaitingForInterest() {
// Check if we still have valid requests waiting.
if o.checkWaitingForInterest() {
if o.dtmr != nil {
o.dtmr.Reset(o.dthresh)
} else {
o.dtmr = time.AfterFunc(o.dthresh, func() { o.deleteNotActive() })
}
o.mu.Unlock()
return
}
@@ -1544,13 +1564,10 @@ func (o *consumer) updateConfig(cfg *ConsumerConfig) error {
// Set InactiveThreshold if changed.
if val := cfg.InactiveThreshold; val != o.cfg.InactiveThreshold {
o.updateInactiveThreshold(cfg)
// Clear and restart timer only if we are the leader.
if o.isLeader() {
stopAndClearTimer(&o.dtmr)
// Restart only if new value is > 0
if o.dthresh > 0 {
o.dtmr = time.AfterFunc(o.dthresh, func() { o.deleteNotActive() })
}
stopAndClearTimer(&o.dtmr)
// Restart timer only if we are the leader.
if o.isLeader() && o.dthresh > 0 {
o.dtmr = time.AfterFunc(o.dthresh, func() { o.deleteNotActive() })
}
}
@@ -3067,6 +3084,7 @@ func (o *consumer) processInboundAcks(qch chan struct{}) {
// Grab the server lock to watch for server quit.
o.mu.RLock()
s := o.srv
hasInactiveThresh := o.cfg.InactiveThreshold > 0
o.mu.RUnlock()
for {
@@ -3079,6 +3097,10 @@ func (o *consumer) processInboundAcks(qch chan struct{}) {
ack.returnToPool()
}
o.ackMsgs.recycle(&acks)
// If we have an inactiveThreshold set, mark our activity.
if hasInactiveThresh {
o.suppressDeletion()
}
case <-qch:
return
case <-s.quitCh:
@@ -3087,6 +3109,20 @@ func (o *consumer) processInboundAcks(qch chan struct{}) {
}
}
// Suppress auto cleanup on ack activity of any kind.
func (o *consumer) suppressDeletion() {
o.mu.Lock()
defer o.mu.Unlock()
if o.isPushMode() && o.dtmr != nil {
// if dtmr is not nil we have started the countdown, simply reset to threshold.
o.dtmr.Reset(o.dthresh)
} else if o.isPullMode() {
// Pull mode always has timer running, just update last on waiting queue.
o.waiting.last = time.Now()
}
}
func (o *consumer) loopAndGatherMsgs(qch chan struct{}) {
// On startup check to see if we are in a a reply situation where replay policy is not instant.
var (
@@ -3427,6 +3463,11 @@ func (o *consumer) deliverMsg(dsubj, ackReply string, pmsg *jsPubMsg, dc uint64,
o.sendFlowControl()
}
// If pull mode and we have inactivity threshold, signaled by dthresh, update last activity.
if o.isPullMode() && o.dthresh > 0 {
o.waiting.last = time.Now()
}
// FIXME(dlc) - Capture errors?
o.updateDelivered(dseq, seq, dc, ts)

View File

@@ -1412,3 +1412,56 @@ func TestJetStreamClusterNoPanicOnStreamInfoWhenNoLeaderYet(t *testing.T) {
close(ch)
wg.Wait()
}
// Issue https://github.com/nats-io/nats-server/issues/3630
func TestJetStreamClusterPullConsumerAcksExtendInactivityThreshold(t *testing.T) {
c := createJetStreamClusterExplicit(t, "R3S", 3)
defer c.shutdown()
nc, js := jsClientConnect(t, c.randomServer())
defer nc.Close()
js.AddStream(&nats.StreamConfig{
Name: "TEST",
Subjects: []string{"foo"},
Replicas: 3,
})
n := 10
for i := 0; i < n; i++ {
sendStreamMsg(t, nc, "foo", "msg")
}
// Pull Consumer
sub, err := js.PullSubscribe("foo", "d", nats.InactiveThreshold(time.Second))
require_NoError(t, err)
fetchMsgs(t, sub, n/2, time.Second)
// Will wait for .5s.
time.Sleep(500 * time.Millisecond)
msgs := fetchMsgs(t, sub, n/2, time.Second)
if len(msgs) != n/2 {
t.Fatalf("Did not receive msgs: %d vs %d", len(msgs), n/2)
}
// Wait for .5s.
time.Sleep(500 * time.Millisecond)
msgs[0].Ack() // Ack
// Wait another .5s.
time.Sleep(500 * time.Millisecond)
msgs[1].Nak() // Nak
// Wait another .5s.
time.Sleep(500 * time.Millisecond)
msgs[2].Term() // Term
time.Sleep(500 * time.Millisecond)
msgs[3].InProgress() // WIP
// The above should have kept the consumer alive.
_, err = js.ConsumerInfo("TEST", "d")
require_NoError(t, err)
// Make sure it gets cleaned up.
time.Sleep(2 * time.Second)
_, err = js.ConsumerInfo("TEST", "d")
require_Error(t, err, nats.ErrConsumerNotFound)
}

View File

@@ -14634,7 +14634,8 @@ func TestJetStreamPullConsumerCrossAccountExpires(t *testing.T) {
return nil
})
// Now make sure the ephemeral goes away too.
checkFor(t, 5*time.Second, 10*time.Millisecond, func() error {
// Ephemerals have jitter by default of up to 1s.
checkFor(t, 6*time.Second, 10*time.Millisecond, func() error {
_, err := js.ConsumerInfo("PC", ci.Name)
if err == nil {
return fmt.Errorf("Consumer still present")