Set initial min on dmap caused subtle bugs with dmap. Some minor cleanup.

Signed-off-by: Derek Collison <derek@nats.io>
This commit is contained in:
Derek Collison
2023-10-06 09:25:31 -07:00
parent 4e414f1f05
commit dd646f6b71
3 changed files with 111 additions and 52 deletions

View File

@@ -1632,20 +1632,22 @@ func (fs *fileStore) recoverFullState() (rerr error) {
// First let's check the happy path, open the blk file that was the lmb when we created the full state.
// See if we have the last block available.
var matched bool
var mb *msgBlock
if mb = fs.bim[blkIndex]; mb != nil {
if _, err := os.Stat(mb.mfn); err != nil && os.IsNotExist(err) {
// If our saved state is past what we see on disk, fallback and rebuild.
if ld, _, _ := mb.rebuildState(); ld != nil {
fs.addLostData(ld)
}
fs.warn("Stream state detected prior state, could not locate msg block %d", blkIndex)
return errPriorState
}
if matched = bytes.Equal(mb.lastChecksum(), lchk[:]); !matched {
// Remove the last message block since we will re-process below.
fs.removeMsgBlockFromList(mb)
mb := fs.lmb
if mb == nil || mb.index != blkIndex {
fs.warn("Stream state block does not exist or index mismatch")
return errCorruptState
}
if _, err := os.Stat(mb.mfn); err != nil && os.IsNotExist(err) {
// If our saved state is past what we see on disk, fallback and rebuild.
if ld, _, _ := mb.rebuildState(); ld != nil {
fs.addLostData(ld)
}
fs.warn("Stream state detected prior state, could not locate msg block %d", blkIndex)
return errPriorState
}
if matched = bytes.Equal(mb.lastChecksum(), lchk[:]); !matched {
// Remove the last message block since we will re-process below.
fs.removeMsgBlockFromList(mb)
}
// We may need to check other blocks. Even if we matched last checksum we will see if there is another block.
@@ -1669,7 +1671,7 @@ func (fs *fileStore) recoverFullState() (rerr error) {
// Check if we have to account for a partial message block.
if !matched && mb != nil && mb.index == nmb.index {
if err := fs.adjustAccounting(mb, nmb); err != nil {
fs.warn("Stream state could not adjust accounting: %v", err)
fs.warn("Stream state could not adjust accounting")
return err
}
}
@@ -1701,8 +1703,13 @@ func (fs *fileStore) adjustAccounting(mb, nmb *msgBlock) error {
}
nmb.ensurePerSubjectInfoLoaded()
lookupAndAdjust := func(seq uint64) error {
var smv StoreMsg
// Walk all the original mb's sequences that were included in the stream state.
var smv StoreMsg
for seq := mb.first.seq; seq <= mb.last.seq; seq++ {
// If we had already declared it deleted we can move on since you can not undelete.
if mb.dmap.Exists(seq) {
continue
}
// Lookup the message.
sm, err := nmb.cacheLookup(seq, &smv)
if err != nil {
@@ -1714,29 +1721,10 @@ func (fs *fileStore) adjustAccounting(mb, nmb *msgBlock) error {
if len(sm.subj) > 0 && fs.psim != nil {
fs.removePerSubject(sm.subj)
}
return nil
}
// Walk all the original mb's sequences that were included in the stream state.
for seq := mb.first.seq; seq <= mb.last.seq; seq++ {
// If we had already declared it deleted we can move on since you can not undelete.
if mb.dmap.Exists(seq) {
continue
}
// Lookup the message.
if err := lookupAndAdjust(seq); err != nil {
return err
}
}
// Now check to see if we had a higher first for the recovered state mb vs nmb.
if nmb.first.seq < mb.first.seq {
for seq := nmb.first.seq; seq < mb.first.seq; seq++ {
// Lookup the message.
if err := lookupAndAdjust(seq); err != nil {
return err
}
}
// Now set first for nmb.
nmb.first = mb.first
}
@@ -3329,7 +3317,9 @@ func (fs *fileStore) removePerSubject(subj string) {
// We do not update sense of fblk here but will do so when we resolve during lookup.
if info, ok := fs.psim[subj]; ok {
info.total--
if info.total == 0 {
if info.total == 1 {
info.fblk = info.lblk
} else if info.total == 0 {
delete(fs.psim, subj)
}
}
@@ -3499,10 +3489,6 @@ func (fs *fileStore) removeMsg(seq uint64, secure, viaLimits, needFSLock bool) (
}
}
} else if !isEmpty {
if mb.dmap.IsEmpty() {
// Mark initial base for delete set.
mb.dmap.SetInitialMin(mb.first.seq)
}
// Out of order delete.
mb.dmap.Insert(seq)
// Check if <25% utilization and minimum size met.
@@ -3592,10 +3578,7 @@ func (mb *msgBlock) compact() {
var firstSet bool
isDeleted := func(seq uint64) bool {
if seq == 0 || seq&ebit != 0 || seq < mb.first.seq {
return true
}
return mb.dmap.Exists(seq)
return seq == 0 || seq&ebit != 0 || seq < mb.first.seq || mb.dmap.Exists(seq)
}
for index, lbuf := uint32(0), uint32(len(buf)); index < lbuf; {
@@ -3638,6 +3621,12 @@ func (mb *msgBlock) compact() {
index += rl
}
// Handle compression
var err error
if nbuf, err = mb.cmp.Compress(nbuf); err != nil {
return
}
// Check for encryption.
if mb.bek != nil && len(nbuf) > 0 {
// Recreate to reset counter.
@@ -3652,7 +3641,7 @@ func (mb *msgBlock) compact() {
mb.closeFDsLocked()
// We will write to a new file and mv/rename it in case of failure.
mfn := filepath.Join(filepath.Join(mb.fs.fcfg.StoreDir, msgDir), fmt.Sprintf(newScan, mb.index))
mfn := filepath.Join(mb.fs.fcfg.StoreDir, msgDir, fmt.Sprintf(newScan, mb.index))
if err := os.WriteFile(mfn, nbuf, defaultFilePerms); err != nil {
os.Remove(mfn)
return
@@ -3662,7 +3651,7 @@ func (mb *msgBlock) compact() {
return
}
// Remove index file and wipe delete map, then rebuild.
// Wipe dmap and rebuild here.
mb.dmap.Empty()
mb.rebuildStateLocked()

View File

@@ -6136,7 +6136,7 @@ func TestFileStoreFullStateMultiBlockPastWAL(t *testing.T) {
require_NoError(t, err)
defer fs.Stop()
// Store 2 more msgs and delete 2 & 4, then another 2 msgs.
// Store 6 more msgs.
fs.StoreMsg("C", nil, msgA)
fs.StoreMsg("D", nil, msgZ)
fs.StoreMsg("E", nil, msgA)
@@ -6149,7 +6149,6 @@ func TestFileStoreFullStateMultiBlockPastWAL(t *testing.T) {
// Put back old stream state.
// This will test that we properly walk multiple blocks past where we snapshotted state.
fs.Stop()
err = os.WriteFile(sfile, buf, defaultFilePerms)
require_NoError(t, err)
@@ -6165,6 +6164,77 @@ func TestFileStoreFullStateMultiBlockPastWAL(t *testing.T) {
})
}
// This tests we can successfully recover without having to rebuild the whole stream from a mid block index.db marker
// when they updated block has a removed entry.
// TODO(dlc) - This test will force a rebuild atm, leaving here for later.
func TestFileStoreFullStateMidBlockPastWAL(t *testing.T) {
testFileStoreAllPermutations(t, func(t *testing.T, fcfg FileStoreConfig) {
scfg := StreamConfig{Name: "zzz", Subjects: []string{"*"}, Storage: FileStorage, MaxMsgsPer: 1}
prf := func(context []byte) ([]byte, error) {
h := hmac.New(sha256.New, []byte("dlc22"))
if _, err := h.Write(context); err != nil {
return nil, err
}
return h.Sum(nil), nil
}
if fcfg.Cipher == NoCipher {
prf = nil
}
fs, err := newFileStoreWithCreated(fcfg, scfg, time.Now(), prf, nil)
require_NoError(t, err)
defer fs.Stop()
// This yields an internal record length of 50 bytes. So 2 msgs per blk.
msg := bytes.Repeat([]byte("Z"), 19)
// Store 5 msgs
fs.StoreMsg("A", nil, msg)
fs.StoreMsg("B", nil, msg)
fs.StoreMsg("C", nil, msg)
fs.StoreMsg("D", nil, msg)
fs.StoreMsg("E", nil, msg)
require_Equal(t, fs.numMsgBlocks(), 1)
fs.Stop()
// Grab the state from this stop.
sfile := filepath.Join(fcfg.StoreDir, msgDir, streamStreamStateFile)
buf, err := os.ReadFile(sfile)
require_NoError(t, err)
fs, err = newFileStoreWithCreated(fcfg, scfg, time.Now(), prf, nil)
require_NoError(t, err)
defer fs.Stop()
// Store 5 more messages, then remove seq 2, "B".
fs.StoreMsg("F", nil, msg)
fs.StoreMsg("G", nil, msg)
fs.StoreMsg("H", nil, msg)
fs.StoreMsg("I", nil, msg)
fs.StoreMsg("J", nil, msg)
fs.EraseMsg(2)
require_Equal(t, fs.numMsgBlocks(), 1)
state := fs.State()
fs.Stop()
// Put back old stream state.
// This will test that we properly walk multiple blocks past where we snapshotted state.
err = os.WriteFile(sfile, buf, defaultFilePerms)
require_NoError(t, err)
fs, err = newFileStoreWithCreated(fcfg, scfg, time.Now(), prf, nil)
require_NoError(t, err)
defer fs.Stop()
if newState := fs.State(); !reflect.DeepEqual(state, newState) {
t.Fatalf("Restore state does not match:\n%+v\n%+v",
state, newState)
}
})
}
///////////////////////////////////////////////////////////////////////////
// Benchmarks
///////////////////////////////////////////////////////////////////////////

View File

@@ -8736,10 +8736,10 @@ func TestNoRaceBinaryStreamSnapshotEncodingBasic(t *testing.T) {
ss, err := DecodeStreamState(snap)
require_NoError(t, err)
require_True(t, ss.FirstSeq == 1)
require_True(t, ss.LastSeq == 3000)
require_True(t, ss.Msgs == 1000)
require_True(t, ss.Deleted.NumDeleted() == 2000)
require_Equal(t, ss.FirstSeq, 1)
require_Equal(t, ss.LastSeq, 3000)
require_Equal(t, ss.Msgs, 1000)
require_Equal(t, ss.Deleted.NumDeleted(), 2000)
}
func TestNoRaceFilestoreBinaryStreamSnapshotEncodingLargeGaps(t *testing.T) {