From 62fcfcd84db72ac7df2be529915bba80a3f29909 Mon Sep 17 00:00:00 2001 From: Derek Collison Date: Thu, 24 Oct 2019 07:44:46 -0700 Subject: [PATCH] filestore updates Signed-off-by: Derek Collison --- server/filestore.go | 225 ++++++++++++++++++++++----------------- server/filestore_test.go | 41 +++++++ 2 files changed, 171 insertions(+), 95 deletions(-) diff --git a/server/filestore.go b/server/filestore.go index b7033ab1..6d5149c5 100644 --- a/server/filestore.go +++ b/server/filestore.go @@ -24,7 +24,6 @@ import ( "io/ioutil" "os" "path" - "strings" "sync" "time" @@ -83,8 +82,6 @@ const ( blkScan = "%d.blk" // used to scan index file names. indexScan = "%d.idx" - // used to scan delete map file names. - dmapScan = "%d.dlm" // This is where we keep state on observers. obsDir = "obs" // Maximum size of a write buffer we may consider for re-use. @@ -178,85 +175,67 @@ func (ms *fileStore) recoverState() error { // FIXME(dlc) - Observables } -const msgHeaderSize = 22 +const msgHdrSize = 22 +const indexHdrSize = 56 -func (ms *fileStore) recoverBlock(fi os.FileInfo, index uint64) *msgBlock { +func (ms *fileStore) recoverMsgBlock(fi os.FileInfo, index uint64) *msgBlock { var le = binary.LittleEndian mb := &msgBlock{index: index} + mb.mfn = path.Join(ms.fcfg.StoreDir, msgDir, fi.Name()) + mb.ifn = path.Join(ms.fcfg.StoreDir, msgDir, fmt.Sprintf(indexScan, index)) + // Open up the message file, but we will try to recover from the index file. // We will check that the last checksums match. - mfile := path.Join(ms.fcfg.StoreDir, msgDir, fi.Name()) - file, err := os.Open(mfile) + file, err := os.Open(mb.mfn) if err != nil { return nil } defer file.Close() - // Check for presence of a delete map file. 
- dmapFile := path.Join(ms.fcfg.StoreDir, msgDir, fmt.Sprintf(dmapScan, index)) - if buf, err := ioutil.ReadFile(dmapFile); err == nil { - mb.dmap = make(map[uint64]struct{}) - fseq, i := binary.Uvarint(buf) - for { - seq, n := binary.Uvarint(buf[i:]) - if n <= 0 { - break - } - i += n - mb.dmap[seq+fseq] = struct{}{} - } - } - - // Check for an index file before processing raw message block file. - // We will make sure checksums match before we trust it. - indexFile := path.Join(ms.fcfg.StoreDir, msgDir, fmt.Sprintf(indexScan, index)) - if ifile, err := os.Open(indexFile); err == nil { - var le = binary.LittleEndian - var bs [56]byte - defer ifile.Close() - if n, _ := ifile.Read(bs[:]); n == len(bs) { - mb.msgs = le.Uint64(bs[0:]) - mb.bytes = le.Uint64(bs[8:]) - mb.first.seq = le.Uint64(bs[16:]) - mb.first.ts = int64(le.Uint64(bs[24:])) - mb.last.seq = le.Uint64(bs[32:]) - mb.last.ts = int64(le.Uint64(bs[40:])) - copy(mb.lchk[0:], bs[48:]) - // Quick sanity check here. - var lchk [8]byte - file.ReadAt(lchk[:], fi.Size()-8) - if bytes.Equal(lchk[:], mb.lchk[:]) { - ms.blks = append(ms.blks, mb) - ms.lmb = mb - return mb - } - // Fall back on the data file itself. - mb = &msgBlock{index: index, bytes: uint64(fi.Size())} + // Read our index file. Use this as source of truth if possible. + if err := mb.readIndexInfo(); err == nil { + // Quick sanity check here. + // Note this only checks that the message blk file is not newer than this file. + var lchk [8]byte + file.ReadAt(lchk[:], fi.Size()-8) + if bytes.Equal(lchk[:], mb.lchk[:]) { + ms.blks = append(ms.blks, mb) + ms.lmb = mb + return mb } + // Fall back on the data file itself. We will keep the delete map if present. + mb.msgs = 0 + mb.bytes = 0 + mb.first.seq = 0 } // Use data file itself to rebuild. 
- var hdr [msgHeaderSize]byte + var hdr [msgHdrSize]byte var offset int64 for { - // FIXME(dlc) - Might return EOF - n, err := file.ReadAt(hdr[:], offset) - if err != nil || n != msgHeaderSize { + if _, err := file.ReadAt(hdr[:], offset); err != nil { + // FIXME(dlc) - If this is not EOF we probably should try to fix. break } rl := le.Uint32(hdr[0:]) seq := le.Uint64(hdr[4:]) + ts := int64(le.Uint64(hdr[12:])) if mb.first.seq == 0 { mb.first.seq = seq + mb.first.ts = ts } mb.last.seq = seq + mb.last.ts = ts + mb.msgs++ + mb.bytes += uint64(rl) offset += int64(rl) } - + // Rewrite this to make sure we are synched. + mb.writeIndexInfo() ms.blks = append(ms.blks, mb) ms.lmb = mb return mb @@ -268,11 +247,11 @@ func (ms *fileStore) recoverMsgs() error { if err != nil { return fmt.Errorf("storage directory not readable") } - // FIXME(dlc) - Recover + // Recover the blocks. for _, fi := range fis { var index uint64 if n, err := fmt.Sscanf(fi.Name(), blkScan, &index); err == nil && n == 1 { - if mb := ms.recoverBlock(fi, index); mb != nil { + if mb := ms.recoverMsgBlock(fi, index); mb != nil { if ms.stats.FirstSeq == 0 { ms.stats.FirstSeq = mb.first.seq } @@ -284,12 +263,24 @@ func (ms *fileStore) recoverMsgs() error { } } } + + // Limits checks and enforcement. + ms.enforceMsgLimit() + ms.enforceBytesLimit() + // Do age checks too, make sure to call in place. + if ms.cfg.MaxAge != 0 { + ms.startAgeChk() + ms.expireMsgs() + } + if len(ms.blks) == 0 { _, err = ms.newMsgBlockForWrite() } + return err } +// This rolls to a new append msg block. 
func (ms *fileStore) newMsgBlockForWrite() (*msgBlock, error) { var index uint64 @@ -443,7 +434,7 @@ func (ms *fileStore) deleteMsgFromBlock(mb *msgBlock, seq uint64, sm *storedMsg) mb.dmap = make(map[uint64]struct{}) } mb.dmap[seq] = struct{}{} - mb.writeDeleteMap() + mb.writeIndexInfo() } } @@ -466,8 +457,10 @@ func (ms *fileStore) expireMsgs() { } else { ms.mu.Lock() if sm == nil { - ms.ageChk.Stop() - ms.ageChk = nil + if ms.ageChk != nil { + ms.ageChk.Stop() + ms.ageChk = nil + } } else { fireIn := time.Duration(sm.ts-now) + ms.cfg.MaxAge ms.ageChk.Reset(fireIn) @@ -499,28 +492,26 @@ func (ms *fileStore) flushLoop(fch, qch chan struct{}) { // Lock should be held. func (ms *fileStore) writeMsgRecord(seq uint64, subj string, msg []byte) (uint64, error) { - var le = binary.LittleEndian - var bs [msgHeaderSize]byte var err error - // Get size + // Get size for this message. rl := fileStoreMsgSize(subj, msg) - // Update accounting. + // Grab our current last message block. mb := ms.lmb - if mb == nil { - return 0, fmt.Errorf("no defined current message block") - } - - if mb.bytes+rl > ms.fcfg.BlockSize { + if mb == nil || mb.bytes+rl > ms.fcfg.BlockSize { if mb, err = ms.newMsgBlockForWrite(); err != nil { return 0, err } } + // Make sure we have room. + ms.wmb.Grow(int(rl)) + // Grab time ts := time.Now().UnixNano() + // Update accounting. // Update our index info. if mb.first.seq == 0 { mb.first.seq = seq @@ -531,23 +522,23 @@ func (ms *fileStore) writeMsgRecord(seq uint64, subj string, msg []byte) (uint64 mb.bytes += rl mb.msgs++ - // Make sure we have room. - ms.wmb.Grow(int(rl)) - // First write header, etc. 
- le.PutUint32(bs[0:], uint32(rl)) - le.PutUint64(bs[4:], seq) - le.PutUint64(bs[12:], uint64(ts)) - le.PutUint16(bs[20:], uint16(len(subj))) + var le = binary.LittleEndian + var hdr [msgHdrSize]byte + + le.PutUint32(hdr[0:], uint32(rl)) + le.PutUint64(hdr[4:], seq) + le.PutUint64(hdr[12:], uint64(ts)) + le.PutUint16(hdr[20:], uint16(len(subj))) // Now write to underlying buffer. - ms.wmb.Write(bs[:]) + ms.wmb.Write(hdr[:]) ms.wmb.WriteString(subj) ms.wmb.Write(msg) // Calculate hash. ms.hh.Reset() - ms.hh.Write(bs[4:12]) + ms.hh.Write(hdr[4:12]) ms.hh.Write([]byte(subj)) ms.hh.Write(msg) checksum := ms.hh.Sum(nil) @@ -601,12 +592,12 @@ func (ms *fileStore) readAndCacheMsgs(mb *msgBlock, seq uint64) *storedMsg { // Read until we get our message, or see a message that has higher sequence. for { - var hdr [msgHeaderSize]byte + var hdr [msgHdrSize]byte if _, err := io.ReadFull(r, hdr[:]); err != nil { break } rl := le.Uint32(hdr[0:]) - dlen := int(rl) - msgHeaderSize + dlen := int(rl) - msgHdrSize mseq := le.Uint64(hdr[4:]) if seq > mseq || mb.cache[mseq] != nil { // Skip over @@ -728,37 +719,77 @@ func (ms *fileStore) flushToFileLocked() { func (mb *msgBlock) writeIndexInfo() error { // msgs bytes fseq fts lseq lts var le = binary.LittleEndian - var bs [56]byte + var hdr [indexHdrSize]byte - le.PutUint64(bs[0:], mb.msgs) - le.PutUint64(bs[8:], mb.bytes) - le.PutUint64(bs[16:], mb.first.seq) - le.PutUint64(bs[24:], uint64(mb.first.ts)) - le.PutUint64(bs[32:], mb.last.seq) - le.PutUint64(bs[40:], uint64(mb.last.ts)) - copy(bs[48:], mb.lchk[0:]) - return ioutil.WriteFile(mb.ifn, bs[:], 0644) + le.PutUint64(hdr[0:], mb.msgs) + le.PutUint64(hdr[8:], mb.bytes) + le.PutUint64(hdr[16:], mb.first.seq) + le.PutUint64(hdr[24:], uint64(mb.first.ts)) + le.PutUint64(hdr[32:], mb.last.seq) + le.PutUint64(hdr[40:], uint64(mb.last.ts)) + // copy last checksum + copy(hdr[48:], mb.lchk[0:]) + buf := hdr[:] + // Append a delete map if needed + if len(mb.dmap) > 0 { + buf = append(buf, 
mb.genDeleteMap()...) + } + return ioutil.WriteFile(mb.ifn, buf, 0644) } -// Writes a delete map. -func (mb *msgBlock) writeDeleteMap() error { - // FIXME(dlc) - Make this more sane. - dfn := strings.Replace(mb.mfn, ".blk", ".dlm", 1) +func (mb *msgBlock) readIndexInfo() error { + fp, err := os.Open(mb.ifn) + if err != nil { + return err + } + defer fp.Close() + + var le = binary.LittleEndian + var hdr [indexHdrSize]byte + + if n, _ := fp.Read(hdr[:]); n != indexHdrSize { + defer os.Remove(mb.ifn) + return fmt.Errorf("bad index file") + } + // Header first + mb.msgs = le.Uint64(hdr[0:]) + mb.bytes = le.Uint64(hdr[8:]) + mb.first.seq = le.Uint64(hdr[16:]) + mb.first.ts = int64(le.Uint64(hdr[24:])) + mb.last.seq = le.Uint64(hdr[32:]) + mb.last.ts = int64(le.Uint64(hdr[40:])) + copy(mb.lchk[0:], hdr[48:]) + // Now check for presence of a delete map + if buf, err := ioutil.ReadAll(fp); err == nil { + mb.dmap = make(map[uint64]struct{}) + for i := 0; ; { + if seq, n := binary.Uvarint(buf[i:]); n <= 0 { + break + } else { + i += n + mb.dmap[seq+mb.first.seq] = struct{}{} + } + } + } + return nil +} + +func (mb *msgBlock) genDeleteMap() []byte { if len(mb.dmap) == 0 { - os.Remove(dfn) return nil } - buf := make([]byte, (len(mb.dmap)+1)*binary.MaxVarintLen64) - fseq := uint64(mb.first.seq) - n := binary.PutUvarint(buf, fseq) + buf := make([]byte, len(mb.dmap)*binary.MaxVarintLen64) + // We use first seq as an offset to cut down on size. + fseq, n := uint64(mb.first.seq), 0 for seq := range mb.dmap { + // This is for lazy cleanup as the first sequence moves up. 
if seq <= fseq { delete(mb.dmap, seq) } else { n += binary.PutUvarint(buf[n:], seq-fseq) } } - return ioutil.WriteFile(dfn, buf[:n], 0644) + return buf[:n] } func syncAndClose(mfd *os.File) { @@ -857,5 +888,9 @@ func (ms *fileStore) Stop() { ms.flushToFileLocked() ms.closeLastMsgBlock() ms.wmb = &bytes.Buffer{} + if ms.ageChk != nil { + ms.ageChk.Stop() + ms.ageChk = nil + } ms.mu.Unlock() } diff --git a/server/filestore_test.go b/server/filestore_test.go index 7cdd1167..8a6f3983 100644 --- a/server/filestore_test.go +++ b/server/filestore_test.go @@ -475,3 +475,44 @@ func TestFileStoreRemoveOutOfOrderRecovery(t *testing.T) { } } } + +func TestFileStoreAgeLimitRecovery(t *testing.T) { + maxAge := 10 * time.Millisecond + + storeDir, _ := ioutil.TempDir("", JetStreamStoreDir) + os.MkdirAll(storeDir, 0755) + defer os.RemoveAll(storeDir) + + ms, err := newFileStore(FileStoreConfig{StoreDir: storeDir}, MsgSetConfig{Name: "zzz", Storage: FileStorage, MaxAge: maxAge}) + if err != nil { + t.Fatalf("Unexpected error: %v", err) + } + defer ms.Stop() + + // Store some messages. Does not really matter how many. + subj, msg := "foo", []byte("Hello World") + toStore := 100 + for i := 0; i < toStore; i++ { + ms.StoreMsg(subj, msg) + } + stats := ms.Stats() + if stats.Msgs != uint64(toStore) { + t.Fatalf("Expected %d msgs, got %d", toStore, stats.Msgs) + } + ms.Stop() + time.Sleep(2 * maxAge) + + ms, err = newFileStore(FileStoreConfig{StoreDir: storeDir}, MsgSetConfig{Name: "zzz", Storage: FileStorage, MaxAge: maxAge}) + if err != nil { + t.Fatalf("Unexpected error: %v", err) + } + defer ms.Stop() + + stats = ms.Stats() + if stats.Msgs != 0 { + t.Fatalf("Expected no msgs, got %d", stats.Msgs) + } + if stats.Bytes != 0 { + t.Fatalf("Expected no bytes, got %d", stats.Bytes) + } +}