Merge pull request #1693 from nats-io/pcache_bug

Partial filestore cache bug
This commit is contained in:
Derek Collison
2020-11-05 07:30:51 -08:00
committed by GitHub
3 changed files with 157 additions and 19 deletions

View File

@@ -602,6 +602,10 @@ func (fs *fileStore) newMsgBlockForWrite() (*msgBlock, error) {
}
mb.ifd = ifd
// Set cache time to creation time to start.
ts := time.Now().UnixNano()
mb.llts, mb.lrts, mb.lwts = ts, ts, ts
// We know we will need this so go ahead and spin up.
mb.spinUpFlushLoop()
@@ -1195,13 +1199,34 @@ func (mb *msgBlock) startCacheExpireTimer() {
mb.resetCacheExpireTimer(0)
}
// Used when we load in a message block.
// Lock should be held.
func (mb *msgBlock) clearCacheAndOffset() {
	// Zero the read offset and write pointer first so that the
	// subsequent clearCache sees off == 0 and drops the cache outright.
	if c := mb.cache; c != nil {
		c.off, c.wp = 0, 0
	}
	mb.clearCache()
}
// Lock should be held.
// Clears the cache, but for a partial cache (off > 0) preserves the
// offset so a later write or full load knows where the buffer resumed.
func (mb *msgBlock) clearCache() {
	// Stop any pending cache expiration timer.
	if mb.ctmr != nil {
		mb.ctmr.Stop()
		mb.ctmr = nil
	}
	if mb.cache == nil {
		return
	}
	if mb.cache.off == 0 {
		// Full cache from the start of the block; safe to drop entirely.
		mb.cache = nil
	} else {
		// Partial cache: clear msgs and index but keep the struct (and
		// its off field) so lookups can detect the partial state.
		mb.cache.buf = nil
		mb.cache.idx = nil
		mb.cache.wp = 0
	}
}
// Called to possibly expire a message block cache.
@@ -1232,13 +1257,15 @@ func (mb *msgBlock) expireCache() {
bufts = mb.lwts
}
// Check for the underlying buffer first.
// Check for activity on the cache that would prevent us from expiring.
if tns-bufts <= int64(mb.cexp) {
mb.resetCacheExpireTimer(mb.cexp - time.Duration(tns-bufts))
return
}
// If we are here we will at least expire the core msg buffer.
// We need to capture offset in case we do a write next before a full load.
mb.cache.off += len(mb.cache.buf)
mb.cache.buf = nil
mb.cache.wp = 0
@@ -1617,7 +1644,6 @@ func (fs *fileStore) selectMsgBlock(seq uint64) *msgBlock {
if seq < fs.state.FirstSeq || seq > fs.state.LastSeq {
return nil
}
// blks are sorted in ascending order.
// TODO(dlc) - Can be smarter here, when lots of blks maybe use binary search.
// For now this is cache friendly for small to medium numbers of blks.
@@ -1703,7 +1729,7 @@ func (mb *msgBlock) indexCacheBuf(buf []byte) error {
mb.cache.buf = buf
mb.cache.idx = idx
mb.cache.fseq = fseq
mb.cache.wp += len(buf)
mb.cache.wp += int(lbuf)
return nil
}
@@ -1844,7 +1870,7 @@ func (mb *msgBlock) loadMsgs() error {
checkCache:
// Check to see if we have a full cache.
if mb.cache != nil && len(mb.cache.idx) == int(mb.msgs) && mb.cache.off == 0 {
if mb.cache != nil && len(mb.cache.idx) == int(mb.msgs) && mb.cache.off == 0 && len(mb.cache.buf) > 0 {
return nil
}
@@ -1867,8 +1893,9 @@ checkCache:
return err
}
// Reset the cache since we just read everything in.
// Make sure this is cleared in case we had a partial when we started.
mb.clearCache()
mb.clearCacheAndOffset()
if err := mb.indexCacheBuf(buf); err != nil {
return err
@@ -1927,11 +1954,11 @@ func (mb *msgBlock) cacheLookup(seq uint64) (*fileStoredMsg, error) {
// Will do a lookup from cache assuming lock is held.
func (mb *msgBlock) cacheLookupWithLock(seq uint64) (*fileStoredMsg, error) {
if mb.cache == nil {
if mb.cache == nil || len(mb.cache.idx) == 0 {
return nil, errNoCache
}
if seq < mb.first.seq || seq < mb.cache.fseq || (seq-mb.cache.fseq) >= uint64(len(mb.cache.idx)) {
if seq < mb.first.seq || seq < mb.cache.fseq || seq > mb.last.seq {
return nil, ErrStoreMsgNotFound
}
@@ -1947,6 +1974,11 @@ func (mb *msgBlock) cacheLookupWithLock(seq uint64) (*fileStoredMsg, error) {
bi, _, hashChecked, _ := mb.slotInfo(int(seq - mb.cache.fseq))
// Check if partial cache and we miss.
if mb.cache.off > 0 && bi <= uint32(mb.cache.off) {
return nil, errPartialCache
}
// We use the high bit to denote we have already checked the checksum.
var hh hash.Hash64
if !hashChecked {
@@ -1954,14 +1986,6 @@ func (mb *msgBlock) cacheLookupWithLock(seq uint64) (*fileStoredMsg, error) {
mb.cache.idx[seq-mb.cache.fseq] = (bi | hbit)
}
// Check if partial
if mb.cache.off > 0 && bi < uint32(mb.cache.off) {
buf := mb.cache.buf
mb.cache.buf = nil
mb.cache.buf = buf
return nil, errPartialCache
}
li := int(bi) - mb.cache.off
buf := mb.cache.buf[li:]
@@ -2010,6 +2034,7 @@ func (fs *fileStore) msgForSeq(seq uint64) (*fileStoredMsg, error) {
fs.mu.RUnlock()
return nil, err
}
// TODO(dlc) - older design had a check to prefetch when we knew we were
// loading in order and getting close to end of current mb. Should add
// something like it back in.
@@ -2385,7 +2410,7 @@ func (mb *msgBlock) dirtyCloseWithRemove(remove bool) {
return
}
// Close cache
mb.clearCache()
mb.clearCacheAndOffset()
// Quit our loops.
if mb.qch != nil {
close(mb.qch)
@@ -2423,7 +2448,7 @@ func (mb *msgBlock) close(sync bool) {
}
// Close cache
mb.clearCache()
mb.clearCacheAndOffset()
// Quit our loops.
if mb.qch != nil {
close(mb.qch)

View File

@@ -853,6 +853,7 @@ func TestFileStoreAgeLimitRecovery(t *testing.T) {
// Make sure they expire.
checkFor(t, time.Second, 2*maxAge, func() error {
t.Helper()
state = fs.State()
if state.Msgs != 0 {
return fmt.Errorf("Expected no msgs, got %d", state.Msgs)
@@ -1297,7 +1298,7 @@ func TestFileStoreReadCache(t *testing.T) {
fs.StoreMsg(subj, nil, msg)
}
// Wait for write cache portion to go to zero.
// Wait for cache to go to zero.
checkFor(t, time.Second, 10*time.Millisecond, func() error {
if csz := fs.cacheSize(); csz != 0 {
return fmt.Errorf("cache size not 0, got %s", FriendlyBytes(int64(csz)))
@@ -1331,6 +1332,32 @@ func TestFileStoreReadCache(t *testing.T) {
}
}
// TestFileStorePartialCacheExpiration reproduces the partial-cache bug:
// after the cache expires (leaving a non-zero offset) and a second write
// creates a partial cache, loading an earlier message must still succeed.
func TestFileStorePartialCacheExpiration(t *testing.T) {
	storeDir, _ := ioutil.TempDir("", JetStreamStoreDir)
	os.MkdirAll(storeDir, 0755)
	defer os.RemoveAll(storeDir)
	// Use a very short cache expiration so the test can wait it out.
	cexp := 10 * time.Millisecond
	fs, err := newFileStore(FileStoreConfig{StoreDir: storeDir, CacheExpire: cexp}, StreamConfig{Name: "zzz", Storage: FileStorage})
	if err != nil {
		t.Fatalf("Unexpected error: %v", err)
	}
	defer fs.Stop()
	fs.StoreMsg("foo", nil, []byte("msg1"))
	// Should expire and be removed.
	time.Sleep(2 * cexp)
	// Second write lands after expiration, producing a partial cache
	// whose buffer no longer starts at the beginning of the block.
	fs.StoreMsg("bar", nil, []byte("msg2"))
	// Again wait for cache to expire.
	time.Sleep(2 * cexp)
	// Message 1 is outside the partial cache; the load must fall back to
	// reading the block from disk rather than failing.
	if _, _, _, _, err := fs.LoadMsg(1); err != nil {
		t.Fatalf("Error loading message 1: %v", err)
	}
}
func TestFileStoreSnapshot(t *testing.T) {
storeDir, _ := ioutil.TempDir("", JetStreamStoreDir)
os.MkdirAll(storeDir, 0755)

View File

@@ -8856,3 +8856,89 @@ func TestJetStreamAckExplicitMsgRemoval(t *testing.T) {
})
}
}
// TestJetStreamStoredMsgsDontDisappearAfterCacheExpiration verifies that
// stream messages remain retrievable via the msg-get API after the
// filestore cache has expired and new messages have been appended
// (end-to-end coverage for the partial filestore cache bug).
func TestJetStreamStoredMsgsDontDisappearAfterCacheExpiration(t *testing.T) {
	sc := &server.StreamConfig{
		Name:      "MY_STREAM",
		Storage:   server.FileStorage,
		Subjects:  []string{"foo.>"},
		Retention: server.InterestPolicy,
	}

	s := RunBasicJetStreamServer()
	defer s.Shutdown()

	if config := s.JetStreamConfig(); config != nil {
		defer os.RemoveAll(config.StoreDir)
	}

	// Small block size and a very short cache expiration to force
	// partial-cache conditions quickly.
	mset, err := s.GlobalAccount().AddStreamWithStore(sc, &server.FileStoreConfig{BlockSize: 128, CacheExpire: 15 * time.Millisecond})
	if err != nil {
		t.Fatalf("Unexpected error adding stream: %v", err)
	}
	defer mset.Delete()

	nc1 := clientConnectWithOldRequest(t, s)
	defer nc1.Close()

	// Create a durable consumer.
	sub, _ := nc1.SubscribeSync(nats.NewInbox())
	defer sub.Unsubscribe()
	nc1.Flush()

	o, err := mset.AddConsumer(&server.ConsumerConfig{
		Durable:        "dur",
		DeliverSubject: sub.Subject,
		FilterSubject:  "foo.bar",
		DeliverPolicy:  server.DeliverNew,
		AckPolicy:      server.AckExplicit,
	})
	if err != nil {
		t.Fatalf("Unexpected error adding consumer: %v", err)
	}
	defer o.Delete()

	nc2 := clientConnectWithOldRequest(t, s)
	defer nc2.Close()

	sendStreamMsg(t, nc2, "foo.bar", "msg1")

	msg, err := sub.NextMsg(time.Second)
	if err != nil {
		t.Fatalf("Did not get message: %v", err)
	}
	if string(msg.Data) != "msg1" {
		t.Fatalf("Unexpected message: %q", msg.Data)
	}

	nc1.Close()

	// Get the message from the stream by sequence via the JS msg-get API.
	getMsgSeq := func(seq uint64) {
		t.Helper()
		mreq := &server.JSApiMsgGetRequest{Seq: seq}
		req, err := json.Marshal(mreq)
		if err != nil {
			t.Fatalf("Unexpected error: %v", err)
		}
		smsgj, err := nc2.Request(fmt.Sprintf(server.JSApiMsgGetT, sc.Name), req, time.Second)
		if err != nil {
			t.Fatalf("Could not retrieve stream message: %v", err)
		}
		// An error response carries a "code" field in its JSON payload.
		if strings.Contains(string(smsgj.Data), "code") {
			t.Fatalf("Error: %q", smsgj.Data)
		}
	}

	getMsgSeq(1)

	// Let the cache expire, then append more messages so the cache
	// becomes partial; all sequences must still be retrievable.
	time.Sleep(time.Second)

	sendStreamMsg(t, nc2, "foo.bar", "msg2")
	sendStreamMsg(t, nc2, "foo.bar", "msg3")

	getMsgSeq(1)
	getMsgSeq(2)
	getMsgSeq(3)
}