Compare commits

..

12 Commits

21 changed files with 400 additions and 219 deletions

View File

@@ -13,11 +13,11 @@ dev: build
build: clean generate build: clean generate
@go build \ @go build \
-tags "netgo static_build" -installsuffix netgo \ -tags "netgo static_build" -installsuffix netgo \
-ldflags "-w -X $(shell go list).Version=$(VERSION) -X $(shell go list).Commit=$(COMMIT)" \ -ldflags "-w -X $(shell go list)/internal.Version=$(VERSION) -X $(shell go list)/internal.Commit=$(COMMIT)" \
./cmd/bitcask/... ./cmd/bitcask/...
@go build \ @go build \
-tags "netgo static_build" -installsuffix netgo \ -tags "netgo static_build" -installsuffix netgo \
-ldflags "-w -X $(shell go list).Version=$(VERSION) -X $(shell go list).Commit=$(COMMIT)" \ -ldflags "-w -X $(shell go list)/internal.Version=$(VERSION) -X $(shell go list)/internal.Commit=$(COMMIT)" \
./cmd/bitcaskd/... ./cmd/bitcaskd/...
generate: generate:

View File

@@ -97,31 +97,33 @@ Benchmarks run on a 11" Macbook with a 1.4Ghz Intel Core i7:
``` ```
$ make bench $ make bench
... ...
BenchmarkGet/128B-4 200000 5780 ns/op 400 B/op 5 allocs/op BenchmarkGet/128B-4 300000 5178 ns/op 400 B/op 5 allocs/op
BenchmarkGet/256B-4 200000 6138 ns/op 656 B/op 5 allocs/op BenchmarkGet/256B-4 300000 5273 ns/op 656 B/op 5 allocs/op
BenchmarkGet/512B-4 200000 5967 ns/op 1200 B/op 5 allocs/op BenchmarkGet/512B-4 200000 5368 ns/op 1200 B/op 5 allocs/op
BenchmarkGet/1K-4 200000 6290 ns/op 2288 B/op 5 allocs/op BenchmarkGet/1K-4 200000 5800 ns/op 2288 B/op 5 allocs/op
BenchmarkGet/2K-4 200000 6293 ns/op 4464 B/op 5 allocs/op BenchmarkGet/2K-4 200000 6766 ns/op 4464 B/op 5 allocs/op
BenchmarkGet/4K-4 200000 7673 ns/op 9072 B/op 5 allocs/op BenchmarkGet/4K-4 200000 7857 ns/op 9072 B/op 5 allocs/op
BenchmarkGet/8K-4 200000 10373 ns/op 17776 B/op 5 allocs/op BenchmarkGet/8K-4 200000 9538 ns/op 17776 B/op 5 allocs/op
BenchmarkGet/16K-4 100000 14227 ns/op 34928 B/op 5 allocs/op BenchmarkGet/16K-4 100000 13188 ns/op 34928 B/op 5 allocs/op
BenchmarkGet/32K-4 100000 25953 ns/op 73840 B/op 5 allocs/op BenchmarkGet/32K-4 100000 21620 ns/op 73840 B/op 5 allocs/op
BenchmarkPut/128B-4 100000 17353 ns/op 680 B/op 5 allocs/op BenchmarkPut/128B-4 200000 7875 ns/op 409 B/op 6 allocs/op
BenchmarkPut/256B-4 100000 18620 ns/op 808 B/op 5 allocs/op BenchmarkPut/256B-4 200000 8712 ns/op 538 B/op 6 allocs/op
BenchmarkPut/512B-4 100000 19068 ns/op 1096 B/op 5 allocs/op BenchmarkPut/512B-4 200000 9832 ns/op 829 B/op 6 allocs/op
BenchmarkPut/1K-4 100000 23738 ns/op 1673 B/op 5 allocs/op BenchmarkPut/1K-4 100000 13105 ns/op 1410 B/op 6 allocs/op
BenchmarkPut/2K-4 50000 25118 ns/op 2826 B/op 5 allocs/op BenchmarkPut/2K-4 100000 18601 ns/op 2572 B/op 6 allocs/op
BenchmarkPut/4K-4 50000 44605 ns/op 5389 B/op 5 allocs/op BenchmarkPut/4K-4 50000 36631 ns/op 5151 B/op 6 allocs/op
BenchmarkPut/8K-4 30000 55237 ns/op 10001 B/op 5 allocs/op BenchmarkPut/8K-4 30000 56128 ns/op 9798 B/op 6 allocs/op
BenchmarkPut/16K-4 20000 78966 ns/op 18972 B/op 5 allocs/op BenchmarkPut/16K-4 20000 83209 ns/op 18834 B/op 6 allocs/op
BenchmarkPut/32K-4 10000 116253 ns/op 41520 B/op 5 allocs/op BenchmarkPut/32K-4 10000 135899 ns/op 41517 B/op 6 allocs/op
BenchmarkScan-4 1000000 1851 ns/op 493 B/op 25 allocs/op
``` ```
For 128B values: For 128B values:
* ~180,000 reads/sec * ~200,000 reads/sec
* ~60,000 writes/sec * ~130,000 writes/sec
The full benchmark above shows linear performance as you increase key/value sizes. The full benchmark above shows linear performance as you increase key/value sizes.

View File

