1
0
mirror of https://github.com/taigrr/bitcask synced 2025-01-18 04:03:17 -08:00

Unexport some internal implemtnation details

This commit is contained in:
James Mills
2019-03-18 17:22:55 +10:00
parent 2a35976cdd
commit 1298240f53
14 changed files with 46 additions and 39 deletions

143
internal/datafile.go Normal file
View File

@@ -0,0 +1,143 @@
package internal
import (
"fmt"
"os"
"path/filepath"
"sync"
"github.com/pkg/errors"
pb "github.com/prologic/bitcask/internal/proto"
"github.com/prologic/bitcask/internal/streampb"
)
const (
DefaultDatafileFilename = "%09d.data"
)
var (
ErrReadonly = errors.New("error: read only datafile")
)
type Datafile struct {
sync.RWMutex
id int
r *os.File
w *os.File
offset int64
dec *streampb.Decoder
enc *streampb.Encoder
}
func NewDatafile(path string, id int, readonly bool) (*Datafile, error) {
var (
r *os.File
w *os.File
err error
)
fn := filepath.Join(path, fmt.Sprintf(DefaultDatafileFilename, id))
if !readonly {
w, err = os.OpenFile(fn, os.O_WRONLY|os.O_APPEND|os.O_CREATE, 0640)
if err != nil {
return nil, err
}
}
r, err = os.Open(fn)
if err != nil {
return nil, err
}
stat, err := r.Stat()
if err != nil {
return nil, errors.Wrap(err, "error calling Stat()")
}
offset := stat.Size()
dec := streampb.NewDecoder(r)
enc := streampb.NewEncoder(w)
return &Datafile{
id: id,
r: r,
w: w,
offset: offset,
dec: dec,
enc: enc,
}, nil
}
func (df *Datafile) FileID() int {
return df.id
}
func (df *Datafile) Name() string {
return df.r.Name()
}
func (df *Datafile) Close() error {
if df.w == nil {
return df.r.Close()
}
err := df.Sync()
if err != nil {
return err
}
return df.w.Close()
}
func (df *Datafile) Sync() error {
if df.w == nil {
return nil
}
return df.w.Sync()
}
func (df *Datafile) Size() (int64, error) {
df.RLock()
defer df.RUnlock()
return df.offset, nil
}
func (df *Datafile) Read() (e pb.Entry, err error) {
df.Lock()
defer df.Unlock()
return e, df.dec.Decode(&e)
}
func (df *Datafile) ReadAt(index int64) (e pb.Entry, err error) {
df.Lock()
defer df.Unlock()
_, err = df.r.Seek(index, os.SEEK_SET)
if err != nil {
return
}
return e, df.dec.Decode(&e)
}
func (df *Datafile) Write(e pb.Entry) (int64, error) {
if df.w == nil {
return -1, ErrReadonly
}
df.Lock()
defer df.Unlock()
e.Offset = df.offset
n, err := df.enc.Encode(&e)
if err != nil {
return -1, err
}
df.offset += n
return e.Offset, nil
}

17
internal/entry.go Normal file
View File

@@ -0,0 +1,17 @@
package internal
import (
"hash/crc32"
pb "github.com/prologic/bitcask/internal/proto"
)
func NewEntry(key string, value []byte) pb.Entry {
checksum := crc32.ChecksumIEEE(value)
return pb.Entry{
Checksum: checksum,
Key: key,
Value: value,
}
}

93
internal/keydir.go Normal file
View File

@@ -0,0 +1,93 @@
package internal
import (
"bytes"
"encoding/gob"
"io"
"io/ioutil"
"sync"
)
type Item struct {
FileID int
Offset int64
}
type Keydir struct {
sync.RWMutex
kv map[string]Item
}
func NewKeydir() *Keydir {
return &Keydir{
kv: make(map[string]Item),
}
}
func (k *Keydir) Add(key string, fileid int, offset int64) Item {
item := Item{
FileID: fileid,
Offset: offset,
}
k.Lock()
k.kv[key] = item
k.Unlock()
return item
}
func (k *Keydir) Get(key string) (Item, bool) {
k.RLock()
defer k.RUnlock()
item, ok := k.kv[key]
return item, ok
}
func (k *Keydir) Delete(key string) {
k.Lock()
defer k.Unlock()
delete(k.kv, key)
}
func (k *Keydir) Keys() chan string {
ch := make(chan string)
go func() {
for k := range k.kv {
ch <- k
}
close(ch)
}()
return ch
}
func (k *Keydir) Bytes() ([]byte, error) {
var buf bytes.Buffer
enc := gob.NewEncoder(&buf)
err := enc.Encode(k.kv)
if err != nil {
return nil, err
}
return buf.Bytes(), nil
}
func (k *Keydir) Save(fn string) error {
data, err := k.Bytes()
if err != nil {
return err
}
return ioutil.WriteFile(fn, data, 0644)
}
func NewKeydirFromBytes(r io.Reader) (*Keydir, error) {
k := NewKeydir()
dec := gob.NewDecoder(r)
err := dec.Decode(&k.kv)
if err != nil {
return nil, err
}
return k, nil
}

