Compare commits

..

8 Commits

Author SHA1 Message Date
James Mills
2a35976cdd Ooops 2019-03-17 14:00:15 +10:00
James Mills
6fe6fe0689 Refactored configuration option handling. Fixes #3 2019-03-17 13:53:30 +10:00
James Mills
e83608b903 Fixed missing error handling opening new Datafile(s) during Put() Fixes #4 2019-03-17 13:47:07 +10:00
James Mills
67ab944db7 Refactored some internals and removed timestamp field (unsure why it was needed in the original paper) 2019-03-16 12:40:24 +10:00
James Mills
cb00b11dd7 Increase no. of goroutines to catch more race conditions in tests 2019-03-16 12:33:07 +10:00
James Mills
e9c858d43f Add CRC Checksum checks on reading values back 2019-03-16 12:16:23 +10:00
James Mills
120e854444 Improved error messages 2019-03-16 11:47:22 +10:00
James Mills
d2f44d1513 Fix a race condition + Use my fork of trie 2019-03-16 11:22:55 +10:00
10 changed files with 99 additions and 107 deletions

View File

@@ -1,29 +1,22 @@
package bitcask
import (
"errors"
"fmt"
"hash/crc32"
"io"
"io/ioutil"
"os"
"path/filepath"
"strings"
"time"
"github.com/derekparker/trie"
"github.com/gofrs/flock"
)
var (
ErrKeyNotFound = errors.New("error: key not found")
ErrKeyTooLarge = errors.New("error: key too large")
ErrValueTooLarge = errors.New("error: value too large")
ErrDatabaseLocked = errors.New("error: database locked")
"github.com/prologic/trie"
)
type Bitcask struct {
*flock.Flock
opts Options
config *config
path string
curr *Datafile
keydir *Keydir
@@ -54,7 +47,7 @@ func (b *Bitcask) Get(key string) ([]byte, error) {
item, ok := b.keydir.Get(key)
if !ok {
return nil, ErrKeyNotFound
return nil, fmt.Errorf("error: key not found %s", key)
}
if item.FileID == b.curr.id {
@@ -63,28 +56,33 @@ func (b *Bitcask) Get(key string) ([]byte, error) {
df = b.datafiles[item.FileID]
}
e, err := df.ReadAt(item.Index)
e, err := df.ReadAt(item.Offset)
if err != nil {
return nil, err
}
checksum := crc32.ChecksumIEEE(e.Value)
if checksum != e.Checksum {
return nil, fmt.Errorf("error: checksum falied %s %d != %d", key, e.Checksum, checksum)
}
return e.Value, nil
}
func (b *Bitcask) Put(key string, value []byte) error {
if len(key) > b.opts.MaxKeySize {
return ErrKeyTooLarge
if len(key) > b.config.MaxKeySize {
return fmt.Errorf("error: key too large %d > %d", len(key), b.config.MaxKeySize)
}
if len(value) > b.opts.MaxValueSize {
return ErrValueTooLarge
if len(value) > b.config.MaxValueSize {
return fmt.Errorf("error: value too large %d > %d", len(value), b.config.MaxValueSize)
}
index, err := b.put(key, value)
offset, err := b.put(key, value)
if err != nil {
return err
}
item := b.keydir.Add(key, b.curr.id, index, time.Now().Unix())
item := b.keydir.Add(key, b.curr.id, offset)
b.trie.Add(key, item)
return nil
@@ -134,6 +132,10 @@ func (b *Bitcask) put(key string, value []byte) (int64, error) {
}
df, err := NewDatafile(b.path, b.curr.id, true)
if err != nil {
return -1, err
}
b.datafiles = append(b.datafiles, df)
id := b.curr.id + 1
@@ -212,7 +214,7 @@ func Merge(path string, force bool) error {
continue
}
keydir.Add(e.Key, ids[i], e.Index, e.Timestamp)
keydir.Add(e.Key, ids[i], e.Offset)
}
tempdf, err := NewDatafile(temp, id, false)
@@ -223,7 +225,7 @@ func Merge(path string, force bool) error {
for key := range keydir.Keys() {
item, _ := keydir.Get(key)
e, err := df.ReadAt(item.Index)
e, err := df.ReadAt(item.Offset)
if err != nil {
return err
}
@@ -259,7 +261,7 @@ func Merge(path string, force bool) error {
return nil
}
func Open(path string, options ...func(*Bitcask) error) (*Bitcask, error) {
func Open(path string, options ...option) (*Bitcask, error) {
if err := os.MkdirAll(path, 0755); err != nil {
return nil, err
}
@@ -305,7 +307,7 @@ func Open(path string, options ...func(*Bitcask) error) (*Bitcask, error) {
for key := range hint.Keys() {
item, _ := hint.Get(key)
_ = keydir.Add(key, item.FileID, item.Index, item.Timestamp)
_ = keydir.Add(key, item.FileID, item.Offset)
trie.Add(key, item)
}
} else {
@@ -324,7 +326,7 @@ func Open(path string, options ...func(*Bitcask) error) (*Bitcask, error) {
continue
}
item := keydir.Add(e.Key, ids[i], e.Index, e.Timestamp)
item := keydir.Add(e.Key, ids[i], e.Offset)
trie.Add(e.Key, item)
}
}
@@ -342,7 +344,7 @@ func Open(path string, options ...func(*Bitcask) error) (*Bitcask, error) {
bitcask := &Bitcask{
Flock: flock.New(filepath.Join(path, "lock")),
opts: NewDefaultOptions(),
config: NewDefaultConfig(),
path: path,
curr: curr,
keydir: keydir,
@@ -352,8 +354,8 @@ func Open(path string, options ...func(*Bitcask) error) (*Bitcask, error) {
maxDatafileSize: DefaultMaxDatafileSize,
}
for _, option := range options {
err = option(bitcask)
for _, opt := range options {
err = opt(bitcask.config)
if err != nil {
return nil, err
}
@@ -365,7 +367,7 @@ func Open(path string, options ...func(*Bitcask) error) (*Bitcask, error) {
}
if !locked {
return nil, ErrDatabaseLocked
return nil, fmt.Errorf("error: database locked %s", path)
}
return bitcask, nil

View File

@@ -45,7 +45,7 @@ func TestAll(t *testing.T) {
assert.NoError(err)
_, err = db.Get("foo")
assert.Error(err)
assert.Equal(err.Error(), "error: key not found")
assert.Equal("error: key not found foo", err.Error())
})
t.Run("Sync", func(t *testing.T) {
@@ -92,7 +92,7 @@ func TestDeletedKeys(t *testing.T) {
assert.NoError(err)
_, err = db.Get("foo")
assert.Error(err)
assert.Equal("error: key not found", err.Error())
assert.Equal("error: key not found foo", err.Error())
})
t.Run("Sync", func(t *testing.T) {
@@ -120,7 +120,7 @@ func TestDeletedKeys(t *testing.T) {
t.Run("Get", func(t *testing.T) {
_, err = db.Get("foo")
assert.Error(err)
assert.Equal("error: key not found", err.Error())
assert.Equal("error: key not found foo", err.Error())
})
t.Run("Close", func(t *testing.T) {
@@ -138,19 +138,17 @@ func TestMaxKeySize(t *testing.T) {
var db *Bitcask
size := 16
t.Run("Open", func(t *testing.T) {
db, err = Open(testdir, WithMaxKeySize(size))
db, err = Open(testdir, WithMaxKeySize(16))
assert.NoError(err)
})
t.Run("Put", func(t *testing.T) {
key := strings.Repeat(" ", size+1)
key := strings.Repeat(" ", 17)
value := []byte("foobar")
err = db.Put(key, value)
assert.Error(err)
assert.Equal("error: key too large", err.Error())
assert.Equal("error: key too large 17 > 16", err.Error())
})
}
@@ -162,19 +160,17 @@ func TestMaxValueSize(t *testing.T) {
var db *Bitcask
size := 16
t.Run("Open", func(t *testing.T) {
db, err = Open(testdir, WithMaxValueSize(size))
db, err = Open(testdir, WithMaxValueSize(16))
assert.NoError(err)
})
t.Run("Put", func(t *testing.T) {
key := "foo"
value := []byte(strings.Repeat(" ", size+1))
value := []byte(strings.Repeat(" ", 17))
err = db.Put(key, value)
assert.Error(err)
assert.Equal("error: value too large", err.Error())
assert.Equal("error: value too large 17 > 16", err.Error())
})
}
@@ -291,10 +287,9 @@ func TestConcurrent(t *testing.T) {
wg := &sync.WaitGroup{}
go f(wg, 2)
wg.Add(1)
go f(wg, 3)
wg.Add(1)
go f(wg, 5)
wg.Add(3)
wg.Wait()
})
@@ -314,10 +309,9 @@ func TestConcurrent(t *testing.T) {
wg := &sync.WaitGroup{}
go f(wg, 100)
wg.Add(1)
go f(wg, 100)
wg.Add(1)
go f(wg, 100)
wg.Add(3)
wg.Wait()
})
@@ -393,7 +387,7 @@ func TestLocking(t *testing.T) {
_, err = Open(testdir)
assert.Error(err)
assert.Equal("error: database locked", err.Error())
assert.Equal(fmt.Sprintf("error: database locked %s", testdir), err.Error())
}
type benchmarkTestCase struct {

View File

@@ -5,7 +5,6 @@ import (
"os"
"path/filepath"
"sync"
"time"
"github.com/pkg/errors"
@@ -96,6 +95,8 @@ func (df *Datafile) Sync() error {
}
func (df *Datafile) Size() (int64, error) {
df.RLock()
defer df.RUnlock()
return df.offset, nil
}
@@ -123,17 +124,16 @@ func (df *Datafile) Write(e pb.Entry) (int64, error) {
return -1, ErrReadonly
}
e.Index = df.offset
e.Timestamp = time.Now().Unix()
df.Lock()
n, err := df.enc.Encode(&e)
df.Unlock()
defer df.Unlock()
e.Offset = df.offset
n, err := df.enc.Encode(&e)
if err != nil {
return -1, err
}
df.offset += n
return e.Index, nil
return e.Offset, nil
}

View File

@@ -7,11 +7,11 @@ import (
)
func NewEntry(key string, value []byte) pb.Entry {
crc := crc32.ChecksumIEEE(value)
checksum := crc32.ChecksumIEEE(value)
return pb.Entry{
CRC: crc,
Key: key,
Value: value,
Checksum: checksum,
Key: key,
Value: value,
}
}

1
go.mod
View File

@@ -10,6 +10,7 @@ require (
github.com/mitchellh/go-homedir v1.1.0
github.com/pkg/errors v0.8.1
github.com/prologic/msgbus v0.1.1
github.com/prologic/trie v0.0.0-20190316011403-395e39dac705
github.com/prometheus/client_golang v0.9.2 // indirect
github.com/sirupsen/logrus v1.3.0
github.com/spf13/cobra v0.0.3

2
go.sum
View File

@@ -42,6 +42,8 @@ github.com/pkg/errors v0.8.1/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINE
github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM=
github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4=
github.com/prologic/msgbus v0.1.1/go.mod h1:B3Qu4/U2FP08x93jUzp9E8bl155+cIgDH2DUGRK6OZk=
github.com/prologic/trie v0.0.0-20190316011403-395e39dac705 h1:2J+cSlAeECj0lfMKSmM7n5OlIio+yLovaKLZJzwLc6U=
github.com/prologic/trie v0.0.0-20190316011403-395e39dac705/go.mod h1:LFuDmpHJGmciXd8Rl5YMhVlLMps9gz2GtYLzwxrFhzs=
github.com/prometheus/client_golang v0.9.2 h1:awm861/B8OKDd2I/6o1dy3ra4BamzKhYOiGItCeZ740=
github.com/prometheus/client_golang v0.9.2/go.mod h1:OsXs2jCmiKlQ1lTBmv21f2mNfw4xf/QclQDMrYNZzcM=
github.com/prometheus/client_model v0.0.0-20180712105110-5c3871d89910 h1:idejC8f05m9MGOsuEi1ATq9shN03HrxNkD/luQvxCv8=

View File

@@ -9,9 +9,8 @@ import (
)
type Item struct {
FileID int
Index int64
Timestamp int64
FileID int
Offset int64
}
type Keydir struct {
@@ -25,11 +24,10 @@ func NewKeydir() *Keydir {
}
}
func (k *Keydir) Add(key string, fileid int, index, timestamp int64) Item {
func (k *Keydir) Add(key string, fileid int, offset int64) Item {
item := Item{
FileID: fileid,
Index: index,
Timestamp: timestamp,
FileID: fileid,
Offset: offset,
}
k.Lock()

View File

@@ -6,37 +6,42 @@ const (
DefaultMaxValueSize = 1 << 16 // 65KB
)
type Options struct {
// Option ...
type Option option
type option func(*config) error
type config struct {
MaxDatafileSize int
MaxKeySize int
MaxValueSize int
}
func NewDefaultOptions() Options {
return Options{
func NewDefaultConfig() *config {
return &config{
MaxDatafileSize: DefaultMaxDatafileSize,
MaxKeySize: DefaultMaxKeySize,
MaxValueSize: DefaultMaxValueSize,
}
}
func WithMaxDatafileSize(size int) func(*Bitcask) error {
return func(b *Bitcask) error {
b.opts.MaxDatafileSize = size
func WithMaxDatafileSize(size int) option {
return func(cfg *config) error {
cfg.MaxDatafileSize = size
return nil
}
}
func WithMaxKeySize(size int) func(*Bitcask) error {
return func(b *Bitcask) error {
b.opts.MaxKeySize = size
func WithMaxKeySize(size int) option {
return func(cfg *config) error {
cfg.MaxKeySize = size
return nil
}
}
func WithMaxValueSize(size int) func(*Bitcask) error {
return func(b *Bitcask) error {
b.opts.MaxValueSize = size
func WithMaxValueSize(size int) option {
return func(cfg *config) error {
cfg.MaxValueSize = size
return nil
}
}

View File

@@ -19,11 +19,10 @@ var _ = math.Inf
const _ = proto.ProtoPackageIsVersion2 // please upgrade the proto package
type Entry struct {
CRC uint32 `protobuf:"varint,1,opt,name=CRC,proto3" json:"CRC,omitempty"`
Checksum uint32 `protobuf:"varint,1,opt,name=Checksum,proto3" json:"Checksum,omitempty"`
Key string `protobuf:"bytes,2,opt,name=Key,proto3" json:"Key,omitempty"`
Index int64 `protobuf:"varint,3,opt,name=Index,proto3" json:"Index,omitempty"`
Offset int64 `protobuf:"varint,3,opt,name=Offset,proto3" json:"Offset,omitempty"`
Value []byte `protobuf:"bytes,4,opt,name=Value,proto3" json:"Value,omitempty"`
Timestamp int64 `protobuf:"varint,5,opt,name=Timestamp,proto3" json:"Timestamp,omitempty"`
XXX_NoUnkeyedLiteral struct{} `json:"-"`
XXX_unrecognized []byte `json:"-"`
XXX_sizecache int32 `json:"-"`
@@ -33,7 +32,7 @@ func (m *Entry) Reset() { *m = Entry{} }
func (m *Entry) String() string { return proto.CompactTextString(m) }
func (*Entry) ProtoMessage() {}
func (*Entry) Descriptor() ([]byte, []int) {
return fileDescriptor_entry_4f5906245d08394f, []int{0}
return fileDescriptor_entry_3e91842c99935ae2, []int{0}
}
func (m *Entry) XXX_Unmarshal(b []byte) error {
return xxx_messageInfo_Entry.Unmarshal(m, b)
@@ -53,9 +52,9 @@ func (m *Entry) XXX_DiscardUnknown() {
var xxx_messageInfo_Entry proto.InternalMessageInfo
func (m *Entry) GetCRC() uint32 {
func (m *Entry) GetChecksum() uint32 {
if m != nil {
return m.CRC
return m.Checksum
}
return 0
}
@@ -67,9 +66,9 @@ func (m *Entry) GetKey() string {
return ""
}
func (m *Entry) GetIndex() int64 {
func (m *Entry) GetOffset() int64 {
if m != nil {
return m.Index
return m.Offset
}
return 0
}
@@ -81,28 +80,20 @@ func (m *Entry) GetValue() []byte {
return nil
}
func (m *Entry) GetTimestamp() int64 {
if m != nil {
return m.Timestamp
}
return 0
}
func init() {
proto.RegisterType((*Entry)(nil), "proto.Entry")
}
func init() { proto.RegisterFile("entry.proto", fileDescriptor_entry_4f5906245d08394f) }
func init() { proto.RegisterFile("entry.proto", fileDescriptor_entry_3e91842c99935ae2) }
var fileDescriptor_entry_4f5906245d08394f = []byte{
// 134 bytes of a gzipped FileDescriptorProto
var fileDescriptor_entry_3e91842c99935ae2 = []byte{
// 126 bytes of a gzipped FileDescriptorProto
0x1f, 0x8b, 0x08, 0x00, 0x00, 0x00, 0x00, 0x00, 0x02, 0xff, 0xe2, 0xe2, 0x4e, 0xcd, 0x2b, 0x29,
0xaa, 0xd4, 0x2b, 0x28, 0xca, 0x2f, 0xc9, 0x17, 0x62, 0x05, 0x53, 0x4a, 0xa5, 0x5c, 0xac, 0xae,
0x20, 0x51, 0x21, 0x01, 0x2e, 0x66, 0xe7, 0x20, 0x67, 0x09, 0x46, 0x05, 0x46, 0x0d, 0xde, 0x20,
0x10, 0x13, 0x24, 0xe2, 0x9d, 0x5a, 0x29, 0xc1, 0xa4, 0xc0, 0xa8, 0xc1, 0x19, 0x04, 0x62, 0x0a,
0x89, 0x70, 0xb1, 0x7a, 0xe6, 0xa5, 0xa4, 0x56, 0x48, 0x30, 0x2b, 0x30, 0x6a, 0x30, 0x07, 0x41,
0x38, 0x20, 0xd1, 0xb0, 0xc4, 0x9c, 0xd2, 0x54, 0x09, 0x16, 0x05, 0x46, 0x0d, 0x9e, 0x20, 0x08,
0x47, 0x48, 0x86, 0x8b, 0x33, 0x24, 0x33, 0x37, 0xb5, 0xb8, 0x24, 0x31, 0xb7, 0x40, 0x82, 0x15,
0xac, 0x1e, 0x21, 0x90, 0xc4, 0x06, 0xb6, 0xdd, 0x18, 0x10, 0x00, 0x00, 0xff, 0xff, 0x07, 0x99,
0x47, 0xb9, 0x93, 0x00, 0x00, 0x00,
0xaa, 0xd4, 0x2b, 0x28, 0xca, 0x2f, 0xc9, 0x17, 0x62, 0x05, 0x53, 0x4a, 0xc9, 0x5c, 0xac, 0xae,
0x20, 0x51, 0x21, 0x29, 0x2e, 0x0e, 0xe7, 0x8c, 0xd4, 0xe4, 0xec, 0xe2, 0xd2, 0x5c, 0x09, 0x46,
0x05, 0x46, 0x0d, 0xde, 0x20, 0x38, 0x5f, 0x48, 0x80, 0x8b, 0xd9, 0x3b, 0xb5, 0x52, 0x82, 0x49,
0x81, 0x51, 0x83, 0x33, 0x08, 0xc4, 0x14, 0x12, 0xe3, 0x62, 0xf3, 0x4f, 0x4b, 0x2b, 0x4e, 0x2d,
0x91, 0x60, 0x56, 0x60, 0xd4, 0x60, 0x0e, 0x82, 0xf2, 0x84, 0x44, 0xb8, 0x58, 0xc3, 0x12, 0x73,
0x4a, 0x53, 0x25, 0x58, 0x14, 0x18, 0x35, 0x78, 0x82, 0x20, 0x9c, 0x24, 0x36, 0xb0, 0x5d, 0xc6,
0x80, 0x00, 0x00, 0x00, 0xff, 0xff, 0x76, 0xd2, 0x3e, 0x83, 0x81, 0x00, 0x00, 0x00,
}

View File

@@ -3,9 +3,8 @@ syntax = "proto3";
package proto;
message Entry {
uint32 CRC = 1;
uint32 Checksum = 1;
string Key = 2;
int64 Index = 3;
int64 Offset = 3;
bytes Value = 4;
int64 Timestamp = 5;
}