From e0c4c4fdae236de2dce762d2f78f4d8eccbf24e3 Mon Sep 17 00:00:00 2001 From: James Mills <1290234+prologic@users.noreply.github.com> Date: Thu, 14 Mar 2019 08:07:02 +1000 Subject: [PATCH] Fix concurrent write bug with multiple goroutines writing to the to the active datafile --- bitcask_test.go | 70 +++++++++++++++++++++++++++++++++++++++++++++++++ datafile.go | 6 +++++ 2 files changed, 76 insertions(+) diff --git a/bitcask_test.go b/bitcask_test.go index 963e1c9..7e8958e 100644 --- a/bitcask_test.go +++ b/bitcask_test.go @@ -4,6 +4,7 @@ import ( "fmt" "io/ioutil" "strings" + "sync" "testing" "github.com/stretchr/testify/assert" @@ -198,6 +199,75 @@ func TestMerge(t *testing.T) { }) } +func TestConcurrent(t *testing.T) { + assert := assert.New(t) + + testdir, err := ioutil.TempDir("", "bitcask") + assert.NoError(err) + + t.Run("Setup", func(t *testing.T) { + var ( + db *Bitcask + err error + ) + + t.Run("Open", func(t *testing.T) { + db, err = Open(testdir) + assert.NoError(err) + }) + + t.Run("Put", func(t *testing.T) { + for i := 0; i < 1024; i++ { + err = db.Put(string(i), []byte(strings.Repeat(" ", 1024))) + assert.NoError(err) + } + }) + }) + + t.Run("Concurrent", func(t *testing.T) { + var ( + db *Bitcask + err error + ) + + t.Run("Open", func(t *testing.T) { + db, err = Open(testdir) + assert.NoError(err) + }) + + t.Run("Put", func(t *testing.T) { + f := func(wg *sync.WaitGroup, x int) { + defer func() { + wg.Done() + }() + for i := 0; i <= 100; i++ { + if i%x == 0 { + key := fmt.Sprintf("k%d", i) + value := []byte(fmt.Sprintf("v%d", i)) + err := db.Put(key, value) + assert.NoError(err) + } + } + } + + wg := &sync.WaitGroup{} + + go f(wg, 2) + wg.Add(1) + + go f(wg, 3) + wg.Add(1) + + wg.Wait() + }) + + t.Run("Close", func(t *testing.T) { + err = db.Close() + assert.NoError(err) + }) + }) +} + func TestLocking(t *testing.T) { assert := assert.New(t) diff --git a/datafile.go b/datafile.go index 0ebabd0..8e4fa0d 100644 --- a/datafile.go +++ b/datafile.go @@ -5,6 +5,7 @@ import ( "fmt" "os" "path/filepath" + "sync" "time" pb "github.com/prologic/bitcask/proto" @@ -20,6 +21,8 @@ var ( ) type Datafile struct { + sync.Mutex + id int r *os.File w *os.File @@ -130,7 +133,10 @@ func (df *Datafile) Write(e pb.Entry) (int64, error) { e.Index = index e.Timestamp = time.Now().Unix() + df.Lock() err = df.enc.Encode(&e) + df.Unlock() + if err != nil { return -1, err }