3
internal/proto/doc.go Normal file
View File

@@ -0,0 +1,3 @@
package proto
//go:generate protoc --go_out=. entry.proto

View File

@@ -0,0 +1,99 @@
// Code generated by protoc-gen-go. DO NOT EDIT.
// source: entry.proto
package proto
import proto "github.com/golang/protobuf/proto"
import fmt "fmt"
import math "math"
// Reference imports to suppress errors if they are not otherwise used.
var _ = proto.Marshal
var _ = fmt.Errorf
var _ = math.Inf
// This is a compile-time assertion to ensure that this generated file
// is compatible with the proto package it is being compiled against.
// A compilation error at this line likely means your copy of the
// proto package needs to be updated.
const _ = proto.ProtoPackageIsVersion2 // please upgrade the proto package
type Entry struct {
Checksum uint32 `protobuf:"varint,1,opt,name=Checksum,proto3" json:"Checksum,omitempty"`
Key string `protobuf:"bytes,2,opt,name=Key,proto3" json:"Key,omitempty"`
Offset int64 `protobuf:"varint,3,opt,name=Offset,proto3" json:"Offset,omitempty"`
Value []byte `protobuf:"bytes,4,opt,name=Value,proto3" json:"Value,omitempty"`
XXX_NoUnkeyedLiteral struct{} `json:"-"`
XXX_unrecognized []byte `json:"-"`
XXX_sizecache int32 `json:"-"`
}
func (m *Entry) Reset() { *m = Entry{} }
func (m *Entry) String() string { return proto.CompactTextString(m) }
func (*Entry) ProtoMessage() {}
func (*Entry) Descriptor() ([]byte, []int) {
return fileDescriptor_entry_3e91842c99935ae2, []int{0}
}
func (m *Entry) XXX_Unmarshal(b []byte) error {
return xxx_messageInfo_Entry.Unmarshal(m, b)
}
func (m *Entry) XXX_Marshal(b []byte, deterministic bool) ([]byte, error) {
return xxx_messageInfo_Entry.Marshal(b, m, deterministic)
}
func (dst *Entry) XXX_Merge(src proto.Message) {
xxx_messageInfo_Entry.Merge(dst, src)
}
func (m *Entry) XXX_Size() int {
return xxx_messageInfo_Entry.Size(m)
}
func (m *Entry) XXX_DiscardUnknown() {
xxx_messageInfo_Entry.DiscardUnknown(m)
}
var xxx_messageInfo_Entry proto.InternalMessageInfo
func (m *Entry) GetChecksum() uint32 {
if m != nil {
return m.Checksum
}
return 0
}
func (m *Entry) GetKey() string {
if m != nil {
return m.Key
}
return ""
}
func (m *Entry) GetOffset() int64 {
if m != nil {
return m.Offset
}
return 0
}
func (m *Entry) GetValue() []byte {
if m != nil {
return m.Value
}
return nil
}
func init() {
proto.RegisterType((*Entry)(nil), "proto.Entry")
}
func init() { proto.RegisterFile("entry.proto", fileDescriptor_entry_3e91842c99935ae2) }
var fileDescriptor_entry_3e91842c99935ae2 = []byte{
// 126 bytes of a gzipped FileDescriptorProto
0x1f, 0x8b, 0x08, 0x00, 0x00, 0x00, 0x00, 0x00, 0x02, 0xff, 0xe2, 0xe2, 0x4e, 0xcd, 0x2b, 0x29,
0xaa, 0xd4, 0x2b, 0x28, 0xca, 0x2f, 0xc9, 0x17, 0x62, 0x05, 0x53, 0x4a, 0xc9, 0x5c, 0xac, 0xae,
0x20, 0x51, 0x21, 0x29, 0x2e, 0x0e, 0xe7, 0x8c, 0xd4, 0xe4, 0xec, 0xe2, 0xd2, 0x5c, 0x09, 0x46,
0x05, 0x46, 0x0d, 0xde, 0x20, 0x38, 0x5f, 0x48, 0x80, 0x8b, 0xd9, 0x3b, 0xb5, 0x52, 0x82, 0x49,
0x81, 0x51, 0x83, 0x33, 0x08, 0xc4, 0x14, 0x12, 0xe3, 0x62, 0xf3, 0x4f, 0x4b, 0x2b, 0x4e, 0x2d,
0x91, 0x60, 0x56, 0x60, 0xd4, 0x60, 0x0e, 0x82, 0xf2, 0x84, 0x44, 0xb8, 0x58, 0xc3, 0x12, 0x73,
0x4a, 0x53, 0x25, 0x58, 0x14, 0x18, 0x35, 0x78, 0x82, 0x20, 0x9c, 0x24, 0x36, 0xb0, 0x5d, 0xc6,
0x80, 0x00, 0x00, 0x00, 0xff, 0xff, 0x76, 0xd2, 0x3e, 0x83, 0x81, 0x00, 0x00, 0x00,
}

