Detect mal-formed stream state snapshots and return appropriate error

Signed-off-by: Derek Collison <derek@nats.io>
This commit is contained in:
Derek Collison
2023-07-30 11:06:06 -07:00
parent c6ea1667a7
commit 54c5414c3d

View File

@@ -63,6 +63,8 @@ var (
ErrInvalidSequence = errors.New("invalid sequence")
// ErrSequenceMismatch is returned when storing a raw message and the expected sequence is wrong.
ErrSequenceMismatch = errors.New("expected sequence does not match store")
// ErrCorruptStreamState
ErrCorruptStreamState = errors.New("stream state snapshot is corrupt")
)
// StoreMsg is the stored message format for messages that are retained by the Store layer.
@@ -237,12 +239,20 @@ func DecodeStreamState(buf []byte) (*StreamReplicatedState, error) {
return num
}
parserFailed := func() bool {
return bi < 0
}
ss.Msgs = readU64()
ss.Bytes = readU64()
ss.FirstSeq = readU64()
ss.LastSeq = readU64()
ss.Failed = readU64()
if parserFailed() {
return nil, ErrCorruptStreamState
}
if numDeleted := readU64(); numDeleted > 0 {
// If we have some deleted blocks.
for l := len(buf); l > bi; {
@@ -250,7 +260,7 @@ func DecodeStreamState(buf []byte) (*StreamReplicatedState, error) {
case seqSetMagic:
dmap, n, err := avl.Decode(buf[bi:])
if err != nil {
return nil, err
return nil, ErrCorruptStreamState
}
bi += n
ss.Deleted = append(ss.Deleted, dmap)
@@ -259,7 +269,12 @@ func DecodeStreamState(buf []byte) (*StreamReplicatedState, error) {
var rl DeleteRange
rl.First = readU64()
rl.Num = readU64()
if parserFailed() {
return nil, ErrCorruptStreamState
}
ss.Deleted = append(ss.Deleted, &rl)
default:
return nil, ErrCorruptStreamState
}
}
}