mirror of
https://github.com/gogrlx/nats-server.git
synced 2026-04-02 03:38:42 -07:00
When in async mode make sure to not flush or create new files after being closed.
Signed-off-by: Derek Collison <derek@nats.io>
This commit is contained in:
@@ -3119,8 +3119,8 @@ func (mb *msgBlock) spinUpFlushLoop() {
|
||||
mb.mu.Lock()
|
||||
defer mb.mu.Unlock()
|
||||
|
||||
// Are we already running?
|
||||
if mb.flusher {
|
||||
// Are we already running or closed?
|
||||
if mb.flusher || mb.closed {
|
||||
return
|
||||
}
|
||||
mb.flusher = true
|
||||
@@ -3867,12 +3867,14 @@ func (mb *msgBlock) pendingWriteSize() int {
|
||||
if mb == nil {
|
||||
return 0
|
||||
}
|
||||
var pending int
|
||||
|
||||
mb.mu.RLock()
|
||||
if mb.mfd != nil && mb.cache != nil {
|
||||
defer mb.mu.RUnlock()
|
||||
|
||||
var pending int
|
||||
if !mb.closed && mb.mfd != nil && mb.cache != nil {
|
||||
pending = len(mb.cache.buf) - int(mb.cache.wp)
|
||||
}
|
||||
mb.mu.RUnlock()
|
||||
return pending
|
||||
}
|
||||
|
||||
@@ -4615,6 +4617,7 @@ var (
|
||||
errNoEncryption = errors.New("encryption not enabled")
|
||||
errBadKeySize = errors.New("encryption bad key size")
|
||||
errNoMsgBlk = errors.New("no message block")
|
||||
errMsgBlkClosed = errors.New("message block is closed")
|
||||
errMsgBlkTooBig = errors.New("message block size exceeded int capacity")
|
||||
errUnknownCipher = errors.New("unknown cipher")
|
||||
errDIOStalled = errors.New("IO is stalled")
|
||||
@@ -5115,6 +5118,10 @@ func (mb *msgBlock) writeIndexInfo() error {
|
||||
// Write index info to the appropriate file.
|
||||
// Filestore lock and mb lock should be held.
|
||||
func (mb *msgBlock) writeIndexInfoLocked() error {
|
||||
if mb.closed {
|
||||
return errMsgBlkClosed
|
||||
}
|
||||
|
||||
// HEADER: magic version msgs bytes fseq fts lseq lts ndel checksum
|
||||
// Make large enough to hold almost all possible maximum interior delete scenarios.
|
||||
var hdr [42 * 1024]byte
|
||||
@@ -6401,7 +6408,6 @@ func (mb *msgBlock) close(sync bool) {
|
||||
if mb.closed {
|
||||
return
|
||||
}
|
||||
mb.closed = true
|
||||
|
||||
// Stop cache expiration timer.
|
||||
if mb.ctmr != nil {
|
||||
@@ -6435,6 +6441,8 @@ func (mb *msgBlock) close(sync bool) {
|
||||
}
|
||||
mb.mfd = nil
|
||||
mb.ifd = nil
|
||||
// Mark as closed.
|
||||
mb.closed = true
|
||||
}
|
||||
|
||||
func (fs *fileStore) closeAllMsgBlocks(sync bool) {
|
||||
|
||||
Reference in New Issue
Block a user