diff --git a/server/filestore.go b/server/filestore.go index c1f71a01..e20d816c 100644 --- a/server/filestore.go +++ b/server/filestore.go @@ -65,25 +65,24 @@ type FileConsumerInfo struct { } type fileStore struct { - mu sync.RWMutex - state StreamState - ld *LostStreamData - scb StorageUpdateHandler - ageChk *time.Timer - syncTmr *time.Timer - cfg FileStreamInfo - fcfg FileStoreConfig - lmb *msgBlock - blks []*msgBlock - hh hash.Hash64 - qch chan struct{} - cfs []*consumerFileStore - fsi map[string]seqSlice - fsis *simpleState - closed bool - expiring bool - fip bool - sips int + mu sync.RWMutex + state StreamState + ld *LostStreamData + scb StorageUpdateHandler + ageChk *time.Timer + syncTmr *time.Timer + cfg FileStreamInfo + fcfg FileStoreConfig + lmb *msgBlock + blks []*msgBlock + hh hash.Hash64 + qch chan struct{} + cfs []*consumerFileStore + fsi map[string]seqSlice + fsis *simpleState + closed bool + fip bool + sips int } // Represents a message store block and its data. @@ -203,7 +202,7 @@ func newFileStore(fcfg FileStoreConfig, cfg StreamConfig) (*fileStore, error) { } func newFileStoreWithCreated(fcfg FileStoreConfig, cfg StreamConfig, created time.Time) (*fileStore, error) { - if cfg.Name == "" { + if cfg.Name == _EMPTY_ { return nil, fmt.Errorf("name required") } if cfg.Storage != FileStorage { @@ -299,7 +298,7 @@ func (fs *fileStore) UpdateConfig(cfg *StreamConfig) error { return ErrStoreClosed } - if cfg.Name == "" { + if cfg.Name == _EMPTY_ { return fmt.Errorf("name required") } if cfg.Storage != FileStorage { @@ -315,9 +314,11 @@ func (fs *fileStore) UpdateConfig(cfg *StreamConfig) error { fs.mu.Unlock() return err } + // Limits checks and enforcement. fs.enforceMsgLimit() fs.enforceBytesLimit() + // Do age timers. if fs.ageChk == nil && fs.cfg.MaxAge != 0 { fs.startAgeChk() @@ -848,12 +849,15 @@ func (mb *msgBlock) setupWriteCache(buf []byte) { if mb.cache != nil { return } + if buf != nil { + buf = buf[:0] + } mb.cache = &cache{buf: buf} // Make sure we set the proper cache offset if we have existing data. var fi os.FileInfo if mb.mfd != nil { fi, _ = mb.mfd.Stat() - } else { + } else if mb.mfn != _EMPTY_ { fi, _ = os.Stat(mb.mfn) } if fi != nil { @@ -1132,7 +1136,7 @@ func (fs *fileStore) enforceMsgLimit() { return } for nmsgs := fs.state.Msgs; nmsgs > uint64(fs.cfg.MaxMsgs); nmsgs = fs.state.Msgs { - if removed, err := fs.deleteFirstMsgLocked(); err != nil || !removed { + if removed, err := fs.deleteFirstMsg(); err != nil || !removed { fs.rebuildFirst() return } @@ -1146,28 +1150,20 @@ func (fs *fileStore) enforceBytesLimit() { return } for bs := fs.state.Bytes; bs > uint64(fs.cfg.MaxBytes); bs = fs.state.Bytes { - if removed, err := fs.deleteFirstMsgLocked(); err != nil || !removed { + if removed, err := fs.deleteFirstMsg(); err != nil || !removed { fs.rebuildFirst() return } } } -// Lock should be held but will be released during actual remove. -func (fs *fileStore) deleteFirstMsgLocked() (bool, error) { +// Lock should be held on entry but will be released during actual remove. +func (fs *fileStore) deleteFirstMsg() (bool, error) { fs.mu.Unlock() defer fs.mu.Lock() return fs.removeMsg(fs.state.FirstSeq, false) } -// Lock should NOT be held. -func (fs *fileStore) deleteFirstMsg() (bool, error) { - fs.mu.RLock() - seq := fs.state.FirstSeq - fs.mu.RUnlock() - return fs.removeMsg(seq, false) -} - // RemoveMsg will remove the message from this store. // Will return the number of bytes removed. func (fs *fileStore) RemoveMsg(seq uint64) (bool, error) { @@ -1455,7 +1451,7 @@ func (mb *msgBlock) flushLoop(fch, qch chan struct{}) { waiting = newWaiting ts *= 2 } - mb.flushPendingMsgs() + mb.flushPendingMsgsAndWait() // Check if we are no longer the last message block. If we are // not we can close FDs and exit. mb.fs.mu.RLock() @@ -1770,48 +1766,44 @@ func (fs *fileStore) expireMsgsLocked() { fs.mu.Lock() } +// Lock should be held. +func (fs *fileStore) resetAgeChk(delta int64) { + fireIn := fs.cfg.MaxAge + if delta > 0 { + fireIn = time.Duration(delta) + } + if fs.ageChk != nil { + fs.ageChk.Reset(fireIn) + } else { + fs.ageChk = time.AfterFunc(fireIn, fs.expireMsgs) + } +} + +// Lock should be held. +func (fs *fileStore) cancelAgeChk() { + if fs.ageChk != nil { + fs.ageChk.Stop() + fs.ageChk = nil + } +} + // Will expire msgs that are too old. func (fs *fileStore) expireMsgs() { - // Make sure this is only running one at a time. - fs.mu.Lock() - if fs.expiring { - fs.mu.Unlock() - return + // We need to delete one by one here and can not optimize for the time being. + // Reason is that we need more information to adjust ack pending in consumers. + var sm *fileStoredMsg + minAge := time.Now().UnixNano() - int64(fs.cfg.MaxAge) + for sm, _ = fs.msgForSeq(0); sm != nil && sm.ts <= minAge; sm, _ = fs.msgForSeq(0) { + fs.removeMsg(sm.seq, false) } - fs.expiring = true - fs.mu.Unlock() - defer func() { - fs.mu.Lock() - fs.expiring = false - fs.mu.Unlock() - }() + fs.mu.Lock() + defer fs.mu.Unlock() - now := time.Now().UnixNano() - minAge := now - int64(fs.cfg.MaxAge) - - for { - sm, _ := fs.msgForSeq(0) - if sm != nil && sm.ts <= minAge { - fs.deleteFirstMsg() - } else { - fs.mu.Lock() - if sm == nil { - if fs.ageChk != nil { - fs.ageChk.Stop() - fs.ageChk = nil - } - } else { - fireIn := time.Duration(sm.ts-now) + fs.cfg.MaxAge - if fs.ageChk != nil { - fs.ageChk.Reset(fireIn) - } else { - fs.ageChk = time.AfterFunc(fireIn, fs.expireMsgs) - } - } - fs.mu.Unlock() - return - } + if sm == nil { + fs.cancelAgeChk() + } else { + fs.resetAgeChk(sm.ts - minAge) } } @@ -1922,7 +1914,7 @@ func (mb *msgBlock) writeMsgRecord(rl, seq uint64, subj string, mhdr, msg []byte // If we should be flushing in place do so here. We will also flip to flushing in place if we // had a write error. if flush || werr != nil { - if err := mb.flushPendingMsgs(); err != nil && err != errFlushRunning && err != errNoPending { + if err := mb.flushPendingMsgsAndWait(); err != nil { return err } if writeIndex { @@ -1952,8 +1944,10 @@ func (mb *msgBlock) pendingWriteSize() int { return pending } -// Lock should be held. +// Lock should NOT be held. func (mb *msgBlock) clearFlushing() { + mb.mu.Lock() + defer mb.mu.Unlock() if mb.cache != nil { mb.cache.flush = false } @@ -2206,14 +2200,22 @@ func (mb *msgBlock) quitChan() chan struct{} { // This function is called for in place flushing so we need to wait. func (mb *msgBlock) flushPendingMsgsAndWait() error { var err error + var t *time.Timer + const delay = time.Millisecond // If we are in flush wait for that to clear. for err = mb.flushPendingMsgs(); err == errFlushRunning; err = mb.flushPendingMsgs() { qch := mb.quitChan() + if t == nil { + t = time.NewTimer(delay) + defer t.Stop() + } else { + t.Reset(delay) + } select { case <-qch: return nil - case <-time.After(time.Millisecond): + case <-t.C: } } return err @@ -2236,7 +2238,7 @@ func (mb *msgBlock) flushPendingMsgs() error { if err != nil { mb.mu.Unlock() // No pending data to be written is not an error. - if err == errNoPending { + if err == errNoPending || err == errNoCache { err = nil } return err @@ -2247,6 +2249,9 @@ func (mb *msgBlock) flushPendingMsgs() error { // Only one can be flushing at a time. mb.setFlushing() + // Clear on exit. + defer mb.clearFlushing() + mfd := mb.mfd mb.mu.Unlock() @@ -2272,20 +2277,20 @@ func (mb *msgBlock) flushPendingMsgs() error { woff += int64(n) tn += n - // Success - if n == lbb { + // Partial write. + if n != lbb { + buf = buf[n:] + } else { + // Done. break } - // Partial write.. - buf = buf[n:] } // We did a successful write. // Re-acquire lock to update. mb.mu.Lock() defer mb.mu.Unlock() - // Clear on exit. - defer mb.clearFlushing() + // set write err to any error. mb.werr = err @@ -2629,7 +2634,7 @@ func (fs *fileStore) LoadMsg(seq uint64) (string, []byte, []byte, int64, error) if sm != nil { return sm.subj, sm.hdr, sm.msg, sm.ts, nil } - return "", nil, nil, 0, err + return _EMPTY_, nil, nil, 0, err } // Type returns the type of the underlying store. @@ -2967,6 +2972,11 @@ func (fs *fileStore) Compact(seq uint64) (uint64, error) { fs.mu.Unlock() return 0, nil } + if err := smb.loadMsgs(); err != nil { + fs.mu.Unlock() + return 0, err + } + // All msgblocks up to this one can be thrown away. for i, mb := range fs.blks { if mb == smb { @@ -2979,16 +2989,13 @@ func (fs *fileStore) Compact(seq uint64) (uint64, error) { mb.dirtyCloseWithRemove(true) mb.mu.Unlock() } - fs.mu.Unlock() - - if err := smb.loadMsgs(); err != nil { - return purged, err - } smb.mu.Lock() for mseq := smb.first.seq; mseq < seq; mseq++ { if sm, _ := smb.cacheLookupWithLock(mseq); sm != nil && smb.msgs > 0 { - smb.bytes -= fileStoreMsgSize(sm.subj, sm.hdr, sm.msg) + sz := fileStoreMsgSize(sm.subj, sm.hdr, sm.msg) + smb.bytes -= sz + bytes += sz smb.msgs-- purged++ } @@ -3004,12 +3011,17 @@ func (fs *fileStore) Compact(seq uint64) (uint64, error) { if sm != nil { // Reset our version of first. - fs.mu.Lock() fs.state.FirstSeq = sm.seq fs.state.FirstTime = time.Unix(0, sm.ts).UTC() fs.state.Msgs -= purged fs.state.Bytes -= bytes - fs.mu.Unlock() + } + + cb := fs.scb + fs.mu.Unlock() + + if cb != nil { + cb(-int64(purged), -int64(bytes), 0, _EMPTY_) } return purged, nil @@ -3229,6 +3241,14 @@ func (fs *fileStore) Delete() error { return os.RemoveAll(fs.fcfg.StoreDir) } +// Lock should be held. +func (fs *fileStore) cancelSyncTimer() { + if fs.syncTmr != nil { + fs.syncTmr.Stop() + fs.syncTmr = nil + } +} + func (fs *fileStore) Stop() error { fs.mu.Lock() if fs.closed { @@ -3241,14 +3261,8 @@ func (fs *fileStore) Stop() error { fs.checkAndFlushAllBlocks() fs.closeAllMsgBlocks(false) - if fs.syncTmr != nil { - fs.syncTmr.Stop() - fs.syncTmr = nil - } - if fs.ageChk != nil { - fs.ageChk.Stop() - fs.ageChk = nil - } + fs.cancelSyncTimer() + fs.cancelAgeChk() var _cfs [256]*consumerFileStore cfs := append(_cfs[:0], fs.cfs...) @@ -3491,7 +3505,7 @@ func (fs *fileStore) ConsumerStore(name string, cfg *ConsumerConfig) (ConsumerSt if fs.isClosed() { return nil, ErrStoreClosed } - if cfg == nil || name == "" { + if cfg == nil || name == _EMPTY_ { return nil, fmt.Errorf("bad consumer config") } odir := path.Join(fs.fcfg.StoreDir, consumerDir, name) diff --git a/server/filestore_test.go b/server/filestore_test.go index 93ea9e5a..d4e41324 100644 --- a/server/filestore_test.go +++ b/server/filestore_test.go @@ -566,12 +566,15 @@ func TestFileStoreBytesLimit(t *testing.T) { } func TestFileStoreAgeLimit(t *testing.T) { - maxAge := 10 * time.Millisecond + maxAge := 100 * time.Millisecond storeDir := createDir(t, JetStreamStoreDir) defer removeDir(t, storeDir) - fs, err := newFileStore(FileStoreConfig{StoreDir: storeDir}, StreamConfig{Name: "zzz", Storage: FileStorage, MaxAge: maxAge}) + fs, err := newFileStore( + FileStoreConfig{StoreDir: storeDir, BlockSize: 256}, + StreamConfig{Name: "zzz", Storage: FileStorage, MaxAge: maxAge}, + ) if err != nil { t.Fatalf("Unexpected error: %v", err) } @@ -579,7 +582,7 @@ func TestFileStoreAgeLimit(t *testing.T) { // Store some messages. Does not really matter how many. subj, msg := "foo", []byte("Hello World") - toStore := 100 + toStore := 500 for i := 0; i < toStore; i++ { fs.StoreMsg(subj, nil, msg) } @@ -589,7 +592,7 @@ func TestFileStoreAgeLimit(t *testing.T) { } checkExpired := func(t *testing.T) { t.Helper() - checkFor(t, time.Second, maxAge, func() error { + checkFor(t, 5*time.Second, maxAge, func() error { state = fs.State() if state.Msgs != 0 { return fmt.Errorf("Expected no msgs, got %d", state.Msgs) @@ -602,6 +605,7 @@ func TestFileStoreAgeLimit(t *testing.T) { } // Let them expire checkExpired(t) + // Now add some more and make sure that timer will fire again. for i := 0; i < toStore; i++ { fs.StoreMsg(subj, nil, msg)