
custom high-performance encoder implementation (#52)

This commit is contained in:
Ignacio Hagopian
2019-08-07 20:21:46 -03:00
committed by James Mills
parent 755b1879b5
commit fd179b4a86
13 changed files with 173 additions and 225 deletions

internal/codec/codec.go (new file, 113 lines)

@@ -0,0 +1,113 @@
package codec
import (
"bufio"
"encoding/binary"
"io"
"github.com/pkg/errors"
"github.com/prologic/bitcask/internal/model"
)
const (
// KeySize and ValueSize are the widths of the big-endian key-length
// and value-length prefixes; checksumSize is the width of the
// trailing CRC32 checksum.
KeySize = 4
ValueSize = 8
checksumSize = 4
)
// NewEncoder creates a streaming Entry encoder.
func NewEncoder(w io.Writer) *Encoder {
return &Encoder{w: bufio.NewWriter(w)}
}
// Encoder wraps an underlying io.Writer and allows you to stream
// Entry encodings on it.
type Encoder struct {
w *bufio.Writer
}
// Encode takes an Entry and streams it to the underlying writer.
// Entries are framed with a key-length and value-length prefix,
// followed by the key, the value, and a 4-byte checksum.
func (e *Encoder) Encode(msg model.Entry) (int64, error) {
// The prefix buffer is reused for both lengths; ValueSize (8 bytes)
// is the larger of the two widths.
var bufKeyValue = make([]byte, ValueSize)
bufKeySize := bufKeyValue[:KeySize]
binary.BigEndian.PutUint32(bufKeySize, uint32(len(msg.Key)))
if _, err := e.w.Write(bufKeySize); err != nil {
return 0, errors.Wrap(err, "failed writing key length prefix")
}
bufValueSize := bufKeyValue[:ValueSize]
binary.BigEndian.PutUint64(bufValueSize, uint64(len(msg.Value)))
if _, err := e.w.Write(bufValueSize); err != nil {
return 0, errors.Wrap(err, "failed writing value length prefix")
}
if _, err := e.w.Write([]byte(msg.Key)); err != nil {
return 0, errors.Wrap(err, "failed writing key data")
}
if _, err := e.w.Write(msg.Value); err != nil {
return 0, errors.Wrap(err, "failed writing value data")
}
bufChecksumSize := make([]byte, checksumSize)
binary.BigEndian.PutUint32(bufChecksumSize, msg.Checksum)
if _, err := e.w.Write(bufChecksumSize); err != nil {
return 0, errors.Wrap(err, "failed writing checksum data")
}
if err := e.w.Flush(); err != nil {
return 0, errors.Wrap(err, "failed flushing data")
}
return int64(KeySize + ValueSize + len(msg.Key) + len(msg.Value) + checksumSize), nil
}
// NewDecoder creates a streaming Entry decoder.
func NewDecoder(r io.Reader) *Decoder {
return &Decoder{r: r}
}
// Decoder wraps an underlying io.Reader and allows you to stream
// Entry decodings on it.
type Decoder struct {
r io.Reader
}
// Decode reads the next Entry from the underlying reader into v and
// returns the number of bytes consumed. It returns io.EOF once the
// stream is exhausted.
func (d *Decoder) Decode(v *model.Entry) (int64, error) {
prefixBuf := make([]byte, KeySize+ValueSize)
_, err := io.ReadFull(d.r, prefixBuf)
if err != nil {
return 0, err
}
actualKeySize, actualValueSize := GetKeyValueSizes(prefixBuf)
buf := make([]byte, actualKeySize+actualValueSize+checksumSize)
if _, err = io.ReadFull(d.r, buf); err != nil {
return 0, errors.Wrap(translateError(err), "failed reading saved data")
}
DecodeWithoutPrefix(buf, actualValueSize, v)
return int64(KeySize + ValueSize + actualKeySize + actualValueSize + checksumSize), nil
}
// GetKeyValueSizes extracts the key and value lengths from a
// length-prefix buffer.
func GetKeyValueSizes(buf []byte) (uint64, uint64) {
actualKeySize := binary.BigEndian.Uint32(buf[:KeySize])
actualValueSize := binary.BigEndian.Uint64(buf[KeySize:])
return uint64(actualKeySize), actualValueSize
}
// DecodeWithoutPrefix fills v from buf, which must hold the key,
// value and checksum with the length prefix already stripped;
// valueOffset is the key length.
func DecodeWithoutPrefix(buf []byte, valueOffset uint64, v *model.Entry) {
v.Key = buf[:valueOffset]
v.Value = buf[valueOffset : len(buf)-checksumSize]
v.Checksum = binary.BigEndian.Uint32(buf[len(buf)-checksumSize:])
}
// translateError converts a bare io.EOF in the middle of a record
// into io.ErrUnexpectedEOF.
func translateError(err error) error {
if err == io.EOF {
return io.ErrUnexpectedEOF
}
return err
}
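
The resulting on-disk frame is fixed and explicit: a 4-byte big-endian key length, an 8-byte big-endian value length, the raw key bytes, the raw value bytes, and a trailing 4-byte CRC32 checksum. Below is a minimal, self-contained round-trip sketch of that layout using only the standard library (the internal codec package above cannot be imported from outside the module; the frame math mirrors Encode and Decode above):

package main

import (
	"bytes"
	"encoding/binary"
	"fmt"
	"hash/crc32"
)

func main() {
	key, value := []byte("hello"), []byte("world")

	// Frame: [4B key len][8B value len][key][value][4B CRC32(value)].
	var buf bytes.Buffer
	var prefix [12]byte
	binary.BigEndian.PutUint32(prefix[:4], uint32(len(key)))
	binary.BigEndian.PutUint64(prefix[4:], uint64(len(value)))
	buf.Write(prefix[:])
	buf.Write(key)
	buf.Write(value)
	var sum [4]byte
	binary.BigEndian.PutUint32(sum[:], crc32.ChecksumIEEE(value))
	buf.Write(sum[:])

	// Decode it back, mirroring Decoder.Decode.
	raw := buf.Bytes()
	keyLen := binary.BigEndian.Uint32(raw[:4])
	valLen := binary.BigEndian.Uint64(raw[4:12])
	k := raw[12 : 12+keyLen]
	v := raw[12+uint64(keyLen) : 12+uint64(keyLen)+valLen]
	ok := binary.BigEndian.Uint32(raw[len(raw)-4:]) == crc32.ChecksumIEEE(v)
	fmt.Printf("key=%q value=%q checksum ok=%v\n", k, v, ok)
}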

internal/datafile.go (modified)

@@ -6,25 +6,21 @@ import (
"path/filepath"
"sync"
"github.com/oxtoacart/bpool"
"github.com/pkg/errors"
"golang.org/x/exp/mmap"
"github.com/gogo/protobuf/proto"
pb "github.com/prologic/bitcask/internal/proto"
"github.com/prologic/bitcask/internal/streampb"
"github.com/prologic/bitcask/internal/codec"
"github.com/prologic/bitcask/internal/model"
)
const (
DefaultDatafileFilename = "%09d.data"
prefixSize = 8
)
var (
ErrReadonly = errors.New("error: read only datafile")
ErrReadError = errors.New("error: read error")
memPool *bpool.BufferPool
mxMemPool sync.RWMutex
)
@@ -36,8 +32,8 @@ type Datafile struct {
ra *mmap.ReaderAt
w *os.File
offset int64
dec *streampb.Decoder
enc *streampb.Encoder
dec *codec.Decoder
enc *codec.Encoder
}
func NewDatafile(path string, id int, readonly bool) (*Datafile, error) {
@@ -73,8 +69,8 @@ func NewDatafile(path string, id int, readonly bool) (*Datafile, error) {
offset := stat.Size()
dec := streampb.NewDecoder(r)
enc := streampb.NewEncoder(w)
dec := codec.NewDecoder(r)
enc := codec.NewEncoder(w)
return &Datafile{
id: id,
@@ -126,7 +122,7 @@ func (df *Datafile) Size() int64 {
return df.offset
}
func (df *Datafile) Read() (e pb.Entry, n int64, err error) {
func (df *Datafile) Read() (e model.Entry, n int64, err error) {
df.Lock()
defer df.Unlock()
@@ -138,20 +134,10 @@ func (df *Datafile) Read() (e pb.Entry, n int64, err error) {
return
}
func (df *Datafile) ReadAt(index, size int64) (e pb.Entry, err error) {
func (df *Datafile) ReadAt(index, size int64) (e model.Entry, err error) {
var n int
var b []byte
if memPool == nil {
b = make([]byte, size)
} else {
poolSlice := memPool.Get()
if poolSlice.Cap() < int(size) {
poolSlice.Grow(int(size) - poolSlice.Cap())
}
defer memPool.Put(poolSlice)
b = poolSlice.Bytes()[:size]
}
b := make([]byte, size)
if df.w == nil {
n, err = df.ra.ReadAt(b, index)
@@ -166,14 +152,13 @@ func (df *Datafile) ReadAt(index, size int64) (e pb.Entry, err error) {
return
}
err = proto.Unmarshal(b[prefixSize:], &e)
if err != nil {
return
}
valueOffset, _ := codec.GetKeyValueSizes(b)
codec.DecodeWithoutPrefix(b[codec.KeySize+codec.ValueSize:], valueOffset, &e)
return
}
func (df *Datafile) Write(e pb.Entry) (int64, int64, error) {
func (df *Datafile) Write(e model.Entry) (int64, int64, error) {
if df.w == nil {
return -1, 0, ErrReadonly
}
@@ -183,23 +168,11 @@ func (df *Datafile) Write(e pb.Entry) (int64, int64, error) {
e.Offset = df.offset
n, err := df.enc.Encode(&e)
n, err := df.enc.Encode(e)
if err != nil {
return -1, 0, err
}
df.offset += n
return e.Offset, n, nil
}
// ConfigureMemPool configures the memory pool accordingly
func ConfigureMemPool(maxConcurrency *int) {
mxMemPool.Lock()
defer mxMemPool.Unlock()
if maxConcurrency == nil {
memPool = nil
} else {
memPool = bpool.NewBufferPool(*maxConcurrency)
}
return
}
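
With the protobuf layer gone, ReadAt decodes a record in place: the offset and size come from the in-memory index, so the raw bytes are read once, the 12-byte prefix is split off, and the remainder goes straight to the codec. A sketch of that decode path, assuming the codec and model packages added in this commit (internal packages are only importable from within the bitcask module, so this is illustrative rather than drop-in):

package main

import (
	"fmt"

	"github.com/prologic/bitcask/internal/codec"
	"github.com/prologic/bitcask/internal/model"
)

// decodeRecord mirrors the new ReadAt decode path: b holds one whole
// record that was read at a known offset and size.
func decodeRecord(b []byte) (model.Entry, error) {
	var e model.Entry
	// The trailing 4 bytes are the (unexported) checksum.
	if len(b) < codec.KeySize+codec.ValueSize+4 {
		return e, fmt.Errorf("record too short: %d bytes", len(b))
	}
	keyLen, valLen := codec.GetKeyValueSizes(b)
	want := codec.KeySize + codec.ValueSize + int(keyLen) + int(valLen) + 4
	if len(b) != want {
		return e, fmt.Errorf("corrupt record: got %d bytes, want %d", len(b), want)
	}
	// Once the prefix is stripped, the key length doubles as the
	// value offset into the remaining buffer.
	codec.DecodeWithoutPrefix(b[codec.KeySize+codec.ValueSize:], keyLen, &e)
	return e, nil
}

func main() {
	// b would normally be read with df.ra.ReadAt(b, index).
	if _, err := decodeRecord(nil); err != nil {
		fmt.Println(err)
	}
}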

internal/entry.go (deleted, 17 lines)

@@ -1,17 +0,0 @@
package internal
import (
"hash/crc32"
pb "github.com/prologic/bitcask/internal/proto"
)
func NewEntry(key, value []byte) pb.Entry {
checksum := crc32.ChecksumIEEE(value)
return pb.Entry{
Checksum: checksum,
Key: key,
Value: value,
}
}

internal/model/entry.go (new file, 23 lines)

@@ -0,0 +1,23 @@
package model
import (
"hash/crc32"
)
// Entry represents a key/value in the database
type Entry struct {
Checksum uint32
Key []byte
Offset int64
Value []byte
}
func NewEntry(key, value []byte) Entry {
checksum := crc32.ChecksumIEEE(value)
return Entry{
Checksum: checksum,
Key: key,
Value: value,
}
}
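
Because the checksum stored with each Entry is CRC32 (IEEE) over the value bytes, readers can cheaply verify integrity after decoding. A hypothetical helper inside package model, not part of this commit:

// Verify reports whether e's stored checksum still matches its value
// bytes; a hypothetical integrity check, not part of this commit.
func (e Entry) Verify() bool {
	return crc32.ChecksumIEEE(e.Value) == e.Checksum
}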

internal/proto go:generate stub (deleted, 3 lines)

@@ -1,3 +0,0 @@
package proto
//go:generate protoc --go_out=. entry.proto

internal/streampb (deleted, 97 lines)

@@ -1,97 +0,0 @@
package streampb
import (
"bufio"
"encoding/binary"
"io"
"github.com/gogo/protobuf/proto"
"github.com/pkg/errors"
)
const (
// prefixSize is the number of bytes we preallocate for storing
// our big endian length prefix buffer.
prefixSize = 8
)
// NewEncoder creates a streaming protobuf encoder.
func NewEncoder(w io.Writer) *Encoder {
return &Encoder{w: bufio.NewWriter(w)}
}
// Encoder wraps an underlying io.Writer and allows you to stream
// proto encodings on it.
type Encoder struct {
w *bufio.Writer
}
// Encode takes any proto.Message and streams it to the underlying writer.
// Messages are framed with a length prefix.
func (e *Encoder) Encode(msg proto.Message) (int64, error) {
prefixBuf := make([]byte, prefixSize)
buf, err := proto.Marshal(msg)
if err != nil {
return 0, err
}
binary.BigEndian.PutUint64(prefixBuf, uint64(len(buf)))
if _, err := e.w.Write(prefixBuf); err != nil {
return 0, errors.Wrap(err, "failed writing length prefix")
}
n, err := e.w.Write(buf)
if err != nil {
return 0, errors.Wrap(err, "failed writing marshaled data")
}
if err = e.w.Flush(); err != nil {
return 0, errors.Wrap(err, "failed flushing data")
}
return int64(n + prefixSize), nil
}
// NewDecoder creates a streaming protobuf decoder.
func NewDecoder(r io.Reader) *Decoder {
return &Decoder{r: r}
}
// Decoder wraps an underlying io.Reader and allows you to stream
// proto decodings on it.
type Decoder struct {
r io.Reader
}
// Decode takes a proto.Message and unmarshals the next payload in the
// underlying io.Reader. It returns an EOF when it's done.
func (d *Decoder) Decode(v proto.Message) (int64, error) {
prefixBuf := make([]byte, prefixSize)
_, err := io.ReadFull(d.r, prefixBuf)
if err != nil {
return 0, err
}
n := binary.BigEndian.Uint64(prefixBuf)
buf := make([]byte, n)
idx := uint64(0)
for idx < n {
m, err := d.r.Read(buf[idx:n])
if err != nil {
return 0, errors.Wrap(translateError(err), "failed reading marshaled data")
}
idx += uint64(m)
}
return int64(idx + prefixSize), proto.Unmarshal(buf[:n], v)
}
func translateError(err error) error {
if err == io.EOF {
return io.ErrUnexpectedEOF
}
return err
}
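
For comparison, this removed format framed each record as an 8-byte length prefix followed by a protobuf-marshaled Entry, so every write paid for proto.Marshal plus an intermediate buffer, and every read for proto.Unmarshal. The replacement codec streams key and value bytes through directly. A hypothetical micro-benchmark for the new encoder's hot path (placed in package codec; names and sizes are assumed, and no results are implied):

package codec

import (
	"io/ioutil"
	"testing"

	"github.com/prologic/bitcask/internal/model"
)

// BenchmarkEncode exercises only the new encode path, writing frames
// to ioutil.Discard so I/O cost is excluded.
func BenchmarkEncode(b *testing.B) {
	e := model.NewEntry([]byte("key"), make([]byte, 1024))
	enc := NewEncoder(ioutil.Discard)
	b.SetBytes(int64(KeySize + ValueSize + len(e.Key) + len(e.Value) + checksumSize))
	b.ResetTimer()
	for i := 0; i < b.N; i++ {
		if _, err := enc.Encode(e); err != nil {
			b.Fatal(err)
		}
	}
}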