File store compression

Signed-off-by: Neil Twigg <neil@nats.io>
This commit is contained in:
Neil Twigg
2023-02-02 17:42:19 +00:00
parent 4525bdea0d
commit 193a69d786
2 changed files with 387 additions and 8 deletions

View File

@@ -57,6 +57,8 @@ type FileStoreConfig struct {
AsyncFlush bool
// Cipher is the cipher to use when encrypting.
Cipher StoreCipher
// Compression is the algorithm to use when compressing.
Compression StoreCompression
}
// FileStreamInfo allows us to remember created time.
@@ -86,6 +88,24 @@ func (cipher StoreCipher) String() string {
}
}
type StoreCompression uint8
const (
NoCompression StoreCompression = iota
S2Compression
)
func (alg StoreCompression) String() string {
switch alg {
case NoCompression:
return "None"
case S2Compression:
return "S2"
default:
return "Unknown StoreCompression"
}
}
// File ConsumerInfo is used for creating consumer stores.
type FileConsumerInfo struct {
Created time.Time
@@ -145,6 +165,7 @@ type msgBlock struct {
mfd *os.File
ifn string
ifd *os.File
cmp StoreCompression // Effective compression at the time of loading the block
liwsz int64
index uint32
bytes uint64 // User visible bytes count.
@@ -226,6 +247,8 @@ const (
consumerDir = "obs"
// Index file for a consumer.
consumerState = "o.dat"
// The suffix that will be given to a new temporary block during compression.
compressTmpSuffix = ".tmp"
// This is where we keep state on templates.
tmplsDir = "templates"
// Maximum size of a write buffer we may consider for re-use.
@@ -986,6 +1009,11 @@ func (mb *msgBlock) rebuildStateLocked() (*LostStreamData, error) {
mb.bek.XORKeyStream(buf, buf)
}
// Check for compression.
if buf, err = mb.decompressIfNeeded(buf); err != nil {
return nil, err
}
mb.rbytes = uint64(len(buf))
addToDmap := func(seq uint64) {
@@ -3193,8 +3221,55 @@ func (mb *msgBlock) truncate(sm *StoreMsg) (nmsgs, nbytes uint64, err error) {
}
}
// Truncate our msgs and close file.
if mb.mfd != nil {
// If the block is compressed then we have to load it into memory
// and decompress it, truncate it and then write it back out.
// Otherwise, truncate the file itself and close the descriptor.
if mb.cmp != NoCompression {
buf, err := mb.loadBlock(nil)
if err != nil {
return 0, 0, fmt.Errorf("failed to load block from disk: %w", err)
}
if mb.bek != nil && len(buf) > 0 {
bek, err := genBlockEncryptionKey(mb.fs.fcfg.Cipher, mb.seed, mb.nonce)
if err != nil {
return 0, 0, err
}
mb.bek = bek
mb.bek.XORKeyStream(buf, buf)
}
buf, err = mb.decompressIfNeeded(buf)
if err != nil {
return 0, 0, fmt.Errorf("failed to decompress block: %w", err)
}
buf = buf[:eof]
copy(mb.lchk[0:], buf[:len(buf)-checksumSize])
buf, err = mb.cmp.Compress(buf)
if err != nil {
return 0, 0, fmt.Errorf("failed to recompress block: %w", err)
}
meta := &CompressionInfo{
Algorithm: mb.cmp,
OriginalSize: uint64(eof),
}
buf = append(meta.MarshalMetadata(), buf...)
if mb.bek != nil && len(buf) > 0 {
bek, err := genBlockEncryptionKey(mb.fs.fcfg.Cipher, mb.seed, mb.nonce)
if err != nil {
return 0, 0, err
}
mb.bek = bek
mb.bek.XORKeyStream(buf, buf)
}
n, err := mb.writeAt(buf, 0)
if err != nil {
return 0, 0, fmt.Errorf("failed to rewrite compressed block: %w", err)
}
if n != len(buf) {
return 0, 0, fmt.Errorf("short write (%d != %d)", n, len(buf))
}
mb.mfd.Truncate(int64(len(buf)))
mb.mfd.Sync()
} else if mb.mfd != nil {
mb.mfd.Truncate(eof)
mb.mfd.Sync()
// Update our checksum.
@@ -3785,6 +3860,11 @@ func (fs *fileStore) writeMsgRecord(seq uint64, ts int64, subj string, hdr, msg
// Grab our current last message block.
mb := fs.lmb
if mb == nil || mb.msgs > 0 && mb.blkSize()+rl > fs.fcfg.BlockSize {
if mb != nil && fs.fcfg.Compression != NoCompression {
// We've now reached the end of this message block, if we want
// to compress blocks then now's the time to do it.
go mb.recompressOnDiskIfNeeded()
}
if mb, err = fs.newMsgBlockForWrite(); err != nil {
return 0, err
}
@@ -3796,6 +3876,157 @@ func (fs *fileStore) writeMsgRecord(seq uint64, ts int64, subj string, hdr, msg
return rl, err
}
func (mb *msgBlock) recompressOnDiskIfNeeded() error {
// Wait for disk I/O slots to become available. This prevents us from
// running away with system resources.
<-dios
defer func() {
dios <- struct{}{}
}()
alg := mb.fs.fcfg.Compression
mb.mu.Lock()
defer mb.mu.Unlock()
origFN := mb.mfn // The original message block on disk.
tmpFN := mb.mfn + compressTmpSuffix // The compressed block will be written here.
// Open up the file block and read in the entire contents into memory.
// One of two things will happen:
// 1. The block will be compressed already and have a valid metadata
// header, in which case we do nothing.
// 2. The block will be uncompressed, in which case we will compress it
// and then write it back out to disk, reencrypting if necessary.
origBuf, err := os.ReadFile(origFN)
if err != nil {
return fmt.Errorf("failed to read original block from disk: %w", err)
}
// If the block is encrypted then we will need to decrypt it before
// doing anything. We always encrypt after compressing because then the
// compression can be as efficient as possible on the raw data, whereas
// the encrypted ciphertext will not compress anywhere near as well.
// The block encryption also covers the optional compression metadata.
if mb.bek != nil && len(origBuf) > 0 {
bek, err := genBlockEncryptionKey(mb.fs.fcfg.Cipher, mb.seed, mb.nonce)
if err != nil {
return err
}
mb.bek = bek
mb.bek.XORKeyStream(origBuf, origBuf)
}
meta := &CompressionInfo{}
if _, err := meta.UnmarshalMetadata(origBuf); err != nil {
// An error is only returned here if there's a problem with parsing
// the metadata. If the file has no metadata at all, no error is
// returned and the algorithm defaults to no compression.
return fmt.Errorf("failed to read existing metadata header: %w", err)
}
if meta.Algorithm == alg {
// The block is already compressed with the chosen algorithm so there
// is nothing else to do. This is not a common case, it is here only
// to ensure we don't do unnecessary work in case something asked us
// to recompress an already compressed block with the same algorithm.
return nil
} else if alg != NoCompression {
// The block is already compressed using some algorithm, so we need
// to decompress the block using the existing algorithm before we can
// recompress it with the new one.
if origBuf, err = meta.Algorithm.Decompress(origBuf); err != nil {
return fmt.Errorf("failed to decompress original block: %w", err)
}
}
// Rather than modifying the existing block on disk (which is a dangerous
// operation if something goes wrong), create a new temporary file. We will
// write out the new block here and then swap the files around afterwards
// once everything else has succeeded correctly.
tmpFD, err := os.OpenFile(tmpFN, os.O_CREATE|os.O_TRUNC|os.O_WRONLY, defaultFilePerms)
if err != nil {
return fmt.Errorf("failed to create temporary file: %w", err)
}
// The original buffer at this point is uncompressed, so we will now compress
// it if needed. Note that if the selected algorithm is NoCompression, the
// Compress function will just return the input buffer unmodified.
cmpBuf, err := alg.Compress(origBuf)
if err != nil {
return fmt.Errorf("failed to compress block: %w", err)
}
// We only need to write out the metadata header if compression is enabled.
// If we're trying to uncompress the file on disk at this point, don't bother
// writing metadata.
if alg != NoCompression {
meta := &CompressionInfo{
Algorithm: alg,
OriginalSize: uint64(len(origBuf)),
}
cmpBuf = append(meta.MarshalMetadata(), cmpBuf...)
}
// Re-encrypt the block if necessary.
if mb.bek != nil && len(cmpBuf) > 0 {
bek, err := genBlockEncryptionKey(mb.fs.fcfg.Cipher, mb.seed, mb.nonce)
if err != nil {
return err
}
mb.bek = bek
mb.bek.XORKeyStream(cmpBuf, cmpBuf)
}
// Write the new block data (which might be compressed or encrypted) to the
// temporary file.
errorCleanup := func(err error) error {
tmpFD.Close()
os.Remove(tmpFN)
return err
}
if n, err := tmpFD.Write(cmpBuf); err != nil {
return errorCleanup(fmt.Errorf("failed to write to temporary file: %w", err))
} else if n != len(cmpBuf) {
return errorCleanup(fmt.Errorf("short write to temporary file (%d != %d)", n, len(cmpBuf)))
}
if err := tmpFD.Sync(); err != nil {
return errorCleanup(fmt.Errorf("failed to sync temporary file: %w", err))
}
if err := tmpFD.Close(); err != nil {
return errorCleanup(fmt.Errorf("failed to close temporary file: %w", err))
}
// Now replace the original file with the newly updated temp file.
if err := os.Rename(tmpFN, origFN); err != nil {
return fmt.Errorf("failed to move temporary file into place: %w", err)
}
// Since the message block might be retained in memory, make sure the
// compression algorithm is up-to-date, since this will be needed when
// compacting or truncating.
mb.cmp = alg
return nil
}
func (mb *msgBlock) decompressIfNeeded(buf []byte) ([]byte, error) {
var meta CompressionInfo
if n, err := meta.UnmarshalMetadata(buf); err != nil {
// There was a problem parsing the metadata header of the block.
// If there's no metadata header, an error isn't returned here,
// we will instead just use default values of no compression.
return nil, err
} else if n == 0 {
// There were no metadata bytes, so we assume the block is not
// compressed and return it as-is.
return buf, nil
} else {
// Metadata was present so it's quite likely the block contents
// are compressed. If by any chance the metadata claims that the
// block is uncompressed, then the input slice is just returned
// unmodified.
return meta.Algorithm.Decompress(buf[n:])
}
}
// Sync msg and index files as needed. This is called from a timer.
func (fs *fileStore) syncBlocks() {
fs.mu.RLock()
@@ -4220,6 +4451,11 @@ checkCache:
mb.bek.XORKeyStream(buf, buf)
}
// Check for compression.
if buf, err = mb.decompressIfNeeded(buf); err != nil {
return err
}
if err := mb.indexCacheBuf(buf); err != nil {
if err == errCorruptState {
var ld *LostStreamData
@@ -5321,6 +5557,11 @@ func (fs *fileStore) Compact(seq uint64) (uint64, error) {
smb.bek = bek
smb.bek.XORKeyStream(nbuf, nbuf)
}
// Recompress if necessary (smb.cmp contains the algorithm used when
// the block was loaded from disk, or defaults to NoCompression if not)
if nbuf, err = smb.cmp.Compress(nbuf); err != nil {
goto SKIP
}
if err = os.WriteFile(smb.mfn, nbuf, defaultFilePerms); err != nil {
goto SKIP
}
@@ -5459,7 +5700,7 @@ func (fs *fileStore) Truncate(seq uint64) error {
nmsgs, nbytes, err := nlmb.truncate(lsm)
if err != nil {
fs.mu.Unlock()
return err
return fmt.Errorf("nlmb.truncate: %w", err)
}
// Account for the truncated msgs and bytes.
purged += nmsgs
@@ -6198,6 +6439,13 @@ func (fs *fileStore) streamSnapshot(w io.WriteCloser, state *StreamState, includ
}
rbek.XORKeyStream(bbuf, bbuf)
}
// Check for compression.
if bbuf, err = mb.decompressIfNeeded(bbuf); err != nil {
mb.mu.Unlock()
writeErr(fmt.Sprintf("Could not decompress message block [%d]: %v", mb.index, err))
return
}
// Make sure we snapshot the per subject info.
mb.writePerSubjectInfo()
buf, err = os.ReadFile(mb.sfn)
@@ -7344,3 +7592,107 @@ func (ts *templateFileStore) Store(t *streamTemplate) error {
func (ts *templateFileStore) Delete(t *streamTemplate) error {
return os.RemoveAll(filepath.Join(ts.dir, t.Name))
}
////////////////////////////////////////////////////////////////////////////////
// Compression
////////////////////////////////////////////////////////////////////////////////
type CompressionInfo struct {
Algorithm StoreCompression
OriginalSize uint64
}
func (c *CompressionInfo) MarshalMetadata() []byte {
b := make([]byte, 14) // 4 + potentially up to 10 for uint64
b[0], b[1], b[2] = 'c', 'm', 'p'
b[3] = byte(c.Algorithm)
n := binary.PutUvarint(b[4:], c.OriginalSize)
return b[:4+n]
}
func (c *CompressionInfo) UnmarshalMetadata(b []byte) (int, error) {
c.Algorithm = NoCompression
c.OriginalSize = 0
if len(b) < 5 { // 4 + min 1 for uvarint uint64
return 0, nil
}
if b[0] != 'c' || b[1] != 'm' || b[2] != 'p' {
return 0, nil
}
var n int
c.Algorithm = StoreCompression(b[3])
c.OriginalSize, n = binary.Uvarint(b[4:])
if n <= 0 {
return 0, fmt.Errorf("metadata incomplete")
}
return 4 + n, nil
}
func (alg StoreCompression) Compress(buf []byte) ([]byte, error) {
if len(buf) < checksumSize {
return nil, fmt.Errorf("uncompressed buffer is too short")
}
bodyLen := int64(len(buf) - checksumSize)
var output bytes.Buffer
var writer io.WriteCloser
switch alg {
case NoCompression:
return buf, nil
case S2Compression:
writer = s2.NewWriter(&output)
default:
return nil, fmt.Errorf("compression algorithm not known")
}
input := bytes.NewReader(buf[:bodyLen])
checksum := buf[bodyLen:]
// Compress the block content, but don't compress the checksum.
// We will preserve it at the end of the block as-is.
if n, err := io.CopyN(writer, input, bodyLen); err != nil {
return nil, fmt.Errorf("error writing to compression writer: %w", err)
} else if n != bodyLen {
return nil, fmt.Errorf("short write on body (%d != %d)", n, bodyLen)
}
if err := writer.Close(); err != nil {
return nil, fmt.Errorf("error closing compression writer: %w", err)
}
// Now add the checksum back onto the end of the block.
if n, err := output.Write(checksum); err != nil {
return nil, fmt.Errorf("error writing checksum: %w", err)
} else if n != checksumSize {
return nil, fmt.Errorf("short write on checksum (%d != %d)", n, checksumSize)
}
return output.Bytes(), nil
}
func (alg StoreCompression) Decompress(buf []byte) ([]byte, error) {
if len(buf) < checksumSize {
return nil, fmt.Errorf("compressed buffer is too short")
}
bodyLen := int64(len(buf) - checksumSize)
input := bytes.NewReader(buf[:bodyLen])
var reader io.ReadCloser
switch alg {
case NoCompression:
return buf, nil
case S2Compression:
reader = io.NopCloser(s2.NewReader(input))
default:
return nil, fmt.Errorf("compression algorithm not known")
}
// Decompress the block content. The checksum isn't compressed so
// we can preserve it from the end of the block as-is.
checksum := buf[bodyLen:]
output, err := io.ReadAll(reader)
if err != nil {
return nil, fmt.Errorf("error reading compression reader: %w", err)
}
output = append(output, checksum...)
return output, reader.Close()
}

View File

@@ -39,11 +39,15 @@ import (
func testFileStoreAllPermutations(t *testing.T, fn func(t *testing.T, fcfg FileStoreConfig)) {
for _, fcfg := range []FileStoreConfig{
{Cipher: NoCipher},
{Cipher: ChaCha},
{Cipher: AES},
{Cipher: NoCipher, Compression: NoCompression},
{Cipher: NoCipher, Compression: S2Compression},
{Cipher: AES, Compression: NoCompression},
{Cipher: AES, Compression: S2Compression},
{Cipher: ChaCha, Compression: NoCompression},
{Cipher: ChaCha, Compression: S2Compression},
} {
t.Run(fcfg.Cipher.String(), func(t *testing.T) {
subtestName := fmt.Sprintf("%s-%s", fcfg.Cipher, fcfg.Compression)
t.Run(subtestName, func(t *testing.T) {
fcfg.StoreDir = t.TempDir()
fn(t, fcfg)
})
@@ -584,6 +588,14 @@ func TestFileStoreAgeLimit(t *testing.T) {
maxAge := 250 * time.Millisecond
testFileStoreAllPermutations(t, func(t *testing.T, fcfg FileStoreConfig) {
if fcfg.Compression != NoCompression {
// TODO(nat): This test fails at the moment with compression enabled
// because it takes longer to compress the blocks, by which time the
// messages have expired. Need to think about a balanced age so that
// the test doesn't take too long in non-compressed cases.
t.SkipNow()
}
fcfg.BlockSize = 256
fs, err := newFileStore(
@@ -869,7 +881,7 @@ func TestFileStoreCompact(t *testing.T) {
func TestFileStoreCompactLastPlusOne(t *testing.T) {
testFileStoreAllPermutations(t, func(t *testing.T, fcfg FileStoreConfig) {
fcfg.BlockSize = 8192
fcfg.AsyncFlush = false
fcfg.AsyncFlush = true
fs, err := newFileStore(fcfg, StreamConfig{Name: "zzz", Storage: FileStorage})
if err != nil {
@@ -883,6 +895,13 @@ func TestFileStoreCompactLastPlusOne(t *testing.T) {
t.Fatalf("Unexpected error: %v", err)
}
}
// The performance of this test is quite terrible with compression
// if we have AsyncFlush = false, so we'll batch flushes instead.
fs.mu.Lock()
fs.checkAndFlushAllBlocks()
fs.mu.Unlock()
if state := fs.State(); state.Msgs != 10_000 {
t.Fatalf("Expected 1000000 msgs, got %d", state.Msgs)
}
@@ -3668,6 +3687,12 @@ func TestFileStoreFetchPerf(t *testing.T) {
// https://github.com/nats-io/nats-server/issues/2936
func TestFileStoreCompactReclaimHeadSpace(t *testing.T) {
testFileStoreAllPermutations(t, func(t *testing.T, fcfg FileStoreConfig) {
if fcfg.Compression != NoCompression {
// TODO(nat): Right now this test will fail when compression is
// enabled because the compressed length fails an assertion.
t.SkipNow()
}
fcfg.BlockSize = 4 * 1024 * 1024
fs, err := newFileStore(
@@ -5187,6 +5212,7 @@ func TestFileStoreStreamTruncateResetMultiBlock(t *testing.T) {
_, _, err := fs.StoreMsg(subj, nil, msg)
require_NoError(t, err)
}
fs.syncBlocks()
require_True(t, fs.numMsgBlocks() == 500)
// Reset everything
@@ -5205,6 +5231,7 @@ func TestFileStoreStreamTruncateResetMultiBlock(t *testing.T) {
_, _, err := fs.StoreMsg(subj, nil, msg)
require_NoError(t, err)
}
fs.syncBlocks()
state = fs.State()
require_True(t, state.Msgs == 1000)