mirror of
https://github.com/gogrlx/nats-server.git
synced 2026-04-17 11:24:44 -07:00
Make sure we have a correct next first after expiring on startup
Signed-off-by: Derek Collison <derek@nats.io>
This commit is contained in:
@@ -1311,6 +1311,7 @@ func (fs *fileStore) expireMsgsOnRecover() {
|
||||
}
|
||||
|
||||
var smv StoreMsg
|
||||
var needNextFirst bool
|
||||
|
||||
// Walk messages and remove if expired.
|
||||
mb.ensurePerSubjectInfoLoaded()
|
||||
@@ -1325,14 +1326,13 @@ func (fs *fileStore) expireMsgsOnRecover() {
|
||||
mb.dmap = nil
|
||||
}
|
||||
}
|
||||
// Keep this update just in case since we are removing dmap entries.
|
||||
mb.first.seq = seq
|
||||
// Keep this updated just in case since we are removing dmap entries.
|
||||
mb.first.seq, needNextFirst = seq, true
|
||||
continue
|
||||
}
|
||||
// Break on other errors.
|
||||
if err != nil || sm == nil {
|
||||
// Keep this update just in case since we could have removed dmap entries.
|
||||
mb.first.seq = seq
|
||||
mb.first.seq, needNextFirst = seq, true
|
||||
break
|
||||
}
|
||||
|
||||
@@ -1340,6 +1340,7 @@ func (fs *fileStore) expireMsgsOnRecover() {
|
||||
|
||||
// Check for done.
|
||||
if minAge < sm.ts {
|
||||
mb.first.seq, needNextFirst = sm.seq, false
|
||||
mb.first.seq = sm.seq
|
||||
mb.first.ts = sm.ts
|
||||
nts = sm.ts
|
||||
@@ -1348,6 +1349,7 @@ func (fs *fileStore) expireMsgsOnRecover() {
|
||||
|
||||
// Delete the message here.
|
||||
if mb.msgs > 0 {
|
||||
mb.first.seq, needNextFirst = seq, true
|
||||
sz := fileStoreMsgSize(sm.subj, sm.hdr, sm.msg)
|
||||
mb.bytes -= sz
|
||||
bytes += sz
|
||||
@@ -1359,7 +1361,10 @@ func (fs *fileStore) expireMsgsOnRecover() {
|
||||
mb.removeSeqPerSubject(sm.subj, seq, nil)
|
||||
fs.removePerSubject(sm.subj)
|
||||
}
|
||||
|
||||
// Make sure we have a proper next first sequence.
|
||||
if needNextFirst {
|
||||
mb.selectNextFirst()
|
||||
}
|
||||
// Check if empty after processing, could happen if tail of messages are all deleted.
|
||||
needWriteIndex := true
|
||||
if mb.msgs == 0 {
|
||||
@@ -2365,7 +2370,7 @@ func (fs *fileStore) StoreMsg(subj string, hdr, msg []byte) (uint64, int64, erro
|
||||
|
||||
// skipMsg will update this message block for a skipped message.
|
||||
// If we do not have any messages, just update the metadata, otherwise
|
||||
// we will place and empty record marking the sequence as used. The
|
||||
// we will place an empty record marking the sequence as used. The
|
||||
// sequence will be marked erased.
|
||||
// fs lock should be held.
|
||||
func (mb *msgBlock) skipMsg(seq uint64, now time.Time) {
|
||||
|
||||
Reference in New Issue
Block a user