From 5e4d863ab7f46032ce211cd8fa4f57ebe53d2504 Mon Sep 17 00:00:00 2001 From: James Mills Date: Thu, 15 Jul 2021 21:33:20 +0000 Subject: [PATCH] Use package github.com/gofrs/flock as flock implementation. (#224) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Supercesd #219 after rebasing on master after migrating off Github. Co-authored-by: Nicolò Santamaria Co-authored-by: James Mills Co-authored-by: Tai Groot Reviewed-on: https://git.mills.io/prologic/bitcask/pulls/224 Co-authored-by: James Mills Co-committed-by: James Mills --- bitcask.go | 23 ++-- cmd/bitcaskd/server.go | 35 ------ flock/flock.go | 97 ----------------- flock/flock_test.go | 121 --------------------- flock/flock_unix.go | 79 -------------- flock/race_test.go | 236 ----------------------------------------- go.mod | 2 +- go.sum | 2 + 8 files changed, 16 insertions(+), 579 deletions(-) delete mode 100644 flock/flock.go delete mode 100644 flock/flock_test.go delete mode 100644 flock/flock_unix.go delete mode 100644 flock/race_test.go diff --git a/bitcask.go b/bitcask.go index 8293a95..e92cb49 100644 --- a/bitcask.go +++ b/bitcask.go @@ -13,8 +13,10 @@ import ( "sync" "time" + "github.com/gofrs/flock" art "github.com/plar/go-adaptive-radix-tree" - "git.mills.io/prologic/bitcask/flock" + log "github.com/sirupsen/logrus" + "git.mills.io/prologic/bitcask/internal" "git.mills.io/prologic/bitcask/internal/config" "git.mills.io/prologic/bitcask/internal/data" @@ -22,7 +24,6 @@ import ( "git.mills.io/prologic/bitcask/internal/index" "git.mills.io/prologic/bitcask/internal/metadata" "git.mills.io/prologic/bitcask/scripts/migrations" - log "github.com/sirupsen/logrus" ) const ( @@ -68,10 +69,8 @@ var ( // and in-memory hash of key/value pairs as per the Bitcask paper and seen // in the Riak database. type Bitcask struct { - mu sync.RWMutex - - *flock.Flock - + mu sync.RWMutex + flock *flock.Flock config *config.Config options []Option path string @@ -114,7 +113,7 @@ func (b *Bitcask) Close() error { b.mu.RLock() defer func() { b.mu.RUnlock() - b.Flock.Unlock() + b.flock.Unlock() }() return b.close() @@ -669,6 +668,10 @@ func (b *Bitcask) Merge() error { return err } for _, file := range files { + // see #225 + if file.Name() == lockfile { + continue + } err := os.Rename( path.Join([]string{mdb.path, file.Name()}...), path.Join([]string{b.path, file.Name()}...), @@ -723,7 +726,7 @@ func Open(path string, options ...Option) (*Bitcask, error) { } bitcask := &Bitcask{ - Flock: flock.New(filepath.Join(path, lockfile)), + flock: flock.New(filepath.Join(path, lockfile)), config: cfg, options: options, path: path, @@ -732,12 +735,12 @@ func Open(path string, options ...Option) (*Bitcask, error) { metadata: meta, } - locked, err := bitcask.Flock.TryLock() + ok, err := bitcask.flock.TryLock() if err != nil { return nil, err } - if !locked { + if !ok { return nil, ErrDatabaseLocked } diff --git a/cmd/bitcaskd/server.go b/cmd/bitcaskd/server.go index f50c40d..3ae2e9b 100644 --- a/cmd/bitcaskd/server.go +++ b/cmd/bitcaskd/server.go @@ -54,13 +54,6 @@ func (s *server) handleSet(cmd redcon.Command, conn redcon.Conn) { ttl = &d } - err := s.db.Lock() - if err != nil { - conn.WriteError("ERR " + fmt.Errorf("failed to lock db: %v", err).Error() + "") - return - } - defer s.db.Unlock() - if ttl != nil { if err := s.db.PutWithTTL(key, value, *ttl); err != nil { conn.WriteString(fmt.Sprintf("ERR: %s", err)) @@ -82,13 +75,6 @@ func (s *server) handleGet(cmd redcon.Command, conn redcon.Conn) { key := cmd.Args[1] - err := s.db.Lock() - if err != nil { - conn.WriteError("ERR " + fmt.Errorf("failed to lock db: %v", err).Error() + "") - return - } - defer s.db.Unlock() - value, err := s.db.Get(key) if err != nil { conn.WriteNull() @@ -98,13 +84,6 @@ func (s *server) handleGet(cmd redcon.Command, conn redcon.Conn) { } func (s *server) handleKeys(cmd redcon.Command, conn redcon.Conn) { - err := s.db.Lock() - if err != nil { - conn.WriteError("ERR " + fmt.Errorf("failed to lock db: %v", err).Error() + "") - return - } - defer s.db.Unlock() - conn.WriteArray(s.db.Len()) for key := range s.db.Keys() { conn.WriteBulk(key) @@ -119,13 +98,6 @@ func (s *server) handleExists(cmd redcon.Command, conn redcon.Conn) { key := cmd.Args[1] - err := s.db.Lock() - if err != nil { - conn.WriteError("ERR " + fmt.Errorf("failed to lock db: %v", err).Error() + "") - return - } - defer s.db.Unlock() - if s.db.Has(key) { conn.WriteInt(1) } else { @@ -141,13 +113,6 @@ func (s *server) handleDel(cmd redcon.Command, conn redcon.Conn) { key := cmd.Args[1] - err := s.db.Lock() - if err != nil { - conn.WriteError("ERR " + fmt.Errorf("failed to lock db: %v", err).Error() + "") - return - } - defer s.db.Unlock() - if err := s.db.Delete(key); err != nil { conn.WriteInt(0) } else { diff --git a/flock/flock.go b/flock/flock.go deleted file mode 100644 index 77b4c31..0000000 --- a/flock/flock.go +++ /dev/null @@ -1,97 +0,0 @@ -package flock - -import ( - "errors" - "os" - "sync" -) - -type Flock struct { - path string - m sync.Mutex - fh *os.File -} - -var ( - ErrAlreadyLocked = errors.New("Double lock: already own the lock") - ErrLockFailed = errors.New("Could not acquire lock") - ErrLockNotHeld = errors.New("Could not unlock, lock is not held") - - ErrInodeChangedAtPath = errors.New("Inode changed at path") -) - -// New returns a new instance of *Flock. The only parameter -// it takes is the path to the desired lockfile. -func New(path string) *Flock { - return &Flock{path: path} -} - -// Path returns the file path linked to this lock. -func (f *Flock) Path() string { - return f.path -} - -// Lock will acquire the lock. This function may block indefinitely if some other process holds the lock. For a non-blocking version, see Flock.TryLock(). -func (f *Flock) Lock() error { - f.m.Lock() - defer f.m.Unlock() - - if f.fh != nil { - return ErrAlreadyLocked - } - - var fh *os.File - - fh, err := lock_sys(f.path, false) - // treat "ErrInodeChangedAtPath" as "some other process holds the lock, retry locking" - for err == ErrInodeChangedAtPath { - fh, err = lock_sys(f.path, false) - } - - if err != nil { - return err - } - if fh == nil { - return ErrLockFailed - } - - f.fh = fh - return nil -} - -// TryLock will try to acquire the lock, and returns immediately if the lock is already owned by another process. -func (f *Flock) TryLock() (bool, error) { - f.m.Lock() - defer f.m.Unlock() - - if f.fh != nil { - return false, ErrAlreadyLocked - } - - fh, err := lock_sys(f.path, true) - if err != nil { - return false, ErrLockFailed - } - - f.fh = fh - return true, nil -} - -// Unlock removes the lock file from disk and releases the lock. -// Whatever the result of `.Unlock()`, the caller must assume that it does not hold the lock anymore. -func (f *Flock) Unlock() error { - f.m.Lock() - defer f.m.Unlock() - - if f.fh == nil { - return ErrLockNotHeld - } - - err1 := rm_if_match(f.fh, f.path) - err2 := f.fh.Close() - - if err1 != nil { - return err1 - } - return err2 -} diff --git a/flock/flock_test.go b/flock/flock_test.go deleted file mode 100644 index e68fd4e..0000000 --- a/flock/flock_test.go +++ /dev/null @@ -1,121 +0,0 @@ -package flock - -import ( - "os" - "testing" - "time" - - "github.com/stretchr/testify/assert" -) - -// WARNING : this test will delete the file located at "testLockPath". Choose an adequate temporary file name. -const testLockPath = "/tmp/bitcask_unit_test_lock" // file path to use for the lock - -func TestTryLock(t *testing.T) { - // test that basic locking functionalities are consistent - - // make sure there is no present lock when starting this test - os.Remove(testLockPath) - - assert := assert.New(t) - - lock1 := New(testLockPath) - lock2 := New(testLockPath) - - // 1- take the first lock - locked1, err := lock1.TryLock() - assert.True(locked1) - assert.NoError(err) - - // 2- check that the second lock cannot acquire the lock - locked2, err := lock2.TryLock() - assert.False(locked2) - assert.Error(err) - - // 3- release the first lock - err = lock1.Unlock() - assert.NoError(err) - - // 4- check that the second lock can acquire and then release the lock without error - locked2, err = lock2.TryLock() - assert.True(locked2) - assert.NoError(err) - - err = lock2.Unlock() - assert.NoError(err) -} - -func TestLock(t *testing.T) { - assert := assert.New(t) - - // make sure there is no present lock when starting this test - os.Remove(testLockPath) - - syncChan := make(chan bool) - - // main goroutine: take lock on testPath - lock := New(testLockPath) - - err := lock.Lock() - assert.NoError(err) - - go func() { - // sub routine: - lock := New(testLockPath) - - // before entering the block '.Lock()' call, signal we are about to do it - // see below : the main goroutine will wait for a small delay before releasing the lock - syncChan <- true - // '.Lock()' should ultimately return without error : - err := lock.Lock() - assert.NoError(err) - - err = lock.Unlock() - assert.NoError(err) - - close(syncChan) - }() - - // wait for the "ready" signal from the sub routine, - <-syncChan - - // after that signal wait for a small delay before releasing the lock - <-time.After(100 * time.Microsecond) - err = lock.Unlock() - assert.NoError(err) - - // wait for the sub routine to finish - <-syncChan -} - -func TestErrorConditions(t *testing.T) { - // error conditions implemented in this version : - // - you can't release a lock you do not hold - // - you can't lock twice the same lock - - // -- setup - assert := assert.New(t) - - // make sure there is no present lock when starting this test - os.Remove(testLockPath) - - lock := New(testLockPath) - - // -- run tests : - - err := lock.Unlock() - assert.Error(err, "you can't release a lock you do not hold") - - // take the lock once: - lock.TryLock() - - locked, err := lock.TryLock() - assert.False(locked) - assert.Error(err, "you can't lock twice the same lock (using .TryLock())") - - err = lock.Lock() - assert.Error(err, "you can't lock twice the same lock (using .Lock())") - - // -- teardown - lock.Unlock() -} diff --git a/flock/flock_unix.go b/flock/flock_unix.go deleted file mode 100644 index 720c753..0000000 --- a/flock/flock_unix.go +++ /dev/null @@ -1,79 +0,0 @@ -// +build !aix,!windows - -package flock - -import ( - "os" - - "golang.org/x/sys/unix" -) - -func lock_sys(path string, nonBlocking bool) (_ *os.File, err error) { - var fh *os.File - - fh, err = os.OpenFile(path, os.O_RDWR|os.O_CREATE, 0666) - if err != nil { - return nil, err - } - - defer func() { - if err != nil { - fh.Close() - } - }() - - flag := unix.LOCK_EX - if nonBlocking { - flag |= unix.LOCK_NB - } - - err = unix.Flock(int(fh.Fd()), flag) - if err != nil { - return nil, err - } - - if !sameInodes(fh, path) { - return nil, ErrInodeChangedAtPath - } - - return fh, nil -} - -func rm_if_match(fh *os.File, path string) error { - // Sanity check : - // before running "rm", check that the file pointed at by the - // filehandle has the same inode as the path in the filesystem - // - // If this sanity check doesn't pass, store a "ErrInodeChangedAtPath" error, - // if the check passes, run os.Remove, and store the error if any. - // - // note : this sanity check is in no way atomic, but : - // - as long as only cooperative processes are involved, it will work as intended - // - it allows to avoid 99.9% the major pitfall case: "root user forcefully removed the lockfile" - - if !sameInodes(fh, path) { - return ErrInodeChangedAtPath - } - - return os.Remove(path) -} - -func sameInodes(f *os.File, path string) bool { - // get inode from opened file f: - var fstat unix.Stat_t - err := unix.Fstat(int(f.Fd()), &fstat) - if err != nil { - return false - } - fileIno := fstat.Ino - - // get inode for path on disk: - var dstat unix.Stat_t - err = unix.Stat(path, &dstat) - if err != nil { - return false - } - pathIno := dstat.Ino - - return pathIno == fileIno -} diff --git a/flock/race_test.go b/flock/race_test.go deleted file mode 100644 index f922ca0..0000000 --- a/flock/race_test.go +++ /dev/null @@ -1,236 +0,0 @@ -package flock - -// the "nd" in "nd_test.go" stands for non-deterministic - -import ( - "errors" - "os" - "sync" - "sync/atomic" - "syscall" - "testing" - "time" - - "github.com/stretchr/testify/assert" -) - -// The two tests in this file are test some concurrency scenarios : -// 1- TestRaceLock() runs several threads racing for the same lock -// 2- TestShatteredLock() runs racing racing threads, along with another threads which forcibly remove the file from disk -// -// Note that these tests are non-deterministic : the coverage produced by each test depends -// on how the runtime chooses to schedule the concurrent goroutines. - -var lockerCount int64 - -// lockAndCount tries to take a lock on "lockpath" -// if it fails : it returns 0 and stop there -// if it succeeds : -// 1- it sets a defer function to release the lock in the same fashion as "func (b *Bitcask) Close()" -// 2- it increments the shared "lockerCount" above -// 3- it waits for a short amount of time -// 4- it decrements "lockerCount" -// 5- it returns the value it has seen at step 2. -// -// If the locking and unlocking behave as we expect them to, -// instructions 1-5 should be in a critical section, -// and the only possible value at step 2 should be "1". -// -// Returning a value > 0 indicates this function successfully acquired the lock, -// returning a value == 0 indicates that TryLock failed. - -func lockAndCount(lockpath string) int64 { - lock := New(lockpath) - ok, _ := lock.TryLock() - if !ok { - return 0 - } - defer func() { - lock.Unlock() - }() - - x := atomic.AddInt64(&lockerCount, 1) - // emulate a workload : - <-time.After(1 * time.Microsecond) - atomic.AddInt64(&lockerCount, -1) - - return x -} - -// locker will call the lock function above in a loop, until one of the following holds : -// - reading from the "timeout" channel doesn't block -// - the number of calls to "lock()" that indicate the lock was successfully taken reaches "successfullLockCount" -func locker(t *testing.T, id int, lockPath string, successfulLockCount int, timeout <-chan struct{}) { - timedOut := false - - failCount := 0 - max := int64(0) - -lockloop: - for successfulLockCount > 0 { - select { - case <-timeout: - timedOut = true - break lockloop - default: - } - - x := lockAndCount(lockPath) - - if x > 0 { - // if x indicates the lock was taken : decrement the counter - successfulLockCount-- - } - - if x > 1 { - // if x indicates an invalid value : increase the failCount and update max accordingly - failCount++ - if x > max { - max = x - } - } - } - - // check failure cases : - if timedOut { - t.Fail() - t.Logf("[runner %02d] timed out", id) - } - if failCount > 0 { - t.Fail() - t.Logf("[runner %02d] lockCounter was > 1 on %2.d occasions, max seen value was %2.d", id, failCount, max) - } -} - -// TestRaceLock checks that no error occurs when several concurrent actors (goroutines in this case) race for the same lock. -func TestRaceLock(t *testing.T) { - // test parameters, written in code : - // you may want to tweak these values for testing - - goroutines := 20 // number of concurrent "locker" goroutines to launch - successfulLockCount := 50 // how many times a "locker" will successfully take the lock before halting - - // make sure there is no present lock when startng this test - os.Remove(testLockPath) - - // timeout implemented in code - // (the lock acquisition depends on the behavior of the filesystem, - // avoid sending CI in endless loop if something fishy happens on the test server ...) - // tweak this value if needed ; comment out the "close(ch)" instruction below - timeout := 10 * time.Second - ch := make(chan struct{}) - go func() { - <-time.After(timeout) - close(ch) - }() - - wg := &sync.WaitGroup{} - wg.Add(goroutines) - - for i := 0; i < goroutines; i++ { - go func(id int) { - locker(t, id, testLockPath, successfulLockCount, ch) - wg.Done() - }(i) - } - - wg.Wait() -} - -func isExpectedError(err error) bool { - switch { - case err == nil: - return true - case err == ErrInodeChangedAtPath: - return true - case errors.Is(err, syscall.ENOENT): - return true - - default: - return false - } -} - -// TestShatteredLock runs concurrent goroutines on one lock, with an extra goroutine -// which removes the lock file from disk without checking the locks -// (e.g: a user who would run 'rm lockfile' in a loop while the program is running). -// -// In this scenario, errors may occur on .Unlock() ; this test checks that only errors -// relating to the file being deleted occur. -// -// This test additionally logs the number of errors that occurred, grouped by error message. -func TestShatteredLock(t *testing.T) { - // test parameters, written in code : - // you may want to tweak these values for testing - - goroutines := 4 // number of concurrent "locker" and "remover" goroutines to launch - successfulLockCount := 10 // how many times a "locker" will successfully take the lock before halting - - // make sure there is no present lock when startng this test - os.Remove(testLockPath) - assert := assert.New(t) - - wg := &sync.WaitGroup{} - wg.Add(goroutines) - - stopChan := make(chan struct{}) - - errChan := make(chan error, 10) - - for i := 0; i < goroutines; i++ { - go func(id int, count int) { - for count > 0 { - lock := New(testLockPath) - ok, _ := lock.TryLock() - if !ok { - continue - } - - count-- - err := lock.Unlock() - if !isExpectedError(err) { - assert.Fail("goroutine %d - unexpected error: %v", id, err) - } - - if err != nil { - errChan <- err - } - } - - wg.Done() - }(i, successfulLockCount) - } - - var wgCompanion = &sync.WaitGroup{} - wgCompanion.Add(2) - - go func() { - defer wgCompanion.Done() - for { - os.Remove(testLockPath) - - select { - case <-stopChan: - return - default: - } - } - }() - - var errs = make(map[string]int) - go func() { - for err := range errChan { - errs[err.Error()]++ - } - wgCompanion.Done() - }() - - wg.Wait() - close(stopChan) - close(errChan) - wgCompanion.Wait() - - for err, count := range errs { - t.Logf(" seen %d times: %s", count, err) - } -} diff --git a/go.mod b/go.mod index 346e88e..03812da 100644 --- a/go.mod +++ b/go.mod @@ -3,6 +3,7 @@ module git.mills.io/prologic/bitcask go 1.13 require ( + github.com/gofrs/flock v0.8.0 github.com/pkg/errors v0.9.1 github.com/plar/go-adaptive-radix-tree v1.0.4 github.com/sirupsen/logrus v1.8.1 @@ -13,5 +14,4 @@ require ( github.com/stretchr/testify v1.7.0 github.com/tidwall/redcon v1.4.1 golang.org/x/exp v0.0.0-20200228211341-fcea875c7e85 - golang.org/x/sys v0.0.0-20210510120138-977fb7262007 ) diff --git a/go.sum b/go.sum index 7229595..d99d940 100644 --- a/go.sum +++ b/go.sum @@ -93,6 +93,8 @@ github.com/go-logfmt/logfmt v0.3.0/go.mod h1:Qt1PoO58o5twSAckw1HlFXLmHsOX5/0LbT9 github.com/go-logfmt/logfmt v0.4.0/go.mod h1:3RMwSq7FuexP4Kalkev3ejPJsZTpXXBr9+V4qmtdjCk= github.com/go-stack/stack v1.8.0/go.mod h1:v0f6uXyyMGvRgIKkXu+yp6POWl0qKG85gN/melR3HDY= github.com/godbus/dbus/v5 v5.0.4/go.mod h1:xhWf0FNVPg57R7Z0UbKHbJfkEywrmjJnf7w5xrFpKfA= +github.com/gofrs/flock v0.8.0 h1:MSdYClljsF3PbENUUEx85nkWfJSGfzYI9yEBZOJz6CY= +github.com/gofrs/flock v0.8.0/go.mod h1:F1TvTiK9OcQqauNUHlbJvyl9Qa1QvF/gOUDKA14jxHU= github.com/gogo/protobuf v1.1.1/go.mod h1:r8qH/GZQm5c6nD/R0oafs1akxWv10x8SbQlK7atdtwQ= github.com/gogo/protobuf v1.2.1/go.mod h1:hp+jE20tsWTFYpLwKvXlhS1hjn+gTNwPg2I6zVXpSg4= github.com/gogo/protobuf v1.3.2/go.mod h1:P1XiOD3dCwIKUDQYPy72D8LYyHL2YPYrpS2s69NZV8Q=