Merge pull request #1021 from nats-io/fix_acc_conn_update_timer

Fixed setting timer for account connection updates
This commit is contained in:
Ivan Kozlovic
2019-05-29 15:00:49 -06:00
committed by GitHub
2 changed files with 77 additions and 10 deletions

View File

@@ -44,8 +44,12 @@ const (
serverSubjectIndex = 2
accUpdateTokens = 5
accUpdateAccIndex = 2
defaultEventsHBItvl = 30 * time.Second
)
// FIXME(dlc) - make configurable.
var eventsHBInterval = defaultEventsHBItvl
// Used to send and receive messages from inside the server.
type internal struct {
account *Account
@@ -702,9 +706,6 @@ func (s *Server) sendLeafNodeConnect(a *Account) {
s.switchAccountToInterestMode(a.Name)
}
// FIXME(dlc) - make configurable.
const eventsHBInterval = 30 * time.Second
// sendAccConnsUpdate is called to send out our information on the
// account's local connections.
// Lock should be held on entry.
@@ -734,13 +735,13 @@ func (s *Server) sendAccConnsUpdate(a *Account, subj string) {
// Set timer to fire again unless we are at zero.
a.mu.Lock()
if a.numLocalConnections() == 0 {
clearTimer(&a.etmr)
clearTimer(&a.ctmr)
} else {
// Check to see if we have an HB running and update.
if a.ctmr == nil {
a.etmr = time.AfterFunc(eventsHBInterval, func() { s.accConnsUpdate(a) })
a.ctmr = time.AfterFunc(eventsHBInterval, func() { s.accConnsUpdate(a) })
} else {
a.etmr.Reset(eventsHBInterval)
a.ctmr.Reset(eventsHBInterval)
}
}
a.mu.Unlock()

View File

@@ -22,6 +22,7 @@ import (
"os"
"strings"
"sync"
"sync/atomic"
"testing"
"time"
@@ -486,9 +487,9 @@ func TestSystemAccountConnectionUpdatesStopAfterNoLocal(t *testing.T) {
acc, _ := sb.LookupAccount(pub)
// Make sure we have the timer running.
acc.mu.RLock()
etmr := acc.etmr
ctmr := acc.ctmr
acc.mu.RUnlock()
if etmr == nil {
if ctmr == nil {
t.Fatalf("Expected event timer for acc conns to be running")
}
@@ -528,9 +529,9 @@ func TestSystemAccountConnectionUpdatesStopAfterNoLocal(t *testing.T) {
// Make sure we have the timer is NOT running.
acc.mu.RLock()
etmr = acc.etmr
ctmr = acc.ctmr
acc.mu.RUnlock()
if etmr != nil {
if ctmr != nil {
t.Fatalf("Expected event timer for acc conns to NOT be running after reaching zero local clients")
}
}
@@ -1304,3 +1305,68 @@ func TestFetchAccountRace(t *testing.T) {
t.Fatalf("B should be able to receive messages")
}
}
func TestConnectionUpdatesTimerProperlySet(t *testing.T) {
eventsHBInterval = 50 * time.Millisecond
defer func() { eventsHBInterval = defaultEventsHBItvl }()
sa, _, sb, optsB, _ := runTrustedCluster(t)
defer sa.Shutdown()
defer sb.Shutdown()
// Normal Account
okp, _ := nkeys.FromSeed(oSeed)
akp, _ := nkeys.CreateAccount()
pub, _ := akp.PublicKey()
nac := jwt.NewAccountClaims(pub)
nac.Limits.Conn = 10 // set any limit...
jwt, _ := nac.Encode(okp)
addAccountToMemResolver(sa, pub, jwt)
// Listen for HB updates...
count := int32(0)
cb := func(sub *subscription, subject, reply string, msg []byte) {
atomic.AddInt32(&count, 1)
}
subj := fmt.Sprintf(accConnsEventSubj, pub)
sub, err := sa.sysSubscribe(subj, cb)
if sub == nil || err != nil {
t.Fatalf("Expected to subscribe, got %v", err)
}
defer sa.sysUnsubscribe(sub)
url := fmt.Sprintf("nats://%s:%d", optsB.Host, optsB.Port)
nc := natsConnect(t, url, createUserCreds(t, sb, akp))
defer nc.Close()
time.Sleep(500 * time.Millisecond)
// After waiting 500ms with HB interval of 50ms, we should get
// about 10 updates, no much more
if n := atomic.LoadInt32(&count); n > 15 {
t.Fatalf("Expected about 10 updates, got %v", n)
}
// Now lookup the account doing the events on sb.
acc, _ := sb.LookupAccount(pub)
// Make sure we have the timer running.
acc.mu.RLock()
ctmr := acc.ctmr
acc.mu.RUnlock()
if ctmr == nil {
t.Fatalf("Expected event timer for acc conns to be running")
}
nc.Close()
checkFor(t, 2*time.Second, 15*time.Millisecond, func() error {
// Make sure we have the timer is NOT running.
acc.mu.RLock()
ctmr = acc.ctmr
acc.mu.RUnlock()
if ctmr != nil {
return fmt.Errorf("Expected event timer for acc conns to NOT be running after reaching zero local clients")
}
return nil
})
}