Remove performance, dmap collapse logic

Signed-off-by: Derek Collison <derek@nats.io>
This commit is contained in:
Derek Collison
2019-10-28 08:55:51 -07:00
parent 613d65c58c
commit 5cf03fd074
2 changed files with 232 additions and 27 deletions

View File

@@ -69,6 +69,8 @@ type msgBlock struct {
last msgId
cache map[uint64]*fileStoredMsg
dmap map[uint64]struct{}
dch chan struct{}
qch chan struct{}
lchk [8]byte
}
@@ -297,7 +299,7 @@ func (ms *fileStore) newMsgBlockForWrite() (*msgBlock, error) {
if ms.lmb != nil {
index = ms.lmb.index + 1
ms.flushToFileLocked()
ms.flushPendingWritesLocked()
ms.closeLastMsgBlock(false)
} else {
index = 1
@@ -413,6 +415,7 @@ func (ms *fileStore) removeMsg(seq uint64, secure bool) bool {
sm = mb.cache[seq]
}
// FIXME(dlc) - We should not need this TBH.
if sm == nil {
sm = ms.readAndCacheMsgs(mb, seq)
}
@@ -425,15 +428,48 @@ func (ms *fileStore) removeMsg(seq uint64, secure bool) bool {
return true
}
// Loop on requests to write out our index file. This is used when calling
// remove for a message. Updates to the last.seq etc are handled by main
// flush loop when storing messages.
func (ms *fileStore) flushWriteIndexLoop(mb *msgBlock, dch, qch chan struct{}) {
for {
select {
case <-dch:
ms.mu.Lock()
mb.writeIndexInfo()
ms.mu.Unlock()
case <-qch:
return
}
}
}
func (mb *msgBlock) kickWriteFlusher() {
select {
case mb.dch <- struct{}{}:
default:
}
}
func (mb *msgBlock) selectNextFirst() {
var seq uint64
for seq = mb.first.seq + 1; seq <= mb.last.seq; seq++ {
if _, ok := mb.dmap[seq]; ok {
// We will move past this so we can delete the entry.
delete(mb.dmap, seq)
} else {
break
}
}
mb.first.seq = seq
}
// Lock should be held.
func (ms *fileStore) deleteMsgFromBlock(mb *msgBlock, seq uint64, sm *fileStoredMsg, secure bool) {
// Update global accounting.
msz := fileStoreMsgSize(sm.subj, sm.msg)
ms.stats.Msgs--
ms.stats.Bytes -= msz
if seq == ms.stats.FirstSeq {
ms.stats.FirstSeq++
}
// Now local stats
mb.msgs--
@@ -442,13 +478,19 @@ func (ms *fileStore) deleteMsgFromBlock(mb *msgBlock, seq uint64, sm *fileStored
if mb.cache != nil {
delete(mb.cache, seq)
}
var shouldWriteIndex bool
// Optimize for FIFO case.
if seq == mb.first.seq {
mb.first.seq++
mb.selectNextFirst()
if seq == ms.stats.FirstSeq {
ms.stats.FirstSeq = mb.first.seq
}
if mb.first.seq > mb.last.seq {
ms.removeMsgBlock(mb)
} else {
mb.writeIndexInfo()
shouldWriteIndex = true
}
} else {
// Out of order delete.
@@ -456,11 +498,23 @@ func (ms *fileStore) deleteMsgFromBlock(mb *msgBlock, seq uint64, sm *fileStored
mb.dmap = make(map[uint64]struct{})
}
mb.dmap[seq] = struct{}{}
mb.writeIndexInfo()
shouldWriteIndex = true
}
if secure {
ms.eraseMsg(mb, sm)
}
if shouldWriteIndex {
if mb.dch == nil {
// Spin up the write flusher.
mb.qch = make(chan struct{})
mb.dch = make(chan struct{})
go ms.flushWriteIndexLoop(mb, mb.dch, mb.qch)
// Write first one in place.
mb.writeIndexInfo()
} else {
mb.kickWriteFlusher()
}
}
}
func (ms *fileStore) startAgeChk() {
@@ -537,7 +591,7 @@ func checkMsgBlockFile(fp *os.File, hh hash.Hash) []uint64 {
func (ms *fileStore) checkMsgs() []uint64 {
ms.mu.Lock()
if ms.wmb.Len() > 0 {
ms.flushToFileLocked()
ms.flushPendingWritesLocked()
}
ms.mu.Unlock()
@@ -577,7 +631,7 @@ func (ms *fileStore) flushLoop(fch, qch chan struct{}) {
for {
select {
case <-fch:
ms.flushToFile()
ms.flushPendingWrites()
case <-qch:
return
}
@@ -725,7 +779,7 @@ func (ms *fileStore) readAndCacheMsgs(mb *msgBlock, seq uint64) *fileStoredMsg {
// This detects if what we may be looking for is staged in the write buffer.
if mb == ms.lmb && ms.wmb.Len() > 0 {
ms.flushToFileLocked()
ms.flushPendingWritesLocked()
}
if mb.cache == nil {
mb.cache = make(map[uint64]*fileStoredMsg)
@@ -869,14 +923,14 @@ func fileStoreMsgSize(subj string, msg []byte) uint64 {
}
// Flush the write buffer to disk.
func (ms *fileStore) flushToFile() {
func (ms *fileStore) flushPendingWrites() {
ms.mu.Lock()
ms.flushToFileLocked()
ms.flushPendingWritesLocked()
ms.mu.Unlock()
}
// Lock should be held.
func (ms *fileStore) flushToFileLocked() {
func (ms *fileStore) flushPendingWritesLocked() {
mb := ms.lmb
if mb == nil {
return
@@ -919,8 +973,11 @@ func (mb *msgBlock) writeIndexInfo() error {
}
var err error
if mb.ifd != nil {
if n, _ := mb.ifd.WriteAt(buf, 0); n > 0 {
err = mb.ifd.Truncate(int64(n))
if fi, serr := mb.ifd.Stat(); serr == nil {
fsz := fi.Size()
if n, _ := mb.ifd.WriteAt(buf, 0); n > 0 && fsz > int64(n) {
err = mb.ifd.Truncate(int64(n))
}
}
} else {
err = ioutil.WriteFile(mb.ifn, buf, 0644)
@@ -994,11 +1051,22 @@ func syncAndClose(mfd, ifd *os.File) {
}
}
// Will return total number of dmapEntries for all msg blocks.
func (ms *fileStore) dmapEntries() int {
var total int
ms.mu.Lock()
for _, mb := range ms.blks {
total += len(mb.dmap)
}
ms.mu.Unlock()
return total
}
// Purge will remove all messages from this store.
// Will return the number of purged messages.
func (ms *fileStore) Purge() uint64 {
ms.mu.Lock()
ms.flushToFileLocked()
ms.flushPendingWritesLocked()
purged := ms.stats.Msgs
cb := ms.scb
bytes := int64(ms.stats.Bytes)
@@ -1066,19 +1134,37 @@ func (ms *fileStore) removeMsgBlock(mb *msgBlock) {
ms.lmb.last = mb.last
ms.lmb.writeIndexInfo()
}
mb.close(false)
}
func (mb *msgBlock) close(sync bool) {
if mb == nil {
return
}
if mb.qch != nil {
close(mb.qch)
mb.qch = nil
}
if sync {
syncAndClose(mb.mfd, mb.ifd)
} else {
go syncAndClose(mb.mfd, mb.ifd)
}
mb.mfd = nil
mb.ifd = nil
}
func (ms *fileStore) closeAllMsgBlocks(sync bool) {
for _, mb := range ms.blks {
mb.close(sync)
}
}
func (ms *fileStore) closeLastMsgBlock(sync bool) {
if ms.lmb == nil || ms.lmb.mfd == nil {
return
if ms.lmb != nil {
ms.lmb.close(sync)
}
if sync {
syncAndClose(ms.lmb.mfd, ms.lmb.ifd)
} else {
go syncAndClose(ms.lmb.mfd, ms.lmb.ifd)
}
ms.lmb.mfd = nil
ms.lmb.ifd = nil
ms.lmb = nil
}
@@ -1090,9 +1176,13 @@ func (ms *fileStore) Stop() {
}
ms.closed = true
close(ms.qch)
ms.flushToFileLocked()
ms.closeLastMsgBlock(true)
ms.flushPendingWritesLocked()
ms.wmb = &bytes.Buffer{}
ms.closeAllMsgBlocks(true)
ms.closeLastMsgBlock(true)
if ms.ageChk != nil {
ms.ageChk.Stop()
ms.ageChk = nil

View File

@@ -693,6 +693,80 @@ func TestFileStoreEraseAndNoIndexRecovery(t *testing.T) {
}
}
func TestFileStoreCollapseDmap(t *testing.T) {
storeDir, _ := ioutil.TempDir("", JetStreamStoreDir)
os.MkdirAll(storeDir, 0755)
defer os.RemoveAll(storeDir)
subj, msg := "foo", []byte("Hello World!")
storedMsgSize := fileStoreMsgSize(subj, msg)
ms, err := newFileStore(
FileStoreConfig{StoreDir: storeDir, BlockSize: 4 * storedMsgSize},
MsgSetConfig{Name: "zzz", Storage: FileStorage},
)
if err != nil {
t.Fatalf("Unexpected error: %v", err)
}
for i := 0; i < 10; i++ {
ms.StoreMsg(subj, msg)
}
stats := ms.Stats()
if stats.Msgs != 10 {
t.Fatalf("Expected 10 msgs, got %d", stats.Msgs)
}
checkDmapTotal := func(total int) {
t.Helper()
if nde := ms.dmapEntries(); nde != total {
t.Fatalf("Expecting only %d entries, got %d", total, nde)
}
}
checkFirstSeq := func(seq uint64) {
t.Helper()
stats := ms.Stats()
if stats.FirstSeq != seq {
t.Fatalf("Expected first seq to be %d, got %d", seq, stats.FirstSeq)
}
}
// Now remove some out of order, forming gaps and entries in dmaps.
ms.RemoveMsg(2)
checkFirstSeq(1)
ms.RemoveMsg(4)
checkFirstSeq(1)
ms.RemoveMsg(8)
checkFirstSeq(1)
stats = ms.Stats()
if stats.Msgs != 7 {
t.Fatalf("Expected 7 msgs, got %d", stats.Msgs)
}
checkDmapTotal(3)
// Close gaps..
ms.RemoveMsg(1)
checkDmapTotal(2)
checkFirstSeq(3)
ms.RemoveMsg(3)
checkDmapTotal(1)
checkFirstSeq(5)
ms.RemoveMsg(5)
checkDmapTotal(1)
checkFirstSeq(6)
ms.RemoveMsg(7)
checkDmapTotal(2)
ms.RemoveMsg(6)
checkDmapTotal(0)
}
func TestFileStorePerf(t *testing.T) {
// Uncomment to run, holding place for now.
t.SkipNow()
@@ -763,4 +837,45 @@ func TestFileStorePerf(t *testing.T) {
fmt.Printf("time to read all back messages is %v\n", tt)
fmt.Printf("%.0f msgs/sec\n", float64(toStore)/tt.Seconds())
fmt.Printf("%s per sec\n", FriendlyBytes(int64(float64(toStore*storedMsgSize)/tt.Seconds())))
ms, err = newFileStore(
FileStoreConfig{StoreDir: storeDir, BlockSize: 128 * 1024 * 1024},
MsgSetConfig{Name: "zzz", Storage: FileStorage},
)
if err != nil {
t.Fatalf("Unexpected error: %v", err)
}
fmt.Printf("\nremoving [in order] %d msgs of %s each, totalling %s\n",
toStore,
FriendlyBytes(int64(storedMsgSize)),
FriendlyBytes(int64(toStore*storedMsgSize)),
)
start = time.Now()
for i := uint64(1); i <= toStore; i++ {
ms.RemoveMsg(i)
}
ms.Stop()
tt = time.Since(start)
fmt.Printf("time to remove all messages is %v\n", tt)
fmt.Printf("%.0f msgs/sec\n", float64(toStore)/tt.Seconds())
fmt.Printf("%s per sec\n", FriendlyBytes(int64(float64(toStore*storedMsgSize)/tt.Seconds())))
ms, err = newFileStore(
FileStoreConfig{StoreDir: storeDir, BlockSize: 128 * 1024 * 1024},
MsgSetConfig{Name: "zzz", Storage: FileStorage},
)
if err != nil {
t.Fatalf("Unexpected error: %v", err)
}
stats := ms.Stats()
if stats.Msgs != 0 {
t.Fatalf("Expected no msgs, got %d", stats.Msgs)
}
if stats.Bytes != 0 {
t.Fatalf("Expected no bytes, got %d", stats.Bytes)
}
}