1
0
mirror of https://github.com/taigrr/bitcask synced 2025-01-18 04:03:17 -08:00

Add all files again with v2 postfix to URL

This commit is contained in:
2022-02-01 19:06:30 -08:00
parent d23c355e72
commit 60aaf19d15
31 changed files with 6120 additions and 0 deletions

View File

@@ -0,0 +1,110 @@
package codec
import (
"encoding/binary"
"io"
"time"
"git.mills.io/prologic/bitcask/v2/internal"
"github.com/pkg/errors"
)
var (
errInvalidKeyOrValueSize = errors.New("key/value size is invalid")
errCantDecodeOnNilEntry = errors.New("can't decode on nil entry")
errTruncatedData = errors.New("data is truncated")
)
// NewDecoder creates a streaming Entry decoder.
func NewDecoder(r io.Reader, maxKeySize uint32, maxValueSize uint64) *Decoder {
return &Decoder{
r: r,
maxKeySize: maxKeySize,
maxValueSize: maxValueSize,
}
}
// Decoder wraps an underlying io.Reader and allows you to stream
// Entry decodings on it.
type Decoder struct {
r io.Reader
maxKeySize uint32
maxValueSize uint64
}
// Decode decodes the next Entry from the current stream
func (d *Decoder) Decode(v *internal.Entry) (int64, error) {
if v == nil {
return 0, errCantDecodeOnNilEntry
}
prefixBuf := make([]byte, keySize+valueSize)
_, err := io.ReadFull(d.r, prefixBuf)
if err != nil {
return 0, err
}
actualKeySize, actualValueSize, err := getKeyValueSizes(prefixBuf, d.maxKeySize, d.maxValueSize)
if err != nil {
return 0, err
}
buf := make([]byte, uint64(actualKeySize)+actualValueSize+checksumSize+ttlSize)
if _, err = io.ReadFull(d.r, buf); err != nil {
return 0, errTruncatedData
}
decodeWithoutPrefix(buf, actualKeySize, v)
return int64(keySize + valueSize + uint64(actualKeySize) + actualValueSize + checksumSize + ttlSize), nil
}
// DecodeEntry decodes a serialized entry
func DecodeEntry(b []byte, e *internal.Entry, maxKeySize uint32, maxValueSize uint64) error {
valueOffset, _, err := getKeyValueSizes(b, maxKeySize, maxValueSize)
if err != nil {
return errors.Wrap(err, "key/value sizes are invalid")
}
decodeWithoutPrefix(b[keySize+valueSize:], valueOffset, e)
return nil
}
func getKeyValueSizes(buf []byte, maxKeySize uint32, maxValueSize uint64) (uint32, uint64, error) {
actualKeySize := binary.BigEndian.Uint32(buf[:keySize])
actualValueSize := binary.BigEndian.Uint64(buf[keySize:])
if (maxKeySize > 0 && actualKeySize > maxKeySize) || (maxValueSize > 0 && actualValueSize > maxValueSize) || actualKeySize == 0 {
return 0, 0, errInvalidKeyOrValueSize
}
return actualKeySize, actualValueSize, nil
}
func decodeWithoutPrefix(buf []byte, valueOffset uint32, v *internal.Entry) {
v.Key = buf[:valueOffset]
v.Value = buf[valueOffset : len(buf)-checksumSize-ttlSize]
v.Checksum = binary.BigEndian.Uint32(buf[len(buf)-checksumSize-ttlSize : len(buf)-ttlSize])
v.Expiry = getKeyExpiry(buf)
}
func getKeyExpiry(buf []byte) *time.Time {
expiry := binary.BigEndian.Uint64(buf[len(buf)-ttlSize:])
if expiry == uint64(0) {
return nil
}
t := time.Unix(int64(expiry), 0).UTC()
return &t
}
// IsCorruptedData indicates if the error correspondes to possible data corruption
func IsCorruptedData(err error) bool {
switch err {
case errCantDecodeOnNilEntry, errInvalidKeyOrValueSize, errTruncatedData:
return true
default:
return false
}
}

View File

