From 50500924682442e9e92b9f49ee6664f2bd9c2621 Mon Sep 17 00:00:00 2001 From: Ivan Kozlovic Date: Mon, 2 May 2022 09:50:32 -0600 Subject: [PATCH] [FIXED] JetStream: possible lock inversion When updating usage, there is a lock inversion in that the jetStream lock was acquired while under the stream's (mset) lock, which is not correct. Also, updateUsage was locking the jsAccount lock, which again, is not really correct since jsAccount contains streams, so it should be jsAccount->stream, not the other way around. Removed the locking of jetStream to check for clustered state since js.clustered is immutable. Replaced using jsAccount lock to update usage with a dedicated lock. Originally moved all the update/limit fields in jsAccount to a new structure to make sure that I would see all code that is updating or reading those fields, and also all functions so that I could make sure that I use the new lock when calling these. Once that work was done, and to reduce code changes, I put the fields back into jsAccount (although I grouped them under the new usageMu mutex field). 
Signed-off-by: Ivan Kozlovic --- server/consumer.go | 4 +- server/jetstream.go | 107 +++++++++++++++++++----------------- server/jetstream_api.go | 8 +-- server/jetstream_cluster.go | 10 ++-- server/monitor.go | 2 + server/stream.go | 9 ++- 6 files changed, 77 insertions(+), 63 deletions(-) diff --git a/server/consumer.go b/server/consumer.go index 84bb109a..457de69e 100644 --- a/server/consumer.go +++ b/server/consumer.go @@ -520,9 +520,9 @@ func (mset *stream) addConsumerWithAssignment(config *ConsumerConfig, oname stri return nil, NewJSConsumerConfigRequiredError() } - jsa.mu.RLock() + jsa.usageMu.RLock() selectedLimits, limitsFound := jsa.limits[tierName] - jsa.mu.RUnlock() + jsa.usageMu.RUnlock() if !limitsFound { return nil, NewJSNoLimitsError() } diff --git a/server/jetstream.go b/server/jetstream.go index e26404ca..a34f6b90 100644 --- a/server/jetstream.go +++ b/server/jetstream.go @@ -132,25 +132,27 @@ type jsAccount struct { mu sync.RWMutex js *jetStream account *Account - limits map[string]JetStreamAccountLimits // indexed by tierName - usage map[string]*jsaStorage // indexed by tierName - apiTotal uint64 - apiErrors uint64 - usageApi uint64 - usageErr uint64 - rusage map[string]*remoteUsage // indexed by node id storeDir string streams map[string]*stream templates map[string]*streamTemplate store TemplateStore - // Cluster support + // From server + sendq *ipQueue // of *pubMsg + + // Usage/limits related fields that will be protected by usageMu + usageMu sync.RWMutex + limits map[string]JetStreamAccountLimits // indexed by tierName + usage map[string]*jsaStorage // indexed by tierName + rusage map[string]*remoteUsage // indexed by node id + apiTotal uint64 + apiErrors uint64 + usageApi uint64 + usageErr uint64 updatesPub string updatesSub *subscription - // From server - sendq *ipQueue // of *pubMsg - lupdate time.Time - utimer *time.Timer + lupdate time.Time + utimer *time.Timer } // Track general usage for this account. 
@@ -993,21 +995,21 @@ func (a *Account) EnableJetStream(limits map[string]JetStreamAccountLimits) erro return err } + sysNode := s.Node() + jsa := &jsAccount{js: js, account: a, limits: limits, streams: make(map[string]*stream), sendq: sendq, usage: make(map[string]*jsaStorage)} - jsa.utimer = time.AfterFunc(usageTick, jsa.sendClusterUsageUpdateTimer) jsa.storeDir = filepath.Join(js.config.StoreDir, a.Name) + jsa.usageMu.Lock() + jsa.utimer = time.AfterFunc(usageTick, jsa.sendClusterUsageUpdateTimer) + // Cluster mode updates to resource usage, but we always will turn on. System internal prevents echos. + jsa.updatesPub = fmt.Sprintf(jsaUpdatesPubT, a.Name, sysNode) + jsa.updatesSub, _ = s.sysSubscribe(fmt.Sprintf(jsaUpdatesSubT, a.Name), jsa.remoteUpdateUsage) + jsa.usageMu.Unlock() + js.accounts[a.Name] = jsa js.mu.Unlock() - sysNode := s.Node() - - // Cluster mode updates to resource usage, but we always will turn on. System internal prevents echos. - jsa.mu.Lock() - jsa.updatesPub = fmt.Sprintf(jsaUpdatesPubT, a.Name, sysNode) - jsa.updatesSub, _ = s.sysSubscribe(fmt.Sprintf(jsaUpdatesSubT, a.Name), jsa.remoteUpdateUsage) - jsa.mu.Unlock() - // Stamp inside account as well. a.mu.Lock() a.js = jsa @@ -1291,9 +1293,9 @@ func (a *Account) maxBytesLimits(cfg *StreamConfig) (bool, int64) { if jsa == nil { return false, 0 } - jsa.mu.RLock() + jsa.usageMu.RLock() selectedLimits, _, ok := jsa.selectLimits(cfg) - jsa.mu.RUnlock() + jsa.usageMu.RUnlock() if !ok { return false, 0 } @@ -1395,9 +1397,9 @@ func (a *Account) UpdateJetStreamLimits(limits map[string]JetStreamAccountLimits } // Calculate the delta between what we have and what we want. - jsa.mu.Lock() + jsa.usageMu.RLock() dl := diffCheckedLimits(jsa.limits, limits) - jsa.mu.Unlock() + jsa.usageMu.RUnlock() js.mu.Lock() // Check the limits against existing reservations. 
@@ -1408,9 +1410,9 @@ func (a *Account) UpdateJetStreamLimits(limits map[string]JetStreamAccountLimits js.mu.Unlock() // Update - jsa.mu.Lock() + jsa.usageMu.Lock() jsa.limits = limits - jsa.mu.Unlock() + jsa.usageMu.Unlock() return nil } @@ -1449,6 +1451,7 @@ func (a *Account) JetStreamUsage() JetStreamAccountStats { js := jsa.js js.mu.RLock() jsa.mu.RLock() + jsa.usageMu.RLock() stats.Memory, stats.Store = jsa.storageTotals() stats.Domain = js.config.Domain stats.API = JetStreamAPIStats{ @@ -1483,6 +1486,7 @@ func (a *Account) JetStreamUsage() JetStreamAccountStats { } } } + jsa.usageMu.RUnlock() if cc := jsa.js.cluster; cc != nil { sas := cc.streams[aname] if defaultTier { @@ -1591,10 +1595,12 @@ func (a *Account) JetStreamEnabled() bool { func (jsa *jsAccount) remoteUpdateUsage(sub *subscription, c *client, _ *Account, subject, _ string, msg []byte) { const usageSize = 32 - jsa.mu.Lock() + // jsa.js.srv is immutable and guaranteed to not be nil, so no lock needed. s := jsa.js.srv + + jsa.usageMu.Lock() if len(msg) < usageSize { - jsa.mu.Unlock() + jsa.usageMu.Unlock() s.Warnf("Ignoring remote usage update with size too short") return } @@ -1603,7 +1609,7 @@ func (jsa *jsAccount) remoteUpdateUsage(sub *subscription, c *client, _ *Account rnode = subject[li+1:] } if rnode == _EMPTY_ { - jsa.mu.Unlock() + jsa.usageMu.Unlock() s.Warnf("Received remote usage update with no remote node") return } @@ -1661,21 +1667,21 @@ func (jsa *jsAccount) remoteUpdateUsage(sub *subscription, c *client, _ *Account rUsage.err = apiErrors jsa.apiTotal += apiTotal jsa.apiErrors += apiErrors - jsa.mu.Unlock() + jsa.usageMu.Unlock() } // Updates accounting on in use memory and storage. This is called from locally // by the lower storage layers. func (jsa *jsAccount) updateUsage(tierName string, storeType StorageType, delta int64) { - var isClustered bool - // Ok to check jsa.js here w/o lock. + // jsa.js is immutable and cannot be nil, so ok w/o lock. 
js := jsa.js - if js != nil { - isClustered = js.isClustered() - } + // This function may be invoked under the mset's lock, so we can't get + // the js' lock to check if clustered. But again, js.cluster is immutable, + // so we check without the lock. + isClustered := js.cluster != nil - jsa.mu.Lock() - defer jsa.mu.Unlock() + jsa.usageMu.Lock() + defer jsa.usageMu.Unlock() s, ok := jsa.usage[tierName] if !ok { @@ -1700,8 +1706,8 @@ func (jsa *jsAccount) updateUsage(tierName string, storeType StorageType, delta const usageTick = 1500 * time.Millisecond func (jsa *jsAccount) sendClusterUsageUpdateTimer() { - jsa.mu.Lock() - defer jsa.mu.Unlock() + jsa.usageMu.Lock() + defer jsa.usageMu.Unlock() jsa.sendClusterUsageUpdate() if jsa.utimer != nil { jsa.utimer.Reset(usageTick) @@ -1709,11 +1715,8 @@ func (jsa *jsAccount) sendClusterUsageUpdateTimer() { } // Send updates to our account usage for this server. -// Lock should be held. +// jsa.usageMu lock should be held. func (jsa *jsAccount) sendClusterUsageUpdate() { - if jsa.js == nil || jsa.js.srv == nil || jsa.sendq == nil { - return - } // These values are absolute so we can limit send rates. now := time.Now() if now.Sub(jsa.lupdate) < 250*time.Millisecond { @@ -1791,7 +1794,7 @@ func (jsa *jsAccount) jetStreamAndClustered() (*jetStream, bool) { return js, js.isClustered() } -// Read lock should be held. +// jsa.usageMu read lock should be held. func (jsa *jsAccount) selectLimits(cfg *StreamConfig) (JetStreamAccountLimits, string, bool) { if selectedLimits, ok := jsa.limits[_EMPTY_]; ok { return selectedLimits, _EMPTY_, true @@ -1817,7 +1820,7 @@ func (jsa *jsAccount) countStreams(tier string, cfg *StreamConfig) int { return streams } -// Lock should be held. +// jsa.usageMu read lock (at least) should be held. 
func (jsa *jsAccount) storageTotals() (uint64, uint64) { mem := uint64(0) store := uint64(0) @@ -1829,8 +1832,8 @@ func (jsa *jsAccount) storageTotals() (uint64, uint64) { } func (jsa *jsAccount) limitsExceeded(storeType StorageType, tierName string) (bool, *ApiError) { - jsa.mu.RLock() - defer jsa.mu.RUnlock() + jsa.usageMu.RLock() + defer jsa.usageMu.RUnlock() selectedLimits, ok := jsa.limits[tierName] if !ok { @@ -1936,16 +1939,18 @@ func (jsa *jsAccount) delete() { var ts []string jsa.mu.Lock() + // The update timer and subs need to be protected by usageMu lock + jsa.usageMu.Lock() if jsa.utimer != nil { jsa.utimer.Stop() jsa.utimer = nil } - if jsa.updatesSub != nil && jsa.js.srv != nil { s := jsa.js.srv s.sysUnsubscribe(jsa.updatesSub) jsa.updatesSub = nil } + jsa.usageMu.Unlock() for _, ms := range jsa.streams { streams = append(streams, ms) @@ -2032,11 +2037,11 @@ func (js *jetStream) sufficientResources(limits map[string]JetStreamAccountLimit // Since we know if we are here we are single server mode, check the account reservations. 
var storeReserved, memReserved int64 for _, jsa := range js.accounts { - jsa.mu.RLock() + jsa.usageMu.RLock() maxMemory, maxStore := totalMaxBytes(jsa.limits) + jsa.usageMu.RUnlock() memReserved += maxMemory storeReserved += maxStore - jsa.mu.RUnlock() } if memReserved+totalMaxMemory > js.config.MaxMemory { diff --git a/server/jetstream_api.go b/server/jetstream_api.go index b9be1f3f..40daa8dd 100644 --- a/server/jetstream_api.go +++ b/server/jetstream_api.go @@ -849,12 +849,12 @@ func (a *Account) trackAPI() { jsa := a.js a.mu.RUnlock() if jsa != nil { - jsa.mu.Lock() + jsa.usageMu.Lock() jsa.usageApi++ jsa.apiTotal++ jsa.sendClusterUsageUpdate() atomic.AddInt64(&jsa.js.apiTotal, 1) - jsa.mu.Unlock() + jsa.usageMu.Unlock() } } @@ -863,7 +863,7 @@ func (a *Account) trackAPIErr() { jsa := a.js a.mu.RUnlock() if jsa != nil { - jsa.mu.Lock() + jsa.usageMu.Lock() jsa.usageApi++ jsa.apiTotal++ jsa.usageErr++ @@ -871,7 +871,7 @@ func (a *Account) trackAPIErr() { jsa.sendClusterUsageUpdate() atomic.AddInt64(&jsa.js.apiTotal, 1) atomic.AddInt64(&jsa.js.apiErrors, 1) - jsa.mu.Unlock() + jsa.usageMu.Unlock() } } diff --git a/server/jetstream_cluster.go b/server/jetstream_cluster.go index d6080eef..6edc4e2c 100644 --- a/server/jetstream_cluster.go +++ b/server/jetstream_cluster.go @@ -4324,9 +4324,9 @@ func (acc *Account) selectLimits(cfg *StreamConfig) (*JetStreamAccountLimits, st return nil, _EMPTY_, nil, NewJSNotEnabledForAccountError() } - jsa.mu.RLock() + jsa.usageMu.RLock() selectedLimits, tierName, ok := jsa.selectLimits(cfg) - jsa.mu.RUnlock() + jsa.usageMu.RUnlock() if !ok { return nil, _EMPTY_, nil, NewJSNoLimitsError() @@ -5641,10 +5641,10 @@ func (mset *stream) processClusteredInboundMsg(subject, reply string, hdr, msg [ // Check here pre-emptively if we have exceeded our account limits. 
var exceeded bool - jsa.mu.RLock() + jsa.usageMu.Lock() jsaLimits, ok := jsa.limits[tierName] if !ok { - jsa.mu.RUnlock() + jsa.usageMu.Unlock() err := fmt.Errorf("no JetStream resource limits found account: %q", jsa.acc().Name) s.RateLimitWarnf(err.Error()) if canRespond { @@ -5671,7 +5671,7 @@ func (mset *stream) processClusteredInboundMsg(subject, reply string, hdr, msg [ exceeded = true } } - jsa.mu.RUnlock() + jsa.usageMu.Unlock() // If we have exceeded our account limits go ahead and return. if exceeded { diff --git a/server/monitor.go b/server/monitor.go index ff7ade80..492dec7b 100644 --- a/server/monitor.go +++ b/server/monitor.go @@ -2576,6 +2576,7 @@ func (s *Server) accountDetail(jsa *jsAccount, optStreams, optConsumers, optCfg if acc.nameTag != "" { name = acc.nameTag } + jsa.usageMu.RLock() totalMem, totalStore := jsa.storageTotals() detail := AccountDetail{ Name: name, @@ -2590,6 +2591,7 @@ func (s *Server) accountDetail(jsa *jsAccount, optStreams, optConsumers, optCfg }, Streams: make([]StreamDetail, 0, len(jsa.streams)), } + jsa.usageMu.RUnlock() var streams []*stream if optStreams { for _, stream := range jsa.streams { diff --git a/server/stream.go b/server/stream.go index e1aee1a2..7d9e12dd 100644 --- a/server/stream.go +++ b/server/stream.go @@ -322,7 +322,9 @@ func (a *Account) addStreamWithAssignment(config *StreamConfig, fsConfig *FileSt return nil, ApiErrors[JSStreamNameExistErr] } } + jsa.usageMu.RLock() selected, tier, hasTier := jsa.selectLimits(&cfg) + jsa.usageMu.RUnlock() reserved := int64(0) if !isClustered { reserved = jsa.tieredReservation(tier, &cfg) @@ -1204,10 +1206,12 @@ func (jsa *jsAccount) configUpdateCheck(old, new *StreamConfig, s *Server) (*Str js, isClustered := jsa.jetStreamAndClustered() jsa.mu.RLock() acc := jsa.account + jsa.usageMu.RLock() selected, tier, hasTier := jsa.selectLimits(&cfg) if !hasTier && old.Replicas != cfg.Replicas { selected, tier, hasTier = jsa.selectLimits(old) } + jsa.usageMu.RUnlock() reserved 
:= int64(0) if !isClustered { reserved = jsa.tieredReservation(tier, &cfg) @@ -1308,7 +1312,10 @@ func (mset *stream) update(config *StreamConfig) error { if targetTier := tierName(cfg); mset.tier != targetTier { // In cases such as R1->R3, only one update is needed - if _, ok := mset.jsa.limits[targetTier]; ok { + mset.jsa.usageMu.RLock() + _, ok := mset.jsa.limits[targetTier] + mset.jsa.usageMu.RUnlock() + if ok { // error never set _, reported, _ := mset.store.Utilization() mset.jsa.updateUsage(mset.tier, mset.stype, -int64(reported))