diff --git a/go.mod b/go.mod index 7c148f8..a2cf39d 100644 --- a/go.mod +++ b/go.mod @@ -3,6 +3,10 @@ module github.com/prologic/bitcask go 1.13 require ( + github.com/boltdb/bolt v1.3.1 + github.com/facebookgo/ensure v0.0.0-20200202191622-63f1cf65ac4c + github.com/facebookgo/stack v0.0.0-20160209184415-751773369052 // indirect + github.com/facebookgo/subset v0.0.0-20200203212716-c811ad88dec4 // indirect github.com/gofrs/flock v0.7.1 github.com/pelletier/go-toml v1.6.0 // indirect github.com/pkg/errors v0.9.1 diff --git a/go.sum b/go.sum index 707aa7b..90a7921 100644 --- a/go.sum +++ b/go.sum @@ -25,6 +25,8 @@ github.com/beorn7/perks v0.0.0-20180321164747-3a771d992973/go.mod h1:Dwedo/Wpr24 github.com/beorn7/perks v1.0.0/go.mod h1:KWe93zE9D1o94FZ5RNwFwVgaQK1VOXiVxmqh+CedLV8= github.com/bgentry/speakeasy v0.1.0/go.mod h1:+zsyZBPWlz7T6j88CTgSN5bM796AkVf0kBD4zp0CCIs= github.com/bketelsen/crypt v0.0.3-0.20200106085610-5cbc8cc4026c/go.mod h1:MKsuJmJgSg28kpZDP6UIiPt0e0Oz0kqKNGyRaWEPv84= +github.com/boltdb/bolt v1.3.1 h1:JQmyP4ZBrce+ZQu0dY660FMfatumYDLun9hBCUVIkF4= +github.com/boltdb/bolt v1.3.1/go.mod h1:clJnj/oiGkjum5o1McbSZDSLxVThjynRyGBgiAx27Ps= github.com/cespare/xxhash v1.1.0/go.mod h1:XrSqR1VqqWfGrhpAt58auRo0WTKS1nRRg3ghfAqPWnc= github.com/client9/misspell v0.3.4/go.mod h1:qj6jICC3Q7zFZvVWo7KLAzC3yx5G7kyvSDkc90ppPyw= github.com/coreos/bbolt v1.3.2/go.mod h1:iRUV2dpdMOn7Bo10OQBFzIJO9kkE559Wcmn+qkEiiKk= @@ -40,6 +42,12 @@ github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= github.com/dgrijalva/jwt-go v3.2.0+incompatible/go.mod h1:E3ru+11k8xSBh+hMPgOLZmtrrCbhqsmaPHjLKYnJCaQ= github.com/dgryski/go-sip13 v0.0.0-20181026042036-e10d5fee7954/go.mod h1:vAd38F8PWV+bWy6jNmig1y/TA+kYO4g3RSRF0IAv0no= +github.com/facebookgo/ensure v0.0.0-20200202191622-63f1cf65ac4c h1:8ISkoahWXwZR41ois5lSJBSVw4D0OV19Ht/JSTzvSv0= +github.com/facebookgo/ensure v0.0.0-20200202191622-63f1cf65ac4c/go.mod h1:Yg+htXGokKKdzcwhuNDwVvN+uBxDGXJ7G/VN1d8fa64= +github.com/facebookgo/stack v0.0.0-20160209184415-751773369052 h1:JWuenKqqX8nojtoVVWjGfOF9635RETekkoH6Cc9SX0A= +github.com/facebookgo/stack v0.0.0-20160209184415-751773369052/go.mod h1:UbMTZqLaRiH3MsBH8va0n7s1pQYcu3uTb8G4tygF4Zg= +github.com/facebookgo/subset v0.0.0-20200203212716-c811ad88dec4 h1:7HZCaLC5+BZpmbhCOZJ293Lz68O7PYrF2EzeiFMwCLk= +github.com/facebookgo/subset v0.0.0-20200203212716-c811ad88dec4/go.mod h1:5tD+neXqOorC30/tWg0LCSkrqj/AR6gu8yY8/fpw1q0= github.com/fatih/color v1.7.0/go.mod h1:Zm6kSWBoL9eyXnKyktHP6abPY2pDugNf5KwzbycvMj4= github.com/fsnotify/fsnotify v1.4.7 h1:IXs+QLmnXW2CcXuY+8Mzv/fWEsPGWxqefPtCP5CnV9I= github.com/fsnotify/fsnotify v1.4.7/go.mod h1:jwhsz4b93w/PPRr/qN1Yymfu8t87LnFCMoQvtojpjFo= diff --git a/hash.go b/hash.go new file mode 100644 index 0000000..4b4841c --- /dev/null +++ b/hash.go @@ -0,0 +1,114 @@ +package bitcask + +import ( + "bytes" + "errors" +) + +func (b *Bitcask) Hash(key []byte) *Hash { + return &Hash{db: b, key: key} +} + +// Hash ... +// +key,h = "" +// h[key]name = "latermoon" +// h[key]age = "27" +// h[key]sex = "Male" +type Hash struct { + db *Bitcask + key []byte +} + +func (h *Hash) Get(field []byte) ([]byte, error) { + return h.db.Get(h.fieldKey(field)) +} + +func (h *Hash) MGet(fields ...[]byte) ([][]byte, error) { + vals := make([][]byte, 0, len(fields)) + for _, field := range fields { + val, err := h.db.Get(h.fieldKey(field)) + if err != nil { + return nil, err + } + vals = append(vals, val) + } + return vals, nil +} + +// GetAll ... +func (h *Hash) GetAll() (map[string][]byte, error) { + keyVals := map[string][]byte{} + prefix := h.fieldPrefix() + err := h.db.Scan(prefix, func(key []byte) error { + val, err := h.db.Get(key) + if err != nil { + return err + } + keyVals[string(h.fieldInKey(key))] = val + return nil + }) + return keyVals, err +} + +func (h *Hash) Set(field, value []byte) error { + return h.MSet(field, value) +} + +func (h *Hash) MSet(fieldVals ...[]byte) error { + if len(fieldVals) == 0 || len(fieldVals)%2 != 0 { + return errors.New("invalid field value pairs") + } + + for i := 0; i < len(fieldVals); i += 2 { + field, val := fieldVals[i], fieldVals[i+1] + if err := h.db.Put(h.fieldKey(field), val); err != nil { + return err + } + } + return h.db.Put(h.rawKey(), nil) +} + +func (h *Hash) Remove(fields ...[]byte) error { + for _, field := range fields { + if err := h.db.Delete(h.fieldKey(field)); err != nil { + return err + } + } + // clean up + prefix := h.fieldPrefix() + return h.db.Scan(prefix, func(key []byte) error { + return h.db.Delete(key) + }) +} + +func (h *Hash) Drop() error { + prefix := h.fieldPrefix() + err := h.db.Scan(prefix, func(key []byte) error { + return h.db.Delete(key) + }) + if err != nil { + return err + } + return h.db.Delete(h.rawKey()) +} + +// +key,h +func (h *Hash) rawKey() []byte { + return rawKey(h.key, HASH) +} + +// h[key]field +func (h *Hash) fieldKey(field []byte) []byte { + return bytes.Join([][]byte{h.fieldPrefix(), field}, nil) +} + +// h[key] +func (h *Hash) fieldPrefix() []byte { + return bytes.Join([][]byte{[]byte{byte(HASH)}, SOK, h.key, EOK}, nil) +} + +// split h[key]field into field +func (h *Hash) fieldInKey(fieldKey []byte) []byte { + right := bytes.Index(fieldKey, EOK) + return fieldKey[right+1:] +} diff --git a/list.go b/list.go new file mode 100644 index 0000000..f6b0ad1 --- /dev/null +++ b/list.go @@ -0,0 +1,200 @@ +package bitcask + +import ( + "bytes" + "errors" + "log" +) + +func (b *Bitcask) List(key []byte) *List { + return &List{db: b, key: key} +} + +// List ... +// +key,l = "" +// l[key]0 = "a" +// l[key]1 = "b" +// l[key]2 = "c" +type List struct { + db *Bitcask + key []byte +} + +func (l *List) Index(i int64) ([]byte, error) { + x, err := l.leftIndex() + if err != nil { + return nil, err + } + return l.db.Get(l.indexKey(x + i)) +} + +// Range enumerate value by index +// must >= 0 +// should equal to -1 or lager than +func (l *List) Range(start, stop int64, fn func(i int64, value []byte, quit *bool)) error { + if start < 0 || (stop != -1 && start > stop) { + return errors.New("bad start/stop index") + } + x, y, err := l.rangeIndex() + if err != nil { + return err + } + if stop == -1 { + stop = (y - x + 1) - 1 // (size) - 1 + } + min := l.indexKey(x + int64(start)) + max := l.indexKey(x + int64(stop)) + var i int64 // 0 + ErrStopIteration := errors.New("err: stop iteration") + err = l.db.Scan(min, func(key []byte) error { + if key != nil && bytes.Compare(key, max) <= 0 { + val, err := l.db.Get(key) + if err != nil { + return err + } + quit := false + if fn(start+i, val, &quit); quit { + return ErrStopIteration + } + i++ + return nil + } + return ErrStopIteration + }) + if err == ErrStopIteration { + return nil + } + return err +} + +// Append ... +func (l *List) Append(vals ...[]byte) error { + x, y, err := l.rangeIndex() + if err != nil { + return err + } + if x == 0 && y == -1 { + if err := l.db.Put(l.rawKey(), nil); err != nil { + return err + } + } + for i, val := range vals { + if err := l.db.Put(l.indexKey(y+int64(i)+1), val); err != nil { + return err + } + } + return nil +} + +// Pop ... +func (l *List) Pop() ([]byte, error) { + x, y, err := l.rangeIndex() + if err != nil { + return nil, err + } + + size := y - x + 1 + if size == 0 { + return nil, nil + } else if size < 0 { // double check + return nil, errors.New("bad list struct") + } + + idxkey := l.indexKey(y) + + val, err := l.db.Get(idxkey) + if err != nil { + return nil, err + } + if err := l.db.Delete(idxkey); err != nil { + return nil, err + } + if size == 1 { // clean up + return nil, l.db.Delete(l.rawKey()) + } + + return val, nil +} + +// Len ... +func (l *List) Len() (int64, error) { + x, y, err := l.rangeIndex() + return y - x + 1, err +} + +func (l *List) rangeIndex() (int64, int64, error) { + left, err := l.leftIndex() + if err != nil { + return 0, -1, err + } + right, err := l.rightIndex() + if err != nil { + return 0, -1, err + } + log.Printf("left: %d\n", left) + log.Printf("right: %d\n", right) + return left, right, nil +} + +func (l *List) leftIndex() (int64, error) { + log.Println("leftIndex:") + idx := int64(0) // default 0 + prefix := l.keyPrefix() + log.Printf(" prefix: %s\n", prefix) + ErrStopIteration := errors.New("err: stop iteration") + err := l.db.Scan(prefix, func(key []byte) error { + log.Printf(" key: %v\n", key) + if bytes.HasPrefix(key, prefix) { + idx = l.indexInKey(key) + log.Printf(" idx: %d\n", idx) + } + return ErrStopIteration + }) + if err == ErrStopIteration { + return idx, nil + } + return idx, err +} + +func (l *List) rightIndex() (int64, error) { + log.Println("rightIndex:") + idx := int64(-1) // default -1 + prefix := l.keyPrefix() + log.Printf(" prefix: %s\n", prefix) + err := l.db.Scan(prefix, func(key []byte) error { + log.Printf(" key: %v\n", key) + if bytes.HasPrefix(key, prefix) { + idx = l.indexInKey(key) + log.Printf(" idx: %d\n", idx) + } + return nil + }) + return idx, err +} + +// +key,l = "" +func (l *List) rawKey() []byte { + return rawKey(l.key, ElemType(LIST)) +} + +// l[key] +func (l *List) keyPrefix() []byte { + return bytes.Join([][]byte{[]byte{byte(LIST)}, SOK, l.key, EOK}, nil) +} + +// l[key]0 = "a" +func (l *List) indexKey(i int64) []byte { + sign := []byte{0} + if i >= 0 { + sign = []byte{1} + } + b := bytes.Join([][]byte{l.keyPrefix(), sign, itob(i)}, nil) + log.Printf("indexKeu: %x\n", b) + return b +} + +// split l[key]index into index +func (l *List) indexInKey(key []byte) int64 { + idxbuf := bytes.TrimPrefix(key, l.keyPrefix()) + return btoi(idxbuf[1:]) // skip sign "0/1" +} diff --git a/score.go b/score.go new file mode 100644 index 0000000..31a1b5e --- /dev/null +++ b/score.go @@ -0,0 +1,63 @@ +package bitcask + +import ( + "encoding/binary" + "math" +) + +// Score indicated that a number can be encoded to sorted []byte +// Score is use in SortedSet +// you can implement your own decode & encode function just like below +type Score []byte + +// Int64ToScore ... +func Int64ToScore(i int64) Score { + b := make([]byte, 9) + // store sign in the first byte to keep the score order + if i < 0 { + b[0] = byte(0) + } else { + b[0] = byte(1) + } + binary.LittleEndian.PutUint64(b[1:], uint64(i)) + return b +} + +// ScoreToInt64 ... +func ScoreToInt64(b Score) int64 { + return int64(binary.LittleEndian.Uint64(b[1:])) +} + +// Float64ToScore ... +func Float64ToScore(f float64) Score { + b := make([]byte, 8) + binary.LittleEndian.PutUint64(b, float64ToUint64(f)) + return b +} + +// ScoreToFloat64 ... +func ScoreToFloat64(b Score) float64 { + return uint64ToFloat64(binary.LittleEndian.Uint64(b)) +} + +// Copy from https://github.com/reborndb/qdb/blob/master/pkg/store/util.go +// We can not use lexicographically bytes comparison for negative and positive float directly. +// so here we will do a trick below. +func float64ToUint64(f float64) uint64 { + u := math.Float64bits(f) + if f >= 0 { + u |= 0x8000000000000000 + } else { + u = ^u + } + return u +} + +func uint64ToFloat64(u uint64) float64 { + if u&0x8000000000000000 > 0 { + u &= ^uint64(0x8000000000000000) + } else { + u = ^u + } + return math.Float64frombits(u) +} diff --git a/sortedset.go b/sortedset.go new file mode 100644 index 0000000..c3cf86f --- /dev/null +++ b/sortedset.go @@ -0,0 +1,163 @@ +package bitcask + +import ( + "bytes" + "errors" +) + +func (b *Bitcask) SortedSet(key []byte) *SortedSet { + return &SortedSet{db: b, key: key} +} + +// SortedSet ... +// +key,z = "" +// z[key]m member = score +// z[key]s score member = "" +type SortedSet struct { + db *Bitcask + key []byte +} + +// Add add score & member pairs +// SortedSet.Add(Score, []byte, Score, []byte ...) +func (s *SortedSet) Add(scoreMembers ...[]byte) (int, error) { + count := len(scoreMembers) + if count < 2 || count%2 != 0 { + return 0, errors.New("invalid score/member pairs") + } + added := 0 + for i := 0; i < count; i += 2 { + score, member := scoreMembers[i], scoreMembers[i+1] + skey, mkey := s.scoreKey(score, member), s.memberKey(member) + oldscore, err := s.db.Get(mkey) + if err != nil && err != ErrKeyNotFound { + return added, err + } + // remove old score key + if oldscore != nil { + oldskey := s.scoreKey(oldscore, member) + if err := s.db.Delete(oldskey); err != nil { + return added, err + } + } else { + added++ + } + if err := s.db.Put(mkey, score); err != nil { + return added, err + } + if err := s.db.Put(skey, nil); err != nil { + return added, err + } + } + if err := s.db.Put(s.rawKey(), nil); err != nil { + return added, err + } + return added, nil +} + +func (s SortedSet) Score(member []byte) (Score, error) { + return s.db.Get(s.memberKey(member)) +} + +func (s *SortedSet) Remove(members ...[]byte) (int, error) { + removed := 0 // not including non existing members + for _, member := range members { + score, err := s.db.Get(s.memberKey(member)) + if err != nil { + return removed, err + } + if score == nil { + continue + } + if err := s.db.Delete(s.scoreKey(score, member)); err != nil { + return removed, err + } + if err := s.db.Delete(s.memberKey(member)); err != nil { + return removed, err + } + removed++ + } + // clean up + prefix := s.keyPrefix() + ErrStopIteration := errors.New("err: stop iteration") + err := s.db.Scan(prefix, func(key []byte) error { + if !bytes.HasPrefix(key, prefix) { + if err := s.db.Delete(s.rawKey()); err != nil { + return err + } + } + return ErrStopIteration + }) + if err != ErrStopIteration { + return removed, err + } + return removed, nil +} + +// Range ... +// is less than +func (s *SortedSet) Range(fr, to Score, fn func(i int64, score Score, member []byte, quit *bool)) error { + min := s.scorePrefix(fr) + max := append(s.scorePrefix(to), MAXBYTE) + var i int64 // 0 + ErrStopIteration := errors.New("err: stop iteration") + err := s.db.Scan(min, func(key []byte) error { + if bytes.Compare(key, max) <= 0 { + quit := false + score, member, err := s.splitScoreKey(key) + if err != nil { + return err + } + if fn(i, score, member, &quit); quit { + return ErrStopIteration + } + i++ + } + return nil + }) + if err != ErrStopIteration { + return err + } + return nil +} + +// +key,z = "" +func (s *SortedSet) rawKey() []byte { + return rawKey(s.key, ElemType(SORTEDSET)) +} + +// z[key] +func (s *SortedSet) keyPrefix() []byte { + return bytes.Join([][]byte{[]byte{byte(SORTEDSET)}, SOK, s.key, EOK}, nil) +} + +// z[key]m +func (s *SortedSet) memberKey(member []byte) []byte { + return bytes.Join([][]byte{s.keyPrefix(), []byte{'m'}, member}, nil) +} + +// z[key]s score +func (s *SortedSet) scorePrefix(score []byte) []byte { + return bytes.Join([][]byte{s.keyPrefix(), []byte{'s'}, score, []byte{' '}}, nil) +} + +// z[key]s score member +func (s *SortedSet) scoreKey(score, member []byte) []byte { + return bytes.Join([][]byte{s.keyPrefix(), []byte{'s'}, score, []byte{' '}, member}, nil) +} + +// split (z[key]s score member) into (score, member) +func (s *SortedSet) splitScoreKey(skey []byte) ([]byte, []byte, error) { + buf := bytes.TrimPrefix(skey, s.keyPrefix()) + pairs := bytes.Split(buf[1:], []byte{' '}) // skip score mark 's' + if len(pairs) != 2 { + return nil, nil, errors.New("invalid score/member key: " + string(skey)) + } + return pairs[0], pairs[1], nil +} + +// split (z[key]m member) into (member) +func (s *SortedSet) splitMemberKey(mkey []byte) ([]byte, error) { + buf := bytes.TrimPrefix(mkey, s.keyPrefix()) + return buf[1:], nil // skip member mark 'm' +} diff --git a/tests/test_hash.go b/tests/test_hash.go new file mode 100644 index 0000000..8be6b8d --- /dev/null +++ b/tests/test_hash.go @@ -0,0 +1,27 @@ +package main + +import ( + "log" + + "github.com/prologic/bitcask" +) + +func main() { + db, err := bitcask.Open("test.db") + if err != nil { + log.Fatal(err) + } + l := db.Hash([]byte("foo")) + err = l.Set([]byte("1"), []byte("one")) + if err != nil { + log.Fatal(err) + } + err = l.Set([]byte("2"), []byte("two")) + if err != nil { + log.Fatal(err) + } + err = l.Set([]byte("3"), []byte("three")) + if err != nil { + log.Fatal(err) + } +} diff --git a/tests/test_list.go b/tests/test_list.go new file mode 100644 index 0000000..cc18229 --- /dev/null +++ b/tests/test_list.go @@ -0,0 +1,33 @@ +package main + +import ( + "fmt" + "log" + + "github.com/prologic/bitcask" +) + +func main() { + db, err := bitcask.Open("test.db") + if err != nil { + log.Fatal(err) + } + l := db.List([]byte("foo")) + err = l.Append([]byte("one")) + if err != nil { + log.Fatal(err) + } + err = l.Append([]byte("two")) + if err != nil { + log.Fatal(err) + } + err = l.Append([]byte("three")) + if err != nil { + log.Fatal(err) + } + len, err := l.Len() + if err != nil { + log.Fatal(err) + } + fmt.Printf("len: %d\n", len) +} diff --git a/tests/test_sortedset.go b/tests/test_sortedset.go new file mode 100644 index 0000000..012eb20 --- /dev/null +++ b/tests/test_sortedset.go @@ -0,0 +1,31 @@ +package main + +import ( + "fmt" + "log" + + "github.com/prologic/bitcask" +) + +func main() { + db, err := bitcask.Open("test.db") + if err != nil { + log.Fatal(err) + } + z := db.SortedSet([]byte("foo")) + added, err := z.Add( + bitcask.Int64ToScore(1), []byte("a"), + bitcask.Int64ToScore(2), []byte("b"), + bitcask.Int64ToScore(3), []byte("c"), + ) + if err != nil { + log.Fatal(err) + } + fmt.Printf("added %d\n", added) + + score, err := z.Score([]byte("b")) + if err != nil { + log.Fatal(err) + } + fmt.Printf("score: %d\n", bitcask.ScoreToInt64(score)) +} diff --git a/tests/tests b/tests/tests new file mode 100755 index 0000000..6547d3a Binary files /dev/null and b/tests/tests differ diff --git a/util.go b/util.go new file mode 100644 index 0000000..38ccd30 --- /dev/null +++ b/util.go @@ -0,0 +1,80 @@ +package bitcask + +import ( + "bytes" + "encoding/binary" + "errors" + "math" +) + +var ErrInvalidKeyFormat = errors.New("invalid key format includes +[],") + +// Raw key: +// +key,type = value +// +name,s = "latermoon" + +var ( + SEP = []byte{','} + KEY = []byte{'+'} // Key Prefix + SOK = []byte{'['} // Start of Key + EOK = []byte{']'} // End of Key +) + +const ( + MINBYTE byte = 0 + MAXBYTE byte = math.MaxUint8 +) + +type ElemType byte + +const ( + STRING ElemType = 's' + HASH ElemType = 'h' + LIST ElemType = 'l' + SORTEDSET ElemType = 'z' + NONE ElemType = '0' +) + +func (e ElemType) String() string { + switch byte(e) { + case 's': + return "string" + case 'h': + return "hash" + case 'l': + return "list" + case 'z': + return "sortedset" + default: + return "none" + } +} + +func rawKey(key []byte, t ElemType) []byte { + return bytes.Join([][]byte{KEY, key, SEP, []byte{byte(t)}}, nil) +} + +func verifyKey(key []byte) error { + err := ErrInvalidKeyFormat + if bytes.Contains(key, SEP) { + return err + } else if bytes.Contains(key, KEY) { + return err + } else if bytes.Contains(key, SOK) { + return err + } else if bytes.Contains(key, EOK) { + return err + } + return nil +} + +// itob returns an 8-byte big endian representation of v. +func itob(i int64) []byte { + b := make([]byte, 8) + binary.LittleEndian.PutUint64(b, uint64(i)) + return b +} + +func btoi(b []byte) int64 { + return int64(binary.LittleEndian.Uint64(b)) +}