Reset activity interval on catchup to default vs ramp up. Tweak test.

Signed-off-by: Derek Collison <derek@nats.io>
This commit is contained in:
Derek Collison
2022-08-08 09:45:40 -07:00
committed by Ivan Kozlovic
parent 906afccb8a
commit 06112d6885
2 changed files with 4 additions and 19 deletions

View File

@@ -6549,24 +6549,10 @@ func (mset *stream) processSnapshot(snap *streamSnapshot) (e error) {
var sub *subscription
var err error
const maxActivityInterval = 10 * time.Second
const minActivityInterval = time.Second
activityInterval := minActivityInterval
const activityInterval = 10 * time.Second
notActive := time.NewTimer(activityInterval)
defer notActive.Stop()
var gotMsgs bool
getActivityInterval := func() time.Duration {
if gotMsgs || activityInterval == maxActivityInterval {
return maxActivityInterval
}
activityInterval *= 5
if activityInterval > maxActivityInterval {
activityInterval = maxActivityInterval
}
return activityInterval
}
defer func() {
if sub != nil {
s.sysUnsubscribe(sub)
@@ -6656,7 +6642,7 @@ RETRY:
default:
}
}
notActive.Reset(getActivityInterval())
notActive.Reset(activityInterval)
// Grab sync request again on failures.
if sreq == nil {
@@ -6707,8 +6693,7 @@ RETRY:
for qch, lch := n.QuitC(), n.LeadChangeC(); ; {
select {
case <-msgsQ.ch:
gotMsgs = true
notActive.Reset(getActivityInterval())
notActive.Reset(activityInterval)
mrecs := msgsQ.pop()

View File

@@ -5315,7 +5315,7 @@ func TestNoRaceJetStreamClusterDirectAccessAllPeersSubs(t *testing.T) {
updateStream(t, nc, cfg)
// Wait for the stream to register the new replicas and have a leader.
checkFor(t, 10*time.Second, 500*time.Millisecond, func() error {
checkFor(t, 20*time.Second, 500*time.Millisecond, func() error {
si, err := js.StreamInfo("TEST")
if err != nil {
return err