mirror of
https://github.com/taigrr/bitcask
synced 2025-01-18 04:03:17 -08:00
Test for data corruption in datafile decoding (#99)
* internal/data: move codec to own subpackage * internal/data/codec: check & test nil Entry Decode * internal/data/decoder: test for short prefix error * internal/data/codec: test invalid key & value sizes * internal/data/codec: check & test for truncated data * interna/data/codec: use assert for tests
This commit is contained in:
parent
5be114adab
commit
f17187a5c7
@ -1,7 +1,6 @@
|
|||||||
package data
|
package codec
|
||||||
|
|
||||||
import (
|
import (
|
||||||
"bufio"
|
|
||||||
"encoding/binary"
|
"encoding/binary"
|
||||||
"io"
|
"io"
|
||||||
|
|
||||||
@ -9,59 +8,12 @@ import (
|
|||||||
"github.com/prologic/bitcask/internal"
|
"github.com/prologic/bitcask/internal"
|
||||||
)
|
)
|
||||||
|
|
||||||
const (
|
|
||||||
keySize = 4
|
|
||||||
valueSize = 8
|
|
||||||
checksumSize = 4
|
|
||||||
)
|
|
||||||
|
|
||||||
var (
|
var (
|
||||||
// ErrInvalidKeyOrValueSize indicates a serialized key/value size
|
errInvalidKeyOrValueSize = errors.New("key/value size is invalid")
|
||||||
// which is greater than specified limit
|
errCantDecodeOnNilEntry = errors.New("can't decode on nil entry")
|
||||||
ErrInvalidKeyOrValueSize = errors.New("key/value size is invalid")
|
errTruncatedData = errors.New("data is truncated")
|
||||||
)
|
)
|
||||||
|
|
||||||
// NewEncoder creates a streaming Entry encoder.
|
|
||||||
func NewEncoder(w io.Writer) *Encoder {
|
|
||||||
return &Encoder{w: bufio.NewWriter(w)}
|
|
||||||
}
|
|
||||||
|
|
||||||
// Encoder wraps an underlying io.Writer and allows you to stream
|
|
||||||
// Entry encodings on it.
|
|
||||||
type Encoder struct {
|
|
||||||
w *bufio.Writer
|
|
||||||
}
|
|
||||||
|
|
||||||
// Encode takes any Entry and streams it to the underlying writer.
|
|
||||||
// Messages are framed with a key-length and value-length prefix.
|
|
||||||
func (e *Encoder) Encode(msg internal.Entry) (int64, error) {
|
|
||||||
var bufKeyValue = make([]byte, keySize+valueSize)
|
|
||||||
binary.BigEndian.PutUint32(bufKeyValue[:keySize], uint32(len(msg.Key)))
|
|
||||||
binary.BigEndian.PutUint64(bufKeyValue[keySize:keySize+valueSize], uint64(len(msg.Value)))
|
|
||||||
if _, err := e.w.Write(bufKeyValue); err != nil {
|
|
||||||
return 0, errors.Wrap(err, "failed writing key & value length prefix")
|
|
||||||
}
|
|
||||||
|
|
||||||
if _, err := e.w.Write(msg.Key); err != nil {
|
|
||||||
return 0, errors.Wrap(err, "failed writing key data")
|
|
||||||
}
|
|
||||||
if _, err := e.w.Write(msg.Value); err != nil {
|
|
||||||
return 0, errors.Wrap(err, "failed writing value data")
|
|
||||||
}
|
|
||||||
|
|
||||||
bufChecksumSize := bufKeyValue[:checksumSize]
|
|
||||||
binary.BigEndian.PutUint32(bufChecksumSize, msg.Checksum)
|
|
||||||
if _, err := e.w.Write(bufChecksumSize); err != nil {
|
|
||||||
return 0, errors.Wrap(err, "failed writing checksum data")
|
|
||||||
}
|
|
||||||
|
|
||||||
if err := e.w.Flush(); err != nil {
|
|
||||||
return 0, errors.Wrap(err, "failed flushing data")
|
|
||||||
}
|
|
||||||
|
|
||||||
return int64(keySize + valueSize + len(msg.Key) + len(msg.Value) + checksumSize), nil
|
|
||||||
}
|
|
||||||
|
|
||||||
// NewDecoder creates a streaming Entry decoder.
|
// NewDecoder creates a streaming Entry decoder.
|
||||||
func NewDecoder(r io.Reader, maxKeySize uint32, maxValueSize uint64) *Decoder {
|
func NewDecoder(r io.Reader, maxKeySize uint32, maxValueSize uint64) *Decoder {
|
||||||
return &Decoder{
|
return &Decoder{
|
||||||
@ -81,6 +33,10 @@ type Decoder struct {
|
|||||||
|
|
||||||
// Decode decodes the next Entry from the current stream
|
// Decode decodes the next Entry from the current stream
|
||||||
func (d *Decoder) Decode(v *internal.Entry) (int64, error) {
|
func (d *Decoder) Decode(v *internal.Entry) (int64, error) {
|
||||||
|
if v == nil {
|
||||||
|
return 0, errCantDecodeOnNilEntry
|
||||||
|
}
|
||||||
|
|
||||||
prefixBuf := make([]byte, keySize+valueSize)
|
prefixBuf := make([]byte, keySize+valueSize)
|
||||||
|
|
||||||
_, err := io.ReadFull(d.r, prefixBuf)
|
_, err := io.ReadFull(d.r, prefixBuf)
|
||||||
@ -90,12 +46,12 @@ func (d *Decoder) Decode(v *internal.Entry) (int64, error) {
|
|||||||
|
|
||||||
actualKeySize, actualValueSize, err := getKeyValueSizes(prefixBuf, d.maxKeySize, d.maxValueSize)
|
actualKeySize, actualValueSize, err := getKeyValueSizes(prefixBuf, d.maxKeySize, d.maxValueSize)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return 0, errors.Wrap(err, "error while getting key/value serialized sizes")
|
return 0, err
|
||||||
}
|
}
|
||||||
|
|
||||||
buf := make([]byte, uint64(actualKeySize)+actualValueSize+checksumSize)
|
buf := make([]byte, uint64(actualKeySize)+actualValueSize+checksumSize)
|
||||||
if _, err = io.ReadFull(d.r, buf); err != nil {
|
if _, err = io.ReadFull(d.r, buf); err != nil {
|
||||||
return 0, errors.Wrap(translateError(err), "failed reading saved data")
|
return 0, errTruncatedData
|
||||||
}
|
}
|
||||||
|
|
||||||
decodeWithoutPrefix(buf, actualKeySize, v)
|
decodeWithoutPrefix(buf, actualKeySize, v)
|
||||||
@ -118,8 +74,9 @@ func getKeyValueSizes(buf []byte, maxKeySize uint32, maxValueSize uint64) (uint3
|
|||||||
actualKeySize := binary.BigEndian.Uint32(buf[:keySize])
|
actualKeySize := binary.BigEndian.Uint32(buf[:keySize])
|
||||||
actualValueSize := binary.BigEndian.Uint64(buf[keySize:])
|
actualValueSize := binary.BigEndian.Uint64(buf[keySize:])
|
||||||
|
|
||||||
if actualKeySize > maxKeySize || actualValueSize > maxValueSize {
|
if actualKeySize > maxKeySize || actualValueSize > maxValueSize || actualKeySize == 0 {
|
||||||
return 0, 0, ErrInvalidKeyOrValueSize
|
|
||||||
|
return 0, 0, errInvalidKeyOrValueSize
|
||||||
}
|
}
|
||||||
|
|
||||||
return actualKeySize, actualValueSize, nil
|
return actualKeySize, actualValueSize, nil
|
||||||
@ -131,9 +88,12 @@ func decodeWithoutPrefix(buf []byte, valueOffset uint32, v *internal.Entry) {
|
|||||||
v.Checksum = binary.BigEndian.Uint32(buf[len(buf)-checksumSize:])
|
v.Checksum = binary.BigEndian.Uint32(buf[len(buf)-checksumSize:])
|
||||||
}
|
}
|
||||||
|
|
||||||
func translateError(err error) error {
|
// IsCorruptedData indicates if the error correspondes to possible data corruption
|
||||||
if err == io.EOF {
|
func IsCorruptedData(err error) bool {
|
||||||
return io.ErrUnexpectedEOF
|
switch err {
|
||||||
|
case errCantDecodeOnNilEntry, errInvalidKeyOrValueSize, errTruncatedData:
|
||||||
|
return true
|
||||||
|
default:
|
||||||
|
return false
|
||||||
}
|
}
|
||||||
return err
|
|
||||||
}
|
}
|
109
internal/data/codec/decoder_test.go
Normal file
109
internal/data/codec/decoder_test.go
Normal file
@ -0,0 +1,109 @@
|
|||||||
|
package codec
|
||||||
|
|
||||||
|
import (
|
||||||
|
"bytes"
|
||||||
|
"encoding/binary"
|
||||||
|
"io"
|
||||||
|
"testing"
|
||||||
|
|
||||||
|
"github.com/prologic/bitcask/internal"
|
||||||
|
"github.com/stretchr/testify/assert"
|
||||||
|
)
|
||||||
|
|
||||||
|
func TestDecodeOnNilEntry(t *testing.T) {
|
||||||
|
t.Parallel()
|
||||||
|
assert := assert.New(t)
|
||||||
|
decoder := NewDecoder(&bytes.Buffer{}, 1, 1)
|
||||||
|
|
||||||
|
_, err := decoder.Decode(nil)
|
||||||
|
if assert.Error(err) {
|
||||||
|
assert.Equal(errCantDecodeOnNilEntry, err)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func TestShortPrefix(t *testing.T) {
|
||||||
|
t.Parallel()
|
||||||
|
assert := assert.New(t)
|
||||||
|
maxKeySize, maxValueSize := uint32(10), uint64(20)
|
||||||
|
prefix := make([]byte, keySize+valueSize)
|
||||||
|
binary.BigEndian.PutUint32(prefix, 1)
|
||||||
|
binary.BigEndian.PutUint64(prefix[keySize:], 1)
|
||||||
|
|
||||||
|
truncBytesCount := 2
|
||||||
|
buf := bytes.NewBuffer(prefix[:keySize+valueSize-truncBytesCount])
|
||||||
|
decoder := NewDecoder(buf, maxKeySize, maxValueSize)
|
||||||
|
_, err := decoder.Decode(&internal.Entry{})
|
||||||
|
if assert.Error(err) {
|
||||||
|
assert.Equal(io.ErrUnexpectedEOF, err)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func TestInvalidValueKeySizes(t *testing.T) {
|
||||||
|
assert := assert.New(t)
|
||||||
|
maxKeySize, maxValueSize := uint32(10), uint64(20)
|
||||||
|
|
||||||
|
tests := []struct {
|
||||||
|
keySize uint32
|
||||||
|
valueSize uint64
|
||||||
|
name string
|
||||||
|
}{
|
||||||
|
{keySize: 0, valueSize: 5, name: "zero key size"}, //zero value size is correct for tombstones
|
||||||
|
{keySize: 11, valueSize: 5, name: "key size overflow"},
|
||||||
|
{keySize: 5, valueSize: 21, name: "value size overflow"},
|
||||||
|
{keySize: 11, valueSize: 21, name: "key and value size overflow"},
|
||||||
|
}
|
||||||
|
|
||||||
|
for i := range tests {
|
||||||
|
i := i
|
||||||
|
t.Run(tests[i].name, func(t *testing.T) {
|
||||||
|
t.Parallel()
|
||||||
|
prefix := make([]byte, keySize+valueSize)
|
||||||
|
binary.BigEndian.PutUint32(prefix, tests[i].keySize)
|
||||||
|
binary.BigEndian.PutUint64(prefix[keySize:], tests[i].valueSize)
|
||||||
|
|
||||||
|
buf := bytes.NewBuffer(prefix)
|
||||||
|
decoder := NewDecoder(buf, maxKeySize, maxValueSize)
|
||||||
|
_, err := decoder.Decode(&internal.Entry{})
|
||||||
|
if assert.Error(err) {
|
||||||
|
assert.Equal(errInvalidKeyOrValueSize, err)
|
||||||
|
}
|
||||||
|
})
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func TestTruncatedData(t *testing.T) {
|
||||||
|
assert := assert.New(t)
|
||||||
|
maxKeySize, maxValueSize := uint32(10), uint64(20)
|
||||||
|
|
||||||
|
key := []byte("foo")
|
||||||
|
value := []byte("bar")
|
||||||
|
data := make([]byte, keySize+valueSize+len(key)+len(value)+checksumSize)
|
||||||
|
|
||||||
|
binary.BigEndian.PutUint32(data, uint32(len(key)))
|
||||||
|
binary.BigEndian.PutUint64(data[keySize:], uint64(len(value)))
|
||||||
|
copy(data[keySize+valueSize:], key)
|
||||||
|
copy(data[keySize+valueSize+len(key):], value)
|
||||||
|
copy(data[keySize+valueSize+len(key)+len(value):], bytes.Repeat([]byte("0"), checksumSize))
|
||||||
|
|
||||||
|
tests := []struct {
|
||||||
|
data []byte
|
||||||
|
name string
|
||||||
|
}{
|
||||||
|
{data: data[:keySize+valueSize+len(key)-1], name: "truncated key"},
|
||||||
|
{data: data[:keySize+valueSize+len(key)+len(value)-1], name: "truncated value"},
|
||||||
|
{data: data[:keySize+valueSize+len(key)+len(value)+checksumSize-1], name: "truncated checksum"},
|
||||||
|
}
|
||||||
|
|
||||||
|
for i := range tests {
|
||||||
|
i := i
|
||||||
|
t.Run(tests[i].name, func(t *testing.T) {
|
||||||
|
t.Parallel()
|
||||||
|
buf := bytes.NewBuffer(tests[i].data)
|
||||||
|
decoder := NewDecoder(buf, maxKeySize, maxValueSize)
|
||||||
|
_, err := decoder.Decode(&internal.Entry{})
|
||||||
|
if assert.Error(err) {
|
||||||
|
assert.Equal(errTruncatedData, err)
|
||||||
|
}
|
||||||
|
})
|
||||||
|
}
|
||||||
|
}
|
57
internal/data/codec/encoder.go
Normal file
57
internal/data/codec/encoder.go
Normal file
@ -0,0 +1,57 @@
|
|||||||
|
package codec
|
||||||
|
|
||||||
|
import (
|
||||||
|
"bufio"
|
||||||
|
"encoding/binary"
|
||||||
|
"io"
|
||||||
|
|
||||||
|
"github.com/pkg/errors"
|
||||||
|
"github.com/prologic/bitcask/internal"
|
||||||
|
)
|
||||||
|
|
||||||
|
const (
|
||||||
|
keySize = 4
|
||||||
|
valueSize = 8
|
||||||
|
checksumSize = 4
|
||||||
|
)
|
||||||
|
|
||||||
|
// NewEncoder creates a streaming Entry encoder.
|
||||||
|
func NewEncoder(w io.Writer) *Encoder {
|
||||||
|
return &Encoder{w: bufio.NewWriter(w)}
|
||||||
|
}
|
||||||
|
|
||||||
|
// Encoder wraps an underlying io.Writer and allows you to stream
|
||||||
|
// Entry encodings on it.
|
||||||
|
type Encoder struct {
|
||||||
|
w *bufio.Writer
|
||||||
|
}
|
||||||
|
|
||||||
|
// Encode takes any Entry and streams it to the underlying writer.
|
||||||
|
// Messages are framed with a key-length and value-length prefix.
|
||||||
|
func (e *Encoder) Encode(msg internal.Entry) (int64, error) {
|
||||||
|
var bufKeyValue = make([]byte, keySize+valueSize)
|
||||||
|
binary.BigEndian.PutUint32(bufKeyValue[:keySize], uint32(len(msg.Key)))
|
||||||
|
binary.BigEndian.PutUint64(bufKeyValue[keySize:keySize+valueSize], uint64(len(msg.Value)))
|
||||||
|
if _, err := e.w.Write(bufKeyValue); err != nil {
|
||||||
|
return 0, errors.Wrap(err, "failed writing key & value length prefix")
|
||||||
|
}
|
||||||
|
|
||||||
|
if _, err := e.w.Write(msg.Key); err != nil {
|
||||||
|
return 0, errors.Wrap(err, "failed writing key data")
|
||||||
|
}
|
||||||
|
if _, err := e.w.Write(msg.Value); err != nil {
|
||||||
|
return 0, errors.Wrap(err, "failed writing value data")
|
||||||
|
}
|
||||||
|
|
||||||
|
bufChecksumSize := bufKeyValue[:checksumSize]
|
||||||
|
binary.BigEndian.PutUint32(bufChecksumSize, msg.Checksum)
|
||||||
|
if _, err := e.w.Write(bufChecksumSize); err != nil {
|
||||||
|
return 0, errors.Wrap(err, "failed writing checksum data")
|
||||||
|
}
|
||||||
|
|
||||||
|
if err := e.w.Flush(); err != nil {
|
||||||
|
return 0, errors.Wrap(err, "failed flushing data")
|
||||||
|
}
|
||||||
|
|
||||||
|
return int64(keySize + valueSize + len(msg.Key) + len(msg.Value) + checksumSize), nil
|
||||||
|
}
|
@ -8,6 +8,7 @@ import (
|
|||||||
|
|
||||||
"github.com/pkg/errors"
|
"github.com/pkg/errors"
|
||||||
"github.com/prologic/bitcask/internal"
|
"github.com/prologic/bitcask/internal"
|
||||||
|
"github.com/prologic/bitcask/internal/data/codec"
|
||||||
"golang.org/x/exp/mmap"
|
"golang.org/x/exp/mmap"
|
||||||
)
|
)
|
||||||
|
|
||||||
@ -41,8 +42,8 @@ type datafile struct {
|
|||||||
ra *mmap.ReaderAt
|
ra *mmap.ReaderAt
|
||||||
w *os.File
|
w *os.File
|
||||||
offset int64
|
offset int64
|
||||||
dec *Decoder
|
dec *codec.Decoder
|
||||||
enc *Encoder
|
enc *codec.Encoder
|
||||||
maxKeySize uint32
|
maxKeySize uint32
|
||||||
maxValueSize uint64
|
maxValueSize uint64
|
||||||
}
|
}
|
||||||
@ -81,8 +82,8 @@ func NewDatafile(path string, id int, readonly bool, maxKeySize uint32, maxValue
|
|||||||
|
|
||||||
offset := stat.Size()
|
offset := stat.Size()
|
||||||
|
|
||||||
dec := NewDecoder(r, maxKeySize, maxValueSize)
|
dec := codec.NewDecoder(r, maxKeySize, maxValueSize)
|
||||||
enc := NewEncoder(w)
|
enc := codec.NewEncoder(w)
|
||||||
|
|
||||||
return &datafile{
|
return &datafile{
|
||||||
id: id,
|
id: id,
|
||||||
@ -168,7 +169,7 @@ func (df *datafile) ReadAt(index, size int64) (e internal.Entry, err error) {
|
|||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
DecodeEntry(b, &e, df.maxKeySize, df.maxValueSize)
|
codec.DecodeEntry(b, &e, df.maxKeySize, df.maxValueSize)
|
||||||
|
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
Loading…
x
Reference in New Issue
Block a user