Only enable JetStream account updates in clustered mode.

Signed-off-by: Derek Collison <derek@nats.io>
This commit is contained in:
Derek Collison
2023-06-10 15:21:55 -07:00
parent 8c513ad2c2
commit a5de25f213

View File

@@ -1020,9 +1020,9 @@ func (a *Account) EnableJetStream(limits map[string]JetStreamAccountLimits) erro
return fmt.Errorf("jetstream can not be enabled on the system account")
}
s.mu.Lock()
s.mu.RLock()
sendq := s.sys.sendq
s.mu.Unlock()
s.mu.RUnlock()
// No limits means we dynamically set up limits.
// We also place limits here so we know that the account is configured for JetStream.
@@ -1054,12 +1054,15 @@ func (a *Account) EnableJetStream(limits map[string]JetStreamAccountLimits) erro
jsa := &jsAccount{js: js, account: a, limits: limits, streams: make(map[string]*stream), sendq: sendq, usage: make(map[string]*jsaStorage)}
jsa.storeDir = filepath.Join(js.config.StoreDir, a.Name)
jsa.usageMu.Lock()
jsa.utimer = time.AfterFunc(usageTick, jsa.sendClusterUsageUpdateTimer)
// Cluster mode updates to resource usage, but we always will turn on. System internal prevents echos.
jsa.updatesPub = fmt.Sprintf(jsaUpdatesPubT, a.Name, sysNode)
jsa.updatesSub, _ = s.sysSubscribe(fmt.Sprintf(jsaUpdatesSubT, a.Name), jsa.remoteUpdateUsage)
jsa.usageMu.Unlock()
// A single server does not need to do the account updates at this point.
if js.cluster != nil || !s.standAloneMode() {
jsa.usageMu.Lock()
jsa.utimer = time.AfterFunc(usageTick, jsa.sendClusterUsageUpdateTimer)
// Cluster mode updates to resource usage. System internal prevents echos.
jsa.updatesPub = fmt.Sprintf(jsaUpdatesPubT, a.Name, sysNode)
jsa.updatesSub, _ = s.sysSubscribe(fmt.Sprintf(jsaUpdatesSubT, a.Name), jsa.remoteUpdateUsage)
jsa.usageMu.Unlock()
}
js.accounts[a.Name] = jsa
js.mu.Unlock()