mirror of
https://github.com/gogrlx/nats-server.git
synced 2026-04-02 03:38:42 -07:00
metadata and checksum storage
Signed-off-by: Derek Collison <derek@nats.io>
This commit is contained in:
@@ -18,6 +18,8 @@ import (
|
||||
"bytes"
|
||||
"crypto/sha256"
|
||||
"encoding/binary"
|
||||
"encoding/hex"
|
||||
"encoding/json"
|
||||
"fmt"
|
||||
"hash"
|
||||
"io"
|
||||
@@ -182,16 +184,21 @@ func newFileStore(fcfg FileStoreConfig, cfg MsgSetConfig) (*fileStore, error) {
|
||||
return nil, fmt.Errorf("could not create message storage directory - %v", err)
|
||||
}
|
||||
|
||||
// Create highway hash for message blocks. Use 256 hash of directory as key.
|
||||
// Create highway hash for message blocks. Use sha256 of directory as key.
|
||||
key := sha256.Sum256([]byte(mdir))
|
||||
fs.hh, err = highwayhash.New64(key[:])
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("could not create hash: %v", err)
|
||||
}
|
||||
|
||||
// Recover our state.
|
||||
if err := fs.recoverState(); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
// Write our meta data iff new.
|
||||
if err := fs.writeMsgSetMeta(); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
go fs.flushLoop(fs.fch, fs.qch)
|
||||
|
||||
@@ -210,6 +217,51 @@ func dynBlkSize(retention RetentionPolicy, maxBytes int64) uint64 {
|
||||
}
|
||||
}
|
||||
|
||||
// Write out meta and the checksum.
|
||||
func (fs *fileStore) writeMsgSetMeta() error {
|
||||
meta := path.Join(fs.fcfg.StoreDir, JetStreamMetaFile)
|
||||
if _, err := os.Stat(meta); (err != nil && !os.IsNotExist(err)) || err == nil {
|
||||
return err
|
||||
}
|
||||
b, err := json.MarshalIndent(fs.cfg, _EMPTY_, " ")
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
if err := ioutil.WriteFile(meta, b, 0644); err != nil {
|
||||
return err
|
||||
}
|
||||
fs.hh.Reset()
|
||||
fs.hh.Write(b)
|
||||
checksum := hex.EncodeToString(fs.hh.Sum(nil))
|
||||
sum := path.Join(fs.fcfg.StoreDir, JetStreamMetaFileSum)
|
||||
if err := ioutil.WriteFile(sum, []byte(checksum), 0644); err != nil {
|
||||
return err
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
func (obs *observableFileStore) writeObservableMeta() error {
|
||||
meta := path.Join(obs.odir, JetStreamMetaFile)
|
||||
if _, err := os.Stat(meta); (err != nil && !os.IsNotExist(err)) || err == nil {
|
||||
return err
|
||||
}
|
||||
b, err := json.MarshalIndent(obs.cfg, _EMPTY_, " ")
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
if err := ioutil.WriteFile(meta, b, 0644); err != nil {
|
||||
return err
|
||||
}
|
||||
obs.hh.Reset()
|
||||
obs.hh.Write(b)
|
||||
checksum := hex.EncodeToString(obs.hh.Sum(nil))
|
||||
sum := path.Join(obs.odir, JetStreamMetaFileSum)
|
||||
if err := ioutil.WriteFile(sum, []byte(checksum), 0644); err != nil {
|
||||
return err
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
func (fs *fileStore) recoverState() error {
|
||||
return fs.recoverMsgs()
|
||||
// FIXME(dlc) - Observables
|
||||
@@ -1396,6 +1448,7 @@ func (fs *fileStore) Stop() {
|
||||
type observableFileStore struct {
|
||||
mu sync.Mutex
|
||||
fs *fileStore
|
||||
cfg *ObservableConfig
|
||||
name string
|
||||
odir string
|
||||
ifn string
|
||||
@@ -1407,9 +1460,12 @@ type observableFileStore struct {
|
||||
closed bool
|
||||
}
|
||||
|
||||
func (fs *fileStore) ObservableStore(name string) (ObservableStore, error) {
|
||||
func (fs *fileStore) ObservableStore(name string, cfg *ObservableConfig) (ObservableStore, error) {
|
||||
if fs == nil {
|
||||
return nil, fmt.Errorf("fileStore is nil")
|
||||
return nil, fmt.Errorf("filestore is nil")
|
||||
}
|
||||
if cfg == nil || name == "" {
|
||||
return nil, fmt.Errorf("bad observable config")
|
||||
}
|
||||
odir := path.Join(fs.fcfg.StoreDir, obsDir, name)
|
||||
if err := os.MkdirAll(odir, 0755); err != nil {
|
||||
@@ -1417,6 +1473,7 @@ func (fs *fileStore) ObservableStore(name string) (ObservableStore, error) {
|
||||
}
|
||||
o := &observableFileStore{
|
||||
fs: fs,
|
||||
cfg: cfg,
|
||||
name: name,
|
||||
odir: odir,
|
||||
ifn: path.Join(odir, obsState),
|
||||
@@ -1430,6 +1487,10 @@ func (fs *fileStore) ObservableStore(name string) (ObservableStore, error) {
|
||||
}
|
||||
o.hh = hh
|
||||
|
||||
if err := o.writeObservableMeta(); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
fs.mu.Lock()
|
||||
fs.obs = append(fs.obs, o)
|
||||
fs.mu.Unlock()
|
||||
@@ -1662,10 +1723,6 @@ func (o *observableFileStore) State() (*ObservableState, error) {
|
||||
return state, nil
|
||||
}
|
||||
|
||||
func (o *observableFileStore) Config() (*ObservableConfig, error) {
|
||||
return nil, nil
|
||||
}
|
||||
|
||||
func (o *observableFileStore) Stop() {
|
||||
o.mu.Lock()
|
||||
if o.closed {
|
||||
|
||||
@@ -15,6 +15,8 @@ package server
|
||||
|
||||
import (
|
||||
"bytes"
|
||||
"encoding/hex"
|
||||
"encoding/json"
|
||||
"fmt"
|
||||
"io/ioutil"
|
||||
"math/bits"
|
||||
@@ -690,6 +692,103 @@ func TestFileStoreEraseAndNoIndexRecovery(t *testing.T) {
|
||||
}
|
||||
}
|
||||
|
||||
func TestFileStoreMeta(t *testing.T) {
|
||||
storeDir, _ := ioutil.TempDir("", JetStreamStoreDir)
|
||||
os.MkdirAll(storeDir, 0755)
|
||||
defer os.RemoveAll(storeDir)
|
||||
|
||||
mconfig := MsgSetConfig{Name: "ZZ-22-33", Storage: FileStorage, Subjects: []string{"foo.*"}, Replicas: 22}
|
||||
|
||||
fs, err := newFileStore(FileStoreConfig{StoreDir: storeDir}, mconfig)
|
||||
if err != nil {
|
||||
t.Fatalf("Unexpected error: %v", err)
|
||||
}
|
||||
|
||||
metafile := path.Join(storeDir, JetStreamMetaFile)
|
||||
metasum := path.Join(storeDir, JetStreamMetaFileSum)
|
||||
|
||||
// Test to make sure meta file and checksum are present.
|
||||
if _, err := os.Stat(metafile); os.IsNotExist(err) {
|
||||
t.Fatalf("Expected metafile %q to exist", metafile)
|
||||
}
|
||||
if _, err := os.Stat(metasum); os.IsNotExist(err) {
|
||||
t.Fatalf("Expected metafile's checksum %q to exist", metasum)
|
||||
}
|
||||
|
||||
buf, err := ioutil.ReadFile(metafile)
|
||||
if err != nil {
|
||||
t.Fatalf("Error reading metafile: %v", err)
|
||||
}
|
||||
var mconfig2 MsgSetConfig
|
||||
if err := json.Unmarshal(buf, &mconfig2); err != nil {
|
||||
t.Fatalf("Error unmarshalling: %v", err)
|
||||
}
|
||||
if !reflect.DeepEqual(mconfig, mconfig2) {
|
||||
t.Fatalf("MsgSet configs not equal, got %+v vs %+v", mconfig2, mconfig)
|
||||
}
|
||||
checksum, err := ioutil.ReadFile(metasum)
|
||||
if err != nil {
|
||||
t.Fatalf("Error reading metafile checksum: %v", err)
|
||||
}
|
||||
fs.hh.Reset()
|
||||
fs.hh.Write(buf)
|
||||
mychecksum := hex.EncodeToString(fs.hh.Sum(nil))
|
||||
if mychecksum != string(checksum) {
|
||||
t.Fatalf("Checksums do not match, got %q vs %q", mychecksum, checksum)
|
||||
}
|
||||
|
||||
// Now create an observable. Same deal for them.
|
||||
oconfig := ObservableConfig{
|
||||
Delivery: "d",
|
||||
DeliverAll: true,
|
||||
Partition: "foo",
|
||||
AckPolicy: AckAll,
|
||||
}
|
||||
oname := "obs22"
|
||||
obs, err := fs.ObservableStore(oname, &oconfig)
|
||||
if err != nil {
|
||||
t.Fatalf("Unexepected error: %v", err)
|
||||
}
|
||||
|
||||
ometafile := path.Join(storeDir, obsDir, oname, JetStreamMetaFile)
|
||||
ometasum := path.Join(storeDir, obsDir, oname, JetStreamMetaFileSum)
|
||||
|
||||
// Test to make sure meta file and checksum are present.
|
||||
if _, err := os.Stat(ometafile); os.IsNotExist(err) {
|
||||
t.Fatalf("Expected observable metafile %q to exist", ometafile)
|
||||
}
|
||||
if _, err := os.Stat(ometasum); os.IsNotExist(err) {
|
||||
t.Fatalf("Expected observable metafile's checksum %q to exist", ometasum)
|
||||
}
|
||||
|
||||
buf, err = ioutil.ReadFile(ometafile)
|
||||
if err != nil {
|
||||
t.Fatalf("Error reading observable metafile: %v", err)
|
||||
}
|
||||
|
||||
var oconfig2 ObservableConfig
|
||||
if err := json.Unmarshal(buf, &oconfig2); err != nil {
|
||||
t.Fatalf("Error unmarshalling: %v", err)
|
||||
}
|
||||
if oconfig2 != oconfig {
|
||||
//if !reflect.DeepEqual(oconfig, oconfig2) {
|
||||
t.Fatalf("Observable configs not equal, got %+v vs %+v", oconfig2, oconfig)
|
||||
}
|
||||
checksum, err = ioutil.ReadFile(ometasum)
|
||||
|
||||
if err != nil {
|
||||
t.Fatalf("Error reading observable metafile checksum: %v", err)
|
||||
}
|
||||
|
||||
hh := obs.(*observableFileStore).hh
|
||||
hh.Reset()
|
||||
hh.Write(buf)
|
||||
mychecksum = hex.EncodeToString(hh.Sum(nil))
|
||||
if mychecksum != string(checksum) {
|
||||
t.Fatalf("Checksums do not match, got %q vs %q", mychecksum, checksum)
|
||||
}
|
||||
}
|
||||
|
||||
func TestFileStoreCollapseDmap(t *testing.T) {
|
||||
storeDir, _ := ioutil.TempDir("", JetStreamStoreDir)
|
||||
os.MkdirAll(storeDir, 0755)
|
||||
@@ -822,7 +921,7 @@ func TestFileStoreObservable(t *testing.T) {
|
||||
}
|
||||
defer fs.Stop()
|
||||
|
||||
o, err := fs.ObservableStore("obs22")
|
||||
o, err := fs.ObservableStore("obs22", &ObservableConfig{})
|
||||
if err != nil {
|
||||
t.Fatalf("Unexepected error: %v", err)
|
||||
}
|
||||
@@ -1060,7 +1159,7 @@ func TestFileStoreObservablesPerf(t *testing.T) {
|
||||
defer fs.Stop()
|
||||
|
||||
// Test Observables.
|
||||
o, err := fs.ObservableStore("obs22")
|
||||
o, err := fs.ObservableStore("obs22", &ObservableConfig{})
|
||||
if err != nil {
|
||||
t.Fatalf("Unexepected error: %v", err)
|
||||
}
|
||||
@@ -1108,5 +1207,4 @@ func TestFileStoreObservablesPerf(t *testing.T) {
|
||||
tt = time.Since(start)
|
||||
fmt.Printf("time is %v\n", tt)
|
||||
fmt.Printf("%.0f updates/sec\n", float64(toStore)/tt.Seconds())
|
||||
|
||||
}
|
||||
|
||||
@@ -100,6 +100,10 @@ const (
|
||||
|
||||
// JetStreamRequestNextPre is the prefix for the request next message(s) for an observable in worker/pull mode.
|
||||
JetStreamRequestNextPre = "$JS.RN"
|
||||
|
||||
// Metafiles for message sets and observables.
|
||||
JetStreamMetaFile = "meta.inf"
|
||||
JetStreamMetaFileSum = "meta.sum"
|
||||
)
|
||||
|
||||
// For easier handling of exports and imports.
|
||||
|
||||
@@ -284,7 +284,7 @@ func (ms *memStore) Stop() {
|
||||
|
||||
type observableMemStore struct{}
|
||||
|
||||
func (ms *memStore) ObservableStore(_ string) (ObservableStore, error) {
|
||||
func (ms *memStore) ObservableStore(_ string, _ *ObservableConfig) (ObservableStore, error) {
|
||||
return &observableMemStore{}, nil
|
||||
}
|
||||
|
||||
@@ -292,6 +292,5 @@ func (ms *memStore) ObservableStore(_ string) (ObservableStore, error) {
|
||||
func (os *observableMemStore) Update(_ *ObservableState) error {
|
||||
return nil
|
||||
}
|
||||
func (os *observableMemStore) Stop() {}
|
||||
func (os *observableMemStore) State() (*ObservableState, error) { return nil, nil }
|
||||
func (os *observableMemStore) Config() (*ObservableConfig, error) { return nil, nil }
|
||||
func (os *observableMemStore) Stop() {}
|
||||
func (os *observableMemStore) State() (*ObservableState, error) { return nil, nil }
|
||||
|
||||
@@ -24,16 +24,16 @@ import (
|
||||
// MsgSetConfig will determine the name, subjects and retention policy
|
||||
// for a given message set. If subjects is empty the name will be used.
|
||||
type MsgSetConfig struct {
|
||||
Name string
|
||||
Subjects []string
|
||||
Retention RetentionPolicy
|
||||
MaxObservables int
|
||||
MaxMsgs int64
|
||||
MaxBytes int64
|
||||
MaxAge time.Duration
|
||||
Storage StorageType
|
||||
Replicas int
|
||||
NoAck bool
|
||||
Name string `json:"name"`
|
||||
Subjects []string `json:"subjects,omitempty"`
|
||||
Retention RetentionPolicy `json:"retention"`
|
||||
MaxObservables int `json:"max_observables"`
|
||||
MaxMsgs int64 `json:"max_msgs"`
|
||||
MaxBytes int64 `json:"max_bytes"`
|
||||
MaxAge time.Duration `json:"max_age"`
|
||||
Storage StorageType `json:"storage"`
|
||||
Replicas int `json:"num_replicas"`
|
||||
NoAck bool `json:"no_ack,omitempty"`
|
||||
}
|
||||
|
||||
// RetentionPolicy determines how messages in a set are retained.
|
||||
|
||||
@@ -32,8 +32,8 @@ type ObservableConfig struct {
|
||||
DeliverAll bool `json:"deliver_all,omitempty"`
|
||||
DeliverLast bool `json:"deliver_last,omitempty"`
|
||||
AckPolicy AckPolicy `json:"ack_policy"`
|
||||
AckWait time.Duration `json:"ack_wait"`
|
||||
Partition string `json:"partition"`
|
||||
AckWait time.Duration `json:"ack_wait,omitempty"`
|
||||
Partition string `json:"partition,omitempty"`
|
||||
ReplayPolicy ReplayPolicy `json:"replay_policy"`
|
||||
}
|
||||
|
||||
@@ -204,7 +204,7 @@ func (mset *MsgSet) AddObservable(config *ObservableConfig) (*Observable, error)
|
||||
o.name = createObservableName()
|
||||
}
|
||||
|
||||
store, err := mset.store.ObservableStore(o.name)
|
||||
store, err := mset.store.ObservableStore(o.name, config)
|
||||
if err != nil {
|
||||
mset.mu.Unlock()
|
||||
return nil, fmt.Errorf("error creating store for observable: %v", err)
|
||||
|
||||
131
server/store.go
131
server/store.go
@@ -14,7 +14,9 @@
|
||||
package server
|
||||
|
||||
import (
|
||||
"encoding/json"
|
||||
"errors"
|
||||
"fmt"
|
||||
"time"
|
||||
)
|
||||
|
||||
@@ -39,7 +41,7 @@ type MsgSetStore interface {
|
||||
Stats() MsgSetStats
|
||||
Delete()
|
||||
Stop()
|
||||
ObservableStore(name string) (ObservableStore, error)
|
||||
ObservableStore(name string, cfg *ObservableConfig) (ObservableStore, error)
|
||||
}
|
||||
|
||||
// MsgSetStats are stats about this given message set.
|
||||
@@ -52,7 +54,6 @@ type MsgSetStats struct {
|
||||
|
||||
type ObservableStore interface {
|
||||
State() (*ObservableState, error)
|
||||
Config() (*ObservableConfig, error)
|
||||
Update(*ObservableState) error
|
||||
Stop()
|
||||
}
|
||||
@@ -77,6 +78,132 @@ type ObservableState struct {
|
||||
Redelivery map[uint64]uint64
|
||||
}
|
||||
|
||||
func jsonString(s string) string {
|
||||
return "\"" + s + "\""
|
||||
}
|
||||
|
||||
const (
|
||||
streamPolicyString = "stream_limits"
|
||||
interestPolicyString = "interest_based"
|
||||
workQueuePolicyString = "work_queue"
|
||||
)
|
||||
|
||||
func (rp RetentionPolicy) MarshalJSON() ([]byte, error) {
|
||||
switch rp {
|
||||
case StreamPolicy:
|
||||
return json.Marshal(streamPolicyString)
|
||||
case InterestPolicy:
|
||||
return json.Marshal(interestPolicyString)
|
||||
case WorkQueuePolicy:
|
||||
return json.Marshal(workQueuePolicyString)
|
||||
default:
|
||||
return nil, fmt.Errorf("can not marshal %v", rp)
|
||||
}
|
||||
}
|
||||
|
||||
func (rp *RetentionPolicy) UnmarshalJSON(data []byte) error {
|
||||
switch string(data) {
|
||||
case jsonString(streamPolicyString):
|
||||
*rp = StreamPolicy
|
||||
case jsonString(interestPolicyString):
|
||||
*rp = InterestPolicy
|
||||
case jsonString(workQueuePolicyString):
|
||||
*rp = WorkQueuePolicy
|
||||
default:
|
||||
return fmt.Errorf("can not unmarshal %q", data)
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
const (
|
||||
memoryStorageString = "memory"
|
||||
fileStorageString = "file"
|
||||
)
|
||||
|
||||
func (st StorageType) MarshalJSON() ([]byte, error) {
|
||||
switch st {
|
||||
case MemoryStorage:
|
||||
return json.Marshal(memoryStorageString)
|
||||
case FileStorage:
|
||||
return json.Marshal(fileStorageString)
|
||||
default:
|
||||
return nil, fmt.Errorf("can not marshal %v", st)
|
||||
}
|
||||
}
|
||||
|
||||
func (st *StorageType) UnmarshalJSON(data []byte) error {
|
||||
switch string(data) {
|
||||
case jsonString(memoryStorageString):
|
||||
*st = MemoryStorage
|
||||
case jsonString(fileStorageString):
|
||||
*st = FileStorage
|
||||
default:
|
||||
return fmt.Errorf("can not unmarshal %q", data)
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
const (
|
||||
ackNonePolicyString = "none"
|
||||
ackAllPolicyString = "all"
|
||||
ackExplicitPolicyString = "explicit"
|
||||
)
|
||||
|
||||
func (ap AckPolicy) MarshalJSON() ([]byte, error) {
|
||||
switch ap {
|
||||
case AckNone:
|
||||
return json.Marshal(ackNonePolicyString)
|
||||
case AckAll:
|
||||
return json.Marshal(ackAllPolicyString)
|
||||
case AckExplicit:
|
||||
return json.Marshal(ackExplicitPolicyString)
|
||||
default:
|
||||
return nil, fmt.Errorf("can not marshal %v", ap)
|
||||
}
|
||||
}
|
||||
|
||||
func (ap *AckPolicy) UnmarshalJSON(data []byte) error {
|
||||
switch string(data) {
|
||||
case jsonString(ackNonePolicyString):
|
||||
*ap = AckNone
|
||||
case jsonString(ackAllPolicyString):
|
||||
*ap = AckAll
|
||||
case jsonString(ackExplicitPolicyString):
|
||||
*ap = AckExplicit
|
||||
default:
|
||||
return fmt.Errorf("can not unmarshal %q", data)
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
const (
|
||||
replayInstantPolicyString = "instant"
|
||||
replayOriginalPolicyString = "original"
|
||||
)
|
||||
|
||||
func (rp ReplayPolicy) MarshalJSON() ([]byte, error) {
|
||||
switch rp {
|
||||
case ReplayInstant:
|
||||
return json.Marshal(replayInstantPolicyString)
|
||||
case ReplayOriginal:
|
||||
return json.Marshal(replayOriginalPolicyString)
|
||||
default:
|
||||
return nil, fmt.Errorf("can not marshal %v", rp)
|
||||
}
|
||||
}
|
||||
|
||||
func (rp *ReplayPolicy) UnmarshalJSON(data []byte) error {
|
||||
switch string(data) {
|
||||
case jsonString(replayInstantPolicyString):
|
||||
*rp = ReplayInstant
|
||||
case jsonString(replayOriginalPolicyString):
|
||||
*rp = ReplayOriginal
|
||||
default:
|
||||
return fmt.Errorf("can not unmarshal %q", data)
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
var (
|
||||
ErrStoreMsgNotFound = errors.New("no message found")
|
||||
)
|
||||
|
||||
Reference in New Issue
Block a user