Merge pull request #2180 from nats-io/fs_updates

[IMPROVED] Stability when expireMsgs, writeMsg, and Compact/Purge run concurrently.
This commit is contained in:
Derek Collison
2021-05-04 20:43:41 -07:00
committed by GitHub
2 changed files with 122 additions and 104 deletions

View File

@@ -65,25 +65,24 @@ type FileConsumerInfo struct {
}
type fileStore struct {
mu sync.RWMutex
state StreamState
ld *LostStreamData
scb StorageUpdateHandler
ageChk *time.Timer
syncTmr *time.Timer
cfg FileStreamInfo
fcfg FileStoreConfig
lmb *msgBlock
blks []*msgBlock
hh hash.Hash64
qch chan struct{}
cfs []*consumerFileStore
fsi map[string]seqSlice
fsis *simpleState
closed bool
expiring bool
fip bool
sips int
mu sync.RWMutex
state StreamState
ld *LostStreamData
scb StorageUpdateHandler
ageChk *time.Timer
syncTmr *time.Timer
cfg FileStreamInfo
fcfg FileStoreConfig
lmb *msgBlock
blks []*msgBlock
hh hash.Hash64
qch chan struct{}
cfs []*consumerFileStore
fsi map[string]seqSlice
fsis *simpleState
closed bool
fip bool
sips int
}
// Represents a message store block and its data.
@@ -203,7 +202,7 @@ func newFileStore(fcfg FileStoreConfig, cfg StreamConfig) (*fileStore, error) {
}
func newFileStoreWithCreated(fcfg FileStoreConfig, cfg StreamConfig, created time.Time) (*fileStore, error) {
if cfg.Name == "" {
if cfg.Name == _EMPTY_ {
return nil, fmt.Errorf("name required")
}
if cfg.Storage != FileStorage {
@@ -299,7 +298,7 @@ func (fs *fileStore) UpdateConfig(cfg *StreamConfig) error {
return ErrStoreClosed
}
if cfg.Name == "" {
if cfg.Name == _EMPTY_ {
return fmt.Errorf("name required")
}
if cfg.Storage != FileStorage {
@@ -315,9 +314,11 @@ func (fs *fileStore) UpdateConfig(cfg *StreamConfig) error {
fs.mu.Unlock()
return err
}
// Limits checks and enforcement.
fs.enforceMsgLimit()
fs.enforceBytesLimit()
// Do age timers.
if fs.ageChk == nil && fs.cfg.MaxAge != 0 {
fs.startAgeChk()
@@ -848,12 +849,15 @@ func (mb *msgBlock) setupWriteCache(buf []byte) {
if mb.cache != nil {
return
}
if buf != nil {
buf = buf[:0]
}
mb.cache = &cache{buf: buf}
// Make sure we set the proper cache offset if we have existing data.
var fi os.FileInfo
if mb.mfd != nil {
fi, _ = mb.mfd.Stat()
} else {
} else if mb.mfn != _EMPTY_ {
fi, _ = os.Stat(mb.mfn)
}
if fi != nil {
@@ -1132,7 +1136,7 @@ func (fs *fileStore) enforceMsgLimit() {
return
}
for nmsgs := fs.state.Msgs; nmsgs > uint64(fs.cfg.MaxMsgs); nmsgs = fs.state.Msgs {
if removed, err := fs.deleteFirstMsgLocked(); err != nil || !removed {
if removed, err := fs.deleteFirstMsg(); err != nil || !removed {
fs.rebuildFirst()
return
}
@@ -1146,28 +1150,20 @@ func (fs *fileStore) enforceBytesLimit() {
return
}
for bs := fs.state.Bytes; bs > uint64(fs.cfg.MaxBytes); bs = fs.state.Bytes {
if removed, err := fs.deleteFirstMsgLocked(); err != nil || !removed {
if removed, err := fs.deleteFirstMsg(); err != nil || !removed {
fs.rebuildFirst()
return
}
}
}
// Lock should be held but will be released during actual remove.
func (fs *fileStore) deleteFirstMsgLocked() (bool, error) {
// Lock should be held on entry but will be released during actual remove.
func (fs *fileStore) deleteFirstMsg() (bool, error) {
fs.mu.Unlock()
defer fs.mu.Lock()
return fs.removeMsg(fs.state.FirstSeq, false)
}
// Lock should NOT be held.
func (fs *fileStore) deleteFirstMsg() (bool, error) {
fs.mu.RLock()
seq := fs.state.FirstSeq
fs.mu.RUnlock()
return fs.removeMsg(seq, false)
}
// RemoveMsg will remove the message from this store.
// Will return the number of bytes removed.
func (fs *fileStore) RemoveMsg(seq uint64) (bool, error) {
@@ -1455,7 +1451,7 @@ func (mb *msgBlock) flushLoop(fch, qch chan struct{}) {
waiting = newWaiting
ts *= 2
}
mb.flushPendingMsgs()
mb.flushPendingMsgsAndWait()
// Check if we are no longer the last message block. If we are
// not we can close FDs and exit.
mb.fs.mu.RLock()
@@ -1770,48 +1766,44 @@ func (fs *fileStore) expireMsgsLocked() {
fs.mu.Lock()
}
// resetAgeChk arms (or re-arms) the age-expiration timer. When delta is
// positive it is used verbatim as the delay; otherwise the stream's
// configured MaxAge is used. Lock should be held.
func (fs *fileStore) resetAgeChk(delta int64) {
	var fireIn time.Duration
	if delta > 0 {
		fireIn = time.Duration(delta)
	} else {
		fireIn = fs.cfg.MaxAge
	}
	if fs.ageChk == nil {
		// First arm: schedule expireMsgs to run after the delay.
		fs.ageChk = time.AfterFunc(fireIn, fs.expireMsgs)
	} else {
		fs.ageChk.Reset(fireIn)
	}
}
// cancelAgeChk stops and clears the age-expiration timer if one is
// currently armed. Lock should be held.
func (fs *fileStore) cancelAgeChk() {
	if fs.ageChk == nil {
		return
	}
	fs.ageChk.Stop()
	fs.ageChk = nil
}
// Will expire msgs that are too old.
func (fs *fileStore) expireMsgs() {
// Make sure this is only running one at a time.
fs.mu.Lock()
if fs.expiring {
fs.mu.Unlock()
return
// We need to delete one by one here and can not optimize for the time being.
// Reason is that we need more information to adjust ack pending in consumers.
var sm *fileStoredMsg
minAge := time.Now().UnixNano() - int64(fs.cfg.MaxAge)
for sm, _ = fs.msgForSeq(0); sm != nil && sm.ts <= minAge; sm, _ = fs.msgForSeq(0) {
fs.removeMsg(sm.seq, false)
}
fs.expiring = true
fs.mu.Unlock()
defer func() {
fs.mu.Lock()
fs.expiring = false
fs.mu.Unlock()
}()
fs.mu.Lock()
defer fs.mu.Unlock()
now := time.Now().UnixNano()
minAge := now - int64(fs.cfg.MaxAge)
for {
sm, _ := fs.msgForSeq(0)
if sm != nil && sm.ts <= minAge {
fs.deleteFirstMsg()
} else {
fs.mu.Lock()
if sm == nil {
if fs.ageChk != nil {
fs.ageChk.Stop()
fs.ageChk = nil
}
} else {
fireIn := time.Duration(sm.ts-now) + fs.cfg.MaxAge
if fs.ageChk != nil {
fs.ageChk.Reset(fireIn)
} else {
fs.ageChk = time.AfterFunc(fireIn, fs.expireMsgs)
}
}
fs.mu.Unlock()
return
}
if sm == nil {
fs.cancelAgeChk()
} else {
fs.resetAgeChk(sm.ts - minAge)
}
}
@@ -1922,7 +1914,7 @@ func (mb *msgBlock) writeMsgRecord(rl, seq uint64, subj string, mhdr, msg []byte
// If we should be flushing in place do so here. We will also flip to flushing in place if we
// had a write error.
if flush || werr != nil {
if err := mb.flushPendingMsgs(); err != nil && err != errFlushRunning && err != errNoPending {
if err := mb.flushPendingMsgsAndWait(); err != nil {
return err
}
if writeIndex {
@@ -1952,8 +1944,10 @@ func (mb *msgBlock) pendingWriteSize() int {
return pending
}
// Lock should be held.
// Lock should NOT be held.
func (mb *msgBlock) clearFlushing() {
mb.mu.Lock()
defer mb.mu.Unlock()
if mb.cache != nil {
mb.cache.flush = false
}
@@ -2206,14 +2200,22 @@ func (mb *msgBlock) quitChan() chan struct{} {
// This function is called for in place flushing so we need to wait.
func (mb *msgBlock) flushPendingMsgsAndWait() error {
var err error
var t *time.Timer
const delay = time.Millisecond
// If we are in flush wait for that to clear.
for err = mb.flushPendingMsgs(); err == errFlushRunning; err = mb.flushPendingMsgs() {
qch := mb.quitChan()
if t == nil {
t = time.NewTimer(delay)
defer t.Stop()
} else {
t.Reset(delay)
}
select {
case <-qch:
return nil
case <-time.After(time.Millisecond):
case <-t.C:
}
}
return err
@@ -2236,7 +2238,7 @@ func (mb *msgBlock) flushPendingMsgs() error {
if err != nil {
mb.mu.Unlock()
// No pending data to be written is not an error.
if err == errNoPending {
if err == errNoPending || err == errNoCache {
err = nil
}
return err
@@ -2247,6 +2249,9 @@ func (mb *msgBlock) flushPendingMsgs() error {
// Only one can be flushing at a time.
mb.setFlushing()
// Clear on exit.
defer mb.clearFlushing()
mfd := mb.mfd
mb.mu.Unlock()
@@ -2272,20 +2277,20 @@ func (mb *msgBlock) flushPendingMsgs() error {
woff += int64(n)
tn += n
// Success
if n == lbb {
// Partial write.
if n != lbb {
buf = buf[n:]
} else {
// Done.
break
}
// Partial write..
buf = buf[n:]
}
// We did a successful write.
// Re-acquire lock to update.
mb.mu.Lock()
defer mb.mu.Unlock()
// Clear on exit.
defer mb.clearFlushing()
// set write err to any error.
mb.werr = err
@@ -2629,7 +2634,7 @@ func (fs *fileStore) LoadMsg(seq uint64) (string, []byte, []byte, int64, error)
if sm != nil {
return sm.subj, sm.hdr, sm.msg, sm.ts, nil
}
return "", nil, nil, 0, err
return _EMPTY_, nil, nil, 0, err
}
// Type returns the type of the underlying store.
@@ -2967,6 +2972,11 @@ func (fs *fileStore) Compact(seq uint64) (uint64, error) {
fs.mu.Unlock()
return 0, nil
}
if err := smb.loadMsgs(); err != nil {
fs.mu.Unlock()
return 0, err
}
// All msgblocks up to this one can be thrown away.
for i, mb := range fs.blks {
if mb == smb {
@@ -2979,16 +2989,13 @@ func (fs *fileStore) Compact(seq uint64) (uint64, error) {
mb.dirtyCloseWithRemove(true)
mb.mu.Unlock()
}
fs.mu.Unlock()
if err := smb.loadMsgs(); err != nil {
return purged, err
}
smb.mu.Lock()
for mseq := smb.first.seq; mseq < seq; mseq++ {
if sm, _ := smb.cacheLookupWithLock(mseq); sm != nil && smb.msgs > 0 {
smb.bytes -= fileStoreMsgSize(sm.subj, sm.hdr, sm.msg)
sz := fileStoreMsgSize(sm.subj, sm.hdr, sm.msg)
smb.bytes -= sz
bytes += sz
smb.msgs--
purged++
}
@@ -3004,12 +3011,17 @@ func (fs *fileStore) Compact(seq uint64) (uint64, error) {
if sm != nil {
// Reset our version of first.
fs.mu.Lock()
fs.state.FirstSeq = sm.seq
fs.state.FirstTime = time.Unix(0, sm.ts).UTC()
fs.state.Msgs -= purged
fs.state.Bytes -= bytes
fs.mu.Unlock()
}
cb := fs.scb
fs.mu.Unlock()
if cb != nil {
cb(-int64(purged), -int64(bytes), 0, _EMPTY_)
}
return purged, nil
@@ -3229,6 +3241,14 @@ func (fs *fileStore) Delete() error {
return os.RemoveAll(fs.fcfg.StoreDir)
}
// cancelSyncTimer stops and clears the periodic sync timer if one is
// currently armed. Lock should be held.
func (fs *fileStore) cancelSyncTimer() {
	if fs.syncTmr == nil {
		return
	}
	fs.syncTmr.Stop()
	fs.syncTmr = nil
}
func (fs *fileStore) Stop() error {
fs.mu.Lock()
if fs.closed {
@@ -3241,14 +3261,8 @@ func (fs *fileStore) Stop() error {
fs.checkAndFlushAllBlocks()
fs.closeAllMsgBlocks(false)
if fs.syncTmr != nil {
fs.syncTmr.Stop()
fs.syncTmr = nil
}
if fs.ageChk != nil {
fs.ageChk.Stop()
fs.ageChk = nil
}
fs.cancelSyncTimer()
fs.cancelAgeChk()
var _cfs [256]*consumerFileStore
cfs := append(_cfs[:0], fs.cfs...)
@@ -3491,7 +3505,7 @@ func (fs *fileStore) ConsumerStore(name string, cfg *ConsumerConfig) (ConsumerSt
if fs.isClosed() {
return nil, ErrStoreClosed
}
if cfg == nil || name == "" {
if cfg == nil || name == _EMPTY_ {
return nil, fmt.Errorf("bad consumer config")
}
odir := path.Join(fs.fcfg.StoreDir, consumerDir, name)

View File

@@ -566,12 +566,15 @@ func TestFileStoreBytesLimit(t *testing.T) {
}
func TestFileStoreAgeLimit(t *testing.T) {
maxAge := 10 * time.Millisecond
maxAge := 100 * time.Millisecond
storeDir := createDir(t, JetStreamStoreDir)
defer removeDir(t, storeDir)
fs, err := newFileStore(FileStoreConfig{StoreDir: storeDir}, StreamConfig{Name: "zzz", Storage: FileStorage, MaxAge: maxAge})
fs, err := newFileStore(
FileStoreConfig{StoreDir: storeDir, BlockSize: 256},
StreamConfig{Name: "zzz", Storage: FileStorage, MaxAge: maxAge},
)
if err != nil {
t.Fatalf("Unexpected error: %v", err)
}
@@ -579,7 +582,7 @@ func TestFileStoreAgeLimit(t *testing.T) {
// Store some messages. Does not really matter how many.
subj, msg := "foo", []byte("Hello World")
toStore := 100
toStore := 500
for i := 0; i < toStore; i++ {
fs.StoreMsg(subj, nil, msg)
}
@@ -589,7 +592,7 @@ func TestFileStoreAgeLimit(t *testing.T) {
}
checkExpired := func(t *testing.T) {
t.Helper()
checkFor(t, time.Second, maxAge, func() error {
checkFor(t, 5*time.Second, maxAge, func() error {
state = fs.State()
if state.Msgs != 0 {
return fmt.Errorf("Expected no msgs, got %d", state.Msgs)
@@ -602,6 +605,7 @@ func TestFileStoreAgeLimit(t *testing.T) {
}
// Let them expire
checkExpired(t)
// Now add some more and make sure that timer will fire again.
for i := 0; i < toStore; i++ {
fs.StoreMsg(subj, nil, msg)