Merge pull request #3283 from nats-io/short_idx_write

[FIXED] Short index write could lead to loss of stream sequence for an empty stream
This commit is contained in:
Ivan Kozlovic
2022-07-22 11:29:50 -06:00
committed by GitHub
2 changed files with 80 additions and 3 deletions

View File

@@ -2153,7 +2153,8 @@ func (fs *fileStore) removeMsg(seq uint64, secure, needFSLock bool) (bool, error
if firstSeqNeedsUpdate {
fs.selectNextFirst()
// Write out the new first message block if we have one.
if len(fs.blks) > 0 {
// We can ignore if we really have not changed message blocks from above.
if len(fs.blks) > 0 && fs.blks[0] != mb {
fmb := fs.blks[0]
fmb.writeIndexInfo()
}
@@ -4030,6 +4031,14 @@ func (mb *msgBlock) writeIndexInfoLocked() error {
mb.lwits = time.Now().UnixNano()
// Check if this will be a short write, and if so truncate before writing here.
if int64(len(buf)) < mb.liwsz {
if err := mb.ifd.Truncate(0); err != nil {
mb.werr = err
return err
}
}
var err error
if n, err = mb.ifd.WriteAt(buf, 0); err == nil {
mb.liwsz = int64(n)
@@ -4682,10 +4691,10 @@ func (mb *msgBlock) closeAndKeepIndex() {
// We were closed, so just write out an empty file.
ioutil.WriteFile(mb.mfn, nil, defaultFilePerms)
}
// Close
mb.dirtyCloseWithRemove(false)
// Make sure to write the index file so we can remember last seq and ts.
mb.writeIndexInfoLocked()
// Close
mb.dirtyCloseWithRemove(false)
// Clear any fss.
if mb.sfn != _EMPTY_ {

View File

@@ -3907,3 +3907,71 @@ func TestFileStorePurgeExWithSubject(t *testing.T) {
fs.PurgeEx("foo", 1, 0)
require_True(t, fs.State().Msgs == 0)
}
// When the N.idx file is shorter than the previous write we could fail to recover the idx properly.
// For instance, with encryption and an expiring stream that has no messages, when a restart happens the decrypt will fail
// since their are extra bytes, and this could lead to a stream sequence reset to zero.
func TestFileStoreShortIndexWriteBug(t *testing.T) {
storeDir := createDir(t, JetStreamStoreDir)
defer removeDir(t, storeDir)
// Encrypted mode shows, but could effect non-encrypted mode.
prf := func(context []byte) ([]byte, error) {
h := hmac.New(sha256.New, []byte("offby1"))
if _, err := h.Write(context); err != nil {
return nil, err
}
return h.Sum(nil), nil
}
created := time.Now()
fs, err := newFileStoreWithCreated(
FileStoreConfig{StoreDir: storeDir},
StreamConfig{Name: "TEST", Storage: FileStorage, MaxAge: time.Second},
created,
prf,
)
require_NoError(t, err)
defer fs.Stop()
for i := 0; i < 100; i++ {
_, _, err = fs.StoreMsg("foo", nil, nil)
require_NoError(t, err)
}
// Wait til messages all go away.
checkFor(t, 2*time.Second, 200*time.Millisecond, func() error {
if state := fs.State(); state.Msgs != 0 {
return fmt.Errorf("Expected no msgs, got %d", state.Msgs)
}
return nil
})
if state := fs.State(); state.FirstSeq != 101 {
t.Fatalf("Expected first sequence of 101 vs %d", state.FirstSeq)
}
// I noticed that we also would dangle an open ifd when we did closeAndKeepIndex(), check that we do not anymore.
fs.mu.RLock()
mb := fs.lmb
mb.mu.RLock()
hasIfd := mb.ifd != nil
mb.mu.RUnlock()
fs.mu.RUnlock()
require_False(t, hasIfd)
// Now restart..
fs.Stop()
fs, err = newFileStoreWithCreated(
FileStoreConfig{StoreDir: storeDir},
StreamConfig{Name: "TEST", Storage: FileStorage, MaxAge: time.Second},
created,
prf,
)
require_NoError(t, err)
defer fs.Stop()
if state := fs.State(); state.FirstSeq != 101 {
t.Fatalf("Expected first sequence of 101 vs %d", state.FirstSeq)
}
}