Compare commits

...

30 Commits

Author SHA1 Message Date
James Mills
1298240f53 Unexport some internal implemtnation details 2019-03-18 17:31:31 +10:00
James Mills
2a35976cdd Ooops 2019-03-17 14:00:15 +10:00
James Mills
6fe6fe0689 Refactored configuration option handling. Fixes #3 2019-03-17 13:53:30 +10:00
James Mills
e83608b903 Fixed missing error handling opening new Datafile(s) during Put() Fixes #4 2019-03-17 13:47:07 +10:00
James Mills
67ab944db7 Refactored some internals and removed timestamp field (unsure why it was needed in the original paper) 2019-03-16 12:40:24 +10:00
James Mills
cb00b11dd7 Increase no. of goroutines to catch more race conditions in tests 2019-03-16 12:33:07 +10:00
James Mills
e9c858d43f Add CRC Checksum checks on reading values back 2019-03-16 12:16:23 +10:00
James Mills
120e854444 Improved error messages 2019-03-16 11:47:22 +10:00
James Mills
d2f44d1513 Fix a race condition + Use my fork of trie 2019-03-16 11:22:55 +10:00
James Mills
c0f178c4f7 Improved read/write performance by another ~2x by not calling Stat() on every read/write 2019-03-16 08:15:07 +10:00
James Mills
2585222830 Improve write performance by ~33% to 80,000 writes/sec buf reducing syscalls and using a bufio.Writer 2019-03-16 07:41:37 +10:00
James Mills
3f1d6635c4 Add prefix scan for keys using a Trie 2019-03-15 23:48:50 +10:00
James Mills
67840ffb57 Call Close() at end of sub-commands 2019-03-14 21:50:41 +10:00
James Mills
9f0a357ca0 Remove lock file on Close() 2019-03-14 21:50:23 +10:00
James Mills
52b6c74a21 Fixed compile error in CLI 2019-03-14 21:33:40 +10:00
James Mills
d24a01797a Added WithMaxKeySize() and WithMaxValueSize() options 2019-03-14 21:31:23 +10:00
James Mills
bc8f6c6718 Change locking error message 2019-03-14 21:31:01 +10:00
James Mills
b6c212d60c Refactored option handling 2019-03-14 21:24:31 +10:00
James Mills
3f1b90eb23 Update README.md 2019-03-14 18:18:57 +10:00
James Mills
71a42800fe Improved benchmark test suite for various key/value sizes 2019-03-14 18:17:20 +10:00
James Mills
3b9627aeb8 Fix concurrent read bug 2019-03-14 17:58:06 +10:00
James Mills
e0c4c4fdae Fix concurrent write bug with multiple goroutines writing to the to the active datafile 2019-03-14 17:58:06 +10:00
James Mills
fb50eb2f82 Update README.md 2019-03-14 15:36:37 +10:00
James Mills
fb2335e3c1 Fixed tests 2019-03-14 07:46:59 +10:00
James Mills
9a8aca55ba Updated README 2019-03-13 21:40:43 +10:00
James Mills
32b782b229 Fixed arg handling in bitcaskd 2019-03-13 21:39:23 +10:00
James Mills
146f777683 Fixed versioning during build time 2019-03-13 21:31:21 +10:00
James Mills
809a14fbdc Fix usage output of bitcaskd 2019-03-13 21:25:26 +10:00
James Mills
238ff6ab59 Add a simple Redis compatible server daemon (bitcaskd) 2019-03-13 21:19:46 +10:00
James Mills
6a39d742b7 Update README.md 2019-03-13 20:27:27 +10:00
27 changed files with 845 additions and 230 deletions

1
.gitignore vendored
View File

@@ -3,5 +3,6 @@
/coverage.txt
/bitcask
/bitcaskd
/tmp
/dist

View File

@@ -1,10 +1,18 @@
builds:
-
binary: bitcask
main: ./cmd/bitcask
flags: -tags "static_build"
ldflags: -w -X .Version={{.Version}} -X .Commit={{.Commit}}
env:
- CGO_ENABLED=0
-
binary: bitcaskd
main: ./cmd/bitcaskd
flags: -tags "static_build"
ldflags: -w -X .Version={{.Version}} -X .Commit={{.Commit}}
env:
- CGO_ENABLED=0
sign:
artifacts: checksum
archive:

View File

@@ -1,18 +1,24 @@
.PHONY: dev build generate install image release profile bench test clean
CGO_ENABLED=0
VERSION=$(shell git describe --abbrev=0 --tags)
COMMIT=$(shell git rev-parse --short HEAD)
all: dev
dev: build
@./bitcask --version
@./bitcaskd --version
build: clean generate
@go build \
-tags "netgo static_build" -installsuffix netgo \
-ldflags "-w -X $(shell go list)/.Commit=$(COMMIT)" \
-ldflags "-w -X $(shell go list)/internal.Version=$(VERSION) -X $(shell go list)/internal.Commit=$(COMMIT)" \
./cmd/bitcask/...
@go build \
-tags "netgo static_build" -installsuffix netgo \
-ldflags "-w -X $(shell go list)/internal.Version=$(VERSION) -X $(shell go list)/internal.Commit=$(COMMIT)" \
./cmd/bitcaskd/...
generate:
@go generate $(shell go list)/...

View File

