mirror of
https://github.com/gogrlx/nats-server.git
synced 2026-04-16 19:14:41 -07:00
Make sure trailing usage updates accounted for eventually
Signed-off-by: Derek Collison <derek@nats.io>
This commit is contained in:
@@ -112,6 +112,7 @@ type jsAccount struct {
|
||||
// From server
|
||||
sendq chan *pubMsg
|
||||
lupdate time.Time
|
||||
utimer *time.Timer
|
||||
}
|
||||
|
||||
// Track general usage for this account.
|
||||
@@ -653,7 +654,9 @@ func (a *Account) EnableJetStream(limits *JetStreamAccountLimits) error {
|
||||
return err
|
||||
}
|
||||
jsa := &jsAccount{js: js, account: a, limits: *limits, streams: make(map[string]*stream), sendq: sendq}
|
||||
jsa.utimer = time.AfterFunc(usageTick, jsa.sendClusterUsageUpdateTimer)
|
||||
jsa.storeDir = path.Join(js.config.StoreDir, a.Name)
|
||||
|
||||
js.accounts[a] = jsa
|
||||
js.reserveResources(limits)
|
||||
js.mu.Unlock()
|
||||
@@ -1148,6 +1151,17 @@ func (jsa *jsAccount) updateUsage(storeType StorageType, delta int64) {
|
||||
jsa.mu.Unlock()
|
||||
}
|
||||
|
||||
const usageTick = 1500 * time.Millisecond
|
||||
|
||||
func (jsa *jsAccount) sendClusterUsageUpdateTimer() {
|
||||
jsa.sendClusterUsageUpdate()
|
||||
jsa.mu.Lock()
|
||||
if jsa.utimer != nil {
|
||||
jsa.utimer.Reset(usageTick)
|
||||
}
|
||||
jsa.mu.Unlock()
|
||||
}
|
||||
|
||||
// Send updates to our account usage for this server.
|
||||
// Lock should be held.
|
||||
func (jsa *jsAccount) sendClusterUsageUpdate() {
|
||||
@@ -1237,6 +1251,10 @@ func (jsa *jsAccount) delete() {
|
||||
var ts []string
|
||||
|
||||
jsa.mu.Lock()
|
||||
if jsa.utimer != nil {
|
||||
jsa.utimer.Stop()
|
||||
jsa.utimer = nil
|
||||
}
|
||||
|
||||
if jsa.updatesSub != nil && jsa.js.srv != nil {
|
||||
s := jsa.js.srv
|
||||
|
||||
@@ -2610,7 +2610,7 @@ func TestJetStreamClusterAccountInfoAndLimits(t *testing.T) {
|
||||
}
|
||||
expectedSize := 25*msgSize + 75*msgSize*2 + 10*msgSize*3
|
||||
// This may lag.
|
||||
checkFor(t, 2*time.Second, 100*time.Millisecond, func() error {
|
||||
checkFor(t, 2*time.Second, 500*time.Millisecond, func() error {
|
||||
if stats.Store != expectedSize {
|
||||
stats = accountStats()
|
||||
return fmt.Errorf("Expected store size to be %d, got %+v\n", expectedSize, stats)
|
||||
|
||||
Reference in New Issue
Block a user