From 711d08ce916355d7d83fca8fa8450f6a6d179fd5 Mon Sep 17 00:00:00 2001 From: James Mills <1290234+prologic@users.noreply.github.com> Date: Sat, 23 Mar 2019 12:14:15 +1000 Subject: [PATCH] Increased read performance by ~3-4x by removing another unnecessary I/O operation (Seek) --- .drone.yml | 2 +- Makefile | 10 ++++++--- README.md | 40 ++++++++++++++++----------------- bitcask.go | 30 ++++++++++++------------- internal/datafile.go | 45 ++++++++++++++++++++++++------------- internal/keydir.go | 4 +++- internal/streampb/stream.go | 8 +++---- 7 files changed, 80 insertions(+), 59 deletions(-) diff --git a/.drone.yml b/.drone.yml index 46e8071..52ffc10 100644 --- a/.drone.yml +++ b/.drone.yml @@ -5,7 +5,7 @@ steps: - name: build image: golang:latest commands: - - go test -v -short -cover -coverprofile=coverage.txt -coverpkg=$(go list) ./... + - go test -v -short -cover -coverprofile=coverage.txt -coverpkg=$(go list) . - name: coverage image: plugins/codecov diff --git a/Makefile b/Makefile index a35a867..7f33076 100644 --- a/Makefile +++ b/Makefile @@ -34,13 +34,17 @@ release: @./tools/release.sh profile: build - @go test -cpuprofile cpu.prof -memprofile mem.prof -v -bench ./... + @go test -cpuprofile cpu.prof -memprofile mem.prof -v -bench . bench: build - @go test -v -benchmem -bench=. ./... + @go test -v -benchmem -bench=. . test: build - @go test -v -cover -coverprofile=coverage.txt -covermode=atomic -coverpkg=$(shell go list) -race ./... + @go test -v \ + -cover -coverprofile=coverage.txt -covermode=atomic \ + -coverpkg=$(shell go list) \ + -race \ + . clean: @git clean -f -d -X diff --git a/README.md b/README.md index 64cc0e5..788ba18 100644 --- a/README.md +++ b/README.md @@ -94,32 +94,32 @@ Benchmarks run on a 11" Macbook with a 1.4Ghz Intel Core i7: ``` $ make bench ... -BenchmarkGet/128B-4 300000 5178 ns/op 400 B/op 5 allocs/op -BenchmarkGet/256B-4 300000 5273 ns/op 656 B/op 5 allocs/op -BenchmarkGet/512B-4 200000 5368 ns/op 1200 B/op 5 allocs/op -BenchmarkGet/1K-4 200000 5800 ns/op 2288 B/op 5 allocs/op -BenchmarkGet/2K-4 200000 6766 ns/op 4464 B/op 5 allocs/op -BenchmarkGet/4K-4 200000 7857 ns/op 9072 B/op 5 allocs/op -BenchmarkGet/8K-4 200000 9538 ns/op 17776 B/op 5 allocs/op -BenchmarkGet/16K-4 100000 13188 ns/op 34928 B/op 5 allocs/op -BenchmarkGet/32K-4 100000 21620 ns/op 73840 B/op 5 allocs/op +BenchmarkGet/128B-4 300000 3737 ns/op 1632 B/op 16 allocs/op +BenchmarkGet/256B-4 300000 4183 ns/op 2016 B/op 16 allocs/op +BenchmarkGet/512B-4 300000 4295 ns/op 2848 B/op 16 allocs/op +BenchmarkGet/1K-4 300000 4455 ns/op 4512 B/op 16 allocs/op +BenchmarkGet/2K-4 300000 5536 ns/op 7841 B/op 16 allocs/op +BenchmarkGet/4K-4 200000 7101 ns/op 15010 B/op 16 allocs/op +BenchmarkGet/8K-4 200000 10664 ns/op 28325 B/op 16 allocs/op +BenchmarkGet/16K-4 100000 18173 ns/op 54442 B/op 16 allocs/op +BenchmarkGet/32K-4 50000 33081 ns/op 115893 B/op 16 allocs/op -BenchmarkPut/128B-4 200000 7875 ns/op 409 B/op 6 allocs/op -BenchmarkPut/256B-4 200000 8712 ns/op 538 B/op 6 allocs/op -BenchmarkPut/512B-4 200000 9832 ns/op 829 B/op 6 allocs/op -BenchmarkPut/1K-4 100000 13105 ns/op 1410 B/op 6 allocs/op -BenchmarkPut/2K-4 100000 18601 ns/op 2572 B/op 6 allocs/op -BenchmarkPut/4K-4 50000 36631 ns/op 5151 B/op 6 allocs/op -BenchmarkPut/8K-4 30000 56128 ns/op 9798 B/op 6 allocs/op -BenchmarkPut/16K-4 20000 83209 ns/op 18834 B/op 6 allocs/op -BenchmarkPut/32K-4 10000 135899 ns/op 41517 B/op 6 allocs/op +BenchmarkPut/128B-4 200000 7967 ns/op 409 B/op 6 allocs/op +BenchmarkPut/256B-4 200000 8563 ns/op 538 B/op 6 allocs/op +BenchmarkPut/512B-4 200000 9678 ns/op 829 B/op 6 allocs/op +BenchmarkPut/1K-4 200000 12786 ns/op 1410 B/op 6 allocs/op +BenchmarkPut/2K-4 100000 18582 ns/op 2572 B/op 6 allocs/op +BenchmarkPut/4K-4 50000 35335 ns/op 5151 B/op 6 allocs/op +BenchmarkPut/8K-4 30000 56047 ns/op 9797 B/op 6 allocs/op +BenchmarkPut/16K-4 20000 86137 ns/op 18834 B/op 6 allocs/op +BenchmarkPut/32K-4 10000 140162 ns/op 41517 B/op 6 allocs/op -BenchmarkScan-4 1000000 1851 ns/op 493 B/op 25 allocs/op +BenchmarkScan-4 1000000 1885 ns/op 493 B/op 25 allocs/op ``` For 128B values: -* ~200,000 reads/sec +* ~270,000 reads/sec * ~130,000 writes/sec The full benchmark above shows linear performance as you increase key/value sizes. diff --git a/bitcask.go b/bitcask.go index 80c3ca5..ac2f29e 100644 --- a/bitcask.go +++ b/bitcask.go @@ -89,7 +89,7 @@ func (b *Bitcask) Get(key string) ([]byte, error) { df = b.datafiles[item.FileID] } - e, err := df.ReadAt(item.Offset) + e, err := df.ReadAt(item.Offset, item.Size) if err != nil { return nil, err } @@ -117,12 +117,12 @@ func (b *Bitcask) Put(key string, value []byte) error { return ErrValueTooLarge } - offset, err := b.put(key, value) + offset, n, err := b.put(key, value) if err != nil { return err } - item := b.keydir.Add(key, b.curr.FileID(), offset) + item := b.keydir.Add(key, b.curr.FileID(), offset, n) b.trie.Add(key, item) return nil @@ -131,7 +131,7 @@ func (b *Bitcask) Put(key string, value []byte) error { // Delete deletes the named key. If the key doesn't exist or an I/O error // occurs the error is returned. func (b *Bitcask) Delete(key string) error { - _, err := b.put(key, []byte{}) + _, _, err := b.put(key, []byte{}) if err != nil { return err } @@ -177,7 +177,7 @@ func (b *Bitcask) Fold(f func(key string) error) error { return nil } -func (b *Bitcask) put(key string, value []byte) (int64, error) { +func (b *Bitcask) put(key string, value []byte) (int64, int64, error) { b.mu.Lock() defer b.mu.Unlock() @@ -185,12 +185,12 @@ func (b *Bitcask) put(key string, value []byte) (int64, error) { if size >= int64(b.config.maxDatafileSize) { err := b.curr.Close() if err != nil { - return -1, err + return -1, 0, err } df, err := internal.NewDatafile(b.path, b.curr.FileID(), true) if err != nil { - return -1, err + return -1, 0, err } b.datafiles = append(b.datafiles, df) @@ -198,7 +198,7 @@ func (b *Bitcask) put(key string, value []byte) (int64, error) { id := b.curr.FileID() + 1 curr, err := internal.NewDatafile(b.path, id, false) if err != nil { - return -1, err + return -1, 0, err } b.curr = curr } @@ -255,7 +255,7 @@ func Merge(path string, force bool) error { defer df.Close() for { - e, err := df.Read() + e, n, err := df.Read() if err != nil { if err == io.EOF { break @@ -269,7 +269,7 @@ func Merge(path string, force bool) error { continue } - keydir.Add(e.Key, ids[i], e.Offset) + keydir.Add(e.Key, ids[i], e.Offset, n) } tempdf, err := internal.NewDatafile(temp, id, false) @@ -280,12 +280,12 @@ func Merge(path string, force bool) error { for key := range keydir.Keys() { item, _ := keydir.Get(key) - e, err := df.ReadAt(item.Offset) + e, err := df.ReadAt(item.Offset, item.Size) if err != nil { return err } - _, err = tempdf.Write(e) + _, _, err = tempdf.Write(e) if err != nil { return err } @@ -365,12 +365,12 @@ func Open(path string, options ...Option) (*Bitcask, error) { for key := range hint.Keys() { item, _ := hint.Get(key) - _ = keydir.Add(key, item.FileID, item.Offset) + _ = keydir.Add(key, item.FileID, item.Offset, item.Size) trie.Add(key, item) } } else { for { - e, err := df.Read() + e, n, err := df.Read() if err != nil { if err == io.EOF { break @@ -384,7 +384,7 @@ func Open(path string, options ...Option) (*Bitcask, error) { continue } - item := keydir.Add(e.Key, ids[i], e.Offset) + item := keydir.Add(e.Key, ids[i], e.Offset, n) trie.Add(e.Key, item) } } diff --git a/internal/datafile.go b/internal/datafile.go index 8dc2ad9..8ba35ae 100644 --- a/internal/datafile.go +++ b/internal/datafile.go @@ -1,12 +1,14 @@ package internal import ( + "bytes" "fmt" "os" "path/filepath" "sync" "github.com/pkg/errors" + log "github.com/sirupsen/logrus" pb "github.com/prologic/bitcask/internal/proto" "github.com/prologic/bitcask/internal/streampb" @@ -17,7 +19,8 @@ const ( ) var ( - ErrReadonly = errors.New("error: read only datafile") + ErrReadonly = errors.New("error: read only datafile") + ErrReadError = errors.New("error: read error") ) type Datafile struct { @@ -104,28 +107,40 @@ func (df *Datafile) Size() int64 { return df.offset } -func (df *Datafile) Read() (e pb.Entry, err error) { +func (df *Datafile) Read() (e pb.Entry, n int64, err error) { df.Lock() defer df.Unlock() - return e, df.dec.Decode(&e) -} - -func (df *Datafile) ReadAt(index int64) (e pb.Entry, err error) { - df.Lock() - defer df.Unlock() - - _, err = df.r.Seek(index, os.SEEK_SET) + n, err = df.dec.Decode(&e) if err != nil { return } - return e, df.dec.Decode(&e) + return } -func (df *Datafile) Write(e pb.Entry) (int64, error) { +func (df *Datafile) ReadAt(index, size int64) (e pb.Entry, err error) { + log.WithField("index", index).WithField("size", size).Debug("ReadAt") + + b := make([]byte, size) + n, err := df.r.ReadAt(b, index) + if err != nil { + return + } + if int64(n) != size { + err = ErrReadError + return + } + + buf := bytes.NewBuffer(b) + dec := streampb.NewDecoder(buf) + _, err = dec.Decode(&e) + return +} + +func (df *Datafile) Write(e pb.Entry) (int64, int64, error) { if df.w == nil { - return -1, ErrReadonly + return -1, 0, ErrReadonly } df.Lock() @@ -135,9 +150,9 @@ func (df *Datafile) Write(e pb.Entry) (int64, error) { n, err := df.enc.Encode(&e) if err != nil { - return -1, err + return -1, 0, err } df.offset += n - return e.Offset, nil + return e.Offset, n, nil } diff --git a/internal/keydir.go b/internal/keydir.go index 1f60c4c..6e94fb3 100644 --- a/internal/keydir.go +++ b/internal/keydir.go @@ -11,6 +11,7 @@ import ( type Item struct { FileID int Offset int64 + Size int64 } type Keydir struct { @@ -24,10 +25,11 @@ func NewKeydir() *Keydir { } } -func (k *Keydir) Add(key string, fileid int, offset int64) Item { +func (k *Keydir) Add(key string, fileid int, offset, size int64) Item { item := Item{ FileID: fileid, Offset: offset, + Size: size, } k.Lock() diff --git a/internal/streampb/stream.go b/internal/streampb/stream.go index 495d918..37791f4 100644 --- a/internal/streampb/stream.go +++ b/internal/streampb/stream.go @@ -66,12 +66,12 @@ type Decoder struct { // Decode takes a proto.Message and unmarshals the next payload in the // underlying io.Reader. It returns an EOF when it's done. -func (d *Decoder) Decode(v proto.Message) error { +func (d *Decoder) Decode(v proto.Message) (int64, error) { prefixBuf := make([]byte, prefixSize) _, err := io.ReadFull(d.r, prefixBuf) if err != nil { - return err + return 0, err } n := binary.BigEndian.Uint64(prefixBuf) @@ -82,11 +82,11 @@ func (d *Decoder) Decode(v proto.Message) error { for idx < n { m, err := d.r.Read(buf[idx:n]) if err != nil { - return errors.Wrap(translateError(err), "failed reading marshaled data") + return 0, errors.Wrap(translateError(err), "failed reading marshaled data") } idx += uint64(m) } - return proto.Unmarshal(buf[:n], v) + return int64(idx + prefixSize), proto.Unmarshal(buf[:n], v) } func translateError(err error) error {