mirror of
https://github.com/taigrr/bitcask
synced 2025-01-18 04:03:17 -08:00
Get space that can be reclaimed (#189)
* get reclaimable space added * import order fix Co-authored-by: yash <yash.chandra@grabpay.com>
This commit is contained in:
parent
f4357e6f18
commit
158f6d9888
43
bitcask.go
43
bitcask.go
@ -17,6 +17,7 @@ import (
|
||||
"github.com/prologic/bitcask/internal"
|
||||
"github.com/prologic/bitcask/internal/config"
|
||||
"github.com/prologic/bitcask/internal/data"
|
||||
"github.com/prologic/bitcask/internal/data/codec"
|
||||
"github.com/prologic/bitcask/internal/index"
|
||||
"github.com/prologic/bitcask/internal/metadata"
|
||||
)
|
||||
@ -101,6 +102,11 @@ func (b *Bitcask) Close() error {
|
||||
return err
|
||||
}
|
||||
|
||||
b.metadata.IndexUpToDate = true
|
||||
if err := b.saveMetadata(); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
for _, df := range b.datafiles {
|
||||
if err := df.Close(); err != nil {
|
||||
return err
|
||||
@ -172,29 +178,32 @@ func (b *Bitcask) Put(key, value []byte) error {
|
||||
}
|
||||
|
||||
b.mu.Lock()
|
||||
defer b.mu.Unlock()
|
||||
offset, n, err := b.put(key, value)
|
||||
if err != nil {
|
||||
b.mu.Unlock()
|
||||
return err
|
||||
}
|
||||
|
||||
if b.config.Sync {
|
||||
if err := b.curr.Sync(); err != nil {
|
||||
b.mu.Unlock()
|
||||
return err
|
||||
}
|
||||
}
|
||||
|
||||
// in case of successful `put`, IndexUpToDate will be always be false
|
||||
if b.metadata.IndexUpToDate {
|
||||
b.metadata.IndexUpToDate = false
|
||||
if err := b.metadata.Save(filepath.Join(b.path, "meta.json"), b.config.FileFileModeBeforeUmask); err != nil {
|
||||
if err := b.saveMetadata(); err != nil {
|
||||
return err
|
||||
}
|
||||
}
|
||||
|
||||
if oldItem, found := b.trie.Search(key); found {
|
||||
b.metadata.ReclaimableSpace += oldItem.(internal.Item).Size
|
||||
}
|
||||
|
||||
item := internal.Item{FileID: b.curr.FileID(), Offset: offset, Size: n}
|
||||
b.trie.Insert(key, item)
|
||||
b.mu.Unlock()
|
||||
|
||||
return nil
|
||||
}
|
||||
@ -208,6 +217,9 @@ func (b *Bitcask) Delete(key []byte) error {
|
||||
b.mu.Unlock()
|
||||
return err
|
||||
}
|
||||
if item, found := b.trie.Search(key); found {
|
||||
b.metadata.ReclaimableSpace += item.(internal.Item).Size + codec.MetaInfoSize + int64(len(key))
|
||||
}
|
||||
b.trie.Delete(key)
|
||||
b.mu.Unlock()
|
||||
|
||||
@ -221,7 +233,12 @@ func (b *Bitcask) DeleteAll() (err error) {
|
||||
|
||||
b.trie.ForEach(func(node art.Node) bool {
|
||||
_, _, err = b.put(node.Key(), []byte{})
|
||||
return err == nil
|
||||
if err != nil {
|
||||
return false
|
||||
}
|
||||
item, _ := b.trie.Search(node.Key())
|
||||
b.metadata.ReclaimableSpace += item.(internal.Item).Size + codec.MetaInfoSize + int64(len(node.Key()))
|
||||
return true
|
||||
})
|
||||
b.trie = art.New()
|
||||
|
||||
@ -421,6 +438,7 @@ func (b *Bitcask) Merge() error {
|
||||
return err
|
||||
}
|
||||
}
|
||||
b.metadata.ReclaimableSpace = 0
|
||||
|
||||
// And finally reopen the database
|
||||
return b.Reopen()
|
||||
@ -512,14 +530,19 @@ func (b *Bitcask) saveIndex() error {
|
||||
if err := b.indexer.Save(b.trie, filepath.Join(b.path, tempIdx)); err != nil {
|
||||
return err
|
||||
}
|
||||
err := os.Rename(filepath.Join(b.path, tempIdx), filepath.Join(b.path, "index"))
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
b.metadata.IndexUpToDate = true
|
||||
return os.Rename(filepath.Join(b.path, tempIdx), filepath.Join(b.path, "index"))
|
||||
}
|
||||
|
||||
// saveMetadata saves metadata into disk
|
||||
func (b *Bitcask) saveMetadata() error {
|
||||
return b.metadata.Save(filepath.Join(b.path, "meta.json"), b.config.DirFileModeBeforeUmask)
|
||||
}
|
||||
|
||||
// Reclaimable returns space that can be reclaimed
|
||||
func (b *Bitcask) Reclaimable() int64 {
|
||||
return b.metadata.ReclaimableSpace
|
||||
}
|
||||
|
||||
func loadDatafiles(path string, maxKeySize uint32, maxValueSize uint64, fileModeBeforeUmask os.FileMode) (datafiles map[int]data.Datafile, lastID int, err error) {
|
||||
fns, err := internal.GetDatafiles(path)
|
||||
if err != nil {
|
||||
|
@ -314,6 +314,64 @@ func TestDeletedKeys(t *testing.T) {
|
||||
})
|
||||
}
|
||||
|
||||
func TestMetadata(t *testing.T) {
|
||||
assert := assert.New(t)
|
||||
testdir, err := ioutil.TempDir("", "bitcask")
|
||||
assert.NoError(err)
|
||||
defer os.RemoveAll(testdir)
|
||||
|
||||
db, err := Open(testdir)
|
||||
assert.NoError(err)
|
||||
err = db.Put([]byte("foo"), []byte("bar"))
|
||||
assert.NoError(err)
|
||||
err = db.Close()
|
||||
assert.NoError(err)
|
||||
db, err = Open(testdir)
|
||||
assert.NoError(err)
|
||||
|
||||
t.Run("IndexUptoDateAfterCloseAndOpen", func(t *testing.T) {
|
||||
assert.Equal(true, db.metadata.IndexUpToDate)
|
||||
})
|
||||
t.Run("IndexUptoDateAfterPut", func(t *testing.T) {
|
||||
assert.NoError(db.Put([]byte("foo1"), []byte("bar1")))
|
||||
assert.Equal(false, db.metadata.IndexUpToDate)
|
||||
})
|
||||
t.Run("Reclaimable", func(t *testing.T) {
|
||||
assert.Equal(int64(0), db.Reclaimable())
|
||||
})
|
||||
t.Run("ReclaimableAfterNewPut", func(t *testing.T) {
|
||||
assert.NoError(db.Put([]byte("hello"), []byte("world")))
|
||||
assert.Equal(int64(0), db.Reclaimable())
|
||||
})
|
||||
t.Run("ReclaimableAfterRepeatedPut", func(t *testing.T) {
|
||||
assert.NoError(db.Put([]byte("hello"), []byte("world")))
|
||||
assert.Equal(int64(26), db.Reclaimable())
|
||||
})
|
||||
t.Run("ReclaimableAfterDelete", func(t *testing.T) {
|
||||
assert.NoError(db.Delete([]byte("hello")))
|
||||
assert.Equal(int64(73), db.Reclaimable())
|
||||
})
|
||||
t.Run("ReclaimableAfterNonExistingDelete", func(t *testing.T) {
|
||||
assert.NoError(db.Delete([]byte("hello1")))
|
||||
assert.Equal(int64(73), db.Reclaimable())
|
||||
})
|
||||
t.Run("ReclaimableAfterDeleteAll", func(t *testing.T) {
|
||||
assert.NoError(db.DeleteAll())
|
||||
assert.Equal(int64(158), db.Reclaimable())
|
||||
})
|
||||
t.Run("ReclaimableAfterMerge", func(t *testing.T) {
|
||||
assert.NoError(db.Merge())
|
||||
assert.Equal(int64(0), db.Reclaimable())
|
||||
})
|
||||
t.Run("IndexUptoDateAfterMerge", func(t *testing.T) {
|
||||
assert.Equal(true, db.metadata.IndexUpToDate)
|
||||
})
|
||||
t.Run("ReclaimableAfterMergeAndDeleteAll", func(t *testing.T) {
|
||||
assert.NoError(db.DeleteAll())
|
||||
assert.Equal(int64(0), db.Reclaimable())
|
||||
})
|
||||
}
|
||||
|
||||
func TestConfigErrors(t *testing.T) {
|
||||
assert := assert.New(t)
|
||||
|
||||
|
@ -13,6 +13,7 @@ const (
|
||||
keySize = 4
|
||||
valueSize = 8
|
||||
checksumSize = 4
|
||||
MetaInfoSize = keySize + valueSize + checksumSize
|
||||
)
|
||||
|
||||
// NewEncoder creates a streaming Entry encoder.
|
||||
|
@ -7,7 +7,8 @@ import (
|
||||
)
|
||||
|
||||
type MetaData struct {
|
||||
IndexUpToDate bool `json:"index_up_to_date"`
|
||||
IndexUpToDate bool `json:"index_up_to_date"`
|
||||
ReclaimableSpace int64 `json:"reclaimable_space"`
|
||||
}
|
||||
|
||||
func (m *MetaData) Save(path string, mode os.FileMode) error {
|
||||
|
Loading…
x
Reference in New Issue
Block a user