diff --git a/server/filestore.go b/server/filestore.go index 85b2d4d9..3c960ec9 100644 --- a/server/filestore.go +++ b/server/filestore.go @@ -602,6 +602,10 @@ func (fs *fileStore) newMsgBlockForWrite() (*msgBlock, error) { } mb.ifd = ifd + // Set cache time to creation time to start. + ts := time.Now().UnixNano() + mb.llts, mb.lrts, mb.lwts = ts, ts, ts + // We know we will need this so go ahead and spin up. mb.spinUpFlushLoop() @@ -1195,13 +1199,34 @@ func (mb *msgBlock) startCacheExpireTimer() { mb.resetCacheExpireTimer(0) } +// Used when we load in a message block. +// Lock should be held. +func (mb *msgBlock) clearCacheAndOffset() { + if mb.cache != nil { + mb.cache.off = 0 + mb.cache.wp = 0 + } + mb.clearCache() +} + // Lock should be held. func (mb *msgBlock) clearCache() { if mb.ctmr != nil { mb.ctmr.Stop() mb.ctmr = nil } - mb.cache = nil + if mb.cache == nil { + return + } + + if mb.cache.off == 0 { + mb.cache = nil + } else { + // Clear msgs and index. + mb.cache.buf = nil + mb.cache.idx = nil + mb.cache.wp = 0 + } } // Called to possibly expire a message block cache. @@ -1232,13 +1257,15 @@ func (mb *msgBlock) expireCache() { bufts = mb.lwts } - // Check for the underlying buffer first. + // Check for activity on the cache that would prevent us from expiring. if tns-bufts <= int64(mb.cexp) { mb.resetCacheExpireTimer(mb.cexp - time.Duration(tns-bufts)) return } // If we are here we will at least expire the core msg buffer. + // We need to capture offset in case we do a write next before a full load. + mb.cache.off += len(mb.cache.buf) mb.cache.buf = nil mb.cache.wp = 0 @@ -1617,7 +1644,6 @@ func (fs *fileStore) selectMsgBlock(seq uint64) *msgBlock { if seq < fs.state.FirstSeq || seq > fs.state.LastSeq { return nil } - // blks are sorted in ascending order. // TODO(dlc) - Can be smarter here, when lots of blks maybe use binary search. // For now this is cache friendly for small to medium numbers of blks. @@ -1703,7 +1729,7 @@ func (mb *msgBlock) indexCacheBuf(buf []byte) error { mb.cache.buf = buf mb.cache.idx = idx mb.cache.fseq = fseq - mb.cache.wp += len(buf) + mb.cache.wp += int(lbuf) return nil } @@ -1844,7 +1870,7 @@ func (mb *msgBlock) loadMsgs() error { checkCache: // Check to see if we have a full cache. - if mb.cache != nil && len(mb.cache.idx) == int(mb.msgs) && mb.cache.off == 0 { + if mb.cache != nil && len(mb.cache.idx) == int(mb.msgs) && mb.cache.off == 0 && len(mb.cache.buf) > 0 { return nil } @@ -1867,8 +1893,9 @@ checkCache: return err } + // Reset the cache since we just read everything in. // Make sure this is cleared in case we had a partial when we started. - mb.clearCache() + mb.clearCacheAndOffset() if err := mb.indexCacheBuf(buf); err != nil { return err @@ -1927,11 +1954,11 @@ func (mb *msgBlock) cacheLookup(seq uint64) (*fileStoredMsg, error) { // Will do a lookup from cache assuming lock is held. func (mb *msgBlock) cacheLookupWithLock(seq uint64) (*fileStoredMsg, error) { - if mb.cache == nil { + if mb.cache == nil || len(mb.cache.idx) == 0 { return nil, errNoCache } - if seq < mb.first.seq || seq < mb.cache.fseq || (seq-mb.cache.fseq) >= uint64(len(mb.cache.idx)) { + if seq < mb.first.seq || seq < mb.cache.fseq || seq > mb.last.seq { return nil, ErrStoreMsgNotFound } @@ -1947,6 +1974,11 @@ func (mb *msgBlock) cacheLookupWithLock(seq uint64) (*fileStoredMsg, error) { bi, _, hashChecked, _ := mb.slotInfo(int(seq - mb.cache.fseq)) + // Check if partial cache and we miss. + if mb.cache.off > 0 && bi <= uint32(mb.cache.off) { + return nil, errPartialCache + } + // We use the high bit to denote we have already checked the checksum. var hh hash.Hash64 if !hashChecked { @@ -1954,14 +1986,6 @@ func (mb *msgBlock) cacheLookupWithLock(seq uint64) (*fileStoredMsg, error) { mb.cache.idx[seq-mb.cache.fseq] = (bi | hbit) } - // Check if partial - if mb.cache.off > 0 && bi < uint32(mb.cache.off) { - buf := mb.cache.buf - mb.cache.buf = nil - mb.cache.buf = buf - return nil, errPartialCache - } - li := int(bi) - mb.cache.off buf := mb.cache.buf[li:] @@ -2010,6 +2034,7 @@ func (fs *fileStore) msgForSeq(seq uint64) (*fileStoredMsg, error) { fs.mu.RUnlock() return nil, err } + // TODO(dlc) - older design had a check to prefetch when we knew we were // loading in order and getting close to end of current mb. Should add // something like it back in. @@ -2385,7 +2410,7 @@ func (mb *msgBlock) dirtyCloseWithRemove(remove bool) { return } // Close cache - mb.clearCache() + mb.clearCacheAndOffset() // Quit our loops. if mb.qch != nil { close(mb.qch) @@ -2423,7 +2448,7 @@ func (mb *msgBlock) close(sync bool) { } // Close cache - mb.clearCache() + mb.clearCacheAndOffset() // Quit our loops. if mb.qch != nil { close(mb.qch) diff --git a/server/filestore_test.go b/server/filestore_test.go index 9735a7c5..4c967770 100644 --- a/server/filestore_test.go +++ b/server/filestore_test.go @@ -853,6 +853,7 @@ func TestFileStoreAgeLimitRecovery(t *testing.T) { // Make sure they expire. checkFor(t, time.Second, 2*maxAge, func() error { + t.Helper() state = fs.State() if state.Msgs != 0 { return fmt.Errorf("Expected no msgs, got %d", state.Msgs) @@ -1297,7 +1298,7 @@ func TestFileStoreReadCache(t *testing.T) { fs.StoreMsg(subj, nil, msg) } - // Wait for write cache portion to go to zero. + // Wait for cache to go to zero. checkFor(t, time.Second, 10*time.Millisecond, func() error { if csz := fs.cacheSize(); csz != 0 { return fmt.Errorf("cache size not 0, got %s", FriendlyBytes(int64(csz))) @@ -1331,6 +1332,32 @@ func TestFileStoreReadCache(t *testing.T) { } } +func TestFileStorePartialCacheExpiration(t *testing.T) { + storeDir, _ := ioutil.TempDir("", JetStreamStoreDir) + os.MkdirAll(storeDir, 0755) + defer os.RemoveAll(storeDir) + + cexp := 10 * time.Millisecond + + fs, err := newFileStore(FileStoreConfig{StoreDir: storeDir, CacheExpire: cexp}, StreamConfig{Name: "zzz", Storage: FileStorage}) + if err != nil { + t.Fatalf("Unexpected error: %v", err) + } + defer fs.Stop() + + fs.StoreMsg("foo", nil, []byte("msg1")) + + // Should expire and be removed. + time.Sleep(2 * cexp) + fs.StoreMsg("bar", nil, []byte("msg2")) + + // Again wait for cache to expire. + time.Sleep(2 * cexp) + if _, _, _, _, err := fs.LoadMsg(1); err != nil { + t.Fatalf("Error loading message 1: %v", err) + } +} + func TestFileStoreSnapshot(t *testing.T) { storeDir, _ := ioutil.TempDir("", JetStreamStoreDir) os.MkdirAll(storeDir, 0755) diff --git a/test/jetstream_test.go b/test/jetstream_test.go index 4529e4be..cbfce066 100644 --- a/test/jetstream_test.go +++ b/test/jetstream_test.go @@ -8856,3 +8856,89 @@ func TestJetStreamAckExplicitMsgRemoval(t *testing.T) { }) } } + +func TestJetStreamStoredMsgsDontDisappearAfterCacheExpiration(t *testing.T) { + sc := &server.StreamConfig{ + Name: "MY_STREAM", + Storage: server.FileStorage, + Subjects: []string{"foo.>"}, + Retention: server.InterestPolicy, + } + + s := RunBasicJetStreamServer() + defer s.Shutdown() + + if config := s.JetStreamConfig(); config != nil { + defer os.RemoveAll(config.StoreDir) + } + + // mset, err := s.GlobalAccount().AddStream(sc) + mset, err := s.GlobalAccount().AddStreamWithStore(sc, &server.FileStoreConfig{BlockSize: 128, CacheExpire: 15 * time.Millisecond}) + if err != nil { + t.Fatalf("Unexpected error adding stream: %v", err) + } + defer mset.Delete() + + nc1 := clientConnectWithOldRequest(t, s) + defer nc1.Close() + + // Create a durable consumers + sub, _ := nc1.SubscribeSync(nats.NewInbox()) + defer sub.Unsubscribe() + nc1.Flush() + + o, err := mset.AddConsumer(&server.ConsumerConfig{ + Durable: "dur", + DeliverSubject: sub.Subject, + FilterSubject: "foo.bar", + DeliverPolicy: server.DeliverNew, + AckPolicy: server.AckExplicit, + }) + if err != nil { + t.Fatalf("Unexpected error adding consumer: %v", err) + } + defer o.Delete() + + nc2 := clientConnectWithOldRequest(t, s) + defer nc2.Close() + + sendStreamMsg(t, nc2, "foo.bar", "msg1") + + msg, err := sub.NextMsg(time.Second) + if err != nil { + t.Fatalf("Did not get message: %v", err) + } + if string(msg.Data) != "msg1" { + t.Fatalf("Unexpected message: %q", msg.Data) + } + + nc1.Close() + + // Get the message from the stream + getMsgSeq := func(seq uint64) { + t.Helper() + mreq := &server.JSApiMsgGetRequest{Seq: seq} + req, err := json.Marshal(mreq) + if err != nil { + t.Fatalf("Unexpected error: %v", err) + } + smsgj, err := nc2.Request(fmt.Sprintf(server.JSApiMsgGetT, sc.Name), req, time.Second) + if err != nil { + t.Fatalf("Could not retrieve stream message: %v", err) + } + if strings.Contains(string(smsgj.Data), "code") { + t.Fatalf("Error: %q", smsgj.Data) + } + } + + getMsgSeq(1) + + time.Sleep(time.Second) + + sendStreamMsg(t, nc2, "foo.bar", "msg2") + sendStreamMsg(t, nc2, "foo.bar", "msg3") + + getMsgSeq(1) + getMsgSeq(2) + getMsgSeq(3) +}