Compare commits

...

23 Commits

Author SHA1 Message Date
James Mills
67ab944db7 Refactored some internals and removed timestamp field (unsure why it was needed in the original paper) 2019-03-16 12:40:24 +10:00
James Mills
cb00b11dd7 Increase no. of goroutines to catch more race conditions in tests 2019-03-16 12:33:07 +10:00
James Mills
e9c858d43f Add CRC Checksum checks on reading values back 2019-03-16 12:16:23 +10:00
James Mills
120e854444 Improved error messages 2019-03-16 11:47:22 +10:00
James Mills
d2f44d1513 Fix a race condition + Use my fork of trie 2019-03-16 11:22:55 +10:00
James Mills
c0f178c4f7 Improved read/write performance by another ~2x by not calling Stat() on every read/write 2019-03-16 08:15:07 +10:00
James Mills
2585222830 Improve write performance by ~33% to 80,000 writes/sec buf reducing syscalls and using a bufio.Writer 2019-03-16 07:41:37 +10:00
James Mills
3f1d6635c4 Add prefix scan for keys using a Trie 2019-03-15 23:48:50 +10:00
James Mills
67840ffb57 Call Close() at end of sub-commands 2019-03-14 21:50:41 +10:00
James Mills
9f0a357ca0 Remove lock file on Close() 2019-03-14 21:50:23 +10:00
James Mills
52b6c74a21 Fixed compile error in CLI 2019-03-14 21:33:40 +10:00
James Mills
d24a01797a Added WithMaxKeySize() and WithMaxValueSize() options 2019-03-14 21:31:23 +10:00
James Mills
bc8f6c6718 Change locking error message 2019-03-14 21:31:01 +10:00
James Mills
b6c212d60c Refactored option handling 2019-03-14 21:24:31 +10:00
James Mills
3f1b90eb23 Update README.md 2019-03-14 18:18:57 +10:00
James Mills
71a42800fe Improved benchmark test suite for various key/value sizes 2019-03-14 18:17:20 +10:00
James Mills
3b9627aeb8 Fix concurrent read bug 2019-03-14 17:58:06 +10:00
James Mills
e0c4c4fdae Fix concurrent write bug with multiple goroutines writing to the to the active datafile 2019-03-14 17:58:06 +10:00
James Mills
fb50eb2f82 Update README.md 2019-03-14 15:36:37 +10:00
James Mills
fb2335e3c1 Fixed tests 2019-03-14 07:46:59 +10:00
James Mills
9a8aca55ba Updated README 2019-03-13 21:40:43 +10:00
James Mills
32b782b229 Fixed arg handling in bitcaskd 2019-03-13 21:39:23 +10:00
James Mills
146f777683 Fixed versioning during build time 2019-03-13 21:31:21 +10:00
20 changed files with 649 additions and 193 deletions

View File

@@ -1,6 +1,7 @@
.PHONY: dev build generate install image release profile bench test clean .PHONY: dev build generate install image release profile bench test clean
CGO_ENABLED=0 CGO_ENABLED=0
VERSION=$(shell git describe --abbrev=0 --tags)
COMMIT=$(shell git rev-parse --short HEAD) COMMIT=$(shell git rev-parse --short HEAD)
all: dev all: dev
@@ -12,11 +13,11 @@ dev: build
build: clean generate build: clean generate
@go build \ @go build \
-tags "netgo static_build" -installsuffix netgo \ -tags "netgo static_build" -installsuffix netgo \
-ldflags "-w -X $(shell go list)/.Commit=$(COMMIT)" \ -ldflags "-w -X $(shell go list).Version=$(VERSION) -X $(shell go list).Commit=$(COMMIT)" \
./cmd/bitcask/... ./cmd/bitcask/...
@go build \ @go build \
-tags "netgo static_build" -installsuffix netgo \ -tags "netgo static_build" -installsuffix netgo \
-ldflags "-w -X $(shell go list)/.Commit=$(COMMIT)" \ -ldflags "-w -X $(shell go list).Version=$(VERSION) -X $(shell go list).Commit=$(COMMIT)" \
./cmd/bitcaskd/... ./cmd/bitcaskd/...
generate: generate:

View File