@@ -0,0 +1,130 @@
package codec
import (
"bytes"
"encoding/binary"
"io"
"testing"
"time"
"git.mills.io/prologic/bitcask/v2/internal"
"github.com/stretchr/testify/assert"
)
func TestDecodeOnNilEntry(t *testing.T) {
t.Parallel()
assert := assert.New(t)
decoder := NewDecoder(&bytes.Buffer{}, 1, 1)
_, err := decoder.Decode(nil)
if assert.Error(err) {
assert.Equal(errCantDecodeOnNilEntry, err)
}
}
func TestShortPrefix(t *testing.T) {
t.Parallel()
assert := assert.New(t)
maxKeySize, maxValueSize := uint32(10), uint64(20)
prefix := make([]byte, keySize+valueSize)
binary.BigEndian.PutUint32(prefix, 1)
binary.BigEndian.PutUint64(prefix[keySize:], 1)
truncBytesCount := 2
buf := bytes.NewBuffer(prefix[:keySize+valueSize-truncBytesCount])
decoder := NewDecoder(buf, maxKeySize, maxValueSize)
_, err := decoder.Decode(&internal.Entry{})
if assert.Error(err) {
assert.Equal(io.ErrUnexpectedEOF, err)
}
}
func TestInvalidValueKeySizes(t *testing.T) {
assert := assert.New(t)
maxKeySize, maxValueSize := uint32(10), uint64(20)
tests := []struct {
keySize uint32
valueSize uint64
name string
}{
{keySize: 0, valueSize: 5, name: "zero key size"}, //zero value size is correct for tombstones
{keySize: 11, valueSize: 5, name: "key size overflow"},
{keySize: 5, valueSize: 21, name: "value size overflow"},
{keySize: 11, valueSize: 21, name: "key and value size overflow"},
}
for i := range tests {
i := i
t.Run(tests[i].name, func(t *testing.T) {
t.Parallel()
prefix := make([]byte, keySize+valueSize)
binary.BigEndian.PutUint32(prefix, tests[i].keySize)
binary.BigEndian.PutUint64(prefix[keySize:], tests[i].valueSize)
buf := bytes.NewBuffer(prefix)
decoder := NewDecoder(buf, maxKeySize, maxValueSize)
_, err := decoder.Decode(&internal.Entry{})
if assert.Error(err) {
assert.Equal(errInvalidKeyOrValueSize, err)
}
})
}
}
func TestTruncatedData(t *testing.T) {
assert := assert.New(t)
maxKeySize, maxValueSize := uint32(10), uint64(20)
key := []byte("foo")
value := []byte("bar")
data := make([]byte, keySize+valueSize+len(key)+len(value)+checksumSize)
binary.BigEndian.PutUint32(data, uint32(len(key)))
binary.BigEndian.PutUint64(data[keySize:], uint64(len(value)))
copy(data[keySize+valueSize:], key)
copy(data[keySize+valueSize+len(key):], value)
copy(data[keySize+valueSize+len(key)+len(value):], bytes.Repeat([]byte("0"), checksumSize))
tests := []struct {
data []byte
name string
}{
{data: data[:keySize+valueSize+len(key)-1], name: "truncated key"},
{data: data[:keySize+valueSize+len(key)+len(value)-1], name: "truncated value"},
{data: data[:keySize+valueSize+len(key)+len(value)+checksumSize-1], name: "truncated checksum"},
}
for i := range tests {
i := i
t.Run(tests[i].name, func(t *testing.T) {
t.Parallel()
buf := bytes.NewBuffer(tests[i].data)
decoder := NewDecoder(buf, maxKeySize, maxValueSize)
_, err := decoder.Decode(&internal.Entry{})
if assert.Error(err) {
assert.Equal(errTruncatedData, err)
}
})
}
}
func TestDecodeWithoutPrefix(t *testing.T) {
assert := assert.New(t)
e := internal.Entry{}
buf := []byte{0, 0, 0, 5, 0, 0, 0, 0, 0, 0, 0, 7, 109, 121, 107, 101, 121, 109, 121, 118, 97, 108, 117, 101, 0, 6, 81, 189, 0, 0, 0, 0, 95, 117, 28, 0}
valueOffset := uint32(5)
mockTime := time.Date(2020, 10, 1, 0, 0, 0, 0, time.UTC)
expectedEntry := internal.Entry{
Key: []byte("mykey"),
Value: []byte("myvalue"),
Checksum: 414141,
Expiry: &mockTime,
}
decodeWithoutPrefix(buf[keySize+valueSize:], valueOffset, &e)
assert.Equal(expectedEntry.Key, e.Key)
assert.Equal(expectedEntry.Value, e.Value)
assert.Equal(expectedEntry.Checksum, e.Checksum)
assert.Equal(expectedEntry.Offset, e.Offset)
assert.Equal(*expectedEntry.Expiry, *e.Expiry)
}

View File

@@ -0,0 +1,69 @@
package codec
import (
"bufio"
"encoding/binary"
"io"
"git.mills.io/prologic/bitcask/v2/internal"
"github.com/pkg/errors"
)
const (
keySize = 4
valueSize = 8
checksumSize = 4
ttlSize = 8
MetaInfoSize = keySize + valueSize + checksumSize + ttlSize
)
// NewEncoder creates a streaming Entry encoder.
func NewEncoder(w io.Writer) *Encoder {
return &Encoder{w: bufio.NewWriter(w)}
}
// Encoder wraps an underlying io.Writer and allows you to stream
// Entry encodings on it.
type Encoder struct {
w *bufio.Writer
}
// Encode takes any Entry and streams it to the underlying writer.
// Messages are framed with a key-length and value-length prefix.
func (e *Encoder) Encode(msg internal.Entry) (int64, error) {
var bufKeyValue = make([]byte, keySize+valueSize)
binary.BigEndian.PutUint32(bufKeyValue[:keySize], uint32(len(msg.Key)))
binary.BigEndian.PutUint64(bufKeyValue[keySize:keySize+valueSize], uint64(len(msg.Value)))
if _, err := e.w.Write(bufKeyValue); err != nil {
return 0, errors.Wrap(err, "failed writing key & value length prefix")
}
if _, err := e.w.Write(msg.Key); err != nil {
return 0, errors.Wrap(err, "failed writing key data")
}
if _, err := e.w.Write(msg.Value); err != nil {
return 0, errors.Wrap(err, "failed writing value data")
}
bufChecksumSize := bufKeyValue[:checksumSize]
binary.BigEndian.PutUint32(bufChecksumSize, msg.Checksum)
if _, err := e.w.Write(bufChecksumSize); err != nil {
return 0, errors.Wrap(err, "failed writing checksum data")
}
bufTTL := bufKeyValue[:ttlSize]
if msg.Expiry == nil {
binary.BigEndian.PutUint64(bufTTL, uint64(0))
} else {
binary.BigEndian.PutUint64(bufTTL, uint64(msg.Expiry.Unix()))
}
if _, err := e.w.Write(bufTTL); err != nil {
return 0, errors.Wrap(err, "failed writing ttl data")
}
if err := e.w.Flush(); err != nil {
return 0, errors.Wrap(err, "failed flushing data")
}
return int64(keySize + valueSize + len(msg.Key) + len(msg.Value) + checksumSize + ttlSize), nil
}

View File

@@ -0,0 +1,32 @@
package codec
import (
"bytes"
"encoding/hex"
"testing"
"time"
"git.mills.io/prologic/bitcask/v2/internal"
"github.com/stretchr/testify/assert"
)
func TestEncode(t *testing.T) {
t.Parallel()
assert := assert.New(t)
var buf bytes.Buffer
mockTime := time.Date(2020, 10, 1, 0, 0, 0, 0, time.UTC)
encoder := NewEncoder(&buf)
_, err := encoder.Encode(internal.Entry{
Key: []byte("mykey"),
Value: []byte("myvalue"),
Checksum: 414141,
Offset: 424242,
Expiry: &mockTime,
})
expectedHex := "0000000500000000000000076d796b65796d7976616c7565000651bd000000005f751c00"
if assert.NoError(err) {
assert.Equal(expectedHex, hex.EncodeToString(buf.Bytes()))
}
}

View File

@@ -0,0 +1,200 @@
package data
import (
"fmt"
"os"
"path/filepath"
"sync"
"git.mills.io/prologic/bitcask/v2/internal"
"git.mills.io/prologic/bitcask/v2/internal/data/codec"
"github.com/pkg/errors"
"golang.org/x/exp/mmap"
)
const (
defaultDatafileFilename = "%09d.data"
)
var (
errReadonly = errors.New("error: read only datafile")
errReadError = errors.New("error: read error")
)
// Datafile is an interface that represents a readable and writeable datafile
type Datafile interface {
FileID() int
Name() string
Close() error
Sync() error
Size() int64
Read() (internal.Entry, int64, error)
ReadAt(index, size int64) (internal.Entry, error)
Write(internal.Entry) (int64, int64, error)
}
type datafile struct {
sync.RWMutex
id int
r *os.File
ra *mmap.ReaderAt
w *os.File
offset int64
dec *codec.Decoder
enc *codec.Encoder
maxKeySize uint32
maxValueSize uint64
}
// NewDatafile opens an existing datafile
func NewDatafile(path string, id int, readonly bool, maxKeySize uint32, maxValueSize uint64, fileMode os.FileMode) (Datafile, error) {
var (
r *os.File
ra *mmap.ReaderAt
w *os.File
err error
)
fn := filepath.Join(path, fmt.Sprintf(defaultDatafileFilename, id))
if !readonly {
w, err = os.OpenFile(fn, os.O_WRONLY|os.O_APPEND|os.O_CREATE, fileMode)
if err != nil {
return nil, err
}
}
r, err = os.Open(fn)
if err != nil {
return nil, err
}
stat, err := r.Stat()
if err != nil {
return nil, errors.Wrap(err, "error calling Stat()")
}
if readonly {
ra, err = mmap.Open(fn)
if err != nil {
return nil, err
}
}
offset := stat.Size()
dec := codec.NewDecoder(r, maxKeySize, maxValueSize)
enc := codec.NewEncoder(w)
return &datafile{
id: id,
r: r,
ra: ra,
w: w,
offset: offset,
dec: dec,
enc: enc,
maxKeySize: maxKeySize,
maxValueSize: maxValueSize,
}, nil
}
func (df *datafile) FileID() int {
return df.id
}
func (df *datafile) Name() string {
return df.r.Name()
}
func (df *datafile) Close() error {
defer func() {
if df.ra != nil {
df.ra.Close()
}
df.r.Close()
}()
// Readonly datafile -- Nothing further to close on the write side
if df.w == nil {
return nil
}
err := df.Sync()
if err != nil {
return err
}
return df.w.Close()
}
func (df *datafile) Sync() error {
if df.w == nil {
return nil
}
return df.w.Sync()
}
func (df *datafile) Size() int64 {
df.RLock()
defer df.RUnlock()
return df.offset
}
// Read reads the next entry from the datafile
func (df *datafile) Read() (e internal.Entry, n int64, err error) {
df.Lock()
defer df.Unlock()
n, err = df.dec.Decode(&e)
if err != nil {
return
}
return
}
// ReadAt the entry located at index offset with expected serialized size
func (df *datafile) ReadAt(index, size int64) (e internal.Entry, err error) {
var n int
b := make([]byte, size)
df.RLock()
defer df.RUnlock()
if df.ra != nil {
n, err = df.ra.ReadAt(b, index)
} else {
n, err = df.r.ReadAt(b, index)
}
if err != nil {
return
}
if int64(n) != size {
err = errReadError
return
}
codec.DecodeEntry(b, &e, df.maxKeySize, df.maxValueSize)
return
}
func (df *datafile) Write(e internal.Entry) (int64, int64, error) {
if df.w == nil {
return -1, 0, errReadonly
}
df.Lock()
defer df.Unlock()
e.Offset = df.offset
n, err := df.enc.Encode(e)
if err != nil {
return -1, 0, err
}
df.offset += n
return e.Offset, n, nil
}

View File

@@ -0,0 +1,95 @@
package data
import (
"fmt"
"io"
"os"
"path/filepath"
"git.mills.io/prologic/bitcask/v2/internal"
"git.mills.io/prologic/bitcask/v2/internal/config"
"git.mills.io/prologic/bitcask/v2/internal/data/codec"
)
// CheckAndRecover checks and recovers the last datafile.
// If the datafile isn't corrupted, this is a noop. If it is,
// the longest non-corrupted prefix will be kept and the rest
// will be *deleted*. Also, the index file is also *deleted* which
// will be automatically recreated on next startup.
func CheckAndRecover(path string, cfg *config.Config) error {
dfs, err := internal.GetDatafiles(path)
if err != nil {
return fmt.Errorf("scanning datafiles: %s", err)
}
if len(dfs) == 0 {
return nil
}
f := dfs[len(dfs)-1]
recovered, err := recoverDatafile(f, cfg)
if err != nil {
return fmt.Errorf("error recovering data file: %s", err)
}
if recovered {
if err := os.Remove(filepath.Join(path, "index")); err != nil {
return fmt.Errorf("error deleting the index on recovery: %s", err)
}
}
return nil
}
func recoverDatafile(path string, cfg *config.Config) (recovered bool, err error) {
f, err := os.Open(path)
if err != nil {
return false, fmt.Errorf("opening the datafile: %s", err)
}
defer func() {
closeErr := f.Close()
if err == nil {
err = closeErr
}
}()
dir, file := filepath.Split(path)
rPath := filepath.Join(dir, fmt.Sprintf("%s.recovered", file))
fr, err := os.OpenFile(rPath, os.O_CREATE|os.O_WRONLY, os.ModePerm)
if err != nil {
return false, fmt.Errorf("creating the recovered datafile: %w", err)
}
defer func() {
closeErr := fr.Close()
if err == nil {
err = closeErr
}
}()
dec := codec.NewDecoder(f, cfg.MaxKeySize, cfg.MaxValueSize)
enc := codec.NewEncoder(fr)
e := internal.Entry{}
corrupted := false
for !corrupted {
_, err = dec.Decode(&e)
if err == io.EOF {
break
}
if codec.IsCorruptedData(err) {
corrupted = true
continue
}
if err != nil {
return false, fmt.Errorf("unexpected error while reading datafile: %w", err)
}
if _, err := enc.Encode(e); err != nil {
return false, fmt.Errorf("writing to recovered datafile: %w", err)
}
}
if !corrupted {
if err := os.Remove(fr.Name()); err != nil {
return false, fmt.Errorf("can't remove temporal recovered datafile: %w", err)
}
return false, nil
}
if err := os.Rename(rPath, path); err != nil {
return false, fmt.Errorf("removing corrupted file: %s", err)
}
return true, nil
}