From 3f1d6635c466b47347cd66ac7261c10a4f070824 Mon Sep 17 00:00:00 2001 From: James Mills <1290234+prologic@users.noreply.github.com> Date: Fri, 15 Mar 2019 23:48:50 +1000 Subject: [PATCH] Add prefix scan for keys using a Trie --- bitcask.go | 27 +++++++++++-- bitcask_test.go | 98 +++++++++++++++++++++++++++++++++++++++++++++ cmd/bitcask/scan.go | 60 +++++++++++++++++++++++++++ go.mod | 1 + go.sum | 2 + keydir.go | 13 +++--- 6 files changed, 192 insertions(+), 9 deletions(-) create mode 100644 cmd/bitcask/scan.go diff --git a/bitcask.go b/bitcask.go index c876052..b0faed5 100644 --- a/bitcask.go +++ b/bitcask.go @@ -9,6 +9,7 @@ import ( "strings" "time" + "github.com/derekparker/trie" "github.com/gofrs/flock" ) @@ -27,6 +28,7 @@ type Bitcask struct { curr *Datafile keydir *Keydir datafiles []*Datafile + trie *trie.Trie maxDatafileSize int64 } @@ -82,7 +84,8 @@ func (b *Bitcask) Put(key string, value []byte) error { return err } - b.keydir.Add(key, b.curr.id, index, time.Now().Unix()) + item := b.keydir.Add(key, b.curr.id, index, time.Now().Unix()) + b.trie.Add(key, item) return nil } @@ -94,10 +97,21 @@ func (b *Bitcask) Delete(key string) error { } b.keydir.Delete(key) + b.trie.Remove(key) return nil } +func (b *Bitcask) Scan(prefix string, f func(key string) error) error { + keys := b.trie.PrefixSearch(prefix) + for _, key := range keys { + if err := f(key); err != nil { + return err + } + } + return nil +} + func (b *Bitcask) Fold(f func(key string) error) error { for key := range b.keydir.Keys() { if err := f(key); err != nil { @@ -265,9 +279,11 @@ func Open(path string, options ...func(*Bitcask) error) (*Bitcask, error) { return nil, err } - keydir := NewKeydir() var datafiles []*Datafile + keydir := NewKeydir() + trie := trie.New() + for i, fn := range fns { df, err := NewDatafile(path, ids[i], true) if err != nil { @@ -289,7 +305,8 @@ func Open(path string, options ...func(*Bitcask) error) (*Bitcask, error) { for key := range hint.Keys() { item, _ := hint.Get(key) - keydir.Add(key, item.FileID, item.Index, item.Timestamp) + _ = keydir.Add(key, item.FileID, item.Index, item.Timestamp) + trie.Add(key, item) } } else { for { @@ -307,7 +324,8 @@ func Open(path string, options ...func(*Bitcask) error) (*Bitcask, error) { continue } - keydir.Add(e.Key, ids[i], e.Index, e.Timestamp) + item := keydir.Add(e.Key, ids[i], e.Index, e.Timestamp) + trie.Add(e.Key, item) } } } @@ -329,6 +347,7 @@ func Open(path string, options ...func(*Bitcask) error) (*Bitcask, error) { curr: curr, keydir: keydir, datafiles: datafiles, + trie: trie, maxDatafileSize: DefaultMaxDatafileSize, } diff --git a/bitcask_test.go b/bitcask_test.go index 350927d..50bfde7 100644 --- a/bitcask_test.go +++ b/bitcask_test.go @@ -3,6 +3,8 @@ package bitcask import ( "fmt" "io/ioutil" + "reflect" + "sort" "strings" "sync" "testing" @@ -327,6 +329,58 @@ func TestConcurrent(t *testing.T) { }) } +func TestScan(t *testing.T) { + assert := assert.New(t) + + testdir, err := ioutil.TempDir("", "bitcask") + assert.NoError(err) + + var db *Bitcask + + t.Run("Setup", func(t *testing.T) { + t.Run("Open", func(t *testing.T) { + db, err = Open(testdir) + assert.NoError(err) + }) + + t.Run("Put", func(t *testing.T) { + var items = map[string][]byte{ + "1": []byte("1"), + "2": []byte("2"), + "3": []byte("3"), + "food": []byte("pizza"), + "foo": []byte("foo"), + "fooz": []byte("fooz ball"), + "hello": []byte("world"), + } + for k, v := range items { + err = db.Put(k, v) + assert.NoError(err) + } + }) + }) + + t.Run("Scan", func(t *testing.T) { + var ( + vals []string + expected = []string{ + "foo", + "fooz ball", + "pizza", + } + ) + + err = db.Scan("fo", func(key string) error { + val, err := db.Get(key) + assert.NoError(err) + vals = append(vals, string(val)) + return nil + }) + sort.Strings(vals) + assert.Equal(expected, vals) + }) +} + func TestLocking(t *testing.T) { assert := assert.New(t) @@ -433,3 +487,47 @@ func BenchmarkPut(b *testing.B) { }) } } + +func BenchmarkScan(b *testing.B) { + testdir, err := ioutil.TempDir("", "bitcask") + if err != nil { + b.Fatal(err) + } + + db, err := Open(testdir) + if err != nil { + b.Fatal(err) + } + defer db.Close() + + var items = map[string][]byte{ + "1": []byte("1"), + "2": []byte("2"), + "3": []byte("3"), + "food": []byte("pizza"), + "foo": []byte("foo"), + "fooz": []byte("fooz ball"), + "hello": []byte("world"), + } + for k, v := range items { + err := db.Put(k, v) + if err != nil { + b.Fatal(err) + } + } + + var expected = []string{"foo", "food", "fooz"} + + b.ResetTimer() + for i := 0; i < b.N; i++ { + var keys []string + err = db.Scan("fo", func(key string) error { + keys = append(keys, key) + return nil + }) + sort.Strings(keys) + if !reflect.DeepEqual(expected, keys) { + b.Fatal(fmt.Errorf("expected keys=#%v got=%#v", expected, keys)) + } + } +} diff --git a/cmd/bitcask/scan.go b/cmd/bitcask/scan.go new file mode 100644 index 0000000..4218f80 --- /dev/null +++ b/cmd/bitcask/scan.go @@ -0,0 +1,60 @@ +package main + +import ( + "fmt" + "os" + + log "github.com/sirupsen/logrus" + "github.com/spf13/cobra" + "github.com/spf13/viper" + + "github.com/prologic/bitcask" +) + +var scanCmd = &cobra.Command{ + Use: "scan ", + Aliases: []string{"search", "find"}, + Short: "Perform a prefis scan for keys", + Long: `This performa a prefix scan for keys starting with the given +prefix. This uses a Trie to search for matching keys and returns all matched +keys.`, + Args: cobra.ExactArgs(1), + Run: func(cmd *cobra.Command, args []string) { + path := viper.GetString("path") + + prefix := args[0] + + os.Exit(scan(path, prefix)) + }, +} + +func init() { + RootCmd.AddCommand(scanCmd) +} + +func scan(path, prefix string) int { + db, err := bitcask.Open(path) + if err != nil { + log.WithError(err).Error("error opening database") + return 1 + } + defer db.Close() + + err = db.Scan(prefix, func(key string) error { + value, err := db.Get(key) + if err != nil { + log.WithError(err).Error("error reading key") + return err + } + + fmt.Printf("%s\n", string(value)) + log.WithField("key", key).WithField("value", value).Debug("key/value") + return nil + }) + if err != nil { + log.WithError(err).Error("error scanning keys") + return 1 + } + + return 0 +} diff --git a/go.mod b/go.mod index f57542d..cb65039 100644 --- a/go.mod +++ b/go.mod @@ -1,6 +1,7 @@ module github.com/prologic/bitcask require ( + github.com/derekparker/trie v0.0.0-20180212171413-e608c2733dc7 github.com/gofrs/flock v0.7.1 github.com/gogo/protobuf v1.2.1 github.com/golang/protobuf v1.2.0 diff --git a/go.sum b/go.sum index a373297..3248a97 100644 --- a/go.sum +++ b/go.sum @@ -8,6 +8,8 @@ github.com/davecgh/go-spew v1.1.0 h1:ZDRjVQ15GmhC3fiQ8ni8+OwkZQO4DARzQgrnXU1Liz8 github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c= github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= +github.com/derekparker/trie v0.0.0-20180212171413-e608c2733dc7 h1:Cab9yoTQh1TxObKfis1DzZ6vFLK5kbeenMjRES/UE3o= +github.com/derekparker/trie v0.0.0-20180212171413-e608c2733dc7/go.mod h1:D6ICZm05D9VN1n/8iOtBxLpXtoGp6HDFUJ1RNVieOSE= github.com/fsnotify/fsnotify v1.4.7 h1:IXs+QLmnXW2CcXuY+8Mzv/fWEsPGWxqefPtCP5CnV9I= github.com/fsnotify/fsnotify v1.4.7/go.mod h1:jwhsz4b93w/PPRr/qN1Yymfu8t87LnFCMoQvtojpjFo= github.com/gofrs/flock v0.7.1 h1:DP+LD/t0njgoPBvT5MJLeliUIVQR03hiKR6vezdwHlc= diff --git a/keydir.go b/keydir.go index ed09ff6..bc38c03 100644 --- a/keydir.go +++ b/keydir.go @@ -25,15 +25,18 @@ func NewKeydir() *Keydir { } } -func (k *Keydir) Add(key string, fileid int, index, timestamp int64) { - k.Lock() - defer k.Unlock() - - k.kv[key] = Item{ +func (k *Keydir) Add(key string, fileid int, index, timestamp int64) Item { + item := Item{ FileID: fileid, Index: index, Timestamp: timestamp, } + + k.Lock() + k.kv[key] = item + k.Unlock() + + return item } func (k *Keydir) Get(key string) (Item, bool) {