diff --git a/bitcask.go b/bitcask.go
index 0612f17..edbe3a4 100644
--- a/bitcask.go
+++ b/bitcask.go
@@ -15,6 +15,7 @@ import (
 	"github.com/gofrs/flock"
 
 	"github.com/prologic/bitcask/internal"
+	"github.com/prologic/bitcask/internal/model"
 )
 
 var (
@@ -36,10 +37,6 @@ var (
 	// ErrDatabaseLocked is the error returned if the database is locked
 	// (typically opened by another process)
 	ErrDatabaseLocked = errors.New("error: database locked")
-
-	// ErrCreatingMemPool is the error returned when trying to configurate
-	// the mempool fails
-	ErrCreatingMemPool = errors.New("error: creating the mempool failed")
 )
 
 // Bitcask is a struct that represents a on-disk LSM and WAL data structure
@@ -242,7 +239,7 @@ func (b *Bitcask) put(key, value []byte) (int64, int64, error) {
 		b.curr = curr
 	}
 
-	e := internal.NewEntry(key, value)
+	e := model.NewEntry(key, value)
 	return b.curr.Write(e)
 }
 
@@ -457,8 +454,6 @@ func Open(path string, options ...Option) (*Bitcask, error) {
 		}
 	}
 
-	internal.ConfigureMemPool(bitcask.config.maxConcurrency)
-
 	locked, err := bitcask.Flock.TryLock()
 	if err != nil {
 		return nil, err
@@ -491,3 +486,4 @@ func Merge(path string, force bool) error {
 
 	return db.Merge()
 }
+
diff --git a/bitcask_test.go b/bitcask_test.go
index 5bb5c2a..df79488 100644
--- a/bitcask_test.go
+++ b/bitcask_test.go
@@ -495,9 +495,8 @@ func TestLocking(t *testing.T) {
 }
 
 type benchmarkTestCase struct {
-	name     string
-	size     int
-	withPool bool
+	name string
+	size int
 }
 
 func BenchmarkGet(b *testing.B) {
@@ -513,24 +512,15 @@ func BenchmarkGet(b *testing.B) {
 	defer os.RemoveAll(testdir)
 
 	tests := []benchmarkTestCase{
-		{"128B", 128, false},
-		{"128BWithPool", 128, true},
-		{"256B", 256, false},
-		{"256BWithPool", 256, true},
-		{"512B", 512, false},
-		{"512BWithPool", 512, true},
-		{"1K", 1024, false},
-		{"1KWithPool", 1024, true},
-		{"2K", 2048, false},
-		{"2KWithPool", 2048, true},
-		{"4K", 4096, false},
-		{"4KWithPool", 4096, true},
-		{"8K", 8192, false},
-		{"8KWithPool", 8192, true},
-		{"16K", 16384, false},
-		{"16KWithPool", 16384, true},
-		{"32K", 32768, false},
-		{"32KWithPool", 32768, true},
+		{"128B", 128},
+		{"256B", 256},
+		{"512B", 512},
+		{"1K", 1024},
+		{"2K", 2048},
+		{"4K", 4096},
+		{"8K", 8192},
+		{"16K", 16384},
+		{"32K", 32768},
 	}
 
 	for _, tt := range tests {
@@ -544,9 +534,6 @@ func BenchmarkGet(b *testing.B) {
 			WithMaxKeySize(len(key)),
 			WithMaxValueSize(tt.size),
 		}
-		if tt.withPool {
-			options = append(options, WithMemPool(1))
-		}
 		db, err := Open(testdir, options...)
 		if err != nil {
 			b.Fatal(err)
@@ -592,15 +579,15 @@ func BenchmarkPut(b *testing.B) {
 	defer db.Close()
 
 	tests := []benchmarkTestCase{
-		{"128B", 128, false},
-		{"256B", 256, false},
-		{"512B", 512, false},
-		{"1K", 1024, false},
-		{"2K", 2048, false},
-		{"4K", 4096, false},
-		{"8K", 8192, false},
-		{"16K", 16384, false},
-		{"32K", 32768, false},
+		{"128B", 128},
+		{"256B", 256},
+		{"512B", 512},
+		{"1K", 1024},
+		{"2K", 2048},
+		{"4K", 4096},
+		{"8K", 8192},
+		{"16K", 16384},
+		{"32K", 32768},
 	}
 
 	for _, tt := range tests {
diff --git a/doc.go b/doc.go
index b401b0c..09f36a4 100644
--- a/doc.go
+++ b/doc.go
@@ -1,13 +1,3 @@
 // Package bitcask implements a high-performance key-value store based on a
 // WAL and LSM.
-//
-// By default, the client assumes a default configuration regarding maximum key size,
-// maximum value size, maximum datafile size, and memory pools to avoid allocations.
-// Refer to Constants section to know default values.
-//
-// For extra performance, configure the memory pool option properly. This option
-// requires to specify the maximum number of concurrent use of the package. Failing to
-// set a high-enough value would impact latency and throughput. Likewise, overestimating
-// would yield in an unnecessary big memory footprint.
-// The default configuration doesn't use a memory pool.
 package bitcask
diff --git a/doc_test.go b/doc_test.go
index 1a9d921..8cc50e5 100644
--- a/doc_test.go
+++ b/doc_test.go
@@ -8,7 +8,6 @@ func Example_withOptions() {
 	opts := []Option{
 		WithMaxKeySize(1024),
 		WithMaxValueSize(4096),
-		WithMemPool(10),
 	}
 	_, _ = Open("path/to/db", opts...)
 }
diff --git a/go.mod b/go.mod
index 0b8bb0a..cb5c52e 100644
--- a/go.mod
+++ b/go.mod
@@ -4,7 +4,6 @@ require (
 	github.com/derekparker/trie v0.0.0-20190805173922-4e1a77fb815d
 	github.com/gofrs/flock v0.7.1
 	github.com/gogo/protobuf v1.2.1
-	github.com/golang/protobuf v1.3.2
 	github.com/konsorten/go-windows-terminal-sequences v1.0.2 // indirect
 	github.com/magiconair/properties v1.8.1 // indirect
 	github.com/oxtoacart/bpool v0.0.0-20190530202638-03653db5a59c
diff --git a/go.sum b/go.sum
index 1bcd00a..fee9543 100644
--- a/go.sum
+++ b/go.sum
@@ -43,8 +43,6 @@ github.com/golang/mock v1.1.1/go.mod h1:oTYuIxOrZwtPieC+H1uAHpcLFnEyAGVDL/k47Jfb
 github.com/golang/protobuf v1.2.0/go.mod h1:6lQm79b+lXiMfvg/cZm0SGofjICqVBUtrP5yJMmIC1U=
 github.com/golang/protobuf v1.3.1 h1:YF8+flBXS5eO826T4nzqPrxfhQThhXl0YzfuUPu4SBg=
 github.com/golang/protobuf v1.3.1/go.mod h1:6lQm79b+lXiMfvg/cZm0SGofjICqVBUtrP5yJMmIC1U=
-github.com/golang/protobuf v1.3.2 h1:6nsPYzhq5kReh6QImI3k5qWzO4PEbvbIW2cwSfR/6xs=
-github.com/golang/protobuf v1.3.2/go.mod h1:6lQm79b+lXiMfvg/cZm0SGofjICqVBUtrP5yJMmIC1U=
 github.com/google/btree v1.0.0/go.mod h1:lNA+9X1NB3Zf8V7Ke586lFgjr2dZNuvo3lPJSGZ5JPQ=
 github.com/google/go-cmp v0.2.0/go.mod h1:oXzfMopK8JAjlY9xF4vHSVASa0yLyX7SntLO5aqRK0M=
 github.com/gorilla/websocket v1.4.0/go.mod h1:E7qHFY5m1UJ88s3WnNqhKjPHQ0heANvMoAMk2YaljkQ=
diff --git a/internal/codec/codec.go b/internal/codec/codec.go
new file mode 100644
index 0000000..0aa4ed3
--- /dev/null
+++ b/internal/codec/codec.go
@@ -0,0 +1,116 @@
+package codec
+
+import (
+	"bufio"
+	"encoding/binary"
+	"io"
+
+	"github.com/pkg/errors"
+	"github.com/prologic/bitcask/internal/model"
+)
+
+const (
+	KeySize      = 4
+	ValueSize    = 8
+	checksumSize = 4
+)
+
+// NewEncoder creates a streaming Entry encoder.
+func NewEncoder(w io.Writer) *Encoder {
+	return &Encoder{w: bufio.NewWriter(w)}
+}
+
+// Encoder wraps an underlying io.Writer and allows you to stream
+// Entry encodings on it.
+type Encoder struct {
+	w *bufio.Writer
+}
+
+// Encode takes any Entry and streams it to the underlying writer.
+// Messages are framed with a key-length and value-length prefix.
+func (e *Encoder) Encode(msg model.Entry) (int64, error) {
+	var bufKeyValue = make([]byte, KeySize+ValueSize)
+
+	bufKeySize := bufKeyValue[:KeySize]
+	binary.BigEndian.PutUint32(bufKeySize, uint32(len(msg.Key)))
+	if _, err := e.w.Write(bufKeySize); err != nil {
+		return 0, errors.Wrap(err, "failed writing key length prefix")
+	}
+
+	bufValueSize := bufKeyValue[KeySize:]
+	binary.BigEndian.PutUint64(bufValueSize, uint64(len(msg.Value)))
+	if _, err := e.w.Write(bufValueSize); err != nil {
+		return 0, errors.Wrap(err, "failed writing value length prefix")
+	}
+
+	if _, err := e.w.Write(msg.Key); err != nil {
+		return 0, errors.Wrap(err, "failed writing key data")
+	}
+	if _, err := e.w.Write(msg.Value); err != nil {
+		return 0, errors.Wrap(err, "failed writing value data")
+	}
+
+	bufChecksumSize := make([]byte, checksumSize)
+	binary.BigEndian.PutUint32(bufChecksumSize, msg.Checksum)
+	if _, err := e.w.Write(bufChecksumSize); err != nil {
+		return 0, errors.Wrap(err, "failed writing checksum data")
+	}
+
+	if err := e.w.Flush(); err != nil {
+		return 0, errors.Wrap(err, "failed flushing data")
+	}
+
+	return int64(KeySize + ValueSize + len(msg.Key) + len(msg.Value) + checksumSize), nil
+}
+
+// NewDecoder creates a streaming Entry decoder.
+func NewDecoder(r io.Reader) *Decoder {
+	return &Decoder{r: r}
+}
+
+// Decoder wraps an underlying io.Reader and allows you to stream
+// Entry decodings on it.
+type Decoder struct {
+	r io.Reader
+}
+
+// Decode reads the next framed Entry from the stream into v, returning the bytes consumed.
+func (d *Decoder) Decode(v *model.Entry) (int64, error) {
+	prefixBuf := make([]byte, KeySize+ValueSize)
+
+	_, err := io.ReadFull(d.r, prefixBuf)
+	if err != nil {
+		return 0, err
+	}
+
+	actualKeySize, actualValueSize := GetKeyValueSizes(prefixBuf)
+	buf := make([]byte, actualKeySize+actualValueSize+checksumSize)
+	if _, err = io.ReadFull(d.r, buf); err != nil {
+		return 0, errors.Wrap(translateError(err), "failed reading saved data")
+	}
+
+	DecodeWithoutPrefix(buf, actualKeySize, v)
+	return int64(KeySize + ValueSize + actualKeySize + actualValueSize + checksumSize), nil
+}
+
+// GetKeyValueSizes decodes the key and value lengths from a length prefix.
+func GetKeyValueSizes(buf []byte) (uint64, uint64) {
+	actualKeySize := binary.BigEndian.Uint32(buf[:KeySize])
+	actualValueSize := binary.BigEndian.Uint64(buf[KeySize:])
+
+	return uint64(actualKeySize), actualValueSize
+}
+
+// DecodeWithoutPrefix decodes an Entry from buf, where valueOffset is the key length.
+func DecodeWithoutPrefix(buf []byte, valueOffset uint64, v *model.Entry) {
+	v.Key = buf[:valueOffset]
+	v.Value = buf[valueOffset : len(buf)-checksumSize]
+	v.Checksum = binary.BigEndian.Uint32(buf[len(buf)-checksumSize:])
+}
+
+func translateError(err error) error {
+	if err == io.EOF {
+		return io.ErrUnexpectedEOF
+	}
+	return err
+}
diff --git a/internal/datafile.go b/internal/datafile.go
index cca0edf..b4735e1 100644
--- a/internal/datafile.go
+++ b/internal/datafile.go
@@ -6,25 +6,21 @@ import (
 	"path/filepath"
 	"sync"
 
-	"github.com/oxtoacart/bpool"
 	"github.com/pkg/errors"
 	"golang.org/x/exp/mmap"
 
-	"github.com/gogo/protobuf/proto"
-	pb "github.com/prologic/bitcask/internal/proto"
-	"github.com/prologic/bitcask/internal/streampb"
+	"github.com/prologic/bitcask/internal/codec"
+	"github.com/prologic/bitcask/internal/model"
 )
 
 const (
 	DefaultDatafileFilename = "%09d.data"
 
-	prefixSize = 8
 )
 
 var (
 	ErrReadonly  = errors.New("error: read only datafile")
 	ErrReadError = errors.New("error: read error")
 
-	memPool   *bpool.BufferPool
 	mxMemPool sync.RWMutex
 )
@@ -36,8 +32,8 @@ type Datafile struct {
 	ra     *mmap.ReaderAt
 	w      *os.File
 	offset int64
-	dec    *streampb.Decoder
-	enc    *streampb.Encoder
+	dec    *codec.Decoder
+	enc    *codec.Encoder
 }
 
 func NewDatafile(path string, id int, readonly bool) (*Datafile, error) {
@@ -73,8 +69,8 @@ func NewDatafile(path string, id int, readonly bool) (*Datafile, error) {
 
 	offset := stat.Size()
 
-	dec := streampb.NewDecoder(r)
-	enc := streampb.NewEncoder(w)
+	dec := codec.NewDecoder(r)
+	enc := codec.NewEncoder(w)
 
 	return &Datafile{
 		id:     id,
@@ -126,7 +122,7 @@ func (df *Datafile) Size() int64 {
 	return df.offset
 }
 
-func (df *Datafile) Read() (e pb.Entry, n int64, err error) {
+func (df *Datafile) Read() (e model.Entry, n int64, err error) {
 	df.Lock()
 	defer df.Unlock()
 
@@ -138,20 +134,10 @@ func (df *Datafile) Read() (e pb.Entry, n int64, err error) {
 
 	return
 }
 
-func (df *Datafile) ReadAt(index, size int64) (e pb.Entry, err error) {
+func (df *Datafile) ReadAt(index, size int64) (e model.Entry, err error) {
 	var n int
-	var b []byte
-	if memPool == nil {
-		b = make([]byte, size)
-	} else {
-		poolSlice := memPool.Get()
-		if poolSlice.Cap() < int(size) {
-			poolSlice.Grow(int(size) - poolSlice.Cap())
-		}
-		defer memPool.Put(poolSlice)
-		b = poolSlice.Bytes()[:size]
-	}
+	b := make([]byte, size)
 
 	if df.w == nil {
 		n, err = df.ra.ReadAt(b, index)
@@ -166,14 +152,13 @@ func (df *Datafile) ReadAt(index, size int64) (e pb.Entry, err error) {
 		return
 	}
 
-	err = proto.Unmarshal(b[prefixSize:], &e)
-	if err != nil {
-		return
-	}
+	valueOffset, _ := codec.GetKeyValueSizes(b)
+	codec.DecodeWithoutPrefix(b[codec.KeySize+codec.ValueSize:], valueOffset, &e)
+
 	return
 }
 
-func (df *Datafile) Write(e pb.Entry) (int64, int64, error) {
+func (df *Datafile) Write(e model.Entry) (int64, int64, error) {
 	if df.w == nil {
 		return -1, 0, ErrReadonly
 	}
@@ -183,23 +168,11 @@ func (df *Datafile) Write(e pb.Entry) (int64, int64, error) {
 
 	e.Offset = df.offset
 
-	n, err := df.enc.Encode(&e)
+	n, err := df.enc.Encode(e)
 	if err != nil {
 		return -1, 0, err
 	}
 	df.offset += n
 
 	return e.Offset, n, nil
 }
-
-// ConfigureMemPool configurate the mempool accordingly
-func ConfigureMemPool(maxConcurrency *int) {
-	mxMemPool.Lock()
-	defer mxMemPool.Unlock()
-	if maxConcurrency == nil {
-		memPool = nil
-	} else {
-		memPool = bpool.NewBufferPool(*maxConcurrency)
-	}
-	return
-}
diff --git a/internal/entry.go b/internal/entry.go
deleted file mode 100644
index 4d0de13..0000000
--- a/internal/entry.go
+++ /dev/null
@@ -1,17 +0,0 @@
-package internal
-
-import (
-	"hash/crc32"
-
-	pb "github.com/prologic/bitcask/internal/proto"
-)
-
-func NewEntry(key, value []byte) pb.Entry {
-	checksum := crc32.ChecksumIEEE(value)
-
-	return pb.Entry{
-		Checksum: checksum,
-		Key:      key,
-		Value:    value,
-	}
-}
diff --git a/internal/model/entry.go b/internal/model/entry.go
new file mode 100644
index 0000000..860ea15
--- /dev/null
+++ b/internal/model/entry.go
@@ -0,0 +1,24 @@
+package model
+
+import (
+	"hash/crc32"
+)
+
+// Entry represents a key/value pair in the database
+type Entry struct {
+	Checksum uint32
+	Key      []byte
+	Offset   int64
+	Value    []byte
+}
+
+// NewEntry creates a new Entry with a CRC32 checksum computed over value.
+func NewEntry(key, value []byte) Entry {
+	checksum := crc32.ChecksumIEEE(value)
+
+	return Entry{
+		Checksum: checksum,
+		Key:      key,
+		Value:    value,
+	}
+}
diff --git a/internal/proto/doc.go b/internal/proto/doc.go
deleted file mode 100644
index 3fe484d..0000000
--- a/internal/proto/doc.go
+++ /dev/null
@@ -1,3 +0,0 @@
-package proto
-
-//go:generate protoc --go_out=. entry.proto
diff --git a/internal/streampb/stream.go b/internal/streampb/stream.go
deleted file mode 100644
index 37791f4..0000000
--- a/internal/streampb/stream.go
+++ /dev/null
@@ -1,97 +0,0 @@
-package streampb
-
-import (
-	"bufio"
-	"encoding/binary"
-	"io"
-
-	"github.com/gogo/protobuf/proto"
-	"github.com/pkg/errors"
-)
-
-const (
-	// prefixSize is the number of bytes we preallocate for storing
-	// our big endian lenth prefix buffer.
-	prefixSize = 8
-)
-
-// NewEncoder creates a streaming protobuf encoder.
-func NewEncoder(w io.Writer) *Encoder {
-	return &Encoder{w: bufio.NewWriter(w)}
-}
-
-// Encoder wraps an underlying io.Writer and allows you to stream
-// proto encodings on it.
-type Encoder struct {
-	w *bufio.Writer
-}
-
-// Encode takes any proto.Message and streams it to the underlying writer.
-// Messages are framed with a length prefix.
-func (e *Encoder) Encode(msg proto.Message) (int64, error) {
-	prefixBuf := make([]byte, prefixSize)
-
-	buf, err := proto.Marshal(msg)
-	if err != nil {
-		return 0, err
-	}
-	binary.BigEndian.PutUint64(prefixBuf, uint64(len(buf)))
-
-	if _, err := e.w.Write(prefixBuf); err != nil {
-		return 0, errors.Wrap(err, "failed writing length prefix")
-	}
-
-	n, err := e.w.Write(buf)
-	if err != nil {
-		return 0, errors.Wrap(err, "failed writing marshaled data")
-	}
-
-	if err = e.w.Flush(); err != nil {
-		return 0, errors.Wrap(err, "failed flushing data")
-	}
-
-	return int64(n + prefixSize), nil
-}
-
-// NewDecoder creates a streaming protobuf decoder.
-func NewDecoder(r io.Reader) *Decoder {
-	return &Decoder{r: r}
-}
-
-// Decoder wraps an underlying io.Reader and allows you to stream
-// proto decodings on it.
-type Decoder struct {
-	r io.Reader
-}
-
-// Decode takes a proto.Message and unmarshals the next payload in the
-// underlying io.Reader. It returns an EOF when it's done.
-func (d *Decoder) Decode(v proto.Message) (int64, error) {
-	prefixBuf := make([]byte, prefixSize)
-
-	_, err := io.ReadFull(d.r, prefixBuf)
-	if err != nil {
-		return 0, err
-	}
-
-	n := binary.BigEndian.Uint64(prefixBuf)
-
-	buf := make([]byte, n)
-
-	idx := uint64(0)
-	for idx < n {
-		m, err := d.r.Read(buf[idx:n])
-		if err != nil {
-			return 0, errors.Wrap(translateError(err), "failed reading marshaled data")
-		}
-		idx += uint64(m)
-	}
-	return int64(idx + prefixSize), proto.Unmarshal(buf[:n], v)
-}
-
-func translateError(err error) error {
-	if err == io.EOF {
-		return io.ErrUnexpectedEOF
-	}
-	return err
-}
diff --git a/options.go b/options.go
index d38894d..20b267d 100644
--- a/options.go
+++ b/options.go
@@ -31,7 +31,6 @@ type config struct {
 	maxDatafileSize int
 	maxKeySize      int
 	maxValueSize    int
-	maxConcurrency  *int
 }
 
 func (c *config) MarshalJSON() ([]byte, error) {
@@ -102,14 +101,3 @@ func WithMaxValueSize(size int) Option {
 		return nil
 	}
 }
-
-// WithMemPool configures usage of a memory pool to avoid allocations
-func WithMemPool(maxConcurrency int) Option {
-	return func(cfg *config) error {
-		if maxConcurrency <= 0 {
-			return ErrMaxConcurrencyLowerEqZero
-		}
-		cfg.maxConcurrency = &maxConcurrency
-		return nil
-	}
-}
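
Note for reviewers: the new `internal/codec` framing that replaces the protobuf stream is `[4-byte big-endian key length][8-byte big-endian value length][key][value][4-byte CRC32 of the value]`, with the checksum computed in `model.NewEntry`. A minimal round-trip sketch of the encoder and decoder introduced above (hypothetical demo code, runnable only from inside this module, since `internal/` packages cannot be imported elsewhere):

```go
package main

import (
	"bytes"
	"fmt"

	"github.com/prologic/bitcask/internal/codec"
	"github.com/prologic/bitcask/internal/model"
)

func main() {
	var buf bytes.Buffer

	// Encode writes: 4-byte key length, 8-byte value length, key, value, CRC32.
	entry := model.NewEntry([]byte("hello"), []byte("world"))
	n, err := codec.NewEncoder(&buf).Encode(entry)
	if err != nil {
		panic(err)
	}
	fmt.Println(n, buf.Len()) // 26 26 (4 + 8 + 5 + 5 + 4)

	// Decode reads the same framing back into an Entry.
	var decoded model.Entry
	if _, err := codec.NewDecoder(&buf).Decode(&decoded); err != nil {
		panic(err)
	}
	fmt.Printf("%s => %s (crc32=%d)\n", decoded.Key, decoded.Value, decoded.Checksum)
}
```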
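`Datafile.ReadAt` takes the random-access path instead: the whole record is already in one buffer, so it calls `GetKeyValueSizes` and `DecodeWithoutPrefix` directly rather than going through a streaming `Decoder`. The first value returned by `GetKeyValueSizes` is the key length, which is exactly where the value starts once the 12-byte prefix is stripped, hence the `valueOffset` name in the hunk above. A sketch of that decomposition, again hypothetical and module-internal, including a checksum check a reader could perform:

```go
package main

import (
	"bytes"
	"fmt"
	"hash/crc32"

	"github.com/prologic/bitcask/internal/codec"
	"github.com/prologic/bitcask/internal/model"
)

func main() {
	// Build one raw record in memory; on disk this is the byte range that
	// a Datafile.ReadAt(index, size) call hands back for a single entry.
	var buf bytes.Buffer
	if _, err := codec.NewEncoder(&buf).Encode(model.NewEntry([]byte("k"), []byte("v"))); err != nil {
		panic(err)
	}
	raw := buf.Bytes()

	// Strip the 12-byte prefix and decode the remainder in place.
	keySize, valueSize := codec.GetKeyValueSizes(raw)
	var e model.Entry
	codec.DecodeWithoutPrefix(raw[codec.KeySize+codec.ValueSize:], keySize, &e)

	// The stored CRC32 covers the value, so a reader can re-verify it.
	ok := crc32.ChecksumIEEE(e.Value) == e.Checksum
	fmt.Println(string(e.Key), string(e.Value), valueSize, ok) // k v 1 true
}
```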