@@ -1,32 +1,29 @@
package bitcask package bitcask
import ( import (
"errors" "fmt"
"hash/crc32"
"io" "io"
"io/ioutil" "io/ioutil"
"os" "os"
"path/filepath" "path/filepath"
"strings" "strings"
"time"
"github.com/gofrs/flock" "github.com/gofrs/flock"
) "github.com/prologic/trie"
var ( "github.com/prologic/bitcask/internal"
ErrKeyNotFound = errors.New("error: key not found")
ErrKeyTooLarge = errors.New("error: key too large")
ErrValueTooLarge = errors.New("error: value too large")
ErrDatabaseLocked = errors.New("error: database locked")
) )
type Bitcask struct { type Bitcask struct {
*flock.Flock *flock.Flock
opts Options config *config
path string path string
curr *Datafile curr *internal.Datafile
keydir *Keydir keydir *internal.Keydir
datafiles []*Datafile datafiles []*internal.Datafile
trie *trie.Trie
maxDatafileSize int64 maxDatafileSize int64
} }
@@ -48,41 +45,47 @@ func (b *Bitcask) Sync() error {
} }
func (b *Bitcask) Get(key string) ([]byte, error) { func (b *Bitcask) Get(key string) ([]byte, error) {
var df *Datafile var df *internal.Datafile
item, ok := b.keydir.Get(key) item, ok := b.keydir.Get(key)
if !ok { if !ok {
return nil, ErrKeyNotFound return nil, fmt.Errorf("error: key not found %s", key)
} }
if item.FileID == b.curr.id { if item.FileID == b.curr.FileID() {
df = b.curr df = b.curr
} else { } else {
df = b.datafiles[item.FileID] df = b.datafiles[item.FileID]
} }
e, err := df.ReadAt(item.Index) e, err := df.ReadAt(item.Offset)
if err != nil { if err != nil {
return nil, err return nil, err
} }
checksum := crc32.ChecksumIEEE(e.Value)
if checksum != e.Checksum {
return nil, fmt.Errorf("error: checksum falied %s %d != %d", key, e.Checksum, checksum)
}
return e.Value, nil return e.Value, nil
} }
func (b *Bitcask) Put(key string, value []byte) error { func (b *Bitcask) Put(key string, value []byte) error {
if len(key) > b.opts.MaxKeySize { if len(key) > b.config.MaxKeySize {
return ErrKeyTooLarge return fmt.Errorf("error: key too large %d > %d", len(key), b.config.MaxKeySize)
} }
if len(value) > b.opts.MaxValueSize { if len(value) > b.config.MaxValueSize {
return ErrValueTooLarge return fmt.Errorf("error: value too large %d > %d", len(value), b.config.MaxValueSize)
} }
index, err := b.put(key, value) offset, err := b.put(key, value)
if err != nil { if err != nil {
return err return err
} }
b.keydir.Add(key, b.curr.id, index, time.Now().Unix()) item := b.keydir.Add(key, b.curr.FileID(), offset)
b.trie.Add(key, item)
return nil return nil
} }
@@ -94,10 +97,21 @@ func (b *Bitcask) Delete(key string) error {
} }
b.keydir.Delete(key) b.keydir.Delete(key)
b.trie.Remove(key)
return nil return nil
} }
// Scan performs a prefix scan: it asks the trie for every key matching
// prefix and calls f once per key. The first non-nil error returned by f
// stops the scan and is returned to the caller; otherwise Scan returns nil.
func (b *Bitcask) Scan(prefix string, f func(key string) error) error {
	for _, match := range b.trie.PrefixSearch(prefix) {
		err := f(match)
		if err != nil {
			return err
		}
	}

	return nil
}
func (b *Bitcask) Fold(f func(key string) error) error { func (b *Bitcask) Fold(f func(key string) error) error {
for key := range b.keydir.Keys() { for key := range b.keydir.Keys() {
if err := f(key); err != nil { if err := f(key); err != nil {
@@ -119,18 +133,22 @@ func (b *Bitcask) put(key string, value []byte) (int64, error) {
return -1, err return -1, err
} }
df, err := NewDatafile(b.path, b.curr.id, true) df, err := internal.NewDatafile(b.path, b.curr.FileID(), true)
if err != nil {
return -1, err
}
b.datafiles = append(b.datafiles, df) b.datafiles = append(b.datafiles, df)
id := b.curr.id + 1 id := b.curr.FileID() + 1
curr, err := NewDatafile(b.path, id, false) curr, err := internal.NewDatafile(b.path, id, false)
if err != nil { if err != nil {
return -1, err return -1, err
} }
b.curr = curr b.curr = curr
} }
e := NewEntry(key, value) e := internal.NewEntry(key, value)
return b.curr.Write(e) return b.curr.Write(e)
} }
@@ -140,12 +158,12 @@ func (b *Bitcask) setMaxDatafileSize(size int64) error {
} }
func Merge(path string, force bool) error { func Merge(path string, force bool) error {
fns, err := getDatafiles(path) fns, err := internal.GetDatafiles(path)
if err != nil { if err != nil {
return err return err
} }
ids, err := parseIds(fns) ids, err := internal.ParseIds(fns)
if err != nil { if err != nil {
return err return err
} }
@@ -175,9 +193,9 @@ func Merge(path string, force bool) error {
id := ids[i] id := ids[i]
keydir := NewKeydir() keydir := internal.NewKeydir()
df, err := NewDatafile(path, id, true) df, err := internal.NewDatafile(path, id, true)
if err != nil { if err != nil {
return err return err
} }
@@ -198,10 +216,10 @@ func Merge(path string, force bool) error {
continue continue
} }
keydir.Add(e.Key, ids[i], e.Index, e.Timestamp) keydir.Add(e.Key, ids[i], e.Offset)
} }
tempdf, err := NewDatafile(temp, id, false) tempdf, err := internal.NewDatafile(temp, id, false)
if err != nil { if err != nil {
return err return err
} }
@@ -209,7 +227,7 @@ func Merge(path string, force bool) error {
for key := range keydir.Keys() { for key := range keydir.Keys() {
item, _ := keydir.Get(key) item, _ := keydir.Get(key)
e, err := df.ReadAt(item.Index) e, err := df.ReadAt(item.Offset)
if err != nil { if err != nil {
return err return err
} }
@@ -245,7 +263,7 @@ func Merge(path string, force bool) error {
return nil return nil
} }
func Open(path string, options ...func(*Bitcask) error) (*Bitcask, error) { func Open(path string, options ...option) (*Bitcask, error) {
if err := os.MkdirAll(path, 0755); err != nil { if err := os.MkdirAll(path, 0755); err != nil {
return nil, err return nil, err
} }
@@ -255,21 +273,23 @@ func Open(path string, options ...func(*Bitcask) error) (*Bitcask, error) {
return nil, err return nil, err
} }
fns, err := getDatafiles(path) fns, err := internal.GetDatafiles(path)
if err != nil { if err != nil {
return nil, err return nil, err
} }
ids, err := parseIds(fns) ids, err := internal.ParseIds(fns)
if err != nil { if err != nil {
return nil, err return nil, err
} }
keydir := NewKeydir() var datafiles []*internal.Datafile
var datafiles []*Datafile
keydir := internal.NewKeydir()
trie := trie.New()
for i, fn := range fns { for i, fn := range fns {
df, err := NewDatafile(path, ids[i], true) df, err := internal.NewDatafile(path, ids[i], true)
if err != nil { if err != nil {
return nil, err return nil, err
} }
@@ -282,14 +302,15 @@ func Open(path string, options ...func(*Bitcask) error) (*Bitcask, error) {
} }
defer f.Close() defer f.Close()
hint, err := NewKeydirFromBytes(f) hint, err := internal.NewKeydirFromBytes(f)
if err != nil { if err != nil {
return nil, err return nil, err
} }
for key := range hint.Keys() { for key := range hint.Keys() {
item, _ := hint.Get(key) item, _ := hint.Get(key)
keydir.Add(key, item.FileID, item.Index, item.Timestamp) _ = keydir.Add(key, item.FileID, item.Offset)
trie.Add(key, item)
} }
} else { } else {
for { for {
@@ -307,7 +328,8 @@ func Open(path string, options ...func(*Bitcask) error) (*Bitcask, error) {
continue continue
} }
keydir.Add(e.Key, ids[i], e.Index, e.Timestamp) item := keydir.Add(e.Key, ids[i], e.Offset)
trie.Add(e.Key, item)
} }
} }
} }
@@ -317,24 +339,25 @@ func Open(path string, options ...func(*Bitcask) error) (*Bitcask, error) {
id = ids[(len(ids) - 1)] id = ids[(len(ids) - 1)]
} }
curr, err := NewDatafile(path, id, false) curr, err := internal.NewDatafile(path, id, false)
if err != nil { if err != nil {
return nil, err return nil, err
} }
bitcask := &Bitcask{ bitcask := &Bitcask{
Flock: flock.New(filepath.Join(path, "lock")), Flock: flock.New(filepath.Join(path, "lock")),
opts: NewDefaultOptions(), config: NewDefaultConfig(),
path: path, path: path,
curr: curr, curr: curr,
keydir: keydir, keydir: keydir,
datafiles: datafiles, datafiles: datafiles,
trie: trie,
maxDatafileSize: DefaultMaxDatafileSize, maxDatafileSize: DefaultMaxDatafileSize,
} }
for _, option := range options { for _, opt := range options {
err = option(bitcask) err = opt(bitcask.config)
if err != nil { if err != nil {
return nil, err return nil, err
} }
@@ -346,7 +369,7 @@ func Open(path string, options ...func(*Bitcask) error) (*Bitcask, error) {
} }
if !locked { if !locked {
return nil, ErrDatabaseLocked return nil, fmt.Errorf("error: database locked %s", path)
} }
return bitcask, nil return bitcask, nil

View File

@@ -3,6 +3,8 @@ package bitcask
import ( import (
"fmt" "fmt"
"io/ioutil" "io/ioutil"
"reflect"
"sort"
"strings" "strings"
"sync" "sync"
"testing" "testing"
@@ -43,7 +45,7 @@ func TestAll(t *testing.T) {
assert.NoError(err) assert.NoError(err)
_, err = db.Get("foo") _, err = db.Get("foo")
assert.Error(err) assert.Error(err)
assert.Equal(err.Error(), "error: key not found") assert.Equal("error: key not found foo", err.Error())
}) })
t.Run("Sync", func(t *testing.T) { t.Run("Sync", func(t *testing.T) {
@@ -90,7 +92,7 @@ func TestDeletedKeys(t *testing.T) {
assert.NoError(err) assert.NoError(err)
_, err = db.Get("foo") _, err = db.Get("foo")
assert.Error(err) assert.Error(err)
assert.Equal("error: key not found", err.Error()) assert.Equal("error: key not found foo", err.Error())
}) })
t.Run("Sync", func(t *testing.T) { t.Run("Sync", func(t *testing.T) {
@@ -118,7 +120,7 @@ func TestDeletedKeys(t *testing.T) {
t.Run("Get", func(t *testing.T) { t.Run("Get", func(t *testing.T) {
_, err = db.Get("foo") _, err = db.Get("foo")
assert.Error(err) assert.Error(err)
assert.Equal("error: key not found", err.Error()) assert.Equal("error: key not found foo", err.Error())
}) })
t.Run("Close", func(t *testing.T) { t.Run("Close", func(t *testing.T) {
@@ -136,19 +138,17 @@ func TestMaxKeySize(t *testing.T) {
var db *Bitcask var db *Bitcask
size := 16
t.Run("Open", func(t *testing.T) { t.Run("Open", func(t *testing.T) {
db, err = Open(testdir, WithMaxKeySize(size)) db, err = Open(testdir, WithMaxKeySize(16))
assert.NoError(err) assert.NoError(err)
}) })
t.Run("Put", func(t *testing.T) { t.Run("Put", func(t *testing.T) {
key := strings.Repeat(" ", size+1) key := strings.Repeat(" ", 17)
value := []byte("foobar") value := []byte("foobar")
err = db.Put(key, value) err = db.Put(key, value)
assert.Error(err) assert.Error(err)
assert.Equal("error: key too large", err.Error()) assert.Equal("error: key too large 17 > 16", err.Error())
}) })
} }
@@ -160,19 +160,17 @@ func TestMaxValueSize(t *testing.T) {
var db *Bitcask var db *Bitcask
size := 16
t.Run("Open", func(t *testing.T) { t.Run("Open", func(t *testing.T) {
db, err = Open(testdir, WithMaxValueSize(size)) db, err = Open(testdir, WithMaxValueSize(16))
assert.NoError(err) assert.NoError(err)
}) })
t.Run("Put", func(t *testing.T) { t.Run("Put", func(t *testing.T) {
key := "foo" key := "foo"
value := []byte(strings.Repeat(" ", size+1)) value := []byte(strings.Repeat(" ", 17))
err = db.Put(key, value) err = db.Put(key, value)
assert.Error(err) assert.Error(err)
assert.Equal("error: value too large", err.Error()) assert.Equal("error: value too large 17 > 16", err.Error())
}) })
} }
@@ -289,10 +287,9 @@ func TestConcurrent(t *testing.T) {
wg := &sync.WaitGroup{} wg := &sync.WaitGroup{}
go f(wg, 2) go f(wg, 2)
wg.Add(1)
go f(wg, 3) go f(wg, 3)
wg.Add(1) go f(wg, 5)
wg.Add(3)
wg.Wait() wg.Wait()
}) })
@@ -312,10 +309,9 @@ func TestConcurrent(t *testing.T) {
wg := &sync.WaitGroup{} wg := &sync.WaitGroup{}
go f(wg, 100) go f(wg, 100)
wg.Add(1)
go f(wg, 100) go f(wg, 100)
wg.Add(1) go f(wg, 100)
wg.Add(3)
wg.Wait() wg.Wait()
}) })
@@ -327,6 +323,58 @@ func TestConcurrent(t *testing.T) {
}) })
} }
// TestScan checks that Scan only reports keys sharing the requested prefix.
// It stores seven keys, scans for the prefix "fo", fetches the value of every
// reported key and compares the sorted values with the expected list.
func TestScan(t *testing.T) {
	assert := assert.New(t)

	testdir, err := ioutil.TempDir("", "bitcask")
	assert.NoError(err)

	var db *Bitcask

	t.Run("Setup", func(t *testing.T) {
		t.Run("Open", func(t *testing.T) {
			db, err = Open(testdir)
			assert.NoError(err)
		})

		t.Run("Put", func(t *testing.T) {
			var items = map[string][]byte{
				"1":     []byte("1"),
				"2":     []byte("2"),
				"3":     []byte("3"),
				"food":  []byte("pizza"),
				"foo":   []byte("foo"),
				"fooz":  []byte("fooz ball"),
				"hello": []byte("world"),
			}
			for k, v := range items {
				err = db.Put(k, v)
				assert.NoError(err)
			}
		})
	})

	t.Run("Scan", func(t *testing.T) {
		var (
			vals     []string
			expected = []string{
				"foo",
				"fooz ball",
				"pizza",
			}
		)

		err = db.Scan("fo", func(key string) error {
			val, err := db.Get(key)
			assert.NoError(err)
			vals = append(vals, string(val))
			return nil
		})
		// The error returned by Scan itself must be asserted; it was
		// previously assigned and then silently dropped.
		assert.NoError(err)

		// Scan order is not asserted, so sort before comparing.
		sort.Strings(vals)
		assert.Equal(expected, vals)
	})
}
func TestLocking(t *testing.T) { func TestLocking(t *testing.T) {
assert := assert.New(t) assert := assert.New(t)
@@ -339,7 +387,7 @@ func TestLocking(t *testing.T) {
_, err = Open(testdir) _, err = Open(testdir)
assert.Error(err) assert.Error(err)
assert.Equal("error: database locked", err.Error()) assert.Equal(fmt.Sprintf("error: database locked %s", testdir), err.Error())
} }
type benchmarkTestCase struct { type benchmarkTestCase struct {
@@ -433,3 +481,47 @@ func BenchmarkPut(b *testing.B) {
}) })
} }
} }
// BenchmarkScan measures a prefix scan for "fo" over a small, fixed data set.
// Every iteration also verifies that exactly the keys "foo", "food" and
// "fooz" are reported, so a broken Scan fails the benchmark instead of
// producing a meaningless timing.
func BenchmarkScan(b *testing.B) {
	testdir, err := ioutil.TempDir("", "bitcask")
	if err != nil {
		b.Fatal(err)
	}

	db, err := Open(testdir)
	if err != nil {
		b.Fatal(err)
	}
	defer db.Close()

	var items = map[string][]byte{
		"1":     []byte("1"),
		"2":     []byte("2"),
		"3":     []byte("3"),
		"food":  []byte("pizza"),
		"foo":   []byte("foo"),
		"fooz":  []byte("fooz ball"),
		"hello": []byte("world"),
	}
	for k, v := range items {
		err := db.Put(k, v)
		if err != nil {
			b.Fatal(err)
		}
	}

	var expected = []string{"foo", "food", "fooz"}

	// Exclude the setup above from the measured time.
	b.ResetTimer()
	for i := 0; i < b.N; i++ {
		var keys []string
		err = db.Scan("fo", func(key string) error {
			keys = append(keys, key)
			return nil
		})
		// The Scan error used to be assigned and then ignored.
		if err != nil {
			b.Fatal(err)
		}

		// Scan order is not asserted, so sort before comparing.
		sort.Strings(keys)
		if !reflect.DeepEqual(expected, keys) {
			// Both values use %#v; the original "#%v" printed a stray '#'.
			b.Fatal(fmt.Errorf("expected keys=%#v got=%#v", expected, keys))
		}
	}
}

