From 5a6bb3756405618453fca4bb7972b7ccdd20c4e1 Mon Sep 17 00:00:00 2001 From: Ivan Kozlovic Date: Wed, 4 Nov 2020 15:37:38 -0700 Subject: [PATCH 1/2] Test that shows messages disappear from filestore Signed-off-by: Ivan Kozlovic --- test/jetstream_test.go | 83 ++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 83 insertions(+) diff --git a/test/jetstream_test.go b/test/jetstream_test.go index 4529e4be..56fe6705 100644 --- a/test/jetstream_test.go +++ b/test/jetstream_test.go @@ -8856,3 +8856,86 @@ func TestJetStreamAckExplicitMsgRemoval(t *testing.T) { }) } } + +func TestJetStreamStoredMsgsDontDisappear(t *testing.T) { + sc := &server.StreamConfig{ + Name: "MY_STREAM", + Storage: server.FileStorage, + Subjects: []string{"foo.>"}, + Retention: server.InterestPolicy, + } + + s := RunBasicJetStreamServer() + defer s.Shutdown() + + if config := s.JetStreamConfig(); config != nil { + defer os.RemoveAll(config.StoreDir) + } + + // mset, err := s.GlobalAccount().AddStream(sc) + mset, err := s.GlobalAccount().AddStreamWithStore(sc, &server.FileStoreConfig{BlockSize: 128, CacheExpire: 15 * time.Millisecond}) + if err != nil { + t.Fatalf("Unexpected error adding stream: %v", err) + } + defer mset.Delete() + + nc1 := clientConnectWithOldRequest(t, s) + defer nc1.Close() + + // Create a durable consumers + sub, _ := nc1.SubscribeSync(nats.NewInbox()) + defer sub.Unsubscribe() + nc1.Flush() + + o, err := mset.AddConsumer(&server.ConsumerConfig{ + Durable: "dur", + DeliverSubject: sub.Subject, + FilterSubject: "foo.bar", + DeliverPolicy: server.DeliverNew, + AckPolicy: server.AckExplicit, + }) + if err != nil { + t.Fatalf("Unexpected error adding consumer: %v", err) + } + defer o.Delete() + + nc2 := clientConnectWithOldRequest(t, s) + defer nc2.Close() + + sendStreamMsg(t, nc2, "foo.bar", "msg1") + msg, err := sub.NextMsg(time.Second) + if err != nil { + t.Fatalf("Did not get message: %v", err) + } + if string(msg.Data) != "msg1" { + t.Fatalf("Unexpected message: %q", 
msg.Data) + } + + nc1.Close() + + // Get the message from the stream + getMsgSeq := func(seq uint64) { + t.Helper() + mreq := &server.JSApiMsgGetRequest{Seq: seq} + req, err := json.Marshal(mreq) + if err != nil { + t.Fatalf("Unexpected error: %v", err) + } + smsgj, err := nc2.Request(fmt.Sprintf(server.JSApiMsgGetT, sc.Name), req, time.Second) + if err != nil { + t.Fatalf("Could not retrieve stream message: %v", err) + } + if strings.Contains(string(smsgj.Data), "code") { + t.Fatalf("Error: %q", smsgj.Data) + } + } + getMsgSeq(1) + + time.Sleep(time.Second) + sendStreamMsg(t, nc2, "foo.bar", "msg2") + sendStreamMsg(t, nc2, "foo.bar", "msg3") + + getMsgSeq(1) + getMsgSeq(2) + getMsgSeq(3) +} From 67b1a3340133c06c64625539364cca920c71835d Mon Sep 17 00:00:00 2001 From: Derek Collison Date: Wed, 4 Nov 2020 20:01:53 -0800 Subject: [PATCH 2/2] Fix for partial cache overwrite bug. When we moved to a write-through cache architecture we also changed cache writes to be offset-based instead of APPEND. We were inadvertently clearing the cache's offset whenever the cache was cleared, which meant that if the next operation was another write we would use the wrong offset and overwrite previous messages. Signed-off-by: Derek Collison --- server/filestore.go | 61 ++++++++++++++++++++++++++++------------ server/filestore_test.go | 29 ++++++++++++++++++- test/jetstream_test.go | 5 +++- 3 files changed, 75 insertions(+), 20 deletions(-) diff --git a/server/filestore.go b/server/filestore.go index 85b2d4d9..3c960ec9 100644 --- a/server/filestore.go +++ b/server/filestore.go @@ -602,6 +602,10 @@ func (fs *fileStore) newMsgBlockForWrite() (*msgBlock, error) { } mb.ifd = ifd + // Set cache time to creation time to start. + ts := time.Now().UnixNano() + mb.llts, mb.lrts, mb.lwts = ts, ts, ts + // We know we will need this so go ahead and spin up. 
mb.spinUpFlushLoop() @@ -1195,13 +1199,34 @@ func (mb *msgBlock) startCacheExpireTimer() { mb.resetCacheExpireTimer(0) } +// Used when we load in a message block. +// Lock should be held. +func (mb *msgBlock) clearCacheAndOffset() { + if mb.cache != nil { + mb.cache.off = 0 + mb.cache.wp = 0 + } + mb.clearCache() +} + // Lock should be held. func (mb *msgBlock) clearCache() { if mb.ctmr != nil { mb.ctmr.Stop() mb.ctmr = nil } - mb.cache = nil + if mb.cache == nil { + return + } + + if mb.cache.off == 0 { + mb.cache = nil + } else { + // Clear msgs and index. + mb.cache.buf = nil + mb.cache.idx = nil + mb.cache.wp = 0 + } } // Called to possibly expire a message block cache. @@ -1232,13 +1257,15 @@ func (mb *msgBlock) expireCache() { bufts = mb.lwts } - // Check for the underlying buffer first. + // Check for activity on the cache that would prevent us from expiring. if tns-bufts <= int64(mb.cexp) { mb.resetCacheExpireTimer(mb.cexp - time.Duration(tns-bufts)) return } // If we are here we will at least expire the core msg buffer. + // We need to capture offset in case we do a write next before a full load. + mb.cache.off += len(mb.cache.buf) mb.cache.buf = nil mb.cache.wp = 0 @@ -1617,7 +1644,6 @@ func (fs *fileStore) selectMsgBlock(seq uint64) *msgBlock { if seq < fs.state.FirstSeq || seq > fs.state.LastSeq { return nil } - // blks are sorted in ascending order. // TODO(dlc) - Can be smarter here, when lots of blks maybe use binary search. // For now this is cache friendly for small to medium numbers of blks. @@ -1703,7 +1729,7 @@ func (mb *msgBlock) indexCacheBuf(buf []byte) error { mb.cache.buf = buf mb.cache.idx = idx mb.cache.fseq = fseq - mb.cache.wp += len(buf) + mb.cache.wp += int(lbuf) return nil } @@ -1844,7 +1870,7 @@ func (mb *msgBlock) loadMsgs() error { checkCache: // Check to see if we have a full cache. 
- if mb.cache != nil && len(mb.cache.idx) == int(mb.msgs) && mb.cache.off == 0 { + if mb.cache != nil && len(mb.cache.idx) == int(mb.msgs) && mb.cache.off == 0 && len(mb.cache.buf) > 0 { return nil } @@ -1867,8 +1893,9 @@ checkCache: return err } + // Reset the cache since we just read everything in. // Make sure this is cleared in case we had a partial when we started. - mb.clearCache() + mb.clearCacheAndOffset() if err := mb.indexCacheBuf(buf); err != nil { return err @@ -1927,11 +1954,11 @@ func (mb *msgBlock) cacheLookup(seq uint64) (*fileStoredMsg, error) { // Will do a lookup from cache assuming lock is held. func (mb *msgBlock) cacheLookupWithLock(seq uint64) (*fileStoredMsg, error) { - if mb.cache == nil { + if mb.cache == nil || len(mb.cache.idx) == 0 { return nil, errNoCache } - if seq < mb.first.seq || seq < mb.cache.fseq || (seq-mb.cache.fseq) >= uint64(len(mb.cache.idx)) { + if seq < mb.first.seq || seq < mb.cache.fseq || seq > mb.last.seq { return nil, ErrStoreMsgNotFound } @@ -1947,6 +1974,11 @@ func (mb *msgBlock) cacheLookupWithLock(seq uint64) (*fileStoredMsg, error) { bi, _, hashChecked, _ := mb.slotInfo(int(seq - mb.cache.fseq)) + // Check if partial cache and we miss. + if mb.cache.off > 0 && bi <= uint32(mb.cache.off) { + return nil, errPartialCache + } + // We use the high bit to denote we have already checked the checksum. 
var hh hash.Hash64 if !hashChecked { @@ -1954,14 +1986,6 @@ func (mb *msgBlock) cacheLookupWithLock(seq uint64) (*fileStoredMsg, error) { mb.cache.idx[seq-mb.cache.fseq] = (bi | hbit) } - // Check if partial - if mb.cache.off > 0 && bi < uint32(mb.cache.off) { - buf := mb.cache.buf - mb.cache.buf = nil - mb.cache.buf = buf - return nil, errPartialCache - } - li := int(bi) - mb.cache.off buf := mb.cache.buf[li:] @@ -2010,6 +2034,7 @@ func (fs *fileStore) msgForSeq(seq uint64) (*fileStoredMsg, error) { fs.mu.RUnlock() return nil, err } + // TODO(dlc) - older design had a check to prefetch when we knew we were // loading in order and getting close to end of current mb. Should add // something like it back in. @@ -2385,7 +2410,7 @@ func (mb *msgBlock) dirtyCloseWithRemove(remove bool) { return } // Close cache - mb.clearCache() + mb.clearCacheAndOffset() // Quit our loops. if mb.qch != nil { close(mb.qch) @@ -2423,7 +2448,7 @@ func (mb *msgBlock) close(sync bool) { } // Close cache - mb.clearCache() + mb.clearCacheAndOffset() // Quit our loops. if mb.qch != nil { close(mb.qch) diff --git a/server/filestore_test.go b/server/filestore_test.go index 9735a7c5..4c967770 100644 --- a/server/filestore_test.go +++ b/server/filestore_test.go @@ -853,6 +853,7 @@ func TestFileStoreAgeLimitRecovery(t *testing.T) { // Make sure they expire. checkFor(t, time.Second, 2*maxAge, func() error { + t.Helper() state = fs.State() if state.Msgs != 0 { return fmt.Errorf("Expected no msgs, got %d", state.Msgs) @@ -1297,7 +1298,7 @@ func TestFileStoreReadCache(t *testing.T) { fs.StoreMsg(subj, nil, msg) } - // Wait for write cache portion to go to zero. + // Wait for cache to go to zero. 
checkFor(t, time.Second, 10*time.Millisecond, func() error { if csz := fs.cacheSize(); csz != 0 { return fmt.Errorf("cache size not 0, got %s", FriendlyBytes(int64(csz))) @@ -1331,6 +1332,32 @@ func TestFileStoreReadCache(t *testing.T) { } } +func TestFileStorePartialCacheExpiration(t *testing.T) { + storeDir, _ := ioutil.TempDir("", JetStreamStoreDir) + os.MkdirAll(storeDir, 0755) + defer os.RemoveAll(storeDir) + + cexp := 10 * time.Millisecond + + fs, err := newFileStore(FileStoreConfig{StoreDir: storeDir, CacheExpire: cexp}, StreamConfig{Name: "zzz", Storage: FileStorage}) + if err != nil { + t.Fatalf("Unexpected error: %v", err) + } + defer fs.Stop() + + fs.StoreMsg("foo", nil, []byte("msg1")) + + // Should expire and be removed. + time.Sleep(2 * cexp) + fs.StoreMsg("bar", nil, []byte("msg2")) + + // Again wait for cache to expire. + time.Sleep(2 * cexp) + if _, _, _, _, err := fs.LoadMsg(1); err != nil { + t.Fatalf("Error loading message 1: %v", err) + } +} + func TestFileStoreSnapshot(t *testing.T) { storeDir, _ := ioutil.TempDir("", JetStreamStoreDir) os.MkdirAll(storeDir, 0755) diff --git a/test/jetstream_test.go b/test/jetstream_test.go index 56fe6705..cbfce066 100644 --- a/test/jetstream_test.go +++ b/test/jetstream_test.go @@ -8857,7 +8857,7 @@ func TestJetStreamAckExplicitMsgRemoval(t *testing.T) { } } -func TestJetStreamStoredMsgsDontDisappear(t *testing.T) { +func TestJetStreamStoredMsgsDontDisappearAfterCacheExpiration(t *testing.T) { sc := &server.StreamConfig{ Name: "MY_STREAM", Storage: server.FileStorage, @@ -8903,6 +8903,7 @@ func TestJetStreamStoredMsgsDontDisappear(t *testing.T) { defer nc2.Close() sendStreamMsg(t, nc2, "foo.bar", "msg1") + msg, err := sub.NextMsg(time.Second) if err != nil { t.Fatalf("Did not get message: %v", err) @@ -8929,9 +8930,11 @@ func TestJetStreamStoredMsgsDontDisappear(t *testing.T) { t.Fatalf("Error: %q", smsgj.Data) } } + getMsgSeq(1) time.Sleep(time.Second) + sendStreamMsg(t, nc2, "foo.bar", "msg2") 
sendStreamMsg(t, nc2, "foo.bar", "msg3")