Merge pull request #2878 from nats-io/key_file_leak

Cleanup key files when removing message blocks.
This commit is contained in:
Derek Collison
2022-02-17 13:26:41 -07:00
committed by GitHub
2 changed files with 73 additions and 0 deletions

View File

@@ -122,6 +122,7 @@ type msgBlock struct {
msgs uint64 // User visible message count.
fss map[string]*SimpleState
sfn string
kfn string
lwits int64
lwts int64
llts int64
@@ -192,6 +193,8 @@ const (
fssScanAll = "*.fss"
// used to store our block encryption key.
keyScan = "%d.key"
// to look for orphans
keyScanAll = "*.key"
// This is where we keep state on consumers.
consumerDir = "obs"
// Index file for a consumer.
@@ -918,6 +921,23 @@ func (fs *fileStore) recoverMsgs() error {
os.Remove(fn)
}
}
// Same bug for keyfiles but for these we just need to identify orphans.
if kms, err := filepath.Glob(path.Join(mdir, keyScanAll)); err == nil && len(kms) > 0 {
valid := make(map[uint64]bool)
for _, mb := range fs.blks {
valid[mb.index] = true
}
for _, fn := range kms {
var index uint64
shouldRemove := true
if n, err := fmt.Sscanf(path.Base(fn), keyScan, &index); err == nil && n == 1 && valid[index] {
shouldRemove = false
}
if shouldRemove {
os.Remove(fn)
}
}
}
// Limits checks and enforcement.
fs.enforceMsgLimit()
@@ -1564,6 +1584,7 @@ func (fs *fileStore) genEncryptionKeysForBlock(mb *msgBlock) error {
if err := ioutil.WriteFile(keyFile, encrypted, defaultFilePerms); err != nil {
return err
}
mb.kfn = keyFile
return nil
}
@@ -4344,6 +4365,9 @@ func (mb *msgBlock) dirtyCloseWithRemove(remove bool) {
os.Remove(mb.sfn)
mb.sfn = _EMPTY_
}
if mb.kfn != _EMPTY_ {
os.Remove(mb.kfn)
}
}
}

View File

@@ -40,7 +40,9 @@ import (
"testing"
"time"
"crypto/hmac"
crand "crypto/rand"
"crypto/sha256"
"github.com/klauspost/compress/s2"
"github.com/nats-io/jwt/v2"
@@ -4324,3 +4326,50 @@ func TestNoRaceFileStoreSubjectInfoWithSnapshotCleanup(t *testing.T) {
t.Fatalf("Expected to find no fss files, found %d", len(fms))
}
}
func TestNoRaceFileStoreKeyFileCleanup(t *testing.T) {
storeDir := createDir(t, JetStreamStoreDir)
defer removeDir(t, storeDir)
prf := func(context []byte) ([]byte, error) {
h := hmac.New(sha256.New, []byte("dlc22"))
if _, err := h.Write(context); err != nil {
return nil, err
}
return h.Sum(nil), nil
}
fs, err := newFileStoreWithCreated(
FileStoreConfig{StoreDir: storeDir, BlockSize: 1024 * 1024},
StreamConfig{Name: "TEST", Storage: FileStorage},
time.Now(),
prf)
require_NoError(t, err)
defer fs.Stop()
n, msg := 10_000, []byte(strings.Repeat("Z", 1024))
for i := 0; i < n; i++ {
_, _, err := fs.StoreMsg(fmt.Sprintf("X.%d", i), nil, msg)
require_NoError(t, err)
}
var seqs []uint64
for i := 1; i <= n; i++ {
seqs = append(seqs, uint64(i))
}
// Randomly delete msgs, make sure we cleanup as we empty the message blocks.
rand.Shuffle(len(seqs), func(i, j int) { seqs[i], seqs[j] = seqs[j], seqs[i] })
for _, seq := range seqs {
_, err := fs.RemoveMsg(seq)
require_NoError(t, err)
}
// We will have cleanup the main .blk and .idx sans the lmb, but we should not have any *.fss files.
kms, err := filepath.Glob(path.Join(storeDir, msgDir, keyScanAll))
require_NoError(t, err)
if len(kms) > 1 {
t.Fatalf("Expected to find only 1 key file, found %d", len(kms))
}
}