mirror of
https://github.com/gogrlx/nats-server.git
synced 2026-04-17 11:24:44 -07:00
Fixes for filestore consistency on server restarts under heavy load with purge operations
Signed-off-by: Derek Collison <derek@nats.io>
This commit is contained in:
@@ -40,7 +40,7 @@ var (
|
||||
|
||||
const (
|
||||
// VERSION is the current version for the server.
|
||||
VERSION = "2.2.0-beta.6"
|
||||
VERSION = "2.2.0-beta.7"
|
||||
|
||||
// PROTO is the currently supported protocol.
|
||||
// 0 was the original
|
||||
|
||||
@@ -604,6 +604,7 @@ func (o *Consumer) updateStateLoop() {
|
||||
// just block and not fire.
|
||||
o.updateDeliveryInterest(interest)
|
||||
case <-fch:
|
||||
// FIXME(dlc) - Check for fast changes at quick intervals.
|
||||
time.Sleep(25 * time.Millisecond)
|
||||
o.mu.Lock()
|
||||
if o.store != nil {
|
||||
@@ -850,7 +851,7 @@ func (o *Consumer) notifyDeliveryExceeded(sseq, dcount uint64) {
|
||||
return
|
||||
}
|
||||
|
||||
// can be nil during shutdown, locks are help in the caller
|
||||
// can be nil during shutdown, locks are held in the caller
|
||||
if o.mset != nil && o.mset.sendq != nil {
|
||||
o.mset.sendq <- &jsPubMsg{o.deliveryExcEventT, o.deliveryExcEventT, _EMPTY_, j, nil, 0}
|
||||
}
|
||||
@@ -892,7 +893,7 @@ func (o *Consumer) getNextMsg() (string, []byte, uint64, uint64, error) {
|
||||
}
|
||||
// We got an error here. If this is an EOF we will return, otherwise
|
||||
// we can continue looking.
|
||||
if err == ErrStoreEOF {
|
||||
if err == ErrStoreEOF || err == ErrStoreClosed {
|
||||
return "", nil, 0, 0, err
|
||||
}
|
||||
// Skip since its probably deleted or expired.
|
||||
@@ -1162,7 +1163,6 @@ func (o *Consumer) deliverMsg(dsubj, subj string, msg []byte, seq, dcount uint64
|
||||
}
|
||||
o.dseq++
|
||||
o.updateStore()
|
||||
|
||||
}
|
||||
|
||||
// Tracks our outstanding pending acks. Only applicable to AckExplicit mode.
|
||||
|
||||
@@ -232,6 +232,10 @@ func newFileStore(fcfg FileStoreConfig, cfg StreamConfig) (*fileStore, error) {
|
||||
}
|
||||
|
||||
func (fs *fileStore) UpdateConfig(cfg *StreamConfig) error {
|
||||
if fs.isClosed() {
|
||||
return ErrStoreClosed
|
||||
}
|
||||
|
||||
if cfg.Name == "" {
|
||||
return fmt.Errorf("name required")
|
||||
}
|
||||
@@ -387,7 +391,7 @@ func (fs *fileStore) recoverMsgs() error {
|
||||
// Check for any left over purged messages.
|
||||
pdir := path.Join(fs.fcfg.StoreDir, purgeDir)
|
||||
if _, err := os.Stat(pdir); err == nil {
|
||||
go os.RemoveAll(pdir)
|
||||
os.RemoveAll(pdir)
|
||||
}
|
||||
|
||||
mdir := path.Join(fs.fcfg.StoreDir, msgDir)
|
||||
@@ -444,8 +448,13 @@ func (fs *fileStore) recoverMsgs() error {
|
||||
func (fs *fileStore) GetSeqFromTime(t time.Time) uint64 {
|
||||
fs.mu.RLock()
|
||||
lastSeq := fs.state.LastSeq
|
||||
closed := fs.closed
|
||||
fs.mu.RUnlock()
|
||||
|
||||
if closed {
|
||||
return 0
|
||||
}
|
||||
|
||||
mb := fs.selectMsgBlockForStart(t)
|
||||
if mb == nil {
|
||||
return lastSeq + 1
|
||||
@@ -540,6 +549,10 @@ func (fs *fileStore) enableLastMsgBlockForWriting() error {
|
||||
// StoreMsg stores a message.
|
||||
func (fs *fileStore) StoreMsg(subj string, msg []byte) (uint64, error) {
|
||||
fs.mu.Lock()
|
||||
if fs.closed {
|
||||
fs.mu.Unlock()
|
||||
return 0, ErrStoreClosed
|
||||
}
|
||||
|
||||
seq := fs.state.LastSeq + 1
|
||||
if fs.state.FirstSeq == 0 {
|
||||
@@ -622,6 +635,13 @@ func (fs *fileStore) EraseMsg(seq uint64) (bool, error) {
|
||||
return fs.removeMsg(seq, true)
|
||||
}
|
||||
|
||||
func (fs *fileStore) isClosed() bool {
|
||||
fs.mu.RLock()
|
||||
closed := fs.closed
|
||||
fs.mu.RUnlock()
|
||||
return closed
|
||||
}
|
||||
|
||||
func (fs *fileStore) isSnapshotting() bool {
|
||||
fs.mu.RLock()
|
||||
iss := fs.sips > 0
|
||||
@@ -631,6 +651,9 @@ func (fs *fileStore) isSnapshotting() bool {
|
||||
|
||||
// Remove a message, optionally rewriting the mb file.
|
||||
func (fs *fileStore) removeMsg(seq uint64, secure bool) (bool, error) {
|
||||
if fs.isClosed() {
|
||||
return false, ErrStoreClosed
|
||||
}
|
||||
if fs.isSnapshotting() {
|
||||
return false, ErrStoreSnapshotInProgress
|
||||
}
|
||||
@@ -884,7 +907,7 @@ func (fs *fileStore) kickFlusher() {
|
||||
}
|
||||
}
|
||||
|
||||
func (fs *fileStore) writePendingSize() int {
|
||||
func (fs *fileStore) pendingWriteSize() int {
|
||||
var sz int
|
||||
fs.mu.RLock()
|
||||
if fs.wmb != nil {
|
||||
@@ -898,11 +921,14 @@ func (fs *fileStore) flushLoop(fch, qch chan struct{}) {
|
||||
for {
|
||||
select {
|
||||
case <-fch:
|
||||
waiting := fs.writePendingSize()
|
||||
waiting := fs.pendingWriteSize()
|
||||
if waiting == 0 {
|
||||
continue
|
||||
}
|
||||
ts := 1 * time.Millisecond
|
||||
for waiting < coalesceMinimum {
|
||||
time.Sleep(ts)
|
||||
newWaiting := fs.writePendingSize()
|
||||
newWaiting := fs.pendingWriteSize()
|
||||
if newWaiting <= waiting {
|
||||
break
|
||||
}
|
||||
@@ -1318,11 +1344,17 @@ func (mb *msgBlock) cacheLookup(seq uint64) (*fileStoredMsg, error) {
|
||||
|
||||
// Will return message for the given sequence number.
|
||||
func (fs *fileStore) msgForSeq(seq uint64) (*fileStoredMsg, error) {
|
||||
fs.mu.RLock()
|
||||
if fs.closed {
|
||||
fs.mu.RUnlock()
|
||||
return nil, ErrStoreClosed
|
||||
}
|
||||
fseq := fs.state.FirstSeq
|
||||
fs.mu.RUnlock()
|
||||
|
||||
// Indicates we want first msg.
|
||||
if seq == 0 {
|
||||
fs.mu.RLock()
|
||||
seq = fs.state.FirstSeq
|
||||
fs.mu.RUnlock()
|
||||
seq = fseq
|
||||
}
|
||||
|
||||
mb := fs.selectMsgBlock(seq)
|
||||
@@ -1608,41 +1640,52 @@ func (fs *fileStore) dmapEntries() int {
|
||||
// Will return the number of purged messages.
|
||||
func (fs *fileStore) Purge() uint64 {
|
||||
fs.mu.Lock()
|
||||
fs.flushPendingWrites()
|
||||
if fs.closed {
|
||||
fs.mu.Unlock()
|
||||
return 0
|
||||
}
|
||||
|
||||
purged := fs.state.Msgs
|
||||
bytes := int64(fs.state.Bytes)
|
||||
rbytes := int64(fs.state.Bytes)
|
||||
|
||||
fs.state.FirstSeq = fs.state.LastSeq + 1
|
||||
fs.state.Bytes = 0
|
||||
fs.state.Msgs = 0
|
||||
fs.writeStreamMeta()
|
||||
|
||||
lmb := fs.lmb
|
||||
for _, mb := range fs.blks {
|
||||
mb.dirtyClose()
|
||||
}
|
||||
|
||||
fs.blks = nil
|
||||
fs.wmb = &bytes.Buffer{}
|
||||
fs.lmb = nil
|
||||
|
||||
// Move the msgs directory out of the way, will delete out of band.
|
||||
// FIXME(dlc) - These can error and we need to change api.
|
||||
// FIXME(dlc) - These can error and we need to change api above to propagate?
|
||||
mdir := path.Join(fs.fcfg.StoreDir, msgDir)
|
||||
pdir := path.Join(fs.fcfg.StoreDir, purgeDir)
|
||||
os.Rename(mdir, pdir)
|
||||
os.MkdirAll(mdir, 0755)
|
||||
go os.RemoveAll(pdir)
|
||||
|
||||
// Now place new write msg block with correct info.
|
||||
fs.newMsgBlockForWrite()
|
||||
if lmb != nil {
|
||||
fs.lmb.first = lmb.last
|
||||
fs.lmb.first.seq += 1
|
||||
fs.lmb.last = lmb.last
|
||||
fs.lmb.writeIndexInfo()
|
||||
// If purge directory still exists then we need to wait
|
||||
// in place and remove since rename would fail.
|
||||
if _, err := os.Stat(pdir); err == nil {
|
||||
os.RemoveAll(pdir)
|
||||
}
|
||||
os.Rename(mdir, pdir)
|
||||
go os.RemoveAll(pdir)
|
||||
// Create new one.
|
||||
os.MkdirAll(mdir, 0755)
|
||||
|
||||
// Make sure we have a lmb to write to.
|
||||
fs.newMsgBlockForWrite()
|
||||
|
||||
fs.lmb.first.seq = fs.state.FirstSeq
|
||||
fs.lmb.last.seq = fs.state.LastSeq
|
||||
fs.lmb.writeIndexInfo()
|
||||
|
||||
cb := fs.scb
|
||||
fs.mu.Unlock()
|
||||
|
||||
if cb != nil {
|
||||
cb(-bytes)
|
||||
cb(-rbytes)
|
||||
}
|
||||
|
||||
return purged
|
||||
@@ -1691,6 +1734,31 @@ func (fs *fileStore) removeMsgBlock(mb *msgBlock) {
|
||||
go mb.close(true)
|
||||
}
|
||||
|
||||
// Called by purge to simply get rid of the cache and close and fds.
|
||||
// FIXME(dlc) - Merge with below func.
|
||||
func (mb *msgBlock) dirtyClose() {
|
||||
if mb == nil {
|
||||
return
|
||||
}
|
||||
mb.mu.Lock()
|
||||
// Close cache
|
||||
mb.cache = nil
|
||||
// Quit our loops.
|
||||
if mb.qch != nil {
|
||||
close(mb.qch)
|
||||
mb.qch = nil
|
||||
}
|
||||
if mb.mfd != nil {
|
||||
mb.mfd.Close()
|
||||
mb.mfd = nil
|
||||
}
|
||||
if mb.ifd != nil {
|
||||
mb.ifd.Close()
|
||||
mb.ifd = nil
|
||||
}
|
||||
mb.mu.Unlock()
|
||||
}
|
||||
|
||||
func (mb *msgBlock) close(sync bool) {
|
||||
if mb == nil {
|
||||
return
|
||||
@@ -1724,6 +1792,10 @@ func (fs *fileStore) closeLastMsgBlock(sync bool) {
|
||||
}
|
||||
|
||||
func (fs *fileStore) Delete() error {
|
||||
if fs.isClosed() {
|
||||
return ErrStoreClosed
|
||||
}
|
||||
// TODO(dlc) - check error here?
|
||||
fs.Purge()
|
||||
if err := fs.Stop(); err != nil {
|
||||
return err
|
||||
@@ -1735,7 +1807,7 @@ func (fs *fileStore) Stop() error {
|
||||
fs.mu.Lock()
|
||||
if fs.closed {
|
||||
fs.mu.Unlock()
|
||||
return nil
|
||||
return ErrStoreClosed
|
||||
}
|
||||
fs.closed = true
|
||||
close(fs.qch)
|
||||
@@ -1763,6 +1835,7 @@ func (fs *fileStore) Stop() error {
|
||||
for _, o := range cfs {
|
||||
o.Stop()
|
||||
}
|
||||
|
||||
return err
|
||||
}
|
||||
|
||||
@@ -1911,6 +1984,10 @@ func (fs *fileStore) streamSnapshot(w io.Closer, zw *zip.Writer) {
|
||||
// Create a snapshot of this stream and its consumer's state along with messages.
|
||||
func (fs *fileStore) Snapshot() (io.ReadCloser, error) {
|
||||
fs.mu.Lock()
|
||||
if fs.closed {
|
||||
fs.mu.Unlock()
|
||||
return nil, ErrStoreClosed
|
||||
}
|
||||
// Mark us as snapshotting
|
||||
fs.sips += 1
|
||||
fs.mu.Unlock()
|
||||
@@ -1944,6 +2021,9 @@ func (fs *fileStore) ConsumerStore(name string, cfg *ConsumerConfig) (ConsumerSt
|
||||
if fs == nil {
|
||||
return nil, fmt.Errorf("filestore is nil")
|
||||
}
|
||||
if fs.isClosed() {
|
||||
return nil, ErrStoreClosed
|
||||
}
|
||||
if cfg == nil || name == "" {
|
||||
return nil, fmt.Errorf("bad consumer config")
|
||||
}
|
||||
|
||||
@@ -72,7 +72,10 @@ func TestFileStoreBasics(t *testing.T) {
|
||||
}
|
||||
|
||||
func TestFileStoreBasicWriteMsgsAndRestore(t *testing.T) {
|
||||
storeDir := filepath.Join("", JetStreamStoreDir)
|
||||
storeDir, _ := ioutil.TempDir("", JetStreamStoreDir)
|
||||
os.MkdirAll(storeDir, 0755)
|
||||
defer os.RemoveAll(storeDir)
|
||||
|
||||
fcfg := FileStoreConfig{StoreDir: storeDir}
|
||||
|
||||
if _, err := newFileStore(fcfg, StreamConfig{Storage: MemoryStorage}); err == nil {
|
||||
@@ -82,10 +85,6 @@ func TestFileStoreBasicWriteMsgsAndRestore(t *testing.T) {
|
||||
t.Fatalf("Expected an error with no name")
|
||||
}
|
||||
|
||||
// Make the directories to succeed in setup.
|
||||
os.MkdirAll(storeDir, 0755)
|
||||
defer os.RemoveAll(storeDir)
|
||||
|
||||
fs, err := newFileStore(fcfg, StreamConfig{Name: "dlc", Storage: FileStorage})
|
||||
if err != nil {
|
||||
t.Fatalf("Unexpected error: %v", err)
|
||||
@@ -117,6 +116,11 @@ func TestFileStoreBasicWriteMsgsAndRestore(t *testing.T) {
|
||||
// Stop will flush to disk.
|
||||
fs.Stop()
|
||||
|
||||
// Make sure Store call after does not work.
|
||||
if _, err := fs.StoreMsg(subj, []byte("no work")); err == nil {
|
||||
t.Fatalf("Expected an error for StoreMsg call after Stop, got none")
|
||||
}
|
||||
|
||||
// Restart
|
||||
fs, err = newFileStore(fcfg, StreamConfig{Name: "dlc", Storage: FileStorage})
|
||||
if err != nil {
|
||||
@@ -337,7 +341,8 @@ func TestFileStorePurge(t *testing.T) {
|
||||
os.MkdirAll(storeDir, 0755)
|
||||
defer os.RemoveAll(storeDir)
|
||||
|
||||
fs, err := newFileStore(FileStoreConfig{StoreDir: storeDir, BlockSize: 64 * 1024}, StreamConfig{Name: "zzz", Storage: FileStorage})
|
||||
blkSize := uint64(64 * 1024)
|
||||
fs, err := newFileStore(FileStoreConfig{StoreDir: storeDir, BlockSize: blkSize}, StreamConfig{Name: "zzz", Storage: FileStorage})
|
||||
if err != nil {
|
||||
t.Fatalf("Unexpected error: %v", err)
|
||||
}
|
||||
@@ -358,8 +363,9 @@ func TestFileStorePurge(t *testing.T) {
|
||||
t.Fatalf("Expected bytes to be %d, got %d", storedMsgSize*toStore, state.Bytes)
|
||||
}
|
||||
|
||||
if numBlocks := fs.numMsgBlocks(); numBlocks <= 1 {
|
||||
t.Fatalf("Expected to have more then 1 msg block, got %d", numBlocks)
|
||||
expectedBlocks := int(storedMsgSize * toStore / blkSize)
|
||||
if numBlocks := fs.numMsgBlocks(); numBlocks <= expectedBlocks {
|
||||
t.Fatalf("Expected to have more then %d msg blocks, got %d", blkSize, numBlocks)
|
||||
}
|
||||
|
||||
fs.Purge()
|
||||
@@ -389,7 +395,7 @@ func TestFileStorePurge(t *testing.T) {
|
||||
// Make sure we recover same state.
|
||||
fs.Stop()
|
||||
|
||||
fs, err = newFileStore(FileStoreConfig{StoreDir: storeDir, BlockSize: 64 * 1024}, StreamConfig{Name: "zzz", Storage: FileStorage})
|
||||
fs, err = newFileStore(FileStoreConfig{StoreDir: storeDir, BlockSize: blkSize}, StreamConfig{Name: "zzz", Storage: FileStorage})
|
||||
if err != nil {
|
||||
t.Fatalf("Unexpected error: %v", err)
|
||||
}
|
||||
@@ -428,7 +434,7 @@ func TestFileStorePurge(t *testing.T) {
|
||||
purgeDir := path.Join(fs.fcfg.StoreDir, purgeDir)
|
||||
os.Rename(pdir, purgeDir)
|
||||
|
||||
fs, err = newFileStore(FileStoreConfig{StoreDir: storeDir, BlockSize: 64 * 1024}, StreamConfig{Name: "zzz", Storage: FileStorage})
|
||||
fs, err = newFileStore(FileStoreConfig{StoreDir: storeDir, BlockSize: blkSize}, StreamConfig{Name: "zzz", Storage: FileStorage})
|
||||
if err != nil {
|
||||
t.Fatalf("Unexpected error: %v", err)
|
||||
}
|
||||
|
||||
@@ -195,7 +195,6 @@ func (s *Server) shutdownJetStream() {
|
||||
s.mu.Unlock()
|
||||
|
||||
for _, jsa := range jsas {
|
||||
jsa.flushState()
|
||||
s.js.disableJetStream(jsa)
|
||||
}
|
||||
|
||||
@@ -622,27 +621,6 @@ func (js *jetStream) disableJetStream(jsa *jsAccount) error {
|
||||
return nil
|
||||
}
|
||||
|
||||
// Flush JetStream state for the account.
|
||||
func (jsa *jsAccount) flushState() error {
|
||||
if jsa == nil {
|
||||
return fmt.Errorf("jetstream not enabled for account")
|
||||
}
|
||||
|
||||
// Collect the streams.
|
||||
var _msets [64]*Stream
|
||||
msets := _msets[:0]
|
||||
jsa.mu.Lock()
|
||||
for _, mset := range jsa.streams {
|
||||
msets = append(msets, mset)
|
||||
}
|
||||
jsa.mu.Unlock()
|
||||
|
||||
for _, mset := range msets {
|
||||
mset.store.Stop()
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
// JetStreamEnabled is a helper to determine if jetstream is enabled for an account.
|
||||
func (a *Account) JetStreamEnabled() bool {
|
||||
if a == nil {
|
||||
|
||||
@@ -33,6 +33,8 @@ const (
|
||||
)
|
||||
|
||||
var (
|
||||
// ErrStoreClosed is returned when the store has been closed
|
||||
ErrStoreClosed = errors.New("store is closed")
|
||||
// ErrStoreMsgNotFound when message was not found but was expected to be.
|
||||
ErrStoreMsgNotFound = errors.New("no message found")
|
||||
// ErrStoreEOF is returned when message seq is greater than the last sequence.
|
||||
|
||||
@@ -527,7 +527,9 @@ func (mset *Stream) processInboundJetStreamMsg(_ *subscription, _ *client, subje
|
||||
// Check to see if we are over the account limit.
|
||||
seq, err = store.StoreMsg(subject, msg)
|
||||
if err != nil {
|
||||
c.Errorf("JetStream failed to store a msg on account: %q stream: %q - %v", accName, name, err)
|
||||
if err != ErrStoreClosed {
|
||||
c.Errorf("JetStream failed to store a msg on account: %q stream: %q - %v", accName, name, err)
|
||||
}
|
||||
response = []byte(fmt.Sprintf("-ERR '%v'", err))
|
||||
} else if jsa.limitsExceeded(stype) {
|
||||
c.Warnf("JetStream resource limits exceeded for account: %q", accName)
|
||||
|
||||
@@ -1609,6 +1609,7 @@ func TestJetStreamConsumerMaxDeliveryAndServerRestart(t *testing.T) {
|
||||
defer nc.Close()
|
||||
|
||||
sub, _ := nc.SubscribeSync(dsubj)
|
||||
nc.Flush()
|
||||
defer sub.Unsubscribe()
|
||||
|
||||
// Send one message.
|
||||
@@ -3497,7 +3498,7 @@ func TestJetStreamSimpleFileStorageRecovery(t *testing.T) {
|
||||
delta := (runtime.NumGoroutine() - base)
|
||||
if delta > 3 {
|
||||
t.Logf("%d Go routines still exist post Shutdown()", delta)
|
||||
time.Sleep(30 * time.Second)
|
||||
time.Sleep(10 * time.Second)
|
||||
}
|
||||
|
||||
s = RunBasicJetStreamServer()
|
||||
|
||||
Reference in New Issue
Block a user