@@ -12,6 +12,7 @@ A Bitcask (LSM+WAL) Key/Value Store written in Go.
* Embeddable * Embeddable
* Builtin CLI * Builtin CLI
* Builtin Redis-compatible server
* Predictable read/write performance * Predictable read/write performance
* Low latecny * Low latecny
* High throughput (See: [Performance](README.md#Performance) * High throughput (See: [Performance](README.md#Performance)
@@ -57,6 +58,38 @@ $ bitcask -p /tmp/db get Hello
World World
``` ```
## Usage (server)
There is also a builtin very simple Redis-compatible server called `bitcaskd`:
```#!bash
$ ./bitcaskd ./tmp
INFO[0000] starting bitcaskd v0.0.7@146f777 bind=":6379" path=./tmp
```
Example session:
```
$ telnet localhost 6379
Trying ::1...
Connected to localhost.
Escape character is '^]'.
SET foo bar
+OK
GET foo
$3
bar
DEL foo
:1
GET foo
$-1
PING
+PONG
QUIT
+OK
Connection closed by foreign host.
```
## Performance ## Performance
Benchmarks run on a 11" Macbook with a 1.4Ghz Intel Core i7: Benchmarks run on a 11" Macbook with a 1.4Ghz Intel Core i7:
@@ -64,12 +97,35 @@ Benchmarks run on a 11" Macbook with a 1.4Ghz Intel Core i7:
``` ```
$ make bench $ make bench
... ...
BenchmarkGet-4 300000 5065 ns/op 144 B/op 4 allocs/op BenchmarkGet/128B-4 300000 5178 ns/op 400 B/op 5 allocs/op
BenchmarkPut-4 100000 14640 ns/op 699 B/op 7 allocs/op BenchmarkGet/256B-4 300000 5273 ns/op 656 B/op 5 allocs/op
BenchmarkGet/512B-4 200000 5368 ns/op 1200 B/op 5 allocs/op
BenchmarkGet/1K-4 200000 5800 ns/op 2288 B/op 5 allocs/op
BenchmarkGet/2K-4 200000 6766 ns/op 4464 B/op 5 allocs/op
BenchmarkGet/4K-4 200000 7857 ns/op 9072 B/op 5 allocs/op
BenchmarkGet/8K-4 200000 9538 ns/op 17776 B/op 5 allocs/op
BenchmarkGet/16K-4 100000 13188 ns/op 34928 B/op 5 allocs/op
BenchmarkGet/32K-4 100000 21620 ns/op 73840 B/op 5 allocs/op
BenchmarkPut/128B-4 200000 7875 ns/op 409 B/op 6 allocs/op
BenchmarkPut/256B-4 200000 8712 ns/op 538 B/op 6 allocs/op
BenchmarkPut/512B-4 200000 9832 ns/op 829 B/op 6 allocs/op
BenchmarkPut/1K-4 100000 13105 ns/op 1410 B/op 6 allocs/op
BenchmarkPut/2K-4 100000 18601 ns/op 2572 B/op 6 allocs/op
BenchmarkPut/4K-4 50000 36631 ns/op 5151 B/op 6 allocs/op
BenchmarkPut/8K-4 30000 56128 ns/op 9798 B/op 6 allocs/op
BenchmarkPut/16K-4 20000 83209 ns/op 18834 B/op 6 allocs/op
BenchmarkPut/32K-4 10000 135899 ns/op 41517 B/op 6 allocs/op
BenchmarkScan-4 1000000 1851 ns/op 493 B/op 25 allocs/op
``` ```
* ~180,000 reads/sec For 128B values:
* ~60,000 writes/sec
* ~200,000 reads/sec
* ~130,000 writes/sec
The full benchmark above shows linear performance as you increase key/value sizes.
## License ## License

View File

@@ -1,36 +1,27 @@
package bitcask package bitcask
import ( import (
"errors"
"fmt" "fmt"
"hash/crc32"
"io" "io"
"io/ioutil" "io/ioutil"
"os" "os"
"path/filepath" "path/filepath"
"sort"
"strconv"
"strings" "strings"
"time"
"github.com/gofrs/flock" "github.com/gofrs/flock"
) "github.com/prologic/trie"
const (
DefaultMaxDatafileSize = 1 << 20 // 1MB
)
var (
ErrKeyNotFound = errors.New("error: key not found")
ErrCannotAcquireLock = errors.New("error: cannot acquire lock")
) )
type Bitcask struct { type Bitcask struct {
*flock.Flock *flock.Flock
opts Options
path string path string
curr *Datafile curr *Datafile
keydir *Keydir keydir *Keydir
datafiles []*Datafile datafiles []*Datafile
trie *trie.Trie
maxDatafileSize int64 maxDatafileSize int64
} }
@@ -38,6 +29,7 @@ type Bitcask struct {
func (b *Bitcask) Close() error { func (b *Bitcask) Close() error {
defer func() { defer func() {
b.Flock.Unlock() b.Flock.Unlock()
os.Remove(b.Flock.Path())
}() }()
for _, df := range b.datafiles { for _, df := range b.datafiles {
@@ -55,7 +47,7 @@ func (b *Bitcask) Get(key string) ([]byte, error) {
item, ok := b.keydir.Get(key) item, ok := b.keydir.Get(key)
if !ok { if !ok {
return nil, ErrKeyNotFound return nil, fmt.Errorf("error: key not found %s", key)
} }
if item.FileID == b.curr.id { if item.FileID == b.curr.id {
@@ -64,21 +56,34 @@ func (b *Bitcask) Get(key string) ([]byte, error) {
df = b.datafiles[item.FileID] df = b.datafiles[item.FileID]
} }
e, err := df.ReadAt(item.Index) e, err := df.ReadAt(item.Offset)
if err != nil { if err != nil {
return nil, err return nil, err
} }
checksum := crc32.ChecksumIEEE(e.Value)
if checksum != e.Checksum {
return nil, fmt.Errorf("error: checksum falied %s %d != %d", key, e.Checksum, checksum)
}
return e.Value, nil return e.Value, nil
} }
func (b *Bitcask) Put(key string, value []byte) error { func (b *Bitcask) Put(key string, value []byte) error {
index, err := b.put(key, value) if len(key) > b.opts.MaxKeySize {
return fmt.Errorf("error: key too large %d > %d", len(key), b.opts.MaxKeySize)
}
if len(value) > b.opts.MaxValueSize {
return fmt.Errorf("error: value too large %d > %d", len(value), b.opts.MaxValueSize)
}
offset, err := b.put(key, value)
if err != nil { if err != nil {
return err return err
} }
b.keydir.Add(key, b.curr.id, index, time.Now().Unix()) item := b.keydir.Add(key, b.curr.id, offset)
b.trie.Add(key, item)
return nil return nil
} }
@@ -90,10 +95,21 @@ func (b *Bitcask) Delete(key string) error {
} }
b.keydir.Delete(key) b.keydir.Delete(key)
b.trie.Remove(key)
return nil return nil
} }
func (b *Bitcask) Scan(prefix string, f func(key string) error) error {
keys := b.trie.PrefixSearch(prefix)
for _, key := range keys {
if err := f(key); err != nil {
return err
}
}
return nil
}
func (b *Bitcask) Fold(f func(key string) error) error { func (b *Bitcask) Fold(f func(key string) error) error {
for key := range b.keydir.Keys() { for key := range b.keydir.Keys() {
if err := f(key); err != nil { if err := f(key); err != nil {
@@ -135,39 +151,6 @@ func (b *Bitcask) setMaxDatafileSize(size int64) error {
return nil return nil
} }
func WithMaxDatafileSize(size int64) func(*Bitcask) error {
return func(b *Bitcask) error {
return b.setMaxDatafileSize(size)
}
}
func getDatafiles(path string) ([]string, error) {
fns, err := filepath.Glob(fmt.Sprintf("%s/*.data", path))
if err != nil {
return nil, err
}
sort.Strings(fns)
return fns, nil
}
func parseIds(fns []string) ([]int, error) {
var ids []int
for _, fn := range fns {
fn = filepath.Base(fn)
ext := filepath.Ext(fn)
if ext != ".data" {
continue
}
id, err := strconv.ParseInt(strings.TrimSuffix(fn, ext), 10, 32)
if err != nil {
return nil, err
}
ids = append(ids, int(id))
}
sort.Ints(ids)
return ids, nil
}
func Merge(path string, force bool) error { func Merge(path string, force bool) error {
fns, err := getDatafiles(path) fns, err := getDatafiles(path)
if err != nil { if err != nil {
@@ -227,7 +210,7 @@ func Merge(path string, force bool) error {
continue continue
} }
keydir.Add(e.Key, ids[i], e.Index, e.Timestamp) keydir.Add(e.Key, ids[i], e.Offset)
} }
tempdf, err := NewDatafile(temp, id, false) tempdf, err := NewDatafile(temp, id, false)
@@ -238,7 +221,7 @@ func Merge(path string, force bool) error {
for key := range keydir.Keys() { for key := range keydir.Keys() {
item, _ := keydir.Get(key) item, _ := keydir.Get(key)
e, err := df.ReadAt(item.Index) e, err := df.ReadAt(item.Offset)
if err != nil { if err != nil {
return err return err
} }
@@ -294,9 +277,11 @@ func Open(path string, options ...func(*Bitcask) error) (*Bitcask, error) {
return nil, err return nil, err
} }
keydir := NewKeydir()
var datafiles []*Datafile var datafiles []*Datafile
keydir := NewKeydir()
trie := trie.New()
for i, fn := range fns { for i, fn := range fns {
df, err := NewDatafile(path, ids[i], true) df, err := NewDatafile(path, ids[i], true)
if err != nil { if err != nil {
@@ -318,7 +303,8 @@ func Open(path string, options ...func(*Bitcask) error) (*Bitcask, error) {
for key := range hint.Keys() { for key := range hint.Keys() {
item, _ := hint.Get(key) item, _ := hint.Get(key)
keydir.Add(key, item.FileID, item.Index, item.Timestamp) _ = keydir.Add(key, item.FileID, item.Offset)
trie.Add(key, item)
} }
} else { } else {
for { for {
@@ -336,7 +322,8 @@ func Open(path string, options ...func(*Bitcask) error) (*Bitcask, error) {
continue continue
} }
keydir.Add(e.Key, ids[i], e.Index, e.Timestamp) item := keydir.Add(e.Key, ids[i], e.Offset)
trie.Add(e.Key, item)
} }
} }
} }
@@ -353,10 +340,12 @@ func Open(path string, options ...func(*Bitcask) error) (*Bitcask, error) {
bitcask := &Bitcask{ bitcask := &Bitcask{
Flock: flock.New(filepath.Join(path, "lock")), Flock: flock.New(filepath.Join(path, "lock")),
opts: NewDefaultOptions(),
path: path, path: path,
curr: curr, curr: curr,
keydir: keydir, keydir: keydir,
datafiles: datafiles, datafiles: datafiles,
trie: trie,
maxDatafileSize: DefaultMaxDatafileSize, maxDatafileSize: DefaultMaxDatafileSize,
} }
@@ -374,7 +363,7 @@ func Open(path string, options ...func(*Bitcask) error) (*Bitcask, error) {
} }
if !locked { if !locked {
return nil, ErrCannotAcquireLock return nil, fmt.Errorf("error: database locked %s", path)
} }
return bitcask, nil return bitcask, nil

View File

@@ -3,7 +3,10 @@ package bitcask
import ( import (
"fmt" "fmt"
"io/ioutil" "io/ioutil"
"reflect"
"sort"
"strings" "strings"
"sync"
"testing" "testing"
"github.com/stretchr/testify/assert" "github.com/stretchr/testify/assert"
@@ -42,7 +45,7 @@ func TestAll(t *testing.T) {
assert.NoError(err) assert.NoError(err)
_, err = db.Get("foo") _, err = db.Get("foo")
assert.Error(err) assert.Error(err)
assert.Equal(err.Error(), "error: key not found") assert.Equal("error: key not found foo", err.Error())
}) })
t.Run("Sync", func(t *testing.T) { t.Run("Sync", func(t *testing.T) {
@@ -89,7 +92,7 @@ func TestDeletedKeys(t *testing.T) {
assert.NoError(err) assert.NoError(err)
_, err = db.Get("foo") _, err = db.Get("foo")
assert.Error(err) assert.Error(err)
assert.Equal("error: key not found", err.Error()) assert.Equal("error: key not found foo", err.Error())
}) })
t.Run("Sync", func(t *testing.T) { t.Run("Sync", func(t *testing.T) {
@@ -117,7 +120,7 @@ func TestDeletedKeys(t *testing.T) {
t.Run("Get", func(t *testing.T) { t.Run("Get", func(t *testing.T) {
_, err = db.Get("foo") _, err = db.Get("foo")
assert.Error(err) assert.Error(err)
assert.Equal("error: key not found", err.Error()) assert.Equal("error: key not found foo", err.Error())
}) })
t.Run("Close", func(t *testing.T) { t.Run("Close", func(t *testing.T) {
@@ -127,6 +130,50 @@ func TestDeletedKeys(t *testing.T) {
}) })
} }
func TestMaxKeySize(t *testing.T) {
assert := assert.New(t)
testdir, err := ioutil.TempDir("", "bitcask")
assert.NoError(err)
var db *Bitcask
t.Run("Open", func(t *testing.T) {
db, err = Open(testdir, WithMaxKeySize(16))
assert.NoError(err)
})
t.Run("Put", func(t *testing.T) {
key := strings.Repeat(" ", 17)
value := []byte("foobar")
err = db.Put(key, value)
assert.Error(err)
assert.Equal("error: key too large 17 > 16", err.Error())
})
}
func TestMaxValueSize(t *testing.T) {
assert := assert.New(t)
testdir, err := ioutil.TempDir("", "bitcask")
assert.NoError(err)
var db *Bitcask
t.Run("Open", func(t *testing.T) {
db, err = Open(testdir, WithMaxValueSize(16))
assert.NoError(err)
})
t.Run("Put", func(t *testing.T) {
key := "foo"
value := []byte(strings.Repeat(" ", 17))
err = db.Put(key, value)
assert.Error(err)
assert.Equal("error: value too large 17 > 16", err.Error())
})
}
func TestMerge(t *testing.T) { func TestMerge(t *testing.T) {
assert := assert.New(t) assert := assert.New(t)
@@ -140,7 +187,7 @@ func TestMerge(t *testing.T) {
) )
t.Run("Open", func(t *testing.T) { t.Run("Open", func(t *testing.T) {
db, err = Open(testdir, MaxDatafileSize(1024)) db, err = Open(testdir, WithMaxDatafileSize(1024))
assert.NoError(err) assert.NoError(err)
}) })
@@ -198,6 +245,136 @@ func TestMerge(t *testing.T) {
}) })
} }
func TestConcurrent(t *testing.T) {
var (
db *Bitcask
err error
)
assert := assert.New(t)
testdir, err := ioutil.TempDir("", "bitcask")
assert.NoError(err)
t.Run("Setup", func(t *testing.T) {
t.Run("Open", func(t *testing.T) {
db, err = Open(testdir)
assert.NoError(err)
})
t.Run("Put", func(t *testing.T) {
err = db.Put("foo", []byte("bar"))
assert.NoError(err)
})
})
t.Run("Concurrent", func(t *testing.T) {
t.Run("Put", func(t *testing.T) {
f := func(wg *sync.WaitGroup, x int) {
defer func() {
wg.Done()
}()
for i := 0; i <= 100; i++ {
if i%x == 0 {
key := fmt.Sprintf("k%d", i)
value := []byte(fmt.Sprintf("v%d", i))
err := db.Put(key, value)
assert.NoError(err)
}
}
}
wg := &sync.WaitGroup{}
go f(wg, 2)
go f(wg, 3)
go f(wg, 5)
wg.Add(3)
wg.Wait()
})
t.Run("Get", func(t *testing.T) {
f := func(wg *sync.WaitGroup, N int) {
defer func() {
wg.Done()
}()
for i := 0; i <= N; i++ {
value, err := db.Get("foo")
assert.NoError(err)
assert.Equal([]byte("bar"), value)
}
}
wg := &sync.WaitGroup{}
go f(wg, 100)
go f(wg, 100)
go f(wg, 100)
wg.Add(3)
wg.Wait()
})
t.Run("Close", func(t *testing.T) {
err = db.Close()
assert.NoError(err)
})
})
}
func TestScan(t *testing.T) {
assert := assert.New(t)
testdir, err := ioutil.TempDir("", "bitcask")
assert.NoError(err)
var db *Bitcask
t.Run("Setup", func(t *testing.T) {
t.Run("Open", func(t *testing.T) {
db, err = Open(testdir)
assert.NoError(err)
})
t.Run("Put", func(t *testing.T) {
var items = map[string][]byte{
"1": []byte("1"),
"2": []byte("2"),
"3": []byte("3"),
"food": []byte("pizza"),
"foo": []byte("foo"),
"fooz": []byte("fooz ball"),
"hello": []byte("world"),
}
for k, v := range items {
err = db.Put(k, v)
assert.NoError(err)
}
})
})
t.Run("Scan", func(t *testing.T) {
var (
vals []string
expected = []string{
"foo",
"fooz ball",
"pizza",
}
)
err = db.Scan("fo", func(key string) error {
val, err := db.Get(key)
assert.NoError(err)
vals = append(vals, string(val))
return nil
})
sort.Strings(vals)
assert.Equal(expected, vals)
})
}
func TestLocking(t *testing.T) { func TestLocking(t *testing.T) {
assert := assert.New(t) assert := assert.New(t)
@@ -210,7 +387,12 @@ func TestLocking(t *testing.T) {
_, err = Open(testdir) _, err = Open(testdir)
assert.Error(err) assert.Error(err)
assert.Equal("error: cannot acquire lock", err.Error()) assert.Equal(fmt.Sprintf("error: database locked %s", testdir), err.Error())
}
type benchmarkTestCase struct {
name string
size int
} }
func BenchmarkGet(b *testing.B) { func BenchmarkGet(b *testing.B) {
@@ -225,20 +407,39 @@ func BenchmarkGet(b *testing.B) {
} }
defer db.Close() defer db.Close()
err = db.Put("foo", []byte("bar")) tests := []benchmarkTestCase{
if err != nil { {"128B", 128},
b.Fatal(err) {"256B", 256},
{"512B", 512},
{"1K", 1024},
{"2K", 2048},
{"4K", 4096},
{"8K", 8192},
{"16K", 16384},
{"32K", 32768},
} }
b.ResetTimer() for _, tt := range tests {
for i := 0; i < b.N; i++ { b.Run(tt.name, func(b *testing.B) {
val, err := db.Get("foo") key := "foo"
if err != nil { value := []byte(strings.Repeat(" ", tt.size))
b.Fatal(err)
} err = db.Put(key, value)
if string(val) != "bar" { if err != nil {
b.Errorf("expected val=bar got=%s", val) b.Fatal(err)
} }
b.ResetTimer()
for i := 0; i < b.N; i++ {
val, err := db.Get(key)
if err != nil {
b.Fatal(err)
}
if string(val) != string(value) {
b.Errorf("unexpected value")
}
}
})
} }
} }
@@ -254,11 +455,73 @@ func BenchmarkPut(b *testing.B) {
} }
defer db.Close() defer db.Close()
b.ResetTimer() tests := []benchmarkTestCase{
for i := 0; i < b.N; i++ { {"128B", 128},
err := db.Put(fmt.Sprintf("key%d", i), []byte("bar")) {"256B", 256},
{"512B", 512},
{"1K", 1024},
{"2K", 2048},
{"4K", 4096},
{"8K", 8192},
{"16K", 16384},
{"32K", 32768},
}
for _, tt := range tests {
b.Run(tt.name, func(b *testing.B) {
key := "foo"
value := []byte(strings.Repeat(" ", tt.size))
b.ResetTimer()
for i := 0; i < b.N; i++ {
err := db.Put(key, value)
if err != nil {
b.Fatal(err)
}
}
})
}
}
func BenchmarkScan(b *testing.B) {
testdir, err := ioutil.TempDir("", "bitcask")
if err != nil {
b.Fatal(err)
}
db, err := Open(testdir)
if err != nil {
b.Fatal(err)
}
defer db.Close()
var items = map[string][]byte{
"1": []byte("1"),
"2": []byte("2"),
"3": []byte("3"),
"food": []byte("pizza"),
"foo": []byte("foo"),
"fooz": []byte("fooz ball"),
"hello": []byte("world"),
}
for k, v := range items {
err := db.Put(k, v)
if err != nil { if err != nil {
b.Fatal(err) b.Fatal(err)
} }
} }
var expected = []string{"foo", "food", "fooz"}
b.ResetTimer()
for i := 0; i < b.N; i++ {
var keys []string
err = db.Scan("fo", func(key string) error {
keys = append(keys, key)
return nil
})
sort.Strings(keys)
if !reflect.DeepEqual(expected, keys) {
b.Fatal(fmt.Errorf("expected keys=#%v got=%#v", expected, keys))
}
}
} }

View File

@@ -35,6 +35,7 @@ func del(path, key string) int {
log.WithError(err).Error("error opening database") log.WithError(err).Error("error opening database")
return 1 return 1
} }
defer db.Close()
err = db.Delete(key) err = db.Delete(key)
if err != nil { if err != nil {

View File

@@ -36,6 +36,7 @@ func get(path, key string) int {
log.WithError(err).Error("error opening database") log.WithError(err).Error("error opening database")
return 1 return 1
} }
defer db.Close()
value, err := db.Get(key) value, err := db.Get(key)
if err != nil { if err != nil {

View File

@@ -34,6 +34,7 @@ func keys(path string) int {
log.WithError(err).Error("error opening database") log.WithError(err).Error("error opening database")
return 1 return 1
} }
defer db.Close()
err = db.Fold(func(key string) error { err = db.Fold(func(key string) error {
fmt.Printf("%s\n", key) fmt.Printf("%s\n", key)

60
cmd/bitcask/scan.go Normal file
View File

@@ -0,0 +1,60 @@
package main
import (
"fmt"
"os"
log "github.com/sirupsen/logrus"
"github.com/spf13/cobra"
"github.com/spf13/viper"
"github.com/prologic/bitcask"
)
var scanCmd = &cobra.Command{
Use: "scan <prefix>",
Aliases: []string{"search", "find"},
Short: "Perform a prefis scan for keys",
Long: `This performa a prefix scan for keys starting with the given
prefix. This uses a Trie to search for matching keys and returns all matched
keys.`,
Args: cobra.ExactArgs(1),
Run: func(cmd *cobra.Command, args []string) {
path := viper.GetString("path")
prefix := args[0]
os.Exit(scan(path, prefix))
},
}
func init() {
RootCmd.AddCommand(scanCmd)
}
func scan(path, prefix string) int {
db, err := bitcask.Open(path)
if err != nil {
log.WithError(err).Error("error opening database")
return 1
}
defer db.Close()
err = db.Scan(prefix, func(key string) error {
value, err := db.Get(key)
if err != nil {
log.WithError(err).Error("error reading key")
return err
}
fmt.Printf("%s\n", string(value))
log.WithField("key", key).WithField("value", value).Debug("key/value")
return nil
})
if err != nil {
log.WithError(err).Error("error scanning keys")
return 1
}
return 0
}

View File

@@ -47,6 +47,7 @@ func set(path, key string, value io.Reader) int {
log.WithError(err).Error("error opening database") log.WithError(err).Error("error opening database")
return 1 return 1
} }
defer db.Close()
data, err := ioutil.ReadAll(value) data, err := ioutil.ReadAll(value)
if err != nil { if err != nil {

View File

@@ -16,7 +16,7 @@ var (
bind string bind string
debug bool debug bool
version bool version bool
maxDatafileSize int64 maxDatafileSize int
) )
func init() { func init() {
@@ -30,7 +30,7 @@ func init() {
flag.StringVarP(&bind, "bind", "b", ":6379", "interface and port to bind to") flag.StringVarP(&bind, "bind", "b", ":6379", "interface and port to bind to")
flag.Int64Var(&maxDatafileSize, "max-datafile-size", 1<<20, "maximum datafile size in bytes") flag.IntVar(&maxDatafileSize, "max-datafile-size", 1<<20, "maximum datafile size in bytes")
} }
func main() { func main() {
@@ -75,8 +75,8 @@ func main() {
conn.WriteError("ERR wrong number of arguments for '" + string(cmd.Args[0]) + "' command") conn.WriteError("ERR wrong number of arguments for '" + string(cmd.Args[0]) + "' command")
return return
} }
key := string(cmd.Args[0]) key := string(cmd.Args[1])
value := cmd.Args[1] value := cmd.Args[2]
err = db.Put(key, value) err = db.Put(key, value)
if err != nil { if err != nil {
conn.WriteString(fmt.Sprintf("ERR: %s", err)) conn.WriteString(fmt.Sprintf("ERR: %s", err))
@@ -88,7 +88,7 @@ func main() {
conn.WriteError("ERR wrong number of arguments for '" + string(cmd.Args[0]) + "' command") conn.WriteError("ERR wrong number of arguments for '" + string(cmd.Args[0]) + "' command")
return return
} }
key := string(cmd.Args[0]) key := string(cmd.Args[1])
value, err := db.Get(key) value, err := db.Get(key)
if err != nil { if err != nil {
conn.WriteNull() conn.WriteNull()
@@ -100,7 +100,7 @@ func main() {
conn.WriteError("ERR wrong number of arguments for '" + string(cmd.Args[0]) + "' command") conn.WriteError("ERR wrong number of arguments for '" + string(cmd.Args[0]) + "' command")
return return
} }
key := string(cmd.Args[0]) key := string(cmd.Args[1])
err := db.Delete(key) err := db.Delete(key)
if err != nil { if err != nil {
conn.WriteInt(0) conn.WriteInt(0)

View File

@@ -1,11 +1,12 @@
package bitcask package bitcask
import ( import (
"errors"
"fmt" "fmt"
"os" "os"
"path/filepath" "path/filepath"
"time" "sync"
"github.com/pkg/errors"
pb "github.com/prologic/bitcask/proto" pb "github.com/prologic/bitcask/proto"
"github.com/prologic/bitcask/streampb" "github.com/prologic/bitcask/streampb"
@@ -20,11 +21,14 @@ var (
) )
type Datafile struct { type Datafile struct {
id int sync.RWMutex
r *os.File
w *os.File id int
dec *streampb.Decoder r *os.File
enc *streampb.Encoder w *os.File
offset int64
dec *streampb.Decoder
enc *streampb.Encoder
} }
func NewDatafile(path string, id int, readonly bool) (*Datafile, error) { func NewDatafile(path string, id int, readonly bool) (*Datafile, error) {
@@ -47,16 +51,23 @@ func NewDatafile(path string, id int, readonly bool) (*Datafile, error) {
if err != nil { if err != nil {
return nil, err return nil, err
} }
stat, err := r.Stat()
if err != nil {
return nil, errors.Wrap(err, "error calling Stat()")
}
offset := stat.Size()
dec := streampb.NewDecoder(r) dec := streampb.NewDecoder(r)
enc := streampb.NewEncoder(w) enc := streampb.NewEncoder(w)
return &Datafile{ return &Datafile{
id: id, id: id,
r: r, r: r,
w: w, w: w,
dec: dec, offset: offset,
enc: enc, dec: dec,
enc: enc,
}, nil }, nil
} }
@@ -84,35 +95,28 @@ func (df *Datafile) Sync() error {
} }
func (df *Datafile) Size() (int64, error) { func (df *Datafile) Size() (int64, error) {
var ( df.RLock()
stat os.FileInfo defer df.RUnlock()
err error return df.offset, nil
)
if df.w == nil {
stat, err = df.r.Stat()
} else {
stat, err = df.w.Stat()
}
if err != nil {
return -1, err
}
return stat.Size(), nil
} }
func (df *Datafile) Read() (pb.Entry, error) { func (df *Datafile) Read() (e pb.Entry, err error) {
var e pb.Entry df.Lock()
defer df.Unlock()
return e, df.dec.Decode(&e) return e, df.dec.Decode(&e)
} }
func (df *Datafile) ReadAt(index int64) (e pb.Entry, err error) { func (df *Datafile) ReadAt(index int64) (e pb.Entry, err error) {
df.Lock()
defer df.Unlock()
_, err = df.r.Seek(index, os.SEEK_SET) _, err = df.r.Seek(index, os.SEEK_SET)
if err != nil { if err != nil {
return return
} }
return df.Read()
return e, df.dec.Decode(&e)
} }
func (df *Datafile) Write(e pb.Entry) (int64, error) { func (df *Datafile) Write(e pb.Entry) (int64, error) {
@@ -120,20 +124,16 @@ func (df *Datafile) Write(e pb.Entry) (int64, error) {
return -1, ErrReadonly return -1, ErrReadonly
} }
stat, err := df.w.Stat() df.Lock()
defer df.Unlock()
e.Offset = df.offset
n, err := df.enc.Encode(&e)
if err != nil { if err != nil {
return -1, err return -1, err
} }
df.offset += n
index := stat.Size() return e.Offset, nil
e.Index = index
e.Timestamp = time.Now().Unix()
err = df.enc.Encode(&e)
if err != nil {
return -1, err
}
return index, nil
} }

View File

@@ -7,11 +7,11 @@ import (
) )
func NewEntry(key string, value []byte) pb.Entry { func NewEntry(key string, value []byte) pb.Entry {
crc := crc32.ChecksumIEEE(value) checksum := crc32.ChecksumIEEE(value)
return pb.Entry{ return pb.Entry{
CRC: crc, Checksum: checksum,
Key: key, Key: key,
Value: value, Value: value,
} }
} }

2
go.mod
View File

@@ -1,6 +1,7 @@
module github.com/prologic/bitcask module github.com/prologic/bitcask
require ( require (
github.com/derekparker/trie v0.0.0-20180212171413-e608c2733dc7
github.com/gofrs/flock v0.7.1 github.com/gofrs/flock v0.7.1
github.com/gogo/protobuf v1.2.1 github.com/gogo/protobuf v1.2.1
github.com/golang/protobuf v1.2.0 github.com/golang/protobuf v1.2.0
@@ -9,6 +10,7 @@ require (
github.com/mitchellh/go-homedir v1.1.0 github.com/mitchellh/go-homedir v1.1.0
github.com/pkg/errors v0.8.1 github.com/pkg/errors v0.8.1
github.com/prologic/msgbus v0.1.1 github.com/prologic/msgbus v0.1.1
github.com/prologic/trie v0.0.0-20190316011403-395e39dac705
github.com/prometheus/client_golang v0.9.2 // indirect github.com/prometheus/client_golang v0.9.2 // indirect
github.com/sirupsen/logrus v1.3.0 github.com/sirupsen/logrus v1.3.0
github.com/spf13/cobra v0.0.3 github.com/spf13/cobra v0.0.3

4
go.sum
View File

@@ -8,6 +8,8 @@ github.com/davecgh/go-spew v1.1.0 h1:ZDRjVQ15GmhC3fiQ8ni8+OwkZQO4DARzQgrnXU1Liz8
github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c= github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c=
github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
github.com/derekparker/trie v0.0.0-20180212171413-e608c2733dc7 h1:Cab9yoTQh1TxObKfis1DzZ6vFLK5kbeenMjRES/UE3o=
github.com/derekparker/trie v0.0.0-20180212171413-e608c2733dc7/go.mod h1:D6ICZm05D9VN1n/8iOtBxLpXtoGp6HDFUJ1RNVieOSE=
github.com/fsnotify/fsnotify v1.4.7 h1:IXs+QLmnXW2CcXuY+8Mzv/fWEsPGWxqefPtCP5CnV9I= github.com/fsnotify/fsnotify v1.4.7 h1:IXs+QLmnXW2CcXuY+8Mzv/fWEsPGWxqefPtCP5CnV9I=
github.com/fsnotify/fsnotify v1.4.7/go.mod h1:jwhsz4b93w/PPRr/qN1Yymfu8t87LnFCMoQvtojpjFo= github.com/fsnotify/fsnotify v1.4.7/go.mod h1:jwhsz4b93w/PPRr/qN1Yymfu8t87LnFCMoQvtojpjFo=
github.com/gofrs/flock v0.7.1 h1:DP+LD/t0njgoPBvT5MJLeliUIVQR03hiKR6vezdwHlc= github.com/gofrs/flock v0.7.1 h1:DP+LD/t0njgoPBvT5MJLeliUIVQR03hiKR6vezdwHlc=
@@ -40,6 +42,8 @@ github.com/pkg/errors v0.8.1/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINE
github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM= github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM=
github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4= github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4=
github.com/prologic/msgbus v0.1.1/go.mod h1:B3Qu4/U2FP08x93jUzp9E8bl155+cIgDH2DUGRK6OZk= github.com/prologic/msgbus v0.1.1/go.mod h1:B3Qu4/U2FP08x93jUzp9E8bl155+cIgDH2DUGRK6OZk=
github.com/prologic/trie v0.0.0-20190316011403-395e39dac705 h1:2J+cSlAeECj0lfMKSmM7n5OlIio+yLovaKLZJzwLc6U=
github.com/prologic/trie v0.0.0-20190316011403-395e39dac705/go.mod h1:LFuDmpHJGmciXd8Rl5YMhVlLMps9gz2GtYLzwxrFhzs=
github.com/prometheus/client_golang v0.9.2 h1:awm861/B8OKDd2I/6o1dy3ra4BamzKhYOiGItCeZ740= github.com/prometheus/client_golang v0.9.2 h1:awm861/B8OKDd2I/6o1dy3ra4BamzKhYOiGItCeZ740=
github.com/prometheus/client_golang v0.9.2/go.mod h1:OsXs2jCmiKlQ1lTBmv21f2mNfw4xf/QclQDMrYNZzcM= github.com/prometheus/client_golang v0.9.2/go.mod h1:OsXs2jCmiKlQ1lTBmv21f2mNfw4xf/QclQDMrYNZzcM=
github.com/prometheus/client_model v0.0.0-20180712105110-5c3871d89910 h1:idejC8f05m9MGOsuEi1ATq9shN03HrxNkD/luQvxCv8= github.com/prometheus/client_model v0.0.0-20180712105110-5c3871d89910 h1:idejC8f05m9MGOsuEi1ATq9shN03HrxNkD/luQvxCv8=

View File

@@ -9,9 +9,8 @@ import (
) )
type Item struct { type Item struct {
FileID int FileID int
Index int64 Offset int64
Timestamp int64
} }
type Keydir struct { type Keydir struct {
@@ -25,15 +24,17 @@ func NewKeydir() *Keydir {
} }
} }
func (k *Keydir) Add(key string, fileid int, index, timestamp int64) { func (k *Keydir) Add(key string, fileid int, offset int64) Item {
k.Lock() item := Item{
defer k.Unlock() FileID: fileid,
Offset: offset,
k.kv[key] = Item{
FileID: fileid,
Index: index,
Timestamp: timestamp,
} }
k.Lock()
k.kv[key] = item
k.Unlock()
return item
} }
func (k *Keydir) Get(key string) (Item, bool) { func (k *Keydir) Get(key string) (Item, bool) {

42
options.go Normal file
View File

@@ -0,0 +1,42 @@
package bitcask
const (
DefaultMaxDatafileSize = 1 << 20 // 1MB
DefaultMaxKeySize = 64 // 64 bytes
DefaultMaxValueSize = 1 << 16 // 65KB
)
type Options struct {
MaxDatafileSize int
MaxKeySize int
MaxValueSize int
}
func NewDefaultOptions() Options {
return Options{
MaxDatafileSize: DefaultMaxDatafileSize,
MaxKeySize: DefaultMaxKeySize,
MaxValueSize: DefaultMaxValueSize,
}
}
func WithMaxDatafileSize(size int) func(*Bitcask) error {
return func(b *Bitcask) error {
b.opts.MaxDatafileSize = size
return nil
}
}
func WithMaxKeySize(size int) func(*Bitcask) error {
return func(b *Bitcask) error {
b.opts.MaxKeySize = size
return nil
}
}
func WithMaxValueSize(size int) func(*Bitcask) error {
return func(b *Bitcask) error {
b.opts.MaxValueSize = size
return nil
}
}

View File

@@ -19,11 +19,10 @@ var _ = math.Inf
const _ = proto.ProtoPackageIsVersion2 // please upgrade the proto package const _ = proto.ProtoPackageIsVersion2 // please upgrade the proto package
type Entry struct { type Entry struct {
CRC uint32 `protobuf:"varint,1,opt,name=CRC,proto3" json:"CRC,omitempty"` Checksum uint32 `protobuf:"varint,1,opt,name=Checksum,proto3" json:"Checksum,omitempty"`
Key string `protobuf:"bytes,2,opt,name=Key,proto3" json:"Key,omitempty"` Key string `protobuf:"bytes,2,opt,name=Key,proto3" json:"Key,omitempty"`
Index int64 `protobuf:"varint,3,opt,name=Index,proto3" json:"Index,omitempty"` Offset int64 `protobuf:"varint,3,opt,name=Offset,proto3" json:"Offset,omitempty"`
Value []byte `protobuf:"bytes,4,opt,name=Value,proto3" json:"Value,omitempty"` Value []byte `protobuf:"bytes,4,opt,name=Value,proto3" json:"Value,omitempty"`
Timestamp int64 `protobuf:"varint,5,opt,name=Timestamp,proto3" json:"Timestamp,omitempty"`
XXX_NoUnkeyedLiteral struct{} `json:"-"` XXX_NoUnkeyedLiteral struct{} `json:"-"`
XXX_unrecognized []byte `json:"-"` XXX_unrecognized []byte `json:"-"`
XXX_sizecache int32 `json:"-"` XXX_sizecache int32 `json:"-"`
@@ -33,7 +32,7 @@ func (m *Entry) Reset() { *m = Entry{} }
func (m *Entry) String() string { return proto.CompactTextString(m) } func (m *Entry) String() string { return proto.CompactTextString(m) }
func (*Entry) ProtoMessage() {} func (*Entry) ProtoMessage() {}
func (*Entry) Descriptor() ([]byte, []int) { func (*Entry) Descriptor() ([]byte, []int) {
return fileDescriptor_entry_4f5906245d08394f, []int{0} return fileDescriptor_entry_3e91842c99935ae2, []int{0}
} }
func (m *Entry) XXX_Unmarshal(b []byte) error { func (m *Entry) XXX_Unmarshal(b []byte) error {
return xxx_messageInfo_Entry.Unmarshal(m, b) return xxx_messageInfo_Entry.Unmarshal(m, b)
@@ -53,9 +52,9 @@ func (m *Entry) XXX_DiscardUnknown() {
var xxx_messageInfo_Entry proto.InternalMessageInfo var xxx_messageInfo_Entry proto.InternalMessageInfo
func (m *Entry) GetCRC() uint32 { func (m *Entry) GetChecksum() uint32 {
if m != nil { if m != nil {
return m.CRC return m.Checksum
} }
return 0 return 0
} }
@@ -67,9 +66,9 @@ func (m *Entry) GetKey() string {
return "" return ""
} }
func (m *Entry) GetIndex() int64 { func (m *Entry) GetOffset() int64 {
if m != nil { if m != nil {
return m.Index return m.Offset
} }
return 0 return 0
} }
@@ -81,28 +80,20 @@ func (m *Entry) GetValue() []byte {
return nil return nil
} }
func (m *Entry) GetTimestamp() int64 {
if m != nil {
return m.Timestamp
}
return 0
}
func init() { func init() {
proto.RegisterType((*Entry)(nil), "proto.Entry") proto.RegisterType((*Entry)(nil), "proto.Entry")
} }
func init() { proto.RegisterFile("entry.proto", fileDescriptor_entry_4f5906245d08394f) } func init() { proto.RegisterFile("entry.proto", fileDescriptor_entry_3e91842c99935ae2) }
var fileDescriptor_entry_4f5906245d08394f = []byte{ var fileDescriptor_entry_3e91842c99935ae2 = []byte{
// 134 bytes of a gzipped FileDescriptorProto // 126 bytes of a gzipped FileDescriptorProto
0x1f, 0x8b, 0x08, 0x00, 0x00, 0x00, 0x00, 0x00, 0x02, 0xff, 0xe2, 0xe2, 0x4e, 0xcd, 0x2b, 0x29, 0x1f, 0x8b, 0x08, 0x00, 0x00, 0x00, 0x00, 0x00, 0x02, 0xff, 0xe2, 0xe2, 0x4e, 0xcd, 0x2b, 0x29,
0xaa, 0xd4, 0x2b, 0x28, 0xca, 0x2f, 0xc9, 0x17, 0x62, 0x05, 0x53, 0x4a, 0xa5, 0x5c, 0xac, 0xae, 0xaa, 0xd4, 0x2b, 0x28, 0xca, 0x2f, 0xc9, 0x17, 0x62, 0x05, 0x53, 0x4a, 0xc9, 0x5c, 0xac, 0xae,
0x20, 0x51, 0x21, 0x01, 0x2e, 0x66, 0xe7, 0x20, 0x67, 0x09, 0x46, 0x05, 0x46, 0x0d, 0xde, 0x20, 0x20, 0x51, 0x21, 0x29, 0x2e, 0x0e, 0xe7, 0x8c, 0xd4, 0xe4, 0xec, 0xe2, 0xd2, 0x5c, 0x09, 0x46,
0x10, 0x13, 0x24, 0xe2, 0x9d, 0x5a, 0x29, 0xc1, 0xa4, 0xc0, 0xa8, 0xc1, 0x19, 0x04, 0x62, 0x0a, 0x05, 0x46, 0x0d, 0xde, 0x20, 0x38, 0x5f, 0x48, 0x80, 0x8b, 0xd9, 0x3b, 0xb5, 0x52, 0x82, 0x49,
0x89, 0x70, 0xb1, 0x7a, 0xe6, 0xa5, 0xa4, 0x56, 0x48, 0x30, 0x2b, 0x30, 0x6a, 0x30, 0x07, 0x41, 0x81, 0x51, 0x83, 0x33, 0x08, 0xc4, 0x14, 0x12, 0xe3, 0x62, 0xf3, 0x4f, 0x4b, 0x2b, 0x4e, 0x2d,
0x38, 0x20, 0xd1, 0xb0, 0xc4, 0x9c, 0xd2, 0x54, 0x09, 0x16, 0x05, 0x46, 0x0d, 0x9e, 0x20, 0x08, 0x91, 0x60, 0x56, 0x60, 0xd4, 0x60, 0x0e, 0x82, 0xf2, 0x84, 0x44, 0xb8, 0x58, 0xc3, 0x12, 0x73,
0x47, 0x48, 0x86, 0x8b, 0x33, 0x24, 0x33, 0x37, 0xb5, 0xb8, 0x24, 0x31, 0xb7, 0x40, 0x82, 0x15, 0x4a, 0x53, 0x25, 0x58, 0x14, 0x18, 0x35, 0x78, 0x82, 0x20, 0x9c, 0x24, 0x36, 0xb0, 0x5d, 0xc6,
0xac, 0x1e, 0x21, 0x90, 0xc4, 0x06, 0xb6, 0xdd, 0x18, 0x10, 0x00, 0x00, 0xff, 0xff, 0x07, 0x99, 0x80, 0x00, 0x00, 0x00, 0xff, 0xff, 0x76, 0xd2, 0x3e, 0x83, 0x81, 0x00, 0x00, 0x00,
0x47, 0xb9, 0x93, 0x00, 0x00, 0x00,
} }

View File

@@ -3,9 +3,8 @@ syntax = "proto3";
package proto; package proto;
message Entry { message Entry {
uint32 CRC = 1; uint32 Checksum = 1;
string Key = 2; string Key = 2;
int64 Index = 3; int64 Offset = 3;
bytes Value = 4; bytes Value = 4;
int64 Timestamp = 5;
} }

View File

@@ -1,6 +1,7 @@
package streampb package streampb
import ( import (
"bufio"
"encoding/binary" "encoding/binary"
"io" "io"
@@ -16,57 +17,64 @@ const (
// NewEncoder creates a streaming protobuf encoder. // NewEncoder creates a streaming protobuf encoder.
func NewEncoder(w io.Writer) *Encoder { func NewEncoder(w io.Writer) *Encoder {
return &Encoder{w: w, prefixBuf: make([]byte, prefixSize)} return &Encoder{w: bufio.NewWriter(w)}
} }
// Encoder wraps an underlying io.Writer and allows you to stream // Encoder wraps an underlying io.Writer and allows you to stream
// proto encodings on it. // proto encodings on it.
type Encoder struct { type Encoder struct {
w io.Writer w *bufio.Writer
prefixBuf []byte
} }
// Encode takes any proto.Message and streams it to the underlying writer. // Encode takes any proto.Message and streams it to the underlying writer.
// Messages are framed with a length prefix. // Messages are framed with a length prefix.
func (e *Encoder) Encode(msg proto.Message) error { func (e *Encoder) Encode(msg proto.Message) (int64, error) {
prefixBuf := make([]byte, prefixSize)
buf, err := proto.Marshal(msg) buf, err := proto.Marshal(msg)
if err != nil { if err != nil {
return err return 0, err
} }
binary.BigEndian.PutUint64(e.prefixBuf, uint64(len(buf))) binary.BigEndian.PutUint64(prefixBuf, uint64(len(buf)))
if _, err := e.w.Write(e.prefixBuf); err != nil { if _, err := e.w.Write(prefixBuf); err != nil {
return errors.Wrap(err, "failed writing length prefix") return 0, errors.Wrap(err, "failed writing length prefix")
} }
_, err = e.w.Write(buf) n, err := e.w.Write(buf)
return errors.Wrap(err, "failed writing marshaled data") if err != nil {
return 0, errors.Wrap(err, "failed writing marshaled data")
}
if err = e.w.Flush(); err != nil {
return 0, errors.Wrap(err, "failed flushing data")
}
return int64(n + prefixSize), nil
} }
// NewDecoder creates a streaming protobuf decoder. // NewDecoder creates a streaming protobuf decoder.
func NewDecoder(r io.Reader) *Decoder { func NewDecoder(r io.Reader) *Decoder {
return &Decoder{ return &Decoder{r: r}
r: r,
prefixBuf: make([]byte, prefixSize),
}
} }
// Decoder wraps an underlying io.Reader and allows you to stream // Decoder wraps an underlying io.Reader and allows you to stream
// proto decodings on it. // proto decodings on it.
type Decoder struct { type Decoder struct {
r io.Reader r io.Reader
prefixBuf []byte
} }
// Decode takes a proto.Message and unmarshals the next payload in the // Decode takes a proto.Message and unmarshals the next payload in the
// underlying io.Reader. It returns an EOF when it's done. // underlying io.Reader. It returns an EOF when it's done.
func (d *Decoder) Decode(v proto.Message) error { func (d *Decoder) Decode(v proto.Message) error {
_, err := io.ReadFull(d.r, d.prefixBuf) prefixBuf := make([]byte, prefixSize)
_, err := io.ReadFull(d.r, prefixBuf)
if err != nil { if err != nil {
return err return err
} }
n := binary.BigEndian.Uint64(d.prefixBuf) n := binary.BigEndian.Uint64(prefixBuf)
buf := make([]byte, n) buf := make([]byte, n)

36
utils.go Normal file
View File

@@ -0,0 +1,36 @@
package bitcask
import (
"fmt"
"path/filepath"
"sort"
"strconv"
"strings"
)
func getDatafiles(path string) ([]string, error) {
fns, err := filepath.Glob(fmt.Sprintf("%s/*.data", path))
if err != nil {
return nil, err
}
sort.Strings(fns)
return fns, nil
}
func parseIds(fns []string) ([]int, error) {
var ids []int
for _, fn := range fns {
fn = filepath.Base(fn)
ext := filepath.Ext(fn)
if ext != ".data" {
continue
}
id, err := strconv.ParseInt(strings.TrimSuffix(fn, ext), 10, 32)
if err != nil {
return nil, err
}
ids = append(ids, int(id))
}
sort.Ints(ids)
return ids, nil
}