Avoid race condition

Signed-off-by: Derek Collison <derek@nats.io>
This commit is contained in:
Derek Collison
2021-12-29 08:26:10 -08:00
parent bd495f3b18
commit 1a37f0963a
2 changed files with 22 additions and 9 deletions

View File

@@ -955,7 +955,7 @@ func (a *Account) EnableJetStream(limits *JetStreamAccountLimits) error {
sysNode := s.Node()
// Cluster mode updates to resource usages, but we always will turn on. System internal prevents echos.
// Cluster mode updates to resource usage, but we always will turn on. System internal prevents echos.
jsa.mu.Lock()
jsa.updatesPub = fmt.Sprintf(jsaUpdatesPubT, a.Name, sysNode)
jsa.updatesSub, _ = s.sysSubscribe(fmt.Sprintf(jsaUpdatesSubT, a.Name), jsa.remoteUpdateUsage)
@@ -1215,7 +1215,7 @@ func (a *Account) EnableJetStream(limits *JetStreamAccountLimits) error {
}
}
// Make sure to cleanup and old remaining snapshots.
// Make sure to cleanup any old remaining snapshots.
os.RemoveAll(path.Join(jsa.storeDir, snapsDir))
s.Debugf("JetStream state for account %q recovered", a.Name)
@@ -1505,8 +1505,16 @@ func (jsa *jsAccount) remoteUpdateUsage(sub *subscription, c *client, _ *Account
// Updates accounting on in use memory and storage. This is called from locally
// by the lower storage layers.
func (jsa *jsAccount) updateUsage(storeType StorageType, delta int64) {
jsa.mu.Lock()
var isClustered bool
// Ok to check jsa.js here w/o lock.
js := jsa.js
if js != nil {
isClustered = js.isClustered()
}
jsa.mu.Lock()
defer jsa.mu.Unlock()
if storeType == MemoryStorage {
jsa.usage.mem += delta
jsa.memTotal += delta
@@ -1523,10 +1531,9 @@ func (jsa *jsAccount) updateUsage(storeType StorageType, delta int64) {
}
}
// Publish our local updates if in clustered mode.
if js.cluster != nil {
if isClustered {
jsa.sendClusterUsageUpdate()
}
jsa.mu.Unlock()
}
const usageTick = 1500 * time.Millisecond

View File

@@ -197,10 +197,7 @@ func (s *Server) JetStreamIsClustered() bool {
if js == nil {
return false
}
js.mu.RLock()
isClustered := js.cluster != nil
js.mu.RUnlock()
return isClustered
return js.isClustered()
}
func (s *Server) JetStreamIsLeader() bool {
@@ -479,6 +476,15 @@ func (s *Server) enableJetStreamClustering() error {
return js.setupMetaGroup()
}
// isClustered returns if we are clustered.
// Lock should not be held.
func (js *jetStream) isClustered() bool {
js.mu.RLock()
isClustered := js.cluster != nil
js.mu.RUnlock()
return isClustered
}
func (js *jetStream) setupMetaGroup() error {
s := js.srv
s.Noticef("Creating JetStream metadata controller")