[CHANGED] JetStream: flow control requires heartbeats

The server will now reject the creation of a push consumer with
flow control if no heartbeat is set.

Resolves #2520

Signed-off-by: Ivan Kozlovic <ivan@synadia.com>
This commit is contained in:
Ivan Kozlovic
2021-09-15 09:50:15 -06:00
parent d6f9a06f05
commit 108d4060ff
5 changed files with 77 additions and 1 deletions

View File

@@ -152,6 +152,9 @@ const (
// JSConsumerWQRequiresExplicitAckErr workqueue stream requires explicit ack
JSConsumerWQRequiresExplicitAckErr ErrorIdentifier = 10098
// JSConsumerWithFlowControlNeedsHeartbeats consumer with flow control also needs heartbeats
JSConsumerWithFlowControlNeedsHeartbeats ErrorIdentifier = 10108
// JSInsufficientResourcesErr insufficient resources
JSInsufficientResourcesErr ErrorIdentifier = 10023
@@ -375,6 +378,7 @@ var (
JSConsumerWQConsumerNotUniqueErr: {Code: 400, ErrCode: 10100, Description: "filtered consumer not unique on workqueue stream"},
JSConsumerWQMultipleUnfilteredErr: {Code: 400, ErrCode: 10099, Description: "multiple non-filtered consumers not allowed on workqueue stream"},
JSConsumerWQRequiresExplicitAckErr: {Code: 400, ErrCode: 10098, Description: "workqueue stream requires explicit ack"},
JSConsumerWithFlowControlNeedsHeartbeats: {Code: 400, ErrCode: 10108, Description: "consumer with flow control also needs heartbeats"},
JSInsufficientResourcesErr: {Code: 503, ErrCode: 10023, Description: "insufficient resources"},
JSInvalidJSONErr: {Code: 400, ErrCode: 10025, Description: "invalid JSON"},
JSMaximumConsumersLimitErr: {Code: 400, ErrCode: 10026, Description: "maximum consumers limit reached"},
@@ -983,6 +987,16 @@ func NewJSConsumerWQRequiresExplicitAckError(opts ...ErrorOption) *ApiError {
return ApiErrors[JSConsumerWQRequiresExplicitAckErr]
}
// NewJSConsumerWithFlowControlNeedsHeartbeatsError creates a new JSConsumerWithFlowControlNeedsHeartbeats error: "consumer with flow control also needs heartbeats"
func NewJSConsumerWithFlowControlNeedsHeartbeatsError(opts ...ErrorOption) *ApiError {
eopts := parseOpts(opts)
if ae, ok := eopts.err.(*ApiError); ok {
return ae
}
return ApiErrors[JSConsumerWithFlowControlNeedsHeartbeats]
}
// NewJSInsufficientResourcesError creates a new JSInsufficientResourcesErr error: "insufficient resources"
func NewJSInsufficientResourcesError(opts ...ErrorOption) *ApiError {
eopts := parseOpts(opts)