View File

@@ -8,13 +8,13 @@ import (
"github.com/spf13/cobra" "github.com/spf13/cobra"
"github.com/spf13/viper" "github.com/spf13/viper"
"github.com/prologic/bitcask" "github.com/prologic/bitcask/internal"
) )
// RootCmd represents the base command when called without any subcommands // RootCmd represents the base command when called without any subcommands
var RootCmd = &cobra.Command{ var RootCmd = &cobra.Command{
Use: "bitcask", Use: "bitcask",
Version: bitcask.FullVersion(), Version: internal.FullVersion(),
Short: "Command-line tools for bitcask", Short: "Command-line tools for bitcask",
Long: `This is the command-line tool to interact with a bitcask database. Long: `This is the command-line tool to interact with a bitcask database.

60
cmd/bitcask/scan.go Normal file
View File

@@ -0,0 +1,60 @@
package main
import (
"fmt"
"os"
log "github.com/sirupsen/logrus"
"github.com/spf13/cobra"
"github.com/spf13/viper"
"github.com/prologic/bitcask"
)
// scanCmd is the "scan" sub-command (aliases: "search", "find"). It takes
// exactly one argument, the key prefix, reads the database location from the
// "path" viper setting and exits the process with the status returned by
// scan.
var scanCmd = &cobra.Command{
	Use:     "scan <prefix>",
	Aliases: []string{"search", "find"},
	Short:   "Perform a prefix scan for keys",
	Long: `This performs a prefix scan for keys starting with the given
prefix. This uses a Trie to search for matching keys and returns all matched
keys.`,
	Args: cobra.ExactArgs(1),
	Run: func(cmd *cobra.Command, args []string) {
		path := viper.GetString("path")
		prefix := args[0]

		os.Exit(scan(path, prefix))
	},
}
// init registers the scan sub-command on the root command.
func init() {
	RootCmd.AddCommand(scanCmd)
}
// scan opens the database at path, runs a prefix scan for prefix and prints
// the value stored under every reported key, one per line. The return value
// is the process exit status: 0 on success, 1 when the database cannot be
// opened or the scan fails.
func scan(path, prefix string) int {
	db, err := bitcask.Open(path)
	if err != nil {
		log.WithError(err).Error("error opening database")
		return 1
	}
	defer db.Close()

	// printValue is the per-key callback handed to Scan; a failed lookup is
	// logged and aborts the scan.
	printValue := func(key string) error {
		value, getErr := db.Get(key)
		if getErr != nil {
			log.WithError(getErr).Error("error reading key")
			return getErr
		}

		fmt.Printf("%s\n", string(value))
		log.WithField("key", key).WithField("value", value).Debug("key/value")
		return nil
	}

	if scanErr := db.Scan(prefix, printValue); scanErr != nil {
		log.WithError(scanErr).Error("error scanning keys")
		return 1
	}

	return 0
}

View File

@@ -10,6 +10,7 @@ import (
"github.com/tidwall/redcon" "github.com/tidwall/redcon"
"github.com/prologic/bitcask" "github.com/prologic/bitcask"
"github.com/prologic/bitcask/internal"
) )
var ( var (
@@ -43,7 +44,7 @@ func main() {
} }
if version { if version {
fmt.Printf("bitcaskd version %s", bitcask.FullVersion()) fmt.Printf("bitcaskd version %s", internal.FullVersion())
os.Exit(0) os.Exit(0)
} }
@@ -60,7 +61,7 @@ func main() {
os.Exit(1) os.Exit(1)
} }
log.WithField("bind", bind).WithField("path", path).Infof("starting bitcaskd v%s", bitcask.FullVersion()) log.WithField("bind", bind).WithField("path", path).Infof("starting bitcaskd v%s", internal.FullVersion())
err = redcon.ListenAndServe(bind, err = redcon.ListenAndServe(bind,
func(conn redcon.Conn, cmd redcon.Command) { func(conn redcon.Conn, cmd redcon.Command) {

View File

@@ -1,17 +0,0 @@
package bitcask
import (
"hash/crc32"
pb "github.com/prologic/bitcask/proto"
)
// NewEntry builds a pb.Entry for key and value. The CRC field is set to the
// IEEE CRC-32 checksum of value; all other fields keep their zero value.
func NewEntry(key string, value []byte) pb.Entry {
	var entry pb.Entry

	entry.CRC = crc32.ChecksumIEEE(value)
	entry.Key = key
	entry.Value = value

	return entry
}

2
go.mod
View File

@@ -1,6 +1,7 @@
module github.com/prologic/bitcask module github.com/prologic/bitcask
require ( require (
github.com/derekparker/trie v0.0.0-20180212171413-e608c2733dc7
github.com/gofrs/flock v0.7.1 github.com/gofrs/flock v0.7.1
github.com/gogo/protobuf v1.2.1 github.com/gogo/protobuf v1.2.1
github.com/golang/protobuf v1.2.0 github.com/golang/protobuf v1.2.0
@@ -9,6 +10,7 @@ require (
github.com/mitchellh/go-homedir v1.1.0 github.com/mitchellh/go-homedir v1.1.0
github.com/pkg/errors v0.8.1 github.com/pkg/errors v0.8.1
github.com/prologic/msgbus v0.1.1 github.com/prologic/msgbus v0.1.1
github.com/prologic/trie v0.0.0-20190316011403-395e39dac705
github.com/prometheus/client_golang v0.9.2 // indirect github.com/prometheus/client_golang v0.9.2 // indirect
github.com/sirupsen/logrus v1.3.0 github.com/sirupsen/logrus v1.3.0
github.com/spf13/cobra v0.0.3 github.com/spf13/cobra v0.0.3

4
go.sum
View File

@@ -8,6 +8,8 @@ github.com/davecgh/go-spew v1.1.0 h1:ZDRjVQ15GmhC3fiQ8ni8+OwkZQO4DARzQgrnXU1Liz8
github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c= github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c=
github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
github.com/derekparker/trie v0.0.0-20180212171413-e608c2733dc7 h1:Cab9yoTQh1TxObKfis1DzZ6vFLK5kbeenMjRES/UE3o=
github.com/derekparker/trie v0.0.0-20180212171413-e608c2733dc7/go.mod h1:D6ICZm05D9VN1n/8iOtBxLpXtoGp6HDFUJ1RNVieOSE=
github.com/fsnotify/fsnotify v1.4.7 h1:IXs+QLmnXW2CcXuY+8Mzv/fWEsPGWxqefPtCP5CnV9I= github.com/fsnotify/fsnotify v1.4.7 h1:IXs+QLmnXW2CcXuY+8Mzv/fWEsPGWxqefPtCP5CnV9I=
github.com/fsnotify/fsnotify v1.4.7/go.mod h1:jwhsz4b93w/PPRr/qN1Yymfu8t87LnFCMoQvtojpjFo= github.com/fsnotify/fsnotify v1.4.7/go.mod h1:jwhsz4b93w/PPRr/qN1Yymfu8t87LnFCMoQvtojpjFo=
github.com/gofrs/flock v0.7.1 h1:DP+LD/t0njgoPBvT5MJLeliUIVQR03hiKR6vezdwHlc= github.com/gofrs/flock v0.7.1 h1:DP+LD/t0njgoPBvT5MJLeliUIVQR03hiKR6vezdwHlc=
@@ -40,6 +42,8 @@ github.com/pkg/errors v0.8.1/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINE
github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM= github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM=
github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4= github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4=
github.com/prologic/msgbus v0.1.1/go.mod h1:B3Qu4/U2FP08x93jUzp9E8bl155+cIgDH2DUGRK6OZk= github.com/prologic/msgbus v0.1.1/go.mod h1:B3Qu4/U2FP08x93jUzp9E8bl155+cIgDH2DUGRK6OZk=
github.com/prologic/trie v0.0.0-20190316011403-395e39dac705 h1:2J+cSlAeECj0lfMKSmM7n5OlIio+yLovaKLZJzwLc6U=
github.com/prologic/trie v0.0.0-20190316011403-395e39dac705/go.mod h1:LFuDmpHJGmciXd8Rl5YMhVlLMps9gz2GtYLzwxrFhzs=
github.com/prometheus/client_golang v0.9.2 h1:awm861/B8OKDd2I/6o1dy3ra4BamzKhYOiGItCeZ740= github.com/prometheus/client_golang v0.9.2 h1:awm861/B8OKDd2I/6o1dy3ra4BamzKhYOiGItCeZ740=
github.com/prometheus/client_golang v0.9.2/go.mod h1:OsXs2jCmiKlQ1lTBmv21f2mNfw4xf/QclQDMrYNZzcM= github.com/prometheus/client_golang v0.9.2/go.mod h1:OsXs2jCmiKlQ1lTBmv21f2mNfw4xf/QclQDMrYNZzcM=
github.com/prometheus/client_model v0.0.0-20180712105110-5c3871d89910 h1:idejC8f05m9MGOsuEi1ATq9shN03HrxNkD/luQvxCv8= github.com/prometheus/client_model v0.0.0-20180712105110-5c3871d89910 h1:idejC8f05m9MGOsuEi1ATq9shN03HrxNkD/luQvxCv8=

View File

@@ -1,15 +1,15 @@
package bitcask package internal
import ( import (
"errors"
"fmt" "fmt"
"os" "os"
"path/filepath" "path/filepath"
"sync" "sync"
"time"
pb "github.com/prologic/bitcask/proto" "github.com/pkg/errors"
"github.com/prologic/bitcask/streampb"
pb "github.com/prologic/bitcask/internal/proto"
"github.com/prologic/bitcask/internal/streampb"
) )
const ( const (
@@ -23,11 +23,12 @@ var (
type Datafile struct { type Datafile struct {
sync.RWMutex sync.RWMutex
id int id int
r *os.File r *os.File
w *os.File w *os.File
dec *streampb.Decoder offset int64
enc *streampb.Encoder dec *streampb.Decoder
enc *streampb.Encoder
} }
func NewDatafile(path string, id int, readonly bool) (*Datafile, error) { func NewDatafile(path string, id int, readonly bool) (*Datafile, error) {
@@ -50,19 +51,30 @@ func NewDatafile(path string, id int, readonly bool) (*Datafile, error) {
if err != nil { if err != nil {
return nil, err return nil, err
} }
stat, err := r.Stat()
if err != nil {
return nil, errors.Wrap(err, "error calling Stat()")
}
offset := stat.Size()
dec := streampb.NewDecoder(r) dec := streampb.NewDecoder(r)
enc := streampb.NewEncoder(w) enc := streampb.NewEncoder(w)
return &Datafile{ return &Datafile{
id: id, id: id,
r: r, r: r,
w: w, w: w,
dec: dec, offset: offset,
enc: enc, dec: dec,
enc: enc,
}, nil }, nil
} }
// FileID returns the numeric id stored in the datafile.
func (df *Datafile) FileID() int {
	return df.id
}
func (df *Datafile) Name() string { func (df *Datafile) Name() string {
return df.r.Name() return df.r.Name()
} }
@@ -87,22 +99,9 @@ func (df *Datafile) Sync() error {
} }
func (df *Datafile) Size() (int64, error) { func (df *Datafile) Size() (int64, error) {
var ( df.RLock()
stat os.FileInfo defer df.RUnlock()
err error return df.offset, nil
)
if df.w == nil {
stat, err = df.r.Stat()
} else {
stat, err = df.w.Stat()
}
if err != nil {
return -1, err
}
return stat.Size(), nil
} }
func (df *Datafile) Read() (e pb.Entry, err error) { func (df *Datafile) Read() (e pb.Entry, err error) {
@@ -129,23 +128,16 @@ func (df *Datafile) Write(e pb.Entry) (int64, error) {
return -1, ErrReadonly return -1, ErrReadonly
} }
stat, err := df.w.Stat()
if err != nil {
return -1, err
}
index := stat.Size()
e.Index = index
e.Timestamp = time.Now().Unix()
df.Lock() df.Lock()
err = df.enc.Encode(&e) defer df.Unlock()
df.Unlock()
e.Offset = df.offset
n, err := df.enc.Encode(&e)
if err != nil { if err != nil {
return -1, err return -1, err
} }
df.offset += n
return index, nil return e.Offset, nil
} }

17
internal/entry.go Normal file
View File

@@ -0,0 +1,17 @@
package internal
import (
"hash/crc32"
pb "github.com/prologic/bitcask/internal/proto"
)
// NewEntry builds a pb.Entry for key and value. The Checksum field is set to
// the IEEE CRC-32 checksum of value; all other fields keep their zero value.
func NewEntry(key string, value []byte) pb.Entry {
	var entry pb.Entry

	entry.Checksum = crc32.ChecksumIEEE(value)
	entry.Key = key
	entry.Value = value

	return entry
}

View File

@@ -1,4 +1,4 @@
package bitcask package internal
import ( import (
"bytes" "bytes"
@@ -9,9 +9,8 @@ import (
) )
type Item struct { type Item struct {
FileID int FileID int
Index int64 Offset int64
Timestamp int64
} }
type Keydir struct { type Keydir struct {
@@ -25,15 +24,17 @@ func NewKeydir() *Keydir {
} }
} }
func (k *Keydir) Add(key string, fileid int, index, timestamp int64) { func (k *Keydir) Add(key string, fileid int, offset int64) Item {
k.Lock() item := Item{
defer k.Unlock() FileID: fileid,
Offset: offset,
k.kv[key] = Item{
FileID: fileid,
Index: index,
Timestamp: timestamp,
} }
k.Lock()
k.kv[key] = item
k.Unlock()
return item
} }
func (k *Keydir) Get(key string) (Item, bool) { func (k *Keydir) Get(key string) (Item, bool) {

View File

@@ -19,11 +19,10 @@ var _ = math.Inf
const _ = proto.ProtoPackageIsVersion2 // please upgrade the proto package const _ = proto.ProtoPackageIsVersion2 // please upgrade the proto package
type Entry struct { type Entry struct {
CRC uint32 `protobuf:"varint,1,opt,name=CRC,proto3" json:"CRC,omitempty"` Checksum uint32 `protobuf:"varint,1,opt,name=Checksum,proto3" json:"Checksum,omitempty"`
Key string `protobuf:"bytes,2,opt,name=Key,proto3" json:"Key,omitempty"` Key string `protobuf:"bytes,2,opt,name=Key,proto3" json:"Key,omitempty"`
Index int64 `protobuf:"varint,3,opt,name=Index,proto3" json:"Index,omitempty"` Offset int64 `protobuf:"varint,3,opt,name=Offset,proto3" json:"Offset,omitempty"`
Value []byte `protobuf:"bytes,4,opt,name=Value,proto3" json:"Value,omitempty"` Value []byte `protobuf:"bytes,4,opt,name=Value,proto3" json:"Value,omitempty"`
Timestamp int64 `protobuf:"varint,5,opt,name=Timestamp,proto3" json:"Timestamp,omitempty"`
XXX_NoUnkeyedLiteral struct{} `json:"-"` XXX_NoUnkeyedLiteral struct{} `json:"-"`
XXX_unrecognized []byte `json:"-"` XXX_unrecognized []byte `json:"-"`
XXX_sizecache int32 `json:"-"` XXX_sizecache int32 `json:"-"`
@@ -33,7 +32,7 @@ func (m *Entry) Reset() { *m = Entry{} }
func (m *Entry) String() string { return proto.CompactTextString(m) } func (m *Entry) String() string { return proto.CompactTextString(m) }
func (*Entry) ProtoMessage() {} func (*Entry) ProtoMessage() {}
func (*Entry) Descriptor() ([]byte, []int) { func (*Entry) Descriptor() ([]byte, []int) {
return fileDescriptor_entry_4f5906245d08394f, []int{0} return fileDescriptor_entry_3e91842c99935ae2, []int{0}
} }
func (m *Entry) XXX_Unmarshal(b []byte) error { func (m *Entry) XXX_Unmarshal(b []byte) error {
return xxx_messageInfo_Entry.Unmarshal(m, b) return xxx_messageInfo_Entry.Unmarshal(m, b)
@@ -53,9 +52,9 @@ func (m *Entry) XXX_DiscardUnknown() {
var xxx_messageInfo_Entry proto.InternalMessageInfo var xxx_messageInfo_Entry proto.InternalMessageInfo
func (m *Entry) GetCRC() uint32 { func (m *Entry) GetChecksum() uint32 {
if m != nil { if m != nil {
return m.CRC return m.Checksum
} }
return 0 return 0
} }
@@ -67,9 +66,9 @@ func (m *Entry) GetKey() string {
return "" return ""
} }
func (m *Entry) GetIndex() int64 { func (m *Entry) GetOffset() int64 {
if m != nil { if m != nil {
return m.Index return m.Offset
} }
return 0 return 0
} }
@@ -81,28 +80,20 @@ func (m *Entry) GetValue() []byte {
return nil return nil
} }
func (m *Entry) GetTimestamp() int64 {
if m != nil {
return m.Timestamp
}
return 0
}
func init() { func init() {
proto.RegisterType((*Entry)(nil), "proto.Entry") proto.RegisterType((*Entry)(nil), "proto.Entry")
} }
func init() { proto.RegisterFile("entry.proto", fileDescriptor_entry_4f5906245d08394f) } func init() { proto.RegisterFile("entry.proto", fileDescriptor_entry_3e91842c99935ae2) }
var fileDescriptor_entry_4f5906245d08394f = []byte{ var fileDescriptor_entry_3e91842c99935ae2 = []byte{
// 134 bytes of a gzipped FileDescriptorProto // 126 bytes of a gzipped FileDescriptorProto
0x1f, 0x8b, 0x08, 0x00, 0x00, 0x00, 0x00, 0x00, 0x02, 0xff, 0xe2, 0xe2, 0x4e, 0xcd, 0x2b, 0x29, 0x1f, 0x8b, 0x08, 0x00, 0x00, 0x00, 0x00, 0x00, 0x02, 0xff, 0xe2, 0xe2, 0x4e, 0xcd, 0x2b, 0x29,
0xaa, 0xd4, 0x2b, 0x28, 0xca, 0x2f, 0xc9, 0x17, 0x62, 0x05, 0x53, 0x4a, 0xa5, 0x5c, 0xac, 0xae, 0xaa, 0xd4, 0x2b, 0x28, 0xca, 0x2f, 0xc9, 0x17, 0x62, 0x05, 0x53, 0x4a, 0xc9, 0x5c, 0xac, 0xae,
0x20, 0x51, 0x21, 0x01, 0x2e, 0x66, 0xe7, 0x20, 0x67, 0x09, 0x46, 0x05, 0x46, 0x0d, 0xde, 0x20, 0x20, 0x51, 0x21, 0x29, 0x2e, 0x0e, 0xe7, 0x8c, 0xd4, 0xe4, 0xec, 0xe2, 0xd2, 0x5c, 0x09, 0x46,
0x10, 0x13, 0x24, 0xe2, 0x9d, 0x5a, 0x29, 0xc1, 0xa4, 0xc0, 0xa8, 0xc1, 0x19, 0x04, 0x62, 0x0a, 0x05, 0x46, 0x0d, 0xde, 0x20, 0x38, 0x5f, 0x48, 0x80, 0x8b, 0xd9, 0x3b, 0xb5, 0x52, 0x82, 0x49,
0x89, 0x70, 0xb1, 0x7a, 0xe6, 0xa5, 0xa4, 0x56, 0x48, 0x30, 0x2b, 0x30, 0x6a, 0x30, 0x07, 0x41, 0x81, 0x51, 0x83, 0x33, 0x08, 0xc4, 0x14, 0x12, 0xe3, 0x62, 0xf3, 0x4f, 0x4b, 0x2b, 0x4e, 0x2d,
0x38, 0x20, 0xd1, 0xb0, 0xc4, 0x9c, 0xd2, 0x54, 0x09, 0x16, 0x05, 0x46, 0x0d, 0x9e, 0x20, 0x08, 0x91, 0x60, 0x56, 0x60, 0xd4, 0x60, 0x0e, 0x82, 0xf2, 0x84, 0x44, 0xb8, 0x58, 0xc3, 0x12, 0x73,
0x47, 0x48, 0x86, 0x8b, 0x33, 0x24, 0x33, 0x37, 0xb5, 0xb8, 0x24, 0x31, 0xb7, 0x40, 0x82, 0x15, 0x4a, 0x53, 0x25, 0x58, 0x14, 0x18, 0x35, 0x78, 0x82, 0x20, 0x9c, 0x24, 0x36, 0xb0, 0x5d, 0xc6,
0xac, 0x1e, 0x21, 0x90, 0xc4, 0x06, 0xb6, 0xdd, 0x18, 0x10, 0x00, 0x00, 0xff, 0xff, 0x07, 0x99, 0x80, 0x00, 0x00, 0x00, 0xff, 0xff, 0x76, 0xd2, 0x3e, 0x83, 0x81, 0x00, 0x00, 0x00,
0x47, 0xb9, 0x93, 0x00, 0x00, 0x00,
} }

View File

@@ -3,9 +3,8 @@ syntax = "proto3";
package proto; package proto;
message Entry { message Entry {
uint32 CRC = 1; uint32 Checksum = 1;
string Key = 2; string Key = 2;
int64 Index = 3; int64 Offset = 3;
bytes Value = 4; bytes Value = 4;
int64 Timestamp = 5;
} }

View File

@@ -1,6 +1,7 @@
package streampb package streampb
import ( import (
"bufio"
"encoding/binary" "encoding/binary"
"io" "io"
@@ -16,32 +17,40 @@ const (
// NewEncoder creates a streaming protobuf encoder. // NewEncoder creates a streaming protobuf encoder.
func NewEncoder(w io.Writer) *Encoder { func NewEncoder(w io.Writer) *Encoder {
return &Encoder{w} return &Encoder{w: bufio.NewWriter(w)}
} }
// Encoder wraps an underlying io.Writer and allows you to stream // Encoder wraps an underlying io.Writer and allows you to stream
// proto encodings on it. // proto encodings on it.
type Encoder struct { type Encoder struct {
w io.Writer w *bufio.Writer
} }
// Encode takes any proto.Message and streams it to the underlying writer. // Encode takes any proto.Message and streams it to the underlying writer.
// Messages are framed with a length prefix. // Messages are framed with a length prefix.
func (e *Encoder) Encode(msg proto.Message) error { func (e *Encoder) Encode(msg proto.Message) (int64, error) {
prefixBuf := make([]byte, prefixSize) prefixBuf := make([]byte, prefixSize)
buf, err := proto.Marshal(msg) buf, err := proto.Marshal(msg)
if err != nil { if err != nil {
return err return 0, err
} }
binary.BigEndian.PutUint64(prefixBuf, uint64(len(buf))) binary.BigEndian.PutUint64(prefixBuf, uint64(len(buf)))
if _, err := e.w.Write(prefixBuf); err != nil { if _, err := e.w.Write(prefixBuf); err != nil {
return errors.Wrap(err, "failed writing length prefix") return 0, errors.Wrap(err, "failed writing length prefix")
} }
_, err = e.w.Write(buf) n, err := e.w.Write(buf)
return errors.Wrap(err, "failed writing marshaled data") if err != nil {
return 0, errors.Wrap(err, "failed writing marshaled data")
}
if err = e.w.Flush(); err != nil {
return 0, errors.Wrap(err, "failed flushing data")
}
return int64(n + prefixSize), nil
} }
// NewDecoder creates a streaming protobuf decoder. // NewDecoder creates a streaming protobuf decoder.

View File

@@ -1,4 +1,4 @@
package bitcask package internal
import ( import (
"fmt" "fmt"
@@ -8,7 +8,7 @@ import (
"strings" "strings"
) )
func getDatafiles(path string) ([]string, error) { func GetDatafiles(path string) ([]string, error) {
fns, err := filepath.Glob(fmt.Sprintf("%s/*.data", path)) fns, err := filepath.Glob(fmt.Sprintf("%s/*.data", path))
if err != nil { if err != nil {
return nil, err return nil, err
@@ -17,7 +17,7 @@ func getDatafiles(path string) ([]string, error) {
return fns, nil return fns, nil
} }
func parseIds(fns []string) ([]int, error) { func ParseIds(fns []string) ([]int, error) {
var ids []int var ids []int
for _, fn := range fns { for _, fn := range fns {
fn = filepath.Base(fn) fn = filepath.Base(fn)

View File

@@ -1,4 +1,4 @@
package bitcask package internal
import ( import (
"fmt" "fmt"

View File

@@ -1,4 +1,4 @@
package bitcask package internal
import ( import (
"fmt" "fmt"

View File

@@ -6,37 +6,42 @@ const (
DefaultMaxValueSize = 1 << 16 // 65KB DefaultMaxValueSize = 1 << 16 // 65KB
) )
type Options struct { // Option ...
type Option option
type option func(*config) error
type config struct {
MaxDatafileSize int MaxDatafileSize int
MaxKeySize int MaxKeySize int
MaxValueSize int MaxValueSize int
} }
func NewDefaultOptions() Options { func NewDefaultConfig() *config {
return Options{ return &config{
MaxDatafileSize: DefaultMaxDatafileSize, MaxDatafileSize: DefaultMaxDatafileSize,
MaxKeySize: DefaultMaxKeySize, MaxKeySize: DefaultMaxKeySize,
MaxValueSize: DefaultMaxValueSize, MaxValueSize: DefaultMaxValueSize,
} }
} }
func WithMaxDatafileSize(size int) func(*Bitcask) error { func WithMaxDatafileSize(size int) option {
return func(b *Bitcask) error { return func(cfg *config) error {
b.opts.MaxDatafileSize = size cfg.MaxDatafileSize = size
return nil return nil
} }
} }
func WithMaxKeySize(size int) func(*Bitcask) error { func WithMaxKeySize(size int) option {
return func(b *Bitcask) error { return func(cfg *config) error {
b.opts.MaxKeySize = size cfg.MaxKeySize = size
return nil return nil
} }
} }
func WithMaxValueSize(size int) func(*Bitcask) error { func WithMaxValueSize(size int) option {
return func(b *Bitcask) error { return func(cfg *config) error {
b.opts.MaxValueSize = size cfg.MaxValueSize = size
return nil return nil
} }
} }