Compare commits

...

11 Commits

Author SHA1 Message Date
James Mills
2400dd86d5 Add docs for bitcask 2019-03-21 17:46:53 +10:00
James Mills
27eb922ba2 Add docs for options 2019-03-21 17:20:53 +10:00
James Mills
34ad78efc0 Add KeYS command to server (bitraftd) 2019-03-21 10:49:53 +10:00
James Mills
352c32ee12 Add Len() to exported API (extended API) 2019-03-21 10:47:50 +10:00
James Mills
aaea7273c3 Add Keys() to exported API (extended API) 2019-03-21 10:41:56 +10:00
James Mills
01cb269a51 Add EXISTS command to server (bitraftd) 2019-03-21 10:29:18 +10:00
James Mills
962e53af17 Add Has() to exported API (extended API) 2019-03-21 10:24:48 +10:00
James Mills
7a427a237a Update README.md 2019-03-21 08:36:17 +10:00
James Mills
8bf169c96f Add MergeOpen test case 2019-03-20 17:10:24 +10:00
James Mills
c1488fed2a Added Fold() test case 2019-03-20 16:55:59 +10:00
James Mills
d6e806e655 Update README.md 2019-03-20 15:30:08 +10:00
6 changed files with 224 additions and 48 deletions

View File

@@ -6,13 +6,13 @@
[![GoDoc](https://godoc.org/github.com/prologic/bitcask?status.svg)](https://godoc.org/github.com/prologic/bitcask)
[![Sourcegraph](https://sourcegraph.com/github.com/prologic/bitcask/-/badge.svg)](https://sourcegraph.com/github.com/prologic/bitcask?badge)
A Bitcask (LSM+WAL) Key/Value Store written in Go.
A high performance Key/Value store written in [Go](https://golang.org) with a predictable read/write performance and high throughput. Uses a [Bitcask](https://en.wikipedia.org/wiki/Bitcask) on-disk layout (LSM+WAL) similar to [Riak](https://riak.com/).
## Features
* Embeddable
* Builtin CLI
* Builtin Redis-compatible server
* Embeddable (`import "github.com/prologic/bitcask"`)
* Builtin CLI (`bitcask`)
* Builtin Redis-compatible server (`bitcaskd`)
* Predictable read/write performance
* Low latecny
* High throughput (See: [Performance](README.md#Performance)
@@ -34,16 +34,13 @@ $ go get github.com/prologic/bitcask
```#!go
package main
import (
"log"
"github.com/prologic/bitcask"
)
import "github.com/prologic/bitcask"
func main() {
db, _ := bitcask.Open("/tmp/db")
defer db.Close()
db.Set("Hello", []byte("World"))
db.Close()
val, _ := db.Get("hello")
}
```

View File

@@ -16,13 +16,29 @@ import (
)
var (
ErrKeyNotFound = errors.New("error: key not found")
ErrKeyTooLarge = errors.New("error: key too large")
ErrValueTooLarge = errors.New("error: value too large")
// ErrKeyNotFound is the error returned when a key is not found
ErrKeyNotFound = errors.New("error: key not found")
// ErrKeyTooLarge is the error returned for a key that exceeds the
// maximum allowed key size (configured with WithMaxKeySize).
ErrKeyTooLarge = errors.New("error: key too large")
// ErrValueTooLarge is the error returned for a value that exceeds the
// maximum allowed value size (configured with WithMaxValueSize).
ErrValueTooLarge = errors.New("error: value too large")
// ErrChecksumFailed is the error returned if a key/valie retrieved does
// not match its CRC checksum
ErrChecksumFailed = errors.New("error: checksum failed")
// ErrDatabaseLocked is the error returned if the database is locked
// (typically opened by another process)
ErrDatabaseLocked = errors.New("error: database locked")
)
// Bitcask is a struct that represents a on-disk LSM and WAL data structure
// and in-memory hash of key/value pairs as per the Bitcask paper and seen
// in the Riak database.
type Bitcask struct {
*flock.Flock
@@ -32,10 +48,11 @@ type Bitcask struct {
keydir *internal.Keydir
datafiles []*internal.Datafile
trie *trie.Trie
maxDatafileSize int64
}
// Close closes the database and removes the lock. It is important to call
// Close() as this is the only wat to cleanup the lock held by the open
// database.
func (b *Bitcask) Close() error {
defer func() {
b.Flock.Unlock()
@@ -48,10 +65,13 @@ func (b *Bitcask) Close() error {
return b.curr.Close()
}
// Sync flushes all buffers to disk ensuring all data is written
func (b *Bitcask) Sync() error {
return b.curr.Sync()
}
// Get retrieves the value of the given key. If the key is not found or an/I/O
// error occurs a null byte slice is returend along with the error.
func (b *Bitcask) Get(key string) ([]byte, error) {
var df *internal.Datafile
@@ -79,11 +99,18 @@ func (b *Bitcask) Get(key string) ([]byte, error) {
return e.Value, nil
}
// Has returns true if the key exists in the database, false otherwise.
func (b *Bitcask) Has(key string) bool {
_, ok := b.keydir.Get(key)
return ok
}
// Put stores the key and value in the database.
func (b *Bitcask) Put(key string, value []byte) error {
if len(key) > b.config.MaxKeySize {
if len(key) > b.config.maxKeySize {
return ErrKeyTooLarge
}
if len(value) > b.config.MaxValueSize {
if len(value) > b.config.maxValueSize {
return ErrValueTooLarge
}
@@ -98,6 +125,8 @@ func (b *Bitcask) Put(key string, value []byte) error {
return nil
}
// Delete deletes the named key. If the key doesn't exist or an I/O error
// occurs the error is returned.
func (b *Bitcask) Delete(key string) error {
_, err := b.put(key, []byte{})
if err != nil {
@@ -110,6 +139,9 @@ func (b *Bitcask) Delete(key string) error {
return nil
}
// Scan performa a prefix scan of keys matching the given prefix and calling
// the function `f` with the keys found. If the function returns an error
// no further keys are processed and the first error returned.
func (b *Bitcask) Scan(prefix string, f func(key string) error) error {
keys := b.trie.PrefixSearch(prefix)
for _, key := range keys {
@@ -120,6 +152,19 @@ func (b *Bitcask) Scan(prefix string, f func(key string) error) error {
return nil
}
// Len returns the total number of keys in the database
func (b *Bitcask) Len() int {
return b.keydir.Len()
}
// Keys returns all keys in the database as a channel of string(s)
func (b *Bitcask) Keys() chan string {
return b.keydir.Keys()
}
// Fold iterates over all keys in the database calling the function `f` for
// each key. If the function returns an error, no further keys are processed
// and the error returned.
func (b *Bitcask) Fold(f func(key string) error) error {
for key := range b.keydir.Keys() {
if err := f(key); err != nil {
@@ -135,7 +180,7 @@ func (b *Bitcask) put(key string, value []byte) (int64, error) {
return -1, err
}
if size >= b.maxDatafileSize {
if size >= int64(b.config.maxDatafileSize) {
err := b.curr.Close()
if err != nil {
return -1, err
@@ -160,11 +205,9 @@ func (b *Bitcask) put(key string, value []byte) (int64, error) {
return b.curr.Write(e)
}
func (b *Bitcask) setMaxDatafileSize(size int64) error {
b.maxDatafileSize = size
return nil
}
// Merge merges all datafiles in the database creating hint files for faster
// startup. Old keys are squashed and deleted keys removes. Call this function
// periodically to reclaim disk space.
func Merge(path string, force bool) error {
fns, err := internal.GetDatafiles(path)
if err != nil {
@@ -271,7 +314,10 @@ func Merge(path string, force bool) error {
return nil
}
func Open(path string, options ...option) (*Bitcask, error) {
// Open opens the database at the given path with optional options.
// Options can be provided with the `WithXXX` functions that provide
// configuration options as functions.
func Open(path string, options ...Option) (*Bitcask, error) {
if err := os.MkdirAll(path, 0755); err != nil {
return nil, err
}
@@ -360,8 +406,6 @@ func Open(path string, options ...option) (*Bitcask, error) {
keydir: keydir,
datafiles: datafiles,
trie: trie,
maxDatafileSize: DefaultMaxDatafileSize,
}
for _, opt := range options {

View File

@@ -40,6 +40,42 @@ func TestAll(t *testing.T) {
assert.Equal([]byte("bar"), val)
})
t.Run("Len", func(t *testing.T) {
assert.Equal(1, db.Len())
})
t.Run("Has", func(t *testing.T) {
assert.True(db.Has("foo"))
})
t.Run("Keys", func(t *testing.T) {
keys := make([]string, 0)
for key := range db.Keys() {
keys = append(keys, key)
}
assert.Equal([]string{"foo"}, keys)
})
t.Run("Fold", func(t *testing.T) {
var (
keys []string
values [][]byte
)
err := db.Fold(func(key string) error {
value, err := db.Get(key)
if err != nil {
return err
}
keys = append(keys, key)
values = append(values, value)
return nil
})
assert.NoError(err)
assert.Equal([]string{"foo"}, keys)
assert.Equal([][]byte{[]byte("bar")}, values)
})
t.Run("Delete", func(t *testing.T) {
err := db.Delete("foo")
assert.NoError(err)
@@ -174,7 +210,7 @@ func TestMaxValueSize(t *testing.T) {
})
}
func TestMerge(t *testing.T) {
func TestOpenMerge(t *testing.T) {
assert := assert.New(t)
testdir, err := ioutil.TempDir("", "bitcask")
@@ -187,7 +223,7 @@ func TestMerge(t *testing.T) {
)
t.Run("Open", func(t *testing.T) {
db, err = Open(testdir, WithMaxDatafileSize(1024))
db, err = Open(testdir, WithMaxDatafileSize(32))
assert.NoError(err)
})
@@ -245,6 +281,77 @@ func TestMerge(t *testing.T) {
})
}
func TestMergeOpen(t *testing.T) {
var (
db *Bitcask
err error
)
assert := assert.New(t)
testdir, err := ioutil.TempDir("", "bitcask")
assert.NoError(err)
t.Run("Setup", func(t *testing.T) {
t.Run("Open", func(t *testing.T) {
db, err = Open(testdir, WithMaxDatafileSize(32))
assert.NoError(err)
})
t.Run("Put", func(t *testing.T) {
for i := 0; i < 1024; i++ {
err = db.Put(string(i), []byte(strings.Repeat(" ", 1024)))
assert.NoError(err)
}
})
t.Run("Get", func(t *testing.T) {
for i := 0; i < 32; i++ {
err = db.Put(string(i), []byte(strings.Repeat(" ", 1024)))
assert.NoError(err)
val, err := db.Get(string(i))
assert.NoError(err)
assert.Equal([]byte(strings.Repeat(" ", 1024)), val)
}
})
t.Run("Sync", func(t *testing.T) {
err = db.Sync()
assert.NoError(err)
})
t.Run("Close", func(t *testing.T) {
err = db.Close()
assert.NoError(err)
})
})
t.Run("Merge", func(t *testing.T) {
t.Run("Merge", func(t *testing.T) {
err = Merge(testdir, true)
assert.NoError(err)
})
t.Run("Open", func(t *testing.T) {
db, err = Open(testdir)
assert.NoError(err)
})
t.Run("Get", func(t *testing.T) {
for i := 0; i < 32; i++ {
val, err := db.Get(string(i))
assert.NoError(err)
assert.Equal([]byte(strings.Repeat(" ", 1024)), val)
}
})
t.Run("Close", func(t *testing.T) {
err = db.Close()
assert.NoError(err)
})
})
}
func TestConcurrent(t *testing.T) {
var (
db *Bitcask

View File

@@ -96,6 +96,22 @@ func main() {
} else {
conn.WriteBulk(value)
}
case "keys":
conn.WriteArray(db.Len())
for key := range db.Keys() {
conn.WriteBulk([]byte(key))
}
case "exists":
if len(cmd.Args) != 2 {
conn.WriteError("ERR wrong number of arguments for '" + string(cmd.Args[0]) + "' command")
return
}
key := string(cmd.Args[1])
if db.Has(key) {
conn.WriteInt(1)
} else {
conn.WriteInt(0)
}
case "del":
if len(cmd.Args) != 2 {
conn.WriteError("ERR wrong number of arguments for '" + string(cmd.Args[0]) + "' command")

View File

@@ -52,11 +52,17 @@ func (k *Keydir) Delete(key string) {
delete(k.kv, key)
}
func (k *Keydir) Len() int {
return len(k.kv)
}
func (k *Keydir) Keys() chan string {
ch := make(chan string)
go func() {
for k := range k.kv {
ch <- k
k.RLock()
defer k.RUnlock()
for key := range k.kv {
ch <- key
}
close(ch)
}()

View File

@@ -1,47 +1,53 @@
package bitcask
const (
// DefaultMaxDatafileSize is the default maximum datafile size in bytes
DefaultMaxDatafileSize = 1 << 20 // 1MB
DefaultMaxKeySize = 64 // 64 bytes
DefaultMaxValueSize = 1 << 16 // 65KB
// DefaultMaxKeySize is the default maximum key size in bytes
DefaultMaxKeySize = 64 // 64 bytes
// DefaultMaxValueSize is the default value size in bytes
DefaultMaxValueSize = 1 << 16 // 65KB
)
// Option ...
type Option option
type option func(*config) error
// Option is a function that takes a config struct and modifies it
type Option func(*config) error
type config struct {
MaxDatafileSize int
MaxKeySize int
MaxValueSize int
maxDatafileSize int
maxKeySize int
maxValueSize int
}
func newDefaultConfig() *config {
return &config{
MaxDatafileSize: DefaultMaxDatafileSize,
MaxKeySize: DefaultMaxKeySize,
MaxValueSize: DefaultMaxValueSize,
maxDatafileSize: DefaultMaxDatafileSize,
maxKeySize: DefaultMaxKeySize,
maxValueSize: DefaultMaxValueSize,
}
}
func WithMaxDatafileSize(size int) option {
// WithMaxDatafileSize sets the maximum datafile size option
func WithMaxDatafileSize(size int) Option {
return func(cfg *config) error {
cfg.MaxDatafileSize = size
cfg.maxDatafileSize = size
return nil
}
}
func WithMaxKeySize(size int) option {
// WithMaxKeySize sets the maximum key size option
func WithMaxKeySize(size int) Option {
return func(cfg *config) error {
cfg.MaxKeySize = size
cfg.maxKeySize = size
return nil
}
}
func WithMaxValueSize(size int) option {
// WithMaxValueSize sets the maximum value size option
func WithMaxValueSize(size int) Option {
return func(cfg *config) error {
cfg.MaxValueSize = size
cfg.maxValueSize = size
return nil
}
}