Compare commits

...

5 Commits

Author SHA1 Message Date
James Mills
71a42800fe Improved benchmark test suite for various key/value sizes 2019-03-14 18:17:20 +10:00
James Mills
3b9627aeb8 Fix concurrent read bug 2019-03-14 17:58:06 +10:00
James Mills
e0c4c4fdae Fix concurrent write bug with multiple goroutines writing to the to the active datafile 2019-03-14 17:58:06 +10:00
James Mills
fb50eb2f82 Update README.md 2019-03-14 15:36:37 +10:00
James Mills
fb2335e3c1 Fixed tests 2019-03-14 07:46:59 +10:00
4 changed files with 193 additions and 38 deletions

View File

@@ -12,6 +12,7 @@ A Bitcask (LSM+WAL) Key/Value Store written in Go.
* Embeddable * Embeddable
* Builtin CLI * Builtin CLI
* Builtin Redis-compatible server
* Predictable read/write performance * Predictable read/write performance
* Low latency * Low latency
* High throughput (See: [Performance](README.md#Performance)) * High throughput (See: [Performance](README.md#Performance))
@@ -96,12 +97,32 @@ Benchmarks run on a 11" Macbook with a 1.4Ghz Intel Core i7:
``` ```
$ make bench $ make bench
... ...
BenchmarkGet-4 300000 5065 ns/op 144 B/op 4 allocs/op BenchmarkGet/128B-4 200000 5780 ns/op 400 B/op 5 allocs/op
BenchmarkPut-4 100000 14640 ns/op 699 B/op 7 allocs/op BenchmarkGet/256B-4 200000 6138 ns/op 656 B/op 5 allocs/op
BenchmarkGet/512B-4 200000 5967 ns/op 1200 B/op 5 allocs/op
BenchmarkGet/1K-4 200000 6290 ns/op 2288 B/op 5 allocs/op
BenchmarkGet/2K-4 200000 6293 ns/op 4464 B/op 5 allocs/op
BenchmarkGet/4K-4 200000 7673 ns/op 9072 B/op 5 allocs/op
BenchmarkGet/8K-4 200000 10373 ns/op 17776 B/op 5 allocs/op
BenchmarkGet/16K-4 100000 14227 ns/op 34928 B/op 5 allocs/op
BenchmarkGet/32K-4 100000 25953 ns/op 73840 B/op 5 allocs/op
BenchmarkPut/128B-4 100000 17353 ns/op 680 B/op 5 allocs/op
BenchmarkPut/256B-4 100000 18620 ns/op 808 B/op 5 allocs/op
BenchmarkPut/512B-4 100000 19068 ns/op 1096 B/op 5 allocs/op
BenchmarkPut/1K-4 100000 23738 ns/op 1673 B/op 5 allocs/op
BenchmarkPut/2K-4 50000 25118 ns/op 2826 B/op 5 allocs/op
BenchmarkPut/4K-4 50000 44605 ns/op 5389 B/op 5 allocs/op
BenchmarkPut/8K-4 30000 55237 ns/op 10001 B/op 5 allocs/op
BenchmarkPut/16K-4 20000 78966 ns/op 18972 B/op 5 allocs/op
BenchmarkPut/32K-4 10000 116253 ns/op 41520 B/op 5 allocs/op
``` ```
For 128B values:
* ~180,000 reads/sec * ~180,000 reads/sec
* ~60,000 writes/sec * ~60,000 writes/sec
The full benchmark above shows linear performance as you increase key/value sizes.
## License ## License

View File

@@ -4,6 +4,7 @@ import (
"fmt" "fmt"
"io/ioutil" "io/ioutil"
"strings" "strings"
"sync"
"testing" "testing"
"github.com/stretchr/testify/assert" "github.com/stretchr/testify/assert"
@@ -140,7 +141,7 @@ func TestMerge(t *testing.T) {
) )
t.Run("Open", func(t *testing.T) { t.Run("Open", func(t *testing.T) {
db, err = Open(testdir, MaxDatafileSize(1024)) db, err = Open(testdir, WithMaxDatafileSize(1024))
assert.NoError(err) assert.NoError(err)
}) })
@@ -198,6 +199,86 @@ func TestMerge(t *testing.T) {
}) })
} }
// TestConcurrent exercises concurrent Put and Get operations against a
// single open Bitcask instance to surface data races in the datafile
// layer (run with -race to get full value from this test).
func TestConcurrent(t *testing.T) {
	var (
		db  *Bitcask
		err error
	)

	assert := assert.New(t)

	testdir, err := ioutil.TempDir("", "bitcask")
	assert.NoError(err)

	t.Run("Setup", func(t *testing.T) {
		t.Run("Open", func(t *testing.T) {
			db, err = Open(testdir)
			assert.NoError(err)
		})

		t.Run("Put", func(t *testing.T) {
			err = db.Put("foo", []byte("bar"))
			assert.NoError(err)
		})
	})

	t.Run("Concurrent", func(t *testing.T) {
		t.Run("Put", func(t *testing.T) {
			// Each writer puts every x-th key in [0, 100] so the two
			// goroutines overlap on keys divisible by both 2 and 3.
			f := func(wg *sync.WaitGroup, x int) {
				defer wg.Done()

				for i := 0; i <= 100; i++ {
					if i%x == 0 {
						key := fmt.Sprintf("k%d", i)
						value := []byte(fmt.Sprintf("v%d", i))
						err := db.Put(key, value)
						assert.NoError(err)
					}
				}
			}

			wg := &sync.WaitGroup{}
			// FIX: Add must be called before the goroutines start.
			// The original called wg.Add(1) after `go f(...)`, so a
			// fast goroutine could invoke Done before Add, panicking
			// with "negative WaitGroup counter" or letting Wait
			// return before both writers finished.
			wg.Add(2)
			go f(wg, 2)
			go f(wg, 3)
			wg.Wait()
		})

		t.Run("Get", func(t *testing.T) {
			// Two readers hammer the same key N times each.
			f := func(wg *sync.WaitGroup, N int) {
				defer wg.Done()

				for i := 0; i <= N; i++ {
					value, err := db.Get("foo")
					assert.NoError(err)
					assert.Equal([]byte("bar"), value)
				}
			}

			wg := &sync.WaitGroup{}
			// FIX: same Add-before-go ordering as the Put subtest.
			wg.Add(2)
			go f(wg, 100)
			go f(wg, 100)
			wg.Wait()
		})

		t.Run("Close", func(t *testing.T) {
			err = db.Close()
			assert.NoError(err)
		})
	})
}
func TestLocking(t *testing.T) { func TestLocking(t *testing.T) {
assert := assert.New(t) assert := assert.New(t)
@@ -213,6 +294,11 @@ func TestLocking(t *testing.T) {
assert.Equal("error: cannot acquire lock", err.Error()) assert.Equal("error: cannot acquire lock", err.Error())
} }
// benchmarkTestCase describes one value-size scenario for the Get/Put
// benchmarks: name is the sub-benchmark label (e.g. "128B", "32K") and
// size is the value payload size in bytes.
type benchmarkTestCase struct {
	name string
	size int
}
func BenchmarkGet(b *testing.B) { func BenchmarkGet(b *testing.B) {
testdir, err := ioutil.TempDir("", "bitcask") testdir, err := ioutil.TempDir("", "bitcask")
if err != nil { if err != nil {
@@ -225,20 +311,39 @@ func BenchmarkGet(b *testing.B) {
} }
defer db.Close() defer db.Close()
err = db.Put("foo", []byte("bar")) tests := []benchmarkTestCase{
if err != nil { {"128B", 128},
b.Fatal(err) {"256B", 256},
{"512B", 512},
{"1K", 1024},
{"2K", 2048},
{"4K", 4096},
{"8K", 8192},
{"16K", 16384},
{"32K", 32768},
} }
b.ResetTimer() for _, tt := range tests {
for i := 0; i < b.N; i++ { b.Run(tt.name, func(b *testing.B) {
val, err := db.Get("foo") key := "foo"
if err != nil { value := []byte(strings.Repeat(" ", tt.size))
b.Fatal(err)
} err = db.Put(key, value)
if string(val) != "bar" { if err != nil {
b.Errorf("expected val=bar got=%s", val) b.Fatal(err)
} }
b.ResetTimer()
for i := 0; i < b.N; i++ {
val, err := db.Get(key)
if err != nil {
b.Fatal(err)
}
if string(val) != string(value) {
b.Errorf("unexpected value")
}
}
})
} }
} }
@@ -254,11 +359,29 @@ func BenchmarkPut(b *testing.B) {
} }
defer db.Close() defer db.Close()
b.ResetTimer() tests := []benchmarkTestCase{
for i := 0; i < b.N; i++ { {"128B", 128},
err := db.Put(fmt.Sprintf("key%d", i), []byte("bar")) {"256B", 256},
if err != nil { {"512B", 512},
b.Fatal(err) {"1K", 1024},
} {"2K", 2048},
{"4K", 4096},
{"8K", 8192},
{"16K", 16384},
{"32K", 32768},
}
for _, tt := range tests {
b.Run(tt.name, func(b *testing.B) {
key := "foo"
value := []byte(strings.Repeat(" ", tt.size))
b.ResetTimer()
for i := 0; i < b.N; i++ {
err := db.Put(key, value)
if err != nil {
b.Fatal(err)
}
}
})
} }
} }

