From 0c5f3688a75c094d311d1e1ca6437cb64e7eb579 Mon Sep 17 00:00:00 2001 From: Matthias Hanel Date: Mon, 28 Mar 2022 20:47:54 -0400 Subject: [PATCH] [ADDED] Tiered limits and fix limit issues on updates (#2945) * Adding tiered limits and fix limit issues on updates Signed-off-by: Matthias Hanel --- go.mod | 2 +- go.sum | 4 +- server/accounts.go | 34 +- server/consumer.go | 13 +- server/errors.json | 10 + server/jetstream.go | 451 ++++++++++++++++++++------- server/jetstream_api.go | 50 ++- server/jetstream_cluster.go | 93 +++++- server/jetstream_cluster_test.go | 129 +++++++- server/jetstream_errors_generated.go | 18 +- server/jetstream_test.go | 164 ++++++---- server/jwt_test.go | 308 ++++++++++++++++++ server/monitor.go | 5 +- server/mqtt_test.go | 12 +- server/opts.go | 11 +- server/server.go | 10 +- server/store.go | 2 +- server/stream.go | 96 +++++- 18 files changed, 1155 insertions(+), 257 deletions(-) diff --git a/go.mod b/go.mod index da628ddb..4fe29e0b 100644 --- a/go.mod +++ b/go.mod @@ -5,7 +5,7 @@ go 1.17 require ( github.com/klauspost/compress v1.14.4 github.com/minio/highwayhash v1.0.2 - github.com/nats-io/jwt/v2 v2.2.1-0.20220113022732-58e87895b296 + github.com/nats-io/jwt/v2 v2.2.1-0.20220323195037-3472a33220ba github.com/nats-io/nats.go v1.13.1-0.20220318132711-e0e03e374228 github.com/nats-io/nkeys v0.3.0 github.com/nats-io/nuid v1.0.1 diff --git a/go.sum b/go.sum index ace0fd05..4ffa5e56 100644 --- a/go.sum +++ b/go.sum @@ -12,8 +12,8 @@ github.com/klauspost/compress v1.14.4 h1:eijASRJcobkVtSt81Olfh7JX43osYLwy5krOJo6 github.com/klauspost/compress v1.14.4/go.mod h1:/3/Vjq9QcHkK5uEr5lBEmyoZ1iFhe47etQ6QUkpK6sk= github.com/minio/highwayhash v1.0.2 h1:Aak5U0nElisjDCfPSG79Tgzkn2gl66NxOMspRrKnA/g= github.com/minio/highwayhash v1.0.2/go.mod h1:BQskDq+xkJ12lmlUUi7U0M5Swg3EWR+dLTk+kldvVxY= -github.com/nats-io/jwt/v2 v2.2.1-0.20220113022732-58e87895b296 h1:vU9tpM3apjYlLLeY23zRWJ9Zktr5jp+mloR942LEOpY= -github.com/nats-io/jwt/v2 v2.2.1-0.20220113022732-58e87895b296/go.mod h1:0tqz9Hlu6bCBFLWAASKhE5vUA4c24L9KPUUgvwumE/k= +github.com/nats-io/jwt/v2 v2.2.1-0.20220323195037-3472a33220ba h1:NZi4+xOauRDb4znbGDeJqdS1Gh448BaQ3NS9F1UnwN0= +github.com/nats-io/jwt/v2 v2.2.1-0.20220323195037-3472a33220ba/go.mod h1:0tqz9Hlu6bCBFLWAASKhE5vUA4c24L9KPUUgvwumE/k= github.com/nats-io/nats.go v1.13.1-0.20220318132711-e0e03e374228 h1:czbQ9uYuV7dwLsh/0vpB+4rutgdLTYgoN5W5hf1S0eg= github.com/nats-io/nats.go v1.13.1-0.20220318132711-e0e03e374228/go.mod h1:BPko4oXsySz4aSWeFgOHLZs3G4Jq4ZAyE6/zMCxRT6w= github.com/nats-io/nkeys v0.3.0 h1:cgM5tL53EvYRU+2YLXIK0G2mJtK12Ft9oeooSZMA2G8= diff --git a/server/accounts.go b/server/accounts.go index cd3a3677..1cd76a59 100644 --- a/server/accounts.go +++ b/server/accounts.go @@ -76,7 +76,7 @@ type Account struct { imports importMap exports exportMap js *jsAccount - jsLimits *JetStreamAccountLimits + jsLimits map[string]JetStreamAccountLimits limits expired bool incomplete bool @@ -3274,15 +3274,29 @@ func (s *Server) updateAccountClaimsWithRefresh(a *Account, ac *jwt.AccountClaim a.srv = s } - // Setup js limits regardless of whether this server has jsEnabled. - if ac.Limits.JetStreamLimits.DiskStorage != 0 || ac.Limits.JetStreamLimits.MemoryStorage != 0 { - // JetStreamAccountLimits and jwt.JetStreamLimits use same value for unlimited - a.jsLimits = &JetStreamAccountLimits{ - MaxMemory: ac.Limits.JetStreamLimits.MemoryStorage, - MaxStore: ac.Limits.JetStreamLimits.DiskStorage, - MaxStreams: int(ac.Limits.JetStreamLimits.Streams), - MaxConsumers: int(ac.Limits.JetStreamLimits.Consumer), - MaxBytesRequired: ac.Limits.JetStreamLimits.MaxBytesRequired, + if ac.Limits.IsJSEnabled() { + if ac.Limits.JetStreamLimits.DiskStorage != 0 || ac.Limits.JetStreamLimits.MemoryStorage != 0 { + // JetStreamAccountLimits and jwt.JetStreamLimits use same value for unlimited + a.jsLimits = map[string]JetStreamAccountLimits{ + _EMPTY_: { + MaxMemory: ac.Limits.JetStreamLimits.MemoryStorage, + MaxStore: ac.Limits.JetStreamLimits.DiskStorage, + MaxStreams: int(ac.Limits.JetStreamLimits.Streams), + MaxConsumers: int(ac.Limits.JetStreamLimits.Consumer), + MaxBytesRequired: ac.Limits.JetStreamLimits.MaxBytesRequired, + }, + } + } else { + a.jsLimits = map[string]JetStreamAccountLimits{} + for t, l := range ac.Limits.JetStreamTieredLimits { + a.jsLimits[t] = JetStreamAccountLimits{ + MaxMemory: l.MemoryStorage, + MaxStore: l.DiskStorage, + MaxStreams: int(l.Streams), + MaxConsumers: int(l.Consumer), + MaxBytesRequired: l.MaxBytesRequired, + } + } } } else if a.jsLimits != nil { // covers failed update followed by disable diff --git a/server/consumer.go b/server/consumer.go index d26061b1..44c9351b 100644 --- a/server/consumer.go +++ b/server/consumer.go @@ -328,7 +328,7 @@ func (mset *stream) addConsumer(config *ConsumerConfig) (*consumer, error) { func (mset *stream) addConsumerWithAssignment(config *ConsumerConfig, oname string, ca *consumerAssignment) (*consumer, error) { mset.mu.RLock() - s, jsa := mset.srv, mset.jsa + s, jsa, tierName := mset.srv, mset.jsa, mset.tier mset.mu.RUnlock() // If we do not have the consumer currently assigned to us in cluster mode we will proceed but warn. @@ -500,6 +500,13 @@ func (mset *stream) addConsumerWithAssignment(config *ConsumerConfig, oname stri } c.mu.Unlock() + jsa.mu.RLock() + jsaLimits, limitsFound := jsa.limits[tierName] + jsa.mu.RUnlock() + if !limitsFound { + return nil, NewJSNoLimitsError() + } + // Hold mset lock here. mset.mu.Lock() if mset.client == nil || mset.store == nil { @@ -524,8 +531,8 @@ func (mset *stream) addConsumerWithAssignment(config *ConsumerConfig, oname stri // than stream config we prefer the account limits to handle cases where account limits are // updated during the lifecycle of the stream maxc := mset.cfg.MaxConsumers - if maxc <= 0 || (mset.jsa.limits.MaxConsumers > 0 && mset.jsa.limits.MaxConsumers < maxc) { - maxc = mset.jsa.limits.MaxConsumers + if maxc <= 0 || (jsaLimits.MaxConsumers > 0 && jsaLimits.MaxConsumers < maxc) { + maxc = jsaLimits.MaxConsumers } if maxc > 0 && mset.numPublicConsumers() >= maxc { mset.mu.Unlock() diff --git a/server/errors.json b/server/errors.json index 60d3f4cf..00fb65b5 100644 --- a/server/errors.json +++ b/server/errors.json @@ -1178,5 +1178,15 @@ "help": "", "url": "", "deprecates": "" + }, + { + "constant": "JSNoLimitsErr", + "code": 400, + "error_code": 10120, + "description": "no JetStream default or applicable tiered limit present", + "comment": "", + "help": "", + "url": "", + "deprecates": "" } ] \ No newline at end of file diff --git a/server/jetstream.go b/server/jetstream.go index 2a2ce886..d2b964dc 100644 --- a/server/jetstream.go +++ b/server/jetstream.go @@ -66,17 +66,22 @@ type JetStreamAccountLimits struct { MaxBytesRequired bool `json:"max_bytes_required"` } -// JetStreamAccountStats returns current statistics about the account's JetStream usage. -type JetStreamAccountStats struct { +type JetStreamTier struct { Memory uint64 `json:"memory"` Store uint64 `json:"storage"` Streams int `json:"streams"` Consumers int `json:"consumers"` - Domain string `json:"domain,omitempty"` - API JetStreamAPIStats `json:"api"` Limits JetStreamAccountLimits `json:"limits"` } +// JetStreamAccountStats returns current statistics about the account's JetStream usage. +type JetStreamAccountStats struct { + JetStreamTier // in case tiers are used, reflects totals with limits not set + Domain string `json:"domain,omitempty"` + API JetStreamAPIStats `json:"api"` + Tiers map[string]JetStreamTier `json:"tiers,omitempty"` // indexed by tier name +} + type JetStreamAPIStats struct { Total uint64 `json:"total"` Errors uint64 `json:"errors"` @@ -104,28 +109,37 @@ type jetStream struct { oos bool } +type remoteUsage struct { + tiers map[string]*jsaUsage // indexed by tier name + api uint64 + err uint64 +} + +type jsaStorage struct { + total jsaUsage + local jsaUsage +} + // This represents a jetstream enabled account. // Worth noting that we include the jetstream pointer, this is because // in general we want to be very efficient when receiving messages on // an internal sub for a stream, so we will direct link to the stream // and walk backwards as needed vs multiple hash lookups and locks, etc. type jsAccount struct { - mu sync.RWMutex - js *jetStream - account *Account - limits JetStreamAccountLimits - memReserved int64 - storeReserved int64 - memTotal int64 - storeTotal int64 - apiTotal uint64 - apiErrors uint64 - usage jsaUsage - rusage map[string]*jsaUsage - storeDir string - streams map[string]*stream - templates map[string]*streamTemplate - store TemplateStore + mu sync.RWMutex + js *jetStream + account *Account + limits map[string]JetStreamAccountLimits // indexed by tierName + usage map[string]*jsaStorage // indexed by tierName + apiTotal uint64 + apiErrors uint64 + usageApi uint64 + usageErr uint64 + rusage map[string]*remoteUsage // indexed by node id + storeDir string + streams map[string]*stream + templates map[string]*streamTemplate + store TemplateStore // Cluster support updatesPub string @@ -140,8 +154,6 @@ type jsAccount struct { type jsaUsage struct { mem int64 store int64 - api uint64 - err uint64 } // EnableJetStream will enable JetStream support on this server with the given configuration. @@ -553,8 +565,8 @@ func (s *Server) enableJetStreamAccounts() error { if s.globalAccountOnly() { gacc := s.GlobalAccount() gacc.mu.Lock() - if gacc.jsLimits == nil { - gacc.jsLimits = dynamicJSAccountLimits + if len(gacc.jsLimits) == 0 { + gacc.jsLimits = defaultJSAccountTiers } gacc.mu.Unlock() if err := s.configJetStream(gacc); err != nil { @@ -917,7 +929,7 @@ func (s *Server) getJetStream() *jetStream { return js } -func (a *Account) assignJetStreamLimits(limits *JetStreamAccountLimits) { +func (a *Account) assignJetStreamLimits(limits map[string]JetStreamAccountLimits) { a.mu.Lock() a.jsLimits = limits a.mu.Unlock() @@ -925,7 +937,7 @@ func (a *Account) assignJetStreamLimits(limits *JetStreamAccountLimits) { // EnableJetStream will enable JetStream on this account with the defined limits. // This is a helper for JetStreamEnableAccount. -func (a *Account) EnableJetStream(limits *JetStreamAccountLimits) error { +func (a *Account) EnableJetStream(limits map[string]JetStreamAccountLimits) error { a.mu.RLock() s := a.srv a.mu.RUnlock() @@ -944,8 +956,8 @@ func (a *Account) EnableJetStream(limits *JetStreamAccountLimits) error { // No limits means we dynamically set up limits. // We also place limits here so we know that the account is configured for JetStream. - if limits == nil { - limits = dynamicJSAccountLimits + if len(limits) == 0 { + limits = defaultJSAccountTiers } a.assignJetStreamLimits(limits) @@ -967,7 +979,7 @@ func (a *Account) EnableJetStream(limits *JetStreamAccountLimits) error { return err } - jsa := &jsAccount{js: js, account: a, limits: *limits, streams: make(map[string]*stream), sendq: sendq} + jsa := &jsAccount{js: js, account: a, limits: limits, streams: make(map[string]*stream), sendq: sendq, usage: make(map[string]*jsaStorage)} jsa.utimer = time.AfterFunc(usageTick, jsa.sendClusterUsageUpdateTimer) jsa.storeDir = filepath.Join(js.config.StoreDir, a.Name) @@ -993,8 +1005,16 @@ func (a *Account) EnableJetStream(limits *JetStreamAccountLimits) error { } s.Debugf("Enabled JetStream for account %q", a.Name) - s.Debugf(" Max Memory: %s", friendlyBytes(limits.MaxMemory)) - s.Debugf(" Max Storage: %s", friendlyBytes(limits.MaxStore)) + if l, ok := limits[_EMPTY_]; ok { + s.Debugf(" Max Memory: %s", friendlyBytes(l.MaxMemory)) + s.Debugf(" Max Storage: %s", friendlyBytes(l.MaxStore)) + } else { + for t, l := range limits { + s.Debugf(" Tier: %s", t) + s.Debugf(" Max Memory: %s", friendlyBytes(l.MaxMemory)) + s.Debugf(" Max Storage: %s", friendlyBytes(l.MaxStore)) + } + } // Clean up any old snapshots that were orphaned while staging. os.RemoveAll(filepath.Join(js.config.StoreDir, snapStagingDir)) @@ -1246,15 +1266,20 @@ func (a *Account) EnableJetStream(limits *JetStreamAccountLimits) error { } // Return whether or not we require MaxBytes to be set. -func (a *Account) maxBytesRequired() bool { +func (a *Account) maxBytesRequired(cfg *StreamConfig) bool { a.mu.RLock() - defer a.mu.RUnlock() - jsa := a.js + a.mu.RUnlock() if jsa == nil { return false } - return jsa.limits.MaxBytesRequired + jsa.mu.RLock() + selectedLimits, _, ok := jsa.selectLimits(cfg) + jsa.mu.RUnlock() + if !ok { + return false + } + return selectedLimits.MaxBytesRequired } // NumStreams will return how many streams we have. @@ -1325,7 +1350,7 @@ func (a *Account) lookupStream(name string) (*stream, error) { } // UpdateJetStreamLimits will update the account limits for a JetStream enabled account. -func (a *Account) UpdateJetStreamLimits(limits *JetStreamAccountLimits) error { +func (a *Account) UpdateJetStreamLimits(limits map[string]JetStreamAccountLimits) error { a.mu.RLock() s, jsa := a.srv, a.js a.mu.RUnlock() @@ -1341,18 +1366,18 @@ func (a *Account) UpdateJetStreamLimits(limits *JetStreamAccountLimits) error { return NewJSNotEnabledForAccountError() } - if limits == nil { - limits = dynamicJSAccountLimits + if len(limits) == 0 { + limits = defaultJSAccountTiers } // Calculate the delta between what we have and what we want. jsa.mu.Lock() - dl := diffCheckedLimits(&jsa.limits, limits) + dl := diffCheckedLimits(jsa.limits, limits) jsa.mu.Unlock() js.mu.Lock() // Check the limits against existing reservations. - if err := js.sufficientResources(&dl); err != nil { + if err := js.sufficientResources(dl); err != nil { js.mu.Unlock() return err } @@ -1360,23 +1385,39 @@ func (a *Account) UpdateJetStreamLimits(limits *JetStreamAccountLimits) error { // Update jsa.mu.Lock() - jsa.limits = *limits + jsa.limits = limits jsa.mu.Unlock() return nil } -func diffCheckedLimits(a, b *JetStreamAccountLimits) JetStreamAccountLimits { - return JetStreamAccountLimits{ - MaxMemory: b.MaxMemory - a.MaxMemory, - MaxStore: b.MaxStore - a.MaxStore, +func diffCheckedLimits(a, b map[string]JetStreamAccountLimits) map[string]JetStreamAccountLimits { + diff := map[string]JetStreamAccountLimits{} + for t, la := range a { + // in a, not in b will return 0 + lb := b[t] + diff[t] = JetStreamAccountLimits{ + MaxMemory: lb.MaxMemory - la.MaxMemory, + MaxStore: lb.MaxStore - la.MaxStore, + } } + for t, lb := range b { + if la, ok := a[t]; !ok { + // only in b not in a. (in a and b already covered) + diff[t] = JetStreamAccountLimits{ + MaxMemory: lb.MaxMemory - la.MaxMemory, + MaxStore: lb.MaxStore - la.MaxStore, + } + } + } + return diff } // JetStreamUsage reports on JetStream usage and limits for an account. func (a *Account) JetStreamUsage() JetStreamAccountStats { a.mu.RLock() jsa, aname := a.js, a.Name + accJsLimits := a.jsLimits a.mu.RUnlock() var stats JetStreamAccountStats @@ -1384,26 +1425,69 @@ func (a *Account) JetStreamUsage() JetStreamAccountStats { js := jsa.js js.mu.RLock() jsa.mu.RLock() - stats.Memory = uint64(jsa.memTotal) - stats.Store = uint64(jsa.storeTotal) + stats.Memory, stats.Store = jsa.storageTotals() stats.Domain = js.config.Domain stats.API = JetStreamAPIStats{ Total: jsa.apiTotal, Errors: jsa.apiErrors, } - if cc := jsa.js.cluster; cc != nil { - sas := cc.streams[aname] - stats.Streams = len(sas) - for _, sa := range sas { - stats.Consumers += len(sa.consumers) - } + l, defaultTier := jsa.limits[_EMPTY_] + if defaultTier { + stats.Limits = l } else { - stats.Streams = len(jsa.streams) - for _, mset := range jsa.streams { - stats.Consumers += mset.numConsumers() + stats.Tiers = make(map[string]JetStreamTier) + for t, total := range jsa.usage { + stats.Tiers[t] = JetStreamTier{ + Memory: uint64(total.total.mem), + Store: uint64(total.total.store), + Limits: jsa.limits[t], + } + } + if len(accJsLimits) != len(jsa.usage) { + // insert unused limits + for t, lim := range accJsLimits { + if _, ok := stats.Tiers[t]; !ok { + stats.Tiers[t] = JetStreamTier{Limits: lim} + } + } + } + } + if cc := jsa.js.cluster; cc != nil { + sas := cc.streams[aname] + if defaultTier { + stats.Streams = len(sas) + } + for _, sa := range sas { + stats.Consumers += len(sa.consumers) + if !defaultTier { + tier := tierName(sa.Config) + u, ok := stats.Tiers[tier] + if !ok { + u = JetStreamTier{} + } + u.Streams++ + u.Consumers += len(sa.consumers) + stats.Tiers[tier] = u + } + } + } else { + if defaultTier { + stats.Streams = len(jsa.streams) + } + for _, mset := range jsa.streams { + consCount := mset.numConsumers() + stats.Consumers += consCount + if !defaultTier { + u, ok := stats.Tiers[mset.tier] + if !ok { + u = JetStreamTier{} + } + u.Streams++ + u.Consumers += consCount + stats.Tiers[mset.tier] = u + } } } - stats.Limits = jsa.limits jsa.mu.RUnlock() js.mu.RUnlock() } @@ -1457,7 +1541,7 @@ func (a *Account) jetStreamConfigured() bool { } a.mu.RLock() defer a.mu.RUnlock() - return a.jsLimits != nil + return len(a.jsLimits) > 0 } // JetStreamEnabled is a helper to determine if jetstream is enabled for an account. @@ -1490,27 +1574,58 @@ func (jsa *jsAccount) remoteUpdateUsage(sub *subscription, c *client, _ *Account s.Warnf("Received remote usage update with no remote node") return } - var le = binary.LittleEndian - memUsed, storeUsed := int64(le.Uint64(msg[0:])), int64(le.Uint64(msg[8:])) - apiTotal, apiErrors := le.Uint64(msg[16:]), le.Uint64(msg[24:]) + rUsage, ok := jsa.rusage[rnode] + if !ok { + if jsa.rusage == nil { + jsa.rusage = make(map[string]*remoteUsage) + } + rUsage = &remoteUsage{tiers: make(map[string]*jsaUsage)} + jsa.rusage[rnode] = rUsage + } + updateTotal := func(tierName string, memUsed, storeUsed int64) { + total, ok := jsa.usage[tierName] + if !ok { + total = &jsaStorage{} + jsa.usage[tierName] = total + } + // Update the usage for this remote. + if usage := rUsage.tiers[tierName]; usage != nil { + // Decrement our old values. + total.total.mem -= usage.mem + total.total.store -= usage.store + usage.mem, usage.store = memUsed, storeUsed + } else { + rUsage.tiers[tierName] = &jsaUsage{memUsed, storeUsed} + } + total.total.mem += memUsed + total.total.store += storeUsed + } - if jsa.rusage == nil { - jsa.rusage = make(map[string]*jsaUsage) + var le = binary.LittleEndian + apiTotal, apiErrors := le.Uint64(msg[16:]), le.Uint64(msg[24:]) + memUsed, storeUsed := int64(le.Uint64(msg[0:])), int64(le.Uint64(msg[8:])) + + // we later extended the data structure to support multiple tiers + excessRecordCnt := uint32(0) + tierName := _EMPTY_ + if len(msg) >= 44 { + excessRecordCnt = le.Uint32(msg[32:]) + length := le.Uint64(msg[36:]) + tierName = string(msg[44 : 44+length]) + msg = msg[44+length:] } - // Update the usage for this remote. - if usage := jsa.rusage[rnode]; usage != nil { - // Decrement our old values. - jsa.memTotal -= usage.mem - jsa.storeTotal -= usage.store - jsa.apiTotal -= usage.api - jsa.apiErrors -= usage.err - usage.mem, usage.store = memUsed, storeUsed - usage.api, usage.err = apiTotal, apiErrors - } else { - jsa.rusage[rnode] = &jsaUsage{memUsed, storeUsed, apiTotal, apiErrors} + updateTotal(tierName, memUsed, storeUsed) + for ; excessRecordCnt > 0 && len(msg) >= 24; excessRecordCnt-- { + memUsed, storeUsed := int64(le.Uint64(msg[0:])), int64(le.Uint64(msg[8:])) + length := le.Uint64(msg[16:]) + tierName = string(msg[24 : 24+length]) + msg = msg[24+length:] + updateTotal(tierName, memUsed, storeUsed) } - jsa.memTotal += memUsed - jsa.storeTotal += storeUsed + jsa.apiTotal -= rUsage.api + jsa.apiErrors -= rUsage.err + rUsage.api = apiTotal + rUsage.err = apiErrors jsa.apiTotal += apiTotal jsa.apiErrors += apiErrors jsa.mu.Unlock() @@ -1518,7 +1633,7 @@ func (jsa *jsAccount) remoteUpdateUsage(sub *subscription, c *client, _ *Account // Updates accounting on in use memory and storage. This is called from locally // by the lower storage layers. -func (jsa *jsAccount) updateUsage(storeType StorageType, delta int64) { +func (jsa *jsAccount) updateUsage(tierName string, storeType StorageType, delta int64) { var isClustered bool // Ok to check jsa.js here w/o lock. js := jsa.js @@ -1529,13 +1644,18 @@ func (jsa *jsAccount) updateUsage(storeType StorageType, delta int64) { jsa.mu.Lock() defer jsa.mu.Unlock() + s, ok := jsa.usage[tierName] + if !ok { + s = &jsaStorage{} + jsa.usage[tierName] = s + } if storeType == MemoryStorage { - jsa.usage.mem += delta - jsa.memTotal += delta + s.local.mem += delta + s.total.mem += delta atomic.AddInt64(&js.memUsed, delta) } else { - jsa.usage.store += delta - jsa.storeTotal += delta + s.local.store += delta + s.total.store += delta atomic.AddInt64(&js.storeUsed, delta) } // Publish our local updates if in clustered mode. @@ -1568,13 +1688,36 @@ func (jsa *jsAccount) sendClusterUsageUpdate() { } jsa.lupdate = now - b := make([]byte, 32) + lenUsage := len(jsa.usage) + // every base record contains mem/store/len(tier) as well as the tier name + l := 24 * lenUsage + for tier := range jsa.usage { + l += len(tier) + } + if lenUsage > 0 { + // first record contains api/usage errors as well as count for extra base records + l += 20 + } var le = binary.LittleEndian - le.PutUint64(b[0:], uint64(jsa.usage.mem)) - le.PutUint64(b[8:], uint64(jsa.usage.store)) - le.PutUint64(b[16:], uint64(jsa.usage.api)) - le.PutUint64(b[24:], uint64(jsa.usage.err)) + b := make([]byte, l) + i := 0 + for tier, usage := range jsa.usage { + le.PutUint64(b[i+0:], uint64(usage.local.mem)) + le.PutUint64(b[i+8:], uint64(usage.local.store)) + if i == 0 { + le.PutUint64(b[i+16:], jsa.usageApi) + le.PutUint64(b[i+24:], jsa.usageErr) + le.PutUint32(b[i+32:], uint32(len(jsa.usage)-1)) + le.PutUint64(b[i+36:], uint64(len(tier))) + copy(b[i+44:], tier) + i += 44 + len(tier) + } else { + le.PutUint64(b[i+16:], uint64(len(tier))) + copy(b[i+24:], tier) + i += 24 + len(tier) + } + } jsa.sendq.push(newPubMsg(nil, jsa.updatesPub, _EMPTY_, nil, nil, b, noCompression, false, false)) } @@ -1595,16 +1738,74 @@ func (js *jetStream) limitsExceeded(storeType StorageType) bool { return js.wouldExceedLimits(storeType, 0) } -func (jsa *jsAccount) limitsExceeded(storeType StorageType) bool { +func tierName(cfg *StreamConfig) string { + // TODO (mh) this is where we could select based off a placement tag as well "qos:tier" + return fmt.Sprintf("R%d", cfg.Replicas) +} + +func isSameTier(cfgA, cfgB *StreamConfig) bool { + // TODO (mh) this is where we could select based off a placement tag as well "qos:tier" + return cfgA.Replicas == cfgB.Replicas +} + +func (jsa *jsAccount) jetStreamAndClustered() (*jetStream, bool) { + jsa.mu.RLock() + js := jsa.js + jsa.mu.RUnlock() + return js, js.isClustered() +} + +// Read lock should be held. +func (jsa *jsAccount) selectLimits(cfg *StreamConfig) (JetStreamAccountLimits, string, bool) { + if selectedLimits, ok := jsa.limits[_EMPTY_]; ok { + return selectedLimits, _EMPTY_, true + } + tier := tierName(cfg) + if selectedLimits, ok := jsa.limits[tier]; ok { + return selectedLimits, tier, true + } + return JetStreamAccountLimits{}, _EMPTY_, false +} + +// Lock should be held. +func (jsa *jsAccount) countStreams(tier string, cfg *StreamConfig) int { + streams := len(jsa.streams) + if tier != _EMPTY_ { + streams = 0 + for _, sa := range jsa.streams { + if isSameTier(&sa.cfg, cfg) { + streams++ + } + } + } + return streams +} + +// Lock should be held. +func (jsa *jsAccount) storageTotals() (uint64, uint64) { + mem := uint64(0) + store := uint64(0) + for _, sa := range jsa.usage { + mem += uint64(sa.total.mem) + store += uint64(sa.total.store) + } + return mem, store +} + +func (jsa *jsAccount) limitsExceeded(storeType StorageType, tierName string) bool { jsa.mu.RLock() defer jsa.mu.RUnlock() + selectedLimits, ok := jsa.limits[tierName] + if !ok { + return true + } if storeType == MemoryStorage { - if jsa.limits.MaxMemory >= 0 && jsa.memTotal > jsa.limits.MaxMemory { + if selectedLimits.MaxMemory >= 0 && jsa.usage[tierName].total.mem > selectedLimits.MaxMemory { return true } } else { - if jsa.limits.MaxStore >= 0 && jsa.storeTotal > jsa.limits.MaxStore { + if selectedLimits.MaxStore >= 0 && jsa.usage[tierName].total.store > selectedLimits.MaxStore { return true } } @@ -1613,47 +1814,46 @@ func (jsa *jsAccount) limitsExceeded(storeType StorageType) bool { } // Check account limits. -func (jsa *jsAccount) checkAccountLimits(config *StreamConfig) error { - return jsa.checkLimits(config, false) +// Read Lock should be held +func (js *jetStream) checkAccountLimits(selected *JetStreamAccountLimits, config *StreamConfig, currentRes int64) error { + return js.checkLimits(selected, config, false, currentRes, 0) } // Check account and server limits. -func (jsa *jsAccount) checkAllLimits(config *StreamConfig) error { - return jsa.checkLimits(config, true) +// Read Lock should be held +func (js *jetStream) checkAllLimits(selected *JetStreamAccountLimits, config *StreamConfig, currentRes, maxBytesOffset int64) error { + return js.checkLimits(selected, config, true, currentRes, maxBytesOffset) } // Check if a new proposed msg set while exceed our account limits. // Lock should be held. -func (jsa *jsAccount) checkLimits(config *StreamConfig, checkServer bool) error { - if jsa.limits.MaxStreams > 0 && len(jsa.streams) >= jsa.limits.MaxStreams { - return NewJSMaximumStreamsLimitError() - } +func (js *jetStream) checkLimits(selected *JetStreamAccountLimits, config *StreamConfig, checkServer bool, currentRes, maxBytesOffset int64) error { // Check MaxConsumers - if config.MaxConsumers > 0 && jsa.limits.MaxConsumers > 0 && config.MaxConsumers > jsa.limits.MaxConsumers { + if config.MaxConsumers > 0 && selected.MaxConsumers > 0 && config.MaxConsumers > selected.MaxConsumers { return NewJSMaximumConsumersLimitError() } - + // stream limit is checked separately on stream create only! // Check storage, memory or disk. - return jsa.checkBytesLimits(config.MaxBytes, config.Storage, config.Replicas, checkServer) + return js.checkBytesLimits(selected, config.MaxBytes, config.Storage, config.Replicas, checkServer, currentRes, maxBytesOffset) } // Check if additional bytes will exceed our account limits and optionally the server itself. // This should account for replicas. -// Lock should be held. -func (jsa *jsAccount) checkBytesLimits(addBytes int64, storage StorageType, replicas int, checkServer bool) error { +// Read Lock should be held. +func (js *jetStream) checkBytesLimits(selectedLimits *JetStreamAccountLimits, addBytes int64, storage StorageType, replicas int, checkServer bool, currentRes, maxBytesOffset int64) error { if replicas < 1 { replicas = 1 } if addBytes < 0 { addBytes = 1 } - js, totalBytes := jsa.js, addBytes*int64(replicas) + totalBytes := (addBytes * int64(replicas)) + maxBytesOffset switch storage { case MemoryStorage: // Account limits defined. - if jsa.limits.MaxMemory >= 0 { - if jsa.memReserved+totalBytes > jsa.limits.MaxMemory { + if selectedLimits.MaxMemory >= 0 { + if currentRes+totalBytes > selectedLimits.MaxMemory { return NewJSMemoryResourcesExceededError() } } @@ -1663,8 +1863,8 @@ func (jsa *jsAccount) checkBytesLimits(addBytes int64, storage StorageType, repl } case FileStorage: // Account limits defined. - if jsa.limits.MaxStore >= 0 { - if jsa.storeReserved+totalBytes > jsa.limits.MaxStore { + if selectedLimits.MaxStore >= 0 { + if currentRes+totalBytes > selectedLimits.MaxStore { return NewJSStorageResourcesExceededError() } } @@ -1748,7 +1948,7 @@ func (js *jetStream) usageStats() *JetStreamStats { // Check to see if we have enough system resources for this account. // Lock should be held. -func (js *jetStream) sufficientResources(limits *JetStreamAccountLimits) error { +func (js *jetStream) sufficientResources(limits map[string]JetStreamAccountLimits) error { // If we are clustered we do not really know how many resources will be ultimately available. // This needs to be handled out of band. // If we are a single server, we can make decisions here. @@ -1756,11 +1956,27 @@ func (js *jetStream) sufficientResources(limits *JetStreamAccountLimits) error { return nil } + totalMaxBytes := func(limits map[string]JetStreamAccountLimits) (int64, int64) { + totalMaxMemory := int64(0) + totalMaxStore := int64(0) + for _, l := range limits { + if l.MaxMemory > 0 { + totalMaxMemory += l.MaxMemory + } + if l.MaxStore > 0 { + totalMaxStore += l.MaxStore + } + } + return totalMaxMemory, totalMaxStore + } + + totalMaxMemory, totalMaxStore := totalMaxBytes(limits) + // Reserved is now specific to the MaxBytes for streams. - if js.memReserved+limits.MaxMemory > js.config.MaxMemory { + if js.memReserved+totalMaxMemory > js.config.MaxMemory { return NewJSMemoryResourcesExceededError() } - if js.storeReserved+limits.MaxStore > js.config.MaxStore { + if js.storeReserved+totalMaxStore > js.config.MaxStore { return NewJSStorageResourcesExceededError() } @@ -1768,19 +1984,16 @@ func (js *jetStream) sufficientResources(limits *JetStreamAccountLimits) error { var storeReserved, memReserved int64 for _, jsa := range js.accounts { jsa.mu.RLock() - if jsa.limits.MaxMemory > 0 { - memReserved += jsa.limits.MaxMemory - } - if jsa.limits.MaxStore > 0 { - storeReserved += jsa.limits.MaxStore - } + maxMemory, maxStore := totalMaxBytes(jsa.limits) + memReserved += maxMemory + storeReserved += maxStore jsa.mu.RUnlock() } - if memReserved+limits.MaxMemory > js.config.MaxMemory { + if memReserved+totalMaxMemory > js.config.MaxMemory { return NewJSMemoryResourcesExceededError() } - if storeReserved+limits.MaxStore > js.config.MaxStore { + if storeReserved+totalMaxStore > js.config.MaxStore { return NewJSStorageResourcesExceededError() } @@ -2254,7 +2467,7 @@ func validateJetStreamOptions(o *Options) error { } else { for _, acc := range o.Accounts { if a == acc.GetName() { - if acc.jsLimits != nil && domain != _EMPTY_ { + if len(acc.jsLimits) > 0 && domain != _EMPTY_ { return fmt.Errorf("default_js_domain contains account name %q with enabled JetStream", a) } found = true diff --git a/server/jetstream_api.go b/server/jetstream_api.go index 3889909e..f8b8e999 100644 --- a/server/jetstream_api.go +++ b/server/jetstream_api.go @@ -846,7 +846,7 @@ func (a *Account) trackAPI() { a.mu.RUnlock() if jsa != nil { jsa.mu.Lock() - jsa.usage.api++ + jsa.usageApi++ jsa.apiTotal++ jsa.sendClusterUsageUpdate() atomic.AddInt64(&jsa.js.apiTotal, 1) @@ -860,9 +860,9 @@ func (a *Account) trackAPIErr() { a.mu.RUnlock() if jsa != nil { jsa.mu.Lock() - jsa.usage.api++ + jsa.usageApi++ jsa.apiTotal++ - jsa.usage.err++ + jsa.usageErr++ jsa.apiErrors++ jsa.sendClusterUsageUpdate() atomic.AddInt64(&jsa.js.apiTotal, 1) @@ -1145,6 +1145,31 @@ func (s *Server) jsonResponse(v interface{}) string { return string(b) } +// Read lock must be held +func (jsa *jsAccount) tieredReservation(tier string, cfg *StreamConfig) int64 { + reservation := int64(0) + if tier == _EMPTY_ { + for _, sa := range jsa.streams { + if sa.cfg.MaxBytes > 0 { + if sa.cfg.Storage == cfg.Storage && sa.cfg.Name != cfg.Name { + reservation += (int64(sa.cfg.Replicas) * sa.cfg.MaxBytes) + } + } + } + } else { + for _, sa := range jsa.streams { + if sa.cfg.Replicas == cfg.Replicas { + if sa.cfg.MaxBytes > 0 { + if isSameTier(&sa.cfg, cfg) && sa.cfg.Name != cfg.Name { + reservation += (int64(sa.cfg.Replicas) * sa.cfg.MaxBytes) + } + } + } + } + } + return reservation +} + // Request to create a stream. func (s *Server) jsStreamCreateRequest(sub *subscription, c *client, _ *Account, subject, reply string, rmsg []byte) { if c == nil || !s.JetStreamEnabled() { @@ -1320,7 +1345,7 @@ func (s *Server) jsStreamCreateRequest(sub *subscription, c *client, _ *Account, } // Check for MaxBytes required. - if acc.maxBytesRequired() && cfg.MaxBytes <= 0 { + if acc.maxBytesRequired(&cfg) && cfg.MaxBytes <= 0 { resp.Error = NewJSStreamMaxBytesRequiredError() s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp)) return @@ -1332,6 +1357,23 @@ func (s *Server) jsStreamCreateRequest(sub *subscription, c *client, _ *Account, return } + acc.mu.RLock() + jsa := acc.js + acc.mu.RUnlock() + + selectedLimits, tier, ok := jsa.selectLimits(&cfg) + if !ok { + resp.Error = NewJSNoLimitsError() + s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp)) + return + } + + if selectedLimits.MaxStreams > 0 && jsa.countStreams(tier, &cfg) >= selectedLimits.MaxStreams { + resp.Error = NewJSMaximumStreamsLimitError() + s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp)) + return + } + mset, err := acc.addStream(&cfg) if err != nil { resp.Error = NewJSStreamCreateError(err, Unless(err)) diff --git a/server/jetstream_cluster.go b/server/jetstream_cluster.go index d6fc94a6..40ce855c 100644 --- a/server/jetstream_cluster.go +++ b/server/jetstream_cluster.go @@ -1712,7 +1712,7 @@ func (js *jetStream) monitorStream(mset *stream, sa *streamAssignment, sendSnaps func (mset *stream) resetClusteredState(err error) bool { mset.mu.RLock() s, js, jsa, sa, acc, node := mset.srv, mset.js, mset.jsa, mset.sa, mset.acc, mset.node - stype, isLeader := mset.cfg.Storage, mset.isLeader() + stype, isLeader, tierName := mset.cfg.Storage, mset.isLeader(), mset.tier mset.mu.RUnlock() // Stepdown regardless if we are the leader here. @@ -1727,7 +1727,7 @@ func (mset *stream) resetClusteredState(err error) bool { } // Account - if jsa.limitsExceeded(stype) { + if jsa.limitsExceeded(stype, tierName) { s.Warnf("stream '%s > %s' errored, account resources exceeded", acc, mset.name()) return false } @@ -3883,6 +3883,35 @@ func groupName(prefix string, peers []string, storage StorageType) string { return fmt.Sprintf("%s-R%d%s-%s", prefix, len(peers), storage.String()[:1], gns) } +// returns stream count for this tier as well as applicable reservation size (not including reservations for cfg) +// jetStream read lock should be held +func tieredStreamAndReservationCount(asa map[string]*streamAssignment, tier string, cfg *StreamConfig) (int, int64) { + numStreams := len(asa) + reservation := int64(0) + if tier == _EMPTY_ { + for _, sa := range asa { + if sa.Config.MaxBytes > 0 && sa.Config.Name != cfg.Name { + if sa.Config.Storage == cfg.Storage { + reservation += (int64(sa.Config.Replicas) * sa.Config.MaxBytes) + } + } + } + } else { + numStreams = 0 + for _, sa := range asa { + if isSameTier(sa.Config, cfg) { + numStreams++ + if sa.Config.MaxBytes > 0 { + if sa.Config.Storage == cfg.Storage && sa.Config.Name != cfg.Name { + reservation += (int64(sa.Config.Replicas) * sa.Config.MaxBytes) + } + } + } + } + } + return numStreams, reservation +} + // createGroupForStream will create a group for assignment for the stream. // Lock should be held. func (js *jetStream) createGroupForStream(ci *ClientInfo, cfg *StreamConfig) *raftGroup { @@ -3941,28 +3970,36 @@ func (s *Server) jsClusteredStreamRequest(ci *ClientInfo, acc *Account, subject, } cfg := &ccfg + jsa.mu.RLock() + selectedLimits, tier, ok := jsa.selectLimits(config) + jsa.mu.RUnlock() + + if !ok { + resp.Error = NewJSNoLimitsError() + s.sendAPIErrResponse(ci, acc, subject, reply, string(rmsg), s.jsonResponse(&resp)) + return + } + // Check for stream limits here before proposing. These need to be tracked from meta layer, not jsa. js.mu.RLock() asa := cc.streams[acc.Name] - numStreams := len(asa) - js.mu.RUnlock() + numStreams, reservations := tieredStreamAndReservationCount(asa, tier, config) - jsa.mu.RLock() - exceeded := jsa.limits.MaxStreams > 0 && numStreams >= jsa.limits.MaxStreams - jsa.mu.RUnlock() - - if exceeded { + if selectedLimits.MaxStreams > 0 && numStreams >= selectedLimits.MaxStreams { resp.Error = NewJSMaximumStreamsLimitError() s.sendAPIErrResponse(ci, acc, subject, reply, string(rmsg), s.jsonResponse(&resp)) + js.mu.RUnlock() return } // Check for account limits here before proposing. - if err := jsa.checkAccountLimits(cfg); err != nil { + if err := js.checkAccountLimits(&selectedLimits, cfg, reservations); err != nil { resp.Error = NewJSStreamLimitsError(err, Unless(err)) s.sendAPIErrResponse(ci, acc, subject, reply, string(rmsg), s.jsonResponse(&resp)) + js.mu.RUnlock() return } + js.mu.RUnlock() // Now process the request and proposal. js.mu.Lock() @@ -4044,7 +4081,10 @@ func (s *Server) jsClusteredStreamUpdateRequest(ci *ClientInfo, acc *Account, su } var newCfg *StreamConfig if jsa := js.accounts[acc.Name]; jsa != nil { - if ncfg, err := jsa.configUpdateCheck(osa.Config, cfg); err != nil { + js.mu.Unlock() + ncfg, err := jsa.configUpdateCheck(osa.Config, cfg) + js.mu.Lock() + if err != nil { resp.Error = NewJSStreamUpdateError(err, Unless(err)) s.sendAPIErrResponse(ci, acc, subject, reply, string(rmsg), s.jsonResponse(&resp)) return @@ -5008,7 +5048,7 @@ func (mset *stream) processClusteredInboundMsg(subject, reply string, hdr, msg [ mset.mu.RLock() canRespond := !mset.cfg.NoAck && len(reply) > 0 name, stype := mset.cfg.Name, mset.cfg.Storage - s, js, jsa, st, rf, outq, node := mset.srv, mset.js, mset.jsa, mset.cfg.Storage, mset.cfg.Replicas, mset.outq, mset.node + s, js, jsa, st, rf, tierName, outq, node := mset.srv, mset.js, mset.jsa, mset.cfg.Storage, mset.cfg.Replicas, mset.tier, mset.outq, mset.node maxMsgSize, lseq := int(mset.cfg.MaxMsgSize), mset.lseq mset.mu.RUnlock() @@ -5034,14 +5074,32 @@ func (mset *stream) processClusteredInboundMsg(subject, reply string, hdr, msg [ // Check here pre-emptively if we have exceeded our account limits. var exceeded bool jsa.mu.RLock() + jsaLimits, ok := jsa.limits[tierName] + if !ok { + jsa.mu.RUnlock() + err := fmt.Errorf("no JetStream resource limits found account: %q", jsa.acc().Name) + s.Warnf(err.Error()) + if canRespond { + var resp = &JSPubAckResponse{PubAck: &PubAck{Stream: name}} + resp.Error = NewJSNoLimitsError() + response, _ = json.Marshal(resp) + outq.send(newJSPubMsg(reply, _EMPTY_, _EMPTY_, nil, response, nil, 0)) + } + return err + } + t, ok := jsa.usage[tierName] + if !ok { + t = &jsaStorage{} + jsa.usage[tierName] = t + } if st == MemoryStorage { - total := jsa.storeTotal + int64(memStoreMsgSize(subject, hdr, msg)*uint64(rf)) - if jsa.limits.MaxMemory > 0 && total > jsa.limits.MaxMemory { + total := t.total.store + int64(memStoreMsgSize(subject, hdr, msg)*uint64(rf)) + if jsaLimits.MaxMemory > 0 && total > jsaLimits.MaxMemory { exceeded = true } } else { - total := jsa.storeTotal + int64(fileStoreMsgSize(subject, hdr, msg)*uint64(rf)) - if jsa.limits.MaxStore > 0 && total > jsa.limits.MaxStore { + total := t.total.store + int64(fileStoreMsgSize(subject, hdr, msg)*uint64(rf)) + if jsaLimits.MaxStore > 0 && total > jsaLimits.MaxStore { exceeded = true } } @@ -5416,9 +5474,10 @@ func (mset *stream) processCatchupMsg(msg []byte) (uint64, error) { mset.mu.RLock() st := mset.cfg.Storage ddloaded := mset.ddloaded + tierName := mset.tier mset.mu.RUnlock() - if mset.js.limitsExceeded(st) || mset.jsa.limitsExceeded(st) { + if mset.js.limitsExceeded(st) || mset.jsa.limitsExceeded(st, tierName) { return 0, NewJSInsufficientResourcesError() } diff --git a/server/jetstream_cluster_test.go b/server/jetstream_cluster_test.go index 3558c26b..ffb41ba5 100644 --- a/server/jetstream_cluster_test.go +++ b/server/jetstream_cluster_test.go @@ -2978,11 +2978,13 @@ func TestJetStreamClusterAccountInfoAndLimits(t *testing.T) { defer c.shutdown() // Adjust our limits. - c.updateLimits("$G", &JetStreamAccountLimits{ - MaxMemory: 1024, - MaxStore: 8000, - MaxStreams: 3, - MaxConsumers: 1, + c.updateLimits("$G", map[string]JetStreamAccountLimits{ + _EMPTY_: { + MaxMemory: 1024, + MaxStore: 8000, + MaxStreams: 3, + MaxConsumers: 1, + }, }) // Client based API @@ -9815,6 +9817,45 @@ func TestJetStreamSuperClusterPushConsumerInterest(t *testing.T) { } } +func TestJetStreamClusterAccountReservations(t *testing.T) { + c := createJetStreamClusterWithTemplate(t, jsClusterMaxBytesAccountLimitTempl, "C1", 3) + defer c.shutdown() + + s := c.randomServer() + nc, js := jsClientConnect(t, s) + defer nc.Close() + accMax := 3 + + test := func(t *testing.T, replica int) { + mb := int64((1+accMax)-replica) * 1024 * 1024 * 1024 // GB, corrected for replication factor + _, err := js.AddStream(&nats.StreamConfig{Name: "S1", Subjects: []string{"s1"}, MaxBytes: mb, Replicas: replica}) + require_NoError(t, err) + + _, err = js.AddStream(&nats.StreamConfig{Name: "S2", Subjects: []string{"s2"}, MaxBytes: 1024, Replicas: replica}) + require_Error(t, err) + require_Equal(t, err.Error(), "insufficient storage resources available") + + _, err = js.UpdateStream(&nats.StreamConfig{Name: "S1", Subjects: []string{"s1"}, MaxBytes: mb / 2, Replicas: replica}) + require_NoError(t, err) + + _, err = js.AddStream(&nats.StreamConfig{Name: "S2", Subjects: []string{"s2"}, MaxBytes: mb / 2, Replicas: replica}) + require_NoError(t, err) + + _, err = js.AddStream(&nats.StreamConfig{Name: "S3", Subjects: []string{"s3"}, MaxBytes: 1024, Replicas: replica}) + require_Error(t, err) + require_Equal(t, err.Error(), "insufficient storage resources available") + + _, err = js.UpdateStream(&nats.StreamConfig{Name: "S2", Subjects: []string{"s2"}, MaxBytes: mb/2 + 1, Replicas: replica}) + require_Error(t, err) + require_Equal(t, err.Error(), "insufficient storage resources available") + + require_NoError(t, js.DeleteStream("S1")) + require_NoError(t, js.DeleteStream("S2")) + } + test(t, 3) + test(t, 1) +} + func TestJetStreamClusterOverflowPlacement(t *testing.T) { sc := createJetStreamSuperClusterWithTemplate(t, jsClusterMaxBytesTempl, 3, 3) defer sc.shutdown() @@ -9891,6 +9932,47 @@ func TestJetStreamClusterOverflowPlacement(t *testing.T) { } } +func TestJetStreamClusterConcurrentAccountLimits(t *testing.T) { + c := createJetStreamClusterWithTemplate(t, jsClusterMaxBytesAccountLimitTempl, "cluster", 3) + defer c.shutdown() + + startCh := make(chan bool) + var wg sync.WaitGroup + var swg sync.WaitGroup + failCount := int32(0) + + start := func(name string) { + wg.Add(1) + defer wg.Done() + + s := c.randomServer() + nc, js := jsClientConnect(t, s) + defer nc.Close() + + swg.Done() + <-startCh + + _, err := js.AddStream(&nats.StreamConfig{ + Name: name, + Replicas: 3, + MaxBytes: 1024 * 1024 * 1024, + }) + if err != nil { + atomic.AddInt32(&failCount, 1) + require_Equal(t, err.Error(), "insufficient storage resources available") + } + } + + swg.Add(2) + go start("foo") + go start("bar") + swg.Wait() + // Now start both at same time. + close(startCh) + wg.Wait() + require_True(t, failCount == 1) +} + func TestJetStreamClusterConcurrentOverflow(t *testing.T) { sc := createJetStreamSuperClusterWithTemplate(t, jsClusterMaxBytesTempl, 3, 3) defer sc.shutdown() @@ -9902,7 +9984,6 @@ func TestJetStreamClusterConcurrentOverflow(t *testing.T) { var swg sync.WaitGroup start := func(name string) { - wg.Add(1) defer wg.Done() s := sc.clusterForName(pcn).randomServer() @@ -9919,7 +10000,7 @@ func TestJetStreamClusterConcurrentOverflow(t *testing.T) { }) require_NoError(t, err) } - + wg.Add(2) swg.Add(2) go start("foo") go start("bar") @@ -11589,7 +11670,37 @@ var jsClusterMaxBytesTempl = ` users = [ { user: "u", pass: "p" } ] jetstream: { max_mem: 128MB - max_file: 8GB + max_file: 18GB + max_bytes: true // Forces streams to indicate max_bytes. + } + } + $SYS { users = [ { user: "admin", pass: "s3cr3t!" } ] } + } +` + +var jsClusterMaxBytesAccountLimitTempl = ` + listen: 127.0.0.1:-1 + server_name: %s + jetstream: {max_mem_store: 256MB, max_file_store: 4GB, store_dir: '%s'} + + leaf { + listen: 127.0.0.1:-1 + } + + cluster { + name: %s + listen: 127.0.0.1:%d + routes = [%s] + } + + no_auth_user: u + + accounts { + $U { + users = [ { user: "u", pass: "p" } ] + jetstream: { + max_mem: 128MB + max_file: 3GB max_bytes: true // Forces streams to indicate max_bytes. } } @@ -12245,7 +12356,7 @@ func (c *cluster) addSubjectMapping(account, src, dest string) { } // Adjust limits for the given account. -func (c *cluster) updateLimits(account string, newLimits *JetStreamAccountLimits) { +func (c *cluster) updateLimits(account string, newLimits map[string]JetStreamAccountLimits) { c.t.Helper() for _, s := range c.servers { acc, err := s.LookupAccount(account) diff --git a/server/jetstream_errors_generated.go b/server/jetstream_errors_generated.go index b71bbf9e..3d35af6d 100644 --- a/server/jetstream_errors_generated.go +++ b/server/jetstream_errors_generated.go @@ -2,7 +2,9 @@ package server -import "strings" +import ( + "strings" +) const ( // JSAccountResourcesExceededErr resource limits exceeded for account @@ -206,6 +208,9 @@ const ( // JSNoAccountErr account not found JSNoAccountErr ErrorIdentifier = 10035 + // JSNoLimitsErr no JetStream default or applicable tiered limit present + JSNoLimitsErr ErrorIdentifier = 10120 + // JSNoMessageFoundErr no message found JSNoMessageFoundErr ErrorIdentifier = 10037 @@ -429,6 +434,7 @@ var ( JSMirrorWithSubjectFiltersErr: {Code: 400, ErrCode: 10033, Description: "stream mirrors can not contain filtered subjects"}, JSMirrorWithSubjectsErr: {Code: 400, ErrCode: 10034, Description: "stream mirrors can not also contain subjects"}, JSNoAccountErr: {Code: 503, ErrCode: 10035, Description: "account not found"}, + JSNoLimitsErr: {Code: 400, ErrCode: 10120, Description: "no JetStream default or applicable tiered limit present"}, JSNoMessageFoundErr: {Code: 404, ErrCode: 10037, Description: "no message found"}, JSNotEmptyRequestErr: {Code: 400, ErrCode: 10038, Description: "expected an empty request payload"}, JSNotEnabledErr: {Code: 503, ErrCode: 10076, Description: "JetStream not enabled"}, @@ -1217,6 +1223,16 @@ func NewJSNoAccountError(opts ...ErrorOption) *ApiError { return ApiErrors[JSNoAccountErr] } +// NewJSNoLimitsError creates a new JSNoLimitsErr error: "no JetStream default or applicable tiered limit present" +func NewJSNoLimitsError(opts ...ErrorOption) *ApiError { + eopts := parseOpts(opts) + if ae, ok := eopts.err.(*ApiError); ok { + return ae + } + + return ApiErrors[JSNoLimitsErr] +} + // NewJSNoMessageFoundError creates a new JSNoMessageFoundErr error: "no message found" func NewJSNoMessageFoundError(opts ...ErrorOption) *ApiError { eopts := parseOpts(opts) diff --git a/server/jetstream_test.go b/server/jetstream_test.go index b0c9b0fa..d89b7635 100644 --- a/server/jetstream_test.go +++ b/server/jetstream_test.go @@ -4123,7 +4123,7 @@ func TestJetStreamSnapshots(t *testing.T) { t.Fatalf("Unexpected error: %v", err) } // Now compare to make sure they are equal. - if nusage := acc.JetStreamUsage(); nusage != pusage { + if nusage := acc.JetStreamUsage(); !reflect.DeepEqual(nusage, pusage) { t.Fatalf("Usage does not match after restore: %+v vs %+v", nusage, pusage) } if state := mset.state(); !reflect.DeepEqual(state, info.state) { @@ -6661,12 +6661,14 @@ func TestJetStreamSystemLimits(t *testing.T) { t.Fatalf("Expected reserved memory and store to be 0, got %d and %d", rm, rd) } - limits := func(mem int64, store int64) *JetStreamAccountLimits { - return &JetStreamAccountLimits{ - MaxMemory: mem, - MaxStore: store, - MaxStreams: -1, - MaxConsumers: -1, + limits := func(mem int64, store int64) map[string]JetStreamAccountLimits { + return map[string]JetStreamAccountLimits{ + _EMPTY_: { + MaxMemory: mem, + MaxStore: store, + MaxStreams: -1, + MaxConsumers: -1, + }, } } @@ -6701,11 +6703,12 @@ func TestJetStreamSystemLimits(t *testing.T) { t.Fatalf("Unexpected error: %v", err) } // Test Adjust - l := limits(jsconfig.MaxMemory, jsconfig.MaxStore) + lim := limits(jsconfig.MaxMemory, jsconfig.MaxStore) + l := lim[_EMPTY_] l.MaxStreams = 10 l.MaxConsumers = 10 - - if err := facc.UpdateJetStreamLimits(l); err != nil { + lim[_EMPTY_] = l + if err := facc.UpdateJetStreamLimits(lim); err != nil { t.Fatalf("Unexpected error updating jetstream account limits: %v", err) } @@ -6720,11 +6723,6 @@ func TestJetStreamSystemLimits(t *testing.T) { msets = append(msets, mset) } - // This one should fail since over the limit for max number of streams. - if _, err := facc.addStream(&StreamConfig{Name: "22", Storage: MemoryStorage, Subjects: []string{"foo.22"}}); err == nil { - t.Fatalf("Expected error adding stream over limit") - } - // Remove them all for _, mset := range msets { mset.delete() @@ -6788,7 +6786,8 @@ func TestJetStreamSystemLimits(t *testing.T) { } l.MaxConsumers = 5 - if err := facc.UpdateJetStreamLimits(l); err != nil { + lim[_EMPTY_] = l + if err := facc.UpdateJetStreamLimits(lim); err != nil { t.Fatalf("Unexpected error updating jetstream account limits: %v", err) } @@ -7347,18 +7346,18 @@ gateway { } func TestStreamLimitUpdate(t *testing.T) { - t.Skip("shouldn't fail") - s := RunBasicJetStreamServer() if config := s.JetStreamConfig(); config != nil { defer removeDir(t, config.StoreDir) } defer s.Shutdown() - err := s.GlobalAccount().UpdateJetStreamLimits(&JetStreamAccountLimits{ - MaxMemory: 128, - MaxStore: 128, - MaxStreams: 1, + err := s.GlobalAccount().UpdateJetStreamLimits(map[string]JetStreamAccountLimits{ + _EMPTY_: { + MaxMemory: 128, + MaxStore: 128, + MaxStreams: 1, + }, }) require_NoError(t, err) @@ -7395,11 +7394,13 @@ func TestJetStreamStreamStorageTrackingAndLimits(t *testing.T) { gacc := s.GlobalAccount() - al := &JetStreamAccountLimits{ - MaxMemory: 8192, - MaxStore: -1, - MaxStreams: -1, - MaxConsumers: -1, + al := map[string]JetStreamAccountLimits{ + _EMPTY_: { + MaxMemory: 8192, + MaxStore: -1, + MaxStreams: -1, + MaxConsumers: -1, + }, } if err := gacc.UpdateJetStreamLimits(al); err != nil { @@ -7498,12 +7499,13 @@ func TestJetStreamStreamStorageTrackingAndLimits(t *testing.T) { state = mset.state() usage = gacc.JetStreamUsage() - if usage.Memory > uint64(al.MaxMemory) { - t.Fatalf("Expected memory to not exceed limit of %d, got %d", al.MaxMemory, usage.Memory) + lim := al[_EMPTY_] + if usage.Memory > uint64(lim.MaxMemory) { + t.Fatalf("Expected memory to not exceed limit of %d, got %d", lim.MaxMemory, usage.Memory) } // make sure that unlimited accounts work - al.MaxMemory = -1 + lim.MaxMemory = -1 if err := gacc.UpdateJetStreamLimits(al); err != nil { t.Fatalf("Unexpected error updating jetstream account limits: %v", err) @@ -7523,11 +7525,13 @@ func TestJetStreamStreamFileTrackingAndLimits(t *testing.T) { gacc := s.GlobalAccount() - al := &JetStreamAccountLimits{ - MaxMemory: 8192, - MaxStore: 9600, - MaxStreams: -1, - MaxConsumers: -1, + al := map[string]JetStreamAccountLimits{ + _EMPTY_: { + MaxMemory: 8192, + MaxStore: 9600, + MaxStreams: -1, + MaxConsumers: -1, + }, } if err := gacc.UpdateJetStreamLimits(al); err != nil { @@ -7628,8 +7632,51 @@ func TestJetStreamStreamFileTrackingAndLimits(t *testing.T) { state = mset.state() usage = gacc.JetStreamUsage() - if usage.Memory > uint64(al.MaxMemory) { - t.Fatalf("Expected memory to not exceed limit of %d, got %d", al.MaxMemory, usage.Memory) + lim := al[_EMPTY_] + if usage.Memory > uint64(lim.MaxMemory) { + t.Fatalf("Expected memory to not exceed limit of %d, got %d", lim.MaxMemory, usage.Memory) + } +} + +func TestJetStreamTieredLimits(t *testing.T) { + s := RunBasicJetStreamServer() + if config := s.JetStreamConfig(); config != nil { + defer removeDir(t, config.StoreDir) + } + defer s.Shutdown() + + gacc := s.GlobalAccount() + + tFail := map[string]JetStreamAccountLimits{ + "nottheer": { + MaxMemory: 8192, + MaxStore: 9600, + MaxStreams: -1, + MaxConsumers: -1, + }, + } + + if err := gacc.UpdateJetStreamLimits(tFail); err != nil { + t.Fatalf("Unexpected error updating jetstream account limits: %v", err) + } + + mconfig := &StreamConfig{Name: "LIMITS", Storage: FileStorage, Retention: WorkQueuePolicy} + mset, err := gacc.addStream(mconfig) + defer mset.delete() + require_Error(t, err) + require_Contains(t, err.Error(), "no JetStream default or applicable tiered limit present") + + tPass := map[string]JetStreamAccountLimits{ + "R1": { + MaxMemory: 8192, + MaxStore: 9600, + MaxStreams: -1, + MaxConsumers: -1, + }, + } + + if err := gacc.UpdateJetStreamLimits(tPass); err != nil { + t.Fatalf("Unexpected error updating jetstream account limits: %v", err) } } @@ -7731,7 +7778,7 @@ func TestJetStreamSimpleFileRecovery(t *testing.T) { acc = s.GlobalAccount() nusage := acc.JetStreamUsage() - if nusage != pusage { + if !reflect.DeepEqual(nusage, pusage) { t.Fatalf("Usage does not match after restore: %+v vs %+v", nusage, pusage) } @@ -15803,23 +15850,22 @@ func TestStorageReservedBytes(t *testing.T) { createMaxBytes: int64(math.Round(float64(systemLimit/2) * .666)), updateMaxBytes: int64(math.Round(float64(systemLimit/2)*.666)) + 1, }, - // TODO: Enable these once account limits are enforced. - //{ - // name: "file update past account limit", - // accountLimit: systemLimit / 2, - // storage: nats.FileStorage, - // createMaxBytes: (systemLimit / 2), - // updateMaxBytes: (systemLimit / 2) + 1, - // wantUpdateError: true, - //}, - //{ - // name: "memory update past account limit", - // accountLimit: systemLimit / 2, - // storage: nats.MemoryStorage, - // createMaxBytes: (systemLimit / 2), - // updateMaxBytes: (systemLimit / 2) + 1, - // wantUpdateError: true, - //}, + { + name: "file update past account limit", + accountLimit: systemLimit / 2, + storage: nats.FileStorage, + createMaxBytes: (systemLimit / 2), + updateMaxBytes: (systemLimit / 2) + 1, + wantUpdateError: true, + }, + { + name: "memory update past account limit", + accountLimit: systemLimit / 2, + storage: nats.MemoryStorage, + createMaxBytes: (systemLimit / 2), + updateMaxBytes: (systemLimit / 2) + 1, + wantUpdateError: true, + }, { name: "file update to account limit", accountLimit: systemLimit / 2, @@ -15839,9 +15885,11 @@ func TestStorageReservedBytes(t *testing.T) { c := cases[i] t.Run(c.name, func(st *testing.T) { // Setup limits - err = s.GlobalAccount().UpdateJetStreamLimits(&JetStreamAccountLimits{ - MaxMemory: c.accountLimit, - MaxStore: c.accountLimit, + err = s.GlobalAccount().UpdateJetStreamLimits(map[string]JetStreamAccountLimits{ + _EMPTY_: { + MaxMemory: c.accountLimit, + MaxStore: c.accountLimit, + }, }) require_NoError(st, err) diff --git a/server/jwt_test.go b/server/jwt_test.go index a391a71b..06ba72d7 100644 --- a/server/jwt_test.go +++ b/server/jwt_test.go @@ -5304,6 +5304,314 @@ func TestJWTResponseThreshold(t *testing.T) { }) } +func TestJWTJetStreamTiers(t *testing.T) { + sysKp, syspub := createKey(t) + sysJwt := encodeClaim(t, jwt.NewAccountClaims(syspub), syspub) + sysCreds := newUser(t, sysKp) + defer removeFile(t, sysCreds) + + accKp, accPub := createKey(t) + accClaim := jwt.NewAccountClaims(accPub) + accClaim.Name = "acc" + accClaim.Limits.JetStreamTieredLimits["R1"] = jwt.JetStreamLimits{ + DiskStorage: 1100, MemoryStorage: 0, Consumer: 2, Streams: 2} + accJwt1 := encodeClaim(t, accClaim, accPub) + accCreds := newUser(t, accKp) + + start := time.Now() + + storeDir := createDir(t, JetStreamStoreDir) + defer removeDir(t, storeDir) + + dirSrv := createDir(t, "srv") + defer removeDir(t, dirSrv) + cf := createConfFile(t, []byte(fmt.Sprintf(` + listen: 127.0.0.1:-1 + server_name: s1 + jetstream: {max_mem_store: 256MB, max_file_store: 2GB, store_dir: '%s'} + leaf { + listen: 127.0.0.1:-1 + } + operator: %s + system_account: %s + resolver: { + type: full + dir: '%s' + } + `, storeDir, ojwt, syspub, dirSrv))) + defer removeFile(t, cf) + + s, _ := RunServerWithConfig(cf) + defer s.Shutdown() + + updateJwt(t, s.ClientURL(), sysCreds, sysJwt, 1) + updateJwt(t, s.ClientURL(), sysCreds, accJwt1, 1) + + nc := natsConnect(t, s.ClientURL(), nats.UserCredentials(accCreds)) + defer nc.Close() + + js, err := nc.JetStream() + require_NoError(t, err) + + // Test tiers up to stream limits + _, err = js.AddStream(&nats.StreamConfig{Name: "testR1-1", Replicas: 1, Subjects: []string{"testR1-1"}}) + require_NoError(t, err) + _, err = js.AddStream(&nats.StreamConfig{Name: "testR1-2", Replicas: 1, Subjects: []string{"testR1-2"}}) + require_NoError(t, err) + + // Test exceeding tiered stream limit + _, err = js.AddStream(&nats.StreamConfig{Name: "testR1-3", Replicas: 1, Subjects: []string{"testR1-3"}}) + require_Error(t, err) + require_Equal(t, err.Error(), "maximum number of streams reached") + + // Test tiers up to consumer limits + _, err = js.AddConsumer("testR1-1", &nats.ConsumerConfig{Durable: "dur1", AckPolicy: nats.AckExplicitPolicy}) + require_NoError(t, err) + _, err = js.AddConsumer("testR1-1", &nats.ConsumerConfig{Durable: "dur3", AckPolicy: nats.AckExplicitPolicy}) + require_NoError(t, err) + + // test exceeding tiered consumer limits + _, err = js.AddConsumer("testR1-1", &nats.ConsumerConfig{Durable: "dur4", AckPolicy: nats.AckExplicitPolicy}) + require_Error(t, err) + require_Equal(t, err.Error(), "maximum consumers limit reached") + _, err = js.AddConsumer("testR1-1", &nats.ConsumerConfig{Durable: "dur5", AckPolicy: nats.AckExplicitPolicy}) + require_Error(t, err) + require_Equal(t, err.Error(), "maximum consumers limit reached") + + // test tiered storage limit + msg := [512]byte{} + _, err = js.Publish("testR1-1", msg[:]) + require_NoError(t, err) + _, err = js.Publish("testR1-2", msg[:]) + require_NoError(t, err) + + // test exceeding tiered storage limit + _, err = js.Publish("testR1-1", []byte("1")) + require_Error(t, err) + require_Equal(t, err.Error(), "nats: resource limits exceeded for account") + + time.Sleep(time.Second - time.Since(start)) // make sure the time stamp changes + accClaim.Limits.JetStreamTieredLimits["R1"] = jwt.JetStreamLimits{ + DiskStorage: 1650, MemoryStorage: 0, Consumer: 1, Streams: 3} + accJwt2 := encodeClaim(t, accClaim, accPub) + updateJwt(t, s.ClientURL(), sysCreds, accJwt2, 1) + + // test same sequence as before, add stream, fail add stream, add consumer, fail add consumer, publish, fail publish + _, err = js.AddStream(&nats.StreamConfig{Name: "testR1-3", Replicas: 1, Subjects: []string{"testR1-3"}}) + require_NoError(t, err) + _, err = js.AddStream(&nats.StreamConfig{Name: "testR1-4", Replicas: 1, Subjects: []string{"testR1-4"}}) + require_Error(t, err) + require_Equal(t, err.Error(), "maximum number of streams reached") + _, err = js.AddConsumer("testR1-3", &nats.ConsumerConfig{Durable: "dur6", AckPolicy: nats.AckExplicitPolicy}) + require_NoError(t, err) + _, err = js.AddConsumer("testR1-3", &nats.ConsumerConfig{Durable: "dur7", AckPolicy: nats.AckExplicitPolicy}) + require_Error(t, err) + require_Equal(t, err.Error(), "maximum consumers limit reached") + _, err = js.Publish("testR1-3", msg[:]) + require_NoError(t, err) + _, err = js.Publish("testR1-3", []byte("1")) + require_Error(t, err) + require_Equal(t, err.Error(), "nats: resource limits exceeded for account") +} + +func TestJWTClusteredJetStreamTiers(t *testing.T) { + sysKp, syspub := createKey(t) + sysJwt := encodeClaim(t, jwt.NewAccountClaims(syspub), syspub) + sysCreds := newUser(t, sysKp) + defer removeFile(t, sysCreds) + + accKp, aExpPub := createKey(t) + accClaim := jwt.NewAccountClaims(aExpPub) + accClaim.Name = "acc" + accClaim.Limits.JetStreamTieredLimits["R1"] = jwt.JetStreamLimits{ + DiskStorage: 1100, MemoryStorage: 0, Consumer: 2, Streams: 2} + accClaim.Limits.JetStreamTieredLimits["R3"] = jwt.JetStreamLimits{ + DiskStorage: 3100, MemoryStorage: 0, Consumer: 1, Streams: 1} + accJwt := encodeClaim(t, accClaim, aExpPub) + accCreds := newUser(t, accKp) + tmlp := ` + listen: 127.0.0.1:-1 + server_name: %s + jetstream: {max_mem_store: 256MB, max_file_store: 2GB, store_dir: '%s'} + leaf { + listen: 127.0.0.1:-1 + } + cluster { + name: %s + listen: 127.0.0.1:%d + routes = [%s] + } + ` + fmt.Sprintf(` + operator: %s + system_account: %s + resolver = MEMORY + resolver_preload = { + %s : %s + %s : %s + } + `, ojwt, syspub, syspub, sysJwt, aExpPub, accJwt) + + c := createJetStreamClusterWithTemplate(t, tmlp, "cluster", 3) + defer c.shutdown() + + nc := natsConnect(t, c.randomServer().ClientURL(), nats.UserCredentials(accCreds)) + defer nc.Close() + + js, err := nc.JetStream() + require_NoError(t, err) + + // Test absent tiers + _, err = js.AddStream(&nats.StreamConfig{Name: "testR2", Replicas: 2, Subjects: []string{"testR2"}}) + require_Error(t, err) + require_Equal(t, err.Error(), "no JetStream default or applicable tiered limit present") + _, err = js.AddStream(&nats.StreamConfig{Name: "testR5", Replicas: 5, Subjects: []string{"testR5"}}) + require_Error(t, err) + require_Equal(t, err.Error(), "no JetStream default or applicable tiered limit present") + + // Test tiers up to stream limits + _, err = js.AddStream(&nats.StreamConfig{Name: "testR1-1", Replicas: 1, Subjects: []string{"testR1-1"}}) + require_NoError(t, err) + _, err = js.AddStream(&nats.StreamConfig{Name: "testR3-1", Replicas: 3, Subjects: []string{"testR3-1"}}) + require_NoError(t, err) + _, err = js.AddStream(&nats.StreamConfig{Name: "testR1-2", Replicas: 1, Subjects: []string{"testR1-2"}}) + require_NoError(t, err) + + // Test exceeding tiered stream limit + _, err = js.AddStream(&nats.StreamConfig{Name: "testR1-3", Replicas: 1, Subjects: []string{"testR1-3"}}) + require_Error(t, err) + require_Equal(t, err.Error(), "maximum number of streams reached") + _, err = js.AddStream(&nats.StreamConfig{Name: "testR3-3", Replicas: 3, Subjects: []string{"testR3-3"}}) + require_Error(t, err) + require_Equal(t, err.Error(), "maximum number of streams reached") + + // Test tiers up to consumer limits + _, err = js.AddConsumer("testR1-1", &nats.ConsumerConfig{Durable: "dur1", AckPolicy: nats.AckExplicitPolicy}) + require_NoError(t, err) + _, err = js.AddConsumer("testR3-1", &nats.ConsumerConfig{Durable: "dur2", AckPolicy: nats.AckExplicitPolicy}) + require_NoError(t, err) + _, err = js.AddConsumer("testR1-1", &nats.ConsumerConfig{Durable: "dur3", AckPolicy: nats.AckExplicitPolicy}) + require_NoError(t, err) + + // test exceeding tiered consumer limits + _, err = js.AddConsumer("testR1-1", &nats.ConsumerConfig{Durable: "dur4", AckPolicy: nats.AckExplicitPolicy}) + require_Error(t, err) + require_Equal(t, err.Error(), "maximum consumers limit reached") + _, err = js.AddConsumer("testR1-1", &nats.ConsumerConfig{Durable: "dur5", AckPolicy: nats.AckExplicitPolicy}) + require_Error(t, err) + require_Equal(t, err.Error(), "maximum consumers limit reached") + + // test tiered storage limit + msg := [512]byte{} + _, err = js.Publish("testR1-1", msg[:]) + require_NoError(t, err) + _, err = js.Publish("testR3-1", msg[:]) + require_NoError(t, err) + _, err = js.Publish("testR3-1", msg[:]) + require_NoError(t, err) + _, err = js.Publish("testR1-2", msg[:]) + require_NoError(t, err) + + time.Sleep(2000 * time.Millisecond) // wait for update timer to synchronize totals + + // test exceeding tiered storage limit + _, err = js.Publish("testR1-1", []byte("1")) + require_Error(t, err) + require_Equal(t, err.Error(), "nats: resource limits exceeded for account") + _, err = js.Publish("testR3-1", []byte("fail this message!")) + require_Error(t, err) + require_Equal(t, err.Error(), "nats: resource limits exceeded for account") +} + +func TestJWTClusteredJetStreamTiersChange(t *testing.T) { + sysKp, syspub := createKey(t) + sysJwt := encodeClaim(t, jwt.NewAccountClaims(syspub), syspub) + sysCreds := newUser(t, sysKp) + defer removeFile(t, sysCreds) + + accKp, aExpPub := createKey(t) + accClaim := jwt.NewAccountClaims(aExpPub) + accClaim.Name = "acc" + accClaim.Limits.JetStreamTieredLimits["R1"] = jwt.JetStreamLimits{ + DiskStorage: 1000, MemoryStorage: 0, Consumer: 1, Streams: 1} + accClaim.Limits.JetStreamTieredLimits["R3"] = jwt.JetStreamLimits{ + DiskStorage: 1500, MemoryStorage: 0, Consumer: 1, Streams: 1} + accJwt1 := encodeClaim(t, accClaim, aExpPub) + accCreds := newUser(t, accKp) + + start := time.Now() + storeDir := createDir(t, JetStreamStoreDir) + defer removeDir(t, storeDir) + dirSrv := createDir(t, "srv") + defer removeDir(t, dirSrv) + + tmlp := ` + listen: 127.0.0.1:-1 + server_name: %s + jetstream: {max_mem_store: 256MB, max_file_store: 2GB, store_dir: '%s'} + leaf { + listen: 127.0.0.1:-1 + } + cluster { + name: %s + listen: 127.0.0.1:%d + routes = [%s] + } + ` + fmt.Sprintf(` + operator: %s + system_account: %s + resolver: { + type: full + dir: '%s' + } + `, ojwt, syspub, dirSrv) + + c := createJetStreamClusterWithTemplate(t, tmlp, "cluster", 3) + defer c.shutdown() + + updateJwt(t, c.randomServer().ClientURL(), sysCreds, sysJwt, 3) + updateJwt(t, c.randomServer().ClientURL(), sysCreds, accJwt1, 3) + + nc := natsConnect(t, c.randomServer().ClientURL(), nats.UserCredentials(accCreds)) + defer nc.Close() + + js, err := nc.JetStream() + require_NoError(t, err) + + // Test tiers up to stream limits + cfg := &nats.StreamConfig{Name: "testR1-1", Replicas: 1, Subjects: []string{"testR1-1"}, MaxBytes: 1000} + _, err = js.AddStream(cfg) + require_NoError(t, err) + + cfg.Replicas = 3 + _, err = js.UpdateStream(cfg) + require_Error(t, err) + require_Equal(t, err.Error(), "insufficient storage resources available") + + time.Sleep(time.Second - time.Since(start)) // make sure the time stamp changes + accClaim.Limits.JetStreamTieredLimits["R3"] = jwt.JetStreamLimits{ + DiskStorage: 3000, MemoryStorage: 0, Consumer: 1, Streams: 1} + accJwt2 := encodeClaim(t, accClaim, aExpPub) + + updateJwt(t, c.randomServer().ClientURL(), sysCreds, accJwt2, 3) + + var rBefore, rAfter JSApiAccountInfoResponse + m, err := nc.Request("$JS.API.INFO", nil, time.Second) + require_NoError(t, err) + err = json.Unmarshal(m.Data, &rBefore) + require_NoError(t, err) + _, err = js.UpdateStream(cfg) + require_NoError(t, err) + + m, err = nc.Request("$JS.API.INFO", nil, time.Second) + require_NoError(t, err) + err = json.Unmarshal(m.Data, &rAfter) + require_NoError(t, err) + require_True(t, rBefore.Tiers["R1"].Streams == 1) + require_True(t, rBefore.Tiers["R1"].Streams == rAfter.Tiers["R3"].Streams) + require_True(t, rBefore.Tiers["R3"].Streams == 0) + require_True(t, rAfter.Tiers["R1"].Streams == 0) +} + func TestJWTQueuePermissions(t *testing.T) { aExpKp, aExpPub := createKey(t) aExpClaim := jwt.NewAccountClaims(aExpPub) diff --git a/server/monitor.go b/server/monitor.go index b9324486..18b9d2bd 100644 --- a/server/monitor.go +++ b/server/monitor.go @@ -2501,12 +2501,13 @@ func (s *Server) accountDetail(jsa *jsAccount, optStreams, optConsumers, optCfg if acc.nameTag != "" { name = acc.nameTag } + totalMem, totalStore := jsa.storageTotals() detail := AccountDetail{ Name: name, Id: id, JetStreamStats: JetStreamStats{ - Memory: uint64(jsa.memTotal), - Store: uint64(jsa.storeTotal), + Memory: totalMem, + Store: totalStore, API: JetStreamAPIStats{ Total: jsa.apiTotal, Errors: jsa.apiErrors, diff --git a/server/mqtt_test.go b/server/mqtt_test.go index bc6e6db3..0774dcda 100644 --- a/server/mqtt_test.go +++ b/server/mqtt_test.go @@ -949,11 +949,13 @@ func testMQTTEnableJSForAccount(t *testing.T, s *Server, accName string) { if err != nil { t.Fatalf("Error looking up account: %v", err) } - limits := &JetStreamAccountLimits{ - MaxConsumers: -1, - MaxStreams: -1, - MaxMemory: 1024 * 1024, - MaxStore: 1024 * 1024, + limits := map[string]JetStreamAccountLimits{ + _EMPTY_: { + MaxConsumers: -1, + MaxStreams: -1, + MaxMemory: 1024 * 1024, + MaxStore: 1024 * 1024, + }, } if err := acc.EnableJetStream(limits); err != nil { t.Fatalf("Error enabling JS: %v", err) diff --git a/server/opts.go b/server/opts.go index 6cf87719..c836b64e 100644 --- a/server/opts.go +++ b/server/opts.go @@ -1653,7 +1653,8 @@ func parseGateway(v interface{}, o *Options, errors *[]error, warnings *[]error) return nil } -var dynamicJSAccountLimits = &JetStreamAccountLimits{-1, -1, -1, -1, false} +var dynamicJSAccountLimits = JetStreamAccountLimits{-1, -1, -1, -1, false} +var defaultJSAccountTiers = map[string]JetStreamAccountLimits{_EMPTY_: dynamicJSAccountLimits} // Parses jetstream account limits for an account. Simple setup with boolen is allowed, and we will // use dynamic account limits. @@ -1666,19 +1667,19 @@ func parseJetStreamForAccount(v interface{}, acc *Account, errors *[]error, warn switch vv := v.(type) { case bool: if vv { - acc.jsLimits = dynamicJSAccountLimits + acc.jsLimits = defaultJSAccountTiers } case string: switch strings.ToLower(vv) { case "enabled", "enable": - acc.jsLimits = dynamicJSAccountLimits + acc.jsLimits = defaultJSAccountTiers case "disabled", "disable": acc.jsLimits = nil default: return &configErr{tk, fmt.Sprintf("Expected 'enabled' or 'disabled' for string value, got '%s'", vv)} } case map[string]interface{}: - jsLimits := &JetStreamAccountLimits{-1, -1, -1, -1, false} + jsLimits := JetStreamAccountLimits{-1, -1, -1, -1, false} for mk, mv := range vv { tk, mv = unwrapValue(mv, <) switch strings.ToLower(mk) { @@ -1725,7 +1726,7 @@ func parseJetStreamForAccount(v interface{}, acc *Account, errors *[]error, warn } } } - acc.jsLimits = jsLimits + acc.jsLimits = map[string]JetStreamAccountLimits{_EMPTY_: jsLimits} default: return &configErr{tk, fmt.Sprintf("Expected map, bool or string to define JetStream, got %T", v)} } diff --git a/server/server.go b/server/server.go index 675d290d..519a057c 100644 --- a/server/server.go +++ b/server/server.go @@ -1357,7 +1357,7 @@ func (s *Server) registerAccountNoLock(acc *Account) *Account { acc.srv = s acc.updated = time.Now().UTC() accName := acc.Name - jsEnabled := acc.jsLimits != nil + jsEnabled := len(acc.jsLimits) > 0 acc.mu.Unlock() if opts := s.getOpts(); opts != nil && len(opts.JsAccDefaultDomain) > 0 { @@ -1709,7 +1709,7 @@ func (s *Server) Start() { // own system account if one is not present. if opts.JetStream { // Make sure someone is not trying to enable on the system account. - if sa := s.SystemAccount(); sa != nil && sa.jsLimits != nil { + if sa := s.SystemAccount(); sa != nil && len(sa.jsLimits) > 0 { s.Fatalf("Not allowed to enable JetStream on the system account") } cfg := &JetStreamConfig{ @@ -1737,7 +1737,7 @@ func (s *Server) Start() { hasGlobal = true } acc.mu.RLock() - hasJs := acc.jsLimits != nil + hasJs := len(acc.jsLimits) > 0 acc.mu.RUnlock() if hasJs { s.checkJetStreamExports() @@ -1749,7 +1749,9 @@ func (s *Server) Start() { // go ahead and enable JS on $G in case we are in simple mixed mode setup. if total == 2 && hasSys && hasGlobal && !s.standAloneMode() { ga.mu.Lock() - ga.jsLimits = dynamicJSAccountLimits + ga.jsLimits = map[string]JetStreamAccountLimits{ + _EMPTY_: dynamicJSAccountLimits, + } ga.mu.Unlock() s.checkJetStreamExports() ga.enableAllJetStreamServiceImportsAndMappings() diff --git a/server/store.go b/server/store.go index 84dc8757..5aa91949 100644 --- a/server/store.go +++ b/server/store.go @@ -451,7 +451,7 @@ func (p DeliverPolicy) MarshalJSON() ([]byte, error) { } func isOutOfSpaceErr(err error) bool { - return err != nil && strings.Contains(err.Error(), "no space left") + return err != nil && (strings.Contains(err.Error(), "no space left")) } // For when our upper layer catchup detects its missing messages from the beginning of the stream. diff --git a/server/stream.go b/server/stream.go index eb201a53..78fe76a2 100644 --- a/server/stream.go +++ b/server/stream.go @@ -173,6 +173,7 @@ type stream struct { cfg StreamConfig created time.Time stype StorageType + tier string ddmap map[string]*ddentry ddarr []*ddentry ddindex int @@ -298,10 +299,10 @@ func (a *Account) addStreamWithAssignment(config *StreamConfig, fsConfig *FileSt return nil, ApiErrors[JSStreamReplicasNotSupportedErr] } - jsa.mu.Lock() - js := jsa.js + js, isClustered := jsa.jetStreamAndClustered() + jsa.mu.RLock() if mset, ok := jsa.streams[cfg.Name]; ok { - jsa.mu.Unlock() + jsa.mu.RUnlock() // Check to see if configs are same. ocfg := mset.config() if reflect.DeepEqual(ocfg, cfg) { @@ -313,11 +314,25 @@ func (a *Account) addStreamWithAssignment(config *StreamConfig, fsConfig *FileSt return nil, ApiErrors[JSStreamNameExistErr] } } - // Check for account and server limits. - if err := jsa.checkAllLimits(&cfg); err != nil { - jsa.mu.Unlock() + selected, tier, hasTier := jsa.selectLimits(&cfg) + reserved := int64(0) + if !isClustered { + reserved = jsa.tieredReservation(tier, &cfg) + } + jsa.mu.RUnlock() + if !hasTier { + return nil, NewJSNoLimitsError() + } + js.mu.RLock() + if isClustered { + _, reserved = tieredStreamAndReservationCount(js.cluster.streams[a.Name], tier, &cfg) + } + if err := js.checkAllLimits(&selected, &cfg, reserved, 0); err != nil { + js.mu.RUnlock() return nil, err } + js.mu.RUnlock() + jsa.mu.Lock() // Check for template ownership if present. if cfg.Template != _EMPTY_ && jsa.account != nil { if !jsa.checkTemplateOwnership(cfg.Template, cfg.Name) { @@ -363,6 +378,11 @@ func (a *Account) addStreamWithAssignment(config *StreamConfig, fsConfig *FileSt return nil, fmt.Errorf("subjects overlap with an existing stream") } + if !hasTier { + jsa.mu.Unlock() + return nil, fmt.Errorf("no applicable tier found") + } + // Setup the internal clients. c := s.createInternalJetStreamClient() ic := s.createInternalJetStreamClient() @@ -376,6 +396,7 @@ func (a *Account) addStreamWithAssignment(config *StreamConfig, fsConfig *FileSt srv: s, client: c, sysc: ic, + tier: tier, stype: cfg.Storage, consumers: make(map[string]*consumer), msgs: s.newIPQueue(qpfx + "messages"), // of *inMsg @@ -929,6 +950,7 @@ func (mset *stream) fileStoreConfig() (FileStoreConfig, error) { return fs.fileStoreConfig(), nil } +// Do not hold jsAccount or jetStream lock func (jsa *jsAccount) configUpdateCheck(old, new *StreamConfig) (*StreamConfig, error) { cfg, err := checkStreamCfg(new) if err != nil { @@ -994,21 +1016,47 @@ func (jsa *jsAccount) configUpdateCheck(old, new *StreamConfig) (*StreamConfig, // Save the user configured MaxBytes. newMaxBytes := cfg.MaxBytes + maxBytesOffset := int64(0) + if old.MaxBytes > 0 { + if excessRep := cfg.Replicas - old.Replicas; excessRep > 0 { + maxBytesOffset = old.MaxBytes * int64(excessRep) + } + } + // We temporarily set cfg.MaxBytes to maxBytesDiff because checkAllLimits // adds cfg.MaxBytes to the current reserved limit and checks if we've gone // over. However, we don't want an addition cfg.MaxBytes, we only want to // reserve the difference between the new and the old values. cfg.MaxBytes = maxBytesDiff - // Check if we can reserve the additional difference. - err = jsa.checkAllLimits(&cfg) - - // Restore the user configured MaxBytes. - cfg.MaxBytes = newMaxBytes - - if err != nil { + // Check limits. + js, isClustered := jsa.jetStreamAndClustered() + jsa.mu.RLock() + acc := jsa.account + selected, tier, hasTier := jsa.selectLimits(&cfg) + if !hasTier && old.Replicas != cfg.Replicas { + selected, tier, hasTier = jsa.selectLimits(old) + } + reserved := int64(0) + if !isClustered { + reserved = jsa.tieredReservation(tier, &cfg) + } + jsa.mu.RUnlock() + if !hasTier { + return nil, NewJSNoLimitsError() + } + js.mu.RLock() + defer js.mu.RUnlock() + if isClustered { + _, reserved = tieredStreamAndReservationCount(js.cluster.streams[acc.Name], tier, &cfg) + } + // reservation does not account for this stream, hence add the old value + reserved += int64(old.Replicas) * old.MaxBytes + if err := js.checkAllLimits(&selected, &cfg, reserved, maxBytesOffset); err != nil { return nil, err } + // Restore the user configured MaxBytes. + cfg.MaxBytes = newMaxBytes return &cfg, nil } @@ -1085,6 +1133,19 @@ func (mset *stream) update(config *StreamConfig) error { js := mset.js + if targetTier := tierName(cfg); mset.tier != targetTier { + // In cases such as R1->R3, only one update is needed + if _, ok := mset.jsa.limits[targetTier]; ok { + // error never set + _, reported, _ := mset.store.Utilization() + mset.jsa.updateUsage(mset.tier, mset.stype, -int64(reported)) + mset.jsa.updateUsage(targetTier, mset.stype, int64(reported)) + mset.tier = targetTier + } + // else in case the new tier does not exist (say on move), keep the old tier around + // a subsequent update to an existing tier will then move from existing past tier to existing new tier + } + // Now update config and store's version of our config. mset.cfg = *cfg @@ -2559,7 +2620,7 @@ func (mset *stream) storeUpdates(md, bd int64, seq uint64, subj string) { } if mset.jsa != nil { - mset.jsa.updateUsage(mset.stype, bd) + mset.jsa.updateUsage(mset.tier, mset.stype, bd) } } @@ -3037,7 +3098,7 @@ func (mset *stream) processJetStreamMsg(subject, reply string, hdr, msg []byte, mset.lmsgId = msgId clfs := mset.clfs mset.lseq++ - + tierName := mset.tier // We hold the lock to this point to make sure nothing gets between us since we check for pre-conditions. // Currently can not hold while calling store b/c we have inline storage update calls that may need the lock. // Note that upstream that sets seq/ts should be serialized as much as possible. @@ -3075,7 +3136,7 @@ func (mset *stream) processJetStreamMsg(subject, reply string, hdr, msg []byte, resp.Error = NewJSStreamStoreFailedError(err, Unless(err)) response, _ = json.Marshal(resp) } - } else if jsa.limitsExceeded(stype) { + } else if jsa.limitsExceeded(stype, tierName) { s.Warnf("JetStream resource limits exceeded for account: %q", accName) if canRespond { resp.PubAck = &PubAck{Stream: name} @@ -3372,6 +3433,9 @@ func (mset *stream) internalLoop() { // Internal function to delete a stream. func (mset *stream) delete() error { + if mset == nil { + return nil + } return mset.stop(true, true) }