From f17187a5c73b4b712bf5a86bffa62df551ecaa67 Mon Sep 17 00:00:00 2001 From: Ignacio Hagopian Date: Mon, 16 Sep 2019 09:29:08 -0300 Subject: [PATCH] Test for data corruption in datafile decoding (#99) * internal/data: move codec to own subpackage * internal/data/codec: check & test nil Entry Decode * internal/data/decoder: test for short prefix error * internal/data/codec: test invalid key & value sizes * internal/data/codec: check & test for truncated data * internal/data/codec: use assert for tests --- internal/data/{codec.go => codec/decoder.go} | 80 ++++---------- internal/data/codec/decoder_test.go | 109 +++++++++++++++++++ internal/data/codec/encoder.go | 57 ++++++++++ internal/data/datafile.go | 11 +- 4 files changed, 192 insertions(+), 65 deletions(-) rename internal/data/{codec.go => codec/decoder.go} (51%) create mode 100644 internal/data/codec/decoder_test.go create mode 100644 internal/data/codec/encoder.go diff --git a/internal/data/codec.go b/internal/data/codec/decoder.go similarity index 51% rename from internal/data/codec.go rename to internal/data/codec/decoder.go index 52d969e..fe6bb59 100644 --- a/internal/data/codec.go +++ b/internal/data/codec/decoder.go @@ -1,7 +1,6 @@ -package data +package codec import ( - "bufio" "encoding/binary" "io" @@ -9,59 +8,12 @@ import ( "github.com/prologic/bitcask/internal" ) -const ( - keySize = 4 - valueSize = 8 - checksumSize = 4 -) - var ( - // ErrInvalidKeyOrValueSize indicates a serialized key/value size - // which is greater than specified limit - ErrInvalidKeyOrValueSize = errors.New("key/value size is invalid") + errInvalidKeyOrValueSize = errors.New("key/value size is invalid") + errCantDecodeOnNilEntry = errors.New("can't decode on nil entry") + errTruncatedData = errors.New("data is truncated") ) -// NewEncoder creates a streaming Entry encoder. 
-func NewEncoder(w io.Writer) *Encoder { - return &Encoder{w: bufio.NewWriter(w)} -} - -// Encoder wraps an underlying io.Writer and allows you to stream -// Entry encodings on it. -type Encoder struct { - w *bufio.Writer -} - -// Encode takes any Entry and streams it to the underlying writer. -// Messages are framed with a key-length and value-length prefix. -func (e *Encoder) Encode(msg internal.Entry) (int64, error) { - var bufKeyValue = make([]byte, keySize+valueSize) - binary.BigEndian.PutUint32(bufKeyValue[:keySize], uint32(len(msg.Key))) - binary.BigEndian.PutUint64(bufKeyValue[keySize:keySize+valueSize], uint64(len(msg.Value))) - if _, err := e.w.Write(bufKeyValue); err != nil { - return 0, errors.Wrap(err, "failed writing key & value length prefix") - } - - if _, err := e.w.Write(msg.Key); err != nil { - return 0, errors.Wrap(err, "failed writing key data") - } - if _, err := e.w.Write(msg.Value); err != nil { - return 0, errors.Wrap(err, "failed writing value data") - } - - bufChecksumSize := bufKeyValue[:checksumSize] - binary.BigEndian.PutUint32(bufChecksumSize, msg.Checksum) - if _, err := e.w.Write(bufChecksumSize); err != nil { - return 0, errors.Wrap(err, "failed writing checksum data") - } - - if err := e.w.Flush(); err != nil { - return 0, errors.Wrap(err, "failed flushing data") - } - - return int64(keySize + valueSize + len(msg.Key) + len(msg.Value) + checksumSize), nil -} - // NewDecoder creates a streaming Entry decoder. 
func NewDecoder(r io.Reader, maxKeySize uint32, maxValueSize uint64) *Decoder { return &Decoder{ @@ -81,6 +33,10 @@ type Decoder struct { // Decode decodes the next Entry from the current stream func (d *Decoder) Decode(v *internal.Entry) (int64, error) { + if v == nil { + return 0, errCantDecodeOnNilEntry + } + prefixBuf := make([]byte, keySize+valueSize) _, err := io.ReadFull(d.r, prefixBuf) @@ -90,12 +46,12 @@ func (d *Decoder) Decode(v *internal.Entry) (int64, error) { actualKeySize, actualValueSize, err := getKeyValueSizes(prefixBuf, d.maxKeySize, d.maxValueSize) if err != nil { - return 0, errors.Wrap(err, "error while getting key/value serialized sizes") + return 0, err } buf := make([]byte, uint64(actualKeySize)+actualValueSize+checksumSize) if _, err = io.ReadFull(d.r, buf); err != nil { - return 0, errors.Wrap(translateError(err), "failed reading saved data") + return 0, errTruncatedData } decodeWithoutPrefix(buf, actualKeySize, v) @@ -118,8 +74,9 @@ func getKeyValueSizes(buf []byte, maxKeySize uint32, maxValueSize uint64) (uint3 actualKeySize := binary.BigEndian.Uint32(buf[:keySize]) actualValueSize := binary.BigEndian.Uint64(buf[keySize:]) - if actualKeySize > maxKeySize || actualValueSize > maxValueSize { - return 0, 0, ErrInvalidKeyOrValueSize + if actualKeySize > maxKeySize || actualValueSize > maxValueSize || actualKeySize == 0 { + + return 0, 0, errInvalidKeyOrValueSize } return actualKeySize, actualValueSize, nil @@ -131,9 +88,12 @@ func decodeWithoutPrefix(buf []byte, valueOffset uint32, v *internal.Entry) { v.Checksum = binary.BigEndian.Uint32(buf[len(buf)-checksumSize:]) } -func translateError(err error) error { - if err == io.EOF { - return io.ErrUnexpectedEOF +// IsCorruptedData indicates if the error corresponds to possible data corruption +func IsCorruptedData(err error) bool { + switch err { + case errCantDecodeOnNilEntry, errInvalidKeyOrValueSize, errTruncatedData: + return true + default: + return false } - return err } diff --git 
a/internal/data/codec/decoder_test.go b/internal/data/codec/decoder_test.go new file mode 100644 index 0000000..203d07b --- /dev/null +++ b/internal/data/codec/decoder_test.go @@ -0,0 +1,109 @@ +package codec + +import ( + "bytes" + "encoding/binary" + "io" + "testing" + + "github.com/prologic/bitcask/internal" + "github.com/stretchr/testify/assert" +) + +func TestDecodeOnNilEntry(t *testing.T) { + t.Parallel() + assert := assert.New(t) + decoder := NewDecoder(&bytes.Buffer{}, 1, 1) + + _, err := decoder.Decode(nil) + if assert.Error(err) { + assert.Equal(errCantDecodeOnNilEntry, err) + } +} + +func TestShortPrefix(t *testing.T) { + t.Parallel() + assert := assert.New(t) + maxKeySize, maxValueSize := uint32(10), uint64(20) + prefix := make([]byte, keySize+valueSize) + binary.BigEndian.PutUint32(prefix, 1) + binary.BigEndian.PutUint64(prefix[keySize:], 1) + + truncBytesCount := 2 + buf := bytes.NewBuffer(prefix[:keySize+valueSize-truncBytesCount]) + decoder := NewDecoder(buf, maxKeySize, maxValueSize) + _, err := decoder.Decode(&internal.Entry{}) + if assert.Error(err) { + assert.Equal(io.ErrUnexpectedEOF, err) + } +} + +func TestInvalidValueKeySizes(t *testing.T) { + assert := assert.New(t) + maxKeySize, maxValueSize := uint32(10), uint64(20) + + tests := []struct { + keySize uint32 + valueSize uint64 + name string + }{ + {keySize: 0, valueSize: 5, name: "zero key size"}, //zero value size is correct for tombstones + {keySize: 11, valueSize: 5, name: "key size overflow"}, + {keySize: 5, valueSize: 21, name: "value size overflow"}, + {keySize: 11, valueSize: 21, name: "key and value size overflow"}, + } + + for i := range tests { + i := i + t.Run(tests[i].name, func(t *testing.T) { + t.Parallel() + prefix := make([]byte, keySize+valueSize) + binary.BigEndian.PutUint32(prefix, tests[i].keySize) + binary.BigEndian.PutUint64(prefix[keySize:], tests[i].valueSize) + + buf := bytes.NewBuffer(prefix) + decoder := NewDecoder(buf, maxKeySize, maxValueSize) + _, err := 
decoder.Decode(&internal.Entry{}) + if assert.Error(err) { + assert.Equal(errInvalidKeyOrValueSize, err) + } + }) + } +} + +func TestTruncatedData(t *testing.T) { + assert := assert.New(t) + maxKeySize, maxValueSize := uint32(10), uint64(20) + + key := []byte("foo") + value := []byte("bar") + data := make([]byte, keySize+valueSize+len(key)+len(value)+checksumSize) + + binary.BigEndian.PutUint32(data, uint32(len(key))) + binary.BigEndian.PutUint64(data[keySize:], uint64(len(value))) + copy(data[keySize+valueSize:], key) + copy(data[keySize+valueSize+len(key):], value) + copy(data[keySize+valueSize+len(key)+len(value):], bytes.Repeat([]byte("0"), checksumSize)) + + tests := []struct { + data []byte + name string + }{ + {data: data[:keySize+valueSize+len(key)-1], name: "truncated key"}, + {data: data[:keySize+valueSize+len(key)+len(value)-1], name: "truncated value"}, + {data: data[:keySize+valueSize+len(key)+len(value)+checksumSize-1], name: "truncated checksum"}, + } + + for i := range tests { + i := i + t.Run(tests[i].name, func(t *testing.T) { + t.Parallel() + buf := bytes.NewBuffer(tests[i].data) + decoder := NewDecoder(buf, maxKeySize, maxValueSize) + _, err := decoder.Decode(&internal.Entry{}) + if assert.Error(err) { + assert.Equal(errTruncatedData, err) + } + }) + } +} diff --git a/internal/data/codec/encoder.go b/internal/data/codec/encoder.go new file mode 100644 index 0000000..c2ce784 --- /dev/null +++ b/internal/data/codec/encoder.go @@ -0,0 +1,57 @@ +package codec + +import ( + "bufio" + "encoding/binary" + "io" + + "github.com/pkg/errors" + "github.com/prologic/bitcask/internal" +) + +const ( + keySize = 4 + valueSize = 8 + checksumSize = 4 +) + +// NewEncoder creates a streaming Entry encoder. +func NewEncoder(w io.Writer) *Encoder { + return &Encoder{w: bufio.NewWriter(w)} +} + +// Encoder wraps an underlying io.Writer and allows you to stream +// Entry encodings on it. 
+type Encoder struct { + w *bufio.Writer +} + +// Encode takes any Entry and streams it to the underlying writer. +// Messages are framed with a key-length and value-length prefix. +func (e *Encoder) Encode(msg internal.Entry) (int64, error) { + var bufKeyValue = make([]byte, keySize+valueSize) + binary.BigEndian.PutUint32(bufKeyValue[:keySize], uint32(len(msg.Key))) + binary.BigEndian.PutUint64(bufKeyValue[keySize:keySize+valueSize], uint64(len(msg.Value))) + if _, err := e.w.Write(bufKeyValue); err != nil { + return 0, errors.Wrap(err, "failed writing key & value length prefix") + } + + if _, err := e.w.Write(msg.Key); err != nil { + return 0, errors.Wrap(err, "failed writing key data") + } + if _, err := e.w.Write(msg.Value); err != nil { + return 0, errors.Wrap(err, "failed writing value data") + } + + bufChecksumSize := bufKeyValue[:checksumSize] + binary.BigEndian.PutUint32(bufChecksumSize, msg.Checksum) + if _, err := e.w.Write(bufChecksumSize); err != nil { + return 0, errors.Wrap(err, "failed writing checksum data") + } + + if err := e.w.Flush(); err != nil { + return 0, errors.Wrap(err, "failed flushing data") + } + + return int64(keySize + valueSize + len(msg.Key) + len(msg.Value) + checksumSize), nil +} diff --git a/internal/data/datafile.go b/internal/data/datafile.go index 1ccc038..50a7acb 100644 --- a/internal/data/datafile.go +++ b/internal/data/datafile.go @@ -8,6 +8,7 @@ import ( "github.com/pkg/errors" "github.com/prologic/bitcask/internal" + "github.com/prologic/bitcask/internal/data/codec" "golang.org/x/exp/mmap" ) @@ -41,8 +42,8 @@ type datafile struct { ra *mmap.ReaderAt w *os.File offset int64 - dec *Decoder - enc *Encoder + dec *codec.Decoder + enc *codec.Encoder maxKeySize uint32 maxValueSize uint64 } @@ -81,8 +82,8 @@ func NewDatafile(path string, id int, readonly bool, maxKeySize uint32, maxValue offset := stat.Size() - dec := NewDecoder(r, maxKeySize, maxValueSize) - enc := NewEncoder(w) + dec := codec.NewDecoder(r, maxKeySize, 
maxValueSize) + enc := codec.NewEncoder(w) return &datafile{ id: id, @@ -168,7 +169,7 @@ func (df *datafile) ReadAt(index, size int64) (e internal.Entry, err error) { return } - DecodeEntry(b, &e, df.maxKeySize, df.maxValueSize) + codec.DecodeEntry(b, &e, df.maxKeySize, df.maxValueSize) return }