bitcask/internal/datafile.go
Ignacio Hagopian a407905ae2 Improve Get/Put performance with optional mempooling (#36)
* avoid unnecessary use of encoder/decoder to decrease memory allocations

* add an optional configurable mempool to avoid extra allocs

* add doc.go with examples
2019-08-05 07:23:07 +10:00
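
The user-visible piece of this change is the opt-in read-buffer pool, controlled by the package-level ConfigureMemPool function at the bottom of the file. As a minimal sketch, a caller importing this internal package directly could toggle it like this (the main package here is purely illustrative):

package main

import "github.com/prologic/bitcask/internal"

func main() {
	// Enable pooling: allow the shared pool to hold up to 64 buffers,
	// one per expected concurrent reader.
	maxConcurrency := 64
	internal.ConfigureMemPool(&maxConcurrency)

	// Disable pooling again: a nil argument clears the pool, so ReadAt
	// falls back to allocating a fresh buffer on every call.
	internal.ConfigureMemPool(nil)
}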


package internal

import (
	"fmt"
	"os"
	"path/filepath"
	"sync"

	"github.com/oxtoacart/bpool"
	"github.com/pkg/errors"
	"golang.org/x/exp/mmap"

	"github.com/gogo/protobuf/proto"
	pb "github.com/prologic/bitcask/internal/proto"
	"github.com/prologic/bitcask/internal/streampb"
)
const (
	// DefaultDatafileFilename is the format string used to name datafiles on disk.
	DefaultDatafileFilename = "%09d.data"

	// prefixSize is the length in bytes of the size prefix that precedes each encoded entry.
	prefixSize = 8
)

var (
	// ErrReadonly is returned when a write is attempted on a read-only datafile.
	ErrReadonly = errors.New("error: read only datafile")

	// ErrReadError is returned when a read returns fewer bytes than requested.
	ErrReadError = errors.New("error: read error")

	// memPool is an optional shared buffer pool used by ReadAt; nil means no pooling.
	memPool   *bpool.BufferPool
	mxMemPool sync.RWMutex
)
// Datafile is an append-only file of encoded entries. Writes go through the
// write handle and stream encoder; reads go through the read handle or, for
// read-only datafiles, a memory-mapped reader.
type Datafile struct {
	sync.RWMutex

	id     int
	r      *os.File
	ra     *mmap.ReaderAt
	w      *os.File
	offset int64
	dec    *streampb.Decoder
	enc    *streampb.Encoder
}
// NewDatafile opens (or creates) the datafile with the given id under path.
// When readonly is false the file is also opened for appending.
func NewDatafile(path string, id int, readonly bool) (*Datafile, error) {
	var (
		r   *os.File
		ra  *mmap.ReaderAt
		w   *os.File
		err error
	)

	fn := filepath.Join(path, fmt.Sprintf(DefaultDatafileFilename, id))

	if !readonly {
		w, err = os.OpenFile(fn, os.O_WRONLY|os.O_APPEND|os.O_CREATE, 0640)
		if err != nil {
			return nil, err
		}
	}

	r, err = os.Open(fn)
	if err != nil {
		return nil, err
	}

	stat, err := r.Stat()
	if err != nil {
		return nil, errors.Wrap(err, "error calling Stat()")
	}

	ra, err = mmap.Open(fn)
	if err != nil {
		return nil, err
	}

	offset := stat.Size()

	dec := streampb.NewDecoder(r)
	enc := streampb.NewEncoder(w)

	return &Datafile{
		id:     id,
		r:      r,
		ra:     ra,
		w:      w,
		offset: offset,
		dec:    dec,
		enc:    enc,
	}, nil
}
// FileID returns the numeric id of the datafile.
func (df *Datafile) FileID() int {
	return df.id
}

// Name returns the on-disk name of the datafile.
func (df *Datafile) Name() string {
	return df.r.Name()
}
// Close closes the read handles and, for writable datafiles, syncs and then
// closes the write handle.
func (df *Datafile) Close() error {
	defer func() {
		df.ra.Close()
		df.r.Close()
	}()

	// Readonly Datafile -- Nothing further to close on the write side
	if df.w == nil {
		return nil
	}

	err := df.Sync()
	if err != nil {
		return err
	}
	return df.w.Close()
}

// Sync flushes the write handle to stable storage. It is a no-op for
// read-only datafiles.
func (df *Datafile) Sync() error {
	if df.w == nil {
		return nil
	}
	return df.w.Sync()
}
// Size returns the current size of the datafile in bytes.
func (df *Datafile) Size() int64 {
	df.RLock()
	defer df.RUnlock()
	return df.offset
}

// Read decodes the next entry from the datafile's stream decoder and returns
// it together with the number of bytes consumed.
func (df *Datafile) Read() (e pb.Entry, n int64, err error) {
	df.Lock()
	defer df.Unlock()

	n, err = df.dec.Decode(&e)
	if err != nil {
		return
	}

	return
}
// ReadAt reads the entry of the given size at the given byte offset. If a
// mempool has been configured via ConfigureMemPool, the read buffer is
// borrowed from the pool instead of being freshly allocated.
func (df *Datafile) ReadAt(index, size int64) (e pb.Entry, err error) {
	var n int

	var b []byte
	if memPool == nil {
		b = make([]byte, size)
	} else {
		// Borrow a buffer from the pool, growing it if it is too small,
		// and return it once the read is done.
		poolSlice := memPool.Get()
		if poolSlice.Cap() < int(size) {
			poolSlice.Grow(int(size) - poolSlice.Cap())
		}
		defer memPool.Put(poolSlice)
		b = poolSlice.Bytes()[:size]
	}

	if df.w == nil {
		// Read-only datafile: serve the read from the memory-mapped reader.
		n, err = df.ra.ReadAt(b, index)
	} else {
		n, err = df.r.ReadAt(b, index)
	}
	if err != nil {
		return
	}
	if int64(n) != size {
		err = ErrReadError
		return
	}

	// Skip the size prefix and unmarshal the entry itself.
	err = proto.Unmarshal(b[prefixSize:], &e)
	if err != nil {
		return
	}

	return
}
// Write appends the entry to the datafile and returns the offset it was
// written at along with the number of bytes written.
func (df *Datafile) Write(e pb.Entry) (int64, int64, error) {
	if df.w == nil {
		return -1, 0, ErrReadonly
	}

	df.Lock()
	defer df.Unlock()

	e.Offset = df.offset

	n, err := df.enc.Encode(&e)
	if err != nil {
		return -1, 0, err
	}
	df.offset += n

	return e.Offset, n, nil
}
// ConfigureMemPool configures the shared read-buffer pool. Passing nil
// disables pooling; otherwise the pool is sized for maxConcurrency
// concurrent readers.
func ConfigureMemPool(maxConcurrency *int) {
	mxMemPool.Lock()
	defer mxMemPool.Unlock()

	if maxConcurrency == nil {
		memPool = nil
	} else {
		memPool = bpool.NewBufferPool(*maxConcurrency)
	}
}
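
For context, a sketch of the append-and-read-back round trip these methods implement, using only functions defined above; the pb.Entry value is left zero rather than guessing at its fields, and the path and file id in this hypothetical standalone program are arbitrary:

package main

import (
	"fmt"
	"log"

	"github.com/prologic/bitcask/internal"
	pb "github.com/prologic/bitcask/internal/proto"
)

func main() {
	// Open (or create) datafile 0 under the current directory, writable.
	df, err := internal.NewDatafile(".", 0, false)
	if err != nil {
		log.Fatal(err)
	}
	defer df.Close()

	// Append an entry; Write returns the byte offset it landed at and its
	// encoded size (size prefix included).
	var e pb.Entry // fields omitted; a real caller fills in the record
	offset, size, err := df.Write(e)
	if err != nil {
		log.Fatal(err)
	}

	// Flush to disk (a no-op for read-only datafiles).
	if err := df.Sync(); err != nil {
		log.Fatal(err)
	}

	// Read the same entry back by offset and size.
	back, err := df.ReadAt(offset, size)
	if err != nil {
		log.Fatal(err)
	}

	fmt.Printf("wrote %d bytes at offset %d; entry read back records offset %d\n", size, offset, back.Offset)
}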