Compare commits

...

7 Commits

Author SHA1 Message Date
James Mills
2a35976cdd Ooops 2019-03-17 14:00:15 +10:00
James Mills
6fe6fe0689 Refactored configuration option handling. Fixes #3 2019-03-17 13:53:30 +10:00
James Mills
e83608b903 Fixed missing error handling opening new Datafile(s) during Put() Fixes #4 2019-03-17 13:47:07 +10:00
James Mills
67ab944db7 Refactored some internals and removed timestamp field (unsure why it was needed in the original paper) 2019-03-16 12:40:24 +10:00
James Mills
cb00b11dd7 Increase no. of goroutines to catch more race conditions in tests 2019-03-16 12:33:07 +10:00
James Mills
e9c858d43f Add CRC Checksum checks on reading values back 2019-03-16 12:16:23 +10:00
James Mills
120e854444 Improved error messages 2019-03-16 11:47:22 +10:00
8 changed files with 90 additions and 103 deletions

View File

@@ -1,29 +1,22 @@
package bitcask package bitcask
import ( import (
"errors" "fmt"
"hash/crc32"
"io" "io"
"io/ioutil" "io/ioutil"
"os" "os"
"path/filepath" "path/filepath"
"strings" "strings"
"time"
"github.com/gofrs/flock" "github.com/gofrs/flock"
"github.com/prologic/trie" "github.com/prologic/trie"
) )
var (
ErrKeyNotFound = errors.New("error: key not found")
ErrKeyTooLarge = errors.New("error: key too large")
ErrValueTooLarge = errors.New("error: value too large")
ErrDatabaseLocked = errors.New("error: database locked")
)
type Bitcask struct { type Bitcask struct {
*flock.Flock *flock.Flock
opts Options config *config
path string path string
curr *Datafile curr *Datafile
keydir *Keydir keydir *Keydir
@@ -54,7 +47,7 @@ func (b *Bitcask) Get(key string) ([]byte, error) {
item, ok := b.keydir.Get(key) item, ok := b.keydir.Get(key)
if !ok { if !ok {
return nil, ErrKeyNotFound return nil, fmt.Errorf("error: key not found %s", key)
} }
if item.FileID == b.curr.id { if item.FileID == b.curr.id {
@@ -63,28 +56,33 @@ func (b *Bitcask) Get(key string) ([]byte, error) {
df = b.datafiles[item.FileID] df = b.datafiles[item.FileID]
} }
e, err := df.ReadAt(item.Index) e, err := df.ReadAt(item.Offset)
if err != nil { if err != nil {
return nil, err return nil, err
} }
checksum := crc32.ChecksumIEEE(e.Value)
if checksum != e.Checksum {
return nil, fmt.Errorf("error: checksum falied %s %d != %d", key, e.Checksum, checksum)
}
return e.Value, nil return e.Value, nil
} }
func (b *Bitcask) Put(key string, value []byte) error { func (b *Bitcask) Put(key string, value []byte) error {
if len(key) > b.opts.MaxKeySize { if len(key) > b.config.MaxKeySize {
return ErrKeyTooLarge return fmt.Errorf("error: key too large %d > %d", len(key), b.config.MaxKeySize)
} }
if len(value) > b.opts.MaxValueSize { if len(value) > b.config.MaxValueSize {
return ErrValueTooLarge return fmt.Errorf("error: value too large %d > %d", len(value), b.config.MaxValueSize)
} }
index, err := b.put(key, value) offset, err := b.put(key, value)
if err != nil { if err != nil {
return err return err
} }
item := b.keydir.Add(key, b.curr.id, index, time.Now().Unix()) item := b.keydir.Add(key, b.curr.id, offset)
b.trie.Add(key, item) b.trie.Add(key, item)
return nil return nil
@@ -134,6 +132,10 @@ func (b *Bitcask) put(key string, value []byte) (int64, error) {
} }
df, err := NewDatafile(b.path, b.curr.id, true) df, err := NewDatafile(b.path, b.curr.id, true)
if err != nil {
return -1, err
}
b.datafiles = append(b.datafiles, df) b.datafiles = append(b.datafiles, df)
id := b.curr.id + 1 id := b.curr.id + 1
@@ -212,7 +214,7 @@ func Merge(path string, force bool) error {
continue continue
} }
keydir.Add(e.Key, ids[i], e.Index, e.Timestamp) keydir.Add(e.Key, ids[i], e.Offset)
} }
tempdf, err := NewDatafile(temp, id, false) tempdf, err := NewDatafile(temp, id, false)
@@ -223,7 +225,7 @@ func Merge(path string, force bool) error {
for key := range keydir.Keys() { for key := range keydir.Keys() {
item, _ := keydir.Get(key) item, _ := keydir.Get(key)
e, err := df.ReadAt(item.Index) e, err := df.ReadAt(item.Offset)
if err != nil { if err != nil {
return err return err
} }
@@ -259,7 +261,7 @@ func Merge(path string, force bool) error {
return nil return nil
} }
func Open(path string, options ...func(*Bitcask) error) (*Bitcask, error) { func Open(path string, options ...option) (*Bitcask, error) {
if err := os.MkdirAll(path, 0755); err != nil { if err := os.MkdirAll(path, 0755); err != nil {
return nil, err return nil, err
} }
@@ -305,7 +307,7 @@ func Open(path string, options ...func(*Bitcask) error) (*Bitcask, error) {
for key := range hint.Keys() { for key := range hint.Keys() {
item, _ := hint.Get(key) item, _ := hint.Get(key)
_ = keydir.Add(key, item.FileID, item.Index, item.Timestamp) _ = keydir.Add(key, item.FileID, item.Offset)
trie.Add(key, item) trie.Add(key, item)
} }
} else { } else {
@@ -324,7 +326,7 @@ func Open(path string, options ...func(*Bitcask) error) (*Bitcask, error) {
continue continue
} }
item := keydir.Add(e.Key, ids[i], e.Index, e.Timestamp) item := keydir.Add(e.Key, ids[i], e.Offset)
trie.Add(e.Key, item) trie.Add(e.Key, item)
} }
} }
@@ -342,7 +344,7 @@ func Open(path string, options ...func(*Bitcask) error) (*Bitcask, error) {
bitcask := &Bitcask{ bitcask := &Bitcask{
Flock: flock.New(filepath.Join(path, "lock")), Flock: flock.New(filepath.Join(path, "lock")),
opts: NewDefaultOptions(), config: NewDefaultConfig(),
path: path, path: path,
curr: curr, curr: curr,
keydir: keydir, keydir: keydir,
@@ -352,8 +354,8 @@ func Open(path string, options ...func(*Bitcask) error) (*Bitcask, error) {
maxDatafileSize: DefaultMaxDatafileSize, maxDatafileSize: DefaultMaxDatafileSize,
} }
for _, option := range options { for _, opt := range options {
err = option(bitcask) err = opt(bitcask.config)
if err != nil { if err != nil {
return nil, err return nil, err
} }
@@ -365,7 +367,7 @@ func Open(path string, options ...func(*Bitcask) error) (*Bitcask, error) {
} }
if !locked { if !locked {
return nil, ErrDatabaseLocked return nil, fmt.Errorf("error: database locked %s", path)
} }
return bitcask, nil return bitcask, nil

View File

@@ -45,7 +45,7 @@ func TestAll(t *testing.T) {
assert.NoError(err) assert.NoError(err)
_, err = db.Get("foo") _, err = db.Get("foo")
assert.Error(err) assert.Error(err)
assert.Equal(err.Error(), "error: key not found") assert.Equal("error: key not found foo", err.Error())
}) })
t.Run("Sync", func(t *testing.T) { t.Run("Sync", func(t *testing.T) {
@@ -92,7 +92,7 @@ func TestDeletedKeys(t *testing.T) {
assert.NoError(err) assert.NoError(err)
_, err = db.Get("foo") _, err = db.Get("foo")
assert.Error(err) assert.Error(err)
assert.Equal("error: key not found", err.Error()) assert.Equal("error: key not found foo", err.Error())
}) })
t.Run("Sync", func(t *testing.T) { t.Run("Sync", func(t *testing.T) {
@@ -120,7 +120,7 @@ func TestDeletedKeys(t *testing.T) {
t.Run("Get", func(t *testing.T) { t.Run("Get", func(t *testing.T) {
_, err = db.Get("foo") _, err = db.Get("foo")
assert.Error(err) assert.Error(err)
assert.Equal("error: key not found", err.Error()) assert.Equal("error: key not found foo", err.Error())
}) })
t.Run("Close", func(t *testing.T) { t.Run("Close", func(t *testing.T) {
@@ -138,19 +138,17 @@ func TestMaxKeySize(t *testing.T) {
var db *Bitcask var db *Bitcask
size := 16
t.Run("Open", func(t *testing.T) { t.Run("Open", func(t *testing.T) {
db, err = Open(testdir, WithMaxKeySize(size)) db, err = Open(testdir, WithMaxKeySize(16))
assert.NoError(err) assert.NoError(err)
}) })
t.Run("Put", func(t *testing.T) { t.Run("Put", func(t *testing.T) {
key := strings.Repeat(" ", size+1) key := strings.Repeat(" ", 17)
value := []byte("foobar") value := []byte("foobar")
err = db.Put(key, value) err = db.Put(key, value)
assert.Error(err) assert.Error(err)
assert.Equal("error: key too large", err.Error()) assert.Equal("error: key too large 17 > 16", err.Error())
}) })
} }
@@ -162,19 +160,17 @@ func TestMaxValueSize(t *testing.T) {
var db *Bitcask var db *Bitcask
size := 16
t.Run("Open", func(t *testing.T) { t.Run("Open", func(t *testing.T) {
db, err = Open(testdir, WithMaxValueSize(size)) db, err = Open(testdir, WithMaxValueSize(16))
assert.NoError(err) assert.NoError(err)
}) })
t.Run("Put", func(t *testing.T) { t.Run("Put", func(t *testing.T) {
key := "foo" key := "foo"
value := []byte(strings.Repeat(" ", size+1)) value := []byte(strings.Repeat(" ", 17))
err = db.Put(key, value) err = db.Put(key, value)
assert.Error(err) assert.Error(err)
assert.Equal("error: value too large", err.Error()) assert.Equal("error: value too large 17 > 16", err.Error())
}) })
} }
@@ -291,10 +287,9 @@ func TestConcurrent(t *testing.T) {
wg := &sync.WaitGroup{} wg := &sync.WaitGroup{}
go f(wg, 2) go f(wg, 2)
wg.Add(1)
go f(wg, 3) go f(wg, 3)
wg.Add(1) go f(wg, 5)
wg.Add(3)
wg.Wait() wg.Wait()
}) })
@@ -314,10 +309,9 @@ func TestConcurrent(t *testing.T) {
wg := &sync.WaitGroup{} wg := &sync.WaitGroup{}
go f(wg, 100) go f(wg, 100)
wg.Add(1)
go f(wg, 100) go f(wg, 100)
wg.Add(1) go f(wg, 100)
wg.Add(3)
wg.Wait() wg.Wait()
}) })
@@ -393,7 +387,7 @@ func TestLocking(t *testing.T) {
_, err = Open(testdir) _, err = Open(testdir)
assert.Error(err) assert.Error(err)
assert.Equal("error: database locked", err.Error()) assert.Equal(fmt.Sprintf("error: database locked %s", testdir), err.Error())
} }
type benchmarkTestCase struct { type benchmarkTestCase struct {

View File

@@ -5,7 +5,6 @@ import (
"os" "os"
"path/filepath" "path/filepath"
"sync" "sync"
"time"
"github.com/pkg/errors" "github.com/pkg/errors"
@@ -128,8 +127,7 @@ func (df *Datafile) Write(e pb.Entry) (int64, error) {
df.Lock() df.Lock()
defer df.Unlock() defer df.Unlock()
e.Index = df.offset e.Offset = df.offset
e.Timestamp = time.Now().Unix()
n, err := df.enc.Encode(&e) n, err := df.enc.Encode(&e)
if err != nil { if err != nil {
@@ -137,5 +135,5 @@ func (df *Datafile) Write(e pb.Entry) (int64, error) {
} }
df.offset += n df.offset += n
return e.Index, nil return e.Offset, nil
} }

View File

@@ -7,11 +7,11 @@ import (
) )
func NewEntry(key string, value []byte) pb.Entry { func NewEntry(key string, value []byte) pb.Entry {
crc := crc32.ChecksumIEEE(value) checksum := crc32.ChecksumIEEE(value)
return pb.Entry{ return pb.Entry{
CRC: crc, Checksum: checksum,
Key: key, Key: key,
Value: value, Value: value,
} }
} }

View File

@@ -9,9 +9,8 @@ import (
) )
type Item struct { type Item struct {
FileID int FileID int
Index int64 Offset int64
Timestamp int64
} }
type Keydir struct { type Keydir struct {
@@ -25,11 +24,10 @@ func NewKeydir() *Keydir {
} }
} }
func (k *Keydir) Add(key string, fileid int, index, timestamp int64) Item { func (k *Keydir) Add(key string, fileid int, offset int64) Item {
item := Item{ item := Item{
FileID: fileid, FileID: fileid,
Index: index, Offset: offset,
Timestamp: timestamp,
} }
k.Lock() k.Lock()

View File

@@ -6,37 +6,42 @@ const (
DefaultMaxValueSize = 1 << 16 // 65KB DefaultMaxValueSize = 1 << 16 // 65KB
) )
type Options struct { // Option ...
type Option option
type option func(*config) error
type config struct {
MaxDatafileSize int MaxDatafileSize int
MaxKeySize int MaxKeySize int
MaxValueSize int MaxValueSize int
} }
func NewDefaultOptions() Options { func NewDefaultConfig() *config {
return Options{ return &config{
MaxDatafileSize: DefaultMaxDatafileSize, MaxDatafileSize: DefaultMaxDatafileSize,
MaxKeySize: DefaultMaxKeySize, MaxKeySize: DefaultMaxKeySize,
MaxValueSize: DefaultMaxValueSize, MaxValueSize: DefaultMaxValueSize,
} }
} }
func WithMaxDatafileSize(size int) func(*Bitcask) error { func WithMaxDatafileSize(size int) option {
return func(b *Bitcask) error { return func(cfg *config) error {
b.opts.MaxDatafileSize = size cfg.MaxDatafileSize = size
return nil return nil
} }
} }
func WithMaxKeySize(size int) func(*Bitcask) error { func WithMaxKeySize(size int) option {
return func(b *Bitcask) error { return func(cfg *config) error {
b.opts.MaxKeySize = size cfg.MaxKeySize = size
return nil return nil
} }
} }
func WithMaxValueSize(size int) func(*Bitcask) error { func WithMaxValueSize(size int) option {
return func(b *Bitcask) error { return func(cfg *config) error {
b.opts.MaxValueSize = size cfg.MaxValueSize = size
return nil return nil
} }
} }

View File

@@ -19,11 +19,10 @@ var _ = math.Inf
const _ = proto.ProtoPackageIsVersion2 // please upgrade the proto package const _ = proto.ProtoPackageIsVersion2 // please upgrade the proto package
type Entry struct { type Entry struct {
CRC uint32 `protobuf:"varint,1,opt,name=CRC,proto3" json:"CRC,omitempty"` Checksum uint32 `protobuf:"varint,1,opt,name=Checksum,proto3" json:"Checksum,omitempty"`
Key string `protobuf:"bytes,2,opt,name=Key,proto3" json:"Key,omitempty"` Key string `protobuf:"bytes,2,opt,name=Key,proto3" json:"Key,omitempty"`
Index int64 `protobuf:"varint,3,opt,name=Index,proto3" json:"Index,omitempty"` Offset int64 `protobuf:"varint,3,opt,name=Offset,proto3" json:"Offset,omitempty"`
Value []byte `protobuf:"bytes,4,opt,name=Value,proto3" json:"Value,omitempty"` Value []byte `protobuf:"bytes,4,opt,name=Value,proto3" json:"Value,omitempty"`
Timestamp int64 `protobuf:"varint,5,opt,name=Timestamp,proto3" json:"Timestamp,omitempty"`
XXX_NoUnkeyedLiteral struct{} `json:"-"` XXX_NoUnkeyedLiteral struct{} `json:"-"`
XXX_unrecognized []byte `json:"-"` XXX_unrecognized []byte `json:"-"`
XXX_sizecache int32 `json:"-"` XXX_sizecache int32 `json:"-"`
@@ -33,7 +32,7 @@ func (m *Entry) Reset() { *m = Entry{} }
func (m *Entry) String() string { return proto.CompactTextString(m) } func (m *Entry) String() string { return proto.CompactTextString(m) }
func (*Entry) ProtoMessage() {} func (*Entry) ProtoMessage() {}
func (*Entry) Descriptor() ([]byte, []int) { func (*Entry) Descriptor() ([]byte, []int) {
return fileDescriptor_entry_4f5906245d08394f, []int{0} return fileDescriptor_entry_3e91842c99935ae2, []int{0}
} }
func (m *Entry) XXX_Unmarshal(b []byte) error { func (m *Entry) XXX_Unmarshal(b []byte) error {
return xxx_messageInfo_Entry.Unmarshal(m, b) return xxx_messageInfo_Entry.Unmarshal(m, b)
@@ -53,9 +52,9 @@ func (m *Entry) XXX_DiscardUnknown() {
var xxx_messageInfo_Entry proto.InternalMessageInfo var xxx_messageInfo_Entry proto.InternalMessageInfo
func (m *Entry) GetCRC() uint32 { func (m *Entry) GetChecksum() uint32 {
if m != nil { if m != nil {
return m.CRC return m.Checksum
} }
return 0 return 0
} }
@@ -67,9 +66,9 @@ func (m *Entry) GetKey() string {
return "" return ""
} }
func (m *Entry) GetIndex() int64 { func (m *Entry) GetOffset() int64 {
if m != nil { if m != nil {
return m.Index return m.Offset
} }
return 0 return 0
} }
@@ -81,28 +80,20 @@ func (m *Entry) GetValue() []byte {
return nil return nil
} }
func (m *Entry) GetTimestamp() int64 {
if m != nil {
return m.Timestamp
}
return 0
}
func init() { func init() {
proto.RegisterType((*Entry)(nil), "proto.Entry") proto.RegisterType((*Entry)(nil), "proto.Entry")
} }
func init() { proto.RegisterFile("entry.proto", fileDescriptor_entry_4f5906245d08394f) } func init() { proto.RegisterFile("entry.proto", fileDescriptor_entry_3e91842c99935ae2) }
var fileDescriptor_entry_4f5906245d08394f = []byte{ var fileDescriptor_entry_3e91842c99935ae2 = []byte{
// 134 bytes of a gzipped FileDescriptorProto // 126 bytes of a gzipped FileDescriptorProto
0x1f, 0x8b, 0x08, 0x00, 0x00, 0x00, 0x00, 0x00, 0x02, 0xff, 0xe2, 0xe2, 0x4e, 0xcd, 0x2b, 0x29, 0x1f, 0x8b, 0x08, 0x00, 0x00, 0x00, 0x00, 0x00, 0x02, 0xff, 0xe2, 0xe2, 0x4e, 0xcd, 0x2b, 0x29,
0xaa, 0xd4, 0x2b, 0x28, 0xca, 0x2f, 0xc9, 0x17, 0x62, 0x05, 0x53, 0x4a, 0xa5, 0x5c, 0xac, 0xae, 0xaa, 0xd4, 0x2b, 0x28, 0xca, 0x2f, 0xc9, 0x17, 0x62, 0x05, 0x53, 0x4a, 0xc9, 0x5c, 0xac, 0xae,
0x20, 0x51, 0x21, 0x01, 0x2e, 0x66, 0xe7, 0x20, 0x67, 0x09, 0x46, 0x05, 0x46, 0x0d, 0xde, 0x20, 0x20, 0x51, 0x21, 0x29, 0x2e, 0x0e, 0xe7, 0x8c, 0xd4, 0xe4, 0xec, 0xe2, 0xd2, 0x5c, 0x09, 0x46,
0x10, 0x13, 0x24, 0xe2, 0x9d, 0x5a, 0x29, 0xc1, 0xa4, 0xc0, 0xa8, 0xc1, 0x19, 0x04, 0x62, 0x0a, 0x05, 0x46, 0x0d, 0xde, 0x20, 0x38, 0x5f, 0x48, 0x80, 0x8b, 0xd9, 0x3b, 0xb5, 0x52, 0x82, 0x49,
0x89, 0x70, 0xb1, 0x7a, 0xe6, 0xa5, 0xa4, 0x56, 0x48, 0x30, 0x2b, 0x30, 0x6a, 0x30, 0x07, 0x41, 0x81, 0x51, 0x83, 0x33, 0x08, 0xc4, 0x14, 0x12, 0xe3, 0x62, 0xf3, 0x4f, 0x4b, 0x2b, 0x4e, 0x2d,
0x38, 0x20, 0xd1, 0xb0, 0xc4, 0x9c, 0xd2, 0x54, 0x09, 0x16, 0x05, 0x46, 0x0d, 0x9e, 0x20, 0x08, 0x91, 0x60, 0x56, 0x60, 0xd4, 0x60, 0x0e, 0x82, 0xf2, 0x84, 0x44, 0xb8, 0x58, 0xc3, 0x12, 0x73,
0x47, 0x48, 0x86, 0x8b, 0x33, 0x24, 0x33, 0x37, 0xb5, 0xb8, 0x24, 0x31, 0xb7, 0x40, 0x82, 0x15, 0x4a, 0x53, 0x25, 0x58, 0x14, 0x18, 0x35, 0x78, 0x82, 0x20, 0x9c, 0x24, 0x36, 0xb0, 0x5d, 0xc6,
0xac, 0x1e, 0x21, 0x90, 0xc4, 0x06, 0xb6, 0xdd, 0x18, 0x10, 0x00, 0x00, 0xff, 0xff, 0x07, 0x99, 0x80, 0x00, 0x00, 0x00, 0xff, 0xff, 0x76, 0xd2, 0x3e, 0x83, 0x81, 0x00, 0x00, 0x00,
0x47, 0xb9, 0x93, 0x00, 0x00, 0x00,
} }

View File

@@ -3,9 +3,8 @@ syntax = "proto3";
package proto; package proto;
message Entry { message Entry {
uint32 CRC = 1; uint32 Checksum = 1;
string Key = 2; string Key = 2;
int64 Index = 3; int64 Offset = 3;
bytes Value = 4; bytes Value = 4;
int64 Timestamp = 5;
} }