Compare commits

...

29 Commits

Author SHA1 Message Date
James Mills
4ec5b07eea Fix install target 2023-08-12 20:19:26 +10:00
James Mills
5ed43e5a20 Fix install target and versioning 2023-08-12 20:19:00 +10:00
edd32cad0a update sift to match docstring (#245)
Closes #244.

Reviewed-on: https://git.mills.io/prologic/bitcask/pulls/245
Reviewed-by: James Mills <james@mills.io>
Co-authored-by: Tai Groot <tai@taigrr.com>
Co-committed-by: Tai Groot <tai@taigrr.com>
2022-02-04 22:37:43 +00:00
d23c355e72 Update CHANGELOG for v1.0.2 2021-11-01 17:54:39 +10:00
40425394d7 Fix a data race in Datafile.ReadAt() 2021-11-01 17:54:31 +10:00
f4cc0fb434 Fix release tool 2021-10-31 07:08:53 +10:00
7d4174d5b1 Update CHANGELOG for v1.0.1 2021-10-31 07:08:01 +10:00
James Mills
5429693cc8 Add ErrBadConfig and ErrBadMetadata as errors that consumers can check and use (#241)
cc @taigrr

This PR will _hopefully_ help to fix some critical isseus in the real world with several or more [Yarn.social](https://yarn.social) pods running [yarnd](https://git.mills.io/yarnsocial/yarn) where starting back up after a power failure or crash can sometimes result in an empty `config.json` or empty `meta.json` or both!

I'm not actually sure how this can arise, and as yet I haven't been able to reproduce it (_I can only assume this has to be failures cases outside of our control_); but in any case the application and database is recoverable by simply `rm config.json` and/or `rm meta.json`.

So this PR makes errors loading the config and metadata first-class errors and exported error types that consumers of the library can use to perform automated recovery without requiring human intervention.

Basiclaly in this case it's no big deal we lost the database config of metadata, we can simply carry on.

Co-authored-by: James Mills <prologic@shortcircuit.net.au>
Reviewed-on: https://git.mills.io/prologic/bitcask/pulls/241
Co-authored-by: James Mills <james@mills.io>
Co-committed-by: James Mills <james@mills.io>
2021-10-30 21:07:42 +00:00
jason3gb
2c57c950f8 [Fix] disable mmap for current datafile from #239 (#240)
Fix issues related to #239

Disable mmap reader for current datafile, which only read from the fd.

Co-authored-by: jason3gb <jason3gb@gmail.com>
Reviewed-on: https://git.mills.io/prologic/bitcask/pulls/240
Reviewed-by: James Mills <james@mills.io>
Reviewed-by: Tai Groot <tai@taigrr.com>
Co-authored-by: jason3gb <jason3gb@noreply@mills.io>
Co-committed-by: jason3gb <jason3gb@noreply@mills.io>
2021-09-25 04:26:26 +00:00
biozz
21a824e13e Add key prefix matching to KEYS command (#237)
Related to #234 and !236.

This is the implementation that was requested in the original issue. I updated KEYS command to be redis-valid and implemented prefix search. There is also a rather interesting test, I could you use some feedback here.

I noticed that it might not be possible to reduce the complexity of the KEYS command. Because even if you use Scan, you will have to store the counter of all found keys before you do WriteBulk of the actual keys.

@prologic here is what you probably had in mind:

```
s.db.Scan([]byte(prefix), func(key []byte) error {
	conn.WriteBulk(key)
	return nil
})
```

But there is no way to call `conn.WriteArray(n)` with the number of keys until you iterate through all of them, hence the second loop over found keys.

Co-authored-by: Ivan Elfimov <ielfimov@gmail.com>
Co-authored-by: James Mills <james@mills.io>
Reviewed-on: https://git.mills.io/prologic/bitcask/pulls/237
Reviewed-by: James Mills <james@mills.io>
Co-authored-by: biozz <biozz@noreply@mills.io>
Co-committed-by: biozz <biozz@noreply@mills.io>
2021-09-20 10:35:27 +00:00
2279245b8c Update image target 2021-09-17 07:49:07 +10:00
91d4db63d5 Update CHANGELOG for v1.0.0 2021-07-24 17:07:25 +10:00
849192f709 Update CHANGELOG for 1.0.0 2021-07-24 13:37:57 +10:00
a4fc2cf4e8 Update README 2021-07-22 19:44:29 +10:00
609de833eb Update CHANGELOG for v0.3.14 2021-07-21 12:38:20 +10:00
James Mills
9b0daa8a30 Add RangeScan() support (#160)
Co-authored-by: James Mills <1290234+prologic@users.noreply.github.com>
Co-authored-by: James Mills <prologic@shortcircuit.net.au>
Co-authored-by: Tai Groot <tai@taigrr.com>
Reviewed-on: https://git.mills.io/prologic/bitcask/pulls/160
Co-authored-by: James Mills <james@mills.io>
Co-committed-by: James Mills <james@mills.io>
2021-07-21 02:36:06 +00:00
ef187f8315 [ADD] Sift and ScanSift (+ tests) (#232)
Added Sift and ScanSift functions for review without tests (for now)

fix docstrings

Added tests for Sift and ScanSift

Note this also fixes a bug in the Scan() function where the RMutex is not locked, allowing a potential race condition

closes #231

Reviewed-on: https://git.mills.io/prologic/bitcask/pulls/232
Co-authored-by: Tai Groot <tai@taigrr.com>
Co-committed-by: Tai Groot <tai@taigrr.com>
2021-07-21 00:19:25 +00:00
James Mills
b094cd33d3 Fix runGC behaviour to correctly delete all expired keys (#229)
Fixes #228

Co-authored-by: James Mills <prologic@shortcircuit.net.au>
Reviewed-on: https://git.mills.io/prologic/bitcask/pulls/229
Co-authored-by: James Mills <james@mills.io>
Co-committed-by: James Mills <james@mills.io>
2021-07-20 20:42:22 +00:00
3ff8937205 Fix missing push event 2021-07-20 15:57:31 +10:00
2ccca759ce Fix how CI is triggered 2021-07-20 15:56:32 +10:00
92535e654b [FIX] race condition from #216 (#227)
[ADDED] new tests for TTL expiration race condition,  see #216

[REMOVED] removes cleanup / automatic expiration from get() function to resolve #216

Reviewed-on: https://git.mills.io/prologic/bitcask/pulls/227
Co-authored-by: Tai Groot <tai@taigrr.com>
Co-committed-by: Tai Groot <tai@taigrr.com>
2021-07-18 23:41:40 +00:00
c4a7ad7a7f Fix README Go Reference badge 2021-07-16 07:49:45 +10:00
e64646fa8f Fix README badges 2021-07-16 07:46:39 +10:00
2de030ad5c Update CHANGELOG for v0.3.13 2021-07-16 07:37:55 +10:00
James Mills
5e4d863ab7 Use package github.com/gofrs/flock as flock implementation. (#224)
Supercesd #219 after rebasing on master after migrating off Github.

Co-authored-by: Nicolò Santamaria <nicolo.santamaria@protonmail.com>
Co-authored-by: James Mills <prologic@shortcircuit.net.au>
Co-authored-by: Tai Groot <taigrr@noreply@mills.io>
Reviewed-on: https://git.mills.io/prologic/bitcask/pulls/224
Co-authored-by: James Mills <prologic@noreply@mills.io>
Co-committed-by: James Mills <prologic@noreply@mills.io>
2021-07-15 21:33:20 +00:00
James Mills
a49bbf666a Fix paths used for temporary recovery iles to avoid crossing devices (#223)
Fixes #222

Co-authored-by: James Mills <prologic@shortcircuit.net.au>
Reviewed-on: https://git.mills.io/prologic/bitcask/pulls/223
Co-authored-by: James Mills <prologic@noreply@mills.io>
Co-committed-by: James Mills <prologic@noreply@mills.io>
2021-07-15 13:16:58 +00:00
52df2fad55 Improve error reporting on recovery errors 2021-07-14 22:59:51 +10:00
947d15fed8 Debug failing test in CI 2021-07-14 22:37:30 +10:00
d276c398da Add Drone CI config 2021-07-14 22:30:01 +10:00
22 changed files with 1028 additions and 681 deletions

28
.drone.yml Normal file
View File

@@ -0,0 +1,28 @@
---
kind: pipeline
name: default
steps:
- name: build & run tests
image: r.mills.io/prologic/golang-alpine
commands:
- make build
- make test
- name: notify
image: plugins/webhook
settings:
urls:
- https://msgbus.mills.io/ci.mills.io
when:
status:
- success
- failure
trigger:
branch:
- master
event:
- tag
- push
- pull_request

1
.gitignore vendored
View File

@@ -5,6 +5,7 @@
/tmp
/dist
/cacheDb
/coverage.txt
/bitcask

View File

@@ -17,3 +17,5 @@ Yash Chandra <yashschandra@gmail.com>
Yury Fedorov orlangure
o2gy84 <o2gy84@gmail.com>
garsue <labs.garsue@gmail.com>
biozz <ielfimov@gmail.com>
jason3gb <jason3gb@gmail.com>

View File

@@ -1,6 +1,86 @@
<a name="v1.0.2"></a>
## [v1.0.2](https://git.mills.io/prologic/bitcask/compare/v1.0.1...v1.0.2) (2021-11-01)
### Bug Fixes
* Fix a data race in Datafile.ReadAt()
* Fix release tool
<a name="v1.0.1"></a>
## [v1.0.1](https://git.mills.io/prologic/bitcask/compare/v1.0.0...v1.0.1) (2021-10-31)
### Features
* Add ErrBadConfig and ErrBadMetadata as errors that consumers can check and use (#241)
* Add key prefix matching to KEYS command (#237)
### Updates
* Update CHANGELOG for v1.0.1
* Update image target
<a name="v1.0.0"></a>
## [v1.0.0](https://git.mills.io/prologic/bitcask/compare/1.0.0...v1.0.0) (2021-07-24)
### Updates
* Update CHANGELOG for v1.0.0
<a name="1.0.0"></a>
## [1.0.0](https://git.mills.io/prologic/bitcask/compare/v0.3.14...1.0.0) (2021-07-24)
### Updates
* Update CHANGELOG for 1.0.0
* Update README
<a name="v0.3.14"></a>
## [v0.3.14](https://git.mills.io/prologic/bitcask/compare/v0.3.13...v0.3.14) (2021-07-21)
### Bug Fixes
* Fix runGC behaviour to correctly delete all expired keys (#229)
* Fix missing push event
* Fix how CI is triggered
* Fix README Go Reference badge
* Fix README badges
### Features
* Add RangeScan() support (#160)
### Updates
* Update CHANGELOG for v0.3.14
<a name="v0.3.13"></a>
## [v0.3.13](https://git.mills.io/prologic/bitcask/compare/v0.3.12...v0.3.13) (2021-07-16)
### Bug Fixes
* Fix paths used for temporary recovery iles to avoid crossing devices (#223)
### Features
* Add Drone CI config
### Updates
* Update CHANGELOG for v0.3.13
<a name="v0.3.12"></a>
## [v0.3.12](https://git.mills.io/prologic/bitcask/compare/v0.3.11...v0.3.12) (0001-01-01)
## [v0.3.12](https://git.mills.io/prologic/bitcask/compare/v0.3.11...v0.3.12) (2021-07-13)
### Updates
* Update CHANGELOG for v0.3.12
<a name="v0.3.11"></a>

View File

@@ -3,6 +3,10 @@
CGO_ENABLED=0
VERSION=$(shell git describe --abbrev=0 --tags)
COMMIT=$(shell git rev-parse --short HEAD)
BUILD=$(shell git show -s --pretty=format:%cI)
GOCMD=go
DESTDIR=/usr/local/bin
all: dev
@@ -11,46 +15,52 @@ dev: build
@./bitcaskd --version
build: clean generate
@go build \
@$(GOCMD) build \
-tags "netgo static_build" -installsuffix netgo \
-ldflags "-w -X $(shell go list)/internal.Version=$(VERSION) -X $(shell go list)/internal.Commit=$(COMMIT)" \
-ldflags "-w -X $(shell go list)/internal.Version=$(VERSION) -X $(shell go list)/internal.Commit=$(COMMIT) -X $(shell go list)/internal.Build=$(BUILD)" \
./cmd/bitcask/...
@go build \
@$(GOCMD) build \
-tags "netgo static_build" -installsuffix netgo \
-ldflags "-w -X $(shell go list)/internal.Version=$(VERSION) -X $(shell go list)/internal.Commit=$(COMMIT)" \
-ldflags "-w -X $(shell go list)/internal.Version=$(VERSION) -X $(shell go list)/internal.Commit=$(COMMIT) -X $(shell go list)/internal.Build=$(BUILD)" \
./cmd/bitcaskd/...
generate:
@go generate $(shell go list)/...
@$(GOCMD) generate $(shell go list)/...
install: build
@go install ./cmd/bitcask/...
@go install ./cmd/bitcaskd/...
@install -D -m 755 bitcask $(DESTDIR)/bitcask
@install -D -m 755 bitcaskd $(DESTDIR)/bitcaskd
ifeq ($(PUBLISH), 1)
image:
@docker build -t prologic/bitcask .
@docker build --build-arg VERSION="$(VERSION)" --build-arg COMMIT="$(COMMIT)" -t prologic/bitcask .
@docker push prologic/bitcask
else
image:
@docker build --build-arg VERSION="$(VERSION)" --build-arg COMMIT="$(COMMIT)" -t prologic/bitcask .
endif
release:
@./tools/release.sh
profile: build
@go test -cpuprofile cpu.prof -memprofile mem.prof -v -bench .
@$(GOCMD) test -cpuprofile cpu.prof -memprofile mem.prof -v -bench .
bench: build
@go test -v -run=XXX -benchmem -bench=. .
@$(GOCMD) test -v -run=XXX -benchmem -bench=. .
mocks:
@mockery -all -case underscore -output ./internal/mocks -recursive
test: build
@go test -v \
@$(GOCMD) test -v \
-cover -coverprofile=coverage.txt -covermode=atomic \
-coverpkg=$(shell go list) \
-race \
.
setup:
@go get github.com/vektra/mockery/...
@$(GOCMD) get github.com/vektra/mockery/...
clean:
@git clean -f -d -X

View File

@@ -1,17 +1,8 @@
# bitcask
![](https://git.mills.io/prologic/bitcask/workflows/Coverage/badge.svg)
![](https://git.mills.io/prologic/bitcask/workflows/Docker/badge.svg)
![](https://git.mills.io/prologic/bitcask/workflows/Go/badge.svg)
![](https://git.mills.io/prologic/bitcask/workflows/ReviewDog/badge.svg)
[![CodeCov](https://codecov.io/gh/prologic/bitcask/branch/master/graph/badge.svg)](https://codecov.io/gh/prologic/bitcask)
[![Go Report Card](https://goreportcard.com/badge/prologic/bitcask)](https://goreportcard.com/report/prologic/bitcask)
[![codebeat badge](https://codebeat.co/badges/15fba8a5-3044-4f40-936f-9e0f5d5d1fd9)](https://codebeat.co/projects/github-com-prologic-bitcask-master)
[![GoDoc](https://godoc.org/git.mills.io/prologic/bitcask?status.svg)](https://godoc.org/git.mills.io/prologic/bitcask)
[![GitHub license](https://img.shields.io/github/license/prologic/bitcask.svg)](https://git.mills.io/prologic/bitcask)
[![Sourcegraph](https://sourcegraph.com/git.mills.io/prologic/bitcask/-/badge.svg)](https://sourcegraph.com/git.mills.io/prologic/bitcask?badge)
[![TODOs](https://img.shields.io/endpoint?url=https://api.tickgit.com/badge?repo=git.mills.io/prologic/bitcask)](https://www.tickgit.com/browse?repo=git.mills.io/prologic/bitcask)
[![Build Status](https://ci.mills.io/api/badges/prologic/bitcask/status.svg)](https://ci.mills.io/prologic/bitcask)
[![Go Report Card](https://goreportcard.com/badge/git.mills.io/prologic/bitcask)](https://goreportcard.com/report/git.mills.io/prologic/bitcask)
[![Go Reference](https://pkg.go.dev/badge/git.mills.io/prologic/bitcask.svg)](https://pkg.go.dev/git.mills.io/prologic/bitcask)
A high performance Key/Value store written in [Go](https://golang.org) with a predictable read/write performance and high throughput. Uses a [Bitcask](https://en.wikipedia.org/wiki/Bitcask) on-disk layout (LSM+WAL) similar to [Riak](https://riak.com/)
@@ -229,24 +220,14 @@ Support the ongoing development of Bitcask!
- Become a [Sponsor](https://www.patreon.com/prologic)
## Stargazers over time
[![Stargazers over time](https://starcharts.herokuapp.com/prologic/bitcask.svg)](https://starcharts.herokuapp.com/prologic/bitcask)
## Contributors
Thank you to all those that have contributed to this project, battle-tested it, used it in their own projects or products, fixed bugs, improved performance and even fix tiny typos in documentation! Thank you and keep contributing!
Thank you to all those that have contributed to this project, battle-tested it,
used it in their own projects or products, fixed bugs, improved performance
and even fix tiny typos in documentation! Thank you and keep contributing!
You can find an [AUTHORS](/AUTHORS) file where we keep a list of contributors to the project. If you contribute a PR please consider adding your name there. There is also GitHub's own [Contributors](https://git.mills.io/prologic/bitcask/graphs/contributors) statistics.
[![](https://sourcerer.io/fame/prologic/prologic/bitcask/images/0)](https://sourcerer.io/fame/prologic/prologic/bitcask/links/0)
[![](https://sourcerer.io/fame/prologic/prologic/bitcask/images/1)](https://sourcerer.io/fame/prologic/prologic/bitcask/links/1)
[![](https://sourcerer.io/fame/prologic/prologic/bitcask/images/2)](https://sourcerer.io/fame/prologic/prologic/bitcask/links/2)
[![](https://sourcerer.io/fame/prologic/prologic/bitcask/images/3)](https://sourcerer.io/fame/prologic/prologic/bitcask/links/3)
[![](https://sourcerer.io/fame/prologic/prologic/bitcask/images/4)](https://sourcerer.io/fame/prologic/prologic/bitcask/links/4)
[![](https://sourcerer.io/fame/prologic/prologic/bitcask/images/5)](https://sourcerer.io/fame/prologic/prologic/bitcask/links/5)
[![](https://sourcerer.io/fame/prologic/prologic/bitcask/images/6)](https://sourcerer.io/fame/prologic/prologic/bitcask/links/6)
[![](https://sourcerer.io/fame/prologic/prologic/bitcask/images/7)](https://sourcerer.io/fame/prologic/prologic/bitcask/links/7)
You can find an [AUTHORS](/AUTHORS) file where we keep a list of contributors
to the project. If you contribute a PR please consider adding your name there.
## Related Projects

View File

@@ -1,7 +1,7 @@
package bitcask
import (
"errors"
"bytes"
"fmt"
"hash/crc32"
"io"
@@ -13,8 +13,11 @@ import (
"sync"
"time"
"github.com/abcum/lcp"
"github.com/gofrs/flock"
art "github.com/plar/go-adaptive-radix-tree"
"git.mills.io/prologic/bitcask/flock"
log "github.com/sirupsen/logrus"
"git.mills.io/prologic/bitcask/internal"
"git.mills.io/prologic/bitcask/internal/config"
"git.mills.io/prologic/bitcask/internal/data"
@@ -22,7 +25,6 @@ import (
"git.mills.io/prologic/bitcask/internal/index"
"git.mills.io/prologic/bitcask/internal/metadata"
"git.mills.io/prologic/bitcask/scripts/migrations"
log "github.com/sirupsen/logrus"
)
const (
@@ -30,48 +32,12 @@ const (
ttlIndexFile = "ttl_index"
)
var (
// ErrKeyNotFound is the error returned when a key is not found
ErrKeyNotFound = errors.New("error: key not found")
// ErrKeyTooLarge is the error returned for a key that exceeds the
// maximum allowed key size (configured with WithMaxKeySize).
ErrKeyTooLarge = errors.New("error: key too large")
// ErrKeyExpired is the error returned when a key is queried which has
// already expired (due to ttl)
ErrKeyExpired = errors.New("error: key expired")
// ErrEmptyKey is the error returned for a value with an empty key.
ErrEmptyKey = errors.New("error: empty key")
// ErrValueTooLarge is the error returned for a value that exceeds the
// maximum allowed value size (configured with WithMaxValueSize).
ErrValueTooLarge = errors.New("error: value too large")
// ErrChecksumFailed is the error returned if a key/value retrieved does
// not match its CRC checksum
ErrChecksumFailed = errors.New("error: checksum failed")
// ErrDatabaseLocked is the error returned if the database is locked
// (typically opened by another process)
ErrDatabaseLocked = errors.New("error: database locked")
ErrInvalidVersion = errors.New("error: invalid db version")
// ErrMergeInProgress is the error returned if merge is called when already a merge
// is in progress
ErrMergeInProgress = errors.New("error: merge already in progress")
)
// Bitcask is a struct that represents a on-disk LSM and WAL data structure
// and in-memory hash of key/value pairs as per the Bitcask paper and seen
// in the Riak database.
type Bitcask struct {
mu sync.RWMutex
*flock.Flock
mu sync.RWMutex
flock *flock.Flock
config *config.Config
options []Option
path string
@@ -114,7 +80,7 @@ func (b *Bitcask) Close() error {
b.mu.RLock()
defer func() {
b.mu.RUnlock()
b.Flock.Unlock()
b.flock.Unlock()
}()
return b.close()
@@ -275,6 +241,41 @@ func (b *Bitcask) delete(key []byte) error {
return nil
}
// Sift iterates over all keys in the database calling the function `f` for
// each key. If the KV pair is expired or the function returns true, that key is
// deleted from the database.
// If the function returns an error on any key, no further keys are processed, no
// keys are deleted, and the first error is returned.
func (b *Bitcask) Sift(f func(key []byte) (bool, error)) (err error) {
keysToDelete := art.New()
b.mu.RLock()
b.trie.ForEach(func(node art.Node) bool {
if b.isExpired(node.Key()) {
keysToDelete.Insert(node.Key(), true)
return true
}
var shouldDelete bool
if shouldDelete, err = f(node.Key()); err != nil {
return false
} else if shouldDelete {
keysToDelete.Insert(node.Key(), true)
}
return true
})
b.mu.RUnlock()
if err != nil {
return
}
b.mu.Lock()
defer b.mu.Unlock()
keysToDelete.ForEach(func(node art.Node) (cont bool) {
b.delete(node.Key())
return true
})
return
}
// DeleteAll deletes all the keys. If an I/O error occurs the error is returned.
func (b *Bitcask) DeleteAll() (err error) {
b.mu.RLock()
@@ -297,8 +298,11 @@ func (b *Bitcask) DeleteAll() (err error) {
// Scan performs a prefix scan of keys matching the given prefix and calling
// the function `f` with the keys found. If the function returns an error
// no further keys are processed and the first error returned.
// no further keys are processed and the first error is returned.
func (b *Bitcask) Scan(prefix []byte, f func(key []byte) error) (err error) {
b.mu.RLock()
defer b.mu.RUnlock()
b.trie.ForEachPrefix(prefix, func(node art.Node) bool {
// Skip the root node
if len(node.Key()) == 0 {
@@ -313,6 +317,132 @@ func (b *Bitcask) Scan(prefix []byte, f func(key []byte) error) (err error) {
return
}
// SiftScan iterates over all keys in the database beginning with the given
// prefix, calling the function `f` for each key. If the KV pair is expired or
// the function returns true, that key is deleted from the database.
// If the function returns an error on any key, no further keys are processed,
// no keys are deleted, and the first error is returned.
func (b *Bitcask) SiftScan(prefix []byte, f func(key []byte) (bool, error)) (err error) {
keysToDelete := art.New()
b.mu.RLock()
b.trie.ForEachPrefix(prefix, func(node art.Node) bool {
// Skip the root node
if len(node.Key()) == 0 {
return true
}
if b.isExpired(node.Key()) {
keysToDelete.Insert(node.Key(), true)
return true
}
var shouldDelete bool
if shouldDelete, err = f(node.Key()); err != nil {
return false
} else if shouldDelete {
keysToDelete.Insert(node.Key(), true)
}
return true
})
b.mu.RUnlock()
if err != nil {
return
}
b.mu.Lock()
defer b.mu.Unlock()
keysToDelete.ForEach(func(node art.Node) (cont bool) {
b.delete(node.Key())
return true
})
return
}
// Range performs a range scan of keys matching a range of keys between the
// start key and end key and calling the function `f` with the keys found.
// If the function returns an error no further keys are processed and the
// first error returned.
func (b *Bitcask) Range(start, end []byte, f func(key []byte) error) (err error) {
if bytes.Compare(start, end) == 1 {
return ErrInvalidRange
}
commonPrefix := lcp.LCP(start, end)
if commonPrefix == nil {
return ErrInvalidRange
}
b.mu.RLock()
defer b.mu.RUnlock()
b.trie.ForEachPrefix(commonPrefix, func(node art.Node) bool {
if bytes.Compare(node.Key(), start) >= 0 && bytes.Compare(node.Key(), end) <= 0 {
if err = f(node.Key()); err != nil {
return false
}
return true
} else if bytes.Compare(node.Key(), start) >= 0 && bytes.Compare(node.Key(), end) > 0 {
return false
}
return true
})
return
}
// SiftRange performs a range scan of keys matching a range of keys between the
// start key and end key and calling the function `f` with the keys found.
// If the KV pair is expired or the function returns true, that key is deleted
// from the database.
// If the function returns an error on any key, no further keys are processed, no
// keys are deleted, and the first error is returned.
func (b *Bitcask) SiftRange(start, end []byte, f func(key []byte) (bool, error)) (err error) {
if bytes.Compare(start, end) == 1 {
return ErrInvalidRange
}
commonPrefix := lcp.LCP(start, end)
if commonPrefix == nil {
return ErrInvalidRange
}
keysToDelete := art.New()
b.mu.RLock()
b.trie.ForEachPrefix(commonPrefix, func(node art.Node) bool {
if bytes.Compare(node.Key(), start) >= 0 && bytes.Compare(node.Key(), end) <= 0 {
if b.isExpired(node.Key()) {
keysToDelete.Insert(node.Key(), true)
return true
}
var shouldDelete bool
if shouldDelete, err = f(node.Key()); err != nil {
return false
} else if shouldDelete {
keysToDelete.Insert(node.Key(), true)
}
return true
} else if bytes.Compare(node.Key(), start) >= 0 && bytes.Compare(node.Key(), end) > 0 {
return false
}
return true
})
b.mu.RUnlock()
if err != nil {
return
}
b.mu.Lock()
defer b.mu.Unlock()
keysToDelete.ForEach(func(node art.Node) (cont bool) {
b.delete(node.Key())
return true
})
return
}
// Len returns the total number of keys in the database
func (b *Bitcask) Len() int {
b.mu.RLock()
@@ -350,21 +480,29 @@ func (b *Bitcask) RunGC() error {
// runGC deletes all keys that are expired
// caller function should take care of the locking when calling this method
func (b *Bitcask) runGC() (err error) {
keysToDelete := art.New()
b.ttlIndex.ForEach(func(node art.Node) (cont bool) {
if !b.isExpired(node.Key()) {
// later, return false here when the ttlIndex is sorted
return true
}
if err = b.delete(node.Key()); err != nil {
return false
}
keysToDelete.Insert(node.Key(), true)
//keysToDelete = append(keysToDelete, node.Key())
return true
})
return
keysToDelete.ForEach(func(node art.Node) (cont bool) {
b.delete(node.Key())
return true
})
return nil
}
// Fold iterates over all keys in the database calling the function `f` for
// each key. If the function returns an error, no further keys are processed
// and the error returned.
// and the error is returned.
func (b *Bitcask) Fold(f func(key []byte) error) (err error) {
b.mu.RLock()
defer b.mu.RUnlock()
@@ -387,8 +525,7 @@ func (b *Bitcask) get(key []byte) (internal.Entry, error) {
if !found {
return internal.Entry{}, ErrKeyNotFound
}
if expired := b.isExpired(key); expired {
_ = b.delete(key) // we don't care if it doesnt succeed
if b.isExpired(key) {
return internal.Entry{}, ErrKeyExpired
}
@@ -669,6 +806,10 @@ func (b *Bitcask) Merge() error {
return err
}
for _, file := range files {
// see #225
if file.Name() == lockfile {
continue
}
err := os.Rename(
path.Join([]string{mdb.path, file.Name()}...),
path.Join([]string{b.path, file.Name()}...),
@@ -697,7 +838,7 @@ func Open(path string, options ...Option) (*Bitcask, error) {
if internal.Exists(configPath) {
cfg, err = config.Load(configPath)
if err != nil {
return nil, err
return nil, &ErrBadConfig{err}
}
} else {
cfg = newDefaultConfig()
@@ -719,11 +860,11 @@ func Open(path string, options ...Option) (*Bitcask, error) {
meta, err = loadMetadata(path)
if err != nil {
return nil, err
return nil, &ErrBadMetadata{err}
}
bitcask := &Bitcask{
Flock: flock.New(filepath.Join(path, lockfile)),
flock: flock.New(filepath.Join(path, lockfile)),
config: cfg,
options: options,
path: path,
@@ -732,12 +873,12 @@ func Open(path string, options ...Option) (*Bitcask, error) {
metadata: meta,
}
locked, err := bitcask.Flock.TryLock()
ok, err := bitcask.flock.TryLock()
if err != nil {
return nil, err
}
if !locked {
if !ok {
return nil, ErrDatabaseLocked
}

View File

@@ -168,6 +168,65 @@ func TestAll(t *testing.T) {
assert.NoError(err)
})
t.Run("Sift", func(t *testing.T) {
err = db.Put([]byte("toBeSifted"), []byte("siftMe"))
assert.NoError(err)
err = db.Put([]byte("notToBeSifted"), []byte("dontSiftMe"))
assert.NoError(err)
err := db.Sift(func(key []byte) (bool, error) {
value, err := db.Get(key)
if err != nil {
return false, err
}
if string(value) == "siftMe" {
return true, nil
}
return false, nil
})
assert.NoError(err)
_, err = db.Get([]byte("toBeSifted"))
assert.Equal(ErrKeyNotFound, err)
_, err = db.Get([]byte("notToBeSifted"))
assert.NoError(err)
})
t.Run("SiftScan", func(t *testing.T) {
err := db.DeleteAll()
assert.NoError(err)
err = db.Put([]byte("toBeSifted"), []byte("siftMe"))
assert.NoError(err)
err = db.Put([]byte("toBeSkipped"), []byte("siftMe"))
assert.NoError(err)
err = db.Put([]byte("toBeSiftedAsWell"), []byte("siftMe"))
assert.NoError(err)
err = db.Put([]byte("toBeSiftedButNotReally"), []byte("dontSiftMe"))
assert.NoError(err)
err = db.SiftScan([]byte("toBeSifted"), func(key []byte) (bool, error) {
value, err := db.Get(key)
if err != nil {
return false, err
}
if string(value) == "siftMe" {
return true, nil
}
return false, nil
})
assert.NoError(err)
_, err = db.Get([]byte("toBeSifted"))
assert.Equal(ErrKeyNotFound, err)
_, err = db.Get([]byte("toBeSiftedAsWell"))
assert.Equal(ErrKeyNotFound, err)
_, err = db.Get([]byte("toBeSkipped"))
assert.NoError(err)
_, err = db.Get([]byte("toBeSiftedButNotReally"))
assert.NoError(err)
})
t.Run("DeleteAll", func(t *testing.T) {
err = db.DeleteAll()
assert.NoError(err)
})
t.Run("Close", func(t *testing.T) {
err = db.Close()
assert.NoError(err)
@@ -479,6 +538,7 @@ func TestAutoRecovery(t *testing.T) {
require.NoError(err)
db, err = Open(testdir, WithAutoRecovery(autoRecovery))
t.Logf("err: %s", err)
require.NoError(err)
defer db.Close()
// Check that all values but the last are still intact.
@@ -1654,6 +1714,93 @@ func TestConcurrent(t *testing.T) {
})
}
func TestSift(t *testing.T) {
assert := assert.New(t)
testdir, err := ioutil.TempDir("", "bitcask")
assert.NoError(err)
var db *Bitcask
t.Run("Setup", func(t *testing.T) {
t.Run("Open", func(t *testing.T) {
db, err = Open(testdir)
assert.NoError(err)
})
t.Run("Put", func(t *testing.T) {
var items = map[string][]byte{
"1": []byte("1"),
"2": []byte("2"),
"3": []byte("3"),
"food": []byte("pizza"),
"foo": []byte([]byte("foo")),
"fooz": []byte("fooz ball"),
"hello": []byte("world"),
}
for k, v := range items {
err = db.Put([]byte(k), v)
assert.NoError(err)
}
})
})
t.Run("SiftErrors", func(t *testing.T) {
err = db.Sift(func(key []byte) (bool, error) {
return false, ErrMockError
})
assert.Equal(ErrMockError, err)
err = db.SiftScan([]byte("fo"), func(key []byte) (bool, error) {
return true, ErrMockError
})
assert.Equal(ErrMockError, err)
})
}
func TestSiftScan(t *testing.T) {
assert := assert.New(t)
testdir, err := ioutil.TempDir("", "bitcask")
assert.NoError(err)
var db *Bitcask
t.Run("Setup", func(t *testing.T) {
t.Run("Open", func(t *testing.T) {
db, err = Open(testdir)
assert.NoError(err)
})
t.Run("Put", func(t *testing.T) {
var items = map[string][]byte{
"1": []byte("1"),
"2": []byte("2"),
"3": []byte("3"),
"food": []byte("pizza"),
"foo": []byte([]byte("foo")),
"fooz": []byte("fooz ball"),
"hello": []byte("world"),
}
for k, v := range items {
err = db.Put([]byte(k), v)
assert.NoError(err)
}
})
})
t.Run("SiftScanErrors", func(t *testing.T) {
err = db.SiftScan([]byte("fo"), func(key []byte) (bool, error) {
return false, ErrMockError
})
assert.Equal(ErrMockError, err)
err = db.SiftScan([]byte("fo"), func(key []byte) (bool, error) {
return true, ErrMockError
})
assert.Equal(ErrMockError, err)
})
}
func TestScan(t *testing.T) {
assert := assert.New(t)
@@ -1713,6 +1860,146 @@ func TestScan(t *testing.T) {
assert.Equal(ErrMockError, err)
})
}
func TestSiftRange(t *testing.T) {
assert := assert.New(t)
testdir, err := ioutil.TempDir("", "bitcask")
assert.NoError(err)
var db *Bitcask
t.Run("Setup", func(t *testing.T) {
t.Run("Open", func(t *testing.T) {
db, err = Open(testdir)
assert.NoError(err)
})
t.Run("Put", func(t *testing.T) {
for i := 1; i < 10; i++ {
key := []byte(fmt.Sprintf("foo_%d", i))
val := []byte(fmt.Sprintf("%d", i))
err = db.Put(key, val)
assert.NoError(err)
}
})
})
t.Run("SiftRange", func(t *testing.T) {
var (
vals [][]byte
expected = [][]byte{
[]byte("1"),
[]byte("2"),
[]byte("4"),
[]byte("5"),
[]byte("6"),
[]byte("7"),
[]byte("8"),
[]byte("9"),
}
)
err = db.SiftRange([]byte("foo_3"), []byte("foo_7"), func(key []byte) (bool, error) {
val, err := db.Get(key)
assert.NoError(err)
if string(val) == "3" {
return true, nil
}
return false, nil
})
err = db.Fold(func(key []byte) error {
val, err := db.Get(key)
assert.NoError(err)
vals = append(vals, val)
return nil
})
_, err = db.Get([]byte("foo_3"))
assert.Equal(ErrKeyNotFound, err)
vals = SortByteArrays(vals)
assert.Equal(expected, vals)
})
t.Run("SiftRangeErrors", func(t *testing.T) {
err = db.SiftRange([]byte("foo_3"), []byte("foo_7"), func(key []byte) (bool, error) {
return true, ErrMockError
})
assert.Error(err)
assert.Equal(ErrMockError, err)
})
t.Run("InvalidRange", func(t *testing.T) {
err = db.SiftRange([]byte("foo_3"), []byte("foo_1"), func(key []byte) (bool, error) {
return false, nil
})
assert.Error(err)
assert.Equal(ErrInvalidRange, err)
})
}
func TestRange(t *testing.T) {
assert := assert.New(t)
testdir, err := ioutil.TempDir("", "bitcask")
assert.NoError(err)
var db *Bitcask
t.Run("Setup", func(t *testing.T) {
t.Run("Open", func(t *testing.T) {
db, err = Open(testdir)
assert.NoError(err)
})
t.Run("Put", func(t *testing.T) {
for i := 1; i < 10; i++ {
key := []byte(fmt.Sprintf("foo_%d", i))
val := []byte(fmt.Sprintf("%d", i))
err = db.Put(key, val)
assert.NoError(err)
}
})
})
t.Run("Range", func(t *testing.T) {
var (
vals [][]byte
expected = [][]byte{
[]byte("3"),
[]byte("4"),
[]byte("5"),
[]byte("6"),
[]byte("7"),
}
)
err = db.Range([]byte("foo_3"), []byte("foo_7"), func(key []byte) error {
val, err := db.Get(key)
assert.NoError(err)
vals = append(vals, val)
return nil
})
vals = SortByteArrays(vals)
assert.Equal(expected, vals)
})
t.Run("RangeErrors", func(t *testing.T) {
err = db.Range([]byte("foo_3"), []byte("foo_7"), func(key []byte) error {
return ErrMockError
})
assert.Error(err)
assert.Equal(ErrMockError, err)
})
t.Run("InvalidRange", func(t *testing.T) {
err = db.Range([]byte("foo_3"), []byte("foo_1"), func(key []byte) error {
return nil
})
assert.Error(err)
assert.Equal(ErrInvalidRange, err)
})
}
func TestLocking(t *testing.T) {
assert := assert.New(t)
@@ -1749,6 +2036,74 @@ func TestLockingAfterMerge(t *testing.T) {
assert.Error(err)
}
func TestGetExpiredInsideFold(t *testing.T) {
assert := assert.New(t)
testdir, err := ioutil.TempDir("", "bitcask")
assert.NoError(err)
db, err := Open(testdir)
assert.NoError(err)
defer db.Close()
// Add a node to the tree that won't expire
db.Put([]byte("static"), []byte("static"))
// Add a node that expires almost immediately to the tree
db.PutWithTTL([]byte("shortLived"), []byte("shortLived"), 1*time.Millisecond)
db.Put([]byte("skipped"), []byte("skipped"))
db.Put([]byte("static2"), []byte("static2"))
time.Sleep(2 * time.Millisecond)
var arr []string
_ = db.Fold(func(key []byte) error {
val, err := db.Get(key)
switch string(key) {
case "skipped":
fallthrough
case "static2":
fallthrough
case "static":
assert.NoError(err)
assert.Equal(string(val), string(key))
case "shortLived":
assert.Error(err)
}
arr = append(arr, string(val))
return nil
})
assert.Contains(arr, "skipped")
}
func TestRunGCDeletesAllExpired(t *testing.T) {
assert := assert.New(t)
testdir, err := ioutil.TempDir("", "bitcask")
assert.NoError(err)
db, err := Open(testdir)
assert.NoError(err)
defer db.Close()
// Add a node to the tree that won't expire
db.Put([]byte("static"), []byte("static"))
// Add a node that expires almost immediately to the tree
db.PutWithTTL([]byte("shortLived"), []byte("shortLived"), 0)
db.PutWithTTL([]byte("longLived"), []byte("longLived"), time.Hour)
db.PutWithTTL([]byte("longLived2"), []byte("longLived2"), time.Hour)
db.PutWithTTL([]byte("shortLived2"), []byte("shortLived2"), 0)
db.PutWithTTL([]byte("shortLived3"), []byte("shortLived3"), 0)
db.Put([]byte("static2"), []byte("static2"))
// Sleep a bit and run the Garbage Collector
time.Sleep(3 * time.Millisecond)
db.RunGC()
_ = db.Fold(func(key []byte) error {
_, err := db.Get(key)
assert.NoError(err)
return nil
})
}
type benchmarkTestCase struct {
name string
size int

61
cmd/bitcask/range.go Normal file
View File

@@ -0,0 +1,61 @@
package main
import (
"fmt"
"os"
log "github.com/sirupsen/logrus"
"github.com/spf13/cobra"
"github.com/spf13/viper"
"git.mills.io/prologic/bitcask"
)
var rangeCmd = &cobra.Command{
Use: "range <start> <end>",
Aliases: []string{},
Short: "Perform a range scan for keys from a start to end key",
Long: `This performa a range scan for keys starting with the given start
key and ending with the end key. This uses a Trie to search for matching keys
within the range and returns all matched keys.`,
Args: cobra.ExactArgs(2),
Run: func(cmd *cobra.Command, args []string) {
path := viper.GetString("path")
start := args[0]
end := args[1]
os.Exit(_range(path, start, end))
},
}
func init() {
RootCmd.AddCommand(rangeCmd)
}
func _range(path, start, end string) int {
db, err := bitcask.Open(path)
if err != nil {
log.WithError(err).Error("error opening database")
return 1
}
defer db.Close()
err = db.Range([]byte(start), []byte(end), func(key []byte) error {
value, err := db.Get(key)
if err != nil {
log.WithError(err).Error("error reading key")
return err
}
fmt.Printf("%s\n", string(value))
log.WithField("key", key).WithField("value", value).Debug("key/value")
return nil
})
if err != nil {
log.WithError(err).Error("error ranging over keys")
return 1
}
return 0
}

View File

@@ -54,13 +54,6 @@ func (s *server) handleSet(cmd redcon.Command, conn redcon.Conn) {
ttl = &d
}
err := s.db.Lock()
if err != nil {
conn.WriteError("ERR " + fmt.Errorf("failed to lock db: %v", err).Error() + "")
return
}
defer s.db.Unlock()
if ttl != nil {
if err := s.db.PutWithTTL(key, value, *ttl); err != nil {
conn.WriteString(fmt.Sprintf("ERR: %s", err))
@@ -82,13 +75,6 @@ func (s *server) handleGet(cmd redcon.Command, conn redcon.Conn) {
key := cmd.Args[1]
err := s.db.Lock()
if err != nil {
conn.WriteError("ERR " + fmt.Errorf("failed to lock db: %v", err).Error() + "")
return
}
defer s.db.Unlock()
value, err := s.db.Get(key)
if err != nil {
conn.WriteNull()
@@ -98,17 +84,41 @@ func (s *server) handleGet(cmd redcon.Command, conn redcon.Conn) {
}
func (s *server) handleKeys(cmd redcon.Command, conn redcon.Conn) {
err := s.db.Lock()
if err != nil {
conn.WriteError("ERR " + fmt.Errorf("failed to lock db: %v", err).Error() + "")
if len(cmd.Args) != 2 {
conn.WriteError("ERR wrong number of arguments for '" + string(cmd.Args[0]) + "' command")
return
}
defer s.db.Unlock()
conn.WriteArray(s.db.Len())
for key := range s.db.Keys() {
conn.WriteBulk(key)
pattern := string(cmd.Args[1])
// Fast-track condition for improved speed
if pattern == "*" {
conn.WriteArray(s.db.Len())
for key := range s.db.Keys() {
conn.WriteBulk(key)
}
return
}
// Prefix handling
if strings.Count(pattern, "*") == 1 && strings.HasSuffix(pattern, "*") {
prefix := strings.ReplaceAll(pattern, "*", "")
count := 0
keys := make([][]byte, 0)
s.db.Scan([]byte(prefix), func(key []byte) error {
keys = append(keys, key)
count++
return nil
})
conn.WriteArray(count)
for _, key := range keys {
conn.WriteBulk(key)
}
return
}
// No results means empty array
conn.WriteArray(0)
}
func (s *server) handleExists(cmd redcon.Command, conn redcon.Conn) {
@@ -119,13 +129,6 @@ func (s *server) handleExists(cmd redcon.Command, conn redcon.Conn) {
key := cmd.Args[1]
err := s.db.Lock()
if err != nil {
conn.WriteError("ERR " + fmt.Errorf("failed to lock db: %v", err).Error() + "")
return
}
defer s.db.Unlock()
if s.db.Has(key) {
conn.WriteInt(1)
} else {
@@ -141,13 +144,6 @@ func (s *server) handleDel(cmd redcon.Command, conn redcon.Conn) {
key := cmd.Args[1]
err := s.db.Lock()
if err != nil {
conn.WriteError("ERR " + fmt.Errorf("failed to lock db: %v", err).Error() + "")
return
}
defer s.db.Unlock()
if err := s.db.Delete(key); err != nil {
conn.WriteInt(0)
} else {

102
cmd/bitcaskd/server_test.go Normal file
View File

@@ -0,0 +1,102 @@
package main
import (
"net"
"strconv"
"testing"
"github.com/tidwall/redcon"
)
func TestHandleKeys(t *testing.T) {
s, err := newServer(":61234", "./test.db")
if err != nil {
t.Fatalf("Unable to create server: %v", err)
}
s.db.Put([]byte("foo"), []byte("bar"))
testCases := []TestCase{
{
Command: redcon.Command{
Raw: []byte("KEYS *"),
Args: [][]byte{[]byte("KEYS"), []byte("*")},
},
Expected: "1,foo",
},
{
Command: redcon.Command{
Raw: []byte("KEYS fo*"),
Args: [][]byte{[]byte("KEYS"), []byte("fo*")},
},
Expected: "1,foo",
},
{
Command: redcon.Command{
Raw: []byte("KEYS ba*"),
Args: [][]byte{[]byte("KEYS"), []byte("ba*")},
},
Expected: "0",
},
{
Command: redcon.Command{
Raw: []byte("KEYS *oo"),
Args: [][]byte{[]byte("KEYS"), []byte("*oo")},
},
Expected: "0",
},
}
for _, testCase := range testCases {
conn := DummyConn{}
s.handleKeys(testCase.Command, &conn)
if testCase.Expected != conn.Result {
t.Fatalf("s.handleKeys failed: expected '%s', got '%s'", testCase.Expected, conn.Result)
}
}
}
type TestCase struct {
Command redcon.Command
Expected string
}
type DummyConn struct {
Result string
}
func (dc *DummyConn) RemoteAddr() string {
return ""
}
func (dc *DummyConn) Close() error {
return nil
}
func (dc *DummyConn) WriteError(msg string) {}
func (dc *DummyConn) WriteString(str string) {}
func (dc *DummyConn) WriteBulk(bulk []byte) {
dc.Result += "," + string(bulk)
}
func (dc *DummyConn) WriteBulkString(bulk string) {}
func (dc *DummyConn) WriteInt(num int) {}
func (dc *DummyConn) WriteInt64(num int64) {}
func (dc *DummyConn) WriteUint64(num uint64) {}
func (dc *DummyConn) WriteArray(count int) {
dc.Result = strconv.Itoa(count)
}
func (dc *DummyConn) WriteNull() {}
func (dc *DummyConn) WriteRaw(data []byte) {}
func (dc *DummyConn) WriteAny(any interface{}) {}
func (dc *DummyConn) Context() interface{} {
return nil
}
func (dc *DummyConn) SetContext(v interface{}) {}
func (dc *DummyConn) SetReadBuffer(bytes int) {}
func (dc *DummyConn) Detach() redcon.DetachedConn {
return nil
}
func (dc *DummyConn) ReadPipeline() []redcon.Command {
return nil
}
func (dc *DummyConn) PeekPipeline() []redcon.Command {
return nil
}
func (dc *DummyConn) NetConn() net.Conn {
return nil
}

77
errors.go Normal file
View File

@@ -0,0 +1,77 @@
package bitcask
import (
"errors"
"fmt"
)
var (
// ErrKeyNotFound is the error returned when a key is not found
ErrKeyNotFound = errors.New("error: key not found")
// ErrKeyTooLarge is the error returned for a key that exceeds the
// maximum allowed key size (configured with WithMaxKeySize).
ErrKeyTooLarge = errors.New("error: key too large")
// ErrKeyExpired is the error returned when a key is queried which has
// already expired (due to ttl)
ErrKeyExpired = errors.New("error: key expired")
// ErrEmptyKey is the error returned for a value with an empty key.
ErrEmptyKey = errors.New("error: empty key")
// ErrValueTooLarge is the error returned for a value that exceeds the
// maximum allowed value size (configured with WithMaxValueSize).
ErrValueTooLarge = errors.New("error: value too large")
// ErrChecksumFailed is the error returned if a key/value retrieved does
// not match its CRC checksum
ErrChecksumFailed = errors.New("error: checksum failed")
// ErrDatabaseLocked is the error returned if the database is locked
// (typically opened by another process)
ErrDatabaseLocked = errors.New("error: database locked")
// ErrInvalidRange is the error returned when the range scan is invalid
ErrInvalidRange = errors.New("error: invalid range")
// ErrInvalidVersion is the error returned when the database version is invalid
ErrInvalidVersion = errors.New("error: invalid db version")
// ErrMergeInProgress is the error returned if merge is called when already a merge
// is in progress
ErrMergeInProgress = errors.New("error: merge already in progress")
)
// ErrBadConfig is the error returned on failure to load the database config
type ErrBadConfig struct {
Err error
}
func (e *ErrBadConfig) Is(target error) bool {
if _, ok := target.(*ErrBadConfig); ok {
return true
}
return errors.Is(e.Err, target)
}
func (e *ErrBadConfig) Unwrap() error { return e.Err }
func (e *ErrBadConfig) Error() string {
return fmt.Sprintf("error reading config.json: %s", e.Err)
}
// ErrBadMetadata is the error returned on failure to load the database metadata
type ErrBadMetadata struct {
Err error
}
func (e *ErrBadMetadata) Is(target error) bool {
if _, ok := target.(*ErrBadMetadata); ok {
return true
}
return errors.Is(e.Err, target)
}
func (e *ErrBadMetadata) Unwrap() error { return e.Err }
func (e *ErrBadMetadata) Error() string {
return fmt.Sprintf("error reading meta.json: %s", e.Err)
}

View File

@@ -1,97 +0,0 @@
package flock
import (
"errors"
"os"
"sync"
)
type Flock struct {
path string
m sync.Mutex
fh *os.File
}
var (
ErrAlreadyLocked = errors.New("Double lock: already own the lock")
ErrLockFailed = errors.New("Could not acquire lock")
ErrLockNotHeld = errors.New("Could not unlock, lock is not held")
ErrInodeChangedAtPath = errors.New("Inode changed at path")
)
// New returns a new instance of *Flock. The only parameter
// it takes is the path to the desired lockfile.
func New(path string) *Flock {
return &Flock{path: path}
}
// Path returns the file path linked to this lock.
func (f *Flock) Path() string {
return f.path
}
// Lock will acquire the lock. This function may block indefinitely if some other process holds the lock. For a non-blocking version, see Flock.TryLock().
func (f *Flock) Lock() error {
f.m.Lock()
defer f.m.Unlock()
if f.fh != nil {
return ErrAlreadyLocked
}
var fh *os.File
fh, err := lock_sys(f.path, false)
// treat "ErrInodeChangedAtPath" as "some other process holds the lock, retry locking"
for err == ErrInodeChangedAtPath {
fh, err = lock_sys(f.path, false)
}
if err != nil {
return err
}
if fh == nil {
return ErrLockFailed
}
f.fh = fh
return nil
}
// TryLock will try to acquire the lock, and returns immediately if the lock is already owned by another process.
func (f *Flock) TryLock() (bool, error) {
f.m.Lock()
defer f.m.Unlock()
if f.fh != nil {
return false, ErrAlreadyLocked
}
fh, err := lock_sys(f.path, true)
if err != nil {
return false, ErrLockFailed
}
f.fh = fh
return true, nil
}
// Unlock removes the lock file from disk and releases the lock.
// Whatever the result of `.Unlock()`, the caller must assume that it does not hold the lock anymore.
func (f *Flock) Unlock() error {
f.m.Lock()
defer f.m.Unlock()
if f.fh == nil {
return ErrLockNotHeld
}
err1 := rm_if_match(f.fh, f.path)
err2 := f.fh.Close()
if err1 != nil {
return err1
}
return err2
}

View File

@@ -1,121 +0,0 @@
package flock
import (
"os"
"testing"
"time"
"github.com/stretchr/testify/assert"
)
// WARNING : this test will delete the file located at "testLockPath". Choose an adequate temporary file name.
const testLockPath = "/tmp/bitcask_unit_test_lock" // file path to use for the lock
func TestTryLock(t *testing.T) {
// test that basic locking functionalities are consistent
// make sure there is no present lock when starting this test
os.Remove(testLockPath)
assert := assert.New(t)
lock1 := New(testLockPath)
lock2 := New(testLockPath)
// 1- take the first lock
locked1, err := lock1.TryLock()
assert.True(locked1)
assert.NoError(err)
// 2- check that the second lock cannot acquire the lock
locked2, err := lock2.TryLock()
assert.False(locked2)
assert.Error(err)
// 3- release the first lock
err = lock1.Unlock()
assert.NoError(err)
// 4- check that the second lock can acquire and then release the lock without error
locked2, err = lock2.TryLock()
assert.True(locked2)
assert.NoError(err)
err = lock2.Unlock()
assert.NoError(err)
}
func TestLock(t *testing.T) {
assert := assert.New(t)
// make sure there is no present lock when starting this test
os.Remove(testLockPath)
syncChan := make(chan bool)
// main goroutine: take lock on testPath
lock := New(testLockPath)
err := lock.Lock()
assert.NoError(err)
go func() {
// sub routine:
lock := New(testLockPath)
// before entering the block '.Lock()' call, signal we are about to do it
// see below : the main goroutine will wait for a small delay before releasing the lock
syncChan <- true
// '.Lock()' should ultimately return without error :
err := lock.Lock()
assert.NoError(err)
err = lock.Unlock()
assert.NoError(err)
close(syncChan)
}()
// wait for the "ready" signal from the sub routine,
<-syncChan
// after that signal wait for a small delay before releasing the lock
<-time.After(100 * time.Microsecond)
err = lock.Unlock()
assert.NoError(err)
// wait for the sub routine to finish
<-syncChan
}
func TestErrorConditions(t *testing.T) {
// error conditions implemented in this version :
// - you can't release a lock you do not hold
// - you can't lock twice the same lock
// -- setup
assert := assert.New(t)
// make sure there is no present lock when starting this test
os.Remove(testLockPath)
lock := New(testLockPath)
// -- run tests :
err := lock.Unlock()
assert.Error(err, "you can't release a lock you do not hold")
// take the lock once:
lock.TryLock()
locked, err := lock.TryLock()
assert.False(locked)
assert.Error(err, "you can't lock twice the same lock (using .TryLock())")
err = lock.Lock()
assert.Error(err, "you can't lock twice the same lock (using .Lock())")
// -- teardown
lock.Unlock()
}

View File

@@ -1,79 +0,0 @@
// +build !aix,!windows
package flock
import (
"os"
"golang.org/x/sys/unix"
)
func lock_sys(path string, nonBlocking bool) (_ *os.File, err error) {
var fh *os.File
fh, err = os.OpenFile(path, os.O_RDWR|os.O_CREATE, 0666)
if err != nil {
return nil, err
}
defer func() {
if err != nil {
fh.Close()
}
}()
flag := unix.LOCK_EX
if nonBlocking {
flag |= unix.LOCK_NB
}
err = unix.Flock(int(fh.Fd()), flag)
if err != nil {
return nil, err
}
if !sameInodes(fh, path) {
return nil, ErrInodeChangedAtPath
}
return fh, nil
}
func rm_if_match(fh *os.File, path string) error {
// Sanity check :
// before running "rm", check that the file pointed at by the
// filehandle has the same inode as the path in the filesystem
//
// If this sanity check doesn't pass, store a "ErrInodeChangedAtPath" error,
// if the check passes, run os.Remove, and store the error if any.
//
// note : this sanity check is in no way atomic, but :
// - as long as only cooperative processes are involved, it will work as intended
// - it allows to avoid 99.9% the major pitfall case: "root user forcefully removed the lockfile"
if !sameInodes(fh, path) {
return ErrInodeChangedAtPath
}
return os.Remove(path)
}
func sameInodes(f *os.File, path string) bool {
// get inode from opened file f:
var fstat unix.Stat_t
err := unix.Fstat(int(f.Fd()), &fstat)
if err != nil {
return false
}
fileIno := fstat.Ino
// get inode for path on disk:
var dstat unix.Stat_t
err = unix.Stat(path, &dstat)
if err != nil {
return false
}
pathIno := dstat.Ino
return pathIno == fileIno
}

View File

@@ -1,236 +0,0 @@
package flock
// the "nd" in "nd_test.go" stands for non-deterministic
import (
"errors"
"os"
"sync"
"sync/atomic"
"syscall"
"testing"
"time"
"github.com/stretchr/testify/assert"
)
// The two tests in this file are test some concurrency scenarios :
// 1- TestRaceLock() runs several threads racing for the same lock
// 2- TestShatteredLock() runs racing racing threads, along with another threads which forcibly remove the file from disk
//
// Note that these tests are non-deterministic : the coverage produced by each test depends
// on how the runtime chooses to schedule the concurrent goroutines.
var lockerCount int64
// lockAndCount tries to take a lock on "lockpath"
// if it fails : it returns 0 and stop there
// if it succeeds :
// 1- it sets a defer function to release the lock in the same fashion as "func (b *Bitcask) Close()"
// 2- it increments the shared "lockerCount" above
// 3- it waits for a short amount of time
// 4- it decrements "lockerCount"
// 5- it returns the value it has seen at step 2.
//
// If the locking and unlocking behave as we expect them to,
// instructions 1-5 should be in a critical section,
// and the only possible value at step 2 should be "1".
//
// Returning a value > 0 indicates this function successfully acquired the lock,
// returning a value == 0 indicates that TryLock failed.
func lockAndCount(lockpath string) int64 {
lock := New(lockpath)
ok, _ := lock.TryLock()
if !ok {
return 0
}
defer func() {
lock.Unlock()
}()
x := atomic.AddInt64(&lockerCount, 1)
// emulate a workload :
<-time.After(1 * time.Microsecond)
atomic.AddInt64(&lockerCount, -1)
return x
}
// locker will call the lock function above in a loop, until one of the following holds :
// - reading from the "timeout" channel doesn't block
// - the number of calls to "lock()" that indicate the lock was successfully taken reaches "successfullLockCount"
func locker(t *testing.T, id int, lockPath string, successfulLockCount int, timeout <-chan struct{}) {
timedOut := false
failCount := 0
max := int64(0)
lockloop:
for successfulLockCount > 0 {
select {
case <-timeout:
timedOut = true
break lockloop
default:
}
x := lockAndCount(lockPath)
if x > 0 {
// if x indicates the lock was taken : decrement the counter
successfulLockCount--
}
if x > 1 {
// if x indicates an invalid value : increase the failCount and update max accordingly
failCount++
if x > max {
max = x
}
}
}
// check failure cases :
if timedOut {
t.Fail()
t.Logf("[runner %02d] timed out", id)
}
if failCount > 0 {
t.Fail()
t.Logf("[runner %02d] lockCounter was > 1 on %2.d occasions, max seen value was %2.d", id, failCount, max)
}
}
// TestRaceLock checks that no error occurs when several concurrent actors (goroutines in this case) race for the same lock.
func TestRaceLock(t *testing.T) {
// test parameters, written in code :
// you may want to tweak these values for testing
goroutines := 20 // number of concurrent "locker" goroutines to launch
successfulLockCount := 50 // how many times a "locker" will successfully take the lock before halting
// make sure there is no present lock when startng this test
os.Remove(testLockPath)
// timeout implemented in code
// (the lock acquisition depends on the behavior of the filesystem,
// avoid sending CI in endless loop if something fishy happens on the test server ...)
// tweak this value if needed ; comment out the "close(ch)" instruction below
timeout := 10 * time.Second
ch := make(chan struct{})
go func() {
<-time.After(timeout)
close(ch)
}()
wg := &sync.WaitGroup{}
wg.Add(goroutines)
for i := 0; i < goroutines; i++ {
go func(id int) {
locker(t, id, testLockPath, successfulLockCount, ch)
wg.Done()
}(i)
}
wg.Wait()
}
func isExpectedError(err error) bool {
switch {
case err == nil:
return true
case err == ErrInodeChangedAtPath:
return true
case errors.Is(err, syscall.ENOENT):
return true
default:
return false
}
}
// TestShatteredLock runs concurrent goroutines on one lock, with an extra goroutine
// which removes the lock file from disk without checking the locks
// (e.g: a user who would run 'rm lockfile' in a loop while the program is running).
//
// In this scenario, errors may occur on .Unlock() ; this test checks that only errors
// relating to the file being deleted occur.
//
// This test additionally logs the number of errors that occurred, grouped by error message.
func TestShatteredLock(t *testing.T) {
// test parameters, written in code :
// you may want to tweak these values for testing
goroutines := 4 // number of concurrent "locker" and "remover" goroutines to launch
successfulLockCount := 10 // how many times a "locker" will successfully take the lock before halting
// make sure there is no present lock when startng this test
os.Remove(testLockPath)
assert := assert.New(t)
wg := &sync.WaitGroup{}
wg.Add(goroutines)
stopChan := make(chan struct{})
errChan := make(chan error, 10)
for i := 0; i < goroutines; i++ {
go func(id int, count int) {
for count > 0 {
lock := New(testLockPath)
ok, _ := lock.TryLock()
if !ok {
continue
}
count--
err := lock.Unlock()
if !isExpectedError(err) {
assert.Fail("goroutine %d - unexpected error: %v", id, err)
}
if err != nil {
errChan <- err
}
}
wg.Done()
}(i, successfulLockCount)
}
var wgCompanion = &sync.WaitGroup{}
wgCompanion.Add(2)
go func() {
defer wgCompanion.Done()
for {
os.Remove(testLockPath)
select {
case <-stopChan:
return
default:
}
}
}()
var errs = make(map[string]int)
go func() {
for err := range errChan {
errs[err.Error()]++
}
wgCompanion.Done()
}()
wg.Wait()
close(stopChan)
close(errChan)
wgCompanion.Wait()
for err, count := range errs {
t.Logf(" seen %d times: %s", count, err)
}
}

5
go.mod
View File

@@ -1,8 +1,10 @@
module git.mills.io/prologic/bitcask
go 1.13
go 1.16
require (
github.com/abcum/lcp v0.0.0-20201209214815-7a3f3840be81
github.com/gofrs/flock v0.8.0
github.com/pkg/errors v0.9.1
github.com/plar/go-adaptive-radix-tree v1.0.4
github.com/sirupsen/logrus v1.8.1
@@ -13,5 +15,4 @@ require (
github.com/stretchr/testify v1.7.0
github.com/tidwall/redcon v1.4.1
golang.org/x/exp v0.0.0-20200228211341-fcea875c7e85
golang.org/x/sys v0.0.0-20210510120138-977fb7262007
)

6
go.sum
View File

@@ -40,6 +40,8 @@ dmitri.shuralyov.com/gpu/mtl v0.0.0-20190408044501-666a987793e9/go.mod h1:H6x//7
github.com/BurntSushi/toml v0.3.1/go.mod h1:xHWCNGjB5oqiDr8zfno3MHue2Ht5sIBksp03qcyfWMU=
github.com/BurntSushi/xgb v0.0.0-20160522181843-27f122750802/go.mod h1:IVnqGOEym/WlBOVXweHU+Q+/VP0lqqI8lqeDx9IjBqo=
github.com/OneOfOne/xxhash v1.2.2/go.mod h1:HSdplMjZKSmBqAxg5vPj2TmRDmfkzw+cTzAElWljhcU=
github.com/abcum/lcp v0.0.0-20201209214815-7a3f3840be81 h1:uHogIJ9bXH75ZYrXnVShHIyywFiUZ7OOabwd9Sfd8rw=
github.com/abcum/lcp v0.0.0-20201209214815-7a3f3840be81/go.mod h1:6ZvnjTZX1LNo1oLpfaJK8h+MXqHxcBFBIwkgsv+xlv0=
github.com/alecthomas/template v0.0.0-20160405071501-a0175ee3bccc/go.mod h1:LOuyumcjzFXgccqObfd/Ljyb9UuFJ6TxHnclSeseNhc=
github.com/alecthomas/units v0.0.0-20151022065526-2efee857e7cf/go.mod h1:ybxpYRFXyAe+OPACYpWeL0wqObRcbAqCMya13uyzqw0=
github.com/antihax/optional v1.0.0/go.mod h1:uupD/76wgC+ih3iEmQUL+0Ugr19nfwCT1kdvxnR2qWY=
@@ -93,6 +95,8 @@ github.com/go-logfmt/logfmt v0.3.0/go.mod h1:Qt1PoO58o5twSAckw1HlFXLmHsOX5/0LbT9
github.com/go-logfmt/logfmt v0.4.0/go.mod h1:3RMwSq7FuexP4Kalkev3ejPJsZTpXXBr9+V4qmtdjCk=
github.com/go-stack/stack v1.8.0/go.mod h1:v0f6uXyyMGvRgIKkXu+yp6POWl0qKG85gN/melR3HDY=
github.com/godbus/dbus/v5 v5.0.4/go.mod h1:xhWf0FNVPg57R7Z0UbKHbJfkEywrmjJnf7w5xrFpKfA=
github.com/gofrs/flock v0.8.0 h1:MSdYClljsF3PbENUUEx85nkWfJSGfzYI9yEBZOJz6CY=
github.com/gofrs/flock v0.8.0/go.mod h1:F1TvTiK9OcQqauNUHlbJvyl9Qa1QvF/gOUDKA14jxHU=
github.com/gogo/protobuf v1.1.1/go.mod h1:r8qH/GZQm5c6nD/R0oafs1akxWv10x8SbQlK7atdtwQ=
github.com/gogo/protobuf v1.2.1/go.mod h1:hp+jE20tsWTFYpLwKvXlhS1hjn+gTNwPg2I6zVXpSg4=
github.com/gogo/protobuf v1.3.2/go.mod h1:P1XiOD3dCwIKUDQYPy72D8LYyHL2YPYrpS2s69NZV8Q=
@@ -160,6 +164,7 @@ github.com/googleapis/gax-go/v2 v2.0.4/go.mod h1:0Wqv26UfaUD9n4G6kQubkQ+KchISgw+
github.com/googleapis/gax-go/v2 v2.0.5/go.mod h1:DWXyrwAJ9X0FpwwEdw+IPEYBICEFu5mhpdKc/us6bOk=
github.com/gopherjs/gopherjs v0.0.0-20181017120253-0766667cb4d1 h1:EGx4pi6eqNxGaHF6qqu48+N2wcFQ5qg5FXgOdqsJ5d8=
github.com/gopherjs/gopherjs v0.0.0-20181017120253-0766667cb4d1/go.mod h1:wJfORRmW1u3UXTncJ5qlYoELFm8eSnnEO6hX4iZ3EWY=
github.com/gopherjs/gopherjs v0.0.0-20200217142428-fce0ec30dd00/go.mod h1:wJfORRmW1u3UXTncJ5qlYoELFm8eSnnEO6hX4iZ3EWY=
github.com/gorilla/websocket v1.4.0/go.mod h1:E7qHFY5m1UJ88s3WnNqhKjPHQ0heANvMoAMk2YaljkQ=
github.com/grpc-ecosystem/go-grpc-middleware v1.0.0/go.mod h1:FiyG127CGDf3tlThmgyCl78X/SZQqEOJBCDaAfeWzPs=
github.com/grpc-ecosystem/go-grpc-prometheus v1.2.0/go.mod h1:8NvIoxWQoOIhqOTXgfV/d3M/q6VIi02HzZEHgUlZvzk=
@@ -266,6 +271,7 @@ github.com/sirupsen/logrus v1.8.1 h1:dJKuHgqk1NNQlqoA6BTlM1Wf9DOH3NBjQyu0h9+AZZE
github.com/sirupsen/logrus v1.8.1/go.mod h1:yWOB1SBYBC5VeMP7gHvWumXLIWorT60ONWic61uBYv0=
github.com/smartystreets/assertions v0.0.0-20180927180507-b2de0cb4f26d h1:zE9ykElWQ6/NYmHa3jpm/yHnI4xSofP+UP6SpjHcSeM=
github.com/smartystreets/assertions v0.0.0-20180927180507-b2de0cb4f26d/go.mod h1:OnSkiWE9lh6wB0YB77sQom3nweQdgAjqCqsofrRNTgc=
github.com/smartystreets/assertions v1.2.0/go.mod h1:tcbTF8ujkAEcZ8TElKY+i30BzYlVhC/LOxJk7iOWnoo=
github.com/smartystreets/goconvey v1.6.4 h1:fv0U8FUIMPNf1L9lnHLvLhgicrIVChEkdzIKYqbNC9s=
github.com/smartystreets/goconvey v1.6.4/go.mod h1:syvi0/a8iFYH4r/RixwvyeAJjdLS9QV7WQ/tjFTllLA=
github.com/soheilhy/cmux v0.1.4/go.mod h1:IM3LyeVVIOuxMH7sFAkER9+bJ4dT7Ms6E4xg4kGIyLM=

View File

@@ -6,9 +6,9 @@ import (
"path/filepath"
"sync"
"github.com/pkg/errors"
"git.mills.io/prologic/bitcask/internal"
"git.mills.io/prologic/bitcask/internal/data/codec"
"github.com/pkg/errors"
"golang.org/x/exp/mmap"
)
@@ -74,9 +74,11 @@ func NewDatafile(path string, id int, readonly bool, maxKeySize uint32, maxValue
return nil, errors.Wrap(err, "error calling Stat()")
}
ra, err = mmap.Open(fn)
if err != nil {
return nil, err
if readonly {
ra, err = mmap.Open(fn)
if err != nil {
return nil, err
}
}
offset := stat.Size()
@@ -107,7 +109,9 @@ func (df *datafile) Name() string {
func (df *datafile) Close() error {
defer func() {
df.ra.Close()
if df.ra != nil {
df.ra.Close()
}
df.r.Close()
}()
@@ -155,7 +159,10 @@ func (df *datafile) ReadAt(index, size int64) (e internal.Entry, err error) {
b := make([]byte, size)
if df.w == nil {
df.RLock()
defer df.RUnlock()
if df.ra != nil {
n, err = df.ra.ReadAt(b, index)
} else {
n, err = df.r.ReadAt(b, index)

View File

@@ -27,7 +27,7 @@ func CheckAndRecover(path string, cfg *config.Config) error {
f := dfs[len(dfs)-1]
recovered, err := recoverDatafile(f, cfg)
if err != nil {
return fmt.Errorf("recovering data file")
return fmt.Errorf("error recovering data file: %s", err)
}
if recovered {
if err := os.Remove(filepath.Join(path, "index")); err != nil {
@@ -48,8 +48,8 @@ func recoverDatafile(path string, cfg *config.Config) (recovered bool, err error
err = closeErr
}
}()
_, file := filepath.Split(path)
rPath := fmt.Sprintf("%s.recovered", file)
dir, file := filepath.Split(path)
rPath := filepath.Join(dir, fmt.Sprintf("%s.recovered", file))
fr, err := os.OpenFile(rPath, os.O_CREATE|os.O_WRONLY, os.ModePerm)
if err != nil {
return false, fmt.Errorf("creating the recovered datafile: %w", err)

View File

@@ -2,17 +2,49 @@ package internal
import (
"fmt"
"runtime/debug"
"strings"
)
const (
defaultVersion = "0.0.0"
defaultCommit = "HEAD"
defaultBuild = "0000-01-01:00:00+00:00"
)
var (
// Version release version
Version = "0.0.1"
// Version is the tagged release version in the form <major>.<minor>.<patch>
// following semantic versioning and is overwritten by the build system.
Version = defaultVersion
// Commit will be overwritten automatically by the build system
Commit = "HEAD"
// Commit is the commit sha of the build (normally from Git) and is overwritten
// by the build system.
Commit = defaultCommit
// Build is the date and time of the build as an RFC3339 formatted string
// and is overwritten by the build system.
Build = defaultBuild
)
// FullVersion returns the full version and commit hash
// FullVersion display the full version and build
func FullVersion() string {
return fmt.Sprintf("%s@%s", Version, Commit)
var sb strings.Builder
isDefault := Version == defaultVersion && Commit == defaultCommit && Build == defaultBuild
if !isDefault {
sb.WriteString(fmt.Sprintf("%s@%s %s", Version, Commit, Build))
}
if info, ok := debug.ReadBuildInfo(); ok {
if isDefault {
sb.WriteString(fmt.Sprintf(" %s", info.Main.Version))
}
sb.WriteString(fmt.Sprintf(" %s", info.GoVersion))
if info.Main.Sum != "" {
sb.WriteString(fmt.Sprintf(" %s", info.Main.Sum))
}
}
return sb.String()
}

View File

@@ -26,7 +26,7 @@ echo "Releasing ${TAG} ..."
git-chglog --next-tag="${TAG}" --output CHANGELOG.md
git commit -a -m "Update CHANGELOG for ${TAG}"
git tag -a -s -m "Release ${TAG}" "${TAG}"
git push --tags
git push && git push --tags
goreleaser release \
--rm-dist \
--release-notes <(git-chglog "${TAG}" | tail -n+5)