diff --git a/server/consumer.go b/server/consumer.go index c76ba146..db7b6fab 100644 --- a/server/consumer.go +++ b/server/consumer.go @@ -329,6 +329,10 @@ func setConsumerConfigDefaults(config *ConsumerConfig, lim *JSLimitOpts, accLim } config.MaxAckPending = accPending } + // if applicable set max request batch size + if config.DeliverSubject == _EMPTY_ && config.MaxRequestBatch == 0 && lim.MaxRequestBatch > 0 { + config.MaxRequestBatch = lim.MaxRequestBatch + } } func (mset *stream) addConsumer(config *ConsumerConfig) (*consumer, error) { @@ -388,6 +392,9 @@ func checkConsumerCfg(config *ConsumerConfig, srvLim *JSLimitOpts, cfg *StreamCo if config.MaxRequestExpires != 0 && config.MaxRequestExpires < time.Millisecond { return NewJSConsumerMaxRequestExpiresToSmallError() } + if srvLim.MaxRequestBatch > 0 && config.MaxRequestBatch > srvLim.MaxRequestBatch { + return NewJSConsumerMaxRequestBatchExceededError(srvLim.MaxRequestBatch) + } } if srvLim.MaxAckPending > 0 && config.MaxAckPending > srvLim.MaxAckPending { return NewJSConsumerMaxPendingAckExcessError(srvLim.MaxAckPending) diff --git a/server/errors.json b/server/errors.json index f3480966..bf2ff9b6 100644 --- a/server/errors.json +++ b/server/errors.json @@ -1228,5 +1228,15 @@ "help": "", "url": "", "deprecates": "" + }, + { + "constant": "JSConsumerMaxRequestBatchExceededF", + "code": 400, + "error_code": 10125, + "description": "consumer max request batch exceeds server limit of {limit}", + "comment": "", + "help": "", + "url": "", + "deprecates": "" } -] +] \ No newline at end of file diff --git a/server/jetstream.go b/server/jetstream.go index aecb0ac0..0e3b226a 100644 --- a/server/jetstream.go +++ b/server/jetstream.go @@ -2191,9 +2191,9 @@ func (a *Account) addStreamTemplate(tc *StreamTemplateConfig) (*streamTemplate, // FIXME(dlc) - Hacky tcopy := tc.deepCopy() tcopy.Config.Name = "_" - cfg, err := checkStreamCfg(tcopy.Config, &s.getOpts().JetStreamLimits) - if err != nil { - return nil, err + cfg, apiErr := s.checkStreamCfg(tcopy.Config, a) + if apiErr != nil { + return nil, apiErr } tcopy.Config = &cfg t := &streamTemplate{ diff --git a/server/jetstream_api.go b/server/jetstream_api.go index 52fb3fd1..65842b8d 100644 --- a/server/jetstream_api.go +++ b/server/jetstream_api.go @@ -290,7 +290,8 @@ func generateJSMappingTable(domain string) map[string]string { const JSMaxDescriptionLen = 4 * 1024 // JSMaxNameLen is the maximum name lengths for streams, consumers and templates. -const JSMaxNameLen = 256 +// Picked 255 as it seems to be a widely used file name limit +const JSMaxNameLen = 255 // Responses for API calls. @@ -1221,140 +1222,6 @@ func (s *Server) jsStreamCreateRequest(sub *subscription, c *client, _ *Account, return } - hasStream := func(streamName string) (bool, int32, []string) { - var exists bool - var maxMsgSize int32 - var subs []string - if s.JetStreamIsClustered() { - if js, _ := s.getJetStreamCluster(); js != nil { - js.mu.RLock() - if sa := js.streamAssignment(acc.Name, streamName); sa != nil { - maxMsgSize = sa.Config.MaxMsgSize - subs = sa.Config.Subjects - exists = true - } - js.mu.RUnlock() - } - } else if mset, err := acc.lookupStream(streamName); err == nil { - maxMsgSize = mset.cfg.MaxMsgSize - subs = mset.cfg.Subjects - exists = true - } - return exists, maxMsgSize, subs - } - - var streamSubs []string - var deliveryPrefixes []string - var apiPrefixes []string - - // Do some pre-checking for mirror config to avoid cycles in clustered mode. - if cfg.Mirror != nil { - if len(cfg.Subjects) > 0 { - resp.Error = NewJSMirrorWithSubjectsError() - s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp)) - return - } - if len(cfg.Sources) > 0 { - resp.Error = NewJSMirrorWithSourcesError() - s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp)) - return - } - if cfg.Mirror.FilterSubject != _EMPTY_ { - resp.Error = NewJSMirrorWithSubjectFiltersError() - s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp)) - return - } - if cfg.Mirror.OptStartSeq > 0 && cfg.Mirror.OptStartTime != nil { - resp.Error = NewJSMirrorWithStartSeqAndTimeError() - s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp)) - return - } - if cfg.Duplicates != time.Duration(0) { - resp.Error = &ApiError{Code: 400, Description: "stream mirrors do not make use of a de-duplication window"} - s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp)) - return - } - // We do not require other stream to exist anymore, but if we can see it check payloads. - exists, maxMsgSize, subs := hasStream(cfg.Mirror.Name) - if len(subs) > 0 { - streamSubs = append(streamSubs, subs...) - } - if exists && cfg.MaxMsgSize > 0 && maxMsgSize > 0 && cfg.MaxMsgSize < maxMsgSize { - resp.Error = NewJSMirrorMaxMessageSizeTooBigError() - s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp)) - return - } - if cfg.Mirror.External != nil { - if cfg.Mirror.External.DeliverPrefix != _EMPTY_ { - deliveryPrefixes = append(deliveryPrefixes, cfg.Mirror.External.DeliverPrefix) - } - if cfg.Mirror.External.ApiPrefix != _EMPTY_ { - apiPrefixes = append(apiPrefixes, cfg.Mirror.External.ApiPrefix) - } - } - } - if len(cfg.Sources) > 0 { - for _, src := range cfg.Sources { - if src.External == nil { - continue - } - exists, maxMsgSize, subs := hasStream(src.Name) - if len(subs) > 0 { - streamSubs = append(streamSubs, subs...) - } - if src.External.DeliverPrefix != _EMPTY_ { - deliveryPrefixes = append(deliveryPrefixes, src.External.DeliverPrefix) - } - if src.External.ApiPrefix != _EMPTY_ { - apiPrefixes = append(apiPrefixes, src.External.ApiPrefix) - } - if exists && cfg.MaxMsgSize > 0 && maxMsgSize > 0 && cfg.MaxMsgSize < maxMsgSize { - resp.Error = NewJSSourceMaxMessageSizeTooBigError() - s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp)) - return - } - } - } - // check prefix overlap with subjects - for _, pfx := range deliveryPrefixes { - if !IsValidPublishSubject(pfx) { - resp.Error = NewJSStreamInvalidExternalDeliverySubjError(pfx) - s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp)) - return - } - for _, sub := range streamSubs { - if SubjectsCollide(sub, fmt.Sprintf("%s.%s", pfx, sub)) { - resp.Error = NewJSStreamExternalDelPrefixOverlapsError(pfx, sub) - s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp)) - return - } - } - } - // check if api prefixes overlap - for _, apiPfx := range apiPrefixes { - if !IsValidPublishSubject(apiPfx) { - resp.Error = &ApiError{Code: 400, Description: fmt.Sprintf("stream external api prefix %q must be a valid subject without wildcards", apiPfx)} - s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp)) - return - } - if SubjectsCollide(apiPfx, JSApiPrefix) { - resp.Error = NewJSStreamExternalApiOverlapError(apiPfx, JSApiPrefix) - s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp)) - return - } - } - - // Check for MaxBytes required and it's limit - if required, limit := acc.maxBytesLimits(&cfg); required && cfg.MaxBytes <= 0 { - resp.Error = NewJSStreamMaxBytesRequiredError() - s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp)) - return - } else if limit > 0 && cfg.MaxBytes > limit { - resp.Error = NewJSStreamMaxStreamBytesExceededError() - s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp)) - return - } - // Can't create a stream with a sealed state. if cfg.Sealed { resp.Error = NewJSStreamInvalidConfigError(fmt.Errorf("stream configuration for create can not be sealed")) @@ -1368,16 +1235,8 @@ func (s *Server) jsStreamCreateRequest(sub *subscription, c *client, _ *Account, return } - selectedLimits, tier, jsa, apiErr := acc.selectLimits(&cfg) - - if apiErr != nil { - resp.Error = apiErr - s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp)) - return - } - - if selectedLimits.MaxStreams > 0 && jsa.countStreams(tier, &cfg) >= selectedLimits.MaxStreams { - resp.Error = NewJSMaximumStreamsLimitError() + if err := acc.jsNonClusteredStreamLimitsCheck(&cfg); err != nil { + resp.Error = err s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp)) return } @@ -1442,9 +1301,9 @@ func (s *Server) jsStreamUpdateRequest(sub *subscription, c *client, _ *Account, return } - cfg, err := checkStreamCfg(&ncfg, &s.getOpts().JetStreamLimits) - if err != nil { - resp.Error = NewJSStreamInvalidConfigError(err) + cfg, apiErr := s.checkStreamCfg(&ncfg, acc) + if apiErr != nil { + resp.Error = apiErr s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp)) return } @@ -1456,17 +1315,6 @@ func (s *Server) jsStreamUpdateRequest(sub *subscription, c *client, _ *Account, return } - // Check for MaxBytes required and it's limit - if required, limit := acc.maxBytesLimits(&cfg); required && cfg.MaxBytes <= 0 { - resp.Error = NewJSStreamMaxBytesRequiredError() - s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp)) - return - } else if limit > 0 && cfg.MaxBytes > limit { - resp.Error = NewJSStreamMaxStreamBytesExceededError() - s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp)) - return - } - // Handle clustered version here. if s.JetStreamIsClustered() { // If we are inline with client, we still may need to do a callout for stream info @@ -2783,6 +2631,23 @@ func (s *Server) jsStreamPurgeRequest(sub *subscription, c *client, _ *Account, s.sendAPIResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(resp)) } +func (acc *Account) jsNonClusteredStreamLimitsCheck(cfg *StreamConfig) *ApiError { + selectedLimits, tier, jsa, apiErr := acc.selectLimits(cfg) + if apiErr != nil { + return apiErr + } + jsa.mu.RLock() + defer jsa.mu.RUnlock() + if selectedLimits.MaxStreams > 0 && jsa.countStreams(tier, cfg) >= selectedLimits.MaxStreams { + return NewJSMaximumStreamsLimitError() + } + reserved := jsa.tieredReservation(tier, cfg) + if err := jsa.js.checkAllLimits(selectedLimits, cfg, reserved, 0); err != nil { + return NewJSStreamLimitsError(err, Unless(err)) + } + return nil +} + // Request to restore a stream. func (s *Server) jsStreamRestoreRequest(sub *subscription, c *client, _ *Account, subject, reply string, rmsg []byte) { if c == nil || !s.JetStreamIsLeader() { @@ -2819,11 +2684,25 @@ func (s *Server) jsStreamRestoreRequest(sub *subscription, c *client, _ *Account req.Config.Name = stream } + // check stream config at the start of the restore process, not at the end + cfg, apiErr := s.checkStreamCfg(&req.Config, acc) + if apiErr != nil { + resp.Error = apiErr + s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp)) + return + } + if s.JetStreamIsClustered() { s.jsClusteredStreamRestoreRequest(ci, acc, &req, stream, subject, reply, rmsg) return } + if err := acc.jsNonClusteredStreamLimitsCheck(&cfg); err != nil { + resp.Error = err + s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp)) + return + } + if _, err := acc.lookupStream(stream); err == nil { resp.Error = NewJSStreamNameExistError() s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp)) diff --git a/server/jetstream_cluster.go b/server/jetstream_cluster.go index 091e8b30..bbdcf0cf 100644 --- a/server/jetstream_cluster.go +++ b/server/jetstream_cluster.go @@ -4299,6 +4299,26 @@ func (acc *Account) selectLimits(cfg *StreamConfig) (*JetStreamAccountLimits, st return &selectedLimits, tierName, jsa, nil } +// Read lock needs to be held +func (js *jetStream) jsClusteredStreamLimitsCheck(acc *Account, cfg *StreamConfig) *ApiError { + selectedLimits, tier, _, apiErr := acc.selectLimits(cfg) + if apiErr != nil { + return apiErr + } + + asa := js.cluster.streams[acc.Name] + numStreams, reservations := tieredStreamAndReservationCount(asa, tier, cfg) + + if selectedLimits.MaxStreams > 0 && numStreams >= selectedLimits.MaxStreams { + return NewJSMaximumStreamsLimitError() + } + // Check for account limits here before proposing. + if err := js.checkAccountLimits(selectedLimits, cfg, reservations); err != nil { + return NewJSStreamLimitsError(err, Unless(err)) + } + return nil +} + func (s *Server) jsClusteredStreamRequest(ci *ClientInfo, acc *Account, subject, reply string, rmsg []byte, config *StreamConfig) { js, cc := s.getJetStreamCluster() if js == nil || cc == nil { @@ -4307,41 +4327,24 @@ func (s *Server) jsClusteredStreamRequest(ci *ClientInfo, acc *Account, subject, var resp = JSApiStreamCreateResponse{ApiResponse: ApiResponse{Type: JSApiStreamCreateResponseType}} - ccfg, err := checkStreamCfg(config, &s.getOpts().JetStreamLimits) - if err != nil { - resp.Error = NewJSStreamInvalidConfigError(err, Unless(err)) - s.sendAPIErrResponse(ci, acc, subject, reply, string(rmsg), s.jsonResponse(&resp)) - return - } - cfg := &ccfg - - selectedLimits, tier, _, apiErr := acc.selectLimits(&ccfg) + ccfg, apiErr := s.checkStreamCfg(config, acc) if apiErr != nil { resp.Error = apiErr s.sendAPIErrResponse(ci, acc, subject, reply, string(rmsg), s.jsonResponse(&resp)) return } + cfg := &ccfg - // Check for stream limits here before proposing. These need to be tracked from meta layer, not jsa. js.mu.RLock() + apiErr = js.jsClusteredStreamLimitsCheck(acc, cfg) asa := cc.streams[acc.Name] - numStreams, reservations := tieredStreamAndReservationCount(asa, tier, config) - - if selectedLimits.MaxStreams > 0 && numStreams >= selectedLimits.MaxStreams { - resp.Error = NewJSMaximumStreamsLimitError() - s.sendAPIErrResponse(ci, acc, subject, reply, string(rmsg), s.jsonResponse(&resp)) - js.mu.RUnlock() - return - } - - // Check for account limits here before proposing. - if err := js.checkAccountLimits(selectedLimits, cfg, reservations); err != nil { - resp.Error = NewJSStreamLimitsError(err, Unless(err)) - s.sendAPIErrResponse(ci, acc, subject, reply, string(rmsg), s.jsonResponse(&resp)) - js.mu.RUnlock() - return - } js.mu.RUnlock() + // Check for stream limits here before proposing. These need to be tracked from meta layer, not jsa. + if apiErr != nil { + resp.Error = apiErr + s.sendAPIErrResponse(ci, acc, subject, reply, string(rmsg), s.jsonResponse(&resp)) + return + } // Now process the request and proposal. js.mu.Lock() @@ -4424,7 +4427,7 @@ func (s *Server) jsClusteredStreamUpdateRequest(ci *ClientInfo, acc *Account, su var newCfg *StreamConfig if jsa := js.accounts[acc.Name]; jsa != nil { js.mu.Unlock() - ncfg, err := jsa.configUpdateCheck(osa.Config, cfg, &s.getOpts().JetStreamLimits) + ncfg, err := jsa.configUpdateCheck(osa.Config, cfg, s) js.mu.Lock() if err != nil { resp.Error = NewJSStreamUpdateError(err, Unless(err)) @@ -4742,6 +4745,12 @@ func (s *Server) jsClusteredStreamRestoreRequest( cfg := &req.Config resp := JSApiStreamRestoreResponse{ApiResponse: ApiResponse{Type: JSApiStreamRestoreResponseType}} + if err := js.jsClusteredStreamLimitsCheck(acc, cfg); err != nil { + resp.Error = err + s.sendAPIErrResponse(ci, acc, subject, reply, string(rmsg), s.jsonResponse(&resp)) + return + } + if sa := js.streamAssignment(ci.serviceAccount(), cfg.Name); sa != nil { resp.Error = NewJSStreamNameExistError() s.sendAPIErrResponse(ci, acc, subject, reply, string(rmsg), s.jsonResponse(&resp)) diff --git a/server/jetstream_cluster_test.go b/server/jetstream_cluster_test.go index 4449f70a..bc74d7ec 100644 --- a/server/jetstream_cluster_test.go +++ b/server/jetstream_cluster_test.go @@ -13923,3 +13923,52 @@ func (c *cluster) stableTotalSubs() (total int) { return nsubs } + +func TestJetStreamMirrorSourceLoop(t *testing.T) { + test := func(t *testing.T, s *Server, replicas int) { + nc, js := jsClientConnect(t, s) + defer nc.Close() + // Create a source/mirror loop + _, err := js.AddStream(&nats.StreamConfig{ + Name: "1", + Subjects: []string{"foo", "bar"}, + Replicas: replicas, + Sources: []*nats.StreamSource{{Name: "DECOY"}, {Name: "2"}}, + }) + require_NoError(t, err) + _, err = js.AddStream(&nats.StreamConfig{ + Name: "DECOY", + Subjects: []string{"baz"}, + Replicas: replicas, + Sources: []*nats.StreamSource{{Name: "NOTTHERE"}}, + }) + require_NoError(t, err) + _, err = js.AddStream(&nats.StreamConfig{ + Name: "2", + Replicas: replicas, + Sources: []*nats.StreamSource{{Name: "3"}}, + }) + require_NoError(t, err) + _, err = js.AddStream(&nats.StreamConfig{ + Name: "3", + Replicas: replicas, + Sources: []*nats.StreamSource{{Name: "1"}}, + }) + require_Error(t, err) + require_Equal(t, err.Error(), "detected cycle") + } + + t.Run("Single", func(t *testing.T) { + s := RunBasicJetStreamServer() + if config := s.JetStreamConfig(); config != nil { + defer removeDir(t, config.StoreDir) + } + defer s.Shutdown() + test(t, s, 1) + }) + t.Run("Clustered", func(t *testing.T) { + c := createJetStreamClusterExplicit(t, "JSC", 5) + defer c.shutdown() + test(t, c.randomServer(), 2) + }) +} diff --git a/server/jetstream_errors_generated.go b/server/jetstream_errors_generated.go index 89ca20d3..26114931 100644 --- a/server/jetstream_errors_generated.go +++ b/server/jetstream_errors_generated.go @@ -113,6 +113,9 @@ const ( // JSConsumerMaxPendingAckPolicyRequiredErr consumer requires ack policy for max ack pending JSConsumerMaxPendingAckPolicyRequiredErr ErrorIdentifier = 10082 + // JSConsumerMaxRequestBatchExceededF consumer max request batch exceeds server limit of {limit} + JSConsumerMaxRequestBatchExceededF ErrorIdentifier = 10125 + // JSConsumerMaxRequestBatchNegativeErr consumer max request batch needs to be > 0 JSConsumerMaxRequestBatchNegativeErr ErrorIdentifier = 10114 @@ -413,6 +416,7 @@ var ( JSConsumerMaxDeliverBackoffErr: {Code: 400, ErrCode: 10116, Description: "max deliver is required to be > length of backoff values"}, JSConsumerMaxPendingAckExcessErrF: {Code: 400, ErrCode: 10121, Description: "consumer max ack pending exceeds system limit of {limit}"}, JSConsumerMaxPendingAckPolicyRequiredErr: {Code: 400, ErrCode: 10082, Description: "consumer requires ack policy for max ack pending"}, + JSConsumerMaxRequestBatchExceededF: {Code: 400, ErrCode: 10125, Description: "consumer max request batch exceeds server limit of {limit}"}, JSConsumerMaxRequestBatchNegativeErr: {Code: 400, ErrCode: 10114, Description: "consumer max request batch needs to be > 0"}, JSConsumerMaxRequestExpiresToSmall: {Code: 400, ErrCode: 10115, Description: "consumer max request expires needs to be >= 1ms"}, JSConsumerMaxWaitingNegativeErr: {Code: 400, ErrCode: 10087, Description: "consumer max waiting needs to be positive"}, @@ -915,6 +919,22 @@ func NewJSConsumerMaxPendingAckPolicyRequiredError(opts ...ErrorOption) *ApiErro return ApiErrors[JSConsumerMaxPendingAckPolicyRequiredErr] } +// NewJSConsumerMaxRequestBatchExceededError creates a new JSConsumerMaxRequestBatchExceededF error: "consumer max request batch exceeds server limit of {limit}" +func NewJSConsumerMaxRequestBatchExceededError(limit interface{}, opts ...ErrorOption) *ApiError { + eopts := parseOpts(opts) + if ae, ok := eopts.err.(*ApiError); ok { + return ae + } + + e := ApiErrors[JSConsumerMaxRequestBatchExceededF] + args := e.toReplacerArgs([]interface{}{"{limit}", limit}) + return &ApiError{ + Code: e.Code, + ErrCode: e.ErrCode, + Description: strings.NewReplacer(args...).Replace(e.Description), + } +} + // NewJSConsumerMaxRequestBatchNegativeError creates a new JSConsumerMaxRequestBatchNegativeErr error: "consumer max request batch needs to be > 0" func NewJSConsumerMaxRequestBatchNegativeError(opts ...ErrorOption) *ApiError { eopts := parseOpts(opts) diff --git a/server/jetstream_test.go b/server/jetstream_test.go index 86c062c8..b39c1af9 100644 --- a/server/jetstream_test.go +++ b/server/jetstream_test.go @@ -4201,6 +4201,9 @@ func TestJetStreamSnapshotsAPI(t *testing.T) { opts.JetStream = true opts.JetStreamDomain = "domain" opts.StoreDir = tdir + maxStore := int64(1024 * 1024 * 1024) + opts.maxStoreSet = true + opts.JetStreamMaxStore = maxStore rurl, _ := url.Parse(fmt.Sprintf("nats-leaf://%s:%d", lopts.LeafNode.Host, lopts.LeafNode.Port)) opts.LeafNode.Remotes = []*RemoteLeafOpts{{URLs: []*url.URL{rurl}}} @@ -12070,7 +12073,7 @@ func TestJetStreamMirrorUpdatePreventsSubjects(t *testing.T) { require_NoError(t, err) _, err = js.UpdateStream(&nats.StreamConfig{Name: "MIRROR", Mirror: &nats.StreamSource{Name: "ORIGINAL"}, Subjects: []string{"x"}}) - if err == nil || err.Error() != "stream mirrors may not have subjects" { + if err == nil || err.Error() != "stream mirrors can not contain subjects" { t.Fatalf("Expected to not be able to put subjects on a stream, got: %+v", err) } } @@ -13511,9 +13514,9 @@ func TestJetStreamLongStreamNamesAndPubAck(t *testing.T) { nc, js := jsClientConnect(t, s) defer nc.Close() - data := make([]byte, 256) + data := make([]byte, 255) rand.Read(data) - stream := base64.StdEncoding.EncodeToString(data)[:256] + stream := base64.StdEncoding.EncodeToString(data)[:255] cfg := &nats.StreamConfig{ Name: stream, @@ -16573,6 +16576,83 @@ func TestJetStreamCrossAccounts(t *testing.T) { } } +func TestJetStreamInvalidRestoreRequests(t *testing.T) { + test := func(t *testing.T, s *Server, replica int) { + nc := natsConnect(t, s.ClientURL()) + // test invalid stream config in restore request + require_fail := func(cfg StreamConfig, errDesc string) { + t.Helper() + rreq := &JSApiStreamRestoreRequest{ + Config: cfg, + } + req, err := json.Marshal(rreq) + require_NoError(t, err) + rmsg, err := nc.Request(fmt.Sprintf(JSApiStreamRestoreT, "fail"), req, time.Second) + if err != nil { + t.Fatalf("Unexpected error: %v", err) + } + var rresp JSApiStreamRestoreResponse + json.Unmarshal(rmsg.Data, &rresp) + require_True(t, rresp.Error != nil) + require_Equal(t, rresp.Error.Description, errDesc) + } + require_fail(StreamConfig{Name: "fail", MaxBytes: 1024, Storage: FileStorage, Replicas: 6}, + "maximum replicas is 5") + require_fail(StreamConfig{Name: "fail", MaxBytes: 2 * 1012 * 1024, Storage: FileStorage, Replicas: replica}, + "insufficient storage resources available") + js, err := nc.JetStream() + require_NoError(t, err) + _, err = js.AddStream(&nats.StreamConfig{Name: "stream", MaxBytes: 1024, Storage: nats.FileStorage, Replicas: 1}) + require_NoError(t, err) + require_fail(StreamConfig{Name: "fail", MaxBytes: 1024, Storage: FileStorage}, + "maximum number of streams reached") + } + + commonAccSection := ` + no_auth_user: u + accounts { + ONE { + users = [ { user: "u", pass: "s3cr3t!" } ] + jetstream: { + max_store: 1Mb + max_streams: 1 + } + } + $SYS { users = [ { user: "admin", pass: "s3cr3t!" } ] } + }` + + t.Run("clustered", func(t *testing.T) { + c := createJetStreamClusterWithTemplate(t, ` + listen: 127.0.0.1:-1 + server_name: %s + jetstream: { + max_mem_store: 2MB, + max_file_store: 8MB, + store_dir: '%s', + } + cluster { + name: %s + listen: 127.0.0.1:%d + routes = [%s] + }`+commonAccSection, "clust", 3) + defer c.shutdown() + s := c.randomServer() + test(t, s, 3) + }) + t.Run("single", func(t *testing.T) { + storeDir := createDir(t, JetStreamStoreDir) + defer removeDir(t, storeDir) + conf := createConfFile(t, []byte(fmt.Sprintf(` + listen: 127.0.0.1:-1 + jetstream: {max_mem_store: 2MB, max_file_store: 8MB, store_dir: '%s'} + %s`, storeDir, commonAccSection))) + defer removeFile(t, conf) + s, _ := RunServerWithConfig(conf) + defer s.Shutdown() + test(t, s, 1) + }) +} + func TestJetStreamLimits(t *testing.T) { test := func(t *testing.T, s *Server) { nc := natsConnect(t, s.ClientURL()) @@ -16585,9 +16665,9 @@ func TestJetStreamLimits(t *testing.T) { require_NoError(t, err) require_True(t, si.Config.Duplicates == time.Minute) - si, err = js.AddStream(&nats.StreamConfig{Name: "bar", Duplicates: 500 * time.Millisecond}) + si, err = js.AddStream(&nats.StreamConfig{Name: "bar", Duplicates: 1500 * time.Millisecond}) require_NoError(t, err) - require_True(t, si.Config.Duplicates == 500*time.Millisecond) + require_True(t, si.Config.Duplicates == 1500*time.Millisecond) _, err = js.UpdateStream(&nats.StreamConfig{Name: "bar", Duplicates: 2 * time.Minute}) require_Error(t, err) @@ -16600,10 +16680,16 @@ func TestJetStreamLimits(t *testing.T) { ci, err := js.AddConsumer("foo", &nats.ConsumerConfig{Durable: "dur1", AckPolicy: nats.AckExplicitPolicy}) require_NoError(t, err) require_True(t, ci.Config.MaxAckPending == 1000) + require_True(t, ci.Config.MaxRequestBatch == 250) + + _, err = js.AddConsumer("foo", &nats.ConsumerConfig{Durable: "dur2", AckPolicy: nats.AckExplicitPolicy, MaxRequestBatch: 500}) + require_Error(t, err) + require_Equal(t, err.Error(), "consumer max request batch exceeds server limit of 250") ci, err = js.AddConsumer("foo", &nats.ConsumerConfig{Durable: "dur2", AckPolicy: nats.AckExplicitPolicy, MaxAckPending: 500}) require_NoError(t, err) require_True(t, ci.Config.MaxAckPending == 500) + require_True(t, ci.Config.MaxRequestBatch == 250) _, err = js.UpdateConsumer("foo", &nats.ConsumerConfig{Durable: "dur2", AckPolicy: nats.AckExplicitPolicy, MaxAckPending: 2000}) require_Error(t, err) @@ -16622,7 +16708,7 @@ func TestJetStreamLimits(t *testing.T) { max_mem_store: 2MB, max_file_store: 8MB, store_dir: '%s', - limits: {duplicate_window: "1m"} + limits: {duplicate_window: "1m", max_request_batch: 250} } cluster { name: %s @@ -16659,7 +16745,7 @@ func TestJetStreamLimits(t *testing.T) { max_mem_store: 2MB, max_file_store: 8MB, store_dir: '%s', - limits: {duplicate_window: "1m"} + limits: {duplicate_window: "1m", max_request_batch: 250} } no_auth_user: u accounts { diff --git a/server/opts.go b/server/opts.go index 5c77d876..c3dcbe31 100644 --- a/server/opts.go +++ b/server/opts.go @@ -185,9 +185,10 @@ type RemoteLeafOpts struct { } type JSLimitOpts struct { - MaxAckPending int - MaxHAAssets int - Duplicates time.Duration + MaxRequestBatch int + MaxAckPending int + MaxHAAssets int + Duplicates time.Duration } // Options block for nats-server. @@ -1818,6 +1819,8 @@ func parseJetStreamLimits(v interface{}, opts *Options, errors *[]error, warning lim.MaxAckPending = int(mv.(int64)) case "max_ha_assets": lim.MaxHAAssets = int(mv.(int64)) + case "max_request_batch": + lim.MaxRequestBatch = int(mv.(int64)) case "duplicate_window": var err error lim.Duplicates, err = time.ParseDuration(mv.(string)) diff --git a/server/stream.go b/server/stream.go index d2a2605d..2b87d92d 100644 --- a/server/stream.go +++ b/server/stream.go @@ -295,9 +295,9 @@ func (a *Account) addStreamWithAssignment(config *StreamConfig, fsConfig *FileSt } // Sensible defaults. - cfg, err := checkStreamCfg(config, &s.getOpts().JetStreamLimits) - if err != nil { - return nil, NewJSStreamInvalidConfigError(err, Unless(err)) + cfg, apiErr := s.checkStreamCfg(config, a) + if apiErr != nil { + return nil, apiErr } singleServerMode := !s.JetStreamIsClustered() && s.standAloneMode() @@ -347,30 +347,6 @@ func (a *Account) addStreamWithAssignment(config *StreamConfig, fsConfig *FileSt } } - // Check for mirror designation. - if cfg.Mirror != nil { - // Can't have subjects. - if len(cfg.Subjects) > 0 { - jsa.mu.Unlock() - return nil, fmt.Errorf("stream mirrors can not also contain subjects") - } - if len(cfg.Sources) > 0 { - jsa.mu.Unlock() - return nil, fmt.Errorf("stream mirrors can not also contain other sources") - } - if cfg.Mirror.FilterSubject != _EMPTY_ { - jsa.mu.Unlock() - return nil, fmt.Errorf("stream mirrors can not contain filtered subjects") - } - if cfg.Mirror.OptStartSeq > 0 && cfg.Mirror.OptStartTime != nil { - jsa.mu.Unlock() - return nil, fmt.Errorf("stream mirrors can not have both start seq and start time configured") - } - } else if len(cfg.Subjects) == 0 && len(cfg.Sources) == 0 { - jsa.mu.Unlock() - return nil, fmt.Errorf("stream needs at least one configured subject or mirror") - } - // Setup our internal indexed names here for sources. if len(cfg.Sources) > 0 { for _, ssi := range cfg.Sources { @@ -870,18 +846,20 @@ func (jsa *jsAccount) subjectsOverlap(subjects []string) bool { // StreamDefaultDuplicatesWindow default duplicates window. const StreamDefaultDuplicatesWindow = 2 * time.Minute -func checkStreamCfg(config *StreamConfig, lim *JSLimitOpts) (StreamConfig, error) { +func (s *Server) checkStreamCfg(config *StreamConfig, acc *Account) (StreamConfig, *ApiError) { + lim := &s.getOpts().JetStreamLimits + if config == nil { - return StreamConfig{}, fmt.Errorf("stream configuration invalid") + return StreamConfig{}, NewJSStreamInvalidConfigError(fmt.Errorf("stream configuration invalid")) } if !isValidName(config.Name) { - return StreamConfig{}, fmt.Errorf("stream name is required and can not contain '.', '*', '>'") + return StreamConfig{}, NewJSStreamInvalidConfigError(fmt.Errorf("stream name is required and can not contain '.', '*', '>'")) } if len(config.Name) > JSMaxNameLen { - return StreamConfig{}, fmt.Errorf("stream name is too long, maximum allowed is %d", JSMaxNameLen) + return StreamConfig{}, NewJSStreamInvalidConfigError(fmt.Errorf("stream name is too long, maximum allowed is %d", JSMaxNameLen)) } if len(config.Description) > JSMaxDescriptionLen { - return StreamConfig{}, fmt.Errorf("stream description is too long, maximum allowed is %d", JSMaxDescriptionLen) + return StreamConfig{}, NewJSStreamInvalidConfigError(fmt.Errorf("stream description is too long, maximum allowed is %d", JSMaxDescriptionLen)) } cfg := *config @@ -894,7 +872,7 @@ func checkStreamCfg(config *StreamConfig, lim *JSLimitOpts) (StreamConfig, error cfg.Replicas = 1 } if cfg.Replicas > StreamMaxReplicas { - return cfg, fmt.Errorf("maximum replicas is %d", StreamMaxReplicas) + return cfg, NewJSStreamInvalidConfigError(fmt.Errorf("maximum replicas is %d", StreamMaxReplicas)) } if cfg.MaxMsgs == 0 { cfg.MaxMsgs = -1 @@ -911,7 +889,7 @@ func checkStreamCfg(config *StreamConfig, lim *JSLimitOpts) (StreamConfig, error if cfg.MaxConsumers == 0 { cfg.MaxConsumers = -1 } - if cfg.Duplicates == 0 { + if cfg.Duplicates == 0 && cfg.Mirror == nil { maxWindow := StreamDefaultDuplicatesWindow if lim.Duplicates > 0 && maxWindow > lim.Duplicates { maxWindow = lim.Duplicates @@ -923,18 +901,156 @@ func checkStreamCfg(config *StreamConfig, lim *JSLimitOpts) (StreamConfig, error } } if cfg.Duplicates < 0 { - return StreamConfig{}, fmt.Errorf("duplicates window can not be negative") + return StreamConfig{}, NewJSStreamInvalidConfigError(fmt.Errorf("duplicates window can not be negative")) } // Check that duplicates is not larger then age if set. if cfg.MaxAge != 0 && cfg.Duplicates > cfg.MaxAge { - return StreamConfig{}, fmt.Errorf("duplicates window can not be larger then max age") + return StreamConfig{}, NewJSStreamInvalidConfigError(fmt.Errorf("duplicates window can not be larger then max age")) } if lim.Duplicates > 0 && cfg.Duplicates > lim.Duplicates { - return StreamConfig{}, fmt.Errorf("duplicates window can not be larger then server limit of %v", lim.Duplicates.String()) + return StreamConfig{}, NewJSStreamInvalidConfigError(fmt.Errorf("duplicates window can not be larger then server limit of %v", lim.Duplicates.String())) } if cfg.DenyPurge && cfg.AllowRollup { - return StreamConfig{}, fmt.Errorf("roll-ups require the purge permission") + return StreamConfig{}, NewJSStreamInvalidConfigError(fmt.Errorf("roll-ups require the purge permission")) + } + + getStream := func(streamName string) (bool, StreamConfig) { + var exists bool + var cfg StreamConfig + if s.JetStreamIsClustered() { + if js, _ := s.getJetStreamCluster(); js != nil { + js.mu.RLock() + if sa := js.streamAssignment(acc.Name, streamName); sa != nil { + cfg = *sa.Config + exists = true + } + js.mu.RUnlock() + } + } else if mset, err := acc.lookupStream(streamName); err == nil { + cfg = mset.cfg + exists = true + } + return exists, cfg + } + hasStream := func(streamName string) (bool, int32, []string) { + exists, cfg := getStream(streamName) + return exists, cfg.MaxMsgSize, cfg.Subjects + } + + var streamSubs []string + var deliveryPrefixes []string + var apiPrefixes []string + + // Do some pre-checking for mirror config to avoid cycles in clustered mode. + if cfg.Mirror != nil { + if len(cfg.Subjects) > 0 { + return StreamConfig{}, NewJSMirrorWithSubjectsError() + + } + if len(cfg.Sources) > 0 { + return StreamConfig{}, NewJSMirrorWithSourcesError() + } + if cfg.Mirror.FilterSubject != _EMPTY_ { + return StreamConfig{}, NewJSMirrorWithSubjectFiltersError() + } + if cfg.Mirror.OptStartSeq > 0 && cfg.Mirror.OptStartTime != nil { + return StreamConfig{}, NewJSMirrorWithStartSeqAndTimeError() + } + if cfg.Duplicates != time.Duration(0) { + return StreamConfig{}, NewJSStreamInvalidConfigError( + errors.New("stream mirrors do not make use of a de-duplication window")) + } + // We do not require other stream to exist anymore, but if we can see it check payloads. + exists, maxMsgSize, subs := hasStream(cfg.Mirror.Name) + if len(subs) > 0 { + streamSubs = append(streamSubs, subs...) + } + if exists && cfg.MaxMsgSize > 0 && maxMsgSize > 0 && cfg.MaxMsgSize < maxMsgSize { + return StreamConfig{}, NewJSMirrorMaxMessageSizeTooBigError() + } + if cfg.Mirror.External != nil { + if cfg.Mirror.External.DeliverPrefix != _EMPTY_ { + deliveryPrefixes = append(deliveryPrefixes, cfg.Mirror.External.DeliverPrefix) + } + if cfg.Mirror.External.ApiPrefix != _EMPTY_ { + apiPrefixes = append(apiPrefixes, cfg.Mirror.External.ApiPrefix) + } + } + } + if len(cfg.Sources) > 0 { + for _, src := range cfg.Sources { + if src.External == nil { + continue + } + exists, maxMsgSize, subs := hasStream(src.Name) + if len(subs) > 0 { + streamSubs = append(streamSubs, subs...) + } + if src.External.DeliverPrefix != _EMPTY_ { + deliveryPrefixes = append(deliveryPrefixes, src.External.DeliverPrefix) + } + if src.External.ApiPrefix != _EMPTY_ { + apiPrefixes = append(apiPrefixes, src.External.ApiPrefix) + } + if exists && cfg.MaxMsgSize > 0 && maxMsgSize > 0 && cfg.MaxMsgSize < maxMsgSize { + return StreamConfig{}, NewJSSourceMaxMessageSizeTooBigError() + } + } + } + // check prefix overlap with subjects + for _, pfx := range deliveryPrefixes { + if !IsValidPublishSubject(pfx) { + return StreamConfig{}, NewJSStreamInvalidExternalDeliverySubjError(pfx) + } + for _, sub := range streamSubs { + if SubjectsCollide(sub, fmt.Sprintf("%s.%s", pfx, sub)) { + return StreamConfig{}, NewJSStreamExternalDelPrefixOverlapsError(pfx, sub) + } + } + } + // check if api prefixes overlap + for _, apiPfx := range apiPrefixes { + if !IsValidPublishSubject(apiPfx) { + return StreamConfig{}, NewJSStreamInvalidConfigError( + fmt.Errorf("stream external api prefix %q must be a valid subject without wildcards", apiPfx)) + } + if SubjectsCollide(apiPfx, JSApiPrefix) { + return StreamConfig{}, NewJSStreamExternalApiOverlapError(apiPfx, JSApiPrefix) + } + } + + // cycle check for source cycle + toVisit := []*StreamConfig{&cfg} + visited := make(map[string]struct{}) + for len(toVisit) > 0 { + cfg := toVisit[0] + toVisit = toVisit[1:] + visited[cfg.Name] = struct{}{} + for _, src := range cfg.Sources { + if src.External != nil { + // TODO (mh) look up service imports and see if src.External.ApiPrefix returns an account + // this will be much easier without the delivery subject + continue + } + if _, ok := visited[src.Name]; ok { + return StreamConfig{}, NewJSStreamInvalidConfigError(errors.New("detected cycle")) + } + if exists, cfg := getStream(src.Name); exists { + toVisit = append(toVisit, &cfg) + } + } + // Avoid cycles hiding behind mirrors + if m := cfg.Mirror; m != nil { + if m.External == nil { + if _, ok := visited[m.Name]; ok { + return StreamConfig{}, NewJSStreamInvalidConfigError(errors.New("detected cycle")) + } + if exists, cfg := getStream(m.Name); exists { + toVisit = append(toVisit, &cfg) + } + } + } } if len(cfg.Subjects) == 0 { @@ -943,27 +1059,40 @@ func checkStreamCfg(config *StreamConfig, lim *JSLimitOpts) (StreamConfig, error } } else { if cfg.Mirror != nil { - return StreamConfig{}, fmt.Errorf("stream mirrors may not have subjects") + return StreamConfig{}, NewJSMirrorWithSubjectsError() } // We can allow overlaps, but don't allow direct duplicates. dset := make(map[string]struct{}, len(cfg.Subjects)) for _, subj := range cfg.Subjects { if _, ok := dset[subj]; ok { - return StreamConfig{}, fmt.Errorf("duplicate subjects detected") + return StreamConfig{}, NewJSStreamInvalidConfigError(fmt.Errorf("duplicate subjects detected")) } // Also check to make sure we do not overlap with our $JS API subjects. if subjectIsSubsetMatch(subj, "$JS.API.>") { - return StreamConfig{}, fmt.Errorf("subjects overlap with jetstream api") + return StreamConfig{}, NewJSStreamInvalidConfigError(fmt.Errorf("subjects overlap with jetstream api")) } // Make sure the subject is valid. if !IsValidSubject(subj) { - return StreamConfig{}, fmt.Errorf("invalid subject") + return StreamConfig{}, NewJSStreamInvalidConfigError(fmt.Errorf("invalid subject")) } // Mark for duplicate check. dset[subj] = struct{}{} } } + + if len(cfg.Subjects) == 0 && len(cfg.Sources) == 0 && cfg.Mirror == nil { + return StreamConfig{}, NewJSStreamInvalidConfigError( + fmt.Errorf("stream needs at least one configured subject or be a source/mirror")) + } + + // Check for MaxBytes required and it's limit + if required, limit := acc.maxBytesLimits(&cfg); required && cfg.MaxBytes <= 0 { + return StreamConfig{}, NewJSStreamMaxBytesRequiredError() + } else if limit > 0 && cfg.MaxBytes > limit { + return StreamConfig{}, NewJSStreamMaxStreamBytesExceededError() + } + return cfg, nil } @@ -985,10 +1114,10 @@ func (mset *stream) fileStoreConfig() (FileStoreConfig, error) { } // Do not hold jsAccount or jetStream lock -func (jsa *jsAccount) configUpdateCheck(old, new *StreamConfig, lim *JSLimitOpts) (*StreamConfig, error) { - cfg, err := checkStreamCfg(new, lim) - if err != nil { - return nil, NewJSStreamInvalidConfigError(err, Unless(err)) +func (jsa *jsAccount) configUpdateCheck(old, new *StreamConfig, s *Server) (*StreamConfig, error) { + cfg, apiErr := s.checkStreamCfg(new, jsa.acc()) + if apiErr != nil { + return nil, apiErr } // Name must match. @@ -1096,8 +1225,11 @@ func (jsa *jsAccount) configUpdateCheck(old, new *StreamConfig, lim *JSLimitOpts // Update will allow certain configuration properties of an existing stream to be updated. func (mset *stream) update(config *StreamConfig) error { - ocfg := mset.config() - cfg, err := mset.jsa.configUpdateCheck(&ocfg, config, &mset.srv.getOpts().JetStreamLimits) + mset.mu.RLock() + ocfg := mset.cfg + s := mset.srv + mset.mu.RUnlock() + cfg, err := mset.jsa.configUpdateCheck(&ocfg, config, s) if err != nil { return NewJSStreamInvalidConfigError(err, Unless(err)) } @@ -3793,16 +3925,16 @@ func (a *Account) RestoreStream(ncfg *StreamConfig, r io.Reader) (*stream, error return nil, errors.New("nil config on stream restore") } - cfg, err := checkStreamCfg(ncfg, &a.srv.getOpts().JetStreamLimits) - if err != nil { - return nil, NewJSStreamNotFoundError(Unless(err)) - } - - _, jsa, err := a.checkForJetStream() + s, jsa, err := a.checkForJetStream() if err != nil { return nil, err } + cfg, apiErr := s.checkStreamCfg(ncfg, a) + if apiErr != nil { + return nil, apiErr + } + sd := filepath.Join(jsa.storeDir, snapsDir) if _, err := os.Stat(sd); os.IsNotExist(err) { if err := os.MkdirAll(sd, defaultDirPerms); err != nil {