Compare commits

..

10 Commits

12 changed files with 159 additions and 168 deletions

View File

@@ -97,31 +97,33 @@ Benchmarks run on a 11" Macbook with a 1.4Ghz Intel Core i7:
``` ```
$ make bench $ make bench
... ...
BenchmarkGet/128B-4 200000 5780 ns/op 400 B/op 5 allocs/op BenchmarkGet/128B-4 300000 5178 ns/op 400 B/op 5 allocs/op
BenchmarkGet/256B-4 200000 6138 ns/op 656 B/op 5 allocs/op BenchmarkGet/256B-4 300000 5273 ns/op 656 B/op 5 allocs/op
BenchmarkGet/512B-4 200000 5967 ns/op 1200 B/op 5 allocs/op BenchmarkGet/512B-4 200000 5368 ns/op 1200 B/op 5 allocs/op
BenchmarkGet/1K-4 200000 6290 ns/op 2288 B/op 5 allocs/op BenchmarkGet/1K-4 200000 5800 ns/op 2288 B/op 5 allocs/op
BenchmarkGet/2K-4 200000 6293 ns/op 4464 B/op 5 allocs/op BenchmarkGet/2K-4 200000 6766 ns/op 4464 B/op 5 allocs/op
BenchmarkGet/4K-4 200000 7673 ns/op 9072 B/op 5 allocs/op BenchmarkGet/4K-4 200000 7857 ns/op 9072 B/op 5 allocs/op
BenchmarkGet/8K-4 200000 10373 ns/op 17776 B/op 5 allocs/op BenchmarkGet/8K-4 200000 9538 ns/op 17776 B/op 5 allocs/op
BenchmarkGet/16K-4 100000 14227 ns/op 34928 B/op 5 allocs/op BenchmarkGet/16K-4 100000 13188 ns/op 34928 B/op 5 allocs/op
BenchmarkGet/32K-4 100000 25953 ns/op 73840 B/op 5 allocs/op BenchmarkGet/32K-4 100000 21620 ns/op 73840 B/op 5 allocs/op
BenchmarkPut/128B-4 100000 17353 ns/op 680 B/op 5 allocs/op BenchmarkPut/128B-4 200000 7875 ns/op 409 B/op 6 allocs/op
BenchmarkPut/256B-4 100000 18620 ns/op 808 B/op 5 allocs/op BenchmarkPut/256B-4 200000 8712 ns/op 538 B/op 6 allocs/op
BenchmarkPut/512B-4 100000 19068 ns/op 1096 B/op 5 allocs/op BenchmarkPut/512B-4 200000 9832 ns/op 829 B/op 6 allocs/op
BenchmarkPut/1K-4 100000 23738 ns/op 1673 B/op 5 allocs/op BenchmarkPut/1K-4 100000 13105 ns/op 1410 B/op 6 allocs/op
BenchmarkPut/2K-4 50000 25118 ns/op 2826 B/op 5 allocs/op BenchmarkPut/2K-4 100000 18601 ns/op 2572 B/op 6 allocs/op
BenchmarkPut/4K-4 50000 44605 ns/op 5389 B/op 5 allocs/op BenchmarkPut/4K-4 50000 36631 ns/op 5151 B/op 6 allocs/op
BenchmarkPut/8K-4 30000 55237 ns/op 10001 B/op 5 allocs/op BenchmarkPut/8K-4 30000 56128 ns/op 9798 B/op 6 allocs/op
BenchmarkPut/16K-4 20000 78966 ns/op 18972 B/op 5 allocs/op BenchmarkPut/16K-4 20000 83209 ns/op 18834 B/op 6 allocs/op
BenchmarkPut/32K-4 10000 116253 ns/op 41520 B/op 5 allocs/op BenchmarkPut/32K-4 10000 135899 ns/op 41517 B/op 6 allocs/op
BenchmarkScan-4 1000000 1851 ns/op 493 B/op 25 allocs/op
``` ```
For 128B values: For 128B values:
* ~180,000 reads/sec * ~200,000 reads/sec
* ~60,000 writes/sec * ~130,000 writes/sec
The full benchmark above shows linear performance as you increase key/value sizes. The full benchmark above shows linear performance as you increase key/value sizes.

