diff --git a/server/consumer.go b/server/consumer.go index 763d1428..ce36db0a 100644 --- a/server/consumer.go +++ b/server/consumer.go @@ -420,7 +420,7 @@ const ( ) // Helper function to set consumer config defaults from above. -func setConsumerConfigDefaults(config *ConsumerConfig, lim *JSLimitOpts, accLim *JetStreamAccountLimits) { +func setConsumerConfigDefaults(config *ConsumerConfig, streamCfg *StreamConfig, lim *JSLimitOpts, accLim *JetStreamAccountLimits) { // Set to default if not specified. if config.DeliverSubject == _EMPTY_ && config.MaxWaiting == 0 { config.MaxWaiting = JSWaitQueueDefaultMax @@ -452,6 +452,12 @@ func setConsumerConfigDefaults(config *ConsumerConfig, lim *JSLimitOpts, accLim if config.DeliverSubject == _EMPTY_ && config.MaxRequestBatch == 0 && lim.MaxRequestBatch > 0 { config.MaxRequestBatch = lim.MaxRequestBatch } + if config.MaxAckPending == 0 { + config.MaxAckPending = streamCfg.ConsumerLimits.MaxAckPending + } + if config.InactiveThreshold == 0 { + config.InactiveThreshold = streamCfg.ConsumerLimits.InactiveThreshold + } } // Check the consumer config. If we are recovering don't check filter subjects. @@ -545,6 +551,12 @@ func checkConsumerCfg( if accLim.MaxAckPending > 0 && config.MaxAckPending > accLim.MaxAckPending { return NewJSConsumerMaxPendingAckExcessError(accLim.MaxAckPending) } + if cfg.ConsumerLimits.MaxAckPending > 0 && config.MaxAckPending > cfg.ConsumerLimits.MaxAckPending { + return NewJSConsumerMaxPendingAckExcessError(cfg.ConsumerLimits.MaxAckPending) + } + if cfg.ConsumerLimits.InactiveThreshold > 0 && config.InactiveThreshold > cfg.ConsumerLimits.InactiveThreshold { + return NewJSConsumerInactiveThresholdExcessError(cfg.ConsumerLimits.InactiveThreshold) + } // Direct need to be non-mapped ephemerals. if config.Direct { @@ -706,8 +718,11 @@ func (mset *stream) addConsumerWithAssignment(config *ConsumerConfig, oname stri } srvLim := &s.getOpts().JetStreamLimits - // Make sure we have sane defaults. - setConsumerConfigDefaults(config, srvLim, &selectedLimits) + // Make sure we have sane defaults. Do so with the JS lock, otherwise a + // badly timed meta snapshot can result in a race condition. + mset.js.mu.Lock() + setConsumerConfigDefaults(config, &mset.cfg, srvLim, &selectedLimits) + mset.js.mu.Unlock() if err := checkConsumerCfg(config, srvLim, &cfg, acc, &selectedLimits, isRecovering); err != nil { return nil, err diff --git a/server/errors.json b/server/errors.json index 123889ef..79602466 100644 --- a/server/errors.json +++ b/server/errors.json @@ -1508,5 +1508,15 @@ "help": "", "url": "", "deprecates": "" + }, + { + "constant": "JSConsumerInactiveThresholdExcess", + "code": 400, + "error_code": 10153, + "description": "consumer inactive threshold exceeds system limit of {limit}", + "comment": "", + "help": "", + "url": "", + "deprecates": "" } ] diff --git a/server/jetstream_cluster.go b/server/jetstream_cluster.go index d67fd450..9c4cf106 100644 --- a/server/jetstream_cluster.go +++ b/server/jetstream_cluster.go @@ -6807,7 +6807,7 @@ func (s *Server) jsClusteredConsumerRequest(ci *ClientInfo, acc *Account, subjec } srvLim := &s.getOpts().JetStreamLimits // Make sure we have sane defaults - setConsumerConfigDefaults(cfg, srvLim, selectedLimits) + setConsumerConfigDefaults(cfg, &streamCfg, srvLim, selectedLimits) if err := checkConsumerCfg(cfg, srvLim, &streamCfg, acc, selectedLimits, false); err != nil { resp.Error = err diff --git a/server/jetstream_cluster_3_test.go b/server/jetstream_cluster_3_test.go index 0907d77a..f6ad499a 100644 --- a/server/jetstream_cluster_3_test.go +++ b/server/jetstream_cluster_3_test.go @@ -5405,3 +5405,142 @@ func TestJetStreamClusterConsumerMaxDeliveryNumAckPendingBug(t *testing.T) { cia.Created, cib.Created = now, now checkConsumerInfo(cia, cib) } + +func TestJetStreamClusterConsumerDefaultsFromStream(t *testing.T) { + c := createJetStreamClusterExplicit(t, "R3S", 3) + defer c.shutdown() + + nc, js := jsClientConnect(t, c.randomServer()) + defer nc.Close() + + streamTmpl := &StreamConfig{ + Name: "test", + Subjects: []string{"test.*"}, + Storage: MemoryStorage, + ConsumerLimits: StreamConsumerLimits{ + MaxAckPending: 0, + InactiveThreshold: 0, + }, + } + + // Since nats.go doesn't yet know about the consumer limits, craft + // the stream configuration request by hand. + streamCreate := func(maxAckPending int, inactiveThreshold time.Duration) (*StreamConfig, error) { + cfg := streamTmpl + cfg.ConsumerLimits = StreamConsumerLimits{ + MaxAckPending: maxAckPending, + InactiveThreshold: inactiveThreshold, + } + j, err := json.Marshal(cfg) + if err != nil { + return nil, err + } + msg, err := nc.Request(fmt.Sprintf(JSApiStreamCreateT, "test"), j, time.Second*3) + if err != nil { + return nil, err + } + var resp JSApiStreamCreateResponse + if err := json.Unmarshal(msg.Data, &resp); err != nil { + return nil, err + } + if resp.StreamInfo == nil { + return nil, resp.ApiResponse.ToError() + } + return &resp.Config, resp.ApiResponse.ToError() + } + streamUpdate := func(maxAckPending int, inactiveThreshold time.Duration) (*StreamConfig, error) { + cfg := streamTmpl + cfg.ConsumerLimits = StreamConsumerLimits{ + MaxAckPending: maxAckPending, + InactiveThreshold: inactiveThreshold, + } + j, err := json.Marshal(cfg) + if err != nil { + return nil, err + } + msg, err := nc.Request(fmt.Sprintf(JSApiStreamUpdateT, "test"), j, time.Second*3) + if err != nil { + return nil, err + } + var resp JSApiStreamUpdateResponse + if err := json.Unmarshal(msg.Data, &resp); err != nil { + return nil, err + } + if resp.StreamInfo == nil { + return nil, resp.ApiResponse.ToError() + } + return &resp.Config, resp.ApiResponse.ToError() + } + + if _, err := streamCreate(15, time.Second); err != nil { + t.Fatalf("Failed to add stream: %v", err) + } + + t.Run("InheritDefaultsFromStream", func(t *testing.T) { + ci, err := js.AddConsumer("test", &nats.ConsumerConfig{ + Name: "InheritDefaultsFromStream", + }) + require_NoError(t, err) + + switch { + case ci.Config.InactiveThreshold != time.Second: + t.Fatalf("InactiveThreshold should be 1s, got %s", ci.Config.InactiveThreshold) + case ci.Config.MaxAckPending != 15: + t.Fatalf("MaxAckPending should be 15, got %d", ci.Config.MaxAckPending) + } + }) + + t.Run("CreateConsumerErrorOnExceedMaxAckPending", func(t *testing.T) { + _, err := js.AddConsumer("test", &nats.ConsumerConfig{ + Name: "CreateConsumerErrorOnExceedMaxAckPending", + MaxAckPending: 30, + }) + switch e := err.(type) { + case *nats.APIError: + if ErrorIdentifier(e.ErrorCode) != JSConsumerMaxPendingAckExcessErrF { + t.Fatalf("invalid error code, got %d, wanted %d", e.ErrorCode, JSConsumerMaxPendingAckExcessErrF) + } + default: + t.Fatalf("should have returned API error, got %T", e) + } + }) + + t.Run("CreateConsumerErrorOnExceedInactiveThreshold", func(t *testing.T) { + _, err := js.AddConsumer("test", &nats.ConsumerConfig{ + Name: "CreateConsumerErrorOnExceedInactiveThreshold", + InactiveThreshold: time.Second * 2, + }) + switch e := err.(type) { + case *nats.APIError: + if ErrorIdentifier(e.ErrorCode) != JSConsumerInactiveThresholdExcess { + t.Fatalf("invalid error code, got %d, wanted %d", e.ErrorCode, JSConsumerInactiveThresholdExcess) + } + default: + t.Fatalf("should have returned API error, got %T", e) + } + }) + + t.Run("UpdateStreamErrorOnViolateConsumerMaxAckPending", func(t *testing.T) { + _, err := js.AddConsumer("test", &nats.ConsumerConfig{ + Name: "UpdateStreamErrorOnViolateConsumerMaxAckPending", + MaxAckPending: 15, + }) + require_NoError(t, err) + + if _, err = streamUpdate(10, 0); err == nil { + t.Fatalf("stream update should have errored but didn't") + } + }) + + t.Run("UpdateStreamErrorOnViolateConsumerInactiveThreshold", func(t *testing.T) { + _, err := js.AddConsumer("test", &nats.ConsumerConfig{ + Name: "UpdateStreamErrorOnViolateConsumerInactiveThreshold", + InactiveThreshold: time.Second, + }) + require_NoError(t, err) + + if _, err = streamUpdate(0, time.Second/2); err == nil { + t.Fatalf("stream update should have errored but didn't") + } + }) +} diff --git a/server/jetstream_errors_generated.go b/server/jetstream_errors_generated.go index a39e612c..1543eeeb 100644 --- a/server/jetstream_errors_generated.go +++ b/server/jetstream_errors_generated.go @@ -113,6 +113,9 @@ const ( // JSConsumerHBRequiresPushErr consumer idle heartbeat requires a push based consumer JSConsumerHBRequiresPushErr ErrorIdentifier = 10088 + // JSConsumerInactiveThresholdExcess consumer inactive threshold exceeds system limit of {limit} + JSConsumerInactiveThresholdExcess ErrorIdentifier = 10153 + // JSConsumerInvalidDeliverSubject invalid push consumer deliver subject JSConsumerInvalidDeliverSubject ErrorIdentifier = 10112 @@ -497,6 +500,7 @@ var ( JSConsumerFCRequiresPushErr: {Code: 400, ErrCode: 10089, Description: "consumer flow control requires a push based consumer"}, JSConsumerFilterNotSubsetErr: {Code: 400, ErrCode: 10093, Description: "consumer filter subject is not a valid subset of the interest subjects"}, JSConsumerHBRequiresPushErr: {Code: 400, ErrCode: 10088, Description: "consumer idle heartbeat requires a push based consumer"}, + JSConsumerInactiveThresholdExcess: {Code: 400, ErrCode: 10153, Description: "consumer inactive threshold exceeds system limit of {limit}"}, JSConsumerInvalidDeliverSubject: {Code: 400, ErrCode: 10112, Description: "invalid push consumer deliver subject"}, JSConsumerInvalidPolicyErrF: {Code: 400, ErrCode: 10094, Description: "{err}"}, JSConsumerInvalidSamplingErrF: {Code: 400, ErrCode: 10095, Description: "failed to parse consumer sampling configuration: {err}"}, @@ -1015,6 +1019,22 @@ func NewJSConsumerHBRequiresPushError(opts ...ErrorOption) *ApiError { return ApiErrors[JSConsumerHBRequiresPushErr] } +// NewJSConsumerInactiveThresholdExcessError creates a new JSConsumerInactiveThresholdExcess error: "consumer inactive threshold exceeds system limit of {limit}" +func NewJSConsumerInactiveThresholdExcessError(limit interface{}, opts ...ErrorOption) *ApiError { + eopts := parseOpts(opts) + if ae, ok := eopts.err.(*ApiError); ok { + return ae + } + + e := ApiErrors[JSConsumerInactiveThresholdExcess] + args := e.toReplacerArgs([]interface{}{"{limit}", limit}) + return &ApiError{ + Code: e.Code, + ErrCode: e.ErrCode, + Description: strings.NewReplacer(args...).Replace(e.Description), + } +} + // NewJSConsumerInvalidDeliverSubjectError creates a new JSConsumerInvalidDeliverSubject error: "invalid push consumer deliver subject" func NewJSConsumerInvalidDeliverSubjectError(opts ...ErrorOption) *ApiError { eopts := parseOpts(opts) diff --git a/server/jetstream_test.go b/server/jetstream_test.go index 9356dc5b..01b96407 100644 --- a/server/jetstream_test.go +++ b/server/jetstream_test.go @@ -21653,3 +21653,116 @@ func TestJetStreamChangeMaxMessagesPerSubject(t *testing.T) { require_NoError(t, expectMsgs(3)) } + +func TestJetStreamConsumerDefaultsFromStream(t *testing.T) { + s := RunBasicJetStreamServer(t) + defer s.Shutdown() + + acc := s.GlobalAccount() + if _, err := acc.addStream(&StreamConfig{ + Name: "test", + Subjects: []string{"test.*"}, + ConsumerLimits: StreamConsumerLimits{ + MaxAckPending: 15, + InactiveThreshold: time.Second, + }, + }); err != nil { + t.Fatalf("Failed to add stream: %v", err) + } + + nc := clientConnectToServer(t, s) + defer nc.Close() + + js, err := nc.JetStream() + if err != nil { + t.Fatalf("Failed to connect to JetStream: %v", err) + } + + t.Run("InheritDefaultsFromStream", func(t *testing.T) { + ci, err := js.AddConsumer("test", &nats.ConsumerConfig{ + Name: "InheritDefaultsFromStream", + }) + require_NoError(t, err) + + switch { + case ci.Config.InactiveThreshold != time.Second: + t.Fatalf("InactiveThreshold should be 1s, got %s", ci.Config.InactiveThreshold) + case ci.Config.MaxAckPending != 15: + t.Fatalf("MaxAckPending should be 15, got %d", ci.Config.MaxAckPending) + } + }) + + t.Run("CreateConsumerErrorOnExceedMaxAckPending", func(t *testing.T) { + _, err := js.AddConsumer("test", &nats.ConsumerConfig{ + Name: "CreateConsumerErrorOnExceedMaxAckPending", + MaxAckPending: 30, + }) + switch e := err.(type) { + case *nats.APIError: + if ErrorIdentifier(e.ErrorCode) != JSConsumerMaxPendingAckExcessErrF { + t.Fatalf("invalid error code, got %d, wanted %d", e.ErrorCode, JSConsumerMaxPendingAckExcessErrF) + } + default: + t.Fatalf("should have returned API error, got %T", e) + } + }) + + t.Run("CreateConsumerErrorOnExceedInactiveThreshold", func(t *testing.T) { + _, err := js.AddConsumer("test", &nats.ConsumerConfig{ + Name: "CreateConsumerErrorOnExceedInactiveThreshold", + InactiveThreshold: time.Second * 2, + }) + switch e := err.(type) { + case *nats.APIError: + if ErrorIdentifier(e.ErrorCode) != JSConsumerInactiveThresholdExcess { + t.Fatalf("invalid error code, got %d, wanted %d", e.ErrorCode, JSConsumerInactiveThresholdExcess) + } + default: + t.Fatalf("should have returned API error, got %T", e) + } + }) + + t.Run("UpdateStreamErrorOnViolateConsumerMaxAckPending", func(t *testing.T) { + _, err := js.AddConsumer("test", &nats.ConsumerConfig{ + Name: "UpdateStreamErrorOnViolateConsumerMaxAckPending", + MaxAckPending: 15, + }) + require_NoError(t, err) + + stream, err := acc.lookupStream("test") + require_NoError(t, err) + + err = stream.update(&StreamConfig{ + Name: "test", + Subjects: []string{"test.*"}, + ConsumerLimits: StreamConsumerLimits{ + MaxAckPending: 10, + }, + }) + if err == nil { + t.Fatalf("stream update should have errored but didn't") + } + }) + + t.Run("UpdateStreamErrorOnViolateConsumerInactiveThreshold", func(t *testing.T) { + _, err := js.AddConsumer("test", &nats.ConsumerConfig{ + Name: "UpdateStreamErrorOnViolateConsumerInactiveThreshold", + InactiveThreshold: time.Second, + }) + require_NoError(t, err) + + stream, err := acc.lookupStream("test") + require_NoError(t, err) + + err = stream.update(&StreamConfig{ + Name: "test", + Subjects: []string{"test.*"}, + ConsumerLimits: StreamConsumerLimits{ + InactiveThreshold: time.Second / 2, + }, + }) + if err == nil { + t.Fatalf("stream update should have errored but didn't") + } + }) +} diff --git a/server/stream.go b/server/stream.go index 9bb254f3..8a74b458 100644 --- a/server/stream.go +++ b/server/stream.go @@ -86,10 +86,20 @@ type StreamConfig struct { // all older messages using a special msg header. AllowRollup bool `json:"allow_rollup_hdrs"` + // The following defaults will apply to consumers when created against + // this stream, unless overridden manually. + // TODO(nat): Can/should we name these better? + ConsumerLimits StreamConsumerLimits `json:"consumer_limits"` + // Metadata is additional metadata for the Stream. Metadata map[string]string `json:"metadata,omitempty"` } +type StreamConsumerLimits struct { + InactiveThreshold time.Duration `json:"inactive_threshold,omitempty"` + MaxAckPending int `json:"max_ack_pending,omitempty"` +} + // SubjectTransformConfig is for applying a subject transform (to matching messages) before doing anything else when a new message is received type SubjectTransformConfig struct { Source string `json:"src"` @@ -1671,6 +1681,35 @@ func (mset *stream) updateWithAdvisory(config *StreamConfig, sendAdvisory bool) return NewJSStreamInvalidConfigError(err, Unless(err)) } + // In the event that some of the stream-level limits have changed, yell appropriately + // if any of the consumers exceed that limit. + updateLimits := ocfg.ConsumerLimits.InactiveThreshold != cfg.ConsumerLimits.InactiveThreshold || + ocfg.ConsumerLimits.MaxAckPending != cfg.ConsumerLimits.MaxAckPending + if updateLimits { + var errorConsumers []string + consumers := map[string]*ConsumerConfig{} + if mset.js.isClustered() { + for _, c := range mset.sa.consumers { + consumers[c.Name] = c.Config + } + } else { + for _, c := range mset.consumers { + consumers[c.name] = &c.cfg + } + } + for name, ccfg := range consumers { + if ccfg.InactiveThreshold > cfg.ConsumerLimits.InactiveThreshold || + ccfg.MaxAckPending > cfg.ConsumerLimits.MaxAckPending { + errorConsumers = append(errorConsumers, name) + } + } + if len(errorConsumers) > 0 { + // TODO(nat): Return a parsable error so that we can surface something + // sensible through the JS API. + return fmt.Errorf("change to limits violates consumers: %s", strings.Join(errorConsumers, ", ")) + } + } + jsa.mu.RLock() if jsa.subjectsOverlap(cfg.Subjects, mset) { jsa.mu.RUnlock()