From 68860b39c3ad64150d3d36b0bde0f92de32bd93f Mon Sep 17 00:00:00 2001 From: Derek Collison Date: Sat, 2 Nov 2019 18:45:13 -0700 Subject: [PATCH] metadata and checksum storage Signed-off-by: Derek Collison --- server/filestore.go | 71 ++++++++++++++++++--- server/filestore_test.go | 104 ++++++++++++++++++++++++++++++- server/jetstream.go | 4 ++ server/memstore.go | 7 +-- server/msgset.go | 20 +++--- server/observable.go | 6 +- server/store.go | 131 ++++++++++++++++++++++++++++++++++++++- 7 files changed, 314 insertions(+), 29 deletions(-) diff --git a/server/filestore.go b/server/filestore.go index 3d4148bc..29f7b038 100644 --- a/server/filestore.go +++ b/server/filestore.go @@ -18,6 +18,8 @@ import ( "bytes" "crypto/sha256" "encoding/binary" + "encoding/hex" + "encoding/json" "fmt" "hash" "io" @@ -182,16 +184,21 @@ func newFileStore(fcfg FileStoreConfig, cfg MsgSetConfig) (*fileStore, error) { return nil, fmt.Errorf("could not create message storage directory - %v", err) } - // Create highway hash for message blocks. Use 256 hash of directory as key. + // Create highway hash for message blocks. Use sha256 of directory as key. key := sha256.Sum256([]byte(mdir)) fs.hh, err = highwayhash.New64(key[:]) if err != nil { return nil, fmt.Errorf("could not create hash: %v", err) } + // Recover our state. if err := fs.recoverState(); err != nil { return nil, err } + // Write our meta data iff new. + if err := fs.writeMsgSetMeta(); err != nil { + return nil, err + } go fs.flushLoop(fs.fch, fs.qch) @@ -210,6 +217,51 @@ func dynBlkSize(retention RetentionPolicy, maxBytes int64) uint64 { } } +// Write out meta and the checksum. +func (fs *fileStore) writeMsgSetMeta() error { + meta := path.Join(fs.fcfg.StoreDir, JetStreamMetaFile) + if _, err := os.Stat(meta); (err != nil && !os.IsNotExist(err)) || err == nil { + return err + } + b, err := json.MarshalIndent(fs.cfg, _EMPTY_, " ") + if err != nil { + return err + } + if err := ioutil.WriteFile(meta, b, 0644); err != nil { + return err + } + fs.hh.Reset() + fs.hh.Write(b) + checksum := hex.EncodeToString(fs.hh.Sum(nil)) + sum := path.Join(fs.fcfg.StoreDir, JetStreamMetaFileSum) + if err := ioutil.WriteFile(sum, []byte(checksum), 0644); err != nil { + return err + } + return nil +} + +func (obs *observableFileStore) writeObservableMeta() error { + meta := path.Join(obs.odir, JetStreamMetaFile) + if _, err := os.Stat(meta); (err != nil && !os.IsNotExist(err)) || err == nil { + return err + } + b, err := json.MarshalIndent(obs.cfg, _EMPTY_, " ") + if err != nil { + return err + } + if err := ioutil.WriteFile(meta, b, 0644); err != nil { + return err + } + obs.hh.Reset() + obs.hh.Write(b) + checksum := hex.EncodeToString(obs.hh.Sum(nil)) + sum := path.Join(obs.odir, JetStreamMetaFileSum) + if err := ioutil.WriteFile(sum, []byte(checksum), 0644); err != nil { + return err + } + return nil +} + func (fs *fileStore) recoverState() error { return fs.recoverMsgs() // FIXME(dlc) - Observables @@ -1396,6 +1448,7 @@ func (fs *fileStore) Stop() { type observableFileStore struct { mu sync.Mutex fs *fileStore + cfg *ObservableConfig name string odir string ifn string @@ -1407,9 +1460,12 @@ type observableFileStore struct { closed bool } -func (fs *fileStore) ObservableStore(name string) (ObservableStore, error) { +func (fs *fileStore) ObservableStore(name string, cfg *ObservableConfig) (ObservableStore, error) { if fs == nil { - return nil, fmt.Errorf("fileStore is nil") + return nil, fmt.Errorf("filestore is nil") + } + if cfg == nil || name == "" { + return nil, fmt.Errorf("bad observable config") } odir := path.Join(fs.fcfg.StoreDir, obsDir, name) if err := os.MkdirAll(odir, 0755); err != nil { @@ -1417,6 +1473,7 @@ func (fs *fileStore) ObservableStore(name string) (ObservableStore, error) { } o := &observableFileStore{ fs: fs, + cfg: cfg, name: name, odir: odir, ifn: path.Join(odir, obsState), @@ -1430,6 +1487,10 @@ func (fs *fileStore) ObservableStore(name string) (ObservableStore, error) { } o.hh = hh + if err := o.writeObservableMeta(); err != nil { + return nil, err + } + fs.mu.Lock() fs.obs = append(fs.obs, o) fs.mu.Unlock() @@ -1662,10 +1723,6 @@ func (o *observableFileStore) State() (*ObservableState, error) { return state, nil } -func (o *observableFileStore) Config() (*ObservableConfig, error) { - return nil, nil -} - func (o *observableFileStore) Stop() { o.mu.Lock() if o.closed { diff --git a/server/filestore_test.go b/server/filestore_test.go index b4907559..dad63009 100644 --- a/server/filestore_test.go +++ b/server/filestore_test.go @@ -15,6 +15,8 @@ package server import ( "bytes" + "encoding/hex" + "encoding/json" "fmt" "io/ioutil" "math/bits" @@ -690,6 +692,103 @@ func TestFileStoreEraseAndNoIndexRecovery(t *testing.T) { } } +func TestFileStoreMeta(t *testing.T) { + storeDir, _ := ioutil.TempDir("", JetStreamStoreDir) + os.MkdirAll(storeDir, 0755) + defer os.RemoveAll(storeDir) + + mconfig := MsgSetConfig{Name: "ZZ-22-33", Storage: FileStorage, Subjects: []string{"foo.*"}, Replicas: 22} + + fs, err := newFileStore(FileStoreConfig{StoreDir: storeDir}, mconfig) + if err != nil { + t.Fatalf("Unexpected error: %v", err) + } + + metafile := path.Join(storeDir, JetStreamMetaFile) + metasum := path.Join(storeDir, JetStreamMetaFileSum) + + // Test to make sure meta file and checksum are present. + if _, err := os.Stat(metafile); os.IsNotExist(err) { + t.Fatalf("Expected metafile %q to exist", metafile) + } + if _, err := os.Stat(metasum); os.IsNotExist(err) { + t.Fatalf("Expected metafile's checksum %q to exist", metasum) + } + + buf, err := ioutil.ReadFile(metafile) + if err != nil { + t.Fatalf("Error reading metafile: %v", err) + } + var mconfig2 MsgSetConfig + if err := json.Unmarshal(buf, &mconfig2); err != nil { + t.Fatalf("Error unmarshalling: %v", err) + } + if !reflect.DeepEqual(mconfig, mconfig2) { + t.Fatalf("MsgSet configs not equal, got %+v vs %+v", mconfig2, mconfig) + } + checksum, err := ioutil.ReadFile(metasum) + if err != nil { + t.Fatalf("Error reading metafile checksum: %v", err) + } + fs.hh.Reset() + fs.hh.Write(buf) + mychecksum := hex.EncodeToString(fs.hh.Sum(nil)) + if mychecksum != string(checksum) { + t.Fatalf("Checksums do not match, got %q vs %q", mychecksum, checksum) + } + + // Now create an observable. Same deal for them. + oconfig := ObservableConfig{ + Delivery: "d", + DeliverAll: true, + Partition: "foo", + AckPolicy: AckAll, + } + oname := "obs22" + obs, err := fs.ObservableStore(oname, &oconfig) + if err != nil { + t.Fatalf("Unexepected error: %v", err) + } + + ometafile := path.Join(storeDir, obsDir, oname, JetStreamMetaFile) + ometasum := path.Join(storeDir, obsDir, oname, JetStreamMetaFileSum) + + // Test to make sure meta file and checksum are present. + if _, err := os.Stat(ometafile); os.IsNotExist(err) { + t.Fatalf("Expected observable metafile %q to exist", ometafile) + } + if _, err := os.Stat(ometasum); os.IsNotExist(err) { + t.Fatalf("Expected observable metafile's checksum %q to exist", ometasum) + } + + buf, err = ioutil.ReadFile(ometafile) + if err != nil { + t.Fatalf("Error reading observable metafile: %v", err) + } + + var oconfig2 ObservableConfig + if err := json.Unmarshal(buf, &oconfig2); err != nil { + t.Fatalf("Error unmarshalling: %v", err) + } + if oconfig2 != oconfig { + //if !reflect.DeepEqual(oconfig, oconfig2) { + t.Fatalf("Observable configs not equal, got %+v vs %+v", oconfig2, oconfig) + } + checksum, err = ioutil.ReadFile(ometasum) + + if err != nil { + t.Fatalf("Error reading observable metafile checksum: %v", err) + } + + hh := obs.(*observableFileStore).hh + hh.Reset() + hh.Write(buf) + mychecksum = hex.EncodeToString(hh.Sum(nil)) + if mychecksum != string(checksum) { + t.Fatalf("Checksums do not match, got %q vs %q", mychecksum, checksum) + } +} + func TestFileStoreCollapseDmap(t *testing.T) { storeDir, _ := ioutil.TempDir("", JetStreamStoreDir) os.MkdirAll(storeDir, 0755) @@ -822,7 +921,7 @@ func TestFileStoreObservable(t *testing.T) { } defer fs.Stop() - o, err := fs.ObservableStore("obs22") + o, err := fs.ObservableStore("obs22", &ObservableConfig{}) if err != nil { t.Fatalf("Unexepected error: %v", err) } @@ -1060,7 +1159,7 @@ func TestFileStoreObservablesPerf(t *testing.T) { defer fs.Stop() // Test Observables. - o, err := fs.ObservableStore("obs22") + o, err := fs.ObservableStore("obs22", &ObservableConfig{}) if err != nil { t.Fatalf("Unexepected error: %v", err) } @@ -1108,5 +1207,4 @@ func TestFileStoreObservablesPerf(t *testing.T) { tt = time.Since(start) fmt.Printf("time is %v\n", tt) fmt.Printf("%.0f updates/sec\n", float64(toStore)/tt.Seconds()) - } diff --git a/server/jetstream.go b/server/jetstream.go index 2ff01f2e..7b1802c9 100644 --- a/server/jetstream.go +++ b/server/jetstream.go @@ -100,6 +100,10 @@ const ( // JetStreamRequestNextPre is the prefix for the request next message(s) for an observable in worker/pull mode. JetStreamRequestNextPre = "$JS.RN" + + // Metafiles for message sets and observables. + JetStreamMetaFile = "meta.inf" + JetStreamMetaFileSum = "meta.sum" ) // For easier handling of exports and imports. diff --git a/server/memstore.go b/server/memstore.go index 819ea2ed..3425ae85 100644 --- a/server/memstore.go +++ b/server/memstore.go @@ -284,7 +284,7 @@ func (ms *memStore) Stop() { type observableMemStore struct{} -func (ms *memStore) ObservableStore(_ string) (ObservableStore, error) { +func (ms *memStore) ObservableStore(_ string, _ *ObservableConfig) (ObservableStore, error) { return &observableMemStore{}, nil } @@ -292,6 +292,5 @@ func (ms *memStore) ObservableStore(_ string) (ObservableStore, error) { func (os *observableMemStore) Update(_ *ObservableState) error { return nil } -func (os *observableMemStore) Stop() {} -func (os *observableMemStore) State() (*ObservableState, error) { return nil, nil } -func (os *observableMemStore) Config() (*ObservableConfig, error) { return nil, nil } +func (os *observableMemStore) Stop() {} +func (os *observableMemStore) State() (*ObservableState, error) { return nil, nil } diff --git a/server/msgset.go b/server/msgset.go index 2eff002a..6babd052 100644 --- a/server/msgset.go +++ b/server/msgset.go @@ -24,16 +24,16 @@ import ( // MsgSetConfig will determine the name, subjects and retention policy // for a given message set. If subjects is empty the name will be used. type MsgSetConfig struct { - Name string - Subjects []string - Retention RetentionPolicy - MaxObservables int - MaxMsgs int64 - MaxBytes int64 - MaxAge time.Duration - Storage StorageType - Replicas int - NoAck bool + Name string `json:"name"` + Subjects []string `json:"subjects,omitempty"` + Retention RetentionPolicy `json:"retention"` + MaxObservables int `json:"max_observables"` + MaxMsgs int64 `json:"max_msgs"` + MaxBytes int64 `json:"max_bytes"` + MaxAge time.Duration `json:"max_age"` + Storage StorageType `json:"storage"` + Replicas int `json:"num_replicas"` + NoAck bool `json:"no_ack,omitempty"` } // RetentionPolicy determines how messages in a set are retained. diff --git a/server/observable.go b/server/observable.go index 5d3e74ab..6f9039d3 100644 --- a/server/observable.go +++ b/server/observable.go @@ -32,8 +32,8 @@ type ObservableConfig struct { DeliverAll bool `json:"deliver_all,omitempty"` DeliverLast bool `json:"deliver_last,omitempty"` AckPolicy AckPolicy `json:"ack_policy"` - AckWait time.Duration `json:"ack_wait"` - Partition string `json:"partition"` + AckWait time.Duration `json:"ack_wait,omitempty"` + Partition string `json:"partition,omitempty"` ReplayPolicy ReplayPolicy `json:"replay_policy"` } @@ -204,7 +204,7 @@ func (mset *MsgSet) AddObservable(config *ObservableConfig) (*Observable, error) o.name = createObservableName() } - store, err := mset.store.ObservableStore(o.name) + store, err := mset.store.ObservableStore(o.name, config) if err != nil { mset.mu.Unlock() return nil, fmt.Errorf("error creating store for observable: %v", err) diff --git a/server/store.go b/server/store.go index 979fda16..5bbf3e09 100644 --- a/server/store.go +++ b/server/store.go @@ -14,7 +14,9 @@ package server import ( + "encoding/json" "errors" + "fmt" "time" ) @@ -39,7 +41,7 @@ type MsgSetStore interface { Stats() MsgSetStats Delete() Stop() - ObservableStore(name string) (ObservableStore, error) + ObservableStore(name string, cfg *ObservableConfig) (ObservableStore, error) } // MsgSetStats are stats about this given message set. @@ -52,7 +54,6 @@ type MsgSetStats struct { type ObservableStore interface { State() (*ObservableState, error) - Config() (*ObservableConfig, error) Update(*ObservableState) error Stop() } @@ -77,6 +78,132 @@ type ObservableState struct { Redelivery map[uint64]uint64 } +func jsonString(s string) string { + return "\"" + s + "\"" +} + +const ( + streamPolicyString = "stream_limits" + interestPolicyString = "interest_based" + workQueuePolicyString = "work_queue" +) + +func (rp RetentionPolicy) MarshalJSON() ([]byte, error) { + switch rp { + case StreamPolicy: + return json.Marshal(streamPolicyString) + case InterestPolicy: + return json.Marshal(interestPolicyString) + case WorkQueuePolicy: + return json.Marshal(workQueuePolicyString) + default: + return nil, fmt.Errorf("can not marshal %v", rp) + } +} + +func (rp *RetentionPolicy) UnmarshalJSON(data []byte) error { + switch string(data) { + case jsonString(streamPolicyString): + *rp = StreamPolicy + case jsonString(interestPolicyString): + *rp = InterestPolicy + case jsonString(workQueuePolicyString): + *rp = WorkQueuePolicy + default: + return fmt.Errorf("can not unmarshal %q", data) + } + return nil +} + +const ( + memoryStorageString = "memory" + fileStorageString = "file" +) + +func (st StorageType) MarshalJSON() ([]byte, error) { + switch st { + case MemoryStorage: + return json.Marshal(memoryStorageString) + case FileStorage: + return json.Marshal(fileStorageString) + default: + return nil, fmt.Errorf("can not marshal %v", st) + } +} + +func (st *StorageType) UnmarshalJSON(data []byte) error { + switch string(data) { + case jsonString(memoryStorageString): + *st = MemoryStorage + case jsonString(fileStorageString): + *st = FileStorage + default: + return fmt.Errorf("can not unmarshal %q", data) + } + return nil +} + +const ( + ackNonePolicyString = "none" + ackAllPolicyString = "all" + ackExplicitPolicyString = "explicit" +) + +func (ap AckPolicy) MarshalJSON() ([]byte, error) { + switch ap { + case AckNone: + return json.Marshal(ackNonePolicyString) + case AckAll: + return json.Marshal(ackAllPolicyString) + case AckExplicit: + return json.Marshal(ackExplicitPolicyString) + default: + return nil, fmt.Errorf("can not marshal %v", ap) + } +} + +func (ap *AckPolicy) UnmarshalJSON(data []byte) error { + switch string(data) { + case jsonString(ackNonePolicyString): + *ap = AckNone + case jsonString(ackAllPolicyString): + *ap = AckAll + case jsonString(ackExplicitPolicyString): + *ap = AckExplicit + default: + return fmt.Errorf("can not unmarshal %q", data) + } + return nil +} + +const ( + replayInstantPolicyString = "instant" + replayOriginalPolicyString = "original" +) + +func (rp ReplayPolicy) MarshalJSON() ([]byte, error) { + switch rp { + case ReplayInstant: + return json.Marshal(replayInstantPolicyString) + case ReplayOriginal: + return json.Marshal(replayOriginalPolicyString) + default: + return nil, fmt.Errorf("can not marshal %v", rp) + } +} + +func (rp *ReplayPolicy) UnmarshalJSON(data []byte) error { + switch string(data) { + case jsonString(replayInstantPolicyString): + *rp = ReplayInstant + case jsonString(replayOriginalPolicyString): + *rp = ReplayOriginal + default: + return fmt.Errorf("can not unmarshal %q", data) + } + return nil +} + var ( ErrStoreMsgNotFound = errors.New("no message found") )