View File

@@ -1,29 +1,22 @@
package bitcask package bitcask
import ( import (
"errors" "fmt"
"hash/crc32"
"io" "io"
"io/ioutil" "io/ioutil"
"os" "os"
"path/filepath" "path/filepath"
"strings" "strings"
"time"
"github.com/derekparker/trie"
"github.com/gofrs/flock" "github.com/gofrs/flock"
) "github.com/prologic/trie"
var (
ErrKeyNotFound = errors.New("error: key not found")
ErrKeyTooLarge = errors.New("error: key too large")
ErrValueTooLarge = errors.New("error: value too large")
ErrDatabaseLocked = errors.New("error: database locked")
) )
type Bitcask struct { type Bitcask struct {
*flock.Flock *flock.Flock
opts Options config *config
path string path string
curr *Datafile curr *Datafile
keydir *Keydir keydir *Keydir
@@ -54,7 +47,7 @@ func (b *Bitcask) Get(key string) ([]byte, error) {
item, ok := b.keydir.Get(key) item, ok := b.keydir.Get(key)
if !ok { if !ok {
return nil, ErrKeyNotFound return nil, fmt.Errorf("error: key not found %s", key)
} }
if item.FileID == b.curr.id { if item.FileID == b.curr.id {
@@ -63,28 +56,33 @@ func (b *Bitcask) Get(key string) ([]byte, error) {
df = b.datafiles[item.FileID] df = b.datafiles[item.FileID]
} }
e, err := df.ReadAt(item.Index) e, err := df.ReadAt(item.Offset)
if err != nil { if err != nil {
return nil, err return nil, err
} }
checksum := crc32.ChecksumIEEE(e.Value)
if checksum != e.Checksum {
return nil, fmt.Errorf("error: checksum falied %s %d != %d", key, e.Checksum, checksum)
}
return e.Value, nil return e.Value, nil
} }
func (b *Bitcask) Put(key string, value []byte) error { func (b *Bitcask) Put(key string, value []byte) error {
if len(key) > b.opts.MaxKeySize { if len(key) > b.config.MaxKeySize {
return ErrKeyTooLarge return fmt.Errorf("error: key too large %d > %d", len(key), b.config.MaxKeySize)
} }
if len(value) > b.opts.MaxValueSize { if len(value) > b.config.MaxValueSize {
return ErrValueTooLarge return fmt.Errorf("error: value too large %d > %d", len(value), b.config.MaxValueSize)
} }
index, err := b.put(key, value) offset, err := b.put(key, value)
if err != nil { if err != nil {
return err return err
} }
item := b.keydir.Add(key, b.curr.id, index, time.Now().Unix()) item := b.keydir.Add(key, b.curr.id, offset)
b.trie.Add(key, item) b.trie.Add(key, item)
return nil return nil
@@ -134,6 +132,10 @@ func (b *Bitcask) put(key string, value []byte) (int64, error) {
} }
df, err := NewDatafile(b.path, b.curr.id, true) df, err := NewDatafile(b.path, b.curr.id, true)
if err != nil {
return -1, err
}
b.datafiles = append(b.datafiles, df) b.datafiles = append(b.datafiles, df)
id := b.curr.id + 1 id := b.curr.id + 1
@@ -212,7 +214,7 @@ func Merge(path string, force bool) error {
continue continue
} }
keydir.Add(e.Key, ids[i], e.Index, e.Timestamp) keydir.Add(e.Key, ids[i], e.Offset)
} }
tempdf, err := NewDatafile(temp, id, false) tempdf, err := NewDatafile(temp, id, false)
@@ -223,7 +225,7 @@ func Merge(path string, force bool) error {
for key := range keydir.Keys() { for key := range keydir.Keys() {
item, _ := keydir.Get(key) item, _ := keydir.Get(key)
e, err := df.ReadAt(item.Index) e, err := df.ReadAt(item.Offset)
if err != nil { if err != nil {
return err return err
} }
@@ -259,7 +261,7 @@ func Merge(path string, force bool) error {
return nil return nil
} }
func Open(path string, options ...func(*Bitcask) error) (*Bitcask, error) { func Open(path string, options ...option) (*Bitcask, error) {
if err := os.MkdirAll(path, 0755); err != nil { if err := os.MkdirAll(path, 0755); err != nil {
return nil, err return nil, err
} }
@@ -305,7 +307,7 @@ func Open(path string, options ...func(*Bitcask) error) (*Bitcask, error) {
for key := range hint.Keys() { for key := range hint.Keys() {
item, _ := hint.Get(key) item, _ := hint.Get(key)
_ = keydir.Add(key, item.FileID, item.Index, item.Timestamp) _ = keydir.Add(key, item.FileID, item.Offset)
trie.Add(key, item) trie.Add(key, item)
} }
} else { } else {
@@ -324,7 +326,7 @@ func Open(path string, options ...func(*Bitcask) error) (*Bitcask, error) {
continue continue
} }
item := keydir.Add(e.Key, ids[i], e.Index, e.Timestamp) item := keydir.Add(e.Key, ids[i], e.Offset)
trie.Add(e.Key, item) trie.Add(e.Key, item)
} }
} }
@@ -342,7 +344,7 @@ func Open(path string, options ...func(*Bitcask) error) (*Bitcask, error) {
bitcask := &Bitcask{ bitcask := &Bitcask{
Flock: flock.New(filepath.Join(path, "lock")), Flock: flock.New(filepath.Join(path, "lock")),
opts: NewDefaultOptions(), config: NewDefaultConfig(),
path: path, path: path,
curr: curr, curr: curr,
keydir: keydir, keydir: keydir,
@@ -352,8 +354,8 @@ func Open(path string, options ...func(*Bitcask) error) (*Bitcask, error) {
maxDatafileSize: DefaultMaxDatafileSize, maxDatafileSize: DefaultMaxDatafileSize,
} }
for _, option := range options { for _, opt := range options {
err = option(bitcask) err = opt(bitcask.config)
if err != nil { if err != nil {
return nil, err return nil, err
} }
@@ -365,7 +367,7 @@ func Open(path string, options ...func(*Bitcask) error) (*Bitcask, error) {
} }
if !locked { if !locked {
return nil, ErrDatabaseLocked return nil, fmt.Errorf("error: database locked %s", path)
} }
return bitcask, nil return bitcask, nil

View File

@@ -45,7 +45,7 @@ func TestAll(t *testing.T) {
assert.NoError(err) assert.NoError(err)
_, err = db.Get("foo") _, err = db.Get("foo")
assert.Error(err) assert.Error(err)
assert.Equal(err.Error(), "error: key not found") assert.Equal("error: key not found foo", err.Error())
}) })
t.Run("Sync", func(t *testing.T) { t.Run("Sync", func(t *testing.T) {
@@ -92,7 +92,7 @@ func TestDeletedKeys(t *testing.T) {
assert.NoError(err) assert.NoError(err)
_, err = db.Get("foo") _, err = db.Get("foo")
assert.Error(err) assert.Error(err)
assert.Equal("error: key not found", err.Error()) assert.Equal("error: key not found foo", err.Error())
}) })
t.Run("Sync", func(t *testing.T) { t.Run("Sync", func(t *testing.T) {
@@ -120,7 +120,7 @@ func TestDeletedKeys(t *testing.T) {
t.Run("Get", func(t *testing.T) { t.Run("Get", func(t *testing.T) {
_, err = db.Get("foo") _, err = db.Get("foo")
assert.Error(err) assert.Error(err)
assert.Equal("error: key not found", err.Error()) assert.Equal("error: key not found foo", err.Error())
}) })
t.Run("Close", func(t *testing.T) { t.Run("Close", func(t *testing.T) {
@@ -138,19 +138,17 @@ func TestMaxKeySize(t *testing.T) {
var db *Bitcask var db *Bitcask
size := 16
t.Run("Open", func(t *testing.T) { t.Run("Open", func(t *testing.T) {
db, err = Open(testdir, WithMaxKeySize(size)) db, err = Open(testdir, WithMaxKeySize(16))
assert.NoError(err) assert.NoError(err)
}) })
t.Run("Put", func(t *testing.T) { t.Run("Put", func(t *testing.T) {
key := strings.Repeat(" ", size+1) key := strings.Repeat(" ", 17)
value := []byte("foobar") value := []byte("foobar")
err = db.Put(key, value) err = db.Put(key, value)
assert.Error(err) assert.Error(err)
assert.Equal("error: key too large", err.Error()) assert.Equal("error: key too large 17 > 16", err.Error())
}) })
} }
@@ -162,19 +160,17 @@ func TestMaxValueSize(t *testing.T) {
var db *Bitcask var db *Bitcask
size := 16
t.Run("Open", func(t *testing.T) { t.Run("Open", func(t *testing.T) {
db, err = Open(testdir, WithMaxValueSize(size)) db, err = Open(testdir, WithMaxValueSize(16))
assert.NoError(err) assert.NoError(err)
}) })
t.Run("Put", func(t *testing.T) { t.Run("Put", func(t *testing.T) {
key := "foo" key := "foo"
value := []byte(strings.Repeat(" ", size+1)) value := []byte(strings.Repeat(" ", 17))
err = db.Put(key, value) err = db.Put(key, value)
assert.Error(err) assert.Error(err)
assert.Equal("error: value too large", err.Error()) assert.Equal("error: value too large 17 > 16", err.Error())
}) })
} }
@@ -291,10 +287,9 @@ func TestConcurrent(t *testing.T) {
wg := &sync.WaitGroup{} wg := &sync.WaitGroup{}
go f(wg, 2) go f(wg, 2)
wg.Add(1)
go f(wg, 3) go f(wg, 3)
wg.Add(1) go f(wg, 5)
wg.Add(3)
wg.Wait() wg.Wait()
}) })
@@ -314,10 +309,9 @@ func TestConcurrent(t *testing.T) {
wg := &sync.WaitGroup{} wg := &sync.WaitGroup{}
go f(wg, 100) go f(wg, 100)
wg.Add(1)
go f(wg, 100) go f(wg, 100)
wg.Add(1) go f(wg, 100)
wg.Add(3)
wg.Wait() wg.Wait()
}) })
@@ -393,7 +387,7 @@ func TestLocking(t *testing.T) {
_, err = Open(testdir) _, err = Open(testdir)
assert.Error(err) assert.Error(err)
assert.Equal("error: database locked", err.Error()) assert.Equal(fmt.Sprintf("error: database locked %s", testdir), err.Error())
} }
type benchmarkTestCase struct { type benchmarkTestCase struct {

View File

@@ -1,12 +1,12 @@
package bitcask package bitcask
import ( import (
"errors"
"fmt" "fmt"
"os" "os"
"path/filepath" "path/filepath"
"sync" "sync"
"time"
"github.com/pkg/errors"
pb "github.com/prologic/bitcask/proto" pb "github.com/prologic/bitcask/proto"
"github.com/prologic/bitcask/streampb" "github.com/prologic/bitcask/streampb"
@@ -23,11 +23,12 @@ var (
type Datafile struct { type Datafile struct {
sync.RWMutex sync.RWMutex
id int id int
r *os.File r *os.File
w *os.File w *os.File
dec *streampb.Decoder offset int64
enc *streampb.Encoder dec *streampb.Decoder
enc *streampb.Encoder
} }
func NewDatafile(path string, id int, readonly bool) (*Datafile, error) { func NewDatafile(path string, id int, readonly bool) (*Datafile, error) {
@@ -50,16 +51,23 @@ func NewDatafile(path string, id int, readonly bool) (*Datafile, error) {
if err != nil { if err != nil {
return nil, err return nil, err
} }
stat, err := r.Stat()
if err != nil {
return nil, errors.Wrap(err, "error calling Stat()")
}
offset := stat.Size()
dec := streampb.NewDecoder(r) dec := streampb.NewDecoder(r)
enc := streampb.NewEncoder(w) enc := streampb.NewEncoder(w)
return &Datafile{ return &Datafile{
id: id, id: id,
r: r, r: r,
w: w, w: w,
dec: dec, offset: offset,
enc: enc, dec: dec,
enc: enc,
}, nil }, nil
} }
@@ -87,22 +95,9 @@ func (df *Datafile) Sync() error {
} }
func (df *Datafile) Size() (int64, error) { func (df *Datafile) Size() (int64, error) {
var ( df.RLock()
stat os.FileInfo defer df.RUnlock()
err error return df.offset, nil
)
if df.w == nil {
stat, err = df.r.Stat()
} else {
stat, err = df.w.Stat()
}
if err != nil {
return -1, err
}
return stat.Size(), nil
} }
func (df *Datafile) Read() (e pb.Entry, err error) { func (df *Datafile) Read() (e pb.Entry, err error) {
@@ -129,23 +124,16 @@ func (df *Datafile) Write(e pb.Entry) (int64, error) {
return -1, ErrReadonly return -1, ErrReadonly
} }
stat, err := df.w.Stat()
if err != nil {
return -1, err
}
index := stat.Size()
e.Index = index
e.Timestamp = time.Now().Unix()
df.Lock() df.Lock()
err = df.enc.Encode(&e) defer df.Unlock()
df.Unlock()
e.Offset = df.offset
n, err := df.enc.Encode(&e)
if err != nil { if err != nil {
return -1, err return -1, err
} }
df.offset += n
return index, nil return e.Offset, nil
} }

View File

@@ -7,11 +7,11 @@ import (
) )
func NewEntry(key string, value []byte) pb.Entry { func NewEntry(key string, value []byte) pb.Entry {
crc := crc32.ChecksumIEEE(value) checksum := crc32.ChecksumIEEE(value)
return pb.Entry{ return pb.Entry{
CRC: crc, Checksum: checksum,
Key: key, Key: key,
Value: value, Value: value,
} }
} }

1
go.mod
View File

@@ -10,6 +10,7 @@ require (
github.com/mitchellh/go-homedir v1.1.0 github.com/mitchellh/go-homedir v1.1.0
github.com/pkg/errors v0.8.1 github.com/pkg/errors v0.8.1
github.com/prologic/msgbus v0.1.1 github.com/prologic/msgbus v0.1.1
github.com/prologic/trie v0.0.0-20190316011403-395e39dac705
github.com/prometheus/client_golang v0.9.2 // indirect github.com/prometheus/client_golang v0.9.2 // indirect
github.com/sirupsen/logrus v1.3.0 github.com/sirupsen/logrus v1.3.0
github.com/spf13/cobra v0.0.3 github.com/spf13/cobra v0.0.3

2
go.sum
View File

@@ -42,6 +42,8 @@ github.com/pkg/errors v0.8.1/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINE
github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM= github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM=
github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4= github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4=
github.com/prologic/msgbus v0.1.1/go.mod h1:B3Qu4/U2FP08x93jUzp9E8bl155+cIgDH2DUGRK6OZk= github.com/prologic/msgbus v0.1.1/go.mod h1:B3Qu4/U2FP08x93jUzp9E8bl155+cIgDH2DUGRK6OZk=
github.com/prologic/trie v0.0.0-20190316011403-395e39dac705 h1:2J+cSlAeECj0lfMKSmM7n5OlIio+yLovaKLZJzwLc6U=
github.com/prologic/trie v0.0.0-20190316011403-395e39dac705/go.mod h1:LFuDmpHJGmciXd8Rl5YMhVlLMps9gz2GtYLzwxrFhzs=
github.com/prometheus/client_golang v0.9.2 h1:awm861/B8OKDd2I/6o1dy3ra4BamzKhYOiGItCeZ740= github.com/prometheus/client_golang v0.9.2 h1:awm861/B8OKDd2I/6o1dy3ra4BamzKhYOiGItCeZ740=
github.com/prometheus/client_golang v0.9.2/go.mod h1:OsXs2jCmiKlQ1lTBmv21f2mNfw4xf/QclQDMrYNZzcM= github.com/prometheus/client_golang v0.9.2/go.mod h1:OsXs2jCmiKlQ1lTBmv21f2mNfw4xf/QclQDMrYNZzcM=
github.com/prometheus/client_model v0.0.0-20180712105110-5c3871d89910 h1:idejC8f05m9MGOsuEi1ATq9shN03HrxNkD/luQvxCv8= github.com/prometheus/client_model v0.0.0-20180712105110-5c3871d89910 h1:idejC8f05m9MGOsuEi1ATq9shN03HrxNkD/luQvxCv8=

View File

@@ -9,9 +9,8 @@ import (
) )
type Item struct { type Item struct {
FileID int FileID int
Index int64 Offset int64
Timestamp int64
} }
type Keydir struct { type Keydir struct {
@@ -25,11 +24,10 @@ func NewKeydir() *Keydir {
} }
} }
func (k *Keydir) Add(key string, fileid int, index, timestamp int64) Item { func (k *Keydir) Add(key string, fileid int, offset int64) Item {
item := Item{ item := Item{
FileID: fileid, FileID: fileid,
Index: index, Offset: offset,
Timestamp: timestamp,
} }
k.Lock() k.Lock()

View File

@@ -6,37 +6,42 @@ const (
DefaultMaxValueSize = 1 << 16 // 65KB DefaultMaxValueSize = 1 << 16 // 65KB
) )
type Options struct { // Option ...
type Option option
type option func(*config) error
type config struct {
MaxDatafileSize int MaxDatafileSize int
MaxKeySize int MaxKeySize int
MaxValueSize int MaxValueSize int
} }
func NewDefaultOptions() Options { func NewDefaultConfig() *config {
return Options{ return &config{
MaxDatafileSize: DefaultMaxDatafileSize, MaxDatafileSize: DefaultMaxDatafileSize,
MaxKeySize: DefaultMaxKeySize, MaxKeySize: DefaultMaxKeySize,
MaxValueSize: DefaultMaxValueSize, MaxValueSize: DefaultMaxValueSize,
} }
} }
func WithMaxDatafileSize(size int) func(*Bitcask) error { func WithMaxDatafileSize(size int) option {
return func(b *Bitcask) error { return func(cfg *config) error {
b.opts.MaxDatafileSize = size cfg.MaxDatafileSize = size
return nil return nil
} }
} }
func WithMaxKeySize(size int) func(*Bitcask) error { func WithMaxKeySize(size int) option {
return func(b *Bitcask) error { return func(cfg *config) error {
b.opts.MaxKeySize = size cfg.MaxKeySize = size
return nil return nil
} }
} }
func WithMaxValueSize(size int) func(*Bitcask) error { func WithMaxValueSize(size int) option {
return func(b *Bitcask) error { return func(cfg *config) error {
b.opts.MaxValueSize = size cfg.MaxValueSize = size
return nil return nil
} }
} }

View File

@@ -19,11 +19,10 @@ var _ = math.Inf
const _ = proto.ProtoPackageIsVersion2 // please upgrade the proto package const _ = proto.ProtoPackageIsVersion2 // please upgrade the proto package
type Entry struct { type Entry struct {
CRC uint32 `protobuf:"varint,1,opt,name=CRC,proto3" json:"CRC,omitempty"` Checksum uint32 `protobuf:"varint,1,opt,name=Checksum,proto3" json:"Checksum,omitempty"`
Key string `protobuf:"bytes,2,opt,name=Key,proto3" json:"Key,omitempty"` Key string `protobuf:"bytes,2,opt,name=Key,proto3" json:"Key,omitempty"`
Index int64 `protobuf:"varint,3,opt,name=Index,proto3" json:"Index,omitempty"` Offset int64 `protobuf:"varint,3,opt,name=Offset,proto3" json:"Offset,omitempty"`
Value []byte `protobuf:"bytes,4,opt,name=Value,proto3" json:"Value,omitempty"` Value []byte `protobuf:"bytes,4,opt,name=Value,proto3" json:"Value,omitempty"`
Timestamp int64 `protobuf:"varint,5,opt,name=Timestamp,proto3" json:"Timestamp,omitempty"`
XXX_NoUnkeyedLiteral struct{} `json:"-"` XXX_NoUnkeyedLiteral struct{} `json:"-"`
XXX_unrecognized []byte `json:"-"` XXX_unrecognized []byte `json:"-"`
XXX_sizecache int32 `json:"-"` XXX_sizecache int32 `json:"-"`
@@ -33,7 +32,7 @@ func (m *Entry) Reset() { *m = Entry{} }
func (m *Entry) String() string { return proto.CompactTextString(m) } func (m *Entry) String() string { return proto.CompactTextString(m) }
func (*Entry) ProtoMessage() {} func (*Entry) ProtoMessage() {}
func (*Entry) Descriptor() ([]byte, []int) { func (*Entry) Descriptor() ([]byte, []int) {
return fileDescriptor_entry_4f5906245d08394f, []int{0} return fileDescriptor_entry_3e91842c99935ae2, []int{0}
} }
func (m *Entry) XXX_Unmarshal(b []byte) error { func (m *Entry) XXX_Unmarshal(b []byte) error {
return xxx_messageInfo_Entry.Unmarshal(m, b) return xxx_messageInfo_Entry.Unmarshal(m, b)
@@ -53,9 +52,9 @@ func (m *Entry) XXX_DiscardUnknown() {
var xxx_messageInfo_Entry proto.InternalMessageInfo var xxx_messageInfo_Entry proto.InternalMessageInfo
func (m *Entry) GetCRC() uint32 { func (m *Entry) GetChecksum() uint32 {
if m != nil { if m != nil {
return m.CRC return m.Checksum
} }
return 0 return 0
} }
@@ -67,9 +66,9 @@ func (m *Entry) GetKey() string {
return "" return ""
} }
func (m *Entry) GetIndex() int64 { func (m *Entry) GetOffset() int64 {
if m != nil { if m != nil {
return m.Index return m.Offset
} }
return 0 return 0
} }
@@ -81,28 +80,20 @@ func (m *Entry) GetValue() []byte {
return nil return nil
} }
func (m *Entry) GetTimestamp() int64 {
if m != nil {
return m.Timestamp
}
return 0
}
func init() { func init() {
proto.RegisterType((*Entry)(nil), "proto.Entry") proto.RegisterType((*Entry)(nil), "proto.Entry")
} }
func init() { proto.RegisterFile("entry.proto", fileDescriptor_entry_4f5906245d08394f) } func init() { proto.RegisterFile("entry.proto", fileDescriptor_entry_3e91842c99935ae2) }
var fileDescriptor_entry_4f5906245d08394f = []byte{ var fileDescriptor_entry_3e91842c99935ae2 = []byte{
// 134 bytes of a gzipped FileDescriptorProto // 126 bytes of a gzipped FileDescriptorProto
0x1f, 0x8b, 0x08, 0x00, 0x00, 0x00, 0x00, 0x00, 0x02, 0xff, 0xe2, 0xe2, 0x4e, 0xcd, 0x2b, 0x29, 0x1f, 0x8b, 0x08, 0x00, 0x00, 0x00, 0x00, 0x00, 0x02, 0xff, 0xe2, 0xe2, 0x4e, 0xcd, 0x2b, 0x29,
0xaa, 0xd4, 0x2b, 0x28, 0xca, 0x2f, 0xc9, 0x17, 0x62, 0x05, 0x53, 0x4a, 0xa5, 0x5c, 0xac, 0xae, 0xaa, 0xd4, 0x2b, 0x28, 0xca, 0x2f, 0xc9, 0x17, 0x62, 0x05, 0x53, 0x4a, 0xc9, 0x5c, 0xac, 0xae,
0x20, 0x51, 0x21, 0x01, 0x2e, 0x66, 0xe7, 0x20, 0x67, 0x09, 0x46, 0x05, 0x46, 0x0d, 0xde, 0x20, 0x20, 0x51, 0x21, 0x29, 0x2e, 0x0e, 0xe7, 0x8c, 0xd4, 0xe4, 0xec, 0xe2, 0xd2, 0x5c, 0x09, 0x46,
0x10, 0x13, 0x24, 0xe2, 0x9d, 0x5a, 0x29, 0xc1, 0xa4, 0xc0, 0xa8, 0xc1, 0x19, 0x04, 0x62, 0x0a, 0x05, 0x46, 0x0d, 0xde, 0x20, 0x38, 0x5f, 0x48, 0x80, 0x8b, 0xd9, 0x3b, 0xb5, 0x52, 0x82, 0x49,
0x89, 0x70, 0xb1, 0x7a, 0xe6, 0xa5, 0xa4, 0x56, 0x48, 0x30, 0x2b, 0x30, 0x6a, 0x30, 0x07, 0x41, 0x81, 0x51, 0x83, 0x33, 0x08, 0xc4, 0x14, 0x12, 0xe3, 0x62, 0xf3, 0x4f, 0x4b, 0x2b, 0x4e, 0x2d,
0x38, 0x20, 0xd1, 0xb0, 0xc4, 0x9c, 0xd2, 0x54, 0x09, 0x16, 0x05, 0x46, 0x0d, 0x9e, 0x20, 0x08, 0x91, 0x60, 0x56, 0x60, 0xd4, 0x60, 0x0e, 0x82, 0xf2, 0x84, 0x44, 0xb8, 0x58, 0xc3, 0x12, 0x73,
0x47, 0x48, 0x86, 0x8b, 0x33, 0x24, 0x33, 0x37, 0xb5, 0xb8, 0x24, 0x31, 0xb7, 0x40, 0x82, 0x15, 0x4a, 0x53, 0x25, 0x58, 0x14, 0x18, 0x35, 0x78, 0x82, 0x20, 0x9c, 0x24, 0x36, 0xb0, 0x5d, 0xc6,
0xac, 0x1e, 0x21, 0x90, 0xc4, 0x06, 0xb6, 0xdd, 0x18, 0x10, 0x00, 0x00, 0xff, 0xff, 0x07, 0x99, 0x80, 0x00, 0x00, 0x00, 0xff, 0xff, 0x76, 0xd2, 0x3e, 0x83, 0x81, 0x00, 0x00, 0x00,
0x47, 0xb9, 0x93, 0x00, 0x00, 0x00,
} }

View File

@@ -3,9 +3,8 @@ syntax = "proto3";
package proto; package proto;
message Entry { message Entry {
uint32 CRC = 1; uint32 Checksum = 1;
string Key = 2; string Key = 2;
int64 Index = 3; int64 Offset = 3;
bytes Value = 4; bytes Value = 4;
int64 Timestamp = 5;
} }

View File

@@ -1,6 +1,7 @@
package streampb package streampb
import ( import (
"bufio"
"encoding/binary" "encoding/binary"
"io" "io"
@@ -16,32 +17,40 @@ const (
// NewEncoder creates a streaming protobuf encoder. // NewEncoder creates a streaming protobuf encoder.
func NewEncoder(w io.Writer) *Encoder { func NewEncoder(w io.Writer) *Encoder {
return &Encoder{w} return &Encoder{w: bufio.NewWriter(w)}
} }
// Encoder wraps an underlying io.Writer and allows you to stream // Encoder wraps an underlying io.Writer and allows you to stream
// proto encodings on it. // proto encodings on it.
type Encoder struct { type Encoder struct {
w io.Writer w *bufio.Writer
} }
// Encode takes any proto.Message and streams it to the underlying writer. // Encode takes any proto.Message and streams it to the underlying writer.
// Messages are framed with a length prefix. // Messages are framed with a length prefix.
func (e *Encoder) Encode(msg proto.Message) error { func (e *Encoder) Encode(msg proto.Message) (int64, error) {
prefixBuf := make([]byte, prefixSize) prefixBuf := make([]byte, prefixSize)
buf, err := proto.Marshal(msg) buf, err := proto.Marshal(msg)
if err != nil { if err != nil {
return err return 0, err
} }
binary.BigEndian.PutUint64(prefixBuf, uint64(len(buf))) binary.BigEndian.PutUint64(prefixBuf, uint64(len(buf)))
if _, err := e.w.Write(prefixBuf); err != nil { if _, err := e.w.Write(prefixBuf); err != nil {
return errors.Wrap(err, "failed writing length prefix") return 0, errors.Wrap(err, "failed writing length prefix")
} }
_, err = e.w.Write(buf) n, err := e.w.Write(buf)
return errors.Wrap(err, "failed writing marshaled data") if err != nil {
return 0, errors.Wrap(err, "failed writing marshaled data")
}
if err = e.w.Flush(); err != nil {
return 0, errors.Wrap(err, "failed flushing data")
}
return int64(n + prefixSize), nil
} }
// NewDecoder creates a streaming protobuf decoder. // NewDecoder creates a streaming protobuf decoder.