mirror of
https://github.com/taigrr/bitcask
synced 2025-01-18 04:03:17 -08:00
Fix concurrent write bug with multiple goroutines writing to the to the active datafile
This commit is contained in:
parent
fb50eb2f82
commit
e0c4c4fdae
@ -4,6 +4,7 @@ import (
|
|||||||
"fmt"
|
"fmt"
|
||||||
"io/ioutil"
|
"io/ioutil"
|
||||||
"strings"
|
"strings"
|
||||||
|
"sync"
|
||||||
"testing"
|
"testing"
|
||||||
|
|
||||||
"github.com/stretchr/testify/assert"
|
"github.com/stretchr/testify/assert"
|
||||||
@ -198,6 +199,75 @@ func TestMerge(t *testing.T) {
|
|||||||
})
|
})
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func TestConcurrent(t *testing.T) {
|
||||||
|
assert := assert.New(t)
|
||||||
|
|
||||||
|
testdir, err := ioutil.TempDir("", "bitcask")
|
||||||
|
assert.NoError(err)
|
||||||
|
|
||||||
|
t.Run("Setup", func(t *testing.T) {
|
||||||
|
var (
|
||||||
|
db *Bitcask
|
||||||
|
err error
|
||||||
|
)
|
||||||
|
|
||||||
|
t.Run("Open", func(t *testing.T) {
|
||||||
|
db, err = Open(testdir)
|
||||||
|
assert.NoError(err)
|
||||||
|
})
|
||||||
|
|
||||||
|
t.Run("Put", func(t *testing.T) {
|
||||||
|
for i := 0; i < 1024; i++ {
|
||||||
|
err = db.Put(string(i), []byte(strings.Repeat(" ", 1024)))
|
||||||
|
assert.NoError(err)
|
||||||
|
}
|
||||||
|
})
|
||||||
|
})
|
||||||
|
|
||||||
|
t.Run("Concurrent", func(t *testing.T) {
|
||||||
|
var (
|
||||||
|
db *Bitcask
|
||||||
|
err error
|
||||||
|
)
|
||||||
|
|
||||||
|
t.Run("Open", func(t *testing.T) {
|
||||||
|
db, err = Open(testdir)
|
||||||
|
assert.NoError(err)
|
||||||
|
})
|
||||||
|
|
||||||
|
t.Run("Put", func(t *testing.T) {
|
||||||
|
f := func(wg *sync.WaitGroup, x int) {
|
||||||
|
defer func() {
|
||||||
|
wg.Done()
|
||||||
|
}()
|
||||||
|
for i := 0; i <= 100; i++ {
|
||||||
|
if i%x == 0 {
|
||||||
|
key := fmt.Sprintf("k%d", i)
|
||||||
|
value := []byte(fmt.Sprintf("v%d", i))
|
||||||
|
err := db.Put(key, value)
|
||||||
|
assert.NoError(err)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
wg := &sync.WaitGroup{}
|
||||||
|
|
||||||
|
go f(wg, 2)
|
||||||
|
wg.Add(1)
|
||||||
|
|
||||||
|
go f(wg, 3)
|
||||||
|
wg.Add(1)
|
||||||
|
|
||||||
|
wg.Wait()
|
||||||
|
})
|
||||||
|
|
||||||
|
t.Run("Close", func(t *testing.T) {
|
||||||
|
err = db.Close()
|
||||||
|
assert.NoError(err)
|
||||||
|
})
|
||||||
|
})
|
||||||
|
}
|
||||||
|
|
||||||
func TestLocking(t *testing.T) {
|
func TestLocking(t *testing.T) {
|
||||||
assert := assert.New(t)
|
assert := assert.New(t)
|
||||||
|
|
||||||
|
@ -5,6 +5,7 @@ import (
|
|||||||
"fmt"
|
"fmt"
|
||||||
"os"
|
"os"
|
||||||
"path/filepath"
|
"path/filepath"
|
||||||
|
"sync"
|
||||||
"time"
|
"time"
|
||||||
|
|
||||||
pb "github.com/prologic/bitcask/proto"
|
pb "github.com/prologic/bitcask/proto"
|
||||||
@ -20,6 +21,8 @@ var (
|
|||||||
)
|
)
|
||||||
|
|
||||||
type Datafile struct {
|
type Datafile struct {
|
||||||
|
sync.Mutex
|
||||||
|
|
||||||
id int
|
id int
|
||||||
r *os.File
|
r *os.File
|
||||||
w *os.File
|
w *os.File
|
||||||
@ -130,7 +133,10 @@ func (df *Datafile) Write(e pb.Entry) (int64, error) {
|
|||||||
e.Index = index
|
e.Index = index
|
||||||
e.Timestamp = time.Now().Unix()
|
e.Timestamp = time.Now().Unix()
|
||||||
|
|
||||||
|
df.Lock()
|
||||||
err = df.enc.Encode(&e)
|
err = df.enc.Encode(&e)
|
||||||
|
df.Unlock()
|
||||||
|
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return -1, err
|
return -1, err
|
||||||
}
|
}
|
||||||
|
Loading…
x
Reference in New Issue
Block a user