diff --git a/server/filestore.go b/server/filestore.go index 82d3124a..77a6a2f5 100644 --- a/server/filestore.go +++ b/server/filestore.go @@ -2153,7 +2153,8 @@ func (fs *fileStore) removeMsg(seq uint64, secure, needFSLock bool) (bool, error if firstSeqNeedsUpdate { fs.selectNextFirst() // Write out the new first message block if we have one. - if len(fs.blks) > 0 { + // We can ignore if we really have not changed message blocks from above. + if len(fs.blks) > 0 && fs.blks[0] != mb { fmb := fs.blks[0] fmb.writeIndexInfo() } @@ -4030,6 +4031,14 @@ func (mb *msgBlock) writeIndexInfoLocked() error { mb.lwits = time.Now().UnixNano() + // Check if this will be a short write, and if so truncate before writing here. + if int64(len(buf)) < mb.liwsz { + if err := mb.ifd.Truncate(0); err != nil { + mb.werr = err + return err + } + } + var err error if n, err = mb.ifd.WriteAt(buf, 0); err == nil { mb.liwsz = int64(n) @@ -4682,10 +4691,10 @@ func (mb *msgBlock) closeAndKeepIndex() { // We were closed, so just write out an empty file. ioutil.WriteFile(mb.mfn, nil, defaultFilePerms) } - // Close - mb.dirtyCloseWithRemove(false) // Make sure to write the index file so we can remember last seq and ts. mb.writeIndexInfoLocked() + // Close + mb.dirtyCloseWithRemove(false) // Clear any fss. if mb.sfn != _EMPTY_ { diff --git a/server/filestore_test.go b/server/filestore_test.go index 9249199c..6b905312 100644 --- a/server/filestore_test.go +++ b/server/filestore_test.go @@ -3907,3 +3907,71 @@ func TestFileStorePurgeExWithSubject(t *testing.T) { fs.PurgeEx("foo", 1, 0) require_True(t, fs.State().Msgs == 0) } + +// When the N.idx file is shorter than the previous write we could fail to recover the idx properly. +// For instance, with encryption and an expiring stream that has no messages, when a restart happens the decrypt will fail +// since their are extra bytes, and this could lead to a stream sequence reset to zero. +func TestFileStoreShortIndexWriteBug(t *testing.T) { + storeDir := createDir(t, JetStreamStoreDir) + defer removeDir(t, storeDir) + + // Encrypted mode shows, but could effect non-encrypted mode. + prf := func(context []byte) ([]byte, error) { + h := hmac.New(sha256.New, []byte("offby1")) + if _, err := h.Write(context); err != nil { + return nil, err + } + return h.Sum(nil), nil + } + + created := time.Now() + + fs, err := newFileStoreWithCreated( + FileStoreConfig{StoreDir: storeDir}, + StreamConfig{Name: "TEST", Storage: FileStorage, MaxAge: time.Second}, + created, + prf, + ) + require_NoError(t, err) + defer fs.Stop() + + for i := 0; i < 100; i++ { + _, _, err = fs.StoreMsg("foo", nil, nil) + require_NoError(t, err) + } + // Wait til messages all go away. + checkFor(t, 2*time.Second, 200*time.Millisecond, func() error { + if state := fs.State(); state.Msgs != 0 { + return fmt.Errorf("Expected no msgs, got %d", state.Msgs) + } + return nil + }) + + if state := fs.State(); state.FirstSeq != 101 { + t.Fatalf("Expected first sequence of 101 vs %d", state.FirstSeq) + } + + // I noticed that we also would dangle an open ifd when we did closeAndKeepIndex(), check that we do not anymore. + fs.mu.RLock() + mb := fs.lmb + mb.mu.RLock() + hasIfd := mb.ifd != nil + mb.mu.RUnlock() + fs.mu.RUnlock() + require_False(t, hasIfd) + + // Now restart.. + fs.Stop() + fs, err = newFileStoreWithCreated( + FileStoreConfig{StoreDir: storeDir}, + StreamConfig{Name: "TEST", Storage: FileStorage, MaxAge: time.Second}, + created, + prf, + ) + require_NoError(t, err) + defer fs.Stop() + + if state := fs.State(); state.FirstSeq != 101 { + t.Fatalf("Expected first sequence of 101 vs %d", state.FirstSeq) + } +}