mirror of
https://github.com/gogrlx/nats-server.git
synced 2026-04-15 10:40:41 -07:00
When a JetStream stream was used as a KV, there could be times where we have lots of file storage unused.
This change introduces utilization, better interior block deletes, and individual block compaction when we are below 50% utilization of the block. Signed-off-by: Derek Collison <derek@nats.io>
This commit is contained in:
@@ -178,6 +178,8 @@ const (
|
||||
purgeDir = "__msgs__"
|
||||
// used to scan blk file names.
|
||||
blkScan = "%d.blk"
|
||||
// used for compacted blocks that are staged.
|
||||
newScan = "%d.new"
|
||||
// used to scan index file names.
|
||||
indexScan = "%d.idx"
|
||||
// used to load per subject meta information.
|
||||
@@ -216,8 +218,12 @@ const (
|
||||
defaultStreamBlockSize = 16 * 1024 * 1024 // 16MB
|
||||
// Default for workqueue or interest based.
|
||||
defaultOtherBlockSize = 8 * 1024 * 1024 // 8MB
|
||||
// Default for KV based
|
||||
defaultKVBlockSize = 8 * 1024 * 1024 // 8MB
|
||||
// max block size for now.
|
||||
maxBlockSize = defaultStreamBlockSize
|
||||
// Compact minimum threshold.
|
||||
compactMinimum = 2 * 1024 * 1024 // 2MB
|
||||
// FileStoreMinBlkSize is minimum size we will do for a blk size.
|
||||
FileStoreMinBlkSize = 32 * 1000 // 32kib
|
||||
// FileStoreMaxBlkSize is maximum size we will do for a blk size.
|
||||
@@ -495,7 +501,7 @@ func (fs *fileStore) recoverMsgBlock(fi os.FileInfo, index uint64) (*msgBlock, e
|
||||
ekey, err := ioutil.ReadFile(path.Join(mdir, fmt.Sprintf(keyScan, mb.index)))
|
||||
if err != nil {
|
||||
// We do not seem to have keys even though we should. Could be a plaintext conversion.
|
||||
// Create the keys and we will doubel check below.
|
||||
// Create the keys and we will double check below.
|
||||
if err := fs.genEncryptionKeysForBlock(mb); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
@@ -635,7 +641,10 @@ func (fs *fileStore) rebuildState(ld *LostStreamData) {
|
||||
// rebuildState rebuilds this block's in-memory accounting and returns any
// lost stream data detected. It is the locking entry point; the actual work
// is done by rebuildStateLocked.
func (mb *msgBlock) rebuildState() (*LostStreamData, error) {
	mb.mu.Lock()
	defer mb.mu.Unlock()
	return mb.rebuildStateLocked()
}
|
||||
|
||||
func (mb *msgBlock) rebuildStateLocked() (*LostStreamData, error) {
|
||||
startLastSeq := mb.last.seq
|
||||
|
||||
// Clear state we need to rebuild.
|
||||
@@ -791,7 +800,6 @@ func (mb *msgBlock) rebuildState() (*LostStreamData, error) {
|
||||
|
||||
mb.msgs++
|
||||
mb.bytes += uint64(rl)
|
||||
mb.rbytes += uint64(rl)
|
||||
|
||||
// Do per subject info.
|
||||
if mb.fss != nil {
|
||||
@@ -1812,12 +1820,23 @@ func (fs *fileStore) removeMsg(seq uint64, secure, needFSLock bool) (bool, error
|
||||
}
|
||||
}
|
||||
} else {
|
||||
// Out of order delete.
|
||||
if mb.dmap == nil {
|
||||
mb.dmap = make(map[uint64]struct{})
|
||||
// Check if we are empty first, as long as not the last message block.
|
||||
if isLast := mb != fs.lmb; isLast && mb.msgs == 0 {
|
||||
fs.removeMsgBlock(mb)
|
||||
firstSeqNeedsUpdate = seq == fs.state.FirstSeq
|
||||
} else {
|
||||
// Out of order delete.
|
||||
shouldWriteIndex = true
|
||||
if mb.dmap == nil {
|
||||
mb.dmap = make(map[uint64]struct{})
|
||||
}
|
||||
mb.dmap[seq] = struct{}{}
|
||||
}
|
||||
mb.dmap[seq] = struct{}{}
|
||||
shouldWriteIndex = true
|
||||
}
|
||||
|
||||
// Check if <50% utilization and minimum size met.
|
||||
if mb.rbytes > compactMinimum && mb.rbytes>>1 > mb.bytes {
|
||||
mb.compact()
|
||||
}
|
||||
|
||||
var qch, fch chan struct{}
|
||||
@@ -1870,6 +1889,93 @@ func (fs *fileStore) removeMsg(seq uint64, secure, needFSLock bool) (bool, error
|
||||
return true, nil
|
||||
}
|
||||
|
||||
// This will compact and rewrite this block. This should only be called when we know we want to rewrite this block.
// This should not be called on the lmb since we will prune tail deleted messages which could cause issues with
// writing new messages. We will silently bail on any issues with the underlying block and let someone else detect.
// Write lock needs to be held.
func (mb *msgBlock) compact() {
	// Need the raw record bytes in the cache to walk them.
	if !mb.cacheAlreadyLoaded() {
		if err := mb.loadMsgsWithLock(); err != nil {
			return
		}
	}

	buf := mb.cache.buf
	// nbuf accumulates only the live (non-deleted) records.
	nbuf := make([]byte, 0, len(buf))

	var le = binary.LittleEndian
	var firstSet bool

	// A record is considered deleted if its sequence is zero, carries the
	// erase bit, precedes the block's first sequence, or is in the dmap.
	isDeleted := func(seq uint64) bool {
		if seq == 0 || seq&ebit != 0 || seq < mb.first.seq {
			return true
		}
		if mb.dmap != nil {
			if _, ok := mb.dmap[seq]; ok {
				return true
			}
		}
		return false
	}

	// Walk fixed-size headers; each header encodes the record length (rl)
	// and subject length (slen). Bail silently on any malformed record.
	for index, lbuf := uint32(0), uint32(len(buf)); index < lbuf; {
		if index+msgHdrSize >= lbuf {
			return
		}
		hdr := buf[index : index+msgHdrSize]
		rl, slen := le.Uint32(hdr[0:]), le.Uint16(hdr[20:])
		// Clear any headers bit that could be set.
		rl &^= hbit
		dlen := int(rl) - msgHdrSize
		// Do some quick sanity checks here.
		if dlen < 0 || int(slen) > dlen || dlen > int(rl) || rl > 32*1024*1024 || index+rl > lbuf {
			return
		}
		// Only need to process non-deleted messages.
		if seq := le.Uint64(hdr[4:]); !isDeleted(seq) {
			// Normal message here.
			nbuf = append(nbuf, buf[index:index+rl]...)
			// Track the new first/last sequence of the compacted block.
			if !firstSet {
				firstSet = true
				mb.first.seq = seq
			}
			mb.last.seq = seq
		}
		// Advance to next record.
		index += rl
	}

	// Check for encryption.
	if mb.bek != nil && len(nbuf) > 0 {
		// Recreate to reset counter.
		rbek, err := chacha20.NewUnauthenticatedCipher(mb.seed, mb.nonce)
		if err != nil {
			return
		}
		rbek.XORKeyStream(nbuf, nbuf)
	}

	// We will write to a new file and mv/rename it in case of failure.
	mfn := path.Join(path.Join(mb.fs.fcfg.StoreDir, msgDir), fmt.Sprintf(newScan, mb.index))
	defer os.Remove(mfn)
	if err := ioutil.WriteFile(mfn, nbuf, defaultFilePerms); err != nil {
		return
	}
	// NOTE(review): Rename error is ignored; per the silent-bail contract above,
	// but a failed rename still falls through to the cache/state reset below —
	// confirm that is intended.
	os.Rename(mfn, mb.mfn)

	// Close cache and open FDs and index file.
	mb.clearCacheAndOffset()
	mb.closeFDsLocked()
	mb.removeIndexFileLocked()
	mb.deleteDmap()
	mb.rebuildStateLocked()
}
|
||||
|
||||
// deleteDmap nils out our dmap so the interior-delete map can be
// garbage collected once its deletes have been compacted away.
func (mb *msgBlock) deleteDmap() {
	mb.dmap = nil
}
|
||||
|
||||
// Grab info from a slot.
|
||||
// Lock should be held.
|
||||
func (mb *msgBlock) slotInfo(slot int) (uint32, uint32, bool, error) {
|
||||
@@ -3290,6 +3396,18 @@ func (fs *fileStore) State() StreamState {
|
||||
return state
|
||||
}
|
||||
|
||||
func (fs *fileStore) Utilization() (total, reported uint64, err error) {
|
||||
fs.mu.RLock()
|
||||
defer fs.mu.RUnlock()
|
||||
for _, mb := range fs.blks {
|
||||
mb.mu.RLock()
|
||||
reported += mb.bytes
|
||||
total += mb.rbytes
|
||||
mb.mu.RUnlock()
|
||||
}
|
||||
return total, reported, nil
|
||||
}
|
||||
|
||||
const emptyRecordLen = 22 + 8
|
||||
|
||||
func fileStoreMsgSize(subj string, hdr, msg []byte) uint64 {
|
||||
@@ -3880,6 +3998,10 @@ func (fs *fileStore) numMsgBlocks() int {
|
||||
// removeIndexFile delegates to removeIndexFileLocked under the block lock.
// NOTE(review): this acquires the read lock while removeIndexFileLocked
// mutates mb.ifd — confirm callers serialize writers, or whether this should
// take mb.mu.Lock() instead.
func (mb *msgBlock) removeIndexFile() {
	mb.mu.RLock()
	defer mb.mu.RUnlock()
	mb.removeIndexFileLocked()
}
|
||||
|
||||
func (mb *msgBlock) removeIndexFileLocked() {
|
||||
if mb.ifd != nil {
|
||||
mb.ifd.Close()
|
||||
mb.ifd = nil
|
||||
|
||||
@@ -16,6 +16,8 @@ package server
|
||||
import (
|
||||
"archive/tar"
|
||||
"bytes"
|
||||
"crypto/hmac"
|
||||
"crypto/sha256"
|
||||
"encoding/base64"
|
||||
"encoding/hex"
|
||||
"encoding/json"
|
||||
@@ -3162,3 +3164,139 @@ func TestFileStoreExpireMsgsOnStart(t *testing.T) {
|
||||
checkFiltered("orders.5", SimpleState{Msgs: 5, First: 55, Last: 95})
|
||||
checkBlkState(0)
|
||||
}
|
||||
|
||||
func TestFileStoreSparseCompaction(t *testing.T) {
|
||||
storeDir := createDir(t, JetStreamStoreDir)
|
||||
defer removeDir(t, storeDir)
|
||||
|
||||
cfg := StreamConfig{Name: "KV", Subjects: []string{"kv.>"}, Storage: FileStorage}
|
||||
var fs *fileStore
|
||||
|
||||
fs, err := newFileStore(FileStoreConfig{StoreDir: storeDir, BlockSize: 1024 * 1024}, cfg)
|
||||
if err != nil {
|
||||
t.Fatalf("Unexpected error: %v", err)
|
||||
}
|
||||
|
||||
msg := bytes.Repeat([]byte("ABC"), 33) // ~100bytes
|
||||
loadMsgs := func(n int) {
|
||||
t.Helper()
|
||||
for i := 1; i <= n; i++ {
|
||||
if _, _, err := fs.StoreMsg(fmt.Sprintf("kv.%d", i%10), nil, msg); err != nil {
|
||||
t.Fatalf("Unexpected error: %v", err)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
checkState := func(msgs, first, last uint64) {
|
||||
t.Helper()
|
||||
if fs == nil {
|
||||
t.Fatalf("No fs")
|
||||
return
|
||||
}
|
||||
state := fs.State()
|
||||
if state.Msgs != msgs {
|
||||
t.Fatalf("Expected %d msgs, got %d", msgs, state.Msgs)
|
||||
}
|
||||
if state.FirstSeq != first {
|
||||
t.Fatalf("Expected %d as first, got %d", first, state.FirstSeq)
|
||||
}
|
||||
if state.LastSeq != last {
|
||||
t.Fatalf("Expected %d as last, got %d", last, state.LastSeq)
|
||||
}
|
||||
}
|
||||
|
||||
deleteMsgs := func(seqs ...uint64) {
|
||||
t.Helper()
|
||||
for _, seq := range seqs {
|
||||
removed, err := fs.RemoveMsg(seq)
|
||||
if err != nil || !removed {
|
||||
t.Fatalf("Got an error on remove of %d: %v", seq, err)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
eraseMsgs := func(seqs ...uint64) {
|
||||
t.Helper()
|
||||
for _, seq := range seqs {
|
||||
removed, err := fs.EraseMsg(seq)
|
||||
if err != nil || !removed {
|
||||
t.Fatalf("Got an error on erase of %d: %v", seq, err)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
compact := func() {
|
||||
t.Helper()
|
||||
var ssb, ssa StreamState
|
||||
fs.FastState(&ssb)
|
||||
tb, ub, _ := fs.Utilization()
|
||||
|
||||
fs.mu.RLock()
|
||||
if len(fs.blks) == 0 {
|
||||
t.Fatalf("No blocks?")
|
||||
}
|
||||
mb := fs.blks[0]
|
||||
fs.mu.RUnlock()
|
||||
|
||||
mb.mu.Lock()
|
||||
mb.compact()
|
||||
mb.mu.Unlock()
|
||||
fs.FastState(&ssa)
|
||||
if !reflect.DeepEqual(ssb, ssa) {
|
||||
t.Fatalf("States do not match; %+v vs %+v", ssb, ssa)
|
||||
}
|
||||
ta, ua, _ := fs.Utilization()
|
||||
if ub != ua {
|
||||
t.Fatalf("Expected used to be the same, got %d vs %d", ub, ua)
|
||||
}
|
||||
if ta >= tb {
|
||||
t.Fatalf("Expected total after to be less then before, got %d vs %d", tb, ta)
|
||||
}
|
||||
if ta != ua {
|
||||
t.Fatalf("Expected compact to make total and used same, got %d vs %d", ta, ua)
|
||||
}
|
||||
}
|
||||
|
||||
// Actual testing here.
|
||||
loadMsgs(1000)
|
||||
checkState(1000, 1, 1000)
|
||||
|
||||
// Now delete a few messages.
|
||||
deleteMsgs(1)
|
||||
compact()
|
||||
|
||||
deleteMsgs(1000, 999, 998, 997)
|
||||
compact()
|
||||
|
||||
eraseMsgs(500, 502, 504, 506, 508, 510)
|
||||
compact()
|
||||
|
||||
// Now test encrypted mode.
|
||||
fs.Delete()
|
||||
|
||||
prf := func(context []byte) ([]byte, error) {
|
||||
h := hmac.New(sha256.New, []byte("dlc22"))
|
||||
if _, err := h.Write(context); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
return h.Sum(nil), nil
|
||||
}
|
||||
|
||||
fs, err = newFileStoreWithCreated(FileStoreConfig{StoreDir: storeDir, BlockSize: 1024 * 1024}, cfg, time.Now(), prf)
|
||||
if err != nil {
|
||||
t.Fatalf("Unexpected error: %v", err)
|
||||
}
|
||||
|
||||
loadMsgs(1000)
|
||||
checkState(1000, 1, 1000)
|
||||
|
||||
// Now delete a few messages.
|
||||
deleteMsgs(1)
|
||||
compact()
|
||||
|
||||
deleteMsgs(1000, 999, 998, 997)
|
||||
compact()
|
||||
|
||||
eraseMsgs(500, 502, 504, 506, 508, 510)
|
||||
compact()
|
||||
}
|
||||
|
||||
@@ -758,6 +758,12 @@ func (ms *memStore) State() StreamState {
|
||||
return state
|
||||
}
|
||||
|
||||
func (ms *memStore) Utilization() (total, reported uint64, err error) {
|
||||
ms.mu.RLock()
|
||||
defer ms.mu.RUnlock()
|
||||
return ms.state.Bytes, ms.state.Bytes, nil
|
||||
}
|
||||
|
||||
func memStoreMsgSize(subj string, hdr, msg []byte) uint64 {
|
||||
return uint64(len(subj) + len(hdr) + len(msg) + 16) // 8*2 for seq + age
|
||||
}
|
||||
|
||||
@@ -2855,3 +2855,56 @@ func TestNoRaceJetStreamClusterExtendedStreamPurge(t *testing.T) {
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
// TestNoRaceJetStreamFileStoreCompaction verifies end-to-end that a
// MaxMsgsPerSubject=1 (KV-style) stream which overwrites every key keeps
// its file-store utilization high (>= 90%) thanks to block compaction.
func TestNoRaceJetStreamFileStoreCompaction(t *testing.T) {
	s := RunBasicJetStreamServer()
	defer s.Shutdown()

	config := s.JetStreamConfig()
	if config != nil {
		defer removeDir(t, config.StoreDir)
	}

	nc, js := jsClientConnect(t, s)
	defer nc.Close()

	// One message per subject, so every republish replaces the prior one.
	cfg := &nats.StreamConfig{
		Name:              "KV",
		Subjects:          []string{"KV.>"},
		MaxMsgsPerSubject: 1,
	}
	if _, err := js.AddStream(cfg); err != nil {
		t.Fatalf("Unexpected error: %v", err)
	}

	toSend := 10_000
	data := make([]byte, 4*1024)
	rand.Read(data)

	// First one.
	js.PublishAsync("KV.FM", data)

	for i := 0; i < toSend; i++ {
		js.PublishAsync(fmt.Sprintf("KV.%d", i+1), data)
	}
	// Do again and overwrite the previous batch.
	for i := 0; i < toSend; i++ {
		js.PublishAsync(fmt.Sprintf("KV.%d", i+1), data)
	}

	select {
	case <-js.PublishAsyncComplete():
	case <-time.After(time.Second):
		t.Fatalf("Did not receive completion signal")
	}

	// Now check by hand the utilization level.
	mset, err := s.GlobalAccount().lookupStream("KV")
	if err != nil {
		t.Fatalf("Unexpected error: %v", err)
	}
	total, used, _ := mset.Store().Utilization()
	// Overwrites create dead bytes; compaction should keep utilization high.
	if pu := 100.0 * float32(used) / float32(total); pu < 90.0 {
		t.Fatalf("Utilization is less than 90%%, got %.2f", pu)
	}
}
|
||||
|
||||
@@ -90,6 +90,7 @@ type StreamStore interface {
|
||||
Stop() error
|
||||
ConsumerStore(name string, cfg *ConsumerConfig) (ConsumerStore, error)
|
||||
Snapshot(deadline time.Duration, includeConsumers, checkMsgs bool) (*SnapshotResult, error)
|
||||
Utilization() (total, reported uint64, err error)
|
||||
}
|
||||
|
||||
// RetentionPolicy determines how messages in a set are retained.
|
||||
|
||||
@@ -571,6 +571,9 @@ func (mset *stream) autoTuneFileStorageBlockSize(fsCfg *FileStoreConfig) {
|
||||
} else if mset.cfg.MaxMsgs > 0 {
|
||||
// Determine max message size to estimate.
|
||||
totalEstSize = mset.maxMsgSize() * uint64(mset.cfg.MaxMsgs)
|
||||
} else if mset.cfg.MaxMsgsPer > 0 {
|
||||
fsCfg.BlockSize = uint64(defaultKVBlockSize)
|
||||
return
|
||||
} else {
|
||||
// If nothing set will let underlying filestore determine blkSize.
|
||||
return
|
||||
@@ -3281,6 +3284,12 @@ func (mset *stream) stateWithDetail(details bool) StreamState {
|
||||
return state
|
||||
}
|
||||
|
||||
// Store returns the stream's underlying StreamStore, read under the
// stream lock so a concurrent store swap is observed consistently.
func (mset *stream) Store() StreamStore {
	mset.mu.RLock()
	defer mset.mu.RUnlock()
	return mset.store
}
|
||||
|
||||
// Determines if the new proposed partition is unique amongst all consumers.
|
||||
// Lock should be held.
|
||||
func (mset *stream) partitionUnique(partition string) bool {
|
||||
|
||||
Reference in New Issue
Block a user