mirror of
https://github.com/gogrlx/bitcask.git
synced 2026-04-02 11:09:01 -07:00
Compare commits
29 Commits
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
4ec5b07eea | ||
|
|
5ed43e5a20 | ||
| edd32cad0a | |||
| d23c355e72 | |||
| 40425394d7 | |||
| f4cc0fb434 | |||
| 7d4174d5b1 | |||
|
|
5429693cc8 | ||
|
|
2c57c950f8 | ||
|
|
21a824e13e | ||
| 2279245b8c | |||
| 91d4db63d5 | |||
| 849192f709 | |||
| a4fc2cf4e8 | |||
| 609de833eb | |||
|
|
9b0daa8a30 | ||
| ef187f8315 | |||
|
|
b094cd33d3 | ||
| 3ff8937205 | |||
| 2ccca759ce | |||
| 92535e654b | |||
| c4a7ad7a7f | |||
| e64646fa8f | |||
| 2de030ad5c | |||
|
|
5e4d863ab7 | ||
|
|
a49bbf666a | ||
| 52df2fad55 | |||
| 947d15fed8 | |||
| d276c398da |
28
.drone.yml
Normal file
28
.drone.yml
Normal file
@@ -0,0 +1,28 @@
|
||||
---
|
||||
kind: pipeline
|
||||
name: default
|
||||
|
||||
steps:
|
||||
- name: build & run tests
|
||||
image: r.mills.io/prologic/golang-alpine
|
||||
commands:
|
||||
- make build
|
||||
- make test
|
||||
|
||||
- name: notify
|
||||
image: plugins/webhook
|
||||
settings:
|
||||
urls:
|
||||
- https://msgbus.mills.io/ci.mills.io
|
||||
when:
|
||||
status:
|
||||
- success
|
||||
- failure
|
||||
|
||||
trigger:
|
||||
branch:
|
||||
- master
|
||||
event:
|
||||
- tag
|
||||
- push
|
||||
- pull_request
|
||||
1
.gitignore
vendored
1
.gitignore
vendored
@@ -5,6 +5,7 @@
|
||||
|
||||
/tmp
|
||||
/dist
|
||||
/cacheDb
|
||||
/coverage.txt
|
||||
|
||||
/bitcask
|
||||
|
||||
2
AUTHORS
2
AUTHORS
@@ -17,3 +17,5 @@ Yash Chandra <yashschandra@gmail.com>
|
||||
Yury Fedorov orlangure
|
||||
o2gy84 <o2gy84@gmail.com>
|
||||
garsue <labs.garsue@gmail.com>
|
||||
biozz <ielfimov@gmail.com>
|
||||
jason3gb <jason3gb@gmail.com>
|
||||
82
CHANGELOG.md
82
CHANGELOG.md
@@ -1,6 +1,86 @@
|
||||
|
||||
<a name="v1.0.2"></a>
|
||||
## [v1.0.2](https://git.mills.io/prologic/bitcask/compare/v1.0.1...v1.0.2) (2021-11-01)
|
||||
|
||||
### Bug Fixes
|
||||
|
||||
* Fix a data race in Datafile.ReadAt()
|
||||
* Fix release tool
|
||||
|
||||
|
||||
<a name="v1.0.1"></a>
|
||||
## [v1.0.1](https://git.mills.io/prologic/bitcask/compare/v1.0.0...v1.0.1) (2021-10-31)
|
||||
|
||||
### Features
|
||||
|
||||
* Add ErrBadConfig and ErrBadMetadata as errors that consumers can check and use (#241)
|
||||
* Add key prefix matching to KEYS command (#237)
|
||||
|
||||
### Updates
|
||||
|
||||
* Update CHANGELOG for v1.0.1
|
||||
* Update image target
|
||||
|
||||
|
||||
<a name="v1.0.0"></a>
|
||||
## [v1.0.0](https://git.mills.io/prologic/bitcask/compare/1.0.0...v1.0.0) (2021-07-24)
|
||||
|
||||
### Updates
|
||||
|
||||
* Update CHANGELOG for v1.0.0
|
||||
|
||||
|
||||
<a name="1.0.0"></a>
|
||||
## [1.0.0](https://git.mills.io/prologic/bitcask/compare/v0.3.14...1.0.0) (2021-07-24)
|
||||
|
||||
### Updates
|
||||
|
||||
* Update CHANGELOG for 1.0.0
|
||||
* Update README
|
||||
|
||||
|
||||
<a name="v0.3.14"></a>
|
||||
## [v0.3.14](https://git.mills.io/prologic/bitcask/compare/v0.3.13...v0.3.14) (2021-07-21)
|
||||
|
||||
### Bug Fixes
|
||||
|
||||
* Fix runGC behaviour to correctly delete all expired keys (#229)
|
||||
* Fix missing push event
|
||||
* Fix how CI is triggered
|
||||
* Fix README Go Reference badge
|
||||
* Fix README badges
|
||||
|
||||
### Features
|
||||
|
||||
* Add RangeScan() support (#160)
|
||||
|
||||
### Updates
|
||||
|
||||
* Update CHANGELOG for v0.3.14
|
||||
|
||||
|
||||
<a name="v0.3.13"></a>
|
||||
## [v0.3.13](https://git.mills.io/prologic/bitcask/compare/v0.3.12...v0.3.13) (2021-07-16)
|
||||
|
||||
### Bug Fixes
|
||||
|
||||
* Fix paths used for temporary recovery iles to avoid crossing devices (#223)
|
||||
|
||||
### Features
|
||||
|
||||
* Add Drone CI config
|
||||
|
||||
### Updates
|
||||
|
||||
* Update CHANGELOG for v0.3.13
|
||||
|
||||
|
||||
<a name="v0.3.12"></a>
|
||||
## [v0.3.12](https://git.mills.io/prologic/bitcask/compare/v0.3.11...v0.3.12) (0001-01-01)
|
||||
## [v0.3.12](https://git.mills.io/prologic/bitcask/compare/v0.3.11...v0.3.12) (2021-07-13)
|
||||
|
||||
### Updates
|
||||
|
||||
* Update CHANGELOG for v0.3.12
|
||||
|
||||
|
||||
<a name="v0.3.11"></a>
|
||||
|
||||
34
Makefile
34
Makefile
@@ -3,6 +3,10 @@
|
||||
CGO_ENABLED=0
|
||||
VERSION=$(shell git describe --abbrev=0 --tags)
|
||||
COMMIT=$(shell git rev-parse --short HEAD)
|
||||
BUILD=$(shell git show -s --pretty=format:%cI)
|
||||
GOCMD=go
|
||||
|
||||
DESTDIR=/usr/local/bin
|
||||
|
||||
all: dev
|
||||
|
||||
@@ -11,46 +15,52 @@ dev: build
|
||||
@./bitcaskd --version
|
||||
|
||||
build: clean generate
|
||||
@go build \
|
||||
@$(GOCMD) build \
|
||||
-tags "netgo static_build" -installsuffix netgo \
|
||||
-ldflags "-w -X $(shell go list)/internal.Version=$(VERSION) -X $(shell go list)/internal.Commit=$(COMMIT)" \
|
||||
-ldflags "-w -X $(shell go list)/internal.Version=$(VERSION) -X $(shell go list)/internal.Commit=$(COMMIT) -X $(shell go list)/internal.Build=$(BUILD)" \
|
||||
./cmd/bitcask/...
|
||||
@go build \
|
||||
@$(GOCMD) build \
|
||||
-tags "netgo static_build" -installsuffix netgo \
|
||||
-ldflags "-w -X $(shell go list)/internal.Version=$(VERSION) -X $(shell go list)/internal.Commit=$(COMMIT)" \
|
||||
-ldflags "-w -X $(shell go list)/internal.Version=$(VERSION) -X $(shell go list)/internal.Commit=$(COMMIT) -X $(shell go list)/internal.Build=$(BUILD)" \
|
||||
./cmd/bitcaskd/...
|
||||
|
||||
generate:
|
||||
@go generate $(shell go list)/...
|
||||
@$(GOCMD) generate $(shell go list)/...
|
||||
|
||||
install: build
|
||||
@go install ./cmd/bitcask/...
|
||||
@go install ./cmd/bitcaskd/...
|
||||
@install -D -m 755 bitcask $(DESTDIR)/bitcask
|
||||
@install -D -m 755 bitcaskd $(DESTDIR)/bitcaskd
|
||||
|
||||
ifeq ($(PUBLISH), 1)
|
||||
image:
|
||||
@docker build -t prologic/bitcask .
|
||||
@docker build --build-arg VERSION="$(VERSION)" --build-arg COMMIT="$(COMMIT)" -t prologic/bitcask .
|
||||
@docker push prologic/bitcask
|
||||
else
|
||||
image:
|
||||
@docker build --build-arg VERSION="$(VERSION)" --build-arg COMMIT="$(COMMIT)" -t prologic/bitcask .
|
||||
endif
|
||||
|
||||
release:
|
||||
@./tools/release.sh
|
||||
|
||||
profile: build
|
||||
@go test -cpuprofile cpu.prof -memprofile mem.prof -v -bench .
|
||||
@$(GOCMD) test -cpuprofile cpu.prof -memprofile mem.prof -v -bench .
|
||||
|
||||
bench: build
|
||||
@go test -v -run=XXX -benchmem -bench=. .
|
||||
@$(GOCMD) test -v -run=XXX -benchmem -bench=. .
|
||||
|
||||
mocks:
|
||||
@mockery -all -case underscore -output ./internal/mocks -recursive
|
||||
|
||||
test: build
|
||||
@go test -v \
|
||||
@$(GOCMD) test -v \
|
||||
-cover -coverprofile=coverage.txt -covermode=atomic \
|
||||
-coverpkg=$(shell go list) \
|
||||
-race \
|
||||
.
|
||||
|
||||
setup:
|
||||
@go get github.com/vektra/mockery/...
|
||||
@$(GOCMD) get github.com/vektra/mockery/...
|
||||
|
||||
clean:
|
||||
@git clean -f -d -X
|
||||
|
||||
35
README.md
35
README.md
@@ -1,17 +1,8 @@
|
||||
# bitcask
|
||||
|
||||

|
||||

|
||||

|
||||

|
||||
|
||||
[](https://codecov.io/gh/prologic/bitcask)
|
||||
[](https://goreportcard.com/report/prologic/bitcask)
|
||||
[](https://codebeat.co/projects/github-com-prologic-bitcask-master)
|
||||
[](https://godoc.org/git.mills.io/prologic/bitcask)
|
||||
[](https://git.mills.io/prologic/bitcask)
|
||||
[](https://sourcegraph.com/git.mills.io/prologic/bitcask?badge)
|
||||
[](https://www.tickgit.com/browse?repo=git.mills.io/prologic/bitcask)
|
||||
[](https://ci.mills.io/prologic/bitcask)
|
||||
[](https://goreportcard.com/report/git.mills.io/prologic/bitcask)
|
||||
[](https://pkg.go.dev/git.mills.io/prologic/bitcask)
|
||||
|
||||
A high performance Key/Value store written in [Go](https://golang.org) with a predictable read/write performance and high throughput. Uses a [Bitcask](https://en.wikipedia.org/wiki/Bitcask) on-disk layout (LSM+WAL) similar to [Riak](https://riak.com/)
|
||||
|
||||
@@ -229,24 +220,14 @@ Support the ongoing development of Bitcask!
|
||||
|
||||
- Become a [Sponsor](https://www.patreon.com/prologic)
|
||||
|
||||
## Stargazers over time
|
||||
|
||||
[](https://starcharts.herokuapp.com/prologic/bitcask)
|
||||
|
||||
## Contributors
|
||||
|
||||
Thank you to all those that have contributed to this project, battle-tested it, used it in their own projects or products, fixed bugs, improved performance and even fix tiny typos in documentation! Thank you and keep contributing!
|
||||
Thank you to all those that have contributed to this project, battle-tested it,
|
||||
used it in their own projects or products, fixed bugs, improved performance
|
||||
and even fix tiny typos in documentation! Thank you and keep contributing!
|
||||
|
||||
You can find an [AUTHORS](/AUTHORS) file where we keep a list of contributors to the project. If you contribute a PR please consider adding your name there. There is also GitHub's own [Contributors](https://git.mills.io/prologic/bitcask/graphs/contributors) statistics.
|
||||
|
||||
[](https://sourcerer.io/fame/prologic/prologic/bitcask/links/0)
|
||||
[](https://sourcerer.io/fame/prologic/prologic/bitcask/links/1)
|
||||
[](https://sourcerer.io/fame/prologic/prologic/bitcask/links/2)
|
||||
[](https://sourcerer.io/fame/prologic/prologic/bitcask/links/3)
|
||||
[](https://sourcerer.io/fame/prologic/prologic/bitcask/links/4)
|
||||
[](https://sourcerer.io/fame/prologic/prologic/bitcask/links/5)
|
||||
[](https://sourcerer.io/fame/prologic/prologic/bitcask/links/6)
|
||||
[](https://sourcerer.io/fame/prologic/prologic/bitcask/links/7)
|
||||
You can find an [AUTHORS](/AUTHORS) file where we keep a list of contributors
|
||||
to the project. If you contribute a PR please consider adding your name there.
|
||||
|
||||
## Related Projects
|
||||
|
||||
|
||||
251
bitcask.go
251
bitcask.go
@@ -1,7 +1,7 @@
|
||||
package bitcask
|
||||
|
||||
import (
|
||||
"errors"
|
||||
"bytes"
|
||||
"fmt"
|
||||
"hash/crc32"
|
||||
"io"
|
||||
@@ -13,8 +13,11 @@ import (
|
||||
"sync"
|
||||
"time"
|
||||
|
||||
"github.com/abcum/lcp"
|
||||
"github.com/gofrs/flock"
|
||||
art "github.com/plar/go-adaptive-radix-tree"
|
||||
"git.mills.io/prologic/bitcask/flock"
|
||||
log "github.com/sirupsen/logrus"
|
||||
|
||||
"git.mills.io/prologic/bitcask/internal"
|
||||
"git.mills.io/prologic/bitcask/internal/config"
|
||||
"git.mills.io/prologic/bitcask/internal/data"
|
||||
@@ -22,7 +25,6 @@ import (
|
||||
"git.mills.io/prologic/bitcask/internal/index"
|
||||
"git.mills.io/prologic/bitcask/internal/metadata"
|
||||
"git.mills.io/prologic/bitcask/scripts/migrations"
|
||||
log "github.com/sirupsen/logrus"
|
||||
)
|
||||
|
||||
const (
|
||||
@@ -30,48 +32,12 @@ const (
|
||||
ttlIndexFile = "ttl_index"
|
||||
)
|
||||
|
||||
var (
|
||||
// ErrKeyNotFound is the error returned when a key is not found
|
||||
ErrKeyNotFound = errors.New("error: key not found")
|
||||
|
||||
// ErrKeyTooLarge is the error returned for a key that exceeds the
|
||||
// maximum allowed key size (configured with WithMaxKeySize).
|
||||
ErrKeyTooLarge = errors.New("error: key too large")
|
||||
|
||||
// ErrKeyExpired is the error returned when a key is queried which has
|
||||
// already expired (due to ttl)
|
||||
ErrKeyExpired = errors.New("error: key expired")
|
||||
|
||||
// ErrEmptyKey is the error returned for a value with an empty key.
|
||||
ErrEmptyKey = errors.New("error: empty key")
|
||||
|
||||
// ErrValueTooLarge is the error returned for a value that exceeds the
|
||||
// maximum allowed value size (configured with WithMaxValueSize).
|
||||
ErrValueTooLarge = errors.New("error: value too large")
|
||||
|
||||
// ErrChecksumFailed is the error returned if a key/value retrieved does
|
||||
// not match its CRC checksum
|
||||
ErrChecksumFailed = errors.New("error: checksum failed")
|
||||
|
||||
// ErrDatabaseLocked is the error returned if the database is locked
|
||||
// (typically opened by another process)
|
||||
ErrDatabaseLocked = errors.New("error: database locked")
|
||||
|
||||
ErrInvalidVersion = errors.New("error: invalid db version")
|
||||
|
||||
// ErrMergeInProgress is the error returned if merge is called when already a merge
|
||||
// is in progress
|
||||
ErrMergeInProgress = errors.New("error: merge already in progress")
|
||||
)
|
||||
|
||||
// Bitcask is a struct that represents a on-disk LSM and WAL data structure
|
||||
// and in-memory hash of key/value pairs as per the Bitcask paper and seen
|
||||
// in the Riak database.
|
||||
type Bitcask struct {
|
||||
mu sync.RWMutex
|
||||
|
||||
*flock.Flock
|
||||
|
||||
mu sync.RWMutex
|
||||
flock *flock.Flock
|
||||
config *config.Config
|
||||
options []Option
|
||||
path string
|
||||
@@ -114,7 +80,7 @@ func (b *Bitcask) Close() error {
|
||||
b.mu.RLock()
|
||||
defer func() {
|
||||
b.mu.RUnlock()
|
||||
b.Flock.Unlock()
|
||||
b.flock.Unlock()
|
||||
}()
|
||||
|
||||
return b.close()
|
||||
@@ -275,6 +241,41 @@ func (b *Bitcask) delete(key []byte) error {
|
||||
return nil
|
||||
}
|
||||
|
||||
// Sift iterates over all keys in the database calling the function `f` for
|
||||
// each key. If the KV pair is expired or the function returns true, that key is
|
||||
// deleted from the database.
|
||||
// If the function returns an error on any key, no further keys are processed, no
|
||||
// keys are deleted, and the first error is returned.
|
||||
func (b *Bitcask) Sift(f func(key []byte) (bool, error)) (err error) {
|
||||
keysToDelete := art.New()
|
||||
|
||||
b.mu.RLock()
|
||||
b.trie.ForEach(func(node art.Node) bool {
|
||||
if b.isExpired(node.Key()) {
|
||||
keysToDelete.Insert(node.Key(), true)
|
||||
return true
|
||||
}
|
||||
var shouldDelete bool
|
||||
if shouldDelete, err = f(node.Key()); err != nil {
|
||||
return false
|
||||
} else if shouldDelete {
|
||||
keysToDelete.Insert(node.Key(), true)
|
||||
}
|
||||
return true
|
||||
})
|
||||
b.mu.RUnlock()
|
||||
if err != nil {
|
||||
return
|
||||
}
|
||||
b.mu.Lock()
|
||||
defer b.mu.Unlock()
|
||||
keysToDelete.ForEach(func(node art.Node) (cont bool) {
|
||||
b.delete(node.Key())
|
||||
return true
|
||||
})
|
||||
return
|
||||
}
|
||||
|
||||
// DeleteAll deletes all the keys. If an I/O error occurs the error is returned.
|
||||
func (b *Bitcask) DeleteAll() (err error) {
|
||||
b.mu.RLock()
|
||||
@@ -297,8 +298,11 @@ func (b *Bitcask) DeleteAll() (err error) {
|
||||
|
||||
// Scan performs a prefix scan of keys matching the given prefix and calling
|
||||
// the function `f` with the keys found. If the function returns an error
|
||||
// no further keys are processed and the first error returned.
|
||||
// no further keys are processed and the first error is returned.
|
||||
func (b *Bitcask) Scan(prefix []byte, f func(key []byte) error) (err error) {
|
||||
b.mu.RLock()
|
||||
defer b.mu.RUnlock()
|
||||
|
||||
b.trie.ForEachPrefix(prefix, func(node art.Node) bool {
|
||||
// Skip the root node
|
||||
if len(node.Key()) == 0 {
|
||||
@@ -313,6 +317,132 @@ func (b *Bitcask) Scan(prefix []byte, f func(key []byte) error) (err error) {
|
||||
return
|
||||
}
|
||||
|
||||
// SiftScan iterates over all keys in the database beginning with the given
|
||||
// prefix, calling the function `f` for each key. If the KV pair is expired or
|
||||
// the function returns true, that key is deleted from the database.
|
||||
// If the function returns an error on any key, no further keys are processed,
|
||||
// no keys are deleted, and the first error is returned.
|
||||
func (b *Bitcask) SiftScan(prefix []byte, f func(key []byte) (bool, error)) (err error) {
|
||||
keysToDelete := art.New()
|
||||
|
||||
b.mu.RLock()
|
||||
b.trie.ForEachPrefix(prefix, func(node art.Node) bool {
|
||||
// Skip the root node
|
||||
if len(node.Key()) == 0 {
|
||||
return true
|
||||
}
|
||||
if b.isExpired(node.Key()) {
|
||||
keysToDelete.Insert(node.Key(), true)
|
||||
return true
|
||||
}
|
||||
var shouldDelete bool
|
||||
if shouldDelete, err = f(node.Key()); err != nil {
|
||||
return false
|
||||
} else if shouldDelete {
|
||||
keysToDelete.Insert(node.Key(), true)
|
||||
}
|
||||
return true
|
||||
})
|
||||
b.mu.RUnlock()
|
||||
|
||||
if err != nil {
|
||||
return
|
||||
}
|
||||
|
||||
b.mu.Lock()
|
||||
defer b.mu.Unlock()
|
||||
keysToDelete.ForEach(func(node art.Node) (cont bool) {
|
||||
b.delete(node.Key())
|
||||
return true
|
||||
})
|
||||
return
|
||||
}
|
||||
|
||||
// Range performs a range scan of keys matching a range of keys between the
|
||||
// start key and end key and calling the function `f` with the keys found.
|
||||
// If the function returns an error no further keys are processed and the
|
||||
// first error returned.
|
||||
func (b *Bitcask) Range(start, end []byte, f func(key []byte) error) (err error) {
|
||||
if bytes.Compare(start, end) == 1 {
|
||||
return ErrInvalidRange
|
||||
}
|
||||
|
||||
commonPrefix := lcp.LCP(start, end)
|
||||
if commonPrefix == nil {
|
||||
return ErrInvalidRange
|
||||
}
|
||||
|
||||
b.mu.RLock()
|
||||
defer b.mu.RUnlock()
|
||||
|
||||
b.trie.ForEachPrefix(commonPrefix, func(node art.Node) bool {
|
||||
if bytes.Compare(node.Key(), start) >= 0 && bytes.Compare(node.Key(), end) <= 0 {
|
||||
if err = f(node.Key()); err != nil {
|
||||
return false
|
||||
}
|
||||
return true
|
||||
} else if bytes.Compare(node.Key(), start) >= 0 && bytes.Compare(node.Key(), end) > 0 {
|
||||
return false
|
||||
}
|
||||
return true
|
||||
})
|
||||
return
|
||||
}
|
||||
|
||||
// SiftRange performs a range scan of keys matching a range of keys between the
|
||||
// start key and end key and calling the function `f` with the keys found.
|
||||
// If the KV pair is expired or the function returns true, that key is deleted
|
||||
// from the database.
|
||||
// If the function returns an error on any key, no further keys are processed, no
|
||||
// keys are deleted, and the first error is returned.
|
||||
func (b *Bitcask) SiftRange(start, end []byte, f func(key []byte) (bool, error)) (err error) {
|
||||
if bytes.Compare(start, end) == 1 {
|
||||
return ErrInvalidRange
|
||||
}
|
||||
|
||||
commonPrefix := lcp.LCP(start, end)
|
||||
if commonPrefix == nil {
|
||||
return ErrInvalidRange
|
||||
}
|
||||
|
||||
keysToDelete := art.New()
|
||||
|
||||
b.mu.RLock()
|
||||
b.trie.ForEachPrefix(commonPrefix, func(node art.Node) bool {
|
||||
if bytes.Compare(node.Key(), start) >= 0 && bytes.Compare(node.Key(), end) <= 0 {
|
||||
if b.isExpired(node.Key()) {
|
||||
keysToDelete.Insert(node.Key(), true)
|
||||
return true
|
||||
}
|
||||
var shouldDelete bool
|
||||
if shouldDelete, err = f(node.Key()); err != nil {
|
||||
return false
|
||||
} else if shouldDelete {
|
||||
keysToDelete.Insert(node.Key(), true)
|
||||
}
|
||||
return true
|
||||
} else if bytes.Compare(node.Key(), start) >= 0 && bytes.Compare(node.Key(), end) > 0 {
|
||||
return false
|
||||
}
|
||||
return true
|
||||
})
|
||||
b.mu.RUnlock()
|
||||
|
||||
if err != nil {
|
||||
return
|
||||
}
|
||||
|
||||
b.mu.Lock()
|
||||
defer b.mu.Unlock()
|
||||
|
||||
keysToDelete.ForEach(func(node art.Node) (cont bool) {
|
||||
b.delete(node.Key())
|
||||
return true
|
||||
})
|
||||
|
||||
return
|
||||
}
|
||||
|
||||
// Len returns the total number of keys in the database
|
||||
func (b *Bitcask) Len() int {
|
||||
b.mu.RLock()
|
||||
@@ -350,21 +480,29 @@ func (b *Bitcask) RunGC() error {
|
||||
// runGC deletes all keys that are expired
|
||||
// caller function should take care of the locking when calling this method
|
||||
func (b *Bitcask) runGC() (err error) {
|
||||
keysToDelete := art.New()
|
||||
|
||||
b.ttlIndex.ForEach(func(node art.Node) (cont bool) {
|
||||
if !b.isExpired(node.Key()) {
|
||||
// later, return false here when the ttlIndex is sorted
|
||||
return true
|
||||
}
|
||||
if err = b.delete(node.Key()); err != nil {
|
||||
return false
|
||||
}
|
||||
keysToDelete.Insert(node.Key(), true)
|
||||
//keysToDelete = append(keysToDelete, node.Key())
|
||||
return true
|
||||
})
|
||||
return
|
||||
|
||||
keysToDelete.ForEach(func(node art.Node) (cont bool) {
|
||||
b.delete(node.Key())
|
||||
return true
|
||||
})
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
// Fold iterates over all keys in the database calling the function `f` for
|
||||
// each key. If the function returns an error, no further keys are processed
|
||||
// and the error returned.
|
||||
// and the error is returned.
|
||||
func (b *Bitcask) Fold(f func(key []byte) error) (err error) {
|
||||
b.mu.RLock()
|
||||
defer b.mu.RUnlock()
|
||||
@@ -387,8 +525,7 @@ func (b *Bitcask) get(key []byte) (internal.Entry, error) {
|
||||
if !found {
|
||||
return internal.Entry{}, ErrKeyNotFound
|
||||
}
|
||||
if expired := b.isExpired(key); expired {
|
||||
_ = b.delete(key) // we don't care if it doesnt succeed
|
||||
if b.isExpired(key) {
|
||||
return internal.Entry{}, ErrKeyExpired
|
||||
}
|
||||
|
||||
@@ -669,6 +806,10 @@ func (b *Bitcask) Merge() error {
|
||||
return err
|
||||
}
|
||||
for _, file := range files {
|
||||
// see #225
|
||||
if file.Name() == lockfile {
|
||||
continue
|
||||
}
|
||||
err := os.Rename(
|
||||
path.Join([]string{mdb.path, file.Name()}...),
|
||||
path.Join([]string{b.path, file.Name()}...),
|
||||
@@ -697,7 +838,7 @@ func Open(path string, options ...Option) (*Bitcask, error) {
|
||||
if internal.Exists(configPath) {
|
||||
cfg, err = config.Load(configPath)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
return nil, &ErrBadConfig{err}
|
||||
}
|
||||
} else {
|
||||
cfg = newDefaultConfig()
|
||||
@@ -719,11 +860,11 @@ func Open(path string, options ...Option) (*Bitcask, error) {
|
||||
|
||||
meta, err = loadMetadata(path)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
return nil, &ErrBadMetadata{err}
|
||||
}
|
||||
|
||||
bitcask := &Bitcask{
|
||||
Flock: flock.New(filepath.Join(path, lockfile)),
|
||||
flock: flock.New(filepath.Join(path, lockfile)),
|
||||
config: cfg,
|
||||
options: options,
|
||||
path: path,
|
||||
@@ -732,12 +873,12 @@ func Open(path string, options ...Option) (*Bitcask, error) {
|
||||
metadata: meta,
|
||||
}
|
||||
|
||||
locked, err := bitcask.Flock.TryLock()
|
||||
ok, err := bitcask.flock.TryLock()
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
if !locked {
|
||||
if !ok {
|
||||
return nil, ErrDatabaseLocked
|
||||
}
|
||||
|
||||
|
||||
355
bitcask_test.go
355
bitcask_test.go
@@ -168,6 +168,65 @@ func TestAll(t *testing.T) {
|
||||
assert.NoError(err)
|
||||
})
|
||||
|
||||
t.Run("Sift", func(t *testing.T) {
|
||||
err = db.Put([]byte("toBeSifted"), []byte("siftMe"))
|
||||
assert.NoError(err)
|
||||
err = db.Put([]byte("notToBeSifted"), []byte("dontSiftMe"))
|
||||
assert.NoError(err)
|
||||
err := db.Sift(func(key []byte) (bool, error) {
|
||||
value, err := db.Get(key)
|
||||
if err != nil {
|
||||
return false, err
|
||||
}
|
||||
if string(value) == "siftMe" {
|
||||
return true, nil
|
||||
}
|
||||
return false, nil
|
||||
})
|
||||
assert.NoError(err)
|
||||
_, err = db.Get([]byte("toBeSifted"))
|
||||
assert.Equal(ErrKeyNotFound, err)
|
||||
_, err = db.Get([]byte("notToBeSifted"))
|
||||
assert.NoError(err)
|
||||
})
|
||||
|
||||
t.Run("SiftScan", func(t *testing.T) {
|
||||
err := db.DeleteAll()
|
||||
assert.NoError(err)
|
||||
err = db.Put([]byte("toBeSifted"), []byte("siftMe"))
|
||||
assert.NoError(err)
|
||||
err = db.Put([]byte("toBeSkipped"), []byte("siftMe"))
|
||||
assert.NoError(err)
|
||||
err = db.Put([]byte("toBeSiftedAsWell"), []byte("siftMe"))
|
||||
assert.NoError(err)
|
||||
err = db.Put([]byte("toBeSiftedButNotReally"), []byte("dontSiftMe"))
|
||||
assert.NoError(err)
|
||||
err = db.SiftScan([]byte("toBeSifted"), func(key []byte) (bool, error) {
|
||||
value, err := db.Get(key)
|
||||
if err != nil {
|
||||
return false, err
|
||||
}
|
||||
if string(value) == "siftMe" {
|
||||
return true, nil
|
||||
}
|
||||
return false, nil
|
||||
})
|
||||
assert.NoError(err)
|
||||
_, err = db.Get([]byte("toBeSifted"))
|
||||
assert.Equal(ErrKeyNotFound, err)
|
||||
_, err = db.Get([]byte("toBeSiftedAsWell"))
|
||||
assert.Equal(ErrKeyNotFound, err)
|
||||
_, err = db.Get([]byte("toBeSkipped"))
|
||||
assert.NoError(err)
|
||||
_, err = db.Get([]byte("toBeSiftedButNotReally"))
|
||||
assert.NoError(err)
|
||||
})
|
||||
|
||||
t.Run("DeleteAll", func(t *testing.T) {
|
||||
err = db.DeleteAll()
|
||||
assert.NoError(err)
|
||||
})
|
||||
|
||||
t.Run("Close", func(t *testing.T) {
|
||||
err = db.Close()
|
||||
assert.NoError(err)
|
||||
@@ -479,6 +538,7 @@ func TestAutoRecovery(t *testing.T) {
|
||||
require.NoError(err)
|
||||
|
||||
db, err = Open(testdir, WithAutoRecovery(autoRecovery))
|
||||
t.Logf("err: %s", err)
|
||||
require.NoError(err)
|
||||
defer db.Close()
|
||||
// Check that all values but the last are still intact.
|
||||
@@ -1654,6 +1714,93 @@ func TestConcurrent(t *testing.T) {
|
||||
})
|
||||
}
|
||||
|
||||
func TestSift(t *testing.T) {
|
||||
assert := assert.New(t)
|
||||
|
||||
testdir, err := ioutil.TempDir("", "bitcask")
|
||||
assert.NoError(err)
|
||||
|
||||
var db *Bitcask
|
||||
|
||||
t.Run("Setup", func(t *testing.T) {
|
||||
t.Run("Open", func(t *testing.T) {
|
||||
db, err = Open(testdir)
|
||||
assert.NoError(err)
|
||||
})
|
||||
|
||||
t.Run("Put", func(t *testing.T) {
|
||||
var items = map[string][]byte{
|
||||
"1": []byte("1"),
|
||||
"2": []byte("2"),
|
||||
"3": []byte("3"),
|
||||
"food": []byte("pizza"),
|
||||
"foo": []byte([]byte("foo")),
|
||||
"fooz": []byte("fooz ball"),
|
||||
"hello": []byte("world"),
|
||||
}
|
||||
for k, v := range items {
|
||||
err = db.Put([]byte(k), v)
|
||||
assert.NoError(err)
|
||||
}
|
||||
})
|
||||
})
|
||||
|
||||
t.Run("SiftErrors", func(t *testing.T) {
|
||||
err = db.Sift(func(key []byte) (bool, error) {
|
||||
return false, ErrMockError
|
||||
})
|
||||
assert.Equal(ErrMockError, err)
|
||||
|
||||
err = db.SiftScan([]byte("fo"), func(key []byte) (bool, error) {
|
||||
return true, ErrMockError
|
||||
})
|
||||
assert.Equal(ErrMockError, err)
|
||||
})
|
||||
}
|
||||
func TestSiftScan(t *testing.T) {
|
||||
assert := assert.New(t)
|
||||
|
||||
testdir, err := ioutil.TempDir("", "bitcask")
|
||||
assert.NoError(err)
|
||||
|
||||
var db *Bitcask
|
||||
|
||||
t.Run("Setup", func(t *testing.T) {
|
||||
t.Run("Open", func(t *testing.T) {
|
||||
db, err = Open(testdir)
|
||||
assert.NoError(err)
|
||||
})
|
||||
|
||||
t.Run("Put", func(t *testing.T) {
|
||||
var items = map[string][]byte{
|
||||
"1": []byte("1"),
|
||||
"2": []byte("2"),
|
||||
"3": []byte("3"),
|
||||
"food": []byte("pizza"),
|
||||
"foo": []byte([]byte("foo")),
|
||||
"fooz": []byte("fooz ball"),
|
||||
"hello": []byte("world"),
|
||||
}
|
||||
for k, v := range items {
|
||||
err = db.Put([]byte(k), v)
|
||||
assert.NoError(err)
|
||||
}
|
||||
})
|
||||
})
|
||||
|
||||
t.Run("SiftScanErrors", func(t *testing.T) {
|
||||
err = db.SiftScan([]byte("fo"), func(key []byte) (bool, error) {
|
||||
return false, ErrMockError
|
||||
})
|
||||
assert.Equal(ErrMockError, err)
|
||||
|
||||
err = db.SiftScan([]byte("fo"), func(key []byte) (bool, error) {
|
||||
return true, ErrMockError
|
||||
})
|
||||
assert.Equal(ErrMockError, err)
|
||||
})
|
||||
}
|
||||
|
||||
func TestScan(t *testing.T) {
|
||||
assert := assert.New(t)
|
||||
|
||||
@@ -1713,6 +1860,146 @@ func TestScan(t *testing.T) {
|
||||
assert.Equal(ErrMockError, err)
|
||||
})
|
||||
}
|
||||
func TestSiftRange(t *testing.T) {
|
||||
assert := assert.New(t)
|
||||
|
||||
testdir, err := ioutil.TempDir("", "bitcask")
|
||||
assert.NoError(err)
|
||||
|
||||
var db *Bitcask
|
||||
|
||||
t.Run("Setup", func(t *testing.T) {
|
||||
t.Run("Open", func(t *testing.T) {
|
||||
db, err = Open(testdir)
|
||||
assert.NoError(err)
|
||||
})
|
||||
|
||||
t.Run("Put", func(t *testing.T) {
|
||||
for i := 1; i < 10; i++ {
|
||||
key := []byte(fmt.Sprintf("foo_%d", i))
|
||||
val := []byte(fmt.Sprintf("%d", i))
|
||||
err = db.Put(key, val)
|
||||
assert.NoError(err)
|
||||
}
|
||||
})
|
||||
})
|
||||
|
||||
t.Run("SiftRange", func(t *testing.T) {
|
||||
var (
|
||||
vals [][]byte
|
||||
expected = [][]byte{
|
||||
[]byte("1"),
|
||||
[]byte("2"),
|
||||
[]byte("4"),
|
||||
[]byte("5"),
|
||||
[]byte("6"),
|
||||
[]byte("7"),
|
||||
[]byte("8"),
|
||||
[]byte("9"),
|
||||
}
|
||||
)
|
||||
|
||||
err = db.SiftRange([]byte("foo_3"), []byte("foo_7"), func(key []byte) (bool, error) {
|
||||
val, err := db.Get(key)
|
||||
assert.NoError(err)
|
||||
if string(val) == "3" {
|
||||
return true, nil
|
||||
}
|
||||
return false, nil
|
||||
})
|
||||
err = db.Fold(func(key []byte) error {
|
||||
val, err := db.Get(key)
|
||||
assert.NoError(err)
|
||||
vals = append(vals, val)
|
||||
|
||||
return nil
|
||||
})
|
||||
|
||||
_, err = db.Get([]byte("foo_3"))
|
||||
assert.Equal(ErrKeyNotFound, err)
|
||||
vals = SortByteArrays(vals)
|
||||
assert.Equal(expected, vals)
|
||||
})
|
||||
|
||||
t.Run("SiftRangeErrors", func(t *testing.T) {
|
||||
err = db.SiftRange([]byte("foo_3"), []byte("foo_7"), func(key []byte) (bool, error) {
|
||||
return true, ErrMockError
|
||||
})
|
||||
assert.Error(err)
|
||||
assert.Equal(ErrMockError, err)
|
||||
})
|
||||
|
||||
t.Run("InvalidRange", func(t *testing.T) {
|
||||
err = db.SiftRange([]byte("foo_3"), []byte("foo_1"), func(key []byte) (bool, error) {
|
||||
return false, nil
|
||||
})
|
||||
assert.Error(err)
|
||||
assert.Equal(ErrInvalidRange, err)
|
||||
})
|
||||
}
|
||||
|
||||
func TestRange(t *testing.T) {
|
||||
assert := assert.New(t)
|
||||
|
||||
testdir, err := ioutil.TempDir("", "bitcask")
|
||||
assert.NoError(err)
|
||||
|
||||
var db *Bitcask
|
||||
|
||||
t.Run("Setup", func(t *testing.T) {
|
||||
t.Run("Open", func(t *testing.T) {
|
||||
db, err = Open(testdir)
|
||||
assert.NoError(err)
|
||||
})
|
||||
|
||||
t.Run("Put", func(t *testing.T) {
|
||||
for i := 1; i < 10; i++ {
|
||||
key := []byte(fmt.Sprintf("foo_%d", i))
|
||||
val := []byte(fmt.Sprintf("%d", i))
|
||||
err = db.Put(key, val)
|
||||
assert.NoError(err)
|
||||
}
|
||||
})
|
||||
})
|
||||
|
||||
t.Run("Range", func(t *testing.T) {
|
||||
var (
|
||||
vals [][]byte
|
||||
expected = [][]byte{
|
||||
[]byte("3"),
|
||||
[]byte("4"),
|
||||
[]byte("5"),
|
||||
[]byte("6"),
|
||||
[]byte("7"),
|
||||
}
|
||||
)
|
||||
|
||||
err = db.Range([]byte("foo_3"), []byte("foo_7"), func(key []byte) error {
|
||||
val, err := db.Get(key)
|
||||
assert.NoError(err)
|
||||
vals = append(vals, val)
|
||||
return nil
|
||||
})
|
||||
vals = SortByteArrays(vals)
|
||||
assert.Equal(expected, vals)
|
||||
})
|
||||
|
||||
t.Run("RangeErrors", func(t *testing.T) {
|
||||
err = db.Range([]byte("foo_3"), []byte("foo_7"), func(key []byte) error {
|
||||
return ErrMockError
|
||||
})
|
||||
assert.Error(err)
|
||||
assert.Equal(ErrMockError, err)
|
||||
})
|
||||
|
||||
t.Run("InvalidRange", func(t *testing.T) {
|
||||
err = db.Range([]byte("foo_3"), []byte("foo_1"), func(key []byte) error {
|
||||
return nil
|
||||
})
|
||||
assert.Error(err)
|
||||
assert.Equal(ErrInvalidRange, err)
|
||||
})
|
||||
}
|
||||
|
||||
func TestLocking(t *testing.T) {
|
||||
assert := assert.New(t)
|
||||
@@ -1749,6 +2036,74 @@ func TestLockingAfterMerge(t *testing.T) {
|
||||
assert.Error(err)
|
||||
}
|
||||
|
||||
func TestGetExpiredInsideFold(t *testing.T) {
|
||||
assert := assert.New(t)
|
||||
|
||||
testdir, err := ioutil.TempDir("", "bitcask")
|
||||
assert.NoError(err)
|
||||
|
||||
db, err := Open(testdir)
|
||||
assert.NoError(err)
|
||||
defer db.Close()
|
||||
// Add a node to the tree that won't expire
|
||||
db.Put([]byte("static"), []byte("static"))
|
||||
// Add a node that expires almost immediately to the tree
|
||||
db.PutWithTTL([]byte("shortLived"), []byte("shortLived"), 1*time.Millisecond)
|
||||
db.Put([]byte("skipped"), []byte("skipped"))
|
||||
db.Put([]byte("static2"), []byte("static2"))
|
||||
time.Sleep(2 * time.Millisecond)
|
||||
var arr []string
|
||||
_ = db.Fold(func(key []byte) error {
|
||||
val, err := db.Get(key)
|
||||
switch string(key) {
|
||||
case "skipped":
|
||||
fallthrough
|
||||
case "static2":
|
||||
fallthrough
|
||||
case "static":
|
||||
assert.NoError(err)
|
||||
assert.Equal(string(val), string(key))
|
||||
case "shortLived":
|
||||
assert.Error(err)
|
||||
}
|
||||
arr = append(arr, string(val))
|
||||
return nil
|
||||
})
|
||||
assert.Contains(arr, "skipped")
|
||||
}
|
||||
|
||||
func TestRunGCDeletesAllExpired(t *testing.T) {
|
||||
assert := assert.New(t)
|
||||
|
||||
testdir, err := ioutil.TempDir("", "bitcask")
|
||||
assert.NoError(err)
|
||||
|
||||
db, err := Open(testdir)
|
||||
assert.NoError(err)
|
||||
defer db.Close()
|
||||
|
||||
// Add a node to the tree that won't expire
|
||||
db.Put([]byte("static"), []byte("static"))
|
||||
|
||||
// Add a node that expires almost immediately to the tree
|
||||
db.PutWithTTL([]byte("shortLived"), []byte("shortLived"), 0)
|
||||
db.PutWithTTL([]byte("longLived"), []byte("longLived"), time.Hour)
|
||||
db.PutWithTTL([]byte("longLived2"), []byte("longLived2"), time.Hour)
|
||||
db.PutWithTTL([]byte("shortLived2"), []byte("shortLived2"), 0)
|
||||
db.PutWithTTL([]byte("shortLived3"), []byte("shortLived3"), 0)
|
||||
db.Put([]byte("static2"), []byte("static2"))
|
||||
|
||||
// Sleep a bit and run the Garbage Collector
|
||||
time.Sleep(3 * time.Millisecond)
|
||||
db.RunGC()
|
||||
|
||||
_ = db.Fold(func(key []byte) error {
|
||||
_, err := db.Get(key)
|
||||
assert.NoError(err)
|
||||
return nil
|
||||
})
|
||||
}
|
||||
|
||||
type benchmarkTestCase struct {
|
||||
name string
|
||||
size int
|
||||
|
||||
61
cmd/bitcask/range.go
Normal file
61
cmd/bitcask/range.go
Normal file
@@ -0,0 +1,61 @@
|
||||
package main
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
"os"
|
||||
|
||||
log "github.com/sirupsen/logrus"
|
||||
"github.com/spf13/cobra"
|
||||
"github.com/spf13/viper"
|
||||
|
||||
"git.mills.io/prologic/bitcask"
|
||||
)
|
||||
|
||||
var rangeCmd = &cobra.Command{
|
||||
Use: "range <start> <end>",
|
||||
Aliases: []string{},
|
||||
Short: "Perform a range scan for keys from a start to end key",
|
||||
Long: `This performa a range scan for keys starting with the given start
|
||||
key and ending with the end key. This uses a Trie to search for matching keys
|
||||
within the range and returns all matched keys.`,
|
||||
Args: cobra.ExactArgs(2),
|
||||
Run: func(cmd *cobra.Command, args []string) {
|
||||
path := viper.GetString("path")
|
||||
|
||||
start := args[0]
|
||||
end := args[1]
|
||||
|
||||
os.Exit(_range(path, start, end))
|
||||
},
|
||||
}
|
||||
|
||||
func init() {
|
||||
RootCmd.AddCommand(rangeCmd)
|
||||
}
|
||||
|
||||
func _range(path, start, end string) int {
|
||||
db, err := bitcask.Open(path)
|
||||
if err != nil {
|
||||
log.WithError(err).Error("error opening database")
|
||||
return 1
|
||||
}
|
||||
defer db.Close()
|
||||
|
||||
err = db.Range([]byte(start), []byte(end), func(key []byte) error {
|
||||
value, err := db.Get(key)
|
||||
if err != nil {
|
||||
log.WithError(err).Error("error reading key")
|
||||
return err
|
||||
}
|
||||
|
||||
fmt.Printf("%s\n", string(value))
|
||||
log.WithField("key", key).WithField("value", value).Debug("key/value")
|
||||
return nil
|
||||
})
|
||||
if err != nil {
|
||||
log.WithError(err).Error("error ranging over keys")
|
||||
return 1
|
||||
}
|
||||
|
||||
return 0
|
||||
}
|
||||
@@ -54,13 +54,6 @@ func (s *server) handleSet(cmd redcon.Command, conn redcon.Conn) {
|
||||
ttl = &d
|
||||
}
|
||||
|
||||
err := s.db.Lock()
|
||||
if err != nil {
|
||||
conn.WriteError("ERR " + fmt.Errorf("failed to lock db: %v", err).Error() + "")
|
||||
return
|
||||
}
|
||||
defer s.db.Unlock()
|
||||
|
||||
if ttl != nil {
|
||||
if err := s.db.PutWithTTL(key, value, *ttl); err != nil {
|
||||
conn.WriteString(fmt.Sprintf("ERR: %s", err))
|
||||
@@ -82,13 +75,6 @@ func (s *server) handleGet(cmd redcon.Command, conn redcon.Conn) {
|
||||
|
||||
key := cmd.Args[1]
|
||||
|
||||
err := s.db.Lock()
|
||||
if err != nil {
|
||||
conn.WriteError("ERR " + fmt.Errorf("failed to lock db: %v", err).Error() + "")
|
||||
return
|
||||
}
|
||||
defer s.db.Unlock()
|
||||
|
||||
value, err := s.db.Get(key)
|
||||
if err != nil {
|
||||
conn.WriteNull()
|
||||
@@ -98,17 +84,41 @@ func (s *server) handleGet(cmd redcon.Command, conn redcon.Conn) {
|
||||
}
|
||||
|
||||
func (s *server) handleKeys(cmd redcon.Command, conn redcon.Conn) {
|
||||
err := s.db.Lock()
|
||||
if err != nil {
|
||||
conn.WriteError("ERR " + fmt.Errorf("failed to lock db: %v", err).Error() + "")
|
||||
if len(cmd.Args) != 2 {
|
||||
conn.WriteError("ERR wrong number of arguments for '" + string(cmd.Args[0]) + "' command")
|
||||
return
|
||||
}
|
||||
defer s.db.Unlock()
|
||||
|
||||
conn.WriteArray(s.db.Len())
|
||||
for key := range s.db.Keys() {
|
||||
conn.WriteBulk(key)
|
||||
pattern := string(cmd.Args[1])
|
||||
|
||||
// Fast-track condition for improved speed
|
||||
if pattern == "*" {
|
||||
conn.WriteArray(s.db.Len())
|
||||
for key := range s.db.Keys() {
|
||||
conn.WriteBulk(key)
|
||||
}
|
||||
return
|
||||
}
|
||||
|
||||
// Prefix handling
|
||||
if strings.Count(pattern, "*") == 1 && strings.HasSuffix(pattern, "*") {
|
||||
prefix := strings.ReplaceAll(pattern, "*", "")
|
||||
count := 0
|
||||
keys := make([][]byte, 0)
|
||||
s.db.Scan([]byte(prefix), func(key []byte) error {
|
||||
keys = append(keys, key)
|
||||
count++
|
||||
return nil
|
||||
})
|
||||
conn.WriteArray(count)
|
||||
for _, key := range keys {
|
||||
conn.WriteBulk(key)
|
||||
}
|
||||
return
|
||||
}
|
||||
|
||||
// No results means empty array
|
||||
conn.WriteArray(0)
|
||||
}
|
||||
|
||||
func (s *server) handleExists(cmd redcon.Command, conn redcon.Conn) {
|
||||
@@ -119,13 +129,6 @@ func (s *server) handleExists(cmd redcon.Command, conn redcon.Conn) {
|
||||
|
||||
key := cmd.Args[1]
|
||||
|
||||
err := s.db.Lock()
|
||||
if err != nil {
|
||||
conn.WriteError("ERR " + fmt.Errorf("failed to lock db: %v", err).Error() + "")
|
||||
return
|
||||
}
|
||||
defer s.db.Unlock()
|
||||
|
||||
if s.db.Has(key) {
|
||||
conn.WriteInt(1)
|
||||
} else {
|
||||
@@ -141,13 +144,6 @@ func (s *server) handleDel(cmd redcon.Command, conn redcon.Conn) {
|
||||
|
||||
key := cmd.Args[1]
|
||||
|
||||
err := s.db.Lock()
|
||||
if err != nil {
|
||||
conn.WriteError("ERR " + fmt.Errorf("failed to lock db: %v", err).Error() + "")
|
||||
return
|
||||
}
|
||||
defer s.db.Unlock()
|
||||
|
||||
if err := s.db.Delete(key); err != nil {
|
||||
conn.WriteInt(0)
|
||||
} else {
|
||||
|
||||
102
cmd/bitcaskd/server_test.go
Normal file
102
cmd/bitcaskd/server_test.go
Normal file
@@ -0,0 +1,102 @@
|
||||
package main
|
||||
|
||||
import (
|
||||
"net"
|
||||
"strconv"
|
||||
"testing"
|
||||
|
||||
"github.com/tidwall/redcon"
|
||||
)
|
||||
|
||||
func TestHandleKeys(t *testing.T) {
|
||||
s, err := newServer(":61234", "./test.db")
|
||||
if err != nil {
|
||||
t.Fatalf("Unable to create server: %v", err)
|
||||
}
|
||||
s.db.Put([]byte("foo"), []byte("bar"))
|
||||
testCases := []TestCase{
|
||||
{
|
||||
Command: redcon.Command{
|
||||
Raw: []byte("KEYS *"),
|
||||
Args: [][]byte{[]byte("KEYS"), []byte("*")},
|
||||
},
|
||||
Expected: "1,foo",
|
||||
},
|
||||
{
|
||||
Command: redcon.Command{
|
||||
Raw: []byte("KEYS fo*"),
|
||||
Args: [][]byte{[]byte("KEYS"), []byte("fo*")},
|
||||
},
|
||||
Expected: "1,foo",
|
||||
},
|
||||
{
|
||||
Command: redcon.Command{
|
||||
Raw: []byte("KEYS ba*"),
|
||||
Args: [][]byte{[]byte("KEYS"), []byte("ba*")},
|
||||
},
|
||||
Expected: "0",
|
||||
},
|
||||
{
|
||||
Command: redcon.Command{
|
||||
Raw: []byte("KEYS *oo"),
|
||||
Args: [][]byte{[]byte("KEYS"), []byte("*oo")},
|
||||
},
|
||||
Expected: "0",
|
||||
},
|
||||
}
|
||||
for _, testCase := range testCases {
|
||||
conn := DummyConn{}
|
||||
s.handleKeys(testCase.Command, &conn)
|
||||
if testCase.Expected != conn.Result {
|
||||
t.Fatalf("s.handleKeys failed: expected '%s', got '%s'", testCase.Expected, conn.Result)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
type TestCase struct {
|
||||
Command redcon.Command
|
||||
Expected string
|
||||
}
|
||||
|
||||
type DummyConn struct {
|
||||
Result string
|
||||
}
|
||||
|
||||
func (dc *DummyConn) RemoteAddr() string {
|
||||
return ""
|
||||
}
|
||||
func (dc *DummyConn) Close() error {
|
||||
return nil
|
||||
}
|
||||
func (dc *DummyConn) WriteError(msg string) {}
|
||||
func (dc *DummyConn) WriteString(str string) {}
|
||||
func (dc *DummyConn) WriteBulk(bulk []byte) {
|
||||
dc.Result += "," + string(bulk)
|
||||
}
|
||||
func (dc *DummyConn) WriteBulkString(bulk string) {}
|
||||
func (dc *DummyConn) WriteInt(num int) {}
|
||||
func (dc *DummyConn) WriteInt64(num int64) {}
|
||||
func (dc *DummyConn) WriteUint64(num uint64) {}
|
||||
func (dc *DummyConn) WriteArray(count int) {
|
||||
dc.Result = strconv.Itoa(count)
|
||||
}
|
||||
func (dc *DummyConn) WriteNull() {}
|
||||
func (dc *DummyConn) WriteRaw(data []byte) {}
|
||||
func (dc *DummyConn) WriteAny(any interface{}) {}
|
||||
func (dc *DummyConn) Context() interface{} {
|
||||
return nil
|
||||
}
|
||||
func (dc *DummyConn) SetContext(v interface{}) {}
|
||||
func (dc *DummyConn) SetReadBuffer(bytes int) {}
|
||||
func (dc *DummyConn) Detach() redcon.DetachedConn {
|
||||
return nil
|
||||
}
|
||||
func (dc *DummyConn) ReadPipeline() []redcon.Command {
|
||||
return nil
|
||||
}
|
||||
func (dc *DummyConn) PeekPipeline() []redcon.Command {
|
||||
return nil
|
||||
}
|
||||
func (dc *DummyConn) NetConn() net.Conn {
|
||||
return nil
|
||||
}
|
||||
77
errors.go
Normal file
77
errors.go
Normal file
@@ -0,0 +1,77 @@
|
||||
package bitcask
|
||||
|
||||
import (
|
||||
"errors"
|
||||
"fmt"
|
||||
)
|
||||
|
||||
var (
|
||||
// ErrKeyNotFound is the error returned when a key is not found
|
||||
ErrKeyNotFound = errors.New("error: key not found")
|
||||
|
||||
// ErrKeyTooLarge is the error returned for a key that exceeds the
|
||||
// maximum allowed key size (configured with WithMaxKeySize).
|
||||
ErrKeyTooLarge = errors.New("error: key too large")
|
||||
|
||||
// ErrKeyExpired is the error returned when a key is queried which has
|
||||
// already expired (due to ttl)
|
||||
ErrKeyExpired = errors.New("error: key expired")
|
||||
|
||||
// ErrEmptyKey is the error returned for a value with an empty key.
|
||||
ErrEmptyKey = errors.New("error: empty key")
|
||||
|
||||
// ErrValueTooLarge is the error returned for a value that exceeds the
|
||||
// maximum allowed value size (configured with WithMaxValueSize).
|
||||
ErrValueTooLarge = errors.New("error: value too large")
|
||||
|
||||
// ErrChecksumFailed is the error returned if a key/value retrieved does
|
||||
// not match its CRC checksum
|
||||
ErrChecksumFailed = errors.New("error: checksum failed")
|
||||
|
||||
// ErrDatabaseLocked is the error returned if the database is locked
|
||||
// (typically opened by another process)
|
||||
ErrDatabaseLocked = errors.New("error: database locked")
|
||||
|
||||
// ErrInvalidRange is the error returned when the range scan is invalid
|
||||
ErrInvalidRange = errors.New("error: invalid range")
|
||||
|
||||
// ErrInvalidVersion is the error returned when the database version is invalid
|
||||
ErrInvalidVersion = errors.New("error: invalid db version")
|
||||
|
||||
// ErrMergeInProgress is the error returned if merge is called when already a merge
|
||||
// is in progress
|
||||
ErrMergeInProgress = errors.New("error: merge already in progress")
|
||||
)
|
||||
|
||||
// ErrBadConfig is the error returned on failure to load the database config
|
||||
type ErrBadConfig struct {
|
||||
Err error
|
||||
}
|
||||
|
||||
func (e *ErrBadConfig) Is(target error) bool {
|
||||
if _, ok := target.(*ErrBadConfig); ok {
|
||||
return true
|
||||
}
|
||||
return errors.Is(e.Err, target)
|
||||
}
|
||||
func (e *ErrBadConfig) Unwrap() error { return e.Err }
|
||||
func (e *ErrBadConfig) Error() string {
|
||||
return fmt.Sprintf("error reading config.json: %s", e.Err)
|
||||
}
|
||||
|
||||
// ErrBadMetadata is the error returned on failure to load the database metadata
|
||||
type ErrBadMetadata struct {
|
||||
Err error
|
||||
}
|
||||
|
||||
func (e *ErrBadMetadata) Is(target error) bool {
|
||||
if _, ok := target.(*ErrBadMetadata); ok {
|
||||
return true
|
||||
}
|
||||
return errors.Is(e.Err, target)
|
||||
}
|
||||
|
||||
func (e *ErrBadMetadata) Unwrap() error { return e.Err }
|
||||
func (e *ErrBadMetadata) Error() string {
|
||||
return fmt.Sprintf("error reading meta.json: %s", e.Err)
|
||||
}
|
||||
@@ -1,97 +0,0 @@
|
||||
package flock
|
||||
|
||||
import (
|
||||
"errors"
|
||||
"os"
|
||||
"sync"
|
||||
)
|
||||
|
||||
type Flock struct {
|
||||
path string
|
||||
m sync.Mutex
|
||||
fh *os.File
|
||||
}
|
||||
|
||||
var (
|
||||
ErrAlreadyLocked = errors.New("Double lock: already own the lock")
|
||||
ErrLockFailed = errors.New("Could not acquire lock")
|
||||
ErrLockNotHeld = errors.New("Could not unlock, lock is not held")
|
||||
|
||||
ErrInodeChangedAtPath = errors.New("Inode changed at path")
|
||||
)
|
||||
|
||||
// New returns a new instance of *Flock. The only parameter
|
||||
// it takes is the path to the desired lockfile.
|
||||
func New(path string) *Flock {
|
||||
return &Flock{path: path}
|
||||
}
|
||||
|
||||
// Path returns the file path linked to this lock.
|
||||
func (f *Flock) Path() string {
|
||||
return f.path
|
||||
}
|
||||
|
||||
// Lock will acquire the lock. This function may block indefinitely if some other process holds the lock. For a non-blocking version, see Flock.TryLock().
|
||||
func (f *Flock) Lock() error {
|
||||
f.m.Lock()
|
||||
defer f.m.Unlock()
|
||||
|
||||
if f.fh != nil {
|
||||
return ErrAlreadyLocked
|
||||
}
|
||||
|
||||
var fh *os.File
|
||||
|
||||
fh, err := lock_sys(f.path, false)
|
||||
// treat "ErrInodeChangedAtPath" as "some other process holds the lock, retry locking"
|
||||
for err == ErrInodeChangedAtPath {
|
||||
fh, err = lock_sys(f.path, false)
|
||||
}
|
||||
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
if fh == nil {
|
||||
return ErrLockFailed
|
||||
}
|
||||
|
||||
f.fh = fh
|
||||
return nil
|
||||
}
|
||||
|
||||
// TryLock will try to acquire the lock, and returns immediately if the lock is already owned by another process.
|
||||
func (f *Flock) TryLock() (bool, error) {
|
||||
f.m.Lock()
|
||||
defer f.m.Unlock()
|
||||
|
||||
if f.fh != nil {
|
||||
return false, ErrAlreadyLocked
|
||||
}
|
||||
|
||||
fh, err := lock_sys(f.path, true)
|
||||
if err != nil {
|
||||
return false, ErrLockFailed
|
||||
}
|
||||
|
||||
f.fh = fh
|
||||
return true, nil
|
||||
}
|
||||
|
||||
// Unlock removes the lock file from disk and releases the lock.
|
||||
// Whatever the result of `.Unlock()`, the caller must assume that it does not hold the lock anymore.
|
||||
func (f *Flock) Unlock() error {
|
||||
f.m.Lock()
|
||||
defer f.m.Unlock()
|
||||
|
||||
if f.fh == nil {
|
||||
return ErrLockNotHeld
|
||||
}
|
||||
|
||||
err1 := rm_if_match(f.fh, f.path)
|
||||
err2 := f.fh.Close()
|
||||
|
||||
if err1 != nil {
|
||||
return err1
|
||||
}
|
||||
return err2
|
||||
}
|
||||
@@ -1,121 +0,0 @@
|
||||
package flock
|
||||
|
||||
import (
|
||||
"os"
|
||||
"testing"
|
||||
"time"
|
||||
|
||||
"github.com/stretchr/testify/assert"
|
||||
)
|
||||
|
||||
// WARNING : this test will delete the file located at "testLockPath". Choose an adequate temporary file name.
|
||||
const testLockPath = "/tmp/bitcask_unit_test_lock" // file path to use for the lock
|
||||
|
||||
func TestTryLock(t *testing.T) {
|
||||
// test that basic locking functionalities are consistent
|
||||
|
||||
// make sure there is no present lock when starting this test
|
||||
os.Remove(testLockPath)
|
||||
|
||||
assert := assert.New(t)
|
||||
|
||||
lock1 := New(testLockPath)
|
||||
lock2 := New(testLockPath)
|
||||
|
||||
// 1- take the first lock
|
||||
locked1, err := lock1.TryLock()
|
||||
assert.True(locked1)
|
||||
assert.NoError(err)
|
||||
|
||||
// 2- check that the second lock cannot acquire the lock
|
||||
locked2, err := lock2.TryLock()
|
||||
assert.False(locked2)
|
||||
assert.Error(err)
|
||||
|
||||
// 3- release the first lock
|
||||
err = lock1.Unlock()
|
||||
assert.NoError(err)
|
||||
|
||||
// 4- check that the second lock can acquire and then release the lock without error
|
||||
locked2, err = lock2.TryLock()
|
||||
assert.True(locked2)
|
||||
assert.NoError(err)
|
||||
|
||||
err = lock2.Unlock()
|
||||
assert.NoError(err)
|
||||
}
|
||||
|
||||
func TestLock(t *testing.T) {
|
||||
assert := assert.New(t)
|
||||
|
||||
// make sure there is no present lock when starting this test
|
||||
os.Remove(testLockPath)
|
||||
|
||||
syncChan := make(chan bool)
|
||||
|
||||
// main goroutine: take lock on testPath
|
||||
lock := New(testLockPath)
|
||||
|
||||
err := lock.Lock()
|
||||
assert.NoError(err)
|
||||
|
||||
go func() {
|
||||
// sub routine:
|
||||
lock := New(testLockPath)
|
||||
|
||||
// before entering the block '.Lock()' call, signal we are about to do it
|
||||
// see below : the main goroutine will wait for a small delay before releasing the lock
|
||||
syncChan <- true
|
||||
// '.Lock()' should ultimately return without error :
|
||||
err := lock.Lock()
|
||||
assert.NoError(err)
|
||||
|
||||
err = lock.Unlock()
|
||||
assert.NoError(err)
|
||||
|
||||
close(syncChan)
|
||||
}()
|
||||
|
||||
// wait for the "ready" signal from the sub routine,
|
||||
<-syncChan
|
||||
|
||||
// after that signal wait for a small delay before releasing the lock
|
||||
<-time.After(100 * time.Microsecond)
|
||||
err = lock.Unlock()
|
||||
assert.NoError(err)
|
||||
|
||||
// wait for the sub routine to finish
|
||||
<-syncChan
|
||||
}
|
||||
|
||||
func TestErrorConditions(t *testing.T) {
|
||||
// error conditions implemented in this version :
|
||||
// - you can't release a lock you do not hold
|
||||
// - you can't lock twice the same lock
|
||||
|
||||
// -- setup
|
||||
assert := assert.New(t)
|
||||
|
||||
// make sure there is no present lock when starting this test
|
||||
os.Remove(testLockPath)
|
||||
|
||||
lock := New(testLockPath)
|
||||
|
||||
// -- run tests :
|
||||
|
||||
err := lock.Unlock()
|
||||
assert.Error(err, "you can't release a lock you do not hold")
|
||||
|
||||
// take the lock once:
|
||||
lock.TryLock()
|
||||
|
||||
locked, err := lock.TryLock()
|
||||
assert.False(locked)
|
||||
assert.Error(err, "you can't lock twice the same lock (using .TryLock())")
|
||||
|
||||
err = lock.Lock()
|
||||
assert.Error(err, "you can't lock twice the same lock (using .Lock())")
|
||||
|
||||
// -- teardown
|
||||
lock.Unlock()
|
||||
}
|
||||
@@ -1,79 +0,0 @@
|
||||
// +build !aix,!windows
|
||||
|
||||
package flock
|
||||
|
||||
import (
|
||||
"os"
|
||||
|
||||
"golang.org/x/sys/unix"
|
||||
)
|
||||
|
||||
func lock_sys(path string, nonBlocking bool) (_ *os.File, err error) {
|
||||
var fh *os.File
|
||||
|
||||
fh, err = os.OpenFile(path, os.O_RDWR|os.O_CREATE, 0666)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
defer func() {
|
||||
if err != nil {
|
||||
fh.Close()
|
||||
}
|
||||
}()
|
||||
|
||||
flag := unix.LOCK_EX
|
||||
if nonBlocking {
|
||||
flag |= unix.LOCK_NB
|
||||
}
|
||||
|
||||
err = unix.Flock(int(fh.Fd()), flag)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
if !sameInodes(fh, path) {
|
||||
return nil, ErrInodeChangedAtPath
|
||||
}
|
||||
|
||||
return fh, nil
|
||||
}
|
||||
|
||||
func rm_if_match(fh *os.File, path string) error {
|
||||
// Sanity check :
|
||||
// before running "rm", check that the file pointed at by the
|
||||
// filehandle has the same inode as the path in the filesystem
|
||||
//
|
||||
// If this sanity check doesn't pass, store a "ErrInodeChangedAtPath" error,
|
||||
// if the check passes, run os.Remove, and store the error if any.
|
||||
//
|
||||
// note : this sanity check is in no way atomic, but :
|
||||
// - as long as only cooperative processes are involved, it will work as intended
|
||||
// - it allows to avoid 99.9% the major pitfall case: "root user forcefully removed the lockfile"
|
||||
|
||||
if !sameInodes(fh, path) {
|
||||
return ErrInodeChangedAtPath
|
||||
}
|
||||
|
||||
return os.Remove(path)
|
||||
}
|
||||
|
||||
func sameInodes(f *os.File, path string) bool {
|
||||
// get inode from opened file f:
|
||||
var fstat unix.Stat_t
|
||||
err := unix.Fstat(int(f.Fd()), &fstat)
|
||||
if err != nil {
|
||||
return false
|
||||
}
|
||||
fileIno := fstat.Ino
|
||||
|
||||
// get inode for path on disk:
|
||||
var dstat unix.Stat_t
|
||||
err = unix.Stat(path, &dstat)
|
||||
if err != nil {
|
||||
return false
|
||||
}
|
||||
pathIno := dstat.Ino
|
||||
|
||||
return pathIno == fileIno
|
||||
}
|
||||
@@ -1,236 +0,0 @@
|
||||
package flock
|
||||
|
||||
// the "nd" in "nd_test.go" stands for non-deterministic
|
||||
|
||||
import (
|
||||
"errors"
|
||||
"os"
|
||||
"sync"
|
||||
"sync/atomic"
|
||||
"syscall"
|
||||
"testing"
|
||||
"time"
|
||||
|
||||
"github.com/stretchr/testify/assert"
|
||||
)
|
||||
|
||||
// The two tests in this file are test some concurrency scenarios :
|
||||
// 1- TestRaceLock() runs several threads racing for the same lock
|
||||
// 2- TestShatteredLock() runs racing racing threads, along with another threads which forcibly remove the file from disk
|
||||
//
|
||||
// Note that these tests are non-deterministic : the coverage produced by each test depends
|
||||
// on how the runtime chooses to schedule the concurrent goroutines.
|
||||
|
||||
var lockerCount int64
|
||||
|
||||
// lockAndCount tries to take a lock on "lockpath"
|
||||
// if it fails : it returns 0 and stop there
|
||||
// if it succeeds :
|
||||
// 1- it sets a defer function to release the lock in the same fashion as "func (b *Bitcask) Close()"
|
||||
// 2- it increments the shared "lockerCount" above
|
||||
// 3- it waits for a short amount of time
|
||||
// 4- it decrements "lockerCount"
|
||||
// 5- it returns the value it has seen at step 2.
|
||||
//
|
||||
// If the locking and unlocking behave as we expect them to,
|
||||
// instructions 1-5 should be in a critical section,
|
||||
// and the only possible value at step 2 should be "1".
|
||||
//
|
||||
// Returning a value > 0 indicates this function successfully acquired the lock,
|
||||
// returning a value == 0 indicates that TryLock failed.
|
||||
|
||||
func lockAndCount(lockpath string) int64 {
|
||||
lock := New(lockpath)
|
||||
ok, _ := lock.TryLock()
|
||||
if !ok {
|
||||
return 0
|
||||
}
|
||||
defer func() {
|
||||
lock.Unlock()
|
||||
}()
|
||||
|
||||
x := atomic.AddInt64(&lockerCount, 1)
|
||||
// emulate a workload :
|
||||
<-time.After(1 * time.Microsecond)
|
||||
atomic.AddInt64(&lockerCount, -1)
|
||||
|
||||
return x
|
||||
}
|
||||
|
||||
// locker will call the lock function above in a loop, until one of the following holds :
|
||||
// - reading from the "timeout" channel doesn't block
|
||||
// - the number of calls to "lock()" that indicate the lock was successfully taken reaches "successfullLockCount"
|
||||
func locker(t *testing.T, id int, lockPath string, successfulLockCount int, timeout <-chan struct{}) {
|
||||
timedOut := false
|
||||
|
||||
failCount := 0
|
||||
max := int64(0)
|
||||
|
||||
lockloop:
|
||||
for successfulLockCount > 0 {
|
||||
select {
|
||||
case <-timeout:
|
||||
timedOut = true
|
||||
break lockloop
|
||||
default:
|
||||
}
|
||||
|
||||
x := lockAndCount(lockPath)
|
||||
|
||||
if x > 0 {
|
||||
// if x indicates the lock was taken : decrement the counter
|
||||
successfulLockCount--
|
||||
}
|
||||
|
||||
if x > 1 {
|
||||
// if x indicates an invalid value : increase the failCount and update max accordingly
|
||||
failCount++
|
||||
if x > max {
|
||||
max = x
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// check failure cases :
|
||||
if timedOut {
|
||||
t.Fail()
|
||||
t.Logf("[runner %02d] timed out", id)
|
||||
}
|
||||
if failCount > 0 {
|
||||
t.Fail()
|
||||
t.Logf("[runner %02d] lockCounter was > 1 on %2.d occasions, max seen value was %2.d", id, failCount, max)
|
||||
}
|
||||
}
|
||||
|
||||
// TestRaceLock checks that no error occurs when several concurrent actors (goroutines in this case) race for the same lock.
|
||||
func TestRaceLock(t *testing.T) {
|
||||
// test parameters, written in code :
|
||||
// you may want to tweak these values for testing
|
||||
|
||||
goroutines := 20 // number of concurrent "locker" goroutines to launch
|
||||
successfulLockCount := 50 // how many times a "locker" will successfully take the lock before halting
|
||||
|
||||
// make sure there is no present lock when startng this test
|
||||
os.Remove(testLockPath)
|
||||
|
||||
// timeout implemented in code
|
||||
// (the lock acquisition depends on the behavior of the filesystem,
|
||||
// avoid sending CI in endless loop if something fishy happens on the test server ...)
|
||||
// tweak this value if needed ; comment out the "close(ch)" instruction below
|
||||
timeout := 10 * time.Second
|
||||
ch := make(chan struct{})
|
||||
go func() {
|
||||
<-time.After(timeout)
|
||||
close(ch)
|
||||
}()
|
||||
|
||||
wg := &sync.WaitGroup{}
|
||||
wg.Add(goroutines)
|
||||
|
||||
for i := 0; i < goroutines; i++ {
|
||||
go func(id int) {
|
||||
locker(t, id, testLockPath, successfulLockCount, ch)
|
||||
wg.Done()
|
||||
}(i)
|
||||
}
|
||||
|
||||
wg.Wait()
|
||||
}
|
||||
|
||||
func isExpectedError(err error) bool {
|
||||
switch {
|
||||
case err == nil:
|
||||
return true
|
||||
case err == ErrInodeChangedAtPath:
|
||||
return true
|
||||
case errors.Is(err, syscall.ENOENT):
|
||||
return true
|
||||
|
||||
default:
|
||||
return false
|
||||
}
|
||||
}
|
||||
|
||||
// TestShatteredLock runs concurrent goroutines on one lock, with an extra goroutine
|
||||
// which removes the lock file from disk without checking the locks
|
||||
// (e.g: a user who would run 'rm lockfile' in a loop while the program is running).
|
||||
//
|
||||
// In this scenario, errors may occur on .Unlock() ; this test checks that only errors
|
||||
// relating to the file being deleted occur.
|
||||
//
|
||||
// This test additionally logs the number of errors that occurred, grouped by error message.
|
||||
func TestShatteredLock(t *testing.T) {
|
||||
// test parameters, written in code :
|
||||
// you may want to tweak these values for testing
|
||||
|
||||
goroutines := 4 // number of concurrent "locker" and "remover" goroutines to launch
|
||||
successfulLockCount := 10 // how many times a "locker" will successfully take the lock before halting
|
||||
|
||||
// make sure there is no present lock when startng this test
|
||||
os.Remove(testLockPath)
|
||||
assert := assert.New(t)
|
||||
|
||||
wg := &sync.WaitGroup{}
|
||||
wg.Add(goroutines)
|
||||
|
||||
stopChan := make(chan struct{})
|
||||
|
||||
errChan := make(chan error, 10)
|
||||
|
||||
for i := 0; i < goroutines; i++ {
|
||||
go func(id int, count int) {
|
||||
for count > 0 {
|
||||
lock := New(testLockPath)
|
||||
ok, _ := lock.TryLock()
|
||||
if !ok {
|
||||
continue
|
||||
}
|
||||
|
||||
count--
|
||||
err := lock.Unlock()
|
||||
if !isExpectedError(err) {
|
||||
assert.Fail("goroutine %d - unexpected error: %v", id, err)
|
||||
}
|
||||
|
||||
if err != nil {
|
||||
errChan <- err
|
||||
}
|
||||
}
|
||||
|
||||
wg.Done()
|
||||
}(i, successfulLockCount)
|
||||
}
|
||||
|
||||
var wgCompanion = &sync.WaitGroup{}
|
||||
wgCompanion.Add(2)
|
||||
|
||||
go func() {
|
||||
defer wgCompanion.Done()
|
||||
for {
|
||||
os.Remove(testLockPath)
|
||||
|
||||
select {
|
||||
case <-stopChan:
|
||||
return
|
||||
default:
|
||||
}
|
||||
}
|
||||
}()
|
||||
|
||||
var errs = make(map[string]int)
|
||||
go func() {
|
||||
for err := range errChan {
|
||||
errs[err.Error()]++
|
||||
}
|
||||
wgCompanion.Done()
|
||||
}()
|
||||
|
||||
wg.Wait()
|
||||
close(stopChan)
|
||||
close(errChan)
|
||||
wgCompanion.Wait()
|
||||
|
||||
for err, count := range errs {
|
||||
t.Logf(" seen %d times: %s", count, err)
|
||||
}
|
||||
}
|
||||
5
go.mod
5
go.mod
@@ -1,8 +1,10 @@
|
||||
module git.mills.io/prologic/bitcask
|
||||
|
||||
go 1.13
|
||||
go 1.16
|
||||
|
||||
require (
|
||||
github.com/abcum/lcp v0.0.0-20201209214815-7a3f3840be81
|
||||
github.com/gofrs/flock v0.8.0
|
||||
github.com/pkg/errors v0.9.1
|
||||
github.com/plar/go-adaptive-radix-tree v1.0.4
|
||||
github.com/sirupsen/logrus v1.8.1
|
||||
@@ -13,5 +15,4 @@ require (
|
||||
github.com/stretchr/testify v1.7.0
|
||||
github.com/tidwall/redcon v1.4.1
|
||||
golang.org/x/exp v0.0.0-20200228211341-fcea875c7e85
|
||||
golang.org/x/sys v0.0.0-20210510120138-977fb7262007
|
||||
)
|
||||
|
||||
6
go.sum
6
go.sum
@@ -40,6 +40,8 @@ dmitri.shuralyov.com/gpu/mtl v0.0.0-20190408044501-666a987793e9/go.mod h1:H6x//7
|
||||
github.com/BurntSushi/toml v0.3.1/go.mod h1:xHWCNGjB5oqiDr8zfno3MHue2Ht5sIBksp03qcyfWMU=
|
||||
github.com/BurntSushi/xgb v0.0.0-20160522181843-27f122750802/go.mod h1:IVnqGOEym/WlBOVXweHU+Q+/VP0lqqI8lqeDx9IjBqo=
|
||||
github.com/OneOfOne/xxhash v1.2.2/go.mod h1:HSdplMjZKSmBqAxg5vPj2TmRDmfkzw+cTzAElWljhcU=
|
||||
github.com/abcum/lcp v0.0.0-20201209214815-7a3f3840be81 h1:uHogIJ9bXH75ZYrXnVShHIyywFiUZ7OOabwd9Sfd8rw=
|
||||
github.com/abcum/lcp v0.0.0-20201209214815-7a3f3840be81/go.mod h1:6ZvnjTZX1LNo1oLpfaJK8h+MXqHxcBFBIwkgsv+xlv0=
|
||||
github.com/alecthomas/template v0.0.0-20160405071501-a0175ee3bccc/go.mod h1:LOuyumcjzFXgccqObfd/Ljyb9UuFJ6TxHnclSeseNhc=
|
||||
github.com/alecthomas/units v0.0.0-20151022065526-2efee857e7cf/go.mod h1:ybxpYRFXyAe+OPACYpWeL0wqObRcbAqCMya13uyzqw0=
|
||||
github.com/antihax/optional v1.0.0/go.mod h1:uupD/76wgC+ih3iEmQUL+0Ugr19nfwCT1kdvxnR2qWY=
|
||||
@@ -93,6 +95,8 @@ github.com/go-logfmt/logfmt v0.3.0/go.mod h1:Qt1PoO58o5twSAckw1HlFXLmHsOX5/0LbT9
|
||||
github.com/go-logfmt/logfmt v0.4.0/go.mod h1:3RMwSq7FuexP4Kalkev3ejPJsZTpXXBr9+V4qmtdjCk=
|
||||
github.com/go-stack/stack v1.8.0/go.mod h1:v0f6uXyyMGvRgIKkXu+yp6POWl0qKG85gN/melR3HDY=
|
||||
github.com/godbus/dbus/v5 v5.0.4/go.mod h1:xhWf0FNVPg57R7Z0UbKHbJfkEywrmjJnf7w5xrFpKfA=
|
||||
github.com/gofrs/flock v0.8.0 h1:MSdYClljsF3PbENUUEx85nkWfJSGfzYI9yEBZOJz6CY=
|
||||
github.com/gofrs/flock v0.8.0/go.mod h1:F1TvTiK9OcQqauNUHlbJvyl9Qa1QvF/gOUDKA14jxHU=
|
||||
github.com/gogo/protobuf v1.1.1/go.mod h1:r8qH/GZQm5c6nD/R0oafs1akxWv10x8SbQlK7atdtwQ=
|
||||
github.com/gogo/protobuf v1.2.1/go.mod h1:hp+jE20tsWTFYpLwKvXlhS1hjn+gTNwPg2I6zVXpSg4=
|
||||
github.com/gogo/protobuf v1.3.2/go.mod h1:P1XiOD3dCwIKUDQYPy72D8LYyHL2YPYrpS2s69NZV8Q=
|
||||
@@ -160,6 +164,7 @@ github.com/googleapis/gax-go/v2 v2.0.4/go.mod h1:0Wqv26UfaUD9n4G6kQubkQ+KchISgw+
|
||||
github.com/googleapis/gax-go/v2 v2.0.5/go.mod h1:DWXyrwAJ9X0FpwwEdw+IPEYBICEFu5mhpdKc/us6bOk=
|
||||
github.com/gopherjs/gopherjs v0.0.0-20181017120253-0766667cb4d1 h1:EGx4pi6eqNxGaHF6qqu48+N2wcFQ5qg5FXgOdqsJ5d8=
|
||||
github.com/gopherjs/gopherjs v0.0.0-20181017120253-0766667cb4d1/go.mod h1:wJfORRmW1u3UXTncJ5qlYoELFm8eSnnEO6hX4iZ3EWY=
|
||||
github.com/gopherjs/gopherjs v0.0.0-20200217142428-fce0ec30dd00/go.mod h1:wJfORRmW1u3UXTncJ5qlYoELFm8eSnnEO6hX4iZ3EWY=
|
||||
github.com/gorilla/websocket v1.4.0/go.mod h1:E7qHFY5m1UJ88s3WnNqhKjPHQ0heANvMoAMk2YaljkQ=
|
||||
github.com/grpc-ecosystem/go-grpc-middleware v1.0.0/go.mod h1:FiyG127CGDf3tlThmgyCl78X/SZQqEOJBCDaAfeWzPs=
|
||||
github.com/grpc-ecosystem/go-grpc-prometheus v1.2.0/go.mod h1:8NvIoxWQoOIhqOTXgfV/d3M/q6VIi02HzZEHgUlZvzk=
|
||||
@@ -266,6 +271,7 @@ github.com/sirupsen/logrus v1.8.1 h1:dJKuHgqk1NNQlqoA6BTlM1Wf9DOH3NBjQyu0h9+AZZE
|
||||
github.com/sirupsen/logrus v1.8.1/go.mod h1:yWOB1SBYBC5VeMP7gHvWumXLIWorT60ONWic61uBYv0=
|
||||
github.com/smartystreets/assertions v0.0.0-20180927180507-b2de0cb4f26d h1:zE9ykElWQ6/NYmHa3jpm/yHnI4xSofP+UP6SpjHcSeM=
|
||||
github.com/smartystreets/assertions v0.0.0-20180927180507-b2de0cb4f26d/go.mod h1:OnSkiWE9lh6wB0YB77sQom3nweQdgAjqCqsofrRNTgc=
|
||||
github.com/smartystreets/assertions v1.2.0/go.mod h1:tcbTF8ujkAEcZ8TElKY+i30BzYlVhC/LOxJk7iOWnoo=
|
||||
github.com/smartystreets/goconvey v1.6.4 h1:fv0U8FUIMPNf1L9lnHLvLhgicrIVChEkdzIKYqbNC9s=
|
||||
github.com/smartystreets/goconvey v1.6.4/go.mod h1:syvi0/a8iFYH4r/RixwvyeAJjdLS9QV7WQ/tjFTllLA=
|
||||
github.com/soheilhy/cmux v0.1.4/go.mod h1:IM3LyeVVIOuxMH7sFAkER9+bJ4dT7Ms6E4xg4kGIyLM=
|
||||
|
||||
@@ -6,9 +6,9 @@ import (
|
||||
"path/filepath"
|
||||
"sync"
|
||||
|
||||
"github.com/pkg/errors"
|
||||
"git.mills.io/prologic/bitcask/internal"
|
||||
"git.mills.io/prologic/bitcask/internal/data/codec"
|
||||
"github.com/pkg/errors"
|
||||
"golang.org/x/exp/mmap"
|
||||
)
|
||||
|
||||
@@ -74,9 +74,11 @@ func NewDatafile(path string, id int, readonly bool, maxKeySize uint32, maxValue
|
||||
return nil, errors.Wrap(err, "error calling Stat()")
|
||||
}
|
||||
|
||||
ra, err = mmap.Open(fn)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
if readonly {
|
||||
ra, err = mmap.Open(fn)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
}
|
||||
|
||||
offset := stat.Size()
|
||||
@@ -107,7 +109,9 @@ func (df *datafile) Name() string {
|
||||
|
||||
func (df *datafile) Close() error {
|
||||
defer func() {
|
||||
df.ra.Close()
|
||||
if df.ra != nil {
|
||||
df.ra.Close()
|
||||
}
|
||||
df.r.Close()
|
||||
}()
|
||||
|
||||
@@ -155,7 +159,10 @@ func (df *datafile) ReadAt(index, size int64) (e internal.Entry, err error) {
|
||||
|
||||
b := make([]byte, size)
|
||||
|
||||
if df.w == nil {
|
||||
df.RLock()
|
||||
defer df.RUnlock()
|
||||
|
||||
if df.ra != nil {
|
||||
n, err = df.ra.ReadAt(b, index)
|
||||
} else {
|
||||
n, err = df.r.ReadAt(b, index)
|
||||
|
||||
@@ -27,7 +27,7 @@ func CheckAndRecover(path string, cfg *config.Config) error {
|
||||
f := dfs[len(dfs)-1]
|
||||
recovered, err := recoverDatafile(f, cfg)
|
||||
if err != nil {
|
||||
return fmt.Errorf("recovering data file")
|
||||
return fmt.Errorf("error recovering data file: %s", err)
|
||||
}
|
||||
if recovered {
|
||||
if err := os.Remove(filepath.Join(path, "index")); err != nil {
|
||||
@@ -48,8 +48,8 @@ func recoverDatafile(path string, cfg *config.Config) (recovered bool, err error
|
||||
err = closeErr
|
||||
}
|
||||
}()
|
||||
_, file := filepath.Split(path)
|
||||
rPath := fmt.Sprintf("%s.recovered", file)
|
||||
dir, file := filepath.Split(path)
|
||||
rPath := filepath.Join(dir, fmt.Sprintf("%s.recovered", file))
|
||||
fr, err := os.OpenFile(rPath, os.O_CREATE|os.O_WRONLY, os.ModePerm)
|
||||
if err != nil {
|
||||
return false, fmt.Errorf("creating the recovered datafile: %w", err)
|
||||
|
||||
@@ -2,17 +2,49 @@ package internal
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
"runtime/debug"
|
||||
"strings"
|
||||
)
|
||||
|
||||
const (
|
||||
defaultVersion = "0.0.0"
|
||||
defaultCommit = "HEAD"
|
||||
defaultBuild = "0000-01-01:00:00+00:00"
|
||||
)
|
||||
|
||||
var (
|
||||
// Version release version
|
||||
Version = "0.0.1"
|
||||
// Version is the tagged release version in the form <major>.<minor>.<patch>
|
||||
// following semantic versioning and is overwritten by the build system.
|
||||
Version = defaultVersion
|
||||
|
||||
// Commit will be overwritten automatically by the build system
|
||||
Commit = "HEAD"
|
||||
// Commit is the commit sha of the build (normally from Git) and is overwritten
|
||||
// by the build system.
|
||||
Commit = defaultCommit
|
||||
|
||||
// Build is the date and time of the build as an RFC3339 formatted string
|
||||
// and is overwritten by the build system.
|
||||
Build = defaultBuild
|
||||
)
|
||||
|
||||
// FullVersion returns the full version and commit hash
|
||||
// FullVersion display the full version and build
|
||||
func FullVersion() string {
|
||||
return fmt.Sprintf("%s@%s", Version, Commit)
|
||||
var sb strings.Builder
|
||||
|
||||
isDefault := Version == defaultVersion && Commit == defaultCommit && Build == defaultBuild
|
||||
|
||||
if !isDefault {
|
||||
sb.WriteString(fmt.Sprintf("%s@%s %s", Version, Commit, Build))
|
||||
}
|
||||
|
||||
if info, ok := debug.ReadBuildInfo(); ok {
|
||||
if isDefault {
|
||||
sb.WriteString(fmt.Sprintf(" %s", info.Main.Version))
|
||||
}
|
||||
sb.WriteString(fmt.Sprintf(" %s", info.GoVersion))
|
||||
if info.Main.Sum != "" {
|
||||
sb.WriteString(fmt.Sprintf(" %s", info.Main.Sum))
|
||||
}
|
||||
}
|
||||
|
||||
return sb.String()
|
||||
}
|
||||
|
||||
@@ -26,7 +26,7 @@ echo "Releasing ${TAG} ..."
|
||||
git-chglog --next-tag="${TAG}" --output CHANGELOG.md
|
||||
git commit -a -m "Update CHANGELOG for ${TAG}"
|
||||
git tag -a -s -m "Release ${TAG}" "${TAG}"
|
||||
git push --tags
|
||||
git push && git push --tags
|
||||
goreleaser release \
|
||||
--rm-dist \
|
||||
--release-notes <(git-chglog "${TAG}" | tail -n+5)
|
||||
|
||||
Reference in New Issue
Block a user