View File

@@ -5,6 +5,7 @@ import (
"fmt" "fmt"
"os" "os"
"path/filepath" "path/filepath"
"sync"
"time" "time"
pb "github.com/prologic/bitcask/proto" pb "github.com/prologic/bitcask/proto"
@@ -20,6 +21,8 @@ var (
) )
type Datafile struct { type Datafile struct {
sync.RWMutex
id int id int
r *os.File r *os.File
w *os.File w *os.File
@@ -102,17 +105,23 @@ func (df *Datafile) Size() (int64, error) {
return stat.Size(), nil return stat.Size(), nil
} }
func (df *Datafile) Read() (pb.Entry, error) { func (df *Datafile) Read() (e pb.Entry, err error) {
var e pb.Entry df.Lock()
defer df.Unlock()
return e, df.dec.Decode(&e) return e, df.dec.Decode(&e)
} }
func (df *Datafile) ReadAt(index int64) (e pb.Entry, err error) { func (df *Datafile) ReadAt(index int64) (e pb.Entry, err error) {
df.Lock()
defer df.Unlock()
_, err = df.r.Seek(index, os.SEEK_SET) _, err = df.r.Seek(index, os.SEEK_SET)
if err != nil { if err != nil {
return return
} }
return df.Read()
return e, df.dec.Decode(&e)
} }
func (df *Datafile) Write(e pb.Entry) (int64, error) { func (df *Datafile) Write(e pb.Entry) (int64, error) {
@@ -130,7 +139,10 @@ func (df *Datafile) Write(e pb.Entry) (int64, error) {
e.Index = index e.Index = index
e.Timestamp = time.Now().Unix() e.Timestamp = time.Now().Unix()
df.Lock()
err = df.enc.Encode(&e) err = df.enc.Encode(&e)
df.Unlock()
if err != nil { if err != nil {
return -1, err return -1, err
} }

View File

@@ -16,26 +16,27 @@ const (
// NewEncoder creates a streaming protobuf encoder. // NewEncoder creates a streaming protobuf encoder.
func NewEncoder(w io.Writer) *Encoder { func NewEncoder(w io.Writer) *Encoder {
return &Encoder{w: w, prefixBuf: make([]byte, prefixSize)} return &Encoder{w}
} }
// Encoder wraps an underlying io.Writer and allows you to stream // Encoder wraps an underlying io.Writer and allows you to stream
// proto encodings on it. // proto encodings on it.
type Encoder struct { type Encoder struct {
w io.Writer w io.Writer
prefixBuf []byte
} }
// Encode takes any proto.Message and streams it to the underlying writer. // Encode takes any proto.Message and streams it to the underlying writer.
// Messages are framed with a length prefix. // Messages are framed with a length prefix.
func (e *Encoder) Encode(msg proto.Message) error { func (e *Encoder) Encode(msg proto.Message) error {
prefixBuf := make([]byte, prefixSize)
buf, err := proto.Marshal(msg) buf, err := proto.Marshal(msg)
if err != nil { if err != nil {
return err return err
} }
binary.BigEndian.PutUint64(e.prefixBuf, uint64(len(buf))) binary.BigEndian.PutUint64(prefixBuf, uint64(len(buf)))
if _, err := e.w.Write(e.prefixBuf); err != nil { if _, err := e.w.Write(prefixBuf); err != nil {
return errors.Wrap(err, "failed writing length prefix") return errors.Wrap(err, "failed writing length prefix")
} }
@@ -45,28 +46,26 @@ func (e *Encoder) Encode(msg proto.Message) error {
// NewDecoder creates a streaming protobuf decoder. // NewDecoder creates a streaming protobuf decoder.
func NewDecoder(r io.Reader) *Decoder { func NewDecoder(r io.Reader) *Decoder {
return &Decoder{ return &Decoder{r: r}
r: r,
prefixBuf: make([]byte, prefixSize),
}
} }
// Decoder wraps an underlying io.Reader and allows you to stream // Decoder wraps an underlying io.Reader and allows you to stream
// proto decodings on it. // proto decodings on it.
type Decoder struct { type Decoder struct {
r io.Reader r io.Reader
prefixBuf []byte
} }
// Decode takes a proto.Message and unmarshals the next payload in the // Decode takes a proto.Message and unmarshals the next payload in the
// underlying io.Reader. It returns an EOF when it's done. // underlying io.Reader. It returns an EOF when it's done.
func (d *Decoder) Decode(v proto.Message) error { func (d *Decoder) Decode(v proto.Message) error {
_, err := io.ReadFull(d.r, d.prefixBuf) prefixBuf := make([]byte, prefixSize)
_, err := io.ReadFull(d.r, prefixBuf)
if err != nil { if err != nil {
return err return err
} }
n := binary.BigEndian.Uint64(d.prefixBuf) n := binary.BigEndian.Uint64(prefixBuf)
buf := make([]byte, n) buf := make([]byte, n)