- Improve and fix stream source consumer sequence number finding on leadership change or stream config update to work with the new stream sourcing

- Updates source index name format and adds two fields (source's filter and transform) to the `"Nats-Stream-Source"` message header
- Backwards compatibility for streams containing previous (2.9) message headers for seamless upgrade
- Update TestJetStreamSourceBasics as it could flap sometimes

Signed-off-by: Jean-Noël Moyne <jnmoyne@gmail.com>
This commit is contained in:
Jean-Noël Moyne
2023-03-31 15:49:10 -07:00
parent d057889b13
commit 0420c6e317
5 changed files with 193 additions and 129 deletions

View File

@@ -278,6 +278,9 @@ const (
// JSSourceConsumerSetupFailedErrF General source consumer setup failure string ({err})
JSSourceConsumerSetupFailedErrF ErrorIdentifier = 10045
// JSSourceDuplicateDetected source stream, filter and transform must form a unique combination (duplicate source configuration detected)
JSSourceDuplicateDetected ErrorIdentifier = 10140
// JSSourceMaxMessageSizeTooBigErr stream source must have max message size >= target
JSSourceMaxMessageSizeTooBigErr ErrorIdentifier = 10046
@@ -513,6 +516,7 @@ var (
JSSequenceNotFoundErrF: {Code: 400, ErrCode: 10043, Description: "sequence {seq} not found"},
JSSnapshotDeliverSubjectInvalidErr: {Code: 400, ErrCode: 10015, Description: "deliver subject not valid"},
JSSourceConsumerSetupFailedErrF: {Code: 500, ErrCode: 10045, Description: "{err}"},
JSSourceDuplicateDetected: {Code: 400, ErrCode: 10140, Description: "duplicate source configuration detected"},
JSSourceMaxMessageSizeTooBigErr: {Code: 400, ErrCode: 10046, Description: "stream source must have max message size >= target"},
JSStorageResourcesExceededErr: {Code: 500, ErrCode: 10047, Description: "insufficient storage resources available"},
JSStreamAssignmentErrF: {Code: 500, ErrCode: 10048, Description: "{err}"},
@@ -1585,6 +1589,16 @@ func NewJSSourceConsumerSetupFailedError(err error, opts ...ErrorOption) *ApiErr
}
}
// NewJSSourceDuplicateDetectedError creates a new JSSourceDuplicateDetected error: "duplicate source configuration detected"
func NewJSSourceDuplicateDetectedError(opts ...ErrorOption) *ApiError {
eopts := parseOpts(opts)
if ae, ok := eopts.err.(*ApiError); ok {
return ae
}
return ApiErrors[JSSourceDuplicateDetected]
}
// NewJSSourceMaxMessageSizeTooBigError creates a new JSSourceMaxMessageSizeTooBigErr error: "stream source must have max message size >= target"
func NewJSSourceMaxMessageSizeTooBigError(opts ...ErrorOption) *ApiError {
eopts := parseOpts(opts)