diff --git a/server/consumer.go b/server/consumer.go index 84bb109a..457de69e 100644 --- a/server/consumer.go +++ b/server/consumer.go @@ -520,9 +520,9 @@ func (mset *stream) addConsumerWithAssignment(config *ConsumerConfig, oname stri return nil, NewJSConsumerConfigRequiredError() } - jsa.mu.RLock() + jsa.usageMu.RLock() selectedLimits, limitsFound := jsa.limits[tierName] - jsa.mu.RUnlock() + jsa.usageMu.RUnlock() if !limitsFound { return nil, NewJSNoLimitsError() } diff --git a/server/jetstream.go b/server/jetstream.go index e26404ca..a34f6b90 100644 --- a/server/jetstream.go +++ b/server/jetstream.go @@ -132,25 +132,27 @@ type jsAccount struct { mu sync.RWMutex js *jetStream account *Account - limits map[string]JetStreamAccountLimits // indexed by tierName - usage map[string]*jsaStorage // indexed by tierName - apiTotal uint64 - apiErrors uint64 - usageApi uint64 - usageErr uint64 - rusage map[string]*remoteUsage // indexed by node id storeDir string streams map[string]*stream templates map[string]*streamTemplate store TemplateStore - // Cluster support + // From server + sendq *ipQueue // of *pubMsg + + // Usage/limits related fields that will be protected by usageMu + usageMu sync.RWMutex + limits map[string]JetStreamAccountLimits // indexed by tierName + usage map[string]*jsaStorage // indexed by tierName + rusage map[string]*remoteUsage // indexed by node id + apiTotal uint64 + apiErrors uint64 + usageApi uint64 + usageErr uint64 updatesPub string updatesSub *subscription - // From server - sendq *ipQueue // of *pubMsg - lupdate time.Time - utimer *time.Timer + lupdate time.Time + utimer *time.Timer } // Track general usage for this account. 
@@ -993,21 +995,21 @@ func (a *Account) EnableJetStream(limits map[string]JetStreamAccountLimits) erro return err } + sysNode := s.Node() + jsa := &jsAccount{js: js, account: a, limits: limits, streams: make(map[string]*stream), sendq: sendq, usage: make(map[string]*jsaStorage)} - jsa.utimer = time.AfterFunc(usageTick, jsa.sendClusterUsageUpdateTimer) jsa.storeDir = filepath.Join(js.config.StoreDir, a.Name) + jsa.usageMu.Lock() + jsa.utimer = time.AfterFunc(usageTick, jsa.sendClusterUsageUpdateTimer) + // Cluster mode updates to resource usage, but we always will turn on. System internal prevents echos. + jsa.updatesPub = fmt.Sprintf(jsaUpdatesPubT, a.Name, sysNode) + jsa.updatesSub, _ = s.sysSubscribe(fmt.Sprintf(jsaUpdatesSubT, a.Name), jsa.remoteUpdateUsage) + jsa.usageMu.Unlock() + js.accounts[a.Name] = jsa js.mu.Unlock() - sysNode := s.Node() - - // Cluster mode updates to resource usage, but we always will turn on. System internal prevents echos. - jsa.mu.Lock() - jsa.updatesPub = fmt.Sprintf(jsaUpdatesPubT, a.Name, sysNode) - jsa.updatesSub, _ = s.sysSubscribe(fmt.Sprintf(jsaUpdatesSubT, a.Name), jsa.remoteUpdateUsage) - jsa.mu.Unlock() - // Stamp inside account as well. a.mu.Lock() a.js = jsa @@ -1291,9 +1293,9 @@ func (a *Account) maxBytesLimits(cfg *StreamConfig) (bool, int64) { if jsa == nil { return false, 0 } - jsa.mu.RLock() + jsa.usageMu.RLock() selectedLimits, _, ok := jsa.selectLimits(cfg) - jsa.mu.RUnlock() + jsa.usageMu.RUnlock() if !ok { return false, 0 } @@ -1395,9 +1397,9 @@ func (a *Account) UpdateJetStreamLimits(limits map[string]JetStreamAccountLimits } // Calculate the delta between what we have and what we want. - jsa.mu.Lock() + jsa.usageMu.RLock() dl := diffCheckedLimits(jsa.limits, limits) - jsa.mu.Unlock() + jsa.usageMu.RUnlock() js.mu.Lock() // Check the limits against existing reservations. 
@@ -1408,9 +1410,9 @@ func (a *Account) UpdateJetStreamLimits(limits map[string]JetStreamAccountLimits } js.mu.Unlock() // Update - jsa.mu.Lock() + jsa.usageMu.Lock() jsa.limits = limits - jsa.mu.Unlock() + jsa.usageMu.Unlock() return nil } @@ -1449,6 +1451,7 @@ func (a *Account) JetStreamUsage() JetStreamAccountStats { js := jsa.js js.mu.RLock() jsa.mu.RLock() + jsa.usageMu.RLock() stats.Memory, stats.Store = jsa.storageTotals() stats.Domain = js.config.Domain stats.API = JetStreamAPIStats{ @@ -1483,6 +1486,7 @@ func (a *Account) JetStreamUsage() JetStreamAccountStats { } } } + jsa.usageMu.RUnlock() if cc := jsa.js.cluster; cc != nil { sas := cc.streams[aname] if defaultTier { @@ -1591,10 +1595,12 @@ func (a *Account) JetStreamEnabled() bool { func (jsa *jsAccount) remoteUpdateUsage(sub *subscription, c *client, _ *Account, subject, _ string, msg []byte) { const usageSize = 32 - jsa.mu.Lock() + // jsa.js.srv is immutable and guaranteed to not be nil, so no lock needed. s := jsa.js.srv + + jsa.usageMu.Lock() if len(msg) < usageSize { - jsa.mu.Unlock() + jsa.usageMu.Unlock() s.Warnf("Ignoring remote usage update with size too short") return } @@ -1603,7 +1609,7 @@ func (jsa *jsAccount) remoteUpdateUsage(sub *subscription, c *client, _ *Account rnode = subject[li+1:] } if rnode == _EMPTY_ { - jsa.mu.Unlock() + jsa.usageMu.Unlock() s.Warnf("Received remote usage update with no remote node") return } @@ -1661,21 +1667,21 @@ func (jsa *jsAccount) remoteUpdateUsage(sub *subscription, c *client, _ *Account rUsage.err = apiErrors jsa.apiTotal += apiTotal jsa.apiErrors += apiErrors - jsa.mu.Unlock() + jsa.usageMu.Unlock() } // Updates accounting on in use memory and storage. This is called from locally // by the lower storage layers. func (jsa *jsAccount) updateUsage(tierName string, storeType StorageType, delta int64) { - var isClustered bool - // Ok to check jsa.js here w/o lock. + // jsa.js is immutable and cannot be nil, so ok w/o lock.
js := jsa.js - if js != nil { - isClustered = js.isClustered() - } + // This function may be invoked under the mset's lock, so we can't get + // the js' lock to check if clustered. But again, js.cluster is immutable, + // so we check without the lock. + isClustered := js.cluster != nil - jsa.mu.Lock() - defer jsa.mu.Unlock() + jsa.usageMu.Lock() + defer jsa.usageMu.Unlock() s, ok := jsa.usage[tierName] if !ok { @@ -1700,8 +1706,8 @@ func (jsa *jsAccount) updateUsage(tierName string, storeType StorageType, delta const usageTick = 1500 * time.Millisecond func (jsa *jsAccount) sendClusterUsageUpdateTimer() { - jsa.mu.Lock() - defer jsa.mu.Unlock() + jsa.usageMu.Lock() + defer jsa.usageMu.Unlock() jsa.sendClusterUsageUpdate() if jsa.utimer != nil { jsa.utimer.Reset(usageTick) @@ -1709,11 +1715,8 @@ func (jsa *jsAccount) sendClusterUsageUpdateTimer() { } // Send updates to our account usage for this server. -// Lock should be held. +// jsa.usageMu lock should be held. func (jsa *jsAccount) sendClusterUsageUpdate() { - if jsa.js == nil || jsa.js.srv == nil || jsa.sendq == nil { - return - } // These values are absolute so we can limit send rates. now := time.Now() if now.Sub(jsa.lupdate) < 250*time.Millisecond { @@ -1791,7 +1794,7 @@ func (jsa *jsAccount) jetStreamAndClustered() (*jetStream, bool) { return js, js.isClustered() } -// Read lock should be held. +// jsa.usageMu read lock should be held. func (jsa *jsAccount) selectLimits(cfg *StreamConfig) (JetStreamAccountLimits, string, bool) { if selectedLimits, ok := jsa.limits[_EMPTY_]; ok { return selectedLimits, _EMPTY_, true @@ -1817,7 +1820,7 @@ func (jsa *jsAccount) countStreams(tier string, cfg *StreamConfig) int { return streams } -// Lock should be held. +// jsa.usageMu read lock (at least) should be held. 
func (jsa *jsAccount) storageTotals() (uint64, uint64) { mem := uint64(0) store := uint64(0) @@ -1829,8 +1832,8 @@ func (jsa *jsAccount) storageTotals() (uint64, uint64) { } func (jsa *jsAccount) limitsExceeded(storeType StorageType, tierName string) (bool, *ApiError) { - jsa.mu.RLock() - defer jsa.mu.RUnlock() + jsa.usageMu.RLock() + defer jsa.usageMu.RUnlock() selectedLimits, ok := jsa.limits[tierName] if !ok { @@ -1936,16 +1939,18 @@ func (jsa *jsAccount) delete() { var ts []string jsa.mu.Lock() + // The update timer and subs need to be protected by usageMu lock + jsa.usageMu.Lock() if jsa.utimer != nil { jsa.utimer.Stop() jsa.utimer = nil } - if jsa.updatesSub != nil && jsa.js.srv != nil { s := jsa.js.srv s.sysUnsubscribe(jsa.updatesSub) jsa.updatesSub = nil } + jsa.usageMu.Unlock() for _, ms := range jsa.streams { streams = append(streams, ms) @@ -2032,11 +2037,11 @@ func (js *jetStream) sufficientResources(limits map[string]JetStreamAccountLimit // Since we know if we are here we are single server mode, check the account reservations. 
var storeReserved, memReserved int64 for _, jsa := range js.accounts { - jsa.mu.RLock() + jsa.usageMu.RLock() maxMemory, maxStore := totalMaxBytes(jsa.limits) + jsa.usageMu.RUnlock() memReserved += maxMemory storeReserved += maxStore - jsa.mu.RUnlock() } if memReserved+totalMaxMemory > js.config.MaxMemory { diff --git a/server/jetstream_api.go b/server/jetstream_api.go index b9be1f3f..40daa8dd 100644 --- a/server/jetstream_api.go +++ b/server/jetstream_api.go @@ -849,12 +849,12 @@ func (a *Account) trackAPI() { jsa := a.js a.mu.RUnlock() if jsa != nil { - jsa.mu.Lock() + jsa.usageMu.Lock() jsa.usageApi++ jsa.apiTotal++ jsa.sendClusterUsageUpdate() atomic.AddInt64(&jsa.js.apiTotal, 1) - jsa.mu.Unlock() + jsa.usageMu.Unlock() } } @@ -863,7 +863,7 @@ func (a *Account) trackAPIErr() { jsa := a.js a.mu.RUnlock() if jsa != nil { - jsa.mu.Lock() + jsa.usageMu.Lock() jsa.usageApi++ jsa.apiTotal++ jsa.usageErr++ @@ -871,7 +871,7 @@ func (a *Account) trackAPIErr() { jsa.sendClusterUsageUpdate() atomic.AddInt64(&jsa.js.apiTotal, 1) atomic.AddInt64(&jsa.js.apiErrors, 1) - jsa.mu.Unlock() + jsa.usageMu.Unlock() } } diff --git a/server/jetstream_cluster.go b/server/jetstream_cluster.go index d6080eef..6edc4e2c 100644 --- a/server/jetstream_cluster.go +++ b/server/jetstream_cluster.go @@ -4324,9 +4324,9 @@ func (acc *Account) selectLimits(cfg *StreamConfig) (*JetStreamAccountLimits, st return nil, _EMPTY_, nil, NewJSNotEnabledForAccountError() } - jsa.mu.RLock() + jsa.usageMu.RLock() selectedLimits, tierName, ok := jsa.selectLimits(cfg) - jsa.mu.RUnlock() + jsa.usageMu.RUnlock() if !ok { return nil, _EMPTY_, nil, NewJSNoLimitsError() @@ -5641,10 +5641,10 @@ func (mset *stream) processClusteredInboundMsg(subject, reply string, hdr, msg [ // Check here pre-emptively if we have exceeded our account limits. 
var exceeded bool - jsa.mu.RLock() + jsa.usageMu.Lock() jsaLimits, ok := jsa.limits[tierName] if !ok { - jsa.mu.RUnlock() + jsa.usageMu.Unlock() err := fmt.Errorf("no JetStream resource limits found account: %q", jsa.acc().Name) s.RateLimitWarnf(err.Error()) if canRespond { @@ -5671,7 +5671,7 @@ func (mset *stream) processClusteredInboundMsg(subject, reply string, hdr, msg [ exceeded = true } } - jsa.mu.RUnlock() + jsa.usageMu.Unlock() // If we have exceeded our account limits go ahead and return. if exceeded { diff --git a/server/monitor.go b/server/monitor.go index ff7ade80..492dec7b 100644 --- a/server/monitor.go +++ b/server/monitor.go @@ -2576,6 +2576,7 @@ func (s *Server) accountDetail(jsa *jsAccount, optStreams, optConsumers, optCfg if acc.nameTag != "" { name = acc.nameTag } + jsa.usageMu.RLock() totalMem, totalStore := jsa.storageTotals() detail := AccountDetail{ Name: name, @@ -2590,6 +2591,7 @@ func (s *Server) accountDetail(jsa *jsAccount, optStreams, optConsumers, optCfg }, Streams: make([]StreamDetail, 0, len(jsa.streams)), } + jsa.usageMu.RUnlock() var streams []*stream if optStreams { for _, stream := range jsa.streams { diff --git a/server/stream.go b/server/stream.go index e1aee1a2..7d9e12dd 100644 --- a/server/stream.go +++ b/server/stream.go @@ -322,7 +322,9 @@ func (a *Account) addStreamWithAssignment(config *StreamConfig, fsConfig *FileSt return nil, ApiErrors[JSStreamNameExistErr] } } + jsa.usageMu.RLock() selected, tier, hasTier := jsa.selectLimits(&cfg) + jsa.usageMu.RUnlock() reserved := int64(0) if !isClustered { reserved = jsa.tieredReservation(tier, &cfg) @@ -1204,10 +1206,12 @@ func (jsa *jsAccount) configUpdateCheck(old, new *StreamConfig, s *Server) (*Str js, isClustered := jsa.jetStreamAndClustered() jsa.mu.RLock() acc := jsa.account + jsa.usageMu.RLock() selected, tier, hasTier := jsa.selectLimits(&cfg) if !hasTier && old.Replicas != cfg.Replicas { selected, tier, hasTier = jsa.selectLimits(old) } + jsa.usageMu.RUnlock() reserved 
:= int64(0) if !isClustered { reserved = jsa.tieredReservation(tier, &cfg) @@ -1308,7 +1312,10 @@ func (mset *stream) update(config *StreamConfig) error { if targetTier := tierName(cfg); mset.tier != targetTier { // In cases such as R1->R3, only one update is needed - if _, ok := mset.jsa.limits[targetTier]; ok { + mset.jsa.usageMu.RLock() + _, ok := mset.jsa.limits[targetTier] + mset.jsa.usageMu.RUnlock() + if ok { // error never set _, reported, _ := mset.store.Utilization() mset.jsa.updateUsage(mset.tier, mset.stype, -int64(reported))