1
0
mirror of https://github.com/taigrr/bitcask synced 2025-01-18 04:03:17 -08:00

Fix concurrent read bug

This commit is contained in:
James Mills 2019-03-14 17:57:24 +10:00
parent e0c4c4fdae
commit 3b9627aeb8
No known key found for this signature in database
GPG Key ID: AC4C014F1440EBD6
3 changed files with 52 additions and 36 deletions

View File

@ -200,41 +200,29 @@ func TestMerge(t *testing.T) {
} }
func TestConcurrent(t *testing.T) { func TestConcurrent(t *testing.T) {
var (
db *Bitcask
err error
)
assert := assert.New(t) assert := assert.New(t)
testdir, err := ioutil.TempDir("", "bitcask") testdir, err := ioutil.TempDir("", "bitcask")
assert.NoError(err) assert.NoError(err)
t.Run("Setup", func(t *testing.T) { t.Run("Setup", func(t *testing.T) {
var (
db *Bitcask
err error
)
t.Run("Open", func(t *testing.T) { t.Run("Open", func(t *testing.T) {
db, err = Open(testdir) db, err = Open(testdir)
assert.NoError(err) assert.NoError(err)
}) })
t.Run("Put", func(t *testing.T) { t.Run("Put", func(t *testing.T) {
for i := 0; i < 1024; i++ { err = db.Put("foo", []byte("bar"))
err = db.Put(string(i), []byte(strings.Repeat(" ", 1024)))
assert.NoError(err) assert.NoError(err)
}
}) })
}) })
t.Run("Concurrent", func(t *testing.T) { t.Run("Concurrent", func(t *testing.T) {
var (
db *Bitcask
err error
)
t.Run("Open", func(t *testing.T) {
db, err = Open(testdir)
assert.NoError(err)
})
t.Run("Put", func(t *testing.T) { t.Run("Put", func(t *testing.T) {
f := func(wg *sync.WaitGroup, x int) { f := func(wg *sync.WaitGroup, x int) {
defer func() { defer func() {
@ -261,6 +249,29 @@ func TestConcurrent(t *testing.T) {
wg.Wait() wg.Wait()
}) })
t.Run("Get", func(t *testing.T) {
f := func(wg *sync.WaitGroup, N int) {
defer func() {
wg.Done()
}()
for i := 0; i <= N; i++ {
value, err := db.Get("foo")
assert.NoError(err)
assert.Equal([]byte("bar"), value)
}
}
wg := &sync.WaitGroup{}
go f(wg, 100)
wg.Add(1)
go f(wg, 100)
wg.Add(1)
wg.Wait()
})
t.Run("Close", func(t *testing.T) { t.Run("Close", func(t *testing.T) {
err = db.Close() err = db.Close()
assert.NoError(err) assert.NoError(err)

View File

@ -21,7 +21,7 @@ var (
) )
type Datafile struct { type Datafile struct {
sync.Mutex sync.RWMutex
id int id int
r *os.File r *os.File
@ -105,17 +105,23 @@ func (df *Datafile) Size() (int64, error) {
return stat.Size(), nil return stat.Size(), nil
} }
func (df *Datafile) Read() (pb.Entry, error) { func (df *Datafile) Read() (e pb.Entry, err error) {
var e pb.Entry df.Lock()
defer df.Unlock()
return e, df.dec.Decode(&e) return e, df.dec.Decode(&e)
} }
func (df *Datafile) ReadAt(index int64) (e pb.Entry, err error) { func (df *Datafile) ReadAt(index int64) (e pb.Entry, err error) {
df.Lock()
defer df.Unlock()
_, err = df.r.Seek(index, os.SEEK_SET) _, err = df.r.Seek(index, os.SEEK_SET)
if err != nil { if err != nil {
return return
} }
return df.Read()
return e, df.dec.Decode(&e)
} }
func (df *Datafile) Write(e pb.Entry) (int64, error) { func (df *Datafile) Write(e pb.Entry) (int64, error) {

View File

@ -16,26 +16,27 @@ const (
// NewEncoder creates a streaming protobuf encoder. // NewEncoder creates a streaming protobuf encoder.
func NewEncoder(w io.Writer) *Encoder { func NewEncoder(w io.Writer) *Encoder {
return &Encoder{w: w, prefixBuf: make([]byte, prefixSize)} return &Encoder{w}
} }
// Encoder wraps an underlying io.Writer and allows you to stream // Encoder wraps an underlying io.Writer and allows you to stream
// proto encodings on it. // proto encodings on it.
type Encoder struct { type Encoder struct {
w io.Writer w io.Writer
prefixBuf []byte
} }
// Encode takes any proto.Message and streams it to the underlying writer. // Encode takes any proto.Message and streams it to the underlying writer.
// Messages are framed with a length prefix. // Messages are framed with a length prefix.
func (e *Encoder) Encode(msg proto.Message) error { func (e *Encoder) Encode(msg proto.Message) error {
prefixBuf := make([]byte, prefixSize)
buf, err := proto.Marshal(msg) buf, err := proto.Marshal(msg)
if err != nil { if err != nil {
return err return err
} }
binary.BigEndian.PutUint64(e.prefixBuf, uint64(len(buf))) binary.BigEndian.PutUint64(prefixBuf, uint64(len(buf)))
if _, err := e.w.Write(e.prefixBuf); err != nil { if _, err := e.w.Write(prefixBuf); err != nil {
return errors.Wrap(err, "failed writing length prefix") return errors.Wrap(err, "failed writing length prefix")
} }
@ -45,28 +46,26 @@ func (e *Encoder) Encode(msg proto.Message) error {
// NewDecoder creates a streaming protobuf decoder. // NewDecoder creates a streaming protobuf decoder.
func NewDecoder(r io.Reader) *Decoder { func NewDecoder(r io.Reader) *Decoder {
return &Decoder{ return &Decoder{r: r}
r: r,
prefixBuf: make([]byte, prefixSize),
}
} }
// Decoder wraps an underlying io.Reader and allows you to stream // Decoder wraps an underlying io.Reader and allows you to stream
// proto decodings on it. // proto decodings on it.
type Decoder struct { type Decoder struct {
r io.Reader r io.Reader
prefixBuf []byte
} }
// Decode takes a proto.Message and unmarshals the next payload in the // Decode takes a proto.Message and unmarshals the next payload in the
// underlying io.Reader. It returns an EOF when it's done. // underlying io.Reader. It returns an EOF when it's done.
func (d *Decoder) Decode(v proto.Message) error { func (d *Decoder) Decode(v proto.Message) error {
_, err := io.ReadFull(d.r, d.prefixBuf) prefixBuf := make([]byte, prefixSize)
_, err := io.ReadFull(d.r, prefixBuf)
if err != nil { if err != nil {
return err return err
} }
n := binary.BigEndian.Uint64(d.prefixBuf) n := binary.BigEndian.Uint64(prefixBuf)
buf := make([]byte, n) buf := make([]byte, n)