Added in error for Delete and Stop for store

Signed-off-by: Derek Collison <derek@nats.io>
This commit is contained in:
Derek Collison
2020-01-11 09:36:57 -08:00
parent a7d7b6ff56
commit f1a81454b5
3 changed files with 46 additions and 30 deletions

View File

@@ -436,14 +436,14 @@ func (fs *fileStore) newMsgBlockForWrite() (*msgBlock, error) {
func (fs *fileStore) enableLastMsgBlockForWriting() error {
mb := fs.lmb
if mb == nil {
return fmt.Errorf("No last message block assigned, can not enable for writing")
return fmt.Errorf("no last message block assigned, can not enable for writing")
}
if mb.mfd != nil {
return nil
}
mfd, err := os.OpenFile(mb.mfn, os.O_APPEND|os.O_CREATE|os.O_RDWR, 0644)
if err != nil {
return fmt.Errorf("Error opening msg block file [%q]: %v", mb.mfn, err)
return fmt.Errorf("error opening msg block file [%q]: %v", mb.mfn, err)
}
mb.mfd = mfd
return nil
@@ -1142,15 +1142,18 @@ func fileStoreMsgSize(subj string, msg []byte) uint64 {
}
// Lock should be held.
func (fs *fileStore) flushPendingWrites() {
func (fs *fileStore) flushPendingWrites() error {
mb := fs.lmb
if mb == nil || mb.mfd == nil {
return
return fmt.Errorf("filestore does not have last message block")
}
// Append new data to the message block file.
if lbb := fs.wmb.Len(); lbb > 0 {
n, _ := fs.wmb.WriteTo(mb.mfd)
n, err := fs.wmb.WriteTo(mb.mfd)
if err != nil {
fs.removeMsgBlockIndex(mb)
return err
}
if int(n) != lbb {
fs.wmb.Truncate(int(n))
} else if lbb <= maxBufReuse {
@@ -1161,7 +1164,7 @@ func (fs *fileStore) flushPendingWrites() {
}
// Now index info
mb.writeIndexInfo()
return mb.writeIndexInfo()
}
// Write index info to the appropriate file.
@@ -1373,14 +1376,19 @@ func (fs *fileStore) numMsgBlocks() int {
return len(fs.blks)
}
// Removes the msgBlock
// Lock should be held.
func (fs *fileStore) removeMsgBlock(mb *msgBlock) {
func (fs *fileStore) removeMsgBlockIndex(mb *msgBlock) {
if mb.ifd != nil {
mb.ifd.Close()
mb.ifd = nil
}
os.Remove(mb.ifn)
}
// Removes the msgBlock
// Lock should be held.
func (fs *fileStore) removeMsgBlock(mb *msgBlock) {
fs.removeMsgBlockIndex(mb)
if mb.mfd != nil {
mb.mfd.Close()
mb.mfd = nil
@@ -1435,22 +1443,24 @@ func (fs *fileStore) closeLastMsgBlock(sync bool) {
fs.lmb.close(sync)
}
func (fs *fileStore) Delete() {
func (fs *fileStore) Delete() error {
fs.Purge()
fs.Stop()
os.RemoveAll(fs.fcfg.StoreDir)
if err := fs.Stop(); err != nil {
return err
}
return os.RemoveAll(fs.fcfg.StoreDir)
}
func (fs *fileStore) Stop() {
func (fs *fileStore) Stop() error {
fs.mu.Lock()
if fs.closed {
fs.mu.Unlock()
return
return nil
}
fs.closed = true
close(fs.qch)
fs.flushPendingWrites()
err := fs.flushPendingWrites()
fs.wmb = &bytes.Buffer{}
fs.lmb = nil
@@ -1473,6 +1483,7 @@ func (fs *fileStore) Stop() {
for _, o := range obs {
o.Stop()
}
return err
}
////////////////////////////////////////////////////////////////////////////////
@@ -1757,11 +1768,11 @@ func (o *observableFileStore) State() (*ObservableState, error) {
}
// Stop the processing of the observable's state.
func (o *observableFileStore) Stop() {
func (o *observableFileStore) Stop() error {
o.mu.Lock()
if o.closed {
o.mu.Unlock()
return
return nil
}
o.closed = true
if o.ifd != nil {
@@ -1772,17 +1783,20 @@ func (o *observableFileStore) Stop() {
fs := o.fs
o.mu.Unlock()
fs.removeObs(o)
return nil
}
// Delete the observable.
func (o *observableFileStore) Delete() {
func (o *observableFileStore) Delete() error {
// Call stop first. OK if already stopped.
o.Stop()
o.mu.Lock()
var err error
if o.odir != "" {
os.RemoveAll(o.odir)
err = os.RemoveAll(o.odir)
}
o.mu.Unlock()
return err
}
func (fs *fileStore) removeObs(obs *observableFileStore) {

View File

@@ -276,12 +276,12 @@ func memStoreMsgSize(subj string, msg []byte) uint64 {
}
// Delete is same as Stop for memory store.
func (ms *memStore) Delete() {
func (ms *memStore) Delete() error {
ms.Purge()
ms.Stop()
return ms.Stop()
}
func (ms *memStore) Stop() {
func (ms *memStore) Stop() error {
ms.mu.Lock()
if ms.ageChk != nil {
ms.ageChk.Stop()
@@ -289,6 +289,7 @@ func (ms *memStore) Stop() {
}
ms.msgs = nil
ms.mu.Unlock()
return nil
}
func (ms *memStore) incObsCount() {
@@ -323,12 +324,13 @@ func (ms *memStore) ObservableStore(_ string, _ *ObservableConfig) (ObservableSt
func (os *observableMemStore) Update(_ *ObservableState) error {
return nil
}
func (os *observableMemStore) Stop() {
func (os *observableMemStore) Stop() error {
os.ms.decObsCount()
return nil
}
func (os *observableMemStore) Delete() {
os.Stop()
func (os *observableMemStore) Delete() error {
return os.Stop()
}
func (os *observableMemStore) State() (*ObservableState, error) { return nil, nil }

View File

@@ -47,8 +47,8 @@ type MsgSetStore interface {
GetSeqFromTime(t time.Time) uint64
StorageBytesUpdate(func(int64))
Stats() MsgSetStats
Delete()
Stop()
Delete() error
Stop() error
ObservableStore(name string, cfg *ObservableConfig) (ObservableStore, error)
}
@@ -64,8 +64,8 @@ type MsgSetStats struct {
type ObservableStore interface {
State() (*ObservableState, error)
Update(*ObservableState) error
Stop()
Delete()
Stop() error
Delete() error
}
// SequencePair has both the observable and the message set sequence. This point to same message.