diff --git a/server/errors.json b/server/errors.json index 5f4d0fdb..08f3a369 100644 --- a/server/errors.json +++ b/server/errors.json @@ -1058,5 +1058,15 @@ "help": "", "url": "", "deprecates": "" + }, + { + "constant": "JSConsumerWithFlowControlNeedsHeartbeats", + "code": 400, + "error_code": 10108, + "description": "consumer with flow control also needs heartbeats", + "comment": "", + "help": "", + "url": "", + "deprecates": "" } -] +] \ No newline at end of file diff --git a/server/jetstream_api.go b/server/jetstream_api.go index 8ce6e3f9..7793512a 100644 --- a/server/jetstream_api.go +++ b/server/jetstream_api.go @@ -3046,6 +3046,13 @@ func (s *Server) jsConsumerCreate(sub *subscription, c *client, a *Account, subj return } + // We reject if flow control is set without heartbeats. + if req.Config.FlowControl && req.Config.Heartbeat == 0 { + resp.Error = NewJSConsumerWithFlowControlNeedsHeartbeatsError() + s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp)) + return + } + // Make sure we have sane defaults. setConsumerConfigDefaults(&req.Config) diff --git a/server/jetstream_cluster_test.go b/server/jetstream_cluster_test.go index 8a897e4a..d50cb455 100644 --- a/server/jetstream_cluster_test.go +++ b/server/jetstream_cluster_test.go @@ -8626,6 +8626,26 @@ func TestJetStreamClusterLargeHeaders(t *testing.T) { } } +func TestJetStreamClusterFlowControlRequiresHeartbeats(t *testing.T) { + c := createJetStreamClusterExplicit(t, "R3S", 3) + defer c.shutdown() + + nc, js := jsClientConnect(t, c.randomServer()) + defer nc.Close() + + if _, err := js.AddStream(&nats.StreamConfig{Name: "TEST"}); err != nil { + t.Fatalf("Unexpected error: %v", err) + } + + if _, err := js.AddConsumer("TEST", &nats.ConsumerConfig{ + Durable: "dlc", + DeliverSubject: nats.NewInbox(), + FlowControl: true, + }); err == nil || IsNatsErr(err, JSConsumerWithFlowControlNeedsHeartbeats) { + t.Fatalf("Unexpected error: %v", err) + } +} + // Support functions // Used to setup superclusters for tests. diff --git a/server/jetstream_errors_generated.go b/server/jetstream_errors_generated.go index cc3fa124..6ae5db1f 100644 --- a/server/jetstream_errors_generated.go +++ b/server/jetstream_errors_generated.go @@ -152,6 +152,9 @@ const ( // JSConsumerWQRequiresExplicitAckErr workqueue stream requires explicit ack JSConsumerWQRequiresExplicitAckErr ErrorIdentifier = 10098 + // JSConsumerWithFlowControlNeedsHeartbeats consumer with flow control also needs heartbeats + JSConsumerWithFlowControlNeedsHeartbeats ErrorIdentifier = 10108 + // JSInsufficientResourcesErr insufficient resources JSInsufficientResourcesErr ErrorIdentifier = 10023 @@ -375,6 +378,7 @@ var ( JSConsumerWQConsumerNotUniqueErr: {Code: 400, ErrCode: 10100, Description: "filtered consumer not unique on workqueue stream"}, JSConsumerWQMultipleUnfilteredErr: {Code: 400, ErrCode: 10099, Description: "multiple non-filtered consumers not allowed on workqueue stream"}, JSConsumerWQRequiresExplicitAckErr: {Code: 400, ErrCode: 10098, Description: "workqueue stream requires explicit ack"}, + JSConsumerWithFlowControlNeedsHeartbeats: {Code: 400, ErrCode: 10108, Description: "consumer with flow control also needs heartbeats"}, JSInsufficientResourcesErr: {Code: 503, ErrCode: 10023, Description: "insufficient resources"}, JSInvalidJSONErr: {Code: 400, ErrCode: 10025, Description: "invalid JSON"}, JSMaximumConsumersLimitErr: {Code: 400, ErrCode: 10026, Description: "maximum consumers limit reached"}, @@ -983,6 +987,16 @@ func NewJSConsumerWQRequiresExplicitAckError(opts ...ErrorOption) *ApiError { return ApiErrors[JSConsumerWQRequiresExplicitAckErr] } +// NewJSConsumerWithFlowControlNeedsHeartbeatsError creates a new JSConsumerWithFlowControlNeedsHeartbeats error: "consumer with flow control also needs heartbeats" +func NewJSConsumerWithFlowControlNeedsHeartbeatsError(opts ...ErrorOption) *ApiError { + eopts := parseOpts(opts) + if ae, ok := eopts.err.(*ApiError); ok { + return ae + } + + return ApiErrors[JSConsumerWithFlowControlNeedsHeartbeats] +} + // NewJSInsufficientResourcesError creates a new JSInsufficientResourcesErr error: "insufficient resources" func NewJSInsufficientResourcesError(opts ...ErrorOption) *ApiError { eopts := parseOpts(opts) diff --git a/server/jetstream_test.go b/server/jetstream_test.go index cdfd1acd..a69be9e6 100644 --- a/server/jetstream_test.go +++ b/server/jetstream_test.go @@ -7143,6 +7143,7 @@ func TestJetStreamPushConsumerFlowControl(t *testing.T) { Durable: "dlc", DeliverSubject: sub.Subject, FlowControl: true, + Heartbeat: 5 * time.Second, }, } req, err := json.Marshal(obsReq) @@ -7218,6 +7219,30 @@ func TestJetStreamPushConsumerFlowControl(t *testing.T) { } } +func TestJetStreamFlowControlRequiresHeartbeats(t *testing.T) { + s := RunBasicJetStreamServer() + defer s.Shutdown() + + if config := s.JetStreamConfig(); config != nil { + defer removeDir(t, config.StoreDir) + } + + nc, js := jsClientConnect(t, s) + defer nc.Close() + + if _, err := js.AddStream(&nats.StreamConfig{Name: "TEST"}); err != nil { + t.Fatalf("Unexpected error: %v", err) + } + + if _, err := js.AddConsumer("TEST", &nats.ConsumerConfig{ + Durable: "dlc", + DeliverSubject: nats.NewInbox(), + FlowControl: true, + }); err == nil || IsNatsErr(err, JSConsumerWithFlowControlNeedsHeartbeats) { + t.Fatalf("Unexpected error: %v", err) + } +} + func TestJetStreamPushConsumerIdleHeartbeats(t *testing.T) { s := RunBasicJetStreamServer() defer s.Shutdown()