View File

@@ -0,0 +1,10 @@
syntax = "proto3";
package proto;
message Entry {
uint32 Checksum = 1;
string Key = 2;
int64 Offset = 3;
bytes Value = 4;
}

View File

@@ -0,0 +1,97 @@
package streampb
import (
"bufio"
"encoding/binary"
"io"
"github.com/gogo/protobuf/proto"
"github.com/pkg/errors"
)
const (
// prefixSize is the number of bytes we preallocate for storing
// our big endian lenth prefix buffer.
prefixSize = 8
)
// NewEncoder creates a streaming protobuf encoder.
func NewEncoder(w io.Writer) *Encoder {
return &Encoder{w: bufio.NewWriter(w)}
}
// Encoder wraps an underlying io.Writer and allows you to stream
// proto encodings on it.
type Encoder struct {
w *bufio.Writer
}
// Encode takes any proto.Message and streams it to the underlying writer.
// Messages are framed with a length prefix.
func (e *Encoder) Encode(msg proto.Message) (int64, error) {
prefixBuf := make([]byte, prefixSize)
buf, err := proto.Marshal(msg)
if err != nil {
return 0, err
}
binary.BigEndian.PutUint64(prefixBuf, uint64(len(buf)))
if _, err := e.w.Write(prefixBuf); err != nil {
return 0, errors.Wrap(err, "failed writing length prefix")
}
n, err := e.w.Write(buf)
if err != nil {
return 0, errors.Wrap(err, "failed writing marshaled data")
}
if err = e.w.Flush(); err != nil {
return 0, errors.Wrap(err, "failed flushing data")
}
return int64(n + prefixSize), nil
}
// NewDecoder creates a streaming protobuf decoder.
func NewDecoder(r io.Reader) *Decoder {
return &Decoder{r: r}
}
// Decoder wraps an underlying io.Reader and allows you to stream
// proto decodings on it.
type Decoder struct {
r io.Reader
}
// Decode takes a proto.Message and unmarshals the next payload in the
// underlying io.Reader. It returns an EOF when it's done.
func (d *Decoder) Decode(v proto.Message) error {
prefixBuf := make([]byte, prefixSize)
_, err := io.ReadFull(d.r, prefixBuf)
if err != nil {
return err
}
n := binary.BigEndian.Uint64(prefixBuf)
buf := make([]byte, n)
idx := uint64(0)
for idx < n {
m, err := d.r.Read(buf[idx:n])
if err != nil {
return errors.Wrap(translateError(err), "failed reading marshaled data")
}
idx += uint64(m)
}
return proto.Unmarshal(buf[:n], v)
}
func translateError(err error) error {
if err == io.EOF {
return io.ErrUnexpectedEOF
}
return err
}

36
internal/utils.go Normal file
View File

@@ -0,0 +1,36 @@
package internal
import (
"fmt"
"path/filepath"
"sort"
"strconv"
"strings"
)
func GetDatafiles(path string) ([]string, error) {
fns, err := filepath.Glob(fmt.Sprintf("%s/*.data", path))
if err != nil {
return nil, err
}
sort.Strings(fns)
return fns, nil
}
func ParseIds(fns []string) ([]int, error) {
var ids []int
for _, fn := range fns {
fn = filepath.Base(fn)
ext := filepath.Ext(fn)
if ext != ".data" {
continue
}
id, err := strconv.ParseInt(strings.TrimSuffix(fn, ext), 10, 32)
if err != nil {
return nil, err
}
ids = append(ids, int(id))
}
sort.Ints(ids)
return ids, nil
}

18
internal/version.go Normal file
View File

@@ -0,0 +1,18 @@
package internal
import (
"fmt"
)
var (
// Version release version
Version = "0.0.1"
// Commit will be overwritten automatically by the build system
Commit = "HEAD"
)
// FullVersion returns the full version and commit hash
func FullVersion() string {
return fmt.Sprintf("%s@%s", Version, Commit)
}

15
internal/version_test.go Normal file
View File

@@ -0,0 +1,15 @@
package internal
import (
"fmt"
"testing"
"github.com/stretchr/testify/assert"
)
func TestFullVersion(t *testing.T) {
assert := assert.New(t)
expected := fmt.Sprintf("%s@%s", Version, Commit)
assert.Equal(expected, FullVersion())
}