Make sure to reset block encryption counter when clearing block but holding state for tracking sequences.

Signed-off-by: Derek Collison <derek@nats.io>
This commit is contained in:
Derek Collison
2022-07-31 07:59:19 -07:00
parent 8dc1e4b6de
commit 717969510d
2 changed files with 71 additions and 7 deletions

View File

@@ -3528,24 +3528,22 @@ checkCache:
// Check if we need to decrypt.
if mb.bek != nil && len(buf) > 0 {
rbek, err := chacha20.NewUnauthenticatedCipher(mb.seed, mb.nonce)
bek, err := chacha20.NewUnauthenticatedCipher(mb.seed, mb.nonce)
if err != nil {
return err
}
rbek.XORKeyStream(buf, buf)
mb.bek = bek
mb.bek.XORKeyStream(buf, buf)
}
if err := mb.indexCacheBuf(buf); err != nil {
if err == errCorruptState {
fs := mb.fs
mb.mu.Unlock()
var ld *LostStreamData
if ld, err = mb.rebuildState(); ld != nil {
if ld, err = mb.rebuildStateLocked(); ld != nil {
// We do not know if fs is locked or not at this point.
// This should be an exceptional condition so do so in Go routine.
go fs.rebuildState(ld)
go mb.fs.rebuildState(ld)
}
mb.mu.Lock()
}
if err != nil {
return err
@@ -4725,6 +4723,13 @@ func (mb *msgBlock) closeAndKeepIndex() {
if mb.sfn != _EMPTY_ {
os.Remove(mb.sfn)
}
// If we are encrypted we should reset our bek counter.
if mb.bek != nil {
if bek, err := chacha20.NewUnauthenticatedCipher(mb.seed, mb.nonce); err == nil {
mb.bek = bek
}
}
}
// Called by purge to simply get rid of the cache and close our fds.

View File

@@ -4067,3 +4067,62 @@ func TestFileStoreDoubleCompactWithWriteInBetweenEncryptedBug(t *testing.T) {
t.Fatalf("Expected last sequence to be 10 but got %d", state.LastSeq)
}
}
// When we kept the empty block for tracking sequence, we needed to reset the bek
// counter when encrypted for subsequent writes to be correct. The bek in place could
// possibly still have a non-zero counter from previous writes.
// Happens when all messages expire and the are flushed and then subsequent writes occur.
func TestFileStoreEncryptedKeepIndexNeedBekResetBug(t *testing.T) {
storeDir := createDir(t, JetStreamStoreDir)
defer os.RemoveAll(storeDir)
prf := func(context []byte) ([]byte, error) {
h := hmac.New(sha256.New, []byte("dlc22"))
if _, err := h.Write(context); err != nil {
return nil, err
}
return h.Sum(nil), nil
}
ttl := 250 * time.Millisecond
fs, err := newFileStoreWithCreated(
FileStoreConfig{StoreDir: storeDir},
StreamConfig{Name: "zzz", Storage: FileStorage, MaxAge: ttl},
time.Now(),
prf,
)
require_NoError(t, err)
defer fs.Stop()
subj, msg := "foo", []byte("ouch")
for i := 0; i < 5; i++ {
fs.StoreMsg(subj, nil, msg)
}
// What to go to 0.
// This will leave the marker.
checkFor(t, time.Second, ttl, func() error {
if state := fs.State(); state.Msgs != 0 {
return fmt.Errorf("Expected no msgs, got %d", state.Msgs)
}
return nil
})
// Now write additional messages.
for i := 0; i < 5; i++ {
fs.StoreMsg(subj, nil, msg)
}
// Make sure the buffer is cleared.
fs.mu.RLock()
mb := fs.lmb
fs.mu.RUnlock()
mb.mu.Lock()
mb.clearCacheAndOffset()
mb.mu.Unlock()
// Now make sure we can read.
var smv StoreMsg
_, err = fs.LoadMsg(10, &smv)
require_NoError(t, err)
}