From 79b4374d01d874b32a3aa13d9d6527920181b772 Mon Sep 17 00:00:00 2001 From: Matthias Hanel Date: Mon, 18 Apr 2022 01:53:48 -0400 Subject: [PATCH] [Fixed] limits enforcement issues (#3046) * [Fixed] limits enforcement issues Stream create had checks that stream restore did not have. Moved code into commonly used function checkStreamCfg. Also introduced (clustered/non-clustered) StreamLimitsCheck functions to perform checks specific to clustered/non-clustered data structures. Checking for valid stream config and limits/reservations before receiving all the data. Now fails the request right away. Added a jetstream limit "max_request_batch" to limit fetch batch size. Shortened max name length from 256 to 255, a more common file name limit. Added check for loop in cyclic source stream configurations. * [Added] features related to limits Signed-off-by: Matthias Hanel --- server/consumer.go | 7 + server/errors.json | 12 +- server/jetstream.go | 6 +- server/jetstream_api.go | 197 +++++----------------- server/jetstream_cluster.go | 63 ++++--- server/jetstream_cluster_test.go | 49 ++++++ server/jetstream_errors_generated.go | 20 +++ server/jetstream_test.go | 100 ++++++++++- server/opts.go | 9 +- server/stream.go | 240 +++++++++++++++++++++------ 10 files changed, 449 insertions(+), 254 deletions(-) diff --git a/server/consumer.go b/server/consumer.go index c76ba146..db7b6fab 100644 --- a/server/consumer.go +++ b/server/consumer.go @@ -329,6 +329,10 @@ func setConsumerConfigDefaults(config *ConsumerConfig, lim *JSLimitOpts, accLim } config.MaxAckPending = accPending } + // if applicable set max request batch size + if config.DeliverSubject == _EMPTY_ && config.MaxRequestBatch == 0 && lim.MaxRequestBatch > 0 { + config.MaxRequestBatch = lim.MaxRequestBatch + } } func (mset *stream) addConsumer(config *ConsumerConfig) (*consumer, error) { @@ -388,6 +392,9 @@ func checkConsumerCfg(config *ConsumerConfig, srvLim *JSLimitOpts, cfg *StreamCo if config.MaxRequestExpires != 0 && 
config.MaxRequestExpires < time.Millisecond { return NewJSConsumerMaxRequestExpiresToSmallError() } + if srvLim.MaxRequestBatch > 0 && config.MaxRequestBatch > srvLim.MaxRequestBatch { + return NewJSConsumerMaxRequestBatchExceededError(srvLim.MaxRequestBatch) + } } if srvLim.MaxAckPending > 0 && config.MaxAckPending > srvLim.MaxAckPending { return NewJSConsumerMaxPendingAckExcessError(srvLim.MaxAckPending) diff --git a/server/errors.json b/server/errors.json index f3480966..bf2ff9b6 100644 --- a/server/errors.json +++ b/server/errors.json @@ -1228,5 +1228,15 @@ "help": "", "url": "", "deprecates": "" + }, + { + "constant": "JSConsumerMaxRequestBatchExceededF", + "code": 400, + "error_code": 10125, + "description": "consumer max request batch exceeds server limit of {limit}", + "comment": "", + "help": "", + "url": "", + "deprecates": "" } -] +] \ No newline at end of file diff --git a/server/jetstream.go b/server/jetstream.go index aecb0ac0..0e3b226a 100644 --- a/server/jetstream.go +++ b/server/jetstream.go @@ -2191,9 +2191,9 @@ func (a *Account) addStreamTemplate(tc *StreamTemplateConfig) (*streamTemplate, // FIXME(dlc) - Hacky tcopy := tc.deepCopy() tcopy.Config.Name = "_" - cfg, err := checkStreamCfg(tcopy.Config, &s.getOpts().JetStreamLimits) - if err != nil { - return nil, err + cfg, apiErr := s.checkStreamCfg(tcopy.Config, a) + if apiErr != nil { + return nil, apiErr } tcopy.Config = &cfg t := &streamTemplate{ diff --git a/server/jetstream_api.go b/server/jetstream_api.go index 52fb3fd1..65842b8d 100644 --- a/server/jetstream_api.go +++ b/server/jetstream_api.go @@ -290,7 +290,8 @@ func generateJSMappingTable(domain string) map[string]string { const JSMaxDescriptionLen = 4 * 1024 // JSMaxNameLen is the maximum name lengths for streams, consumers and templates. -const JSMaxNameLen = 256 +// Picked 255 as it seems to be a widely used file name limit +const JSMaxNameLen = 255 // Responses for API calls. 
@@ -1221,140 +1222,6 @@ func (s *Server) jsStreamCreateRequest(sub *subscription, c *client, _ *Account, return } - hasStream := func(streamName string) (bool, int32, []string) { - var exists bool - var maxMsgSize int32 - var subs []string - if s.JetStreamIsClustered() { - if js, _ := s.getJetStreamCluster(); js != nil { - js.mu.RLock() - if sa := js.streamAssignment(acc.Name, streamName); sa != nil { - maxMsgSize = sa.Config.MaxMsgSize - subs = sa.Config.Subjects - exists = true - } - js.mu.RUnlock() - } - } else if mset, err := acc.lookupStream(streamName); err == nil { - maxMsgSize = mset.cfg.MaxMsgSize - subs = mset.cfg.Subjects - exists = true - } - return exists, maxMsgSize, subs - } - - var streamSubs []string - var deliveryPrefixes []string - var apiPrefixes []string - - // Do some pre-checking for mirror config to avoid cycles in clustered mode. - if cfg.Mirror != nil { - if len(cfg.Subjects) > 0 { - resp.Error = NewJSMirrorWithSubjectsError() - s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp)) - return - } - if len(cfg.Sources) > 0 { - resp.Error = NewJSMirrorWithSourcesError() - s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp)) - return - } - if cfg.Mirror.FilterSubject != _EMPTY_ { - resp.Error = NewJSMirrorWithSubjectFiltersError() - s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp)) - return - } - if cfg.Mirror.OptStartSeq > 0 && cfg.Mirror.OptStartTime != nil { - resp.Error = NewJSMirrorWithStartSeqAndTimeError() - s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp)) - return - } - if cfg.Duplicates != time.Duration(0) { - resp.Error = &ApiError{Code: 400, Description: "stream mirrors do not make use of a de-duplication window"} - s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp)) - return - } - // We do not require other stream to exist anymore, but if we can see it check payloads. 
- exists, maxMsgSize, subs := hasStream(cfg.Mirror.Name) - if len(subs) > 0 { - streamSubs = append(streamSubs, subs...) - } - if exists && cfg.MaxMsgSize > 0 && maxMsgSize > 0 && cfg.MaxMsgSize < maxMsgSize { - resp.Error = NewJSMirrorMaxMessageSizeTooBigError() - s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp)) - return - } - if cfg.Mirror.External != nil { - if cfg.Mirror.External.DeliverPrefix != _EMPTY_ { - deliveryPrefixes = append(deliveryPrefixes, cfg.Mirror.External.DeliverPrefix) - } - if cfg.Mirror.External.ApiPrefix != _EMPTY_ { - apiPrefixes = append(apiPrefixes, cfg.Mirror.External.ApiPrefix) - } - } - } - if len(cfg.Sources) > 0 { - for _, src := range cfg.Sources { - if src.External == nil { - continue - } - exists, maxMsgSize, subs := hasStream(src.Name) - if len(subs) > 0 { - streamSubs = append(streamSubs, subs...) - } - if src.External.DeliverPrefix != _EMPTY_ { - deliveryPrefixes = append(deliveryPrefixes, src.External.DeliverPrefix) - } - if src.External.ApiPrefix != _EMPTY_ { - apiPrefixes = append(apiPrefixes, src.External.ApiPrefix) - } - if exists && cfg.MaxMsgSize > 0 && maxMsgSize > 0 && cfg.MaxMsgSize < maxMsgSize { - resp.Error = NewJSSourceMaxMessageSizeTooBigError() - s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp)) - return - } - } - } - // check prefix overlap with subjects - for _, pfx := range deliveryPrefixes { - if !IsValidPublishSubject(pfx) { - resp.Error = NewJSStreamInvalidExternalDeliverySubjError(pfx) - s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp)) - return - } - for _, sub := range streamSubs { - if SubjectsCollide(sub, fmt.Sprintf("%s.%s", pfx, sub)) { - resp.Error = NewJSStreamExternalDelPrefixOverlapsError(pfx, sub) - s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp)) - return - } - } - } - // check if api prefixes overlap - for _, apiPfx := range apiPrefixes { - if 
!IsValidPublishSubject(apiPfx) { - resp.Error = &ApiError{Code: 400, Description: fmt.Sprintf("stream external api prefix %q must be a valid subject without wildcards", apiPfx)} - s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp)) - return - } - if SubjectsCollide(apiPfx, JSApiPrefix) { - resp.Error = NewJSStreamExternalApiOverlapError(apiPfx, JSApiPrefix) - s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp)) - return - } - } - - // Check for MaxBytes required and it's limit - if required, limit := acc.maxBytesLimits(&cfg); required && cfg.MaxBytes <= 0 { - resp.Error = NewJSStreamMaxBytesRequiredError() - s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp)) - return - } else if limit > 0 && cfg.MaxBytes > limit { - resp.Error = NewJSStreamMaxStreamBytesExceededError() - s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp)) - return - } - // Can't create a stream with a sealed state. 
if cfg.Sealed { resp.Error = NewJSStreamInvalidConfigError(fmt.Errorf("stream configuration for create can not be sealed")) @@ -1368,16 +1235,8 @@ func (s *Server) jsStreamCreateRequest(sub *subscription, c *client, _ *Account, return } - selectedLimits, tier, jsa, apiErr := acc.selectLimits(&cfg) - - if apiErr != nil { - resp.Error = apiErr - s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp)) - return - } - - if selectedLimits.MaxStreams > 0 && jsa.countStreams(tier, &cfg) >= selectedLimits.MaxStreams { - resp.Error = NewJSMaximumStreamsLimitError() + if err := acc.jsNonClusteredStreamLimitsCheck(&cfg); err != nil { + resp.Error = err s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp)) return } @@ -1442,9 +1301,9 @@ func (s *Server) jsStreamUpdateRequest(sub *subscription, c *client, _ *Account, return } - cfg, err := checkStreamCfg(&ncfg, &s.getOpts().JetStreamLimits) - if err != nil { - resp.Error = NewJSStreamInvalidConfigError(err) + cfg, apiErr := s.checkStreamCfg(&ncfg, acc) + if apiErr != nil { + resp.Error = apiErr s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp)) return } @@ -1456,17 +1315,6 @@ func (s *Server) jsStreamUpdateRequest(sub *subscription, c *client, _ *Account, return } - // Check for MaxBytes required and it's limit - if required, limit := acc.maxBytesLimits(&cfg); required && cfg.MaxBytes <= 0 { - resp.Error = NewJSStreamMaxBytesRequiredError() - s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp)) - return - } else if limit > 0 && cfg.MaxBytes > limit { - resp.Error = NewJSStreamMaxStreamBytesExceededError() - s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp)) - return - } - // Handle clustered version here. 
if s.JetStreamIsClustered() { // If we are inline with client, we still may need to do a callout for stream info @@ -2783,6 +2631,23 @@ func (s *Server) jsStreamPurgeRequest(sub *subscription, c *client, _ *Account, s.sendAPIResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(resp)) } +func (acc *Account) jsNonClusteredStreamLimitsCheck(cfg *StreamConfig) *ApiError { + selectedLimits, tier, jsa, apiErr := acc.selectLimits(cfg) + if apiErr != nil { + return apiErr + } + jsa.mu.RLock() + defer jsa.mu.RUnlock() + if selectedLimits.MaxStreams > 0 && jsa.countStreams(tier, cfg) >= selectedLimits.MaxStreams { + return NewJSMaximumStreamsLimitError() + } + reserved := jsa.tieredReservation(tier, cfg) + if err := jsa.js.checkAllLimits(selectedLimits, cfg, reserved, 0); err != nil { + return NewJSStreamLimitsError(err, Unless(err)) + } + return nil +} + // Request to restore a stream. func (s *Server) jsStreamRestoreRequest(sub *subscription, c *client, _ *Account, subject, reply string, rmsg []byte) { if c == nil || !s.JetStreamIsLeader() { @@ -2819,11 +2684,25 @@ func (s *Server) jsStreamRestoreRequest(sub *subscription, c *client, _ *Account req.Config.Name = stream } + // check stream config at the start of the restore process, not at the end + cfg, apiErr := s.checkStreamCfg(&req.Config, acc) + if apiErr != nil { + resp.Error = apiErr + s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp)) + return + } + if s.JetStreamIsClustered() { s.jsClusteredStreamRestoreRequest(ci, acc, &req, stream, subject, reply, rmsg) return } + if err := acc.jsNonClusteredStreamLimitsCheck(&cfg); err != nil { + resp.Error = err + s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp)) + return + } + if _, err := acc.lookupStream(stream); err == nil { resp.Error = NewJSStreamNameExistError() s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp)) diff --git a/server/jetstream_cluster.go 
b/server/jetstream_cluster.go index 091e8b30..bbdcf0cf 100644 --- a/server/jetstream_cluster.go +++ b/server/jetstream_cluster.go @@ -4299,6 +4299,26 @@ func (acc *Account) selectLimits(cfg *StreamConfig) (*JetStreamAccountLimits, st return &selectedLimits, tierName, jsa, nil } +// Read lock needs to be held +func (js *jetStream) jsClusteredStreamLimitsCheck(acc *Account, cfg *StreamConfig) *ApiError { + selectedLimits, tier, _, apiErr := acc.selectLimits(cfg) + if apiErr != nil { + return apiErr + } + + asa := js.cluster.streams[acc.Name] + numStreams, reservations := tieredStreamAndReservationCount(asa, tier, cfg) + + if selectedLimits.MaxStreams > 0 && numStreams >= selectedLimits.MaxStreams { + return NewJSMaximumStreamsLimitError() + } + // Check for account limits here before proposing. + if err := js.checkAccountLimits(selectedLimits, cfg, reservations); err != nil { + return NewJSStreamLimitsError(err, Unless(err)) + } + return nil +} + func (s *Server) jsClusteredStreamRequest(ci *ClientInfo, acc *Account, subject, reply string, rmsg []byte, config *StreamConfig) { js, cc := s.getJetStreamCluster() if js == nil || cc == nil { @@ -4307,41 +4327,24 @@ func (s *Server) jsClusteredStreamRequest(ci *ClientInfo, acc *Account, subject, var resp = JSApiStreamCreateResponse{ApiResponse: ApiResponse{Type: JSApiStreamCreateResponseType}} - ccfg, err := checkStreamCfg(config, &s.getOpts().JetStreamLimits) - if err != nil { - resp.Error = NewJSStreamInvalidConfigError(err, Unless(err)) - s.sendAPIErrResponse(ci, acc, subject, reply, string(rmsg), s.jsonResponse(&resp)) - return - } - cfg := &ccfg - - selectedLimits, tier, _, apiErr := acc.selectLimits(&ccfg) + ccfg, apiErr := s.checkStreamCfg(config, acc) if apiErr != nil { resp.Error = apiErr s.sendAPIErrResponse(ci, acc, subject, reply, string(rmsg), s.jsonResponse(&resp)) return } + cfg := &ccfg - // Check for stream limits here before proposing. These need to be tracked from meta layer, not jsa. 
js.mu.RLock() + apiErr = js.jsClusteredStreamLimitsCheck(acc, cfg) asa := cc.streams[acc.Name] - numStreams, reservations := tieredStreamAndReservationCount(asa, tier, config) - - if selectedLimits.MaxStreams > 0 && numStreams >= selectedLimits.MaxStreams { - resp.Error = NewJSMaximumStreamsLimitError() - s.sendAPIErrResponse(ci, acc, subject, reply, string(rmsg), s.jsonResponse(&resp)) - js.mu.RUnlock() - return - } - - // Check for account limits here before proposing. - if err := js.checkAccountLimits(selectedLimits, cfg, reservations); err != nil { - resp.Error = NewJSStreamLimitsError(err, Unless(err)) - s.sendAPIErrResponse(ci, acc, subject, reply, string(rmsg), s.jsonResponse(&resp)) - js.mu.RUnlock() - return - } js.mu.RUnlock() + // Check for stream limits here before proposing. These need to be tracked from meta layer, not jsa. + if apiErr != nil { + resp.Error = apiErr + s.sendAPIErrResponse(ci, acc, subject, reply, string(rmsg), s.jsonResponse(&resp)) + return + } // Now process the request and proposal. 
js.mu.Lock() @@ -4424,7 +4427,7 @@ func (s *Server) jsClusteredStreamUpdateRequest(ci *ClientInfo, acc *Account, su var newCfg *StreamConfig if jsa := js.accounts[acc.Name]; jsa != nil { js.mu.Unlock() - ncfg, err := jsa.configUpdateCheck(osa.Config, cfg, &s.getOpts().JetStreamLimits) + ncfg, err := jsa.configUpdateCheck(osa.Config, cfg, s) js.mu.Lock() if err != nil { resp.Error = NewJSStreamUpdateError(err, Unless(err)) @@ -4742,6 +4745,12 @@ func (s *Server) jsClusteredStreamRestoreRequest( cfg := &req.Config resp := JSApiStreamRestoreResponse{ApiResponse: ApiResponse{Type: JSApiStreamRestoreResponseType}} + if err := js.jsClusteredStreamLimitsCheck(acc, cfg); err != nil { + resp.Error = err + s.sendAPIErrResponse(ci, acc, subject, reply, string(rmsg), s.jsonResponse(&resp)) + return + } + if sa := js.streamAssignment(ci.serviceAccount(), cfg.Name); sa != nil { resp.Error = NewJSStreamNameExistError() s.sendAPIErrResponse(ci, acc, subject, reply, string(rmsg), s.jsonResponse(&resp)) diff --git a/server/jetstream_cluster_test.go b/server/jetstream_cluster_test.go index 4449f70a..bc74d7ec 100644 --- a/server/jetstream_cluster_test.go +++ b/server/jetstream_cluster_test.go @@ -13923,3 +13923,52 @@ func (c *cluster) stableTotalSubs() (total int) { return nsubs } + +func TestJetStreamMirrorSourceLoop(t *testing.T) { + test := func(t *testing.T, s *Server, replicas int) { + nc, js := jsClientConnect(t, s) + defer nc.Close() + // Create a source/mirror loop + _, err := js.AddStream(&nats.StreamConfig{ + Name: "1", + Subjects: []string{"foo", "bar"}, + Replicas: replicas, + Sources: []*nats.StreamSource{{Name: "DECOY"}, {Name: "2"}}, + }) + require_NoError(t, err) + _, err = js.AddStream(&nats.StreamConfig{ + Name: "DECOY", + Subjects: []string{"baz"}, + Replicas: replicas, + Sources: []*nats.StreamSource{{Name: "NOTTHERE"}}, + }) + require_NoError(t, err) + _, err = js.AddStream(&nats.StreamConfig{ + Name: "2", + Replicas: replicas, + Sources: 
[]*nats.StreamSource{{Name: "3"}}, + }) + require_NoError(t, err) + _, err = js.AddStream(&nats.StreamConfig{ + Name: "3", + Replicas: replicas, + Sources: []*nats.StreamSource{{Name: "1"}}, + }) + require_Error(t, err) + require_Equal(t, err.Error(), "detected cycle") + } + + t.Run("Single", func(t *testing.T) { + s := RunBasicJetStreamServer() + if config := s.JetStreamConfig(); config != nil { + defer removeDir(t, config.StoreDir) + } + defer s.Shutdown() + test(t, s, 1) + }) + t.Run("Clustered", func(t *testing.T) { + c := createJetStreamClusterExplicit(t, "JSC", 5) + defer c.shutdown() + test(t, c.randomServer(), 2) + }) +} diff --git a/server/jetstream_errors_generated.go b/server/jetstream_errors_generated.go index 89ca20d3..26114931 100644 --- a/server/jetstream_errors_generated.go +++ b/server/jetstream_errors_generated.go @@ -113,6 +113,9 @@ const ( // JSConsumerMaxPendingAckPolicyRequiredErr consumer requires ack policy for max ack pending JSConsumerMaxPendingAckPolicyRequiredErr ErrorIdentifier = 10082 + // JSConsumerMaxRequestBatchExceededF consumer max request batch exceeds server limit of {limit} + JSConsumerMaxRequestBatchExceededF ErrorIdentifier = 10125 + // JSConsumerMaxRequestBatchNegativeErr consumer max request batch needs to be > 0 JSConsumerMaxRequestBatchNegativeErr ErrorIdentifier = 10114 @@ -413,6 +416,7 @@ var ( JSConsumerMaxDeliverBackoffErr: {Code: 400, ErrCode: 10116, Description: "max deliver is required to be > length of backoff values"}, JSConsumerMaxPendingAckExcessErrF: {Code: 400, ErrCode: 10121, Description: "consumer max ack pending exceeds system limit of {limit}"}, JSConsumerMaxPendingAckPolicyRequiredErr: {Code: 400, ErrCode: 10082, Description: "consumer requires ack policy for max ack pending"}, + JSConsumerMaxRequestBatchExceededF: {Code: 400, ErrCode: 10125, Description: "consumer max request batch exceeds server limit of {limit}"}, JSConsumerMaxRequestBatchNegativeErr: {Code: 400, ErrCode: 10114, Description: "consumer 
max request batch needs to be > 0"}, JSConsumerMaxRequestExpiresToSmall: {Code: 400, ErrCode: 10115, Description: "consumer max request expires needs to be >= 1ms"}, JSConsumerMaxWaitingNegativeErr: {Code: 400, ErrCode: 10087, Description: "consumer max waiting needs to be positive"}, @@ -915,6 +919,22 @@ func NewJSConsumerMaxPendingAckPolicyRequiredError(opts ...ErrorOption) *ApiErro return ApiErrors[JSConsumerMaxPendingAckPolicyRequiredErr] } +// NewJSConsumerMaxRequestBatchExceededError creates a new JSConsumerMaxRequestBatchExceededF error: "consumer max request batch exceeds server limit of {limit}" +func NewJSConsumerMaxRequestBatchExceededError(limit interface{}, opts ...ErrorOption) *ApiError { + eopts := parseOpts(opts) + if ae, ok := eopts.err.(*ApiError); ok { + return ae + } + + e := ApiErrors[JSConsumerMaxRequestBatchExceededF] + args := e.toReplacerArgs([]interface{}{"{limit}", limit}) + return &ApiError{ + Code: e.Code, + ErrCode: e.ErrCode, + Description: strings.NewReplacer(args...).Replace(e.Description), + } +} + // NewJSConsumerMaxRequestBatchNegativeError creates a new JSConsumerMaxRequestBatchNegativeErr error: "consumer max request batch needs to be > 0" func NewJSConsumerMaxRequestBatchNegativeError(opts ...ErrorOption) *ApiError { eopts := parseOpts(opts) diff --git a/server/jetstream_test.go b/server/jetstream_test.go index 86c062c8..b39c1af9 100644 --- a/server/jetstream_test.go +++ b/server/jetstream_test.go @@ -4201,6 +4201,9 @@ func TestJetStreamSnapshotsAPI(t *testing.T) { opts.JetStream = true opts.JetStreamDomain = "domain" opts.StoreDir = tdir + maxStore := int64(1024 * 1024 * 1024) + opts.maxStoreSet = true + opts.JetStreamMaxStore = maxStore rurl, _ := url.Parse(fmt.Sprintf("nats-leaf://%s:%d", lopts.LeafNode.Host, lopts.LeafNode.Port)) opts.LeafNode.Remotes = []*RemoteLeafOpts{{URLs: []*url.URL{rurl}}} @@ -12070,7 +12073,7 @@ func TestJetStreamMirrorUpdatePreventsSubjects(t *testing.T) { require_NoError(t, err) _, err = 
js.UpdateStream(&nats.StreamConfig{Name: "MIRROR", Mirror: &nats.StreamSource{Name: "ORIGINAL"}, Subjects: []string{"x"}}) - if err == nil || err.Error() != "stream mirrors may not have subjects" { + if err == nil || err.Error() != "stream mirrors can not contain subjects" { t.Fatalf("Expected to not be able to put subjects on a stream, got: %+v", err) } } @@ -13511,9 +13514,9 @@ func TestJetStreamLongStreamNamesAndPubAck(t *testing.T) { nc, js := jsClientConnect(t, s) defer nc.Close() - data := make([]byte, 256) + data := make([]byte, 255) rand.Read(data) - stream := base64.StdEncoding.EncodeToString(data)[:256] + stream := base64.StdEncoding.EncodeToString(data)[:255] cfg := &nats.StreamConfig{ Name: stream, @@ -16573,6 +16576,83 @@ func TestJetStreamCrossAccounts(t *testing.T) { } } +func TestJetStreamInvalidRestoreRequests(t *testing.T) { + test := func(t *testing.T, s *Server, replica int) { + nc := natsConnect(t, s.ClientURL()) + // test invalid stream config in restore request + require_fail := func(cfg StreamConfig, errDesc string) { + t.Helper() + rreq := &JSApiStreamRestoreRequest{ + Config: cfg, + } + req, err := json.Marshal(rreq) + require_NoError(t, err) + rmsg, err := nc.Request(fmt.Sprintf(JSApiStreamRestoreT, "fail"), req, time.Second) + if err != nil { + t.Fatalf("Unexpected error: %v", err) + } + var rresp JSApiStreamRestoreResponse + json.Unmarshal(rmsg.Data, &rresp) + require_True(t, rresp.Error != nil) + require_Equal(t, rresp.Error.Description, errDesc) + } + require_fail(StreamConfig{Name: "fail", MaxBytes: 1024, Storage: FileStorage, Replicas: 6}, + "maximum replicas is 5") + require_fail(StreamConfig{Name: "fail", MaxBytes: 2 * 1012 * 1024, Storage: FileStorage, Replicas: replica}, + "insufficient storage resources available") + js, err := nc.JetStream() + require_NoError(t, err) + _, err = js.AddStream(&nats.StreamConfig{Name: "stream", MaxBytes: 1024, Storage: nats.FileStorage, Replicas: 1}) + require_NoError(t, err) + 
require_fail(StreamConfig{Name: "fail", MaxBytes: 1024, Storage: FileStorage}, + "maximum number of streams reached") + } + + commonAccSection := ` + no_auth_user: u + accounts { + ONE { + users = [ { user: "u", pass: "s3cr3t!" } ] + jetstream: { + max_store: 1Mb + max_streams: 1 + } + } + $SYS { users = [ { user: "admin", pass: "s3cr3t!" } ] } + }` + + t.Run("clustered", func(t *testing.T) { + c := createJetStreamClusterWithTemplate(t, ` + listen: 127.0.0.1:-1 + server_name: %s + jetstream: { + max_mem_store: 2MB, + max_file_store: 8MB, + store_dir: '%s', + } + cluster { + name: %s + listen: 127.0.0.1:%d + routes = [%s] + }`+commonAccSection, "clust", 3) + defer c.shutdown() + s := c.randomServer() + test(t, s, 3) + }) + t.Run("single", func(t *testing.T) { + storeDir := createDir(t, JetStreamStoreDir) + defer removeDir(t, storeDir) + conf := createConfFile(t, []byte(fmt.Sprintf(` + listen: 127.0.0.1:-1 + jetstream: {max_mem_store: 2MB, max_file_store: 8MB, store_dir: '%s'} + %s`, storeDir, commonAccSection))) + defer removeFile(t, conf) + s, _ := RunServerWithConfig(conf) + defer s.Shutdown() + test(t, s, 1) + }) +} + func TestJetStreamLimits(t *testing.T) { test := func(t *testing.T, s *Server) { nc := natsConnect(t, s.ClientURL()) @@ -16585,9 +16665,9 @@ func TestJetStreamLimits(t *testing.T) { require_NoError(t, err) require_True(t, si.Config.Duplicates == time.Minute) - si, err = js.AddStream(&nats.StreamConfig{Name: "bar", Duplicates: 500 * time.Millisecond}) + si, err = js.AddStream(&nats.StreamConfig{Name: "bar", Duplicates: 1500 * time.Millisecond}) require_NoError(t, err) - require_True(t, si.Config.Duplicates == 500*time.Millisecond) + require_True(t, si.Config.Duplicates == 1500*time.Millisecond) _, err = js.UpdateStream(&nats.StreamConfig{Name: "bar", Duplicates: 2 * time.Minute}) require_Error(t, err) @@ -16600,10 +16680,16 @@ func TestJetStreamLimits(t *testing.T) { ci, err := js.AddConsumer("foo", &nats.ConsumerConfig{Durable: "dur1", AckPolicy: 
nats.AckExplicitPolicy}) require_NoError(t, err) require_True(t, ci.Config.MaxAckPending == 1000) + require_True(t, ci.Config.MaxRequestBatch == 250) + + _, err = js.AddConsumer("foo", &nats.ConsumerConfig{Durable: "dur2", AckPolicy: nats.AckExplicitPolicy, MaxRequestBatch: 500}) + require_Error(t, err) + require_Equal(t, err.Error(), "consumer max request batch exceeds server limit of 250") ci, err = js.AddConsumer("foo", &nats.ConsumerConfig{Durable: "dur2", AckPolicy: nats.AckExplicitPolicy, MaxAckPending: 500}) require_NoError(t, err) require_True(t, ci.Config.MaxAckPending == 500) + require_True(t, ci.Config.MaxRequestBatch == 250) _, err = js.UpdateConsumer("foo", &nats.ConsumerConfig{Durable: "dur2", AckPolicy: nats.AckExplicitPolicy, MaxAckPending: 2000}) require_Error(t, err) @@ -16622,7 +16708,7 @@ func TestJetStreamLimits(t *testing.T) { max_mem_store: 2MB, max_file_store: 8MB, store_dir: '%s', - limits: {duplicate_window: "1m"} + limits: {duplicate_window: "1m", max_request_batch: 250} } cluster { name: %s @@ -16659,7 +16745,7 @@ func TestJetStreamLimits(t *testing.T) { max_mem_store: 2MB, max_file_store: 8MB, store_dir: '%s', - limits: {duplicate_window: "1m"} + limits: {duplicate_window: "1m", max_request_batch: 250} } no_auth_user: u accounts { diff --git a/server/opts.go b/server/opts.go index 5c77d876..c3dcbe31 100644 --- a/server/opts.go +++ b/server/opts.go @@ -185,9 +185,10 @@ type RemoteLeafOpts struct { } type JSLimitOpts struct { - MaxAckPending int - MaxHAAssets int - Duplicates time.Duration + MaxRequestBatch int + MaxAckPending int + MaxHAAssets int + Duplicates time.Duration } // Options block for nats-server. 
@@ -1818,6 +1819,8 @@ func parseJetStreamLimits(v interface{}, opts *Options, errors *[]error, warning lim.MaxAckPending = int(mv.(int64)) case "max_ha_assets": lim.MaxHAAssets = int(mv.(int64)) + case "max_request_batch": + lim.MaxRequestBatch = int(mv.(int64)) case "duplicate_window": var err error lim.Duplicates, err = time.ParseDuration(mv.(string)) diff --git a/server/stream.go b/server/stream.go index d2a2605d..2b87d92d 100644 --- a/server/stream.go +++ b/server/stream.go @@ -295,9 +295,9 @@ func (a *Account) addStreamWithAssignment(config *StreamConfig, fsConfig *FileSt } // Sensible defaults. - cfg, err := checkStreamCfg(config, &s.getOpts().JetStreamLimits) - if err != nil { - return nil, NewJSStreamInvalidConfigError(err, Unless(err)) + cfg, apiErr := s.checkStreamCfg(config, a) + if apiErr != nil { + return nil, apiErr } singleServerMode := !s.JetStreamIsClustered() && s.standAloneMode() @@ -347,30 +347,6 @@ func (a *Account) addStreamWithAssignment(config *StreamConfig, fsConfig *FileSt } } - // Check for mirror designation. - if cfg.Mirror != nil { - // Can't have subjects. - if len(cfg.Subjects) > 0 { - jsa.mu.Unlock() - return nil, fmt.Errorf("stream mirrors can not also contain subjects") - } - if len(cfg.Sources) > 0 { - jsa.mu.Unlock() - return nil, fmt.Errorf("stream mirrors can not also contain other sources") - } - if cfg.Mirror.FilterSubject != _EMPTY_ { - jsa.mu.Unlock() - return nil, fmt.Errorf("stream mirrors can not contain filtered subjects") - } - if cfg.Mirror.OptStartSeq > 0 && cfg.Mirror.OptStartTime != nil { - jsa.mu.Unlock() - return nil, fmt.Errorf("stream mirrors can not have both start seq and start time configured") - } - } else if len(cfg.Subjects) == 0 && len(cfg.Sources) == 0 { - jsa.mu.Unlock() - return nil, fmt.Errorf("stream needs at least one configured subject or mirror") - } - // Setup our internal indexed names here for sources. 
if len(cfg.Sources) > 0 { for _, ssi := range cfg.Sources { @@ -870,18 +846,20 @@ func (jsa *jsAccount) subjectsOverlap(subjects []string) bool { // StreamDefaultDuplicatesWindow default duplicates window. const StreamDefaultDuplicatesWindow = 2 * time.Minute -func checkStreamCfg(config *StreamConfig, lim *JSLimitOpts) (StreamConfig, error) { +func (s *Server) checkStreamCfg(config *StreamConfig, acc *Account) (StreamConfig, *ApiError) { + lim := &s.getOpts().JetStreamLimits + if config == nil { - return StreamConfig{}, fmt.Errorf("stream configuration invalid") + return StreamConfig{}, NewJSStreamInvalidConfigError(fmt.Errorf("stream configuration invalid")) } if !isValidName(config.Name) { - return StreamConfig{}, fmt.Errorf("stream name is required and can not contain '.', '*', '>'") + return StreamConfig{}, NewJSStreamInvalidConfigError(fmt.Errorf("stream name is required and can not contain '.', '*', '>'")) } if len(config.Name) > JSMaxNameLen { - return StreamConfig{}, fmt.Errorf("stream name is too long, maximum allowed is %d", JSMaxNameLen) + return StreamConfig{}, NewJSStreamInvalidConfigError(fmt.Errorf("stream name is too long, maximum allowed is %d", JSMaxNameLen)) } if len(config.Description) > JSMaxDescriptionLen { - return StreamConfig{}, fmt.Errorf("stream description is too long, maximum allowed is %d", JSMaxDescriptionLen) + return StreamConfig{}, NewJSStreamInvalidConfigError(fmt.Errorf("stream description is too long, maximum allowed is %d", JSMaxDescriptionLen)) } cfg := *config @@ -894,7 +872,7 @@ func checkStreamCfg(config *StreamConfig, lim *JSLimitOpts) (StreamConfig, error cfg.Replicas = 1 } if cfg.Replicas > StreamMaxReplicas { - return cfg, fmt.Errorf("maximum replicas is %d", StreamMaxReplicas) + return cfg, NewJSStreamInvalidConfigError(fmt.Errorf("maximum replicas is %d", StreamMaxReplicas)) } if cfg.MaxMsgs == 0 { cfg.MaxMsgs = -1 @@ -911,7 +889,7 @@ func checkStreamCfg(config *StreamConfig, lim *JSLimitOpts) (StreamConfig, error if 
cfg.MaxConsumers == 0 { cfg.MaxConsumers = -1 } - if cfg.Duplicates == 0 { + if cfg.Duplicates == 0 && cfg.Mirror == nil { maxWindow := StreamDefaultDuplicatesWindow if lim.Duplicates > 0 && maxWindow > lim.Duplicates { maxWindow = lim.Duplicates @@ -923,18 +901,156 @@ func checkStreamCfg(config *StreamConfig, lim *JSLimitOpts) (StreamConfig, error } } if cfg.Duplicates < 0 { - return StreamConfig{}, fmt.Errorf("duplicates window can not be negative") + return StreamConfig{}, NewJSStreamInvalidConfigError(fmt.Errorf("duplicates window can not be negative")) } // Check that duplicates is not larger then age if set. if cfg.MaxAge != 0 && cfg.Duplicates > cfg.MaxAge { - return StreamConfig{}, fmt.Errorf("duplicates window can not be larger then max age") + return StreamConfig{}, NewJSStreamInvalidConfigError(fmt.Errorf("duplicates window can not be larger then max age")) } if lim.Duplicates > 0 && cfg.Duplicates > lim.Duplicates { - return StreamConfig{}, fmt.Errorf("duplicates window can not be larger then server limit of %v", lim.Duplicates.String()) + return StreamConfig{}, NewJSStreamInvalidConfigError(fmt.Errorf("duplicates window can not be larger then server limit of %v", lim.Duplicates.String())) } if cfg.DenyPurge && cfg.AllowRollup { - return StreamConfig{}, fmt.Errorf("roll-ups require the purge permission") + return StreamConfig{}, NewJSStreamInvalidConfigError(fmt.Errorf("roll-ups require the purge permission")) + } + + getStream := func(streamName string) (bool, StreamConfig) { + var exists bool + var cfg StreamConfig + if s.JetStreamIsClustered() { + if js, _ := s.getJetStreamCluster(); js != nil { + js.mu.RLock() + if sa := js.streamAssignment(acc.Name, streamName); sa != nil { + cfg = *sa.Config + exists = true + } + js.mu.RUnlock() + } + } else if mset, err := acc.lookupStream(streamName); err == nil { + cfg = mset.cfg + exists = true + } + return exists, cfg + } + hasStream := func(streamName string) (bool, int32, []string) { + exists, cfg := 
getStream(streamName) + return exists, cfg.MaxMsgSize, cfg.Subjects + } + + var streamSubs []string + var deliveryPrefixes []string + var apiPrefixes []string + + // Do some pre-checking for mirror config to avoid cycles in clustered mode. + if cfg.Mirror != nil { + if len(cfg.Subjects) > 0 { + return StreamConfig{}, NewJSMirrorWithSubjectsError() + + } + if len(cfg.Sources) > 0 { + return StreamConfig{}, NewJSMirrorWithSourcesError() + } + if cfg.Mirror.FilterSubject != _EMPTY_ { + return StreamConfig{}, NewJSMirrorWithSubjectFiltersError() + } + if cfg.Mirror.OptStartSeq > 0 && cfg.Mirror.OptStartTime != nil { + return StreamConfig{}, NewJSMirrorWithStartSeqAndTimeError() + } + if cfg.Duplicates != time.Duration(0) { + return StreamConfig{}, NewJSStreamInvalidConfigError( + errors.New("stream mirrors do not make use of a de-duplication window")) + } + // We do not require other stream to exist anymore, but if we can see it check payloads. + exists, maxMsgSize, subs := hasStream(cfg.Mirror.Name) + if len(subs) > 0 { + streamSubs = append(streamSubs, subs...) + } + if exists && cfg.MaxMsgSize > 0 && maxMsgSize > 0 && cfg.MaxMsgSize < maxMsgSize { + return StreamConfig{}, NewJSMirrorMaxMessageSizeTooBigError() + } + if cfg.Mirror.External != nil { + if cfg.Mirror.External.DeliverPrefix != _EMPTY_ { + deliveryPrefixes = append(deliveryPrefixes, cfg.Mirror.External.DeliverPrefix) + } + if cfg.Mirror.External.ApiPrefix != _EMPTY_ { + apiPrefixes = append(apiPrefixes, cfg.Mirror.External.ApiPrefix) + } + } + } + if len(cfg.Sources) > 0 { + for _, src := range cfg.Sources { + if src.External == nil { + continue + } + exists, maxMsgSize, subs := hasStream(src.Name) + if len(subs) > 0 { + streamSubs = append(streamSubs, subs...) 
+ } + if src.External.DeliverPrefix != _EMPTY_ { + deliveryPrefixes = append(deliveryPrefixes, src.External.DeliverPrefix) + } + if src.External.ApiPrefix != _EMPTY_ { + apiPrefixes = append(apiPrefixes, src.External.ApiPrefix) + } + if exists && cfg.MaxMsgSize > 0 && maxMsgSize > 0 && cfg.MaxMsgSize < maxMsgSize { + return StreamConfig{}, NewJSSourceMaxMessageSizeTooBigError() + } + } + } + // check prefix overlap with subjects + for _, pfx := range deliveryPrefixes { + if !IsValidPublishSubject(pfx) { + return StreamConfig{}, NewJSStreamInvalidExternalDeliverySubjError(pfx) + } + for _, sub := range streamSubs { + if SubjectsCollide(sub, fmt.Sprintf("%s.%s", pfx, sub)) { + return StreamConfig{}, NewJSStreamExternalDelPrefixOverlapsError(pfx, sub) + } + } + } + // check if api prefixes overlap + for _, apiPfx := range apiPrefixes { + if !IsValidPublishSubject(apiPfx) { + return StreamConfig{}, NewJSStreamInvalidConfigError( + fmt.Errorf("stream external api prefix %q must be a valid subject without wildcards", apiPfx)) + } + if SubjectsCollide(apiPfx, JSApiPrefix) { + return StreamConfig{}, NewJSStreamExternalApiOverlapError(apiPfx, JSApiPrefix) + } + } + + // cycle check for source cycle + toVisit := []*StreamConfig{&cfg} + visited := make(map[string]struct{}) + for len(toVisit) > 0 { + cfg := toVisit[0] + toVisit = toVisit[1:] + visited[cfg.Name] = struct{}{} + for _, src := range cfg.Sources { + if src.External != nil { + // TODO (mh) look up service imports and see if src.External.ApiPrefix returns an account + // this will be much easier without the delivery subject + continue + } + if _, ok := visited[src.Name]; ok { + return StreamConfig{}, NewJSStreamInvalidConfigError(errors.New("detected cycle")) + } + if exists, cfg := getStream(src.Name); exists { + toVisit = append(toVisit, &cfg) + } + } + // Avoid cycles hiding behind mirrors + if m := cfg.Mirror; m != nil { + if m.External == nil { + if _, ok := visited[m.Name]; ok { + return StreamConfig{}, 
NewJSStreamInvalidConfigError(errors.New("detected cycle")) + } + if exists, cfg := getStream(m.Name); exists { + toVisit = append(toVisit, &cfg) + } + } + } } if len(cfg.Subjects) == 0 { @@ -943,27 +1059,40 @@ func checkStreamCfg(config *StreamConfig, lim *JSLimitOpts) (StreamConfig, error } } else { if cfg.Mirror != nil { - return StreamConfig{}, fmt.Errorf("stream mirrors may not have subjects") + return StreamConfig{}, NewJSMirrorWithSubjectsError() } // We can allow overlaps, but don't allow direct duplicates. dset := make(map[string]struct{}, len(cfg.Subjects)) for _, subj := range cfg.Subjects { if _, ok := dset[subj]; ok { - return StreamConfig{}, fmt.Errorf("duplicate subjects detected") + return StreamConfig{}, NewJSStreamInvalidConfigError(fmt.Errorf("duplicate subjects detected")) } // Also check to make sure we do not overlap with our $JS API subjects. if subjectIsSubsetMatch(subj, "$JS.API.>") { - return StreamConfig{}, fmt.Errorf("subjects overlap with jetstream api") + return StreamConfig{}, NewJSStreamInvalidConfigError(fmt.Errorf("subjects overlap with jetstream api")) } // Make sure the subject is valid. if !IsValidSubject(subj) { - return StreamConfig{}, fmt.Errorf("invalid subject") + return StreamConfig{}, NewJSStreamInvalidConfigError(fmt.Errorf("invalid subject")) } // Mark for duplicate check. 
dset[subj] = struct{}{} } } + + if len(cfg.Subjects) == 0 && len(cfg.Sources) == 0 && cfg.Mirror == nil { + return StreamConfig{}, NewJSStreamInvalidConfigError( + fmt.Errorf("stream needs at least one configured subject or be a source/mirror")) + } + + // Check for MaxBytes required and its limit + if required, limit := acc.maxBytesLimits(&cfg); required && cfg.MaxBytes <= 0 { + return StreamConfig{}, NewJSStreamMaxBytesRequiredError() + } else if limit > 0 && cfg.MaxBytes > limit { + return StreamConfig{}, NewJSStreamMaxStreamBytesExceededError() + } + return cfg, nil } @@ -985,10 +1114,10 @@ func (mset *stream) fileStoreConfig() (FileStoreConfig, error) { } // Do not hold jsAccount or jetStream lock -func (jsa *jsAccount) configUpdateCheck(old, new *StreamConfig, lim *JSLimitOpts) (*StreamConfig, error) { - cfg, err := checkStreamCfg(new, lim) - if err != nil { - return nil, NewJSStreamInvalidConfigError(err, Unless(err)) +func (jsa *jsAccount) configUpdateCheck(old, new *StreamConfig, s *Server) (*StreamConfig, error) { + cfg, apiErr := s.checkStreamCfg(new, jsa.acc()) + if apiErr != nil { + return nil, apiErr } // Name must match. @@ -1096,8 +1225,11 @@ func (jsa *jsAccount) configUpdateCheck(old, new *StreamConfig, lim *JSLimitOpts // Update will allow certain configuration properties of an existing stream to be updated. 
func (mset *stream) update(config *StreamConfig) error { - ocfg := mset.config() - cfg, err := mset.jsa.configUpdateCheck(&ocfg, config, &mset.srv.getOpts().JetStreamLimits) + mset.mu.RLock() + ocfg := mset.cfg + s := mset.srv + mset.mu.RUnlock() + cfg, err := mset.jsa.configUpdateCheck(&ocfg, config, s) if err != nil { return NewJSStreamInvalidConfigError(err, Unless(err)) } @@ -3793,16 +3925,16 @@ func (a *Account) RestoreStream(ncfg *StreamConfig, r io.Reader) (*stream, error return nil, errors.New("nil config on stream restore") } - cfg, err := checkStreamCfg(ncfg, &a.srv.getOpts().JetStreamLimits) - if err != nil { - return nil, NewJSStreamNotFoundError(Unless(err)) - } - - _, jsa, err := a.checkForJetStream() + s, jsa, err := a.checkForJetStream() if err != nil { return nil, err } + cfg, apiErr := s.checkStreamCfg(ncfg, a) + if apiErr != nil { + return nil, apiErr + } + sd := filepath.Join(jsa.storeDir, snapsDir) if _, err := os.Stat(sd); os.IsNotExist(err) { if err := os.MkdirAll(sd, defaultDirPerms); err != nil {