diff --git a/bitcask.go b/bitcask.go index d0b61fe..aad8a2f 100644 --- a/bitcask.go +++ b/bitcask.go @@ -35,6 +35,10 @@ var ( // ErrDatabaseLocked is the error returned if the database is locked // (typically opened by another process) ErrDatabaseLocked = errors.New("error: database locked") + + // ErrCreatingMemPool is the error returned when trying to configurate + // the mempool fails + ErrCreatingMemPool = errors.New("error: creating the mempool failed") ) // Bitcask is a struct that represents a on-disk LSM and WAL data structure @@ -420,6 +424,8 @@ func Open(path string, options ...Option) (*Bitcask, error) { } } + internal.ConfigureMemPool(bitcask.config.maxConcurrency) + locked, err := bitcask.Flock.TryLock() if err != nil { return nil, err diff --git a/bitcask_test.go b/bitcask_test.go index 0a6d587..8f6a3a8 100644 --- a/bitcask_test.go +++ b/bitcask_test.go @@ -468,8 +468,9 @@ func TestLocking(t *testing.T) { } type benchmarkTestCase struct { - name string - size int + name string + size int + withPool bool } func BenchmarkGet(b *testing.B) { @@ -484,22 +485,25 @@ func BenchmarkGet(b *testing.B) { } defer os.RemoveAll(testdir) - db, err := Open(testdir) - if err != nil { - b.Fatal(err) - } - defer db.Close() - tests := []benchmarkTestCase{ - {"128B", 128}, - {"256B", 256}, - {"512B", 512}, - {"1K", 1024}, - {"2K", 2048}, - {"4K", 4096}, - {"8K", 8192}, - {"16K", 16384}, - {"32K", 32768}, + {"128B", 128, false}, + {"128BWithPool", 128, true}, + {"256B", 256, false}, + {"256BWithPool", 256, true}, + {"512B", 512, false}, + {"512BWithPool", 512, true}, + {"1K", 1024, false}, + {"1KWithPool", 1024, true}, + {"2K", 2048, false}, + {"2KWithPool", 2048, true}, + {"4K", 4096, false}, + {"4KWithPool", 4096, true}, + {"8K", 8192, false}, + {"8KWithPool", 8192, true}, + {"16K", 16384, false}, + {"16KWithPool", 16384, true}, + {"32K", 32768, false}, + {"32KWithPool", 32768, true}, } for _, tt := range tests { @@ -509,6 +513,18 @@ func BenchmarkGet(b *testing.B) { key := "foo" value := []byte(strings.Repeat(" ", tt.size)) + options := []Option{ + WithMaxKeySize(len(key)), + WithMaxValueSize(tt.size), + } + if tt.withPool { + options = append(options, WithMemPool(1)) + } + db, err := Open(testdir, options...) + if err != nil { + b.Fatal(err) + } + err = db.Put(key, value) if err != nil { b.Fatal(err) @@ -524,6 +540,8 @@ func BenchmarkGet(b *testing.B) { b.Errorf("unexpected value") } } + b.StopTimer() + db.Close() }) } } @@ -547,15 +565,15 @@ func BenchmarkPut(b *testing.B) { defer db.Close() tests := []benchmarkTestCase{ - {"128B", 128}, - {"256B", 256}, - {"512B", 512}, - {"1K", 1024}, - {"2K", 2048}, - {"4K", 4096}, - {"8K", 8192}, - {"16K", 16384}, - {"32K", 32768}, + {"128B", 128, false}, + {"256B", 256, false}, + {"512B", 512, false}, + {"1K", 1024, false}, + {"2K", 2048, false}, + {"4K", 4096, false}, + {"8K", 8192, false}, + {"16K", 16384, false}, + {"32K", 32768, false}, } for _, tt := range tests { diff --git a/doc.go b/doc.go new file mode 100644 index 0000000..b401b0c --- /dev/null +++ b/doc.go @@ -0,0 +1,13 @@ +// Package bitcask implements a high-performance key-value store based on a +// WAL and LSM. +// +// By default, the client assumes a default configuration regarding maximum key size, +// maximum value size, maximum datafile size, and memory pools to avoid allocations. +// Refer to Constants section to know default values. +// +// For extra performance, configure the memory pool option properly. This option +// requires to specify the maximum number of concurrent use of the package. Failing to +// set a high-enough value would impact latency and throughput. Likewise, overestimating +// would yield in an unnecessary big memory footprint. +// The default configuration doesn't use a memory pool. +package bitcask diff --git a/doc_test.go b/doc_test.go new file mode 100644 index 0000000..1a9d921 --- /dev/null +++ b/doc_test.go @@ -0,0 +1,14 @@ +package bitcask + +func Example() { + _, _ = Open("path/to/db") +} + +func Example_withOptions() { + opts := []Option{ + WithMaxKeySize(1024), + WithMaxValueSize(4096), + WithMemPool(10), + } + _, _ = Open("path/to/db", opts...) +} diff --git a/go.mod b/go.mod index fd2eb86..3848ebf 100644 --- a/go.mod +++ b/go.mod @@ -6,6 +6,7 @@ require ( github.com/golang/protobuf v1.3.2 github.com/konsorten/go-windows-terminal-sequences v1.0.2 // indirect github.com/magiconair/properties v1.8.1 // indirect + github.com/oxtoacart/bpool v0.0.0-20190530202638-03653db5a59c github.com/pelletier/go-toml v1.4.0 // indirect github.com/pkg/errors v0.8.1 github.com/prologic/trie v0.0.0-20190322091023-3972df81f9b5 diff --git a/go.sum b/go.sum index 130c883..0ab38f9 100644 --- a/go.sum +++ b/go.sum @@ -77,6 +77,8 @@ github.com/mitchellh/mapstructure v1.1.2 h1:fmNYVwqnSfB9mZU6OS2O6GsXM+wcskZDuKQz github.com/mitchellh/mapstructure v1.1.2/go.mod h1:FVVH3fgwuzCH5S8UJGiWEs2h04kUh9fWfEaFds41c1Y= github.com/mwitkow/go-conntrack v0.0.0-20161129095857-cc309e4a2223/go.mod h1:qRWi+5nqEBWmkhHvq77mSJWrCKwh8bxhgT7d/eI7P4U= github.com/oklog/ulid v1.3.1/go.mod h1:CirwcVhetQ6Lv90oh/F+FBtV6XMibvdAFo93nm5qn4U= +github.com/oxtoacart/bpool v0.0.0-20190530202638-03653db5a59c h1:rp5dCmg/yLR3mgFuSOe4oEnDDmGLROTvMragMUXpTQw= +github.com/oxtoacart/bpool v0.0.0-20190530202638-03653db5a59c/go.mod h1:X07ZCGwUbLaax7L0S3Tw4hpejzu63ZrrQiUe6W0hcy0= github.com/pelletier/go-toml v1.2.0 h1:T5zMGML61Wp+FlcbWjRDT7yAxhJNAiPPLOFECq181zc= github.com/pelletier/go-toml v1.2.0/go.mod h1:5z9KED0ma1S8pY6P1sdut58dfprrGBbd/94hg7ilaic= github.com/pelletier/go-toml v1.4.0 h1:u3Z1r+oOXJIkxqw34zVhyPgjBsm6X2wn21NWs/HfSeg= diff --git a/internal/datafile.go b/internal/datafile.go index ad96637..cca0edf 100644 --- a/internal/datafile.go +++ b/internal/datafile.go @@ -1,26 +1,31 @@ package internal import ( - "bytes" "fmt" "os" "path/filepath" "sync" + "github.com/oxtoacart/bpool" "github.com/pkg/errors" "golang.org/x/exp/mmap" + "github.com/gogo/protobuf/proto" pb "github.com/prologic/bitcask/internal/proto" "github.com/prologic/bitcask/internal/streampb" ) const ( DefaultDatafileFilename = "%09d.data" + prefixSize = 8 ) var ( ErrReadonly = errors.New("error: read only datafile") ErrReadError = errors.New("error: read error") + + memPool *bpool.BufferPool + mxMemPool sync.RWMutex ) type Datafile struct { @@ -136,7 +141,17 @@ func (df *Datafile) Read() (e pb.Entry, n int64, err error) { func (df *Datafile) ReadAt(index, size int64) (e pb.Entry, err error) { var n int - b := make([]byte, size) + var b []byte + if memPool == nil { + b = make([]byte, size) + } else { + poolSlice := memPool.Get() + if poolSlice.Cap() < int(size) { + poolSlice.Grow(int(size) - poolSlice.Cap()) + } + defer memPool.Put(poolSlice) + b = poolSlice.Bytes()[:size] + } if df.w == nil { n, err = df.ra.ReadAt(b, index) @@ -151,9 +166,10 @@ func (df *Datafile) ReadAt(index, size int64) (e pb.Entry, err error) { return } - buf := bytes.NewBuffer(b) - dec := streampb.NewDecoder(buf) - _, err = dec.Decode(&e) + err = proto.Unmarshal(b[prefixSize:], &e) + if err != nil { + return + } return } @@ -175,3 +191,15 @@ func (df *Datafile) Write(e pb.Entry) (int64, int64, error) { return e.Offset, n, nil } + +// ConfigureMemPool configurate the mempool accordingly +func ConfigureMemPool(maxConcurrency *int) { + mxMemPool.Lock() + defer mxMemPool.Unlock() + if maxConcurrency == nil { + memPool = nil + } else { + memPool = bpool.NewBufferPool(*maxConcurrency) + } + return +} diff --git a/options.go b/options.go index a9c3eff..8b32733 100644 --- a/options.go +++ b/options.go @@ -1,5 +1,7 @@ package bitcask +import "errors" + const ( // DefaultMaxDatafileSize is the default maximum datafile size in bytes DefaultMaxDatafileSize = 1 << 20 // 1MB @@ -11,6 +13,12 @@ const ( DefaultMaxValueSize = 1 << 16 // 65KB ) +var ( + // ErrMaxConcurrencyLowerEqZero is the error returned for + // maxConcurrency option not greater than zero + ErrMaxConcurrencyLowerEqZero = errors.New("error: maxConcurrency must be greater than zero") +) + // Option is a function that takes a config struct and modifies it type Option func(*config) error @@ -18,6 +26,7 @@ type config struct { maxDatafileSize int maxKeySize int maxValueSize int + maxConcurrency *int } func newDefaultConfig() *config { @@ -51,3 +60,14 @@ func WithMaxValueSize(size int) Option { return nil } } + +// WithMemPool configures usage of a memory pool to avoid allocations +func WithMemPool(maxConcurrency int) Option { + return func(cfg *config) error { + if maxConcurrency <= 0 { + return ErrMaxConcurrencyLowerEqZero + } + cfg.maxConcurrency = &maxConcurrency + return nil + } +}