@@ -12,6 +12,10 @@ A Bitcask (LSM+WAL) Key/Value Store written in Go.
* Embeddable
* Builtin CLI
* Builtin Redis-compatible server
* Predictable read/write performance
* Low latecny
* High throughput (See: [Performance](README.md#Performance)
## Install
@@ -54,6 +58,38 @@ $ bitcask -p /tmp/db get Hello
World
```
## Usage (server)
There is also a builtin very simple Redis-compatible server called `bitcaskd`:
```#!bash
$ ./bitcaskd ./tmp
INFO[0000] starting bitcaskd v0.0.7@146f777 bind=":6379" path=./tmp
```
Example session:
```
$ telnet localhost 6379
Trying ::1...
Connected to localhost.
Escape character is '^]'.
SET foo bar
+OK
GET foo
$3
bar
DEL foo
:1
GET foo
$-1
PING
+PONG
QUIT
+OK
Connection closed by foreign host.
```
## Performance
Benchmarks run on a 11" Macbook with a 1.4Ghz Intel Core i7:
@@ -61,13 +97,35 @@ Benchmarks run on a 11" Macbook with a 1.4Ghz Intel Core i7:
```
$ make bench
...
BenchmarkGet-4 300000 5065 ns/op 144 B/op 4 allocs/op
BenchmarkPut-4 100000 14640 ns/op 699 B/op 7 allocs/op
BenchmarkGet/128B-4 300000 5178 ns/op 400 B/op 5 allocs/op
BenchmarkGet/256B-4 300000 5273 ns/op 656 B/op 5 allocs/op
BenchmarkGet/512B-4 200000 5368 ns/op 1200 B/op 5 allocs/op
BenchmarkGet/1K-4 200000 5800 ns/op 2288 B/op 5 allocs/op
BenchmarkGet/2K-4 200000 6766 ns/op 4464 B/op 5 allocs/op
BenchmarkGet/4K-4 200000 7857 ns/op 9072 B/op 5 allocs/op
BenchmarkGet/8K-4 200000 9538 ns/op 17776 B/op 5 allocs/op
BenchmarkGet/16K-4 100000 13188 ns/op 34928 B/op 5 allocs/op
BenchmarkGet/32K-4 100000 21620 ns/op 73840 B/op 5 allocs/op
BenchmarkPut/128B-4 200000 7875 ns/op 409 B/op 6 allocs/op
BenchmarkPut/256B-4 200000 8712 ns/op 538 B/op 6 allocs/op
BenchmarkPut/512B-4 200000 9832 ns/op 829 B/op 6 allocs/op
BenchmarkPut/1K-4 100000 13105 ns/op 1410 B/op 6 allocs/op
BenchmarkPut/2K-4 100000 18601 ns/op 2572 B/op 6 allocs/op
BenchmarkPut/4K-4 50000 36631 ns/op 5151 B/op 6 allocs/op
BenchmarkPut/8K-4 30000 56128 ns/op 9798 B/op 6 allocs/op
BenchmarkPut/16K-4 20000 83209 ns/op 18834 B/op 6 allocs/op
BenchmarkPut/32K-4 10000 135899 ns/op 41517 B/op 6 allocs/op
BenchmarkScan-4 1000000 1851 ns/op 493 B/op 25 allocs/op
```
* ~30,000 reads/sec for non-active data
* ~180,000 reads/sec for active data
* ~60,000 writes/sec
For 128B values:
* ~200,000 reads/sec
* ~130,000 writes/sec
The full benchmark above shows linear performance as you increase key/value sizes.
## License

View File

@@ -1,36 +1,29 @@
package bitcask
import (
"errors"
"fmt"
"hash/crc32"
"io"
"io/ioutil"
"os"
"path/filepath"
"sort"
"strconv"
"strings"
"time"
"github.com/gofrs/flock"
)
"github.com/prologic/trie"
const (
DefaultMaxDatafileSize = 1 << 20 // 1MB
)
var (
ErrKeyNotFound = errors.New("error: key not found")
ErrCannotAcquireLock = errors.New("error: cannot acquire lock")
"github.com/prologic/bitcask/internal"
)
type Bitcask struct {
*flock.Flock
config *config
path string
curr *Datafile
keydir *Keydir
datafiles []*Datafile
curr *internal.Datafile
keydir *internal.Keydir
datafiles []*internal.Datafile
trie *trie.Trie
maxDatafileSize int64
}
@@ -38,6 +31,7 @@ type Bitcask struct {
func (b *Bitcask) Close() error {
defer func() {
b.Flock.Unlock()
os.Remove(b.Flock.Path())
}()
for _, df := range b.datafiles {
@@ -51,34 +45,47 @@ func (b *Bitcask) Sync() error {
}
func (b *Bitcask) Get(key string) ([]byte, error) {
var df *Datafile
var df *internal.Datafile
item, ok := b.keydir.Get(key)
if !ok {
return nil, ErrKeyNotFound
return nil, fmt.Errorf("error: key not found %s", key)
}
if item.FileID == b.curr.id {
if item.FileID == b.curr.FileID() {
df = b.curr
} else {
df = b.datafiles[item.FileID]
}
e, err := df.ReadAt(item.Index)
e, err := df.ReadAt(item.Offset)
if err != nil {
return nil, err
}
checksum := crc32.ChecksumIEEE(e.Value)
if checksum != e.Checksum {
return nil, fmt.Errorf("error: checksum falied %s %d != %d", key, e.Checksum, checksum)
}
return e.Value, nil
}
func (b *Bitcask) Put(key string, value []byte) error {
index, err := b.put(key, value)
if len(key) > b.config.MaxKeySize {
return fmt.Errorf("error: key too large %d > %d", len(key), b.config.MaxKeySize)
}
if len(value) > b.config.MaxValueSize {
return fmt.Errorf("error: value too large %d > %d", len(value), b.config.MaxValueSize)
}
offset, err := b.put(key, value)
if err != nil {
return err
}
b.keydir.Add(key, b.curr.id, index, time.Now().Unix())
item := b.keydir.Add(key, b.curr.FileID(), offset)
b.trie.Add(key, item)
return nil
}
@@ -90,10 +97,21 @@ func (b *Bitcask) Delete(key string) error {
}
b.keydir.Delete(key)
b.trie.Remove(key)
return nil
}
func (b *Bitcask) Scan(prefix string, f func(key string) error) error {
keys := b.trie.PrefixSearch(prefix)
for _, key := range keys {
if err := f(key); err != nil {
return err
}
}
return nil
}
func (b *Bitcask) Fold(f func(key string) error) error {
for key := range b.keydir.Keys() {
if err := f(key); err != nil {
@@ -115,18 +133,22 @@ func (b *Bitcask) put(key string, value []byte) (int64, error) {
return -1, err
}
df, err := NewDatafile(b.path, b.curr.id, true)
df, err := internal.NewDatafile(b.path, b.curr.FileID(), true)
if err != nil {
return -1, err
}
b.datafiles = append(b.datafiles, df)
id := b.curr.id + 1
curr, err := NewDatafile(b.path, id, false)
id := b.curr.FileID() + 1
curr, err := internal.NewDatafile(b.path, id, false)
if err != nil {
return -1, err
}
b.curr = curr
}
e := NewEntry(key, value)
e := internal.NewEntry(key, value)
return b.curr.Write(e)
}
@@ -135,46 +157,13 @@ func (b *Bitcask) setMaxDatafileSize(size int64) error {
return nil
}
func MaxDatafileSize(size int64) func(*Bitcask) error {
return func(b *Bitcask) error {
return b.setMaxDatafileSize(size)
}
}
func getDatafiles(path string) ([]string, error) {
fns, err := filepath.Glob(fmt.Sprintf("%s/*.data", path))
if err != nil {
return nil, err
}
sort.Strings(fns)
return fns, nil
}
func parseIds(fns []string) ([]int, error) {
var ids []int
for _, fn := range fns {
fn = filepath.Base(fn)
ext := filepath.Ext(fn)
if ext != ".data" {
continue
}
id, err := strconv.ParseInt(strings.TrimSuffix(fn, ext), 10, 32)
if err != nil {
return nil, err
}
ids = append(ids, int(id))
}
sort.Ints(ids)
return ids, nil
}
func Merge(path string, force bool) error {
fns, err := getDatafiles(path)
fns, err := internal.GetDatafiles(path)
if err != nil {
return err
}
ids, err := parseIds(fns)
ids, err := internal.ParseIds(fns)
if err != nil {
return err
}
@@ -204,9 +193,9 @@ func Merge(path string, force bool) error {
id := ids[i]
keydir := NewKeydir()
keydir := internal.NewKeydir()
df, err := NewDatafile(path, id, true)
df, err := internal.NewDatafile(path, id, true)
if err != nil {
return err
}
@@ -227,10 +216,10 @@ func Merge(path string, force bool) error {
continue
}
keydir.Add(e.Key, ids[i], e.Index, e.Timestamp)
keydir.Add(e.Key, ids[i], e.Offset)
}
tempdf, err := NewDatafile(temp, id, false)
tempdf, err := internal.NewDatafile(temp, id, false)
if err != nil {
return err
}
@@ -238,7 +227,7 @@ func Merge(path string, force bool) error {
for key := range keydir.Keys() {
item, _ := keydir.Get(key)
e, err := df.ReadAt(item.Index)
e, err := df.ReadAt(item.Offset)
if err != nil {
return err
}
@@ -274,7 +263,7 @@ func Merge(path string, force bool) error {
return nil
}
func Open(path string, options ...func(*Bitcask) error) (*Bitcask, error) {
func Open(path string, options ...option) (*Bitcask, error) {
if err := os.MkdirAll(path, 0755); err != nil {
return nil, err
}
@@ -284,21 +273,23 @@ func Open(path string, options ...func(*Bitcask) error) (*Bitcask, error) {
return nil, err
}
fns, err := getDatafiles(path)
fns, err := internal.GetDatafiles(path)
if err != nil {
return nil, err
}
ids, err := parseIds(fns)
ids, err := internal.ParseIds(fns)
if err != nil {
return nil, err
}
keydir := NewKeydir()
var datafiles []*Datafile
var datafiles []*internal.Datafile
keydir := internal.NewKeydir()
trie := trie.New()
for i, fn := range fns {
df, err := NewDatafile(path, ids[i], true)
df, err := internal.NewDatafile(path, ids[i], true)
if err != nil {
return nil, err
}
@@ -311,14 +302,15 @@ func Open(path string, options ...func(*Bitcask) error) (*Bitcask, error) {
}
defer f.Close()
hint, err := NewKeydirFromBytes(f)
hint, err := internal.NewKeydirFromBytes(f)
if err != nil {
return nil, err
}
for key := range hint.Keys() {
item, _ := hint.Get(key)
keydir.Add(key, item.FileID, item.Index, item.Timestamp)
_ = keydir.Add(key, item.FileID, item.Offset)
trie.Add(key, item)
}
} else {
for {
@@ -336,7 +328,8 @@ func Open(path string, options ...func(*Bitcask) error) (*Bitcask, error) {
continue
}
keydir.Add(e.Key, ids[i], e.Index, e.Timestamp)
item := keydir.Add(e.Key, ids[i], e.Offset)
trie.Add(e.Key, item)
}
}
}
@@ -346,23 +339,25 @@ func Open(path string, options ...func(*Bitcask) error) (*Bitcask, error) {
id = ids[(len(ids) - 1)]
}
curr, err := NewDatafile(path, id, false)
curr, err := internal.NewDatafile(path, id, false)
if err != nil {
return nil, err
}
bitcask := &Bitcask{
Flock: flock.New(filepath.Join(path, "lock")),
config: NewDefaultConfig(),
path: path,
curr: curr,
keydir: keydir,
datafiles: datafiles,
trie: trie,
maxDatafileSize: DefaultMaxDatafileSize,
}
for _, option := range options {
err = option(bitcask)
for _, opt := range options {
err = opt(bitcask.config)
if err != nil {
return nil, err
}
@@ -374,7 +369,7 @@ func Open(path string, options ...func(*Bitcask) error) (*Bitcask, error) {
}
if !locked {
return nil, ErrCannotAcquireLock
return nil, fmt.Errorf("error: database locked %s", path)
}
return bitcask, nil

View File

@@ -3,7 +3,10 @@ package bitcask
import (
"fmt"
"io/ioutil"
"reflect"
"sort"
"strings"
"sync"
"testing"
"github.com/stretchr/testify/assert"
@@ -42,7 +45,7 @@ func TestAll(t *testing.T) {
assert.NoError(err)
_, err = db.Get("foo")
assert.Error(err)
assert.Equal(err.Error(), "error: key not found")
assert.Equal("error: key not found foo", err.Error())
})
t.Run("Sync", func(t *testing.T) {
@@ -89,7 +92,7 @@ func TestDeletedKeys(t *testing.T) {
assert.NoError(err)
_, err = db.Get("foo")
assert.Error(err)
assert.Equal("error: key not found", err.Error())
assert.Equal("error: key not found foo", err.Error())
})
t.Run("Sync", func(t *testing.T) {
@@ -117,7 +120,7 @@ func TestDeletedKeys(t *testing.T) {
t.Run("Get", func(t *testing.T) {
_, err = db.Get("foo")
assert.Error(err)
assert.Equal("error: key not found", err.Error())
assert.Equal("error: key not found foo", err.Error())
})
t.Run("Close", func(t *testing.T) {
@@ -127,6 +130,50 @@ func TestDeletedKeys(t *testing.T) {
})
}
func TestMaxKeySize(t *testing.T) {
assert := assert.New(t)
testdir, err := ioutil.TempDir("", "bitcask")
assert.NoError(err)
var db *Bitcask
t.Run("Open", func(t *testing.T) {
db, err = Open(testdir, WithMaxKeySize(16))
assert.NoError(err)
})
t.Run("Put", func(t *testing.T) {
key := strings.Repeat(" ", 17)
value := []byte("foobar")
err = db.Put(key, value)
assert.Error(err)
assert.Equal("error: key too large 17 > 16", err.Error())
})
}
func TestMaxValueSize(t *testing.T) {
assert := assert.New(t)
testdir, err := ioutil.TempDir("", "bitcask")
assert.NoError(err)
var db *Bitcask
t.Run("Open", func(t *testing.T) {
db, err = Open(testdir, WithMaxValueSize(16))
assert.NoError(err)
})
t.Run("Put", func(t *testing.T) {
key := "foo"
value := []byte(strings.Repeat(" ", 17))
err = db.Put(key, value)
assert.Error(err)
assert.Equal("error: value too large 17 > 16", err.Error())
})
}
func TestMerge(t *testing.T) {
assert := assert.New(t)
@@ -140,7 +187,7 @@ func TestMerge(t *testing.T) {
)
t.Run("Open", func(t *testing.T) {
db, err = Open(testdir, MaxDatafileSize(1024))
db, err = Open(testdir, WithMaxDatafileSize(1024))
assert.NoError(err)
})
@@ -198,6 +245,136 @@ func TestMerge(t *testing.T) {
})
}
func TestConcurrent(t *testing.T) {
var (
db *Bitcask
err error
)
assert := assert.New(t)
testdir, err := ioutil.TempDir("", "bitcask")
assert.NoError(err)
t.Run("Setup", func(t *testing.T) {
t.Run("Open", func(t *testing.T) {
db, err = Open(testdir)
assert.NoError(err)
})
t.Run("Put", func(t *testing.T) {
err = db.Put("foo", []byte("bar"))
assert.NoError(err)
})
})
t.Run("Concurrent", func(t *testing.T) {
t.Run("Put", func(t *testing.T) {
f := func(wg *sync.WaitGroup, x int) {
defer func() {
wg.Done()
}()
for i := 0; i <= 100; i++ {
if i%x == 0 {
key := fmt.Sprintf("k%d", i)
value := []byte(fmt.Sprintf("v%d", i))
err := db.Put(key, value)
assert.NoError(err)
}
}
}
wg := &sync.WaitGroup{}
go f(wg, 2)
go f(wg, 3)
go f(wg, 5)
wg.Add(3)
wg.Wait()
})
t.Run("Get", func(t *testing.T) {
f := func(wg *sync.WaitGroup, N int) {
defer func() {
wg.Done()
}()
for i := 0; i <= N; i++ {
value, err := db.Get("foo")
assert.NoError(err)
assert.Equal([]byte("bar"), value)
}
}
wg := &sync.WaitGroup{}
go f(wg, 100)
go f(wg, 100)
go f(wg, 100)
wg.Add(3)
wg.Wait()
})
t.Run("Close", func(t *testing.T) {
err = db.Close()
assert.NoError(err)
})
})
}
func TestScan(t *testing.T) {
assert := assert.New(t)
testdir, err := ioutil.TempDir("", "bitcask")
assert.NoError(err)
var db *Bitcask
t.Run("Setup", func(t *testing.T) {
t.Run("Open", func(t *testing.T) {
db, err = Open(testdir)
assert.NoError(err)
})
t.Run("Put", func(t *testing.T) {
var items = map[string][]byte{
"1": []byte("1"),
"2": []byte("2"),
"3": []byte("3"),
"food": []byte("pizza"),
"foo": []byte("foo"),
"fooz": []byte("fooz ball"),
"hello": []byte("world"),
}
for k, v := range items {
err = db.Put(k, v)
assert.NoError(err)
}
})
})
t.Run("Scan", func(t *testing.T) {
var (
vals []string
expected = []string{
"foo",
"fooz ball",
"pizza",
}
)
err = db.Scan("fo", func(key string) error {
val, err := db.Get(key)
assert.NoError(err)
vals = append(vals, string(val))
return nil
})
sort.Strings(vals)
assert.Equal(expected, vals)
})
}
func TestLocking(t *testing.T) {
assert := assert.New(t)
@@ -210,7 +387,12 @@ func TestLocking(t *testing.T) {
_, err = Open(testdir)
assert.Error(err)
assert.Equal("error: cannot acquire lock", err.Error())
assert.Equal(fmt.Sprintf("error: database locked %s", testdir), err.Error())
}
type benchmarkTestCase struct {
name string
size int
}
func BenchmarkGet(b *testing.B) {
@@ -225,20 +407,39 @@ func BenchmarkGet(b *testing.B) {
}
defer db.Close()
err = db.Put("foo", []byte("bar"))
if err != nil {
b.Fatal(err)
tests := []benchmarkTestCase{
{"128B", 128},
{"256B", 256},
{"512B", 512},
{"1K", 1024},
{"2K", 2048},
{"4K", 4096},
{"8K", 8192},
{"16K", 16384},
{"32K", 32768},
}
b.ResetTimer()
for i := 0; i < b.N; i++ {
val, err := db.Get("foo")
if err != nil {
b.Fatal(err)
}
if string(val) != "bar" {
b.Errorf("expected val=bar got=%s", val)
}
for _, tt := range tests {
b.Run(tt.name, func(b *testing.B) {
key := "foo"
value := []byte(strings.Repeat(" ", tt.size))
err = db.Put(key, value)
if err != nil {
b.Fatal(err)
}
b.ResetTimer()
for i := 0; i < b.N; i++ {
val, err := db.Get(key)
if err != nil {
b.Fatal(err)
}
if string(val) != string(value) {
b.Errorf("unexpected value")
}
}
})
}
}
@@ -254,11 +455,73 @@ func BenchmarkPut(b *testing.B) {
}
defer db.Close()
b.ResetTimer()
for i := 0; i < b.N; i++ {
err := db.Put(fmt.Sprintf("key%d", i), []byte("bar"))
tests := []benchmarkTestCase{
{"128B", 128},
{"256B", 256},
{"512B", 512},
{"1K", 1024},
{"2K", 2048},
{"4K", 4096},
{"8K", 8192},
{"16K", 16384},
{"32K", 32768},
}
for _, tt := range tests {
b.Run(tt.name, func(b *testing.B) {
key := "foo"
value := []byte(strings.Repeat(" ", tt.size))
b.ResetTimer()
for i := 0; i < b.N; i++ {
err := db.Put(key, value)
if err != nil {
b.Fatal(err)
}
}
})
}
}
func BenchmarkScan(b *testing.B) {
testdir, err := ioutil.TempDir("", "bitcask")
if err != nil {
b.Fatal(err)
}
db, err := Open(testdir)
if err != nil {
b.Fatal(err)
}
defer db.Close()
var items = map[string][]byte{
"1": []byte("1"),
"2": []byte("2"),
"3": []byte("3"),
"food": []byte("pizza"),
"foo": []byte("foo"),
"fooz": []byte("fooz ball"),
"hello": []byte("world"),
}
for k, v := range items {
err := db.Put(k, v)
if err != nil {
b.Fatal(err)
}
}
var expected = []string{"foo", "food", "fooz"}
b.ResetTimer()
for i := 0; i < b.N; i++ {
var keys []string
err = db.Scan("fo", func(key string) error {
keys = append(keys, key)
return nil
})
sort.Strings(keys)
if !reflect.DeepEqual(expected, keys) {
b.Fatal(fmt.Errorf("expected keys=#%v got=%#v", expected, keys))
}
}
}

View File

@@ -35,6 +35,7 @@ func del(path, key string) int {
log.WithError(err).Error("error opening database")
return 1
}
defer db.Close()
err = db.Delete(key)
if err != nil {

View File

@@ -36,6 +36,7 @@ func get(path, key string) int {
log.WithError(err).Error("error opening database")
return 1
}
defer db.Close()
value, err := db.Get(key)
if err != nil {

View File

@@ -34,6 +34,7 @@ func keys(path string) int {
log.WithError(err).Error("error opening database")
return 1
}
defer db.Close()
err = db.Fold(func(key string) error {
fmt.Printf("%s\n", key)

View File

@@ -8,13 +8,13 @@ import (
"github.com/spf13/cobra"
"github.com/spf13/viper"
"github.com/prologic/bitcask"
"github.com/prologic/bitcask/internal"
)
// RootCmd represents the base command when called without any subcommands
var RootCmd = &cobra.Command{
Use: "bitcask",
Version: bitcask.FullVersion(),
Version: internal.FullVersion(),
Short: "Command-line tools for bitcask",
Long: `This is the command-line tool to interact with a bitcask database.

60
cmd/bitcask/scan.go Normal file
View File

@@ -0,0 +1,60 @@
package main
import (
"fmt"
"os"
log "github.com/sirupsen/logrus"
"github.com/spf13/cobra"
"github.com/spf13/viper"
"github.com/prologic/bitcask"
)
var scanCmd = &cobra.Command{
Use: "scan <prefix>",
Aliases: []string{"search", "find"},
Short: "Perform a prefis scan for keys",
Long: `This performa a prefix scan for keys starting with the given
prefix. This uses a Trie to search for matching keys and returns all matched
keys.`,
Args: cobra.ExactArgs(1),
Run: func(cmd *cobra.Command, args []string) {
path := viper.GetString("path")
prefix := args[0]
os.Exit(scan(path, prefix))
},
}
func init() {
RootCmd.AddCommand(scanCmd)
}
func scan(path, prefix string) int {
db, err := bitcask.Open(path)
if err != nil {
log.WithError(err).Error("error opening database")
return 1
}
defer db.Close()
err = db.Scan(prefix, func(key string) error {
value, err := db.Get(key)
if err != nil {
log.WithError(err).Error("error reading key")
return err
}
fmt.Printf("%s\n", string(value))
log.WithField("key", key).WithField("value", value).Debug("key/value")
return nil
})
if err != nil {
log.WithError(err).Error("error scanning keys")
return 1
}
return 0
}

View File

@@ -47,6 +47,7 @@ func set(path, key string, value io.Reader) int {
log.WithError(err).Error("error opening database")
return 1
}
defer db.Close()
data, err := ioutil.ReadAll(value)
if err != nil {

124
cmd/bitcaskd/main.go Normal file
View File

@@ -0,0 +1,124 @@
package main
import (
"fmt"
"os"
"strings"
log "github.com/sirupsen/logrus"
flag "github.com/spf13/pflag"
"github.com/tidwall/redcon"
"github.com/prologic/bitcask"
"github.com/prologic/bitcask/internal"
)
var (
bind string
debug bool
version bool
maxDatafileSize int
)
func init() {
flag.Usage = func() {
fmt.Fprintf(os.Stderr, "Usage: %s [options] <path>\n", os.Args[0])
flag.PrintDefaults()
}
flag.BoolVarP(&version, "version", "v", false, "display version information")
flag.BoolVarP(&debug, "debug", "d", false, "enable debug logging")
flag.StringVarP(&bind, "bind", "b", ":6379", "interface and port to bind to")
flag.IntVar(&maxDatafileSize, "max-datafile-size", 1<<20, "maximum datafile size in bytes")
}
func main() {
flag.Parse()
if debug {
log.SetLevel(log.DebugLevel)
} else {
log.SetLevel(log.InfoLevel)
}
if version {
fmt.Printf("bitcaskd version %s", internal.FullVersion())
os.Exit(0)
}
if len(flag.Args()) < 1 {
flag.Usage()
os.Exit(1)
}
path := flag.Arg(0)
db, err := bitcask.Open(path, bitcask.WithMaxDatafileSize(maxDatafileSize))
if err != nil {
log.WithError(err).WithField("path", path).Error("error opening database")
os.Exit(1)
}
log.WithField("bind", bind).WithField("path", path).Infof("starting bitcaskd v%s", internal.FullVersion())
err = redcon.ListenAndServe(bind,
func(conn redcon.Conn, cmd redcon.Command) {
switch strings.ToLower(string(cmd.Args[0])) {
case "ping":
conn.WriteString("PONG")
case "quit":
conn.WriteString("OK")
conn.Close()
case "set":
if len(cmd.Args) != 3 {
conn.WriteError("ERR wrong number of arguments for '" + string(cmd.Args[0]) + "' command")
return
}
key := string(cmd.Args[1])
value := cmd.Args[2]
err = db.Put(key, value)
if err != nil {
conn.WriteString(fmt.Sprintf("ERR: %s", err))
} else {
conn.WriteString("OK")
}
case "get":
if len(cmd.Args) != 2 {
conn.WriteError("ERR wrong number of arguments for '" + string(cmd.Args[0]) + "' command")
return
}
key := string(cmd.Args[1])
value, err := db.Get(key)
if err != nil {
conn.WriteNull()
} else {
conn.WriteBulk(value)
}
case "del":
if len(cmd.Args) != 2 {
conn.WriteError("ERR wrong number of arguments for '" + string(cmd.Args[0]) + "' command")
return
}
key := string(cmd.Args[1])
err := db.Delete(key)
if err != nil {
conn.WriteInt(0)
} else {
conn.WriteInt(1)
}
default:
conn.WriteError("ERR unknown command '" + string(cmd.Args[0]) + "'")
}
},
func(conn redcon.Conn) bool {
return true
},
func(conn redcon.Conn, err error) {
},
)
if err != nil {
log.WithError(err).Fatal("oops")
}
}

View File

@@ -1,17 +0,0 @@
package bitcask
import (
"hash/crc32"
pb "github.com/prologic/bitcask/proto"
)
func NewEntry(key string, value []byte) pb.Entry {
crc := crc32.ChecksumIEEE(value)
return pb.Entry{
CRC: crc,
Key: key,
Value: value,
}
}

4
go.mod
View File

@@ -1,6 +1,7 @@
module github.com/prologic/bitcask
require (
github.com/derekparker/trie v0.0.0-20180212171413-e608c2733dc7
github.com/gofrs/flock v0.7.1
github.com/gogo/protobuf v1.2.1
github.com/golang/protobuf v1.2.0
@@ -9,10 +10,13 @@ require (
github.com/mitchellh/go-homedir v1.1.0
github.com/pkg/errors v0.8.1
github.com/prologic/msgbus v0.1.1
github.com/prologic/trie v0.0.0-20190316011403-395e39dac705
github.com/prometheus/client_golang v0.9.2 // indirect
github.com/sirupsen/logrus v1.3.0
github.com/spf13/cobra v0.0.3
github.com/spf13/pflag v1.0.3
github.com/spf13/viper v1.3.1
github.com/stretchr/testify v1.3.0
github.com/tidwall/redcon v0.9.0
gopkg.in/vmihailenco/msgpack.v2 v2.9.1
)

6
go.sum
View File

@@ -8,6 +8,8 @@ github.com/davecgh/go-spew v1.1.0 h1:ZDRjVQ15GmhC3fiQ8ni8+OwkZQO4DARzQgrnXU1Liz8
github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c=
github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
github.com/derekparker/trie v0.0.0-20180212171413-e608c2733dc7 h1:Cab9yoTQh1TxObKfis1DzZ6vFLK5kbeenMjRES/UE3o=
github.com/derekparker/trie v0.0.0-20180212171413-e608c2733dc7/go.mod h1:D6ICZm05D9VN1n/8iOtBxLpXtoGp6HDFUJ1RNVieOSE=
github.com/fsnotify/fsnotify v1.4.7 h1:IXs+QLmnXW2CcXuY+8Mzv/fWEsPGWxqefPtCP5CnV9I=
github.com/fsnotify/fsnotify v1.4.7/go.mod h1:jwhsz4b93w/PPRr/qN1Yymfu8t87LnFCMoQvtojpjFo=
github.com/gofrs/flock v0.7.1 h1:DP+LD/t0njgoPBvT5MJLeliUIVQR03hiKR6vezdwHlc=
@@ -40,6 +42,8 @@ github.com/pkg/errors v0.8.1/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINE
github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM=
github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4=
github.com/prologic/msgbus v0.1.1/go.mod h1:B3Qu4/U2FP08x93jUzp9E8bl155+cIgDH2DUGRK6OZk=
github.com/prologic/trie v0.0.0-20190316011403-395e39dac705 h1:2J+cSlAeECj0lfMKSmM7n5OlIio+yLovaKLZJzwLc6U=
github.com/prologic/trie v0.0.0-20190316011403-395e39dac705/go.mod h1:LFuDmpHJGmciXd8Rl5YMhVlLMps9gz2GtYLzwxrFhzs=
github.com/prometheus/client_golang v0.9.2 h1:awm861/B8OKDd2I/6o1dy3ra4BamzKhYOiGItCeZ740=
github.com/prometheus/client_golang v0.9.2/go.mod h1:OsXs2jCmiKlQ1lTBmv21f2mNfw4xf/QclQDMrYNZzcM=
github.com/prometheus/client_model v0.0.0-20180712105110-5c3871d89910 h1:idejC8f05m9MGOsuEi1ATq9shN03HrxNkD/luQvxCv8=
@@ -67,6 +71,8 @@ github.com/stretchr/objx v0.1.1/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+
github.com/stretchr/testify v1.2.2/go.mod h1:a8OnRcib4nhh0OaRAV+Yts87kKdq0PP7pXfy6kDkUVs=
github.com/stretchr/testify v1.3.0 h1:TivCn/peBQ7UY8ooIcPgZFpTNSz0Q2U6UrFlUfqbe0Q=
github.com/stretchr/testify v1.3.0/go.mod h1:M5WIy9Dh21IEIfnGCwXGc5bZfKNJtfHm1UVUgZn+9EI=
github.com/tidwall/redcon v0.9.0 h1:tiT9DLAoohsdNaFg9Si5dRsv9+FjvZYnhMOEtSFwBqA=
github.com/tidwall/redcon v0.9.0/go.mod h1:bdYBm4rlcWpst2XMwKVzWDF9CoUxEbUmM7CQrKeOZas=
github.com/ugorji/go/codec v0.0.0-20181204163529-d75b2dcb6bc8/go.mod h1:VFNgLljTbGfSG7qAOspJ7OScBnGdDN/yBr0sguwnwf0=
github.com/xordataexchange/crypt v0.0.3-0.20170626215501-b2862e3d0a77/go.mod h1:aYKd//L2LvnjZzWKhF00oedf4jCCReLcmhLdhm1A27Q=
golang.org/x/crypto v0.0.0-20180904163835-0709b304e793/go.mod h1:6SG95UA2DQfeDnfUPMdvaQW0Q7yPrPDi9nlGo2tz2b4=

View File

@@ -1,14 +1,15 @@
package bitcask
package internal
import (
"errors"
"fmt"
"os"
"path/filepath"
"time"
"sync"
pb "github.com/prologic/bitcask/proto"
"github.com/prologic/bitcask/streampb"
"github.com/pkg/errors"
pb "github.com/prologic/bitcask/internal/proto"
"github.com/prologic/bitcask/internal/streampb"
)
const (
@@ -20,11 +21,14 @@ var (
)
type Datafile struct {
id int
r *os.File
w *os.File
dec *streampb.Decoder
enc *streampb.Encoder
sync.RWMutex
id int
r *os.File
w *os.File
offset int64
dec *streampb.Decoder
enc *streampb.Encoder
}
func NewDatafile(path string, id int, readonly bool) (*Datafile, error) {
@@ -47,19 +51,30 @@ func NewDatafile(path string, id int, readonly bool) (*Datafile, error) {
if err != nil {
return nil, err
}
stat, err := r.Stat()
if err != nil {
return nil, errors.Wrap(err, "error calling Stat()")
}
offset := stat.Size()
dec := streampb.NewDecoder(r)
enc := streampb.NewEncoder(w)
return &Datafile{
id: id,
r: r,
w: w,
dec: dec,
enc: enc,
id: id,
r: r,
w: w,
offset: offset,
dec: dec,
enc: enc,
}, nil
}
func (df *Datafile) FileID() int {
return df.id
}
func (df *Datafile) Name() string {
return df.r.Name()
}
@@ -84,35 +99,28 @@ func (df *Datafile) Sync() error {
}
func (df *Datafile) Size() (int64, error) {
var (
stat os.FileInfo
err error
)
if df.w == nil {
stat, err = df.r.Stat()
} else {
stat, err = df.w.Stat()
}
if err != nil {
return -1, err
}
return stat.Size(), nil
df.RLock()
defer df.RUnlock()
return df.offset, nil
}
func (df *Datafile) Read() (pb.Entry, error) {
var e pb.Entry
func (df *Datafile) Read() (e pb.Entry, err error) {
df.Lock()
defer df.Unlock()
return e, df.dec.Decode(&e)
}
func (df *Datafile) ReadAt(index int64) (e pb.Entry, err error) {
df.Lock()
defer df.Unlock()
_, err = df.r.Seek(index, os.SEEK_SET)
if err != nil {
return
}
return df.Read()
return e, df.dec.Decode(&e)
}
func (df *Datafile) Write(e pb.Entry) (int64, error) {
@@ -120,20 +128,16 @@ func (df *Datafile) Write(e pb.Entry) (int64, error) {
return -1, ErrReadonly
}
stat, err := df.w.Stat()
df.Lock()
defer df.Unlock()
e.Offset = df.offset
n, err := df.enc.Encode(&e)
if err != nil {
return -1, err
}
df.offset += n
index := stat.Size()
e.Index = index
e.Timestamp = time.Now().Unix()
err = df.enc.Encode(&e)
if err != nil {
return -1, err
}
return index, nil
return e.Offset, nil
}

17
internal/entry.go Normal file
View File

@@ -0,0 +1,17 @@
package internal
import (
"hash/crc32"
pb "github.com/prologic/bitcask/internal/proto"
)
func NewEntry(key string, value []byte) pb.Entry {
checksum := crc32.ChecksumIEEE(value)
return pb.Entry{
Checksum: checksum,
Key: key,
Value: value,
}
}

View File

@@ -1,4 +1,4 @@
package bitcask
package internal
import (
"bytes"
@@ -9,9 +9,8 @@ import (
)
type Item struct {
FileID int
Index int64
Timestamp int64
FileID int
Offset int64
}
type Keydir struct {
@@ -25,15 +24,17 @@ func NewKeydir() *Keydir {
}
}
func (k *Keydir) Add(key string, fileid int, index, timestamp int64) {
k.Lock()
defer k.Unlock()
k.kv[key] = Item{
FileID: fileid,
Index: index,
Timestamp: timestamp,
func (k *Keydir) Add(key string, fileid int, offset int64) Item {
item := Item{
FileID: fileid,
Offset: offset,
}
k.Lock()
k.kv[key] = item
k.Unlock()
return item
}
func (k *Keydir) Get(key string) (Item, bool) {

View File

@@ -19,11 +19,10 @@ var _ = math.Inf
const _ = proto.ProtoPackageIsVersion2 // please upgrade the proto package
type Entry struct {
CRC uint32 `protobuf:"varint,1,opt,name=CRC,proto3" json:"CRC,omitempty"`
Checksum uint32 `protobuf:"varint,1,opt,name=Checksum,proto3" json:"Checksum,omitempty"`
Key string `protobuf:"bytes,2,opt,name=Key,proto3" json:"Key,omitempty"`
Index int64 `protobuf:"varint,3,opt,name=Index,proto3" json:"Index,omitempty"`
Offset int64 `protobuf:"varint,3,opt,name=Offset,proto3" json:"Offset,omitempty"`
Value []byte `protobuf:"bytes,4,opt,name=Value,proto3" json:"Value,omitempty"`
Timestamp int64 `protobuf:"varint,5,opt,name=Timestamp,proto3" json:"Timestamp,omitempty"`
XXX_NoUnkeyedLiteral struct{} `json:"-"`
XXX_unrecognized []byte `json:"-"`
XXX_sizecache int32 `json:"-"`
@@ -33,7 +32,7 @@ func (m *Entry) Reset() { *m = Entry{} }
func (m *Entry) String() string { return proto.CompactTextString(m) }
func (*Entry) ProtoMessage() {}
func (*Entry) Descriptor() ([]byte, []int) {
return fileDescriptor_entry_4f5906245d08394f, []int{0}
return fileDescriptor_entry_3e91842c99935ae2, []int{0}
}
func (m *Entry) XXX_Unmarshal(b []byte) error {
return xxx_messageInfo_Entry.Unmarshal(m, b)
@@ -53,9 +52,9 @@ func (m *Entry) XXX_DiscardUnknown() {
var xxx_messageInfo_Entry proto.InternalMessageInfo
func (m *Entry) GetCRC() uint32 {
func (m *Entry) GetChecksum() uint32 {
if m != nil {
return m.CRC
return m.Checksum
}
return 0
}
@@ -67,9 +66,9 @@ func (m *Entry) GetKey() string {
return ""
}
func (m *Entry) GetIndex() int64 {
func (m *Entry) GetOffset() int64 {
if m != nil {
return m.Index
return m.Offset
}
return 0
}
@@ -81,28 +80,20 @@ func (m *Entry) GetValue() []byte {
return nil
}
func (m *Entry) GetTimestamp() int64 {
if m != nil {
return m.Timestamp
}
return 0
}
func init() {
proto.RegisterType((*Entry)(nil), "proto.Entry")
}
func init() { proto.RegisterFile("entry.proto", fileDescriptor_entry_4f5906245d08394f) }
func init() { proto.RegisterFile("entry.proto", fileDescriptor_entry_3e91842c99935ae2) }
var fileDescriptor_entry_4f5906245d08394f = []byte{
// 134 bytes of a gzipped FileDescriptorProto
var fileDescriptor_entry_3e91842c99935ae2 = []byte{
// 126 bytes of a gzipped FileDescriptorProto
0x1f, 0x8b, 0x08, 0x00, 0x00, 0x00, 0x00, 0x00, 0x02, 0xff, 0xe2, 0xe2, 0x4e, 0xcd, 0x2b, 0x29,
0xaa, 0xd4, 0x2b, 0x28, 0xca, 0x2f, 0xc9, 0x17, 0x62, 0x05, 0x53, 0x4a, 0xa5, 0x5c, 0xac, 0xae,
0x20, 0x51, 0x21, 0x01, 0x2e, 0x66, 0xe7, 0x20, 0x67, 0x09, 0x46, 0x05, 0x46, 0x0d, 0xde, 0x20,
0x10, 0x13, 0x24, 0xe2, 0x9d, 0x5a, 0x29, 0xc1, 0xa4, 0xc0, 0xa8, 0xc1, 0x19, 0x04, 0x62, 0x0a,
0x89, 0x70, 0xb1, 0x7a, 0xe6, 0xa5, 0xa4, 0x56, 0x48, 0x30, 0x2b, 0x30, 0x6a, 0x30, 0x07, 0x41,
0x38, 0x20, 0xd1, 0xb0, 0xc4, 0x9c, 0xd2, 0x54, 0x09, 0x16, 0x05, 0x46, 0x0d, 0x9e, 0x20, 0x08,
0x47, 0x48, 0x86, 0x8b, 0x33, 0x24, 0x33, 0x37, 0xb5, 0xb8, 0x24, 0x31, 0xb7, 0x40, 0x82, 0x15,
0xac, 0x1e, 0x21, 0x90, 0xc4, 0x06, 0xb6, 0xdd, 0x18, 0x10, 0x00, 0x00, 0xff, 0xff, 0x07, 0x99,
0x47, 0xb9, 0x93, 0x00, 0x00, 0x00,
0xaa, 0xd4, 0x2b, 0x28, 0xca, 0x2f, 0xc9, 0x17, 0x62, 0x05, 0x53, 0x4a, 0xc9, 0x5c, 0xac, 0xae,
0x20, 0x51, 0x21, 0x29, 0x2e, 0x0e, 0xe7, 0x8c, 0xd4, 0xe4, 0xec, 0xe2, 0xd2, 0x5c, 0x09, 0x46,
0x05, 0x46, 0x0d, 0xde, 0x20, 0x38, 0x5f, 0x48, 0x80, 0x8b, 0xd9, 0x3b, 0xb5, 0x52, 0x82, 0x49,
0x81, 0x51, 0x83, 0x33, 0x08, 0xc4, 0x14, 0x12, 0xe3, 0x62, 0xf3, 0x4f, 0x4b, 0x2b, 0x4e, 0x2d,
0x91, 0x60, 0x56, 0x60, 0xd4, 0x60, 0x0e, 0x82, 0xf2, 0x84, 0x44, 0xb8, 0x58, 0xc3, 0x12, 0x73,
0x4a, 0x53, 0x25, 0x58, 0x14, 0x18, 0x35, 0x78, 0x82, 0x20, 0x9c, 0x24, 0x36, 0xb0, 0x5d, 0xc6,
0x80, 0x00, 0x00, 0x00, 0xff, 0xff, 0x76, 0xd2, 0x3e, 0x83, 0x81, 0x00, 0x00, 0x00,
}

View File

@@ -3,9 +3,8 @@ syntax = "proto3";
package proto;
message Entry {
uint32 CRC = 1;
uint32 Checksum = 1;
string Key = 2;
int64 Index = 3;
int64 Offset = 3;
bytes Value = 4;
int64 Timestamp = 5;
}

View File

@@ -1,6 +1,7 @@
package streampb
import (
"bufio"
"encoding/binary"
"io"
@@ -16,57 +17,64 @@ const (
// NewEncoder creates a streaming protobuf encoder.
func NewEncoder(w io.Writer) *Encoder {
return &Encoder{w: w, prefixBuf: make([]byte, prefixSize)}
return &Encoder{w: bufio.NewWriter(w)}
}
// Encoder wraps an underlying io.Writer and allows you to stream
// proto encodings on it.
type Encoder struct {
w io.Writer
prefixBuf []byte
w *bufio.Writer
}
// Encode takes any proto.Message and streams it to the underlying writer.
// Messages are framed with a length prefix.
func (e *Encoder) Encode(msg proto.Message) error {
func (e *Encoder) Encode(msg proto.Message) (int64, error) {
prefixBuf := make([]byte, prefixSize)
buf, err := proto.Marshal(msg)
if err != nil {
return err
return 0, err
}
binary.BigEndian.PutUint64(e.prefixBuf, uint64(len(buf)))
binary.BigEndian.PutUint64(prefixBuf, uint64(len(buf)))
if _, err := e.w.Write(e.prefixBuf); err != nil {
return errors.Wrap(err, "failed writing length prefix")
if _, err := e.w.Write(prefixBuf); err != nil {
return 0, errors.Wrap(err, "failed writing length prefix")
}
_, err = e.w.Write(buf)
return errors.Wrap(err, "failed writing marshaled data")
n, err := e.w.Write(buf)
if err != nil {
return 0, errors.Wrap(err, "failed writing marshaled data")
}
if err = e.w.Flush(); err != nil {
return 0, errors.Wrap(err, "failed flushing data")
}
return int64(n + prefixSize), nil
}
// NewDecoder creates a streaming protobuf decoder.
func NewDecoder(r io.Reader) *Decoder {
return &Decoder{
r: r,
prefixBuf: make([]byte, prefixSize),
}
return &Decoder{r: r}
}
// Decoder wraps an underlying io.Reader and allows you to stream
// proto decodings on it.
type Decoder struct {
r io.Reader
prefixBuf []byte
r io.Reader
}
// Decode takes a proto.Message and unmarshals the next payload in the
// underlying io.Reader. It returns an EOF when it's done.
func (d *Decoder) Decode(v proto.Message) error {
_, err := io.ReadFull(d.r, d.prefixBuf)
prefixBuf := make([]byte, prefixSize)
_, err := io.ReadFull(d.r, prefixBuf)
if err != nil {
return err
}
n := binary.BigEndian.Uint64(d.prefixBuf)
n := binary.BigEndian.Uint64(prefixBuf)
buf := make([]byte, n)

36
internal/utils.go Normal file
View File

@@ -0,0 +1,36 @@
package internal
import (
"fmt"
"path/filepath"
"sort"
"strconv"
"strings"
)
func GetDatafiles(path string) ([]string, error) {
fns, err := filepath.Glob(fmt.Sprintf("%s/*.data", path))
if err != nil {
return nil, err
}
sort.Strings(fns)
return fns, nil
}
func ParseIds(fns []string) ([]int, error) {
var ids []int
for _, fn := range fns {
fn = filepath.Base(fn)
ext := filepath.Ext(fn)
if ext != ".data" {
continue
}
id, err := strconv.ParseInt(strings.TrimSuffix(fn, ext), 10, 32)
if err != nil {
return nil, err
}
ids = append(ids, int(id))
}
sort.Ints(ids)
return ids, nil
}

View File

@@ -1,4 +1,4 @@
package bitcask
package internal
import (
"fmt"

View File

@@ -1,4 +1,4 @@
package bitcask
package internal
import (
"fmt"

47
options.go Normal file
View File

@@ -0,0 +1,47 @@
package bitcask
const (
DefaultMaxDatafileSize = 1 << 20 // 1MB
DefaultMaxKeySize = 64 // 64 bytes
DefaultMaxValueSize = 1 << 16 // 65KB
)
// Option ...
type Option option
type option func(*config) error
type config struct {
MaxDatafileSize int
MaxKeySize int
MaxValueSize int
}
func NewDefaultConfig() *config {
return &config{
MaxDatafileSize: DefaultMaxDatafileSize,
MaxKeySize: DefaultMaxKeySize,
MaxValueSize: DefaultMaxValueSize,
}
}
func WithMaxDatafileSize(size int) option {
return func(cfg *config) error {
cfg.MaxDatafileSize = size
return nil
}
}
func WithMaxKeySize(size int) option {
return func(cfg *config) error {
cfg.MaxKeySize = size
return nil
}
}
func WithMaxValueSize(size int) option {
return func(cfg *config) error {
cfg.MaxValueSize = size
return nil
}
}