From 9b0daa8a301ae07d532edc6c6a4c9c03ca2f46f0 Mon Sep 17 00:00:00 2001 From: James Mills Date: Wed, 21 Jul 2021 02:36:06 +0000 Subject: [PATCH] Add RangeScan() support (#160) Co-authored-by: James Mills <1290234+prologic@users.noreply.github.com> Co-authored-by: James Mills Co-authored-by: Tai Groot Reviewed-on: https://git.mills.io/prologic/bitcask/pulls/160 Co-authored-by: James Mills Co-committed-by: James Mills --- bitcask.go | 88 ++++++++++++++++++++++++- bitcask_test.go | 154 +++++++++++++++++++++++++++++++++++++++++-- cmd/bitcask/range.go | 61 +++++++++++++++++ go.mod | 3 +- go.sum | 4 ++ 5 files changed, 300 insertions(+), 10 deletions(-) create mode 100644 cmd/bitcask/range.go diff --git a/bitcask.go b/bitcask.go index 2d29359..2723a48 100644 --- a/bitcask.go +++ b/bitcask.go @@ -1,6 +1,7 @@ package bitcask import ( + "bytes" "errors" "fmt" "hash/crc32" @@ -13,6 +14,7 @@ import ( "sync" "time" + "github.com/abcum/lcp" "github.com/gofrs/flock" art "github.com/plar/go-adaptive-radix-tree" log "github.com/sirupsen/logrus" @@ -58,6 +60,7 @@ var ( // (typically opened by another process) ErrDatabaseLocked = errors.New("error: database locked") + ErrInvalidRange = errors.New("error: invalid range") ErrInvalidVersion = errors.New("error: invalid db version") // ErrMergeInProgress is the error returned if merge is called when already a merge @@ -348,12 +351,12 @@ func (b *Bitcask) Scan(prefix []byte, f func(key []byte) error) (err error) { return } -// ScanSift iterates over all keys in the database beginning with the given +// SiftScan iterates over all keys in the database beginning with the given // prefix, calling the function `f` for each key. If the KV pair is expired or // the function returns true, that key is deleted from the database. // If the function returns an error on any key, no further keys are processed, // no keys are deleted, and the first error is returned. -func (b *Bitcask) ScanSift(prefix []byte, f func(key []byte) (bool, error)) (err error) { +func (b *Bitcask) SiftScan(prefix []byte, f func(key []byte) (bool, error)) (err error) { keysToDelete := art.New() b.mu.RLock() @@ -385,6 +388,87 @@ func (b *Bitcask) ScanSift(prefix []byte, f func(key []byte) (bool, error)) (err return } +// Range performs a range scan of keys matching a range of keys between the +// start key and end key and calling the function `f` with the keys found. +// If the function returns an error no further keys are processed and the +// first error returned. +func (b *Bitcask) Range(start, end []byte, f func(key []byte) error) (err error) { + if bytes.Compare(start, end) == 1 { + return ErrInvalidRange + } + + commonPrefix := lcp.LCP(start, end) + if commonPrefix == nil { + return ErrInvalidRange + } + + b.mu.RLock() + defer b.mu.RUnlock() + + b.trie.ForEachPrefix(commonPrefix, func(node art.Node) bool { + if bytes.Compare(node.Key(), start) >= 0 && bytes.Compare(node.Key(), end) <= 0 { + if err = f(node.Key()); err != nil { + return false + } + return true + } else if bytes.Compare(node.Key(), start) >= 0 && bytes.Compare(node.Key(), end) > 0 { + return false + } + return true + }) + return +} + +// SiftRange performs a range scan of keys matching a range of keys between the +// start key and end key and calling the function `f` with the keys found. +// If the KV pair is expired or the function returns true, that key is deleted +// from the database. +// If the function returns an error on any key, no further keys are processed, no +// keys are deleted, and the first error is returned. +func (b *Bitcask) SiftRange(start, end []byte, f func(key []byte) (bool, error)) (err error) { + if bytes.Compare(start, end) == 1 { + return ErrInvalidRange + } + + commonPrefix := lcp.LCP(start, end) + if commonPrefix == nil { + return ErrInvalidRange + } + + keysToDelete := art.New() + + b.mu.RLock() + b.trie.ForEachPrefix(commonPrefix, func(node art.Node) bool { + if bytes.Compare(node.Key(), start) >= 0 && bytes.Compare(node.Key(), end) <= 0 { + if b.isExpired(node.Key()) { + keysToDelete.Insert(node.Key(), true) + return true + } + var shouldDelete bool + if shouldDelete, err = f(node.Key()); err != nil { + return false + } else if shouldDelete { + keysToDelete.Insert(node.Key(), true) + } + return true + } else if bytes.Compare(node.Key(), start) >= 0 && bytes.Compare(node.Key(), end) > 0 { + return false + } + return true + }) + b.mu.RUnlock() + + b.mu.Lock() + defer b.mu.Unlock() + + keysToDelete.ForEach(func(node art.Node) (cont bool) { + b.delete(node.Key()) + return true + }) + + return +} + // Len returns the total number of keys in the database func (b *Bitcask) Len() int { b.mu.RLock() diff --git a/bitcask_test.go b/bitcask_test.go index 74bce69..023854b 100644 --- a/bitcask_test.go +++ b/bitcask_test.go @@ -190,7 +190,7 @@ func TestAll(t *testing.T) { assert.NoError(err) }) - t.Run("ScanSift", func(t *testing.T) { + t.Run("SiftScan", func(t *testing.T) { err := db.DeleteAll() assert.NoError(err) err = db.Put([]byte("toBeSifted"), []byte("siftMe")) @@ -201,7 +201,7 @@ func TestAll(t *testing.T) { assert.NoError(err) err = db.Put([]byte("toBeSiftedButNotReally"), []byte("dontSiftMe")) assert.NoError(err) - err = db.ScanSift([]byte("toBeSifted"), func(key []byte) (bool, error) { + err = db.SiftScan([]byte("toBeSifted"), func(key []byte) (bool, error) { value, err := db.Get(key) if err != nil { return false, err @@ -1751,13 +1751,13 @@ func TestSift(t *testing.T) { }) assert.Equal(ErrMockError, err) - err = db.ScanSift([]byte("fo"), func(key []byte) (bool, error) { + err = db.SiftScan([]byte("fo"), func(key []byte) (bool, error) { return true, ErrMockError }) assert.Equal(ErrMockError, err) }) } -func TestScanSift(t *testing.T) { +func TestSiftScan(t *testing.T) { assert := assert.New(t) testdir, err := ioutil.TempDir("", "bitcask") @@ -1788,13 +1788,13 @@ func TestScanSift(t *testing.T) { }) }) - t.Run("ScanSiftErrors", func(t *testing.T) { - err = db.ScanSift([]byte("fo"), func(key []byte) (bool, error) { + t.Run("SiftScanErrors", func(t *testing.T) { + err = db.SiftScan([]byte("fo"), func(key []byte) (bool, error) { return false, ErrMockError }) assert.Equal(ErrMockError, err) - err = db.ScanSift([]byte("fo"), func(key []byte) (bool, error) { + err = db.SiftScan([]byte("fo"), func(key []byte) (bool, error) { return true, ErrMockError }) assert.Equal(ErrMockError, err) @@ -1860,6 +1860,146 @@ func TestScan(t *testing.T) { assert.Equal(ErrMockError, err) }) } +func TestSiftRange(t *testing.T) { + assert := assert.New(t) + + testdir, err := ioutil.TempDir("", "bitcask") + assert.NoError(err) + + var db *Bitcask + + t.Run("Setup", func(t *testing.T) { + t.Run("Open", func(t *testing.T) { + db, err = Open(testdir) + assert.NoError(err) + }) + + t.Run("Put", func(t *testing.T) { + for i := 1; i < 10; i++ { + key := []byte(fmt.Sprintf("foo_%d", i)) + val := []byte(fmt.Sprintf("%d", i)) + err = db.Put(key, val) + assert.NoError(err) + } + }) + }) + + t.Run("SiftRange", func(t *testing.T) { + var ( + vals [][]byte + expected = [][]byte{ + []byte("1"), + []byte("2"), + []byte("4"), + []byte("5"), + []byte("6"), + []byte("7"), + []byte("8"), + []byte("9"), + } + ) + + err = db.SiftRange([]byte("foo_3"), []byte("foo_7"), func(key []byte) (bool, error) { + val, err := db.Get(key) + assert.NoError(err) + if string(val) == "3" { + return true, nil + } + return false, nil + }) + err = db.Fold(func(key []byte) error { + val, err := db.Get(key) + assert.NoError(err) + vals = append(vals, val) + + return nil + }) + + _, err = db.Get([]byte("foo_3")) + assert.Equal(ErrKeyNotFound, err) + vals = SortByteArrays(vals) + assert.Equal(expected, vals) + }) + + t.Run("SiftRangeErrors", func(t *testing.T) { + err = db.SiftRange([]byte("foo_3"), []byte("foo_7"), func(key []byte) (bool, error) { + return true, ErrMockError + }) + assert.Error(err) + assert.Equal(ErrMockError, err) + }) + + t.Run("InvalidRange", func(t *testing.T) { + err = db.SiftRange([]byte("foo_3"), []byte("foo_1"), func(key []byte) (bool, error) { + return false, nil + }) + assert.Error(err) + assert.Equal(ErrInvalidRange, err) + }) +} + +func TestRange(t *testing.T) { + assert := assert.New(t) + + testdir, err := ioutil.TempDir("", "bitcask") + assert.NoError(err) + + var db *Bitcask + + t.Run("Setup", func(t *testing.T) { + t.Run("Open", func(t *testing.T) { + db, err = Open(testdir) + assert.NoError(err) + }) + + t.Run("Put", func(t *testing.T) { + for i := 1; i < 10; i++ { + key := []byte(fmt.Sprintf("foo_%d", i)) + val := []byte(fmt.Sprintf("%d", i)) + err = db.Put(key, val) + assert.NoError(err) + } + }) + }) + + t.Run("Range", func(t *testing.T) { + var ( + vals [][]byte + expected = [][]byte{ + []byte("3"), + []byte("4"), + []byte("5"), + []byte("6"), + []byte("7"), + } + ) + + err = db.Range([]byte("foo_3"), []byte("foo_7"), func(key []byte) error { + val, err := db.Get(key) + assert.NoError(err) + vals = append(vals, val) + return nil + }) + vals = SortByteArrays(vals) + assert.Equal(expected, vals) + }) + + t.Run("RangeErrors", func(t *testing.T) { + err = db.Range([]byte("foo_3"), []byte("foo_7"), func(key []byte) error { + return ErrMockError + }) + assert.Error(err) + assert.Equal(ErrMockError, err) + }) + + t.Run("InvalidRange", func(t *testing.T) { + err = db.Range([]byte("foo_3"), []byte("foo_1"), func(key []byte) error { + return nil + }) + assert.Error(err) + assert.Equal(ErrInvalidRange, err) + }) +} func TestLocking(t *testing.T) { assert := assert.New(t) diff --git a/cmd/bitcask/range.go b/cmd/bitcask/range.go new file mode 100644 index 0000000..afd104f --- /dev/null +++ b/cmd/bitcask/range.go @@ -0,0 +1,61 @@ +package main + +import ( + "fmt" + "os" + + log "github.com/sirupsen/logrus" + "github.com/spf13/cobra" + "github.com/spf13/viper" + + "git.mills.io/prologic/bitcask" +) + +var rangeCmd = &cobra.Command{ + Use: "range ", + Aliases: []string{}, + Short: "Perform a range scan for keys from a start to end key", + Long: `This performa a range scan for keys starting with the given start +key and ending with the end key. This uses a Trie to search for matching keys +within the range and returns all matched keys.`, + Args: cobra.ExactArgs(2), + Run: func(cmd *cobra.Command, args []string) { + path := viper.GetString("path") + + start := args[0] + end := args[1] + + os.Exit(_range(path, start, end)) + }, +} + +func init() { + RootCmd.AddCommand(rangeCmd) +} + +func _range(path, start, end string) int { + db, err := bitcask.Open(path) + if err != nil { + log.WithError(err).Error("error opening database") + return 1 + } + defer db.Close() + + err = db.Range([]byte(start), []byte(end), func(key []byte) error { + value, err := db.Get(key) + if err != nil { + log.WithError(err).Error("error reading key") + return err + } + + fmt.Printf("%s\n", string(value)) + log.WithField("key", key).WithField("value", value).Debug("key/value") + return nil + }) + if err != nil { + log.WithError(err).Error("error ranging over keys") + return 1 + } + + return 0 +} diff --git a/go.mod b/go.mod index 03812da..f676246 100644 --- a/go.mod +++ b/go.mod @@ -1,8 +1,9 @@ module git.mills.io/prologic/bitcask -go 1.13 +go 1.16 require ( + github.com/abcum/lcp v0.0.0-20201209214815-7a3f3840be81 github.com/gofrs/flock v0.8.0 github.com/pkg/errors v0.9.1 github.com/plar/go-adaptive-radix-tree v1.0.4 diff --git a/go.sum b/go.sum index d99d940..881a1a2 100644 --- a/go.sum +++ b/go.sum @@ -40,6 +40,8 @@ dmitri.shuralyov.com/gpu/mtl v0.0.0-20190408044501-666a987793e9/go.mod h1:H6x//7 github.com/BurntSushi/toml v0.3.1/go.mod h1:xHWCNGjB5oqiDr8zfno3MHue2Ht5sIBksp03qcyfWMU= github.com/BurntSushi/xgb v0.0.0-20160522181843-27f122750802/go.mod h1:IVnqGOEym/WlBOVXweHU+Q+/VP0lqqI8lqeDx9IjBqo= github.com/OneOfOne/xxhash v1.2.2/go.mod h1:HSdplMjZKSmBqAxg5vPj2TmRDmfkzw+cTzAElWljhcU= +github.com/abcum/lcp v0.0.0-20201209214815-7a3f3840be81 h1:uHogIJ9bXH75ZYrXnVShHIyywFiUZ7OOabwd9Sfd8rw= +github.com/abcum/lcp v0.0.0-20201209214815-7a3f3840be81/go.mod h1:6ZvnjTZX1LNo1oLpfaJK8h+MXqHxcBFBIwkgsv+xlv0= github.com/alecthomas/template v0.0.0-20160405071501-a0175ee3bccc/go.mod h1:LOuyumcjzFXgccqObfd/Ljyb9UuFJ6TxHnclSeseNhc= github.com/alecthomas/units v0.0.0-20151022065526-2efee857e7cf/go.mod h1:ybxpYRFXyAe+OPACYpWeL0wqObRcbAqCMya13uyzqw0= github.com/antihax/optional v1.0.0/go.mod h1:uupD/76wgC+ih3iEmQUL+0Ugr19nfwCT1kdvxnR2qWY= @@ -162,6 +164,7 @@ github.com/googleapis/gax-go/v2 v2.0.4/go.mod h1:0Wqv26UfaUD9n4G6kQubkQ+KchISgw+ github.com/googleapis/gax-go/v2 v2.0.5/go.mod h1:DWXyrwAJ9X0FpwwEdw+IPEYBICEFu5mhpdKc/us6bOk= github.com/gopherjs/gopherjs v0.0.0-20181017120253-0766667cb4d1 h1:EGx4pi6eqNxGaHF6qqu48+N2wcFQ5qg5FXgOdqsJ5d8= github.com/gopherjs/gopherjs v0.0.0-20181017120253-0766667cb4d1/go.mod h1:wJfORRmW1u3UXTncJ5qlYoELFm8eSnnEO6hX4iZ3EWY= +github.com/gopherjs/gopherjs v0.0.0-20200217142428-fce0ec30dd00/go.mod h1:wJfORRmW1u3UXTncJ5qlYoELFm8eSnnEO6hX4iZ3EWY= github.com/gorilla/websocket v1.4.0/go.mod h1:E7qHFY5m1UJ88s3WnNqhKjPHQ0heANvMoAMk2YaljkQ= github.com/grpc-ecosystem/go-grpc-middleware v1.0.0/go.mod h1:FiyG127CGDf3tlThmgyCl78X/SZQqEOJBCDaAfeWzPs= github.com/grpc-ecosystem/go-grpc-prometheus v1.2.0/go.mod h1:8NvIoxWQoOIhqOTXgfV/d3M/q6VIi02HzZEHgUlZvzk= @@ -268,6 +271,7 @@ github.com/sirupsen/logrus v1.8.1 h1:dJKuHgqk1NNQlqoA6BTlM1Wf9DOH3NBjQyu0h9+AZZE github.com/sirupsen/logrus v1.8.1/go.mod h1:yWOB1SBYBC5VeMP7gHvWumXLIWorT60ONWic61uBYv0= github.com/smartystreets/assertions v0.0.0-20180927180507-b2de0cb4f26d h1:zE9ykElWQ6/NYmHa3jpm/yHnI4xSofP+UP6SpjHcSeM= github.com/smartystreets/assertions v0.0.0-20180927180507-b2de0cb4f26d/go.mod h1:OnSkiWE9lh6wB0YB77sQom3nweQdgAjqCqsofrRNTgc= +github.com/smartystreets/assertions v1.2.0/go.mod h1:tcbTF8ujkAEcZ8TElKY+i30BzYlVhC/LOxJk7iOWnoo= github.com/smartystreets/goconvey v1.6.4 h1:fv0U8FUIMPNf1L9lnHLvLhgicrIVChEkdzIKYqbNC9s= github.com/smartystreets/goconvey v1.6.4/go.mod h1:syvi0/a8iFYH4r/RixwvyeAJjdLS9QV7WQ/tjFTllLA= github.com/soheilhy/cmux v0.1.4/go.mod h1:IM3LyeVVIOuxMH7sFAkER9+bJ4dT7Ms6E4xg4kGIyLM=