diff --git a/server/store.go b/server/store.go index 5d1aa7b7..3482abc8 100644 --- a/server/store.go +++ b/server/store.go @@ -63,6 +63,8 @@ var ( ErrInvalidSequence = errors.New("invalid sequence") // ErrSequenceMismatch is returned when storing a raw message and the expected sequence is wrong. ErrSequenceMismatch = errors.New("expected sequence does not match store") + // ErrCorruptStreamState + ErrCorruptStreamState = errors.New("stream state snapshot is corrupt") ) // StoreMsg is the stored message format for messages that are retained by the Store layer. @@ -237,12 +239,20 @@ func DecodeStreamState(buf []byte) (*StreamReplicatedState, error) { return num } + parserFailed := func() bool { + return bi < 0 + } + ss.Msgs = readU64() ss.Bytes = readU64() ss.FirstSeq = readU64() ss.LastSeq = readU64() ss.Failed = readU64() + if parserFailed() { + return nil, ErrCorruptStreamState + } + if numDeleted := readU64(); numDeleted > 0 { // If we have some deleted blocks. for l := len(buf); l > bi; { @@ -250,7 +260,7 @@ func DecodeStreamState(buf []byte) (*StreamReplicatedState, error) { case seqSetMagic: dmap, n, err := avl.Decode(buf[bi:]) if err != nil { - return nil, err + return nil, ErrCorruptStreamState } bi += n ss.Deleted = append(ss.Deleted, dmap) @@ -259,7 +269,12 @@ func DecodeStreamState(buf []byte) (*StreamReplicatedState, error) { var rl DeleteRange rl.First = readU64() rl.Num = readU64() + if parserFailed() { + return nil, ErrCorruptStreamState + } ss.Deleted = append(ss.Deleted, &rl) + default: + return nil, ErrCorruptStreamState } } }