Improved stability when expireMsgs, writeMsg, and Compact/Purge all run concurrently.

We had issues with instability and incorrect behavior during concurrent operations.
This CL optimizes expiring msgs to be more efficient and hold the lock until completion.
Compact will also now hold the top level lock through completion.

Signed-off-by: Derek Collison <derek@nats.io>
This commit is contained in:
Derek Collison
2021-05-04 19:19:47 -07:00
parent 4533a70667
commit 71ba4b1bf6
2 changed files with 122 additions and 104 deletions

View File

@@ -65,25 +65,24 @@ type FileConsumerInfo struct {
}
type fileStore struct {
mu sync.RWMutex
state StreamState
ld *LostStreamData
scb StorageUpdateHandler
ageChk *time.Timer
syncTmr *time.Timer
cfg FileStreamInfo
fcfg FileStoreConfig
lmb *msgBlock
blks []*msgBlock
hh hash.Hash64
qch chan struct{}
cfs []*consumerFileStore
fsi map[string]seqSlice
fsis *simpleState
closed bool
expiring bool
fip bool
sips int
mu sync.RWMutex
state StreamState
ld *LostStreamData
scb StorageUpdateHandler
ageChk *time.Timer
syncTmr *time.Timer
cfg FileStreamInfo
fcfg FileStoreConfig
lmb *msgBlock
blks []*msgBlock
hh hash.Hash64
qch chan struct{}
cfs []*consumerFileStore
fsi map[string]seqSlice
fsis *simpleState
closed bool
fip bool
sips int
}
// Represents a message store block and its data.
@@ -203,7 +202,7 @@ func newFileStore(fcfg FileStoreConfig, cfg StreamConfig) (*fileStore, error) {
}
func newFileStoreWithCreated(fcfg FileStoreConfig, cfg StreamConfig, created time.Time) (*fileStore, error) {
if cfg.Name == "" {
if cfg.Name == _EMPTY_ {
return nil, fmt.Errorf("name required")
}
if cfg.Storage != FileStorage {
@@ -299,7 +298,7 @@ func (fs *fileStore) UpdateConfig(cfg *StreamConfig) error {
return ErrStoreClosed
}
if cfg.Name == "" {
if cfg.Name == _EMPTY_ {
return fmt.Errorf("name required")
}
if cfg.Storage != FileStorage {
@@ -315,9 +314,11 @@ func (fs *fileStore) UpdateConfig(cfg *StreamConfig) error {
fs.mu.Unlock()
return err
}
// Limits checks and enforcement.
fs.enforceMsgLimit()
fs.enforceBytesLimit()
// Do age timers.
if fs.ageChk == nil && fs.cfg.MaxAge != 0 {
fs.startAgeChk()
@@ -848,12 +849,15 @@ func (mb *msgBlock) setupWriteCache(buf []byte) {
if mb.cache != nil {
return
}
if buf != nil {
buf = buf[:0]
}
mb.cache = &cache{buf: buf}
// Make sure we set the proper cache offset if we have existing data.
var fi os.FileInfo
if mb.mfd != nil {
fi, _ = mb.mfd.Stat()
} else {
} else if mb.mfn != _EMPTY_ {
fi, _ = os.Stat(mb.mfn)
}
if fi != nil {
@@ -1132,7 +1136,7 @@ func (fs *fileStore) enforceMsgLimit() {
return
}
for nmsgs := fs.state.Msgs; nmsgs > uint64(fs.cfg.MaxMsgs); nmsgs = fs.state.Msgs {
if removed, err := fs.deleteFirstMsgLocked(); err != nil || !removed {
if removed, err := fs.deleteFirstMsg(); err != nil || !removed {
fs.rebuildFirst()
return
}
@@ -1146,28 +1150,20 @@ func (fs *fileStore) enforceBytesLimit() {
return
}
for bs := fs.state.Bytes; bs > uint64(fs.cfg.MaxBytes); bs = fs.state.Bytes {
if removed, err := fs.deleteFirstMsgLocked(); err != nil || !removed {
if removed, err := fs.deleteFirstMsg(); err != nil || !removed {
fs.rebuildFirst()
return
}
}
}
// Lock should be held but will be released during actual remove.
func (fs *fileStore) deleteFirstMsgLocked() (bool, error) {
// deleteFirstMsg removes the current first message from the store.
// Lock should be held on entry but will be released during the actual
// remove, then re-acquired before returning.
func (fs *fileStore) deleteFirstMsg() (bool, error) {
	// Capture the first sequence while we still hold the lock; reading
	// fs.state after Unlock would race with concurrent writers.
	seq := fs.state.FirstSeq
	fs.mu.Unlock()
	defer fs.mu.Lock()
	return fs.removeMsg(seq, false)
}
// Lock should NOT be held.
func (fs *fileStore) deleteFirstMsg() (bool, error) {
fs.mu.RLock()
seq := fs.state.FirstSeq
fs.mu.RUnlock()
return fs.removeMsg(seq, false)
}
// RemoveMsg will remove the message from this store.
// Will return the number of bytes removed.
func (fs *fileStore) RemoveMsg(seq uint64) (bool, error) {
@@ -1455,7 +1451,7 @@ func (mb *msgBlock) flushLoop(fch, qch chan struct{}) {
waiting = newWaiting
ts *= 2
}
mb.flushPendingMsgs()
mb.flushPendingMsgsAndWait()
// Check if we are no longer the last message block. If we are
// not we can close FDs and exit.
mb.fs.mu.RLock()
@@ -1770,48 +1766,44 @@ func (fs *fileStore) expireMsgsLocked() {
fs.mu.Lock()
}
// resetAgeChk arms (or re-arms) the age-check timer. A positive delta,
// in nanoseconds, overrides the configured MaxAge as the fire delay.
// Lock should be held.
func (fs *fileStore) resetAgeChk(delta int64) {
	// Default to the configured max age unless an explicit delta is given.
	fireIn := fs.cfg.MaxAge
	if delta > 0 {
		fireIn = time.Duration(delta)
	}
	// First arming creates the timer; afterwards we simply reset it.
	if fs.ageChk == nil {
		fs.ageChk = time.AfterFunc(fireIn, fs.expireMsgs)
		return
	}
	fs.ageChk.Reset(fireIn)
}
// cancelAgeChk stops and clears the age-check timer if one is armed.
// Lock should be held.
func (fs *fileStore) cancelAgeChk() {
	if fs.ageChk == nil {
		return
	}
	fs.ageChk.Stop()
	fs.ageChk = nil
}
// Will expire msgs that are too old.
func (fs *fileStore) expireMsgs() {
// Make sure this is only running one at a time.
fs.mu.Lock()
if fs.expiring {
fs.mu.Unlock()
return
// We need to delete one by one here and can not optimize for the time being.
// Reason is that we need more information to adjust ack pending in consumers.
var sm *fileStoredMsg
minAge := time.Now().UnixNano() - int64(fs.cfg.MaxAge)
for sm, _ = fs.msgForSeq(0); sm != nil && sm.ts <= minAge; sm, _ = fs.msgForSeq(0) {
fs.removeMsg(sm.seq, false)
}
fs.expiring = true
fs.mu.Unlock()
defer func() {
fs.mu.Lock()
fs.expiring = false
fs.mu.Unlock()
}()
fs.mu.Lock()
defer fs.mu.Unlock()
now := time.Now().UnixNano()
minAge := now - int64(fs.cfg.MaxAge)
for {
sm, _ := fs.msgForSeq(0)
if sm != nil && sm.ts <= minAge {
fs.deleteFirstMsg()
} else {
fs.mu.Lock()
if sm == nil {
if fs.ageChk != nil {
fs.ageChk.Stop()
fs.ageChk = nil
}
} else {
fireIn := time.Duration(sm.ts-now) + fs.cfg.MaxAge
if fs.ageChk != nil {
fs.ageChk.Reset(fireIn)
} else {
fs.ageChk = time.AfterFunc(fireIn, fs.expireMsgs)
}
}
fs.mu.Unlock()
return
}
if sm == nil {
fs.cancelAgeChk()
} else {
fs.resetAgeChk(sm.ts - minAge)
}
}
@@ -1922,7 +1914,7 @@ func (mb *msgBlock) writeMsgRecord(rl, seq uint64, subj string, mhdr, msg []byte
// If we should be flushing in place do so here. We will also flip to flushing in place if we
// had a write error.
if flush || werr != nil {
if err := mb.flushPendingMsgs(); err != nil && err != errFlushRunning && err != errNoPending {
if err := mb.flushPendingMsgsAndWait(); err != nil {
return err
}
if writeIndex {
@@ -1952,8 +1944,10 @@ func (mb *msgBlock) pendingWriteSize() int {
return pending
}
// Lock should be held.
// Lock should NOT be held.
func (mb *msgBlock) clearFlushing() {
mb.mu.Lock()
defer mb.mu.Unlock()
if mb.cache != nil {
mb.cache.flush = false
}
@@ -2206,14 +2200,22 @@ func (mb *msgBlock) quitChan() chan struct{} {
// This function is called for in place flushing so we need to wait.
func (mb *msgBlock) flushPendingMsgsAndWait() error {
var err error
var t *time.Timer
const delay = time.Millisecond
// If we are in flush wait for that to clear.
for err = mb.flushPendingMsgs(); err == errFlushRunning; err = mb.flushPendingMsgs() {
qch := mb.quitChan()
if t == nil {
t = time.NewTimer(delay)
defer t.Stop()
} else {
t.Reset(delay)
}
select {
case <-qch:
return nil
case <-time.After(time.Millisecond):
case <-t.C:
}
}
return err
@@ -2236,7 +2238,7 @@ func (mb *msgBlock) flushPendingMsgs() error {
if err != nil {
mb.mu.Unlock()
// No pending data to be written is not an error.
if err == errNoPending {
if err == errNoPending || err == errNoCache {
err = nil
}
return err
@@ -2247,6 +2249,9 @@ func (mb *msgBlock) flushPendingMsgs() error {
// Only one can be flushing at a time.
mb.setFlushing()
// Clear on exit.
defer mb.clearFlushing()
mfd := mb.mfd
mb.mu.Unlock()
@@ -2272,20 +2277,20 @@ func (mb *msgBlock) flushPendingMsgs() error {
woff += int64(n)
tn += n
// Success
if n == lbb {
// Partial write.
if n != lbb {
buf = buf[n:]
} else {
// Done.
break
}
// Partial write..
buf = buf[n:]
}
// We did a successful write.
// Re-acquire lock to update.
mb.mu.Lock()
defer mb.mu.Unlock()
// Clear on exit.
defer mb.clearFlushing()
// set write err to any error.
mb.werr = err
@@ -2629,7 +2634,7 @@ func (fs *fileStore) LoadMsg(seq uint64) (string, []byte, []byte, int64, error)
if sm != nil {
return sm.subj, sm.hdr, sm.msg, sm.ts, nil
}
return "", nil, nil, 0, err
return _EMPTY_, nil, nil, 0, err
}
// Type returns the type of the underlying store.
@@ -2967,6 +2972,11 @@ func (fs *fileStore) Compact(seq uint64) (uint64, error) {
fs.mu.Unlock()
return 0, nil
}
if err := smb.loadMsgs(); err != nil {
fs.mu.Unlock()
return 0, err
}
// All msgblocks up to this one can be thrown away.
for i, mb := range fs.blks {
if mb == smb {
@@ -2979,16 +2989,13 @@ func (fs *fileStore) Compact(seq uint64) (uint64, error) {
mb.dirtyCloseWithRemove(true)
mb.mu.Unlock()
}
fs.mu.Unlock()
if err := smb.loadMsgs(); err != nil {
return purged, err
}
smb.mu.Lock()
for mseq := smb.first.seq; mseq < seq; mseq++ {
if sm, _ := smb.cacheLookupWithLock(mseq); sm != nil && smb.msgs > 0 {
smb.bytes -= fileStoreMsgSize(sm.subj, sm.hdr, sm.msg)
sz := fileStoreMsgSize(sm.subj, sm.hdr, sm.msg)
smb.bytes -= sz
bytes += sz
smb.msgs--
purged++
}
@@ -3004,12 +3011,17 @@ func (fs *fileStore) Compact(seq uint64) (uint64, error) {
if sm != nil {
// Reset our version of first.
fs.mu.Lock()
fs.state.FirstSeq = sm.seq
fs.state.FirstTime = time.Unix(0, sm.ts).UTC()
fs.state.Msgs -= purged
fs.state.Bytes -= bytes
fs.mu.Unlock()
}
cb := fs.scb
fs.mu.Unlock()
if cb != nil {
cb(-int64(purged), -int64(bytes), 0, _EMPTY_)
}
return purged, nil
@@ -3229,6 +3241,14 @@ func (fs *fileStore) Delete() error {
return os.RemoveAll(fs.fcfg.StoreDir)
}
// cancelSyncTimer stops and clears the periodic sync timer if set.
// Lock should be held.
func (fs *fileStore) cancelSyncTimer() {
	if tmr := fs.syncTmr; tmr != nil {
		tmr.Stop()
		fs.syncTmr = nil
	}
}
func (fs *fileStore) Stop() error {
fs.mu.Lock()
if fs.closed {
@@ -3241,14 +3261,8 @@ func (fs *fileStore) Stop() error {
fs.checkAndFlushAllBlocks()
fs.closeAllMsgBlocks(false)
if fs.syncTmr != nil {
fs.syncTmr.Stop()
fs.syncTmr = nil
}
if fs.ageChk != nil {
fs.ageChk.Stop()
fs.ageChk = nil
}
fs.cancelSyncTimer()
fs.cancelAgeChk()
var _cfs [256]*consumerFileStore
cfs := append(_cfs[:0], fs.cfs...)
@@ -3491,7 +3505,7 @@ func (fs *fileStore) ConsumerStore(name string, cfg *ConsumerConfig) (ConsumerSt
if fs.isClosed() {
return nil, ErrStoreClosed
}
if cfg == nil || name == "" {
if cfg == nil || name == _EMPTY_ {
return nil, fmt.Errorf("bad consumer config")
}
odir := path.Join(fs.fcfg.StoreDir, consumerDir, name)

View File

@@ -566,12 +566,15 @@ func TestFileStoreBytesLimit(t *testing.T) {
}
func TestFileStoreAgeLimit(t *testing.T) {
maxAge := 10 * time.Millisecond
maxAge := 100 * time.Millisecond
storeDir := createDir(t, JetStreamStoreDir)
defer removeDir(t, storeDir)
fs, err := newFileStore(FileStoreConfig{StoreDir: storeDir}, StreamConfig{Name: "zzz", Storage: FileStorage, MaxAge: maxAge})
fs, err := newFileStore(
FileStoreConfig{StoreDir: storeDir, BlockSize: 256},
StreamConfig{Name: "zzz", Storage: FileStorage, MaxAge: maxAge},
)
if err != nil {
t.Fatalf("Unexpected error: %v", err)
}
@@ -579,7 +582,7 @@ func TestFileStoreAgeLimit(t *testing.T) {
// Store some messages. Does not really matter how many.
subj, msg := "foo", []byte("Hello World")
toStore := 100
toStore := 500
for i := 0; i < toStore; i++ {
fs.StoreMsg(subj, nil, msg)
}
@@ -589,7 +592,7 @@ func TestFileStoreAgeLimit(t *testing.T) {
}
checkExpired := func(t *testing.T) {
t.Helper()
checkFor(t, time.Second, maxAge, func() error {
checkFor(t, 5*time.Second, maxAge, func() error {
state = fs.State()
if state.Msgs != 0 {
return fmt.Errorf("Expected no msgs, got %d", state.Msgs)
@@ -602,6 +605,7 @@ func TestFileStoreAgeLimit(t *testing.T) {
}
// Let them expire
checkExpired(t)
// Now add some more and make sure that timer will fire again.
for i := 0; i < toStore; i++ {
fs.StoreMsg(subj, nil, msg)