From 92f4dc986aa9007944606769a758abf3f42d9b3a Mon Sep 17 00:00:00 2001 From: Matthias Hanel Date: Thu, 31 Mar 2022 14:17:16 -0400 Subject: [PATCH] added max_ack_pending setting to js account limits (#2982) * added max_ack_penind setting to js account limits because of the addition, defaults now have to be set later (depend on these new limits now) also re-organized the code to closer track how stream create looks Signed-off-by: Matthias Hanel --- go.mod | 2 +- go.sum | 4 +- server/accounts.go | 2 + server/consumer.go | 44 +++++++++++--------- server/jetstream.go | 1 + server/jetstream_api.go | 25 ++--------- server/jetstream_cluster.go | 68 +++++++++++++++++++++--------- server/jetstream_test.go | 71 ++++++++++++++++++++++---------- server/jwt_test.go | 82 +++++++++++++++++++++++++++++++++++++ server/opts.go | 16 +++++--- 10 files changed, 223 insertions(+), 92 deletions(-) diff --git a/go.mod b/go.mod index 23eb36d7..fe4a4f02 100644 --- a/go.mod +++ b/go.mod @@ -5,7 +5,7 @@ go 1.17 require ( github.com/klauspost/compress v1.14.4 github.com/minio/highwayhash v1.0.2 - github.com/nats-io/jwt/v2 v2.2.1-0.20220328222144-5efd4536dd5c + github.com/nats-io/jwt/v2 v2.2.1-0.20220330180145-442af02fd36a github.com/nats-io/nats.go v1.13.1-0.20220318132711-e0e03e374228 github.com/nats-io/nkeys v0.3.0 github.com/nats-io/nuid v1.0.1 diff --git a/go.sum b/go.sum index a643ebba..1f40540c 100644 --- a/go.sum +++ b/go.sum @@ -12,8 +12,8 @@ github.com/klauspost/compress v1.14.4 h1:eijASRJcobkVtSt81Olfh7JX43osYLwy5krOJo6 github.com/klauspost/compress v1.14.4/go.mod h1:/3/Vjq9QcHkK5uEr5lBEmyoZ1iFhe47etQ6QUkpK6sk= github.com/minio/highwayhash v1.0.2 h1:Aak5U0nElisjDCfPSG79Tgzkn2gl66NxOMspRrKnA/g= github.com/minio/highwayhash v1.0.2/go.mod h1:BQskDq+xkJ12lmlUUi7U0M5Swg3EWR+dLTk+kldvVxY= -github.com/nats-io/jwt/v2 v2.2.1-0.20220328222144-5efd4536dd5c h1:UvynQFR0nFftljtWBBVXg0CO2PFZUgr8ph/3nbzMidc= -github.com/nats-io/jwt/v2 v2.2.1-0.20220328222144-5efd4536dd5c/go.mod h1:0tqz9Hlu6bCBFLWAASKhE5vUA4c24L9KPUUgvwumE/k= +github.com/nats-io/jwt/v2 v2.2.1-0.20220330180145-442af02fd36a h1:lem6QCvxR0Y28gth9P+wV2K/zYUUAkJ+55U8cpS0p5I= +github.com/nats-io/jwt/v2 v2.2.1-0.20220330180145-442af02fd36a/go.mod h1:0tqz9Hlu6bCBFLWAASKhE5vUA4c24L9KPUUgvwumE/k= github.com/nats-io/nats.go v1.13.1-0.20220318132711-e0e03e374228 h1:czbQ9uYuV7dwLsh/0vpB+4rutgdLTYgoN5W5hf1S0eg= github.com/nats-io/nats.go v1.13.1-0.20220318132711-e0e03e374228/go.mod h1:BPko4oXsySz4aSWeFgOHLZs3G4Jq4ZAyE6/zMCxRT6w= github.com/nats-io/nkeys v0.3.0 h1:cgM5tL53EvYRU+2YLXIK0G2mJtK12Ft9oeooSZMA2G8= diff --git a/server/accounts.go b/server/accounts.go index 8913f838..b161dc0a 100644 --- a/server/accounts.go +++ b/server/accounts.go @@ -3289,6 +3289,7 @@ func (s *Server) updateAccountClaimsWithRefresh(a *Account, ac *jwt.AccountClaim MemoryMaxStreamBytes: ac.Limits.JetStreamLimits.MemoryMaxStreamBytes, StoreMaxStreamBytes: ac.Limits.JetStreamLimits.DiskMaxStreamBytes, MaxBytesRequired: ac.Limits.JetStreamLimits.MaxBytesRequired, + MaxAckPending: int(ac.Limits.JetStreamLimits.MaxAckPending), }, } } else { @@ -3302,6 +3303,7 @@ func (s *Server) updateAccountClaimsWithRefresh(a *Account, ac *jwt.AccountClaim MemoryMaxStreamBytes: l.MemoryMaxStreamBytes, StoreMaxStreamBytes: l.DiskMaxStreamBytes, MaxBytesRequired: l.MaxBytesRequired, + MaxAckPending: int(l.MaxAckPending), } } } diff --git a/server/consumer.go b/server/consumer.go index 8bbd572c..68131624 100644 --- a/server/consumer.go +++ b/server/consumer.go @@ -299,7 +299,7 @@ const ( ) // Helper function to set consumer config defaults from above. -func setConsumerConfigDefaults(config *ConsumerConfig, lim *JSLimitOpts) { +func setConsumerConfigDefaults(config *ConsumerConfig, lim *JSLimitOpts, accLim *JetStreamAccountLimits) { // Set to default if not specified. if config.DeliverSubject == _EMPTY_ && config.MaxWaiting == 0 { config.MaxWaiting = JSWaitQueueDefaultMax @@ -318,11 +318,14 @@ func setConsumerConfigDefaults(config *ConsumerConfig, lim *JSLimitOpts) { } // Set proper default for max ack pending if we are ack explicit and none has been set. if (config.AckPolicy == AckExplicit || config.AckPolicy == AckAll) && config.MaxAckPending == 0 { - if lim.MaxAckPending > 0 && lim.MaxAckPending < JsDefaultMaxAckPending { - config.MaxAckPending = lim.MaxAckPending - } else { - config.MaxAckPending = JsDefaultMaxAckPending + accPending := JsDefaultMaxAckPending + if lim.MaxAckPending > 0 && lim.MaxAckPending < accPending { + accPending = lim.MaxAckPending } + if accLim.MaxAckPending > 0 && accLim.MaxAckPending < accPending { + accPending = accLim.MaxAckPending + } + config.MaxAckPending = accPending } } @@ -330,7 +333,7 @@ func (mset *stream) addConsumer(config *ConsumerConfig) (*consumer, error) { return mset.addConsumerWithAssignment(config, _EMPTY_, nil) } -func checkConsumerCfg(config *ConsumerConfig, lim *JSLimitOpts, cfg *StreamConfig, acc *Account) *ApiError { +func checkConsumerCfg(config *ConsumerConfig, srvLim *JSLimitOpts, cfg *StreamConfig, acc *Account, accLim *JetStreamAccountLimits) *ApiError { // Check if we have a BackOff defined that MaxDeliver is within range etc. if lbo := len(config.BackOff); lbo > 0 && config.MaxDeliver <= lbo { return NewJSConsumerMaxDeliverBackoffError() @@ -384,7 +387,8 @@ func checkConsumerCfg(config *ConsumerConfig, lim *JSLimitOpts, cfg *StreamConfi return NewJSConsumerMaxRequestExpiresToSmallError() } } - if lim.MaxAckPending > 0 && config.MaxAckPending > lim.MaxAckPending { + if srvLim.MaxAckPending > 0 && config.MaxAckPending > srvLim.MaxAckPending || + accLim.MaxAckPending > 0 && config.MaxAckPending > accLim.MaxAckPending { return NewJSConsumerMaxPendingAckExcessError() } @@ -494,11 +498,18 @@ func (mset *stream) addConsumerWithAssignment(config *ConsumerConfig, oname stri return nil, NewJSConsumerConfigRequiredError() } - lim := &s.getOpts().JetStreamLimits - // Make sure we have sane defaults. - setConsumerConfigDefaults(config, lim) + jsa.mu.RLock() + selectedLimits, limitsFound := jsa.limits[tierName] + jsa.mu.RUnlock() + if !limitsFound { + return nil, NewJSNoLimitsError() + } - if err := checkConsumerCfg(config, lim, &cfg, acc); err != nil { + srvLim := &s.getOpts().JetStreamLimits + // Make sure we have sane defaults. + setConsumerConfigDefaults(config, srvLim, &selectedLimits) + + if err := checkConsumerCfg(config, srvLim, &cfg, acc, &selectedLimits); err != nil { return nil, err } @@ -521,13 +532,6 @@ func (mset *stream) addConsumerWithAssignment(config *ConsumerConfig, oname stri } c.mu.Unlock() - jsa.mu.RLock() - jsaLimits, limitsFound := jsa.limits[tierName] - jsa.mu.RUnlock() - if !limitsFound { - return nil, NewJSNoLimitsError() - } - // Hold mset lock here. mset.mu.Lock() if mset.client == nil || mset.store == nil { @@ -552,8 +556,8 @@ func (mset *stream) addConsumerWithAssignment(config *ConsumerConfig, oname stri // than stream config we prefer the account limits to handle cases where account limits are // updated during the lifecycle of the stream maxc := mset.cfg.MaxConsumers - if maxc <= 0 || (jsaLimits.MaxConsumers > 0 && jsaLimits.MaxConsumers < maxc) { - maxc = jsaLimits.MaxConsumers + if maxc <= 0 || (selectedLimits.MaxConsumers > 0 && selectedLimits.MaxConsumers < maxc) { + maxc = selectedLimits.MaxConsumers } if maxc > 0 && mset.numPublicConsumers() >= maxc { mset.mu.Unlock() diff --git a/server/jetstream.go b/server/jetstream.go index 2270e1dd..a6b90742 100644 --- a/server/jetstream.go +++ b/server/jetstream.go @@ -63,6 +63,7 @@ type JetStreamAccountLimits struct { MaxStore int64 `json:"max_storage"` MaxStreams int `json:"max_streams"` MaxConsumers int `json:"max_consumers"` + MaxAckPending int `json:"max_ack_pending"` MemoryMaxStreamBytes int64 `json:"memory_max_stream_bytes"` StoreMaxStreamBytes int64 `json:"storage_max_stream_bytes"` MaxBytesRequired bool `json:"max_bytes_required"` diff --git a/server/jetstream_api.go b/server/jetstream_api.go index ba21518e..6e7dbfa8 100644 --- a/server/jetstream_api.go +++ b/server/jetstream_api.go @@ -1361,13 +1361,10 @@ func (s *Server) jsStreamCreateRequest(sub *subscription, c *client, _ *Account, return } - acc.mu.RLock() - jsa := acc.js - acc.mu.RUnlock() + selectedLimits, tier, jsa, apiErr := acc.selectLimits(&cfg) - selectedLimits, tier, ok := jsa.selectLimits(&cfg) - if !ok { - resp.Error = NewJSNoLimitsError() + if apiErr != nil { + resp.Error = apiErr s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp)) return } @@ -3253,11 +3250,6 @@ func (s *Server) jsConsumerCreate(sub *subscription, c *client, a *Account, subj return } - lim := &s.getOpts().JetStreamLimits - - // Make sure we have sane defaults. - setConsumerConfigDefaults(&req.Config, lim) - var js *jetStream isClustered := s.JetStreamIsClustered() @@ -3331,17 +3323,6 @@ func (s *Server) jsConsumerCreate(sub *subscription, c *client, a *Account, subj } if isClustered && !req.Config.Direct { - streamCfg, ok := js.clusterStreamConfig(acc.Name, streamName) - if !ok { - resp.Error = NewJSStreamNotFoundError(Unless(err)) - s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp)) - return - } - if err := checkConsumerCfg(&req.Config, lim, &streamCfg, acc); err != nil { - resp.Error = err - s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp)) - return - } s.jsClusteredConsumerRequest(ci, acc, subject, reply, rmsg, req.Stream, &req.Config) return } diff --git a/server/jetstream_cluster.go b/server/jetstream_cluster.go index ba90aba7..62e6ab3a 100644 --- a/server/jetstream_cluster.go +++ b/server/jetstream_cluster.go @@ -3968,6 +3968,26 @@ func (js *jetStream) createGroupForStream(ci *ClientInfo, cfg *StreamConfig) *ra return nil } +func (acc *Account) selectLimits(cfg *StreamConfig) (*JetStreamAccountLimits, string, *jsAccount, *ApiError) { + // Grab our jetstream account info. + acc.mu.RLock() + jsa := acc.js + acc.mu.RUnlock() + + if jsa == nil { + return nil, _EMPTY_, nil, NewJSNotEnabledForAccountError() + } + + jsa.mu.RLock() + selectedLimits, tierName, ok := jsa.selectLimits(cfg) + jsa.mu.RUnlock() + + if !ok { + return nil, _EMPTY_, nil, NewJSNoLimitsError() + } + return &selectedLimits, tierName, jsa, nil +} + func (s *Server) jsClusteredStreamRequest(ci *ClientInfo, acc *Account, subject, reply string, rmsg []byte, config *StreamConfig) { js, cc := s.getJetStreamCluster() if js == nil || cc == nil { @@ -3976,17 +3996,6 @@ func (s *Server) jsClusteredStreamRequest(ci *ClientInfo, acc *Account, subject, var resp = JSApiStreamCreateResponse{ApiResponse: ApiResponse{Type: JSApiStreamCreateResponseType}} - // Grab our jetstream account info. - acc.mu.RLock() - jsa := acc.js - acc.mu.RUnlock() - - if jsa == nil { - resp.Error = NewJSNotEnabledForAccountError() - s.sendAPIErrResponse(ci, acc, subject, reply, string(rmsg), s.jsonResponse(&resp)) - return - } - ccfg, err := checkStreamCfg(config, &s.getOpts().JetStreamLimits) if err != nil { resp.Error = NewJSStreamInvalidConfigError(err, Unless(err)) @@ -3995,12 +4004,9 @@ func (s *Server) jsClusteredStreamRequest(ci *ClientInfo, acc *Account, subject, } cfg := &ccfg - jsa.mu.RLock() - selectedLimits, tier, ok := jsa.selectLimits(config) - jsa.mu.RUnlock() - - if !ok { - resp.Error = NewJSNoLimitsError() + selectedLimits, tier, _, apiErr := acc.selectLimits(&ccfg) + if err != nil { + resp.Error = apiErr s.sendAPIErrResponse(ci, acc, subject, reply, string(rmsg), s.jsonResponse(&resp)) return } @@ -4018,7 +4024,7 @@ func (s *Server) jsClusteredStreamRequest(ci *ClientInfo, acc *Account, subject, } // Check for account limits here before proposing. - if err := js.checkAccountLimits(&selectedLimits, cfg, reservations); err != nil { + if err := js.checkAccountLimits(selectedLimits, cfg, reservations); err != nil { resp.Error = NewJSStreamLimitsError(err, Unless(err)) s.sendAPIErrResponse(ci, acc, subject, reply, string(rmsg), s.jsonResponse(&resp)) js.mu.RUnlock() @@ -4777,11 +4783,33 @@ func (s *Server) jsClusteredConsumerRequest(ci *ClientInfo, acc *Account, subjec return } + var resp = JSApiConsumerCreateResponse{ApiResponse: ApiResponse{Type: JSApiConsumerCreateResponseType}} + + streamCfg, ok := js.clusterStreamConfig(acc.Name, stream) + if !ok { + resp.Error = NewJSStreamNotFoundError() + s.sendAPIErrResponse(ci, acc, subject, reply, string(rmsg), s.jsonResponse(&resp)) + return + } + selectedLimits, _, _, apiErr := acc.selectLimits(&streamCfg) + if apiErr != nil { + resp.Error = apiErr + s.sendAPIErrResponse(ci, acc, subject, reply, string(rmsg), s.jsonResponse(&resp)) + return + } + srvLim := &s.getOpts().JetStreamLimits + // Make sure we have sane defaults + setConsumerConfigDefaults(cfg, srvLim, selectedLimits) + + if err := checkConsumerCfg(cfg, srvLim, &streamCfg, acc, selectedLimits); err != nil { + resp.Error = err + s.sendAPIErrResponse(ci, acc, subject, reply, string(rmsg), s.jsonResponse(&resp)) + return + } + js.mu.Lock() defer js.mu.Unlock() - var resp = JSApiConsumerCreateResponse{ApiResponse: ApiResponse{Type: JSApiConsumerCreateResponseType}} - // Lookup the stream assignment. sa := js.streamAssignment(acc.Name, stream) if sa == nil { diff --git a/server/jetstream_test.go b/server/jetstream_test.go index e6ea07ff..3e1a0c5d 100644 --- a/server/jetstream_test.go +++ b/server/jetstream_test.go @@ -16784,14 +16784,14 @@ func TestJetStreamLimits(t *testing.T) { } t.Run("clustered", func(t *testing.T) { - c := createJetStreamClusterWithTemplate(t, ` + tmpl := ` listen: 127.0.0.1:-1 server_name: %s jetstream: { max_mem_store: 2MB, max_file_store: 8MB, store_dir: '%s', - limits: {max_ack_pending: 1000, duplicate_window: "1m"} + limits: {duplicate_window: "1m"} } cluster { name: %s @@ -16805,29 +16805,56 @@ func TestJetStreamLimits(t *testing.T) { jetstream: enabled } $SYS { users = [ { user: "admin", pass: "s3cr3t!" } ] } - }`, "clust", 3) - defer c.shutdown() - s := c.randomServer() - test(t, s) + }` + limitsTest := func(t *testing.T, tmpl string) { + c := createJetStreamClusterWithTemplate(t, tmpl, "clust", 3) + defer c.shutdown() + s := c.randomServer() + test(t, s) + } + // test with max_ack_pending being defined in operator or account + t.Run("operator", func(t *testing.T) { + limitsTest(t, strings.Replace(tmpl, "duplicate_window", "max_ack_pending: 1000, duplicate_window", 1)) + }) + t.Run("account", func(t *testing.T) { + limitsTest(t, strings.Replace(tmpl, "jetstream: enabled", "jetstream: {max_ack_pending: 1000}", 1)) + }) }) t.Run("single", func(t *testing.T) { - storeDir := createDir(t, JetStreamStoreDir) - defer removeDir(t, storeDir) - conf := createConfFile(t, []byte(fmt.Sprintf(` - listen: 127.0.0.1:-1 - jetstream: { - max_mem_store: 2MB, - max_file_store: 8MB, - store_dir: '%s', - limits: {max_ack_pending: 1000, duplicate_window: "1m"} - }`, storeDir))) - defer removeFile(t, conf) - s, opts := RunServerWithConfig(conf) - defer s.Shutdown() - require_True(t, opts.JetStreamLimits.MaxAckPending == 1000) - require_True(t, opts.JetStreamLimits.Duplicates == time.Minute) - test(t, s) + tmpl := ` + listen: 127.0.0.1:-1 + jetstream: { + max_mem_store: 2MB, + max_file_store: 8MB, + store_dir: '%s', + limits: {duplicate_window: "1m"} + } + no_auth_user: u + accounts { + ONE { + users = [ { user: "u", pass: "s3cr3t!" } ] + jetstream: enabled + } + $SYS { users = [ { user: "admin", pass: "s3cr3t!" } ] } + }` + limitsTest := func(t *testing.T, tmpl string) { + storeDir := createDir(t, JetStreamStoreDir) + defer removeDir(t, storeDir) + conf := createConfFile(t, []byte(fmt.Sprintf(tmpl, storeDir))) + defer removeFile(t, conf) + s, opts := RunServerWithConfig(conf) + defer s.Shutdown() + require_True(t, opts.JetStreamLimits.Duplicates == time.Minute) + test(t, s) + } + // test with max_ack_pending being defined in operator or account + t.Run("operator", func(t *testing.T) { + limitsTest(t, strings.Replace(tmpl, "duplicate_window", "max_ack_pending: 1000, duplicate_window", 1)) + }) + t.Run("account", func(t *testing.T) { + limitsTest(t, strings.Replace(tmpl, "jetstream: enabled", "jetstream: {max_ack_pending: 1000}", 1)) + }) }) } diff --git a/server/jwt_test.go b/server/jwt_test.go index fd6f3925..3135ac93 100644 --- a/server/jwt_test.go +++ b/server/jwt_test.go @@ -5414,6 +5414,88 @@ func TestJWTJetStreamTiers(t *testing.T) { require_Equal(t, err.Error(), "nats: resource limits exceeded for account") } +func TestJWTJetStreamMaxAckPendilng(t *testing.T) { + sysKp, syspub := createKey(t) + sysJwt := encodeClaim(t, jwt.NewAccountClaims(syspub), syspub) + sysCreds := newUser(t, sysKp) + defer removeFile(t, sysCreds) + + accKp, accPub := createKey(t) + accClaim := jwt.NewAccountClaims(accPub) + accClaim.Name = "acc" + accClaim.Limits.JetStreamTieredLimits["R1"] = jwt.JetStreamLimits{ + DiskStorage: jwt.NoLimit, MemoryStorage: jwt.NoLimit, + Consumer: jwt.NoLimit, Streams: jwt.NoLimit, MaxAckPending: int64(1000), + } + accJwt1 := encodeClaim(t, accClaim, accPub) + accCreds := newUser(t, accKp) + + start := time.Now() + + storeDir := createDir(t, JetStreamStoreDir) + defer removeDir(t, storeDir) + + dirSrv := createDir(t, "srv") + defer removeDir(t, dirSrv) + cf := createConfFile(t, []byte(fmt.Sprintf(` + listen: 127.0.0.1:-1 + server_name: s1 + jetstream: {max_mem_store: 256MB, max_file_store: 2GB, store_dir: '%s'} + leaf { + listen: 127.0.0.1:-1 + } + operator: %s + system_account: %s + resolver: { + type: full + dir: '%s' + } + `, storeDir, ojwt, syspub, dirSrv))) + defer removeFile(t, cf) + + s, _ := RunServerWithConfig(cf) + defer s.Shutdown() + + updateJwt(t, s.ClientURL(), sysCreds, sysJwt, 1) + updateJwt(t, s.ClientURL(), sysCreds, accJwt1, 1) + + nc := natsConnect(t, s.ClientURL(), nats.UserCredentials(accCreds)) + defer nc.Close() + + js, err := nc.JetStream() + require_NoError(t, err) + + _, err = js.AddStream(&nats.StreamConfig{Name: "foo", Replicas: 1}) + require_NoError(t, err) + + _, err = js.AddConsumer("foo", &nats.ConsumerConfig{ + Durable: "dur1", AckPolicy: nats.AckAllPolicy, MaxAckPending: 2000}) + require_Error(t, err) + require_Equal(t, err.Error(), "consumer max ack pending exceeds server limit") + + ci, err := js.AddConsumer("foo", &nats.ConsumerConfig{ + Durable: "dur2", AckPolicy: nats.AckAllPolicy, MaxAckPending: 500}) + require_NoError(t, err) + require_True(t, ci.Config.MaxAckPending == 500) + + _, err = js.UpdateConsumer("foo", &nats.ConsumerConfig{ + Durable: "dur2", AckPolicy: nats.AckAllPolicy, MaxAckPending: 2000}) + require_Error(t, err) + require_Equal(t, err.Error(), "consumer max ack pending exceeds server limit") + + time.Sleep(time.Second - time.Since(start)) // make sure the time stamp changes + accClaim.Limits.JetStreamTieredLimits["R1"] = jwt.JetStreamLimits{ + DiskStorage: jwt.NoLimit, MemoryStorage: jwt.NoLimit, Consumer: jwt.NoLimit, + Streams: jwt.NoLimit, MaxAckPending: int64(2000)} + accJwt2 := encodeClaim(t, accClaim, accPub) + updateJwt(t, s.ClientURL(), sysCreds, accJwt2, 1) + + ci, err = js.UpdateConsumer("foo", &nats.ConsumerConfig{ + Durable: "dur2", AckPolicy: nats.AckAllPolicy, MaxAckPending: 2000}) + require_NoError(t, err) + require_True(t, ci.Config.MaxAckPending == 2000) +} + func TestJWTJetStreamMaxStreamBytes(t *testing.T) { sysKp, syspub := createKey(t) sysJwt := encodeClaim(t, jwt.NewAccountClaims(syspub), syspub) diff --git a/server/opts.go b/server/opts.go index 20ff5e16..297ddc18 100644 --- a/server/opts.go +++ b/server/opts.go @@ -1659,7 +1659,7 @@ func parseGateway(v interface{}, o *Options, errors *[]error, warnings *[]error) return nil } -var dynamicJSAccountLimits = JetStreamAccountLimits{-1, -1, -1, -1, 0, 0, false} +var dynamicJSAccountLimits = JetStreamAccountLimits{-1, -1, -1, -1, 0, 0, 0, false} var defaultJSAccountTiers = map[string]JetStreamAccountLimits{_EMPTY_: dynamicJSAccountLimits} // Parses jetstream account limits for an account. Simple setup with boolen is allowed, and we will @@ -1685,7 +1685,7 @@ func parseJetStreamForAccount(v interface{}, acc *Account, errors *[]error, warn return &configErr{tk, fmt.Sprintf("Expected 'enabled' or 'disabled' for string value, got '%s'", vv)} } case map[string]interface{}: - jsLimits := JetStreamAccountLimits{-1, -1, -1, -1, 0, 0, false} + jsLimits := JetStreamAccountLimits{-1, -1, -1, -1, 0, 0, 0, false} for mk, mv := range vv { tk, mv = unwrapValue(mv, <) switch strings.ToLower(mk) { @@ -1694,13 +1694,13 @@ func parseJetStreamForAccount(v interface{}, acc *Account, errors *[]error, warn if !ok { return &configErr{tk, fmt.Sprintf("Expected a parseable size for %q, got %v", mk, mv)} } - jsLimits.MaxMemory = int64(vv) + jsLimits.MaxMemory = vv case "max_store", "max_file", "max_disk", "store", "disk": vv, ok := mv.(int64) if !ok { return &configErr{tk, fmt.Sprintf("Expected a parseable size for %q, got %v", mk, mv)} } - jsLimits.MaxStore = int64(vv) + jsLimits.MaxStore = vv case "max_streams", "streams": vv, ok := mv.(int64) if !ok { @@ -1718,7 +1718,7 @@ func parseJetStreamForAccount(v interface{}, acc *Account, errors *[]error, warn if !ok { return &configErr{tk, fmt.Sprintf("Expected a parseable bool for %q, got %v", mk, mv)} } - jsLimits.MaxBytesRequired = bool(vv) + jsLimits.MaxBytesRequired = vv case "mem_max_stream_bytes", "memory_max_stream_bytes": vv, ok := mv.(int64) if !ok { @@ -1731,6 +1731,12 @@ func parseJetStreamForAccount(v interface{}, acc *Account, errors *[]error, warn return &configErr{tk, fmt.Sprintf("Expected a parseable size for %q, got %v", mk, mv)} } jsLimits.StoreMaxStreamBytes = vv + case "max_ack_pending": + vv, ok := mv.(int64) + if !ok { + return &configErr{tk, fmt.Sprintf("Expected a parseable size for %q, got %v", mk, mv)} + } + jsLimits.MaxAckPending = int(vv) default: if !tk.IsUsedVariable() { err := &unknownConfigFieldErr{