mirror of
https://github.com/taigrr/bitcask
synced 2025-01-18 04:03:17 -08:00
Improve Get/Put performance with optional mempooling (#36)
* avoid unnecessary use of encoder/decoder to decrease memory allocations * add an optional configurable mempool to avoid extra allocs * add doc.go with examples
This commit is contained in:
parent
6ceeccfd64
commit
a407905ae2
@ -35,6 +35,10 @@ var (
|
|||||||
// ErrDatabaseLocked is the error returned if the database is locked
|
// ErrDatabaseLocked is the error returned if the database is locked
|
||||||
// (typically opened by another process)
|
// (typically opened by another process)
|
||||||
ErrDatabaseLocked = errors.New("error: database locked")
|
ErrDatabaseLocked = errors.New("error: database locked")
|
||||||
|
|
||||||
|
// ErrCreatingMemPool is the error returned when trying to configurate
|
||||||
|
// the mempool fails
|
||||||
|
ErrCreatingMemPool = errors.New("error: creating the mempool failed")
|
||||||
)
|
)
|
||||||
|
|
||||||
// Bitcask is a struct that represents a on-disk LSM and WAL data structure
|
// Bitcask is a struct that represents a on-disk LSM and WAL data structure
|
||||||
@ -420,6 +424,8 @@ func Open(path string, options ...Option) (*Bitcask, error) {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
internal.ConfigureMemPool(bitcask.config.maxConcurrency)
|
||||||
|
|
||||||
locked, err := bitcask.Flock.TryLock()
|
locked, err := bitcask.Flock.TryLock()
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, err
|
return nil, err
|
||||||
|
@ -468,8 +468,9 @@ func TestLocking(t *testing.T) {
|
|||||||
}
|
}
|
||||||
|
|
||||||
type benchmarkTestCase struct {
|
type benchmarkTestCase struct {
|
||||||
name string
|
name string
|
||||||
size int
|
size int
|
||||||
|
withPool bool
|
||||||
}
|
}
|
||||||
|
|
||||||
func BenchmarkGet(b *testing.B) {
|
func BenchmarkGet(b *testing.B) {
|
||||||
@ -484,22 +485,25 @@ func BenchmarkGet(b *testing.B) {
|
|||||||
}
|
}
|
||||||
defer os.RemoveAll(testdir)
|
defer os.RemoveAll(testdir)
|
||||||
|
|
||||||
db, err := Open(testdir)
|
|
||||||
if err != nil {
|
|
||||||
b.Fatal(err)
|
|
||||||
}
|
|
||||||
defer db.Close()
|
|
||||||
|
|
||||||
tests := []benchmarkTestCase{
|
tests := []benchmarkTestCase{
|
||||||
{"128B", 128},
|
{"128B", 128, false},
|
||||||
{"256B", 256},
|
{"128BWithPool", 128, true},
|
||||||
{"512B", 512},
|
{"256B", 256, false},
|
||||||
{"1K", 1024},
|
{"256BWithPool", 256, true},
|
||||||
{"2K", 2048},
|
{"512B", 512, false},
|
||||||
{"4K", 4096},
|
{"512BWithPool", 512, true},
|
||||||
{"8K", 8192},
|
{"1K", 1024, false},
|
||||||
{"16K", 16384},
|
{"1KWithPool", 1024, true},
|
||||||
{"32K", 32768},
|
{"2K", 2048, false},
|
||||||
|
{"2KWithPool", 2048, true},
|
||||||
|
{"4K", 4096, false},
|
||||||
|
{"4KWithPool", 4096, true},
|
||||||
|
{"8K", 8192, false},
|
||||||
|
{"8KWithPool", 8192, true},
|
||||||
|
{"16K", 16384, false},
|
||||||
|
{"16KWithPool", 16384, true},
|
||||||
|
{"32K", 32768, false},
|
||||||
|
{"32KWithPool", 32768, true},
|
||||||
}
|
}
|
||||||
|
|
||||||
for _, tt := range tests {
|
for _, tt := range tests {
|
||||||
@ -509,6 +513,18 @@ func BenchmarkGet(b *testing.B) {
|
|||||||
key := "foo"
|
key := "foo"
|
||||||
value := []byte(strings.Repeat(" ", tt.size))
|
value := []byte(strings.Repeat(" ", tt.size))
|
||||||
|
|
||||||
|
options := []Option{
|
||||||
|
WithMaxKeySize(len(key)),
|
||||||
|
WithMaxValueSize(tt.size),
|
||||||
|
}
|
||||||
|
if tt.withPool {
|
||||||
|
options = append(options, WithMemPool(1))
|
||||||
|
}
|
||||||
|
db, err := Open(testdir, options...)
|
||||||
|
if err != nil {
|
||||||
|
b.Fatal(err)
|
||||||
|
}
|
||||||
|
|
||||||
err = db.Put(key, value)
|
err = db.Put(key, value)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
b.Fatal(err)
|
b.Fatal(err)
|
||||||
@ -524,6 +540,8 @@ func BenchmarkGet(b *testing.B) {
|
|||||||
b.Errorf("unexpected value")
|
b.Errorf("unexpected value")
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
b.StopTimer()
|
||||||
|
db.Close()
|
||||||
})
|
})
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@ -547,15 +565,15 @@ func BenchmarkPut(b *testing.B) {
|
|||||||
defer db.Close()
|
defer db.Close()
|
||||||
|
|
||||||
tests := []benchmarkTestCase{
|
tests := []benchmarkTestCase{
|
||||||
{"128B", 128},
|
{"128B", 128, false},
|
||||||
{"256B", 256},
|
{"256B", 256, false},
|
||||||
{"512B", 512},
|
{"512B", 512, false},
|
||||||
{"1K", 1024},
|
{"1K", 1024, false},
|
||||||
{"2K", 2048},
|
{"2K", 2048, false},
|
||||||
{"4K", 4096},
|
{"4K", 4096, false},
|
||||||
{"8K", 8192},
|
{"8K", 8192, false},
|
||||||
{"16K", 16384},
|
{"16K", 16384, false},
|
||||||
{"32K", 32768},
|
{"32K", 32768, false},
|
||||||
}
|
}
|
||||||
|
|
||||||
for _, tt := range tests {
|
for _, tt := range tests {
|
||||||
|
13
doc.go
Normal file
13
doc.go
Normal file
@ -0,0 +1,13 @@
|
|||||||
|
// Package bitcask implements a high-performance key-value store based on a
|
||||||
|
// WAL and LSM.
|
||||||
|
//
|
||||||
|
// By default, the client assumes a default configuration regarding maximum key size,
|
||||||
|
// maximum value size, maximum datafile size, and memory pools to avoid allocations.
|
||||||
|
// Refer to Constants section to know default values.
|
||||||
|
//
|
||||||
|
// For extra performance, configure the memory pool option properly. This option
|
||||||
|
// requires to specify the maximum number of concurrent use of the package. Failing to
|
||||||
|
// set a high-enough value would impact latency and throughput. Likewise, overestimating
|
||||||
|
// would yield in an unnecessary big memory footprint.
|
||||||
|
// The default configuration doesn't use a memory pool.
|
||||||
|
package bitcask
|
14
doc_test.go
Normal file
14
doc_test.go
Normal file
@ -0,0 +1,14 @@
|
|||||||
|
package bitcask
|
||||||
|
|
||||||
|
func Example() {
|
||||||
|
_, _ = Open("path/to/db")
|
||||||
|
}
|
||||||
|
|
||||||
|
func Example_withOptions() {
|
||||||
|
opts := []Option{
|
||||||
|
WithMaxKeySize(1024),
|
||||||
|
WithMaxValueSize(4096),
|
||||||
|
WithMemPool(10),
|
||||||
|
}
|
||||||
|
_, _ = Open("path/to/db", opts...)
|
||||||
|
}
|
1
go.mod
1
go.mod
@ -6,6 +6,7 @@ require (
|
|||||||
github.com/golang/protobuf v1.3.2
|
github.com/golang/protobuf v1.3.2
|
||||||
github.com/konsorten/go-windows-terminal-sequences v1.0.2 // indirect
|
github.com/konsorten/go-windows-terminal-sequences v1.0.2 // indirect
|
||||||
github.com/magiconair/properties v1.8.1 // indirect
|
github.com/magiconair/properties v1.8.1 // indirect
|
||||||
|
github.com/oxtoacart/bpool v0.0.0-20190530202638-03653db5a59c
|
||||||
github.com/pelletier/go-toml v1.4.0 // indirect
|
github.com/pelletier/go-toml v1.4.0 // indirect
|
||||||
github.com/pkg/errors v0.8.1
|
github.com/pkg/errors v0.8.1
|
||||||
github.com/prologic/trie v0.0.0-20190322091023-3972df81f9b5
|
github.com/prologic/trie v0.0.0-20190322091023-3972df81f9b5
|
||||||
|
2
go.sum
2
go.sum
@ -77,6 +77,8 @@ github.com/mitchellh/mapstructure v1.1.2 h1:fmNYVwqnSfB9mZU6OS2O6GsXM+wcskZDuKQz
|
|||||||
github.com/mitchellh/mapstructure v1.1.2/go.mod h1:FVVH3fgwuzCH5S8UJGiWEs2h04kUh9fWfEaFds41c1Y=
|
github.com/mitchellh/mapstructure v1.1.2/go.mod h1:FVVH3fgwuzCH5S8UJGiWEs2h04kUh9fWfEaFds41c1Y=
|
||||||
github.com/mwitkow/go-conntrack v0.0.0-20161129095857-cc309e4a2223/go.mod h1:qRWi+5nqEBWmkhHvq77mSJWrCKwh8bxhgT7d/eI7P4U=
|
github.com/mwitkow/go-conntrack v0.0.0-20161129095857-cc309e4a2223/go.mod h1:qRWi+5nqEBWmkhHvq77mSJWrCKwh8bxhgT7d/eI7P4U=
|
||||||
github.com/oklog/ulid v1.3.1/go.mod h1:CirwcVhetQ6Lv90oh/F+FBtV6XMibvdAFo93nm5qn4U=
|
github.com/oklog/ulid v1.3.1/go.mod h1:CirwcVhetQ6Lv90oh/F+FBtV6XMibvdAFo93nm5qn4U=
|
||||||
|
github.com/oxtoacart/bpool v0.0.0-20190530202638-03653db5a59c h1:rp5dCmg/yLR3mgFuSOe4oEnDDmGLROTvMragMUXpTQw=
|
||||||
|
github.com/oxtoacart/bpool v0.0.0-20190530202638-03653db5a59c/go.mod h1:X07ZCGwUbLaax7L0S3Tw4hpejzu63ZrrQiUe6W0hcy0=
|
||||||
github.com/pelletier/go-toml v1.2.0 h1:T5zMGML61Wp+FlcbWjRDT7yAxhJNAiPPLOFECq181zc=
|
github.com/pelletier/go-toml v1.2.0 h1:T5zMGML61Wp+FlcbWjRDT7yAxhJNAiPPLOFECq181zc=
|
||||||
github.com/pelletier/go-toml v1.2.0/go.mod h1:5z9KED0ma1S8pY6P1sdut58dfprrGBbd/94hg7ilaic=
|
github.com/pelletier/go-toml v1.2.0/go.mod h1:5z9KED0ma1S8pY6P1sdut58dfprrGBbd/94hg7ilaic=
|
||||||
github.com/pelletier/go-toml v1.4.0 h1:u3Z1r+oOXJIkxqw34zVhyPgjBsm6X2wn21NWs/HfSeg=
|
github.com/pelletier/go-toml v1.4.0 h1:u3Z1r+oOXJIkxqw34zVhyPgjBsm6X2wn21NWs/HfSeg=
|
||||||
|
@ -1,26 +1,31 @@
|
|||||||
package internal
|
package internal
|
||||||
|
|
||||||
import (
|
import (
|
||||||
"bytes"
|
|
||||||
"fmt"
|
"fmt"
|
||||||
"os"
|
"os"
|
||||||
"path/filepath"
|
"path/filepath"
|
||||||
"sync"
|
"sync"
|
||||||
|
|
||||||
|
"github.com/oxtoacart/bpool"
|
||||||
"github.com/pkg/errors"
|
"github.com/pkg/errors"
|
||||||
"golang.org/x/exp/mmap"
|
"golang.org/x/exp/mmap"
|
||||||
|
|
||||||
|
"github.com/gogo/protobuf/proto"
|
||||||
pb "github.com/prologic/bitcask/internal/proto"
|
pb "github.com/prologic/bitcask/internal/proto"
|
||||||
"github.com/prologic/bitcask/internal/streampb"
|
"github.com/prologic/bitcask/internal/streampb"
|
||||||
)
|
)
|
||||||
|
|
||||||
const (
|
const (
|
||||||
DefaultDatafileFilename = "%09d.data"
|
DefaultDatafileFilename = "%09d.data"
|
||||||
|
prefixSize = 8
|
||||||
)
|
)
|
||||||
|
|
||||||
var (
|
var (
|
||||||
ErrReadonly = errors.New("error: read only datafile")
|
ErrReadonly = errors.New("error: read only datafile")
|
||||||
ErrReadError = errors.New("error: read error")
|
ErrReadError = errors.New("error: read error")
|
||||||
|
|
||||||
|
memPool *bpool.BufferPool
|
||||||
|
mxMemPool sync.RWMutex
|
||||||
)
|
)
|
||||||
|
|
||||||
type Datafile struct {
|
type Datafile struct {
|
||||||
@ -136,7 +141,17 @@ func (df *Datafile) Read() (e pb.Entry, n int64, err error) {
|
|||||||
func (df *Datafile) ReadAt(index, size int64) (e pb.Entry, err error) {
|
func (df *Datafile) ReadAt(index, size int64) (e pb.Entry, err error) {
|
||||||
var n int
|
var n int
|
||||||
|
|
||||||
b := make([]byte, size)
|
var b []byte
|
||||||
|
if memPool == nil {
|
||||||
|
b = make([]byte, size)
|
||||||
|
} else {
|
||||||
|
poolSlice := memPool.Get()
|
||||||
|
if poolSlice.Cap() < int(size) {
|
||||||
|
poolSlice.Grow(int(size) - poolSlice.Cap())
|
||||||
|
}
|
||||||
|
defer memPool.Put(poolSlice)
|
||||||
|
b = poolSlice.Bytes()[:size]
|
||||||
|
}
|
||||||
|
|
||||||
if df.w == nil {
|
if df.w == nil {
|
||||||
n, err = df.ra.ReadAt(b, index)
|
n, err = df.ra.ReadAt(b, index)
|
||||||
@ -151,9 +166,10 @@ func (df *Datafile) ReadAt(index, size int64) (e pb.Entry, err error) {
|
|||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
buf := bytes.NewBuffer(b)
|
err = proto.Unmarshal(b[prefixSize:], &e)
|
||||||
dec := streampb.NewDecoder(buf)
|
if err != nil {
|
||||||
_, err = dec.Decode(&e)
|
return
|
||||||
|
}
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -175,3 +191,15 @@ func (df *Datafile) Write(e pb.Entry) (int64, int64, error) {
|
|||||||
|
|
||||||
return e.Offset, n, nil
|
return e.Offset, n, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// ConfigureMemPool configurate the mempool accordingly
|
||||||
|
func ConfigureMemPool(maxConcurrency *int) {
|
||||||
|
mxMemPool.Lock()
|
||||||
|
defer mxMemPool.Unlock()
|
||||||
|
if maxConcurrency == nil {
|
||||||
|
memPool = nil
|
||||||
|
} else {
|
||||||
|
memPool = bpool.NewBufferPool(*maxConcurrency)
|
||||||
|
}
|
||||||
|
return
|
||||||
|
}
|
||||||
|
20
options.go
20
options.go
@ -1,5 +1,7 @@
|
|||||||
package bitcask
|
package bitcask
|
||||||
|
|
||||||
|
import "errors"
|
||||||
|
|
||||||
const (
|
const (
|
||||||
// DefaultMaxDatafileSize is the default maximum datafile size in bytes
|
// DefaultMaxDatafileSize is the default maximum datafile size in bytes
|
||||||
DefaultMaxDatafileSize = 1 << 20 // 1MB
|
DefaultMaxDatafileSize = 1 << 20 // 1MB
|
||||||
@ -11,6 +13,12 @@ const (
|
|||||||
DefaultMaxValueSize = 1 << 16 // 65KB
|
DefaultMaxValueSize = 1 << 16 // 65KB
|
||||||
)
|
)
|
||||||
|
|
||||||
|
var (
|
||||||
|
// ErrMaxConcurrencyLowerEqZero is the error returned for
|
||||||
|
// maxConcurrency option not greater than zero
|
||||||
|
ErrMaxConcurrencyLowerEqZero = errors.New("error: maxConcurrency must be greater than zero")
|
||||||
|
)
|
||||||
|
|
||||||
// Option is a function that takes a config struct and modifies it
|
// Option is a function that takes a config struct and modifies it
|
||||||
type Option func(*config) error
|
type Option func(*config) error
|
||||||
|
|
||||||
@ -18,6 +26,7 @@ type config struct {
|
|||||||
maxDatafileSize int
|
maxDatafileSize int
|
||||||
maxKeySize int
|
maxKeySize int
|
||||||
maxValueSize int
|
maxValueSize int
|
||||||
|
maxConcurrency *int
|
||||||
}
|
}
|
||||||
|
|
||||||
func newDefaultConfig() *config {
|
func newDefaultConfig() *config {
|
||||||
@ -51,3 +60,14 @@ func WithMaxValueSize(size int) Option {
|
|||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// WithMemPool configures usage of a memory pool to avoid allocations
|
||||||
|
func WithMemPool(maxConcurrency int) Option {
|
||||||
|
return func(cfg *config) error {
|
||||||
|
if maxConcurrency <= 0 {
|
||||||
|
return ErrMaxConcurrencyLowerEqZero
|
||||||
|
}
|
||||||
|
cfg.maxConcurrency = &maxConcurrency
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
}
|
||||||
|
Loading…
x
Reference in New Issue
Block a user