From 5c6ceadac1d4545be7a9e28255f73a3a0db76004 Mon Sep 17 00:00:00 2001
From: Yash Suresh Chandra
Date: Mon, 21 Dec 2020 13:11:43 +0530
Subject: [PATCH] Add support for keys with ttl (#177)

* ttl support first commit

* imports fix

* put api args correction

* put options added

* upgrade method added

* upgrade log added

* v0 to v1 migration script added

* error assertion added

* temp migration dir fix

Co-authored-by: yash
---
 AUTHORS                             |   1 +
 bitcask.go                          | 138 ++++++++++++++++--------
 bitcask_test.go                     |  31 ++++--
 cmd/bitcaskd/server.go              |  12 ++-
 internal/config/config.go           |   1 +
 internal/data/codec/decoder.go      |  19 +++-
 internal/data/codec/decoder_test.go |  21 ++++
 internal/data/codec/encoder.go      |  15 ++-
 internal/data/codec/encoder_test.go |   5 +-
 internal/entry.go                   |   5 +-
 options.go                          |  17 +++
 scripts/migrations/v0_to_v1.go      | 159 ++++++++++++++++++++++++++++
 scripts/migrations/v0_to_v1_test.go |  58 ++++++++++
 13 files changed, 421 insertions(+), 61 deletions(-)
 create mode 100644 scripts/migrations/v0_to_v1.go
 create mode 100644 scripts/migrations/v0_to_v1_test.go

diff --git a/AUTHORS b/AUTHORS
index ef5e7ec..11da40d 100644
--- a/AUTHORS
+++ b/AUTHORS
@@ -13,6 +13,7 @@ Kebert Xela kebertxela
 panyun panyun
 shiniao
 Whemoon Jang
+Yash Chandra
 Yury Fedorov orlangure
 o2gy84
 garsue
diff --git a/bitcask.go b/bitcask.go
index 4312001..5780236 100644
--- a/bitcask.go
+++ b/bitcask.go
@@ -11,6 +11,7 @@ import (
 	"path/filepath"
 	"sort"
 	"sync"
+	"time"
 
 	art "github.com/plar/go-adaptive-radix-tree"
 	"github.com/prologic/bitcask/flock"
@@ -20,6 +21,8 @@ import (
 	"github.com/prologic/bitcask/internal/data/codec"
 	"github.com/prologic/bitcask/internal/index"
 	"github.com/prologic/bitcask/internal/metadata"
+	"github.com/prologic/bitcask/scripts/migrations"
+	log "github.com/sirupsen/logrus"
 )
 
 const (
@@ -34,6 +37,10 @@ var (
 	// maximum allowed key size (configured with WithMaxKeySize).
 	ErrKeyTooLarge = errors.New("error: key too large")
 
+	// ErrKeyExpired is the error returned when a queried key has
+	// already expired (due to its TTL)
+	ErrKeyExpired = errors.New("error: key expired")
+
 	// ErrEmptyKey is the error returned for a value with an empty key.
 	ErrEmptyKey = errors.New("error: empty key")
 
@@ -49,6 +56,8 @@ var (
 	// (typically opened by another process)
 	ErrDatabaseLocked = errors.New("error: database locked")
 
+	ErrInvalidVersion = errors.New("error: invalid db version")
+
 	// ErrMergeInProgress is the error returned if merge is called when already a merge
 	// is in progress
 	ErrMergeInProgress = errors.New("error: merge already in progress")
@@ -139,42 +148,14 @@ func (b *Bitcask) Sync() error {
 	return b.curr.Sync()
 }
 
-// Get fetches value for given key
+// Get fetches the value for the given key
 func (b *Bitcask) Get(key []byte) ([]byte, error) {
 	b.mu.RLock()
 	defer b.mu.RUnlock()
-
-	return b.get(key)
-}
-
-// get retrieves the value of the given key. If the key is not found or an/I/O
-// error occurs a null byte slice is returned along with the error.
-func (b *Bitcask) get(key []byte) ([]byte, error) {
-	var df data.Datafile
-
-	value, found := b.trie.Search(key)
-	if !found {
-		return nil, ErrKeyNotFound
-	}
-
-	item := value.(internal.Item)
-
-	if item.FileID == b.curr.FileID() {
-		df = b.curr
-	} else {
-		df = b.datafiles[item.FileID]
-	}
-
-	e, err := df.ReadAt(item.Offset, item.Size)
+	e, err := b.get(key)
 	if err != nil {
 		return nil, err
 	}
-
-	checksum := crc32.ChecksumIEEE(e.Value)
-	if checksum != e.Checksum {
-		return nil, ErrChecksumFailed
-	}
-
 	return e.Value, nil
 }
 
@@ -187,7 +168,7 @@ func (b *Bitcask) Has(key []byte) bool {
 }
 
 // Put stores the key and value in the database.
-func (b *Bitcask) Put(key, value []byte) error {
+func (b *Bitcask) Put(key, value []byte, options ...PutOptions) error {
 	if len(key) == 0 {
 		return ErrEmptyKey
 	}
@@ -197,10 +178,16 @@ func (b *Bitcask) Put(key, value []byte) error {
 	if b.config.MaxValueSize > 0 && uint64(len(value)) > b.config.MaxValueSize {
 		return ErrValueTooLarge
 	}
+	var feature Feature
+	for _, opt := range options {
+		if err := opt(&feature); err != nil {
+			return err
+		}
+	}
 
 	b.mu.Lock()
 	defer b.mu.Unlock()
-	offset, n, err := b.put(key, value)
+	offset, n, err := b.put(key, value, feature)
 	if err != nil {
 		return err
 	}
@@ -224,20 +211,24 @@ func (b *Bitcask) Put(key, value []byte) error {
 	return nil
 }
 
-// Delete deletes the named key. If the key doesn't exist or an I/O error
-// occurs the error is returned.
+// Delete deletes the named key.
 func (b *Bitcask) Delete(key []byte) error {
 	b.mu.Lock()
-	_, _, err := b.put(key, []byte{})
+	defer b.mu.Unlock()
+	return b.delete(key)
+}
+
+// delete deletes the named key. If the key doesn't exist or an I/O error
+// occurs the error is returned.
+func (b *Bitcask) delete(key []byte) error {
+	_, _, err := b.put(key, []byte{}, Feature{})
 	if err != nil {
-		b.mu.Unlock()
 		return err
 	}
 	if item, found := b.trie.Search(key); found {
 		b.metadata.ReclaimableSpace += item.(internal.Item).Size + codec.MetaInfoSize + int64(len(key))
 	}
 	b.trie.Delete(key)
-	b.mu.Unlock()
 
 	return nil
 }
@@ -248,7 +239,7 @@ func (b *Bitcask) DeleteAll() (err error) {
 	defer b.mu.RUnlock()
 
 	b.trie.ForEach(func(node art.Node) bool {
-		_, _, err = b.put(node.Key(), []byte{})
+		_, _, err = b.put(node.Key(), []byte{}, Feature{})
 		if err != nil {
 			return false
 		}
@@ -320,8 +311,44 @@ func (b *Bitcask) Fold(f func(key []byte) error) (err error) {
 	return
 }
 
+// get retrieves the entry for the given key. If the key is not found or an I/O
+// error occurs, an empty Entry is returned along with the error.
+func (b *Bitcask) get(key []byte) (internal.Entry, error) {
+	var df data.Datafile
+
+	value, found := b.trie.Search(key)
+	if !found {
+		return internal.Entry{}, ErrKeyNotFound
+	}
+
+	item := value.(internal.Item)
+
+	if item.FileID == b.curr.FileID() {
+		df = b.curr
+	} else {
+		df = b.datafiles[item.FileID]
+	}
+
+	e, err := df.ReadAt(item.Offset, item.Size)
+	if err != nil {
+		return internal.Entry{}, err
+	}
+
+	if e.Expiry != nil && e.Expiry.Before(time.Now().UTC()) {
+		_ = b.delete(key) // we don't care if this doesn't succeed
+		return internal.Entry{}, ErrKeyExpired
+	}
+
+	checksum := crc32.ChecksumIEEE(e.Value)
+	if checksum != e.Checksum {
+		return internal.Entry{}, ErrChecksumFailed
+	}
+
+	return e, nil
+}
+
 // put inserts a new (key, value). Both key and value are valid inputs.
-func (b *Bitcask) put(key, value []byte) (int64, int64, error) {
+func (b *Bitcask) put(key, value []byte, feature Feature) (int64, int64, error) {
 	size := b.curr.Size()
 	if size >= int64(b.config.MaxDatafileSize) {
 		err := b.curr.Close()
@@ -350,7 +377,7 @@ func (b *Bitcask) put(key, value []byte) (int64, int64, error) {
 		}
 	}
 
-	e := internal.NewEntry(key, value)
+	e := internal.NewEntry(key, value, feature.Expiry)
 	return b.curr.Write(e)
 }
 
@@ -466,12 +493,17 @@ func (b *Bitcask) Merge() error {
 		if item.(internal.Item).FileID > filesToMerge[len(filesToMerge)-1] {
 			return nil
 		}
-		value, err := b.get(key)
+		e, err := b.get(key)
 		if err != nil {
 			return err
 		}
+		// prepare entry options
+		var opts []PutOptions
+		if e.Expiry != nil {
+			opts = append(opts, WithExpiry(*(e.Expiry)))
+		}
 
-		if err := mdb.Put(key, value); err != nil {
+		if err := mdb.Put(key, e.Value, opts...); err != nil {
 			return err
 		}
 
@@ -553,6 +585,10 @@ func Open(path string, options ...Option) (*Bitcask, error) {
 		cfg = newDefaultConfig()
 	}
 
+	if err := checkAndUpgrade(cfg, configPath); err != nil {
+		return nil, err
+	}
+
 	for _, opt := range options {
 		if err := opt(cfg); err != nil {
 			return nil, err
@@ -602,6 +638,24 @@ func Open(path string, options ...Option) (*Bitcask, error) {
 	return bitcask, nil
 }
 
+// checkAndUpgrade checks whether a DB version upgrade is required and, if so,
+// applies the upgrade and updates the config accordingly
+func checkAndUpgrade(cfg *config.Config, configPath string) error {
+	if cfg.DBVersion == CurrentDBVersion {
+		return nil
+	}
+	if cfg.DBVersion > CurrentDBVersion {
+		return ErrInvalidVersion
+	}
+	// for the v0 to v1 upgrade, we need to append 8 null bytes after each encoded entry in the datafiles
+	if cfg.DBVersion == uint32(0) && CurrentDBVersion == uint32(1) {
+		log.Warn("upgrading db version, this might take some time...")
+		cfg.DBVersion = CurrentDBVersion
+		return migrations.ApplyV0ToV1(filepath.Dir(configPath), cfg.MaxDatafileSize)
+	}
+	return nil
+}
+
 // Backup copies db directory to given path
 // it creates path if it does not exist
 func (b *Bitcask) Backup(path string) error {
diff --git a/bitcask_test.go b/bitcask_test.go
index 45e35be..a1b42c7 100644
--- a/bitcask_test.go
+++ b/bitcask_test.go
@@ -13,6 +13,7 @@ import (
 	"strings"
 	"sync"
 	"testing"
+	"time"
 
 	"github.com/stretchr/testify/assert"
 	"github.com/stretchr/testify/require"
@@ -70,7 +71,7 @@ func TestAll(t *testing.T) {
 	})
 
 	t.Run("Put", func(t *testing.T) {
-		err = db.Put([]byte([]byte("foo")), []byte("bar"))
+		err = db.Put([]byte("foo"), []byte("bar"))
 		assert.NoError(err)
 	})
 
@@ -84,6 +85,18 @@ func TestAll(t *testing.T) {
 		assert.Equal(1, db.Len())
 	})
 
+	t.Run("PutWithExpiry", func(t *testing.T) {
+		err = db.Put([]byte("bar"), []byte("baz"), WithExpiry(time.Now()))
+		assert.NoError(err)
+	})
+
+	t.Run("GetExpiredKey", func(t *testing.T) {
+		time.Sleep(time.Millisecond)
+		_, err := db.Get([]byte("bar"))
+		assert.Error(err)
+		assert.Equal(ErrKeyExpired, err)
+	})
+
 	t.Run("Has", func(t *testing.T) {
 		assert.True(db.Has([]byte("foo")))
 	})
@@ -338,19 +351,19 @@ func TestMetadata(t *testing.T) {
 	})
 	t.Run("ReclaimableAfterRepeatedPut", func(t *testing.T) {
 		assert.NoError(db.Put([]byte("hello"), []byte("world")))
-		assert.Equal(int64(26), db.Reclaimable())
+		assert.Equal(int64(34), db.Reclaimable())
 	})
 	t.Run("ReclaimableAfterDelete", func(t *testing.T) {
 		assert.NoError(db.Delete([]byte("hello")))
-		assert.Equal(int64(73), db.Reclaimable())
+		assert.Equal(int64(97), db.Reclaimable())
 	})
 	t.Run("ReclaimableAfterNonExistingDelete", func(t *testing.T) {
 		assert.NoError(db.Delete([]byte("hello1")))
-		assert.Equal(int64(73), db.Reclaimable())
+		assert.Equal(int64(97), db.Reclaimable())
 	})
 	t.Run("ReclaimableAfterDeleteAll", func(t *testing.T) {
 		assert.NoError(db.DeleteAll())
-		assert.Equal(int64(158), db.Reclaimable())
+		assert.Equal(int64(214), db.Reclaimable())
 	})
 	t.Run("ReclaimableAfterMerge", func(t *testing.T) {
 		assert.NoError(db.Merge())
@@ -1072,7 +1085,7 @@ func TestGetErrors(t *testing.T) {
 
 		mockDatafile := new(mocks.Datafile)
 		mockDatafile.On("FileID").Return(0)
-		mockDatafile.On("ReadAt", int64(0), int64(22)).Return(
+		mockDatafile.On("ReadAt", int64(0), int64(30)).Return(
 			internal.Entry{},
 			ErrMockError,
 		)
@@ -1088,7 +1101,7 @@ func TestGetErrors(t *testing.T) {
 		assert.NoError(err)
 		defer os.RemoveAll(testdir)
 
-		db, err := Open(testdir, WithMaxDatafileSize(32))
+		db, err := Open(testdir, WithMaxDatafileSize(40))
 		assert.NoError(err)
 
 		err = db.Put([]byte("foo"), []byte("bar"))
@@ -1096,7 +1109,7 @@ func TestGetErrors(t *testing.T) {
 
 		mockDatafile := new(mocks.Datafile)
 		mockDatafile.On("FileID").Return(0)
-		mockDatafile.On("ReadAt", int64(0), int64(22)).Return(
+		mockDatafile.On("ReadAt", int64(0), int64(30)).Return(
 			internal.Entry{
 				Checksum: 0x0,
 				Key:      []byte("foo"),
@@ -1377,7 +1390,7 @@ func TestMergeErrors(t *testing.T) {
 
 		mockDatafile := new(mocks.Datafile)
 		mockDatafile.On("Close").Return(nil)
-		mockDatafile.On("ReadAt", int64(0), int64(22)).Return(
+		mockDatafile.On("ReadAt", int64(0), int64(30)).Return(
 			internal.Entry{},
 			ErrMockError,
 		)
diff --git a/cmd/bitcaskd/server.go b/cmd/bitcaskd/server.go
index 6e126e0..39aadc4 100644
--- a/cmd/bitcaskd/server.go
+++ b/cmd/bitcaskd/server.go
@@ -1,11 +1,13 @@
 package main
 
 import (
 	"fmt"
 	"os"
 	"os/signal"
+	"strconv"
 	"strings"
 	"syscall"
+	"time"
 
 	log "github.com/sirupsen/logrus"
 	"github.com/tidwall/redcon"
@@ -32,12 +34,18 @@ func newServer(bind, dbpath string) (*server, error) {
 }
 
 func (s *server) handleSet(cmd redcon.Command, conn redcon.Conn) {
-	if len(cmd.Args) != 3 {
+	if len(cmd.Args) != 3 && len(cmd.Args) != 4 {
 		conn.WriteError("ERR wrong number of arguments for '" + string(cmd.Args[0]) + "' command")
 		return
 	}
 	key := cmd.Args[1]
 	value := cmd.Args[2]
+	var opts []bitcask.PutOptions
+	if len(cmd.Args) == 4 {
+		// the optional 4th argument is the TTL as an ASCII integer in milliseconds
+		ttl, _ := strconv.ParseInt(string(cmd.Args[3]), 10, 64)
+		e := time.Now().UTC().Add(time.Duration(ttl) * time.Millisecond)
+		opts = append(opts, bitcask.WithExpiry(e))
+	}
 
 	err := s.db.Lock()
 	if err != nil {
@@ -46,7 +54,7 @@ func (s *server) handleSet(cmd redcon.Command, conn redcon.Conn) {
 	}
 	defer s.db.Unlock()
 
-	if err := s.db.Put(key, value); err != nil {
+	if err := s.db.Put(key, value, opts...); err != nil {
 		conn.WriteString(fmt.Sprintf("ERR: %s", err))
 	} else {
 		conn.WriteString("OK")
 	}
 }
diff --git a/internal/config/config.go b/internal/config/config.go
index 6ac170b..8f5dd30 100644
--- a/internal/config/config.go
+++ b/internal/config/config.go
@@ -13,6 +13,7 @@ type Config struct {
 	MaxValueSize uint64 `json:"max_value_size"`
 	Sync bool `json:"sync"`
 	AutoRecovery bool `json:"autorecovery"`
+	DBVersion uint32 `json:"db_version"`
 	DirFileModeBeforeUmask os.FileMode
 	FileFileModeBeforeUmask os.FileMode
 }
diff --git a/internal/data/codec/decoder.go b/internal/data/codec/decoder.go
index 4c1f93a..870fbef 100644
--- a/internal/data/codec/decoder.go
+++ b/internal/data/codec/decoder.go
@@ -3,6 +3,7 @@ package codec
 import (
 	"encoding/binary"
 	"io"
+	"time"
 
 	"github.com/pkg/errors"
 	"github.com/prologic/bitcask/internal"
@@ -49,13 +50,13 @@ func (d *Decoder) Decode(v *internal.Entry) (int64, error) {
 		return 0, err
 	}
 
-	buf := make([]byte, uint64(actualKeySize)+actualValueSize+checksumSize)
+	buf := make([]byte, uint64(actualKeySize)+actualValueSize+checksumSize+ttlSize)
 	if _, err = io.ReadFull(d.r, buf); err != nil {
 		return 0, errTruncatedData
 	}
 
 	decodeWithoutPrefix(buf, actualKeySize, v)
-	return int64(keySize + valueSize + uint64(actualKeySize) + actualValueSize + checksumSize), nil
+	return int64(keySize + valueSize + uint64(actualKeySize) + actualValueSize + checksumSize + ttlSize), nil
 }
 
 // DecodeEntry decodes a serialized entry
@@ -84,8 +85,18 @@ func getKeyValueSizes(buf []byte, maxKeySize uint32, maxValueSize uint64) (uint3
 
 func decodeWithoutPrefix(buf []byte, valueOffset uint32, v *internal.Entry) {
 	v.Key = buf[:valueOffset]
-	v.Value = buf[valueOffset : len(buf)-checksumSize]
-	v.Checksum = binary.BigEndian.Uint32(buf[len(buf)-checksumSize:])
+	v.Value = buf[valueOffset : len(buf)-checksumSize-ttlSize]
+	v.Checksum = binary.BigEndian.Uint32(buf[len(buf)-checksumSize-ttlSize : len(buf)-ttlSize])
+	v.Expiry = getKeyExpiry(buf)
+}
+
+func getKeyExpiry(buf []byte) *time.Time {
+	expiry := binary.BigEndian.Uint64(buf[len(buf)-ttlSize:])
+	if expiry == uint64(0) {
+		return nil
+	}
+	t := time.Unix(int64(expiry), 0).UTC()
+	return &t
 }
 
 // IsCorruptedData indicates if the error correspondes to possible data corruption
diff --git a/internal/data/codec/decoder_test.go b/internal/data/codec/decoder_test.go
index 203d07b..d37a7b8 100644
--- a/internal/data/codec/decoder_test.go
+++ b/internal/data/codec/decoder_test.go
@@ -5,6 +5,7 @@ import (
 	"encoding/binary"
 	"io"
 	"testing"
+	"time"
 
 	"github.com/prologic/bitcask/internal"
 	"github.com/stretchr/testify/assert"
@@ -107,3 +108,23 @@ func TestTruncatedData(t *testing.T) {
 		})
 	}
 }
+
+func TestDecodeWithoutPrefix(t *testing.T) {
+	assert := assert.New(t)
+	e := internal.Entry{}
+	buf := []byte{0, 0, 0, 5, 0, 0, 0, 0, 0, 0, 0, 7, 109, 121, 107, 101, 121, 109, 121, 118, 97, 108, 117, 101, 0, 6, 81, 189, 0, 0, 0, 0, 95, 117, 28, 0}
+	valueOffset := uint32(5)
+	mockTime := time.Date(2020, 10, 1, 0, 0, 0, 0, time.UTC)
+	expectedEntry := internal.Entry{
+		Key:      []byte("mykey"),
+		Value:    []byte("myvalue"),
+		Checksum: 414141,
+		Expiry:   &mockTime,
+	}
+	decodeWithoutPrefix(buf[keySize+valueSize:], valueOffset, &e)
+	assert.Equal(expectedEntry.Key, e.Key)
+	assert.Equal(expectedEntry.Value, e.Value)
+	assert.Equal(expectedEntry.Checksum, e.Checksum)
+	assert.Equal(expectedEntry.Offset, e.Offset)
+	assert.Equal(*expectedEntry.Expiry, *e.Expiry)
+}
diff --git a/internal/data/codec/encoder.go b/internal/data/codec/encoder.go
index 221d187..1c56c83 100644
--- a/internal/data/codec/encoder.go
+++ b/internal/data/codec/encoder.go
@@ -13,7 +13,8 @@ const (
 	keySize      = 4
 	valueSize    = 8
 	checksumSize = 4
-	MetaInfoSize = keySize + valueSize + checksumSize
+	ttlSize      = 8
+	MetaInfoSize = keySize + valueSize + checksumSize + ttlSize
 )
 
 // NewEncoder creates a streaming Entry encoder.
@@ -50,9 +51,19 @@ func (e *Encoder) Encode(msg internal.Entry) (int64, error) {
 		return 0, errors.Wrap(err, "failed writing checksum data")
 	}
 
+	bufTTL := bufKeyValue[:ttlSize]
+	if msg.Expiry == nil {
+		binary.BigEndian.PutUint64(bufTTL, uint64(0))
+	} else {
+		binary.BigEndian.PutUint64(bufTTL, uint64(msg.Expiry.Unix()))
+	}
+	if _, err := e.w.Write(bufTTL); err != nil {
+		return 0, errors.Wrap(err, "failed writing ttl data")
+	}
+
 	if err := e.w.Flush(); err != nil {
 		return 0, errors.Wrap(err, "failed flushing data")
 	}
 
-	return int64(keySize + valueSize + len(msg.Key) + len(msg.Value) + checksumSize), nil
+	return int64(keySize + valueSize + len(msg.Key) + len(msg.Value) + checksumSize + ttlSize), nil
 }
diff --git a/internal/data/codec/encoder_test.go b/internal/data/codec/encoder_test.go
index 842df0a..1eea0c7 100644
--- a/internal/data/codec/encoder_test.go
+++ b/internal/data/codec/encoder_test.go
@@ -4,6 +4,7 @@ import (
 	"bytes"
 	"encoding/hex"
 	"testing"
+	"time"
 
 	"github.com/prologic/bitcask/internal"
 	"github.com/stretchr/testify/assert"
@@ -14,15 +15,17 @@ func TestEncode(t *testing.T) {
 	assert := assert.New(t)
 
 	var buf bytes.Buffer
+	mockTime := time.Date(2020, 10, 1, 0, 0, 0, 0, time.UTC)
 	encoder := NewEncoder(&buf)
 	_, err := encoder.Encode(internal.Entry{
 		Key:      []byte("mykey"),
 		Value:    []byte("myvalue"),
 		Checksum: 414141,
 		Offset:   424242,
+		Expiry:   &mockTime,
 	})
 
-	expectedHex := "0000000500000000000000076d796b65796d7976616c7565000651bd"
+	expectedHex := "0000000500000000000000076d796b65796d7976616c7565000651bd000000005f751c00"
 	if assert.NoError(err) {
 		assert.Equal(expectedHex, hex.EncodeToString(buf.Bytes()))
 	}
 }
diff --git a/internal/entry.go b/internal/entry.go
index a1272f5..090f2f5 100644
--- a/internal/entry.go
+++ b/internal/entry.go
@@ -2,6 +2,7 @@ package internal
 
 import (
 	"hash/crc32"
+	"time"
 )
 
 // Entry represents a key/value in the database
@@ -10,15 +11,17 @@ type Entry struct {
 	Key      []byte
 	Offset   int64
 	Value    []byte
+	Expiry   *time.Time
 }
 
-// NewEntry creates a new `Entry` with the given `key` and `value`
-func NewEntry(key, value []byte) Entry {
+// NewEntry creates a new `Entry` with the given `key`, `value` and optional `expiry`
+func NewEntry(key, value []byte, expiry *time.Time) Entry {
 	checksum := crc32.ChecksumIEEE(value)
 	return Entry{
 		Checksum: checksum,
 		Key:      key,
 		Value:    value,
+		Expiry:   expiry,
 	}
 }
diff --git a/options.go b/options.go
index 86d8ee3..701df80 100644
--- a/options.go
+++ b/options.go
@@ -2,6 +2,7 @@ package bitcask
 
 import (
 	"os"
+	"time"
 
 	"github.com/prologic/bitcask/internal/config"
 )
@@ -26,6 +27,8 @@ const (
 	DefaultSync = false
 
 	// DefaultAutoRecovery is the default auto-recovery action.
 	DefaultAutoRecovery = false
+
+	CurrentDBVersion = uint32(1)
 )
 
 // Option is a function that takes a config struct and modifies it
@@ -111,5 +114,19 @@ func newDefaultConfig() *config.Config {
 		Sync:                    DefaultSync,
 		DirFileModeBeforeUmask:  DefaultDirFileModeBeforeUmask,
 		FileFileModeBeforeUmask: DefaultFileFileModeBeforeUmask,
+		DBVersion:               CurrentDBVersion,
 	}
 }
+
+type Feature struct {
+	Expiry *time.Time
+}
+
+type PutOptions func(*Feature) error
+
+func WithExpiry(expiry time.Time) PutOptions {
+	return func(f *Feature) error {
+		f.Expiry = &expiry
+		return nil
+	}
+}
diff --git a/scripts/migrations/v0_to_v1.go b/scripts/migrations/v0_to_v1.go
new file mode 100644
index 0000000..53e942a
--- /dev/null
+++ b/scripts/migrations/v0_to_v1.go
@@ -0,0 +1,159 @@
+package migrations
+
+import (
+	"encoding/binary"
+	"fmt"
+	"io"
+	"io/ioutil"
+	"os"
+	"path"
+	"path/filepath"
+
+	"github.com/prologic/bitcask/internal"
+)
+
+const (
+	keySize                 = 4
+	valueSize               = 8
+	checksumSize            = 4
+	ttlSize                 = 8
+	defaultDatafileFilename = "%09d.data"
+)
+
+func ApplyV0ToV1(dir string, maxDatafileSize int) error {
+	temp, err := prepare(dir)
+	if err != nil {
+		return err
+	}
+	defer os.RemoveAll(temp)
+	err = apply(dir, temp, maxDatafileSize)
+	if err != nil {
+		return err
+	}
+	return cleanup(dir, temp)
+}
+
+func prepare(dir string) (string, error) {
+	return ioutil.TempDir(dir, "migration")
+}
+
+func apply(dir, temp string, maxDatafileSize int) error {
+	datafilesPath, err := internal.GetDatafiles(dir)
+	if err != nil {
+		return err
+	}
+	var id, newOffset int
+	datafile, err := getNewDatafile(temp, id)
+	if err != nil {
+		return err
+	}
+	id++
+	for _, p := range datafilesPath {
+		df, err := os.Open(p)
+		if err != nil {
+			return err
+		}
+		var off int64
+		for {
+			entry, err := getSingleEntry(df, off)
+			if err == io.EOF {
+				break
+			}
+			if err != nil {
+				return err
+			}
+			if newOffset+len(entry) > maxDatafileSize {
+				err = datafile.Sync()
+				if err != nil {
+					return err
+				}
+				datafile, err = getNewDatafile(temp, id)
+				if err != nil {
+					return err
+				}
+				id++
+				newOffset = 0
+			}
+			newEntry := make([]byte, len(entry)+ttlSize)
+			copy(newEntry[:len(entry)], entry)
+			n, err := datafile.Write(newEntry)
+			if err != nil {
+				return err
+			}
+			newOffset += n
+			off += int64(len(entry))
+		}
+	}
+	return datafile.Sync()
+}
+
+func cleanup(dir, temp string) error {
+	files, err := ioutil.ReadDir(dir)
+	if err != nil {
+		return err
+	}
+	for _, file := range files {
+		if !file.IsDir() {
+			err := os.RemoveAll(path.Join([]string{dir, file.Name()}...))
+			if err != nil {
+				return err
+			}
+		}
+	}
+	files, err = ioutil.ReadDir(temp)
+	if err != nil {
+		return err
+	}
+	for _, file := range files {
+		err := os.Rename(
+			path.Join([]string{temp, file.Name()}...),
+			path.Join([]string{dir, file.Name()}...),
+		)
+		if err != nil {
+			return err
+		}
+	}
+	return nil
+}
+
+func getNewDatafile(path string, id int) (*os.File, error) {
+	fn := filepath.Join(path, fmt.Sprintf(defaultDatafileFilename, id))
+	return os.OpenFile(fn, os.O_WRONLY|os.O_APPEND|os.O_CREATE, 0640)
+}
+
+func getSingleEntry(f *os.File, offset int64) ([]byte, error) {
+	prefixBuf, err := readPrefix(f, offset)
+	if err != nil {
+		return nil, err
+	}
+	actualKeySize, actualValueSize := getKeyValueSize(prefixBuf)
+	entryBuf, err := read(f, uint64(actualKeySize)+actualValueSize+checksumSize, offset+keySize+valueSize)
+	if err != nil {
+		return nil, err
+	}
+	return append(prefixBuf, entryBuf...), nil
+}
+
+func readPrefix(f *os.File, offset int64) ([]byte, error) {
+	prefixBuf := make([]byte, keySize+valueSize)
+	_, err := f.ReadAt(prefixBuf, offset)
+	if err != nil {
+		return nil, err
+	}
+	return prefixBuf, nil
+}
+
+func read(f *os.File, bufSize uint64, offset int64) ([]byte, error) {
+	buf := make([]byte, bufSize)
+	_, err := f.ReadAt(buf, offset)
+	if err != nil {
+		return nil, err
+	}
+	return buf, nil
+}
+
+func getKeyValueSize(buf []byte) (uint32, uint64) {
+	actualKeySize := binary.BigEndian.Uint32(buf[:keySize])
+	actualValueSize := binary.BigEndian.Uint64(buf[keySize:])
+	return actualKeySize, actualValueSize
+}
diff --git a/scripts/migrations/v0_to_v1_test.go b/scripts/migrations/v0_to_v1_test.go
new file mode 100644
index 0000000..0dcb0c7
--- /dev/null
+++ b/scripts/migrations/v0_to_v1_test.go
@@ -0,0 +1,58 @@
+package migrations
+
+import (
+	"encoding/binary"
+	"encoding/hex"
+	"io"
+	"io/ioutil"
+	"os"
+	"path/filepath"
+	"testing"
+
+	"github.com/stretchr/testify/assert"
+)
+
+func Test_ApplyV0ToV1(t *testing.T) {
+	assert := assert.New(t)
+	testdir, err := ioutil.TempDir("/tmp", "bitcask")
+	assert.NoError(err)
+	defer os.RemoveAll(testdir)
+	w0, err := os.OpenFile(filepath.Join(testdir, "000000000.data"), os.O_CREATE|os.O_APPEND|os.O_WRONLY, 0640)
+	assert.NoError(err)
+	w1, err := os.OpenFile(filepath.Join(testdir, "000000001.data"), os.O_CREATE|os.O_APPEND|os.O_WRONLY, 0640)
+	assert.NoError(err)
+	w2, err := os.OpenFile(filepath.Join(testdir, "000000002.data"), os.O_CREATE|os.O_APPEND|os.O_WRONLY, 0640)
+	assert.NoError(err)
+	defer w0.Close()
+	defer w1.Close()
+	defer w2.Close()
+	buf := make([]byte, 104)
+	binary.BigEndian.PutUint32(buf[:4], 5)
+	binary.BigEndian.PutUint64(buf[4:12], 7)
+	copy(buf[12:28], "mykeymyvalue0AAA")
+	binary.BigEndian.PutUint32(buf[28:32], 3)
+	binary.BigEndian.PutUint64(buf[32:40], 5)
+	copy(buf[40:52], "keyvalue0BBB")
+	_, err = w0.Write(buf[:52])
+	assert.NoError(err)
+	_, err = w1.Write(buf[:52])
+	assert.NoError(err)
+	_, err = w2.Write(buf[:52])
+	assert.NoError(err)
+	err = ApplyV0ToV1(testdir, 104)
+	assert.NoError(err)
+	r0, err := os.Open(filepath.Join(testdir, "000000000.data"))
+	assert.NoError(err)
+	defer r0.Close()
+	n, err := io.ReadFull(r0, buf)
+	assert.NoError(err)
+	assert.Equal(104, n)
+	assert.Equal("0000000500000000000000076d796b65796d7976616c75653041414100000000000000000000000300000000000000056b657976616c75653042424200000000000000000000000500000000000000076d796b65796d7976616c7565304141410000000000000000", hex.EncodeToString(buf))
+	r1, err := os.Open(filepath.Join(testdir, "000000001.data"))
+	assert.NoError(err)
+	defer r1.Close()
+	n, err = io.ReadFull(r1, buf[:100])
+	assert.NoError(err)
+	assert.Equal(100, n)
+	assert.Equal("0000000300000000000000056b657976616c75653042424200000000000000000000000500000000000000076d796b65796d7976616c75653041414100000000000000000000000300000000000000056b657976616c7565304242420000000000000000", hex.EncodeToString(buf[:100]))
+}
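
For context, a minimal sketch of how the TTL API introduced by this patch is meant to be used from client code. The import path, the Put/Get/WithExpiry signatures, and ErrKeyExpired are taken from the diff above; the database path and sleep durations are illustrative only:

package main

import (
	"log"
	"time"

	"github.com/prologic/bitcask"
)

func main() {
	db, err := bitcask.Open("/tmp/bitcask-ttl-demo") // illustrative path
	if err != nil {
		log.Fatal(err)
	}
	defer db.Close()

	// A plain Put behaves exactly as before this patch: the key never expires.
	if err := db.Put([]byte("permanent"), []byte("value")); err != nil {
		log.Fatal(err)
	}

	// WithExpiry attaches an absolute expiry time. Note the codec stores it
	// with one-second granularity (Unix seconds), so sub-second TTLs round down.
	expiry := time.Now().UTC().Add(2 * time.Second)
	if err := db.Put([]byte("session"), []byte("token"), bitcask.WithExpiry(expiry)); err != nil {
		log.Fatal(err)
	}

	time.Sleep(3 * time.Second)

	// Reading an expired key yields ErrKeyExpired and lazily deletes the key.
	if _, err := db.Get([]byte("session")); err == bitcask.ErrKeyExpired {
		log.Println("session expired as expected")
	}
}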
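
The codec changes above fix the on-disk record layout as: a 4-byte big-endian key size, an 8-byte value size, the raw key and value bytes, a 4-byte CRC-32 (IEEE) checksum of the value, and the new 8-byte big-endian expiry in Unix seconds, where all zeroes means "no TTL". The following self-contained sketch builds that layout by hand and prints exactly the byte string the updated encoder_test.go expects; the buffer arithmetic mirrors MetaInfoSize = 4 + 8 + 4 + 8:

package main

import (
	"encoding/binary"
	"fmt"
	"time"
)

func main() {
	key, value := []byte("mykey"), []byte("myvalue")
	checksum := uint32(414141) // checksum value used by the test, not recomputed here
	expiry := time.Date(2020, 10, 1, 0, 0, 0, 0, time.UTC)

	// [4B key size][8B value size][key][value][4B checksum][8B unix expiry]
	buf := make([]byte, 4+8+len(key)+len(value)+4+8)
	binary.BigEndian.PutUint32(buf[0:4], uint32(len(key)))
	binary.BigEndian.PutUint64(buf[4:12], uint64(len(value)))
	copy(buf[12:], key)
	copy(buf[12+len(key):], value)
	binary.BigEndian.PutUint32(buf[12+len(key)+len(value):], checksum)
	binary.BigEndian.PutUint64(buf[16+len(key)+len(value):], uint64(expiry.Unix()))

	// Prints 0000000500000000000000076d796b65796d7976616c7565000651bd000000005f751c00
	fmt.Printf("%x\n", buf)
}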
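
The v0-to-v1 migration above boils down to one invariant: a v1 record is the corresponding v0 record with eight zero bytes appended (a zero expiry, i.e. no TTL), re-packed into datafiles of at most maxDatafileSize. A toy illustration of that per-record invariant follows; widenV0Record is a hypothetical helper written for this note, not part of the patch:

package main

import (
	"bytes"
	"fmt"
)

// widenV0Record mirrors what apply() in scripts/migrations/v0_to_v1.go does to
// each entry: copy the old record verbatim and append an 8-byte zero TTL trailer.
func widenV0Record(v0 []byte) []byte {
	v1 := make([]byte, len(v0)+8)
	copy(v1, v0)
	return v1
}

func main() {
	// A v0 record: [4B key size][8B value size]["key"]["value"][4B checksum].
	v0 := []byte{
		0, 0, 0, 3, // key size = 3
		0, 0, 0, 0, 0, 0, 0, 5, // value size = 5
		'k', 'e', 'y', 'v', 'a', 'l', 'u', 'e',
		0, 0, 0, 0, // checksum (zeroed for illustration)
	}
	v1 := widenV0Record(v0)

	fmt.Println(len(v0), len(v1))                           // 24 32
	fmt.Println(bytes.Equal(v1[:len(v0)], v0))              // true: payload untouched
	fmt.Println(bytes.Equal(v1[len(v0):], make([]byte, 8))) // true: zero TTL appended
}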
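
Finally, with the strconv-based parsing in cmd/bitcaskd/server.go, SET accepts an optional fourth argument: the TTL as an ASCII integer in milliseconds. A sketch of exercising it over raw RESP; it assumes bitcaskd is reachable on localhost:6379, so adjust the address to whatever you bind the server to:

package main

import (
	"bufio"
	"fmt"
	"log"
	"net"
)

func main() {
	conn, err := net.Dial("tcp", "localhost:6379") // assumed bind address
	if err != nil {
		log.Fatal(err)
	}
	defer conn.Close()

	// RESP framing for: SET foo bar 5000  (a TTL of 5000 ms as an ASCII integer)
	fmt.Fprint(conn, "*4\r\n$3\r\nSET\r\n$3\r\nfoo\r\n$3\r\nbar\r\n$4\r\n5000\r\n")

	reply, err := bufio.NewReader(conn).ReadString('\n')
	if err != nil {
		log.Fatal(err)
	}
	fmt.Println(reply) // expected: +OK
}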