mirror of
https://github.com/taigrr/bitcask
synced 2025-01-18 04:03:17 -08:00
160 lines
3.3 KiB
Go
160 lines
3.3 KiB
Go
package migrations
|
|
|
|
import (
|
|
"encoding/binary"
|
|
"fmt"
|
|
"io"
|
|
"io/ioutil"
|
|
"os"
|
|
"path"
|
|
"path/filepath"
|
|
|
|
"git.mills.io/prologic/bitcask/internal"
|
|
)
|
|
|
|
const (
|
|
keySize = 4
|
|
valueSize = 8
|
|
checksumSize = 4
|
|
ttlSize = 8
|
|
defaultDatafileFilename = "%09d.data"
|
|
)
|
|
|
|
func ApplyV0ToV1(dir string, maxDatafileSize int) error {
|
|
temp, err := prepare(dir)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
defer os.RemoveAll(temp)
|
|
err = apply(dir, temp, maxDatafileSize)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
return cleanup(dir, temp)
|
|
}
|
|
|
|
func prepare(dir string) (string, error) {
|
|
return ioutil.TempDir(dir, "migration")
|
|
}
|
|
|
|
func apply(dir, temp string, maxDatafileSize int) error {
|
|
datafilesPath, err := internal.GetDatafiles(dir)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
var id, newOffset int
|
|
datafile, err := getNewDatafile(temp, id)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
id++
|
|
for _, p := range datafilesPath {
|
|
df, err := os.Open(p)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
var off int64
|
|
for {
|
|
entry, err := getSingleEntry(df, off)
|
|
if err == io.EOF {
|
|
break
|
|
}
|
|
if err != nil {
|
|
return err
|
|
}
|
|
if newOffset+len(entry) > maxDatafileSize {
|
|
err = datafile.Sync()
|
|
if err != nil {
|
|
return err
|
|
}
|
|
datafile, err = getNewDatafile(temp, id)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
id++
|
|
newOffset = 0
|
|
}
|
|
newEntry := make([]byte, len(entry)+ttlSize)
|
|
copy(newEntry[:len(entry)], entry)
|
|
n, err := datafile.Write(newEntry)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
newOffset += n
|
|
off += int64(len(entry))
|
|
}
|
|
}
|
|
return datafile.Sync()
|
|
}
|
|
|
|
func cleanup(dir, temp string) error {
|
|
files, err := ioutil.ReadDir(dir)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
for _, file := range files {
|
|
if !file.IsDir() {
|
|
err := os.RemoveAll(path.Join([]string{dir, file.Name()}...))
|
|
if err != nil {
|
|
return err
|
|
}
|
|
}
|
|
}
|
|
files, err = ioutil.ReadDir(temp)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
for _, file := range files {
|
|
err := os.Rename(
|
|
path.Join([]string{temp, file.Name()}...),
|
|
path.Join([]string{dir, file.Name()}...),
|
|
)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
}
|
|
return nil
|
|
}
|
|
|
|
func getNewDatafile(path string, id int) (*os.File, error) {
|
|
fn := filepath.Join(path, fmt.Sprintf(defaultDatafileFilename, id))
|
|
return os.OpenFile(fn, os.O_WRONLY|os.O_APPEND|os.O_CREATE, 0640)
|
|
}
|
|
|
|
func getSingleEntry(f *os.File, offset int64) ([]byte, error) {
|
|
prefixBuf, err := readPrefix(f, offset)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
actualKeySize, actualValueSize := getKeyValueSize(prefixBuf)
|
|
entryBuf, err := read(f, uint64(actualKeySize)+actualValueSize+checksumSize, offset+keySize+valueSize)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
return append(prefixBuf, entryBuf...), nil
|
|
}
|
|
|
|
func readPrefix(f *os.File, offset int64) ([]byte, error) {
|
|
prefixBuf := make([]byte, keySize+valueSize)
|
|
_, err := f.ReadAt(prefixBuf, offset)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
return prefixBuf, nil
|
|
}
|
|
|
|
func read(f *os.File, bufSize uint64, offset int64) ([]byte, error) {
|
|
buf := make([]byte, bufSize)
|
|
_, err := f.ReadAt(buf, offset)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
return buf, nil
|
|
}
|
|
|
|
func getKeyValueSize(buf []byte) (uint32, uint64) {
|
|
actualKeySize := binary.BigEndian.Uint32(buf[:keySize])
|
|
actualValueSize := binary.BigEndian.Uint64(buf[keySize:])
|
|
return actualKeySize, actualValueSize
|
|
}
|