diff --git a/server/filestore.go b/server/filestore.go index e04acae5..8121714e 100644 --- a/server/filestore.go +++ b/server/filestore.go @@ -639,6 +639,26 @@ func (fs *fileStore) StoreMsg(subj string, hdr, msg []byte) (uint64, int64, erro return seq, ts, nil } +// SkipMsg will use the next sequence number but not store anything. +func (fs *fileStore) SkipMsg() uint64 { + // Grab time. + now := time.Now().UTC() + fs.mu.Lock() + seq := fs.state.LastSeq + 1 + fs.state.LastSeq = seq + fs.state.LastTime = now + if fs.state.Msgs == 0 { + fs.state.FirstSeq = seq + fs.state.FirstTime = now + } + if seq == fs.state.FirstSeq { + fs.state.FirstSeq = seq + 1 + fs.state.FirstTime = now + } + fs.mu.Unlock() + return seq +} + // Will check the msg limit and drop firstSeq msg if needed. // Lock should be held. func (fs *fileStore) enforceMsgLimit() { @@ -744,6 +764,11 @@ func (mb *msgBlock) kickWriteFlusher() { } } +// Lock should be held. +func (mb *msgBlock) isEmpty() bool { + return mb.first.seq > mb.last.seq +} + // Lock should be held. func (mb *msgBlock) selectNextFirst() { var seq uint64 @@ -757,6 +782,11 @@ func (mb *msgBlock) selectNextFirst() { } // Set new first sequence. mb.first.seq = seq + // Check if we are empty.. + if mb.isEmpty() { + mb.first.ts = 0 + return + } // Need to get the timestamp. // We will try the cache direct and fallback if needed. @@ -774,6 +804,21 @@ func (mb *msgBlock) selectNextFirst() { } } +// Select the next FirstSeq +func (fs *fileStore) selectNextFirst() { + if len(fs.blks) > 0 { + mb := fs.blks[0] + mb.mu.RLock() + fs.state.FirstSeq = mb.first.seq + fs.state.FirstTime = time.Unix(0, mb.first.ts).UTC() + mb.mu.RUnlock() + } else { + // Could not find anything, so treat like purge + fs.state.FirstSeq = fs.state.LastSeq + 1 + fs.state.FirstTime = time.Time{} + } +} + func (fs *fileStore) deleteMsgFromBlock(mb *msgBlock, seq uint64, sm *fileStoredMsg, secure bool) error { // Update global accounting. 
msz := fileStoreMsgSize(sm.subj, sm.hdr, sm.msg) @@ -825,18 +870,20 @@ func (fs *fileStore) deleteMsgFromBlock(mb *msgBlock, seq uint64, sm *fileStored atomic.AddUint64(&mb.cgenid, 1) var shouldWriteIndex bool + var firstSeqNeedsUpdate bool // Optimize for FIFO case. if seq == mb.first.seq { mb.selectNextFirst() - if seq == fs.state.FirstSeq { - fs.state.FirstSeq = mb.first.seq // new one. - fs.state.FirstTime = time.Unix(0, mb.first.ts).UTC() - } - if mb.first.seq > mb.last.seq { + if mb.isEmpty() { fs.removeMsgBlock(mb) + firstSeqNeedsUpdate = seq == fs.state.FirstSeq } else { shouldWriteIndex = true + if seq == fs.state.FirstSeq { + fs.state.FirstSeq = mb.first.seq // new one. + fs.state.FirstTime = time.Unix(0, mb.first.ts).UTC() + } } } else { // Out of order delete. @@ -864,6 +911,13 @@ func (fs *fileStore) deleteMsgFromBlock(mb *msgBlock, seq uint64, sm *fileStored } } mb.mu.Unlock() + + // If we emptied the current message block and the seq was state.First.Seq + // then we need to jump message blocks. 
+ if firstSeqNeedsUpdate { + fs.selectNextFirst() + } + fs.mu.Unlock() if fs.scb != nil { diff --git a/server/filestore_test.go b/server/filestore_test.go index b9924d17..ffd2ccdd 100644 --- a/server/filestore_test.go +++ b/server/filestore_test.go @@ -204,6 +204,83 @@ func TestFileStoreBasicWriteMsgsAndRestore(t *testing.T) { } } +func TestFileStoreSelectNextFirst(t *testing.T) { + storeDir, _ := ioutil.TempDir("", JetStreamStoreDir) + os.MkdirAll(storeDir, 0755) + defer os.RemoveAll(storeDir) + + fs, err := newFileStore(FileStoreConfig{StoreDir: storeDir, BlockSize: 256}, StreamConfig{Name: "zzz", Storage: FileStorage}) + if err != nil { + t.Fatalf("Unexpected error: %v", err) + } + defer fs.Stop() + + numMsgs := 10 + subj, msg := "zzz", []byte("Hello World") + for i := 0; i < numMsgs; i++ { + fs.StoreMsg(subj, nil, msg) + } + if state := fs.State(); state.Msgs != uint64(numMsgs) { + t.Fatalf("Expected %d msgs, got %d", numMsgs, state.Msgs) + } + + // Note the 256 block size is tied to the msg size below to give us 5 messages per block. + if fmb := fs.selectMsgBlock(1); fmb.msgs != 5 { + t.Fatalf("Expected 5 messages per block, but got %d", fmb.msgs) + } + + // Delete 2-7, this will cross message blocks. + for i := 2; i <= 7; i++ { + fs.RemoveMsg(uint64(i)) + } + + if state := fs.State(); state.Msgs != 4 || state.FirstSeq != 1 { + t.Fatalf("Expected 4 msgs, first seq of 1, got msgs of %d and first seq of %d", state.Msgs, state.FirstSeq) + } + // Now close the gap which will force the system to jump underlying message blocks to find the right sequence. 
+ fs.RemoveMsg(1) + if state := fs.State(); state.Msgs != 3 || state.FirstSeq != 8 { + t.Fatalf("Expected 3 msgs, first seq of 8, got msgs of %d and first seq of %d", state.Msgs, state.FirstSeq) + } +} + +func TestFileStoreSkipMsg(t *testing.T) { + storeDir, _ := ioutil.TempDir("", JetStreamStoreDir) + os.MkdirAll(storeDir, 0755) + defer os.RemoveAll(storeDir) + + fs, err := newFileStore(FileStoreConfig{StoreDir: storeDir, BlockSize: 256}, StreamConfig{Name: "zzz", Storage: FileStorage}) + if err != nil { + t.Fatalf("Unexpected error: %v", err) + } + defer fs.Stop() + + numSkips := 10 + for i := 0; i < numSkips; i++ { + fs.SkipMsg() + } + state := fs.State() + if state.Msgs != 0 { + t.Fatalf("Expected %d msgs, got %d", 0, state.Msgs) + } + if state.FirstSeq != uint64(numSkips+1) || state.LastSeq != uint64(numSkips) { + t.Fatalf("Expected first to be %d and last to be %d. got first %d and last %d", numSkips+1, numSkips, state.FirstSeq, state.LastSeq) + } + + fs.StoreMsg("zzz", nil, []byte("Hello World!")) + fs.SkipMsg() + fs.SkipMsg() + fs.StoreMsg("zzz", nil, []byte("Hello World!")) + + state = fs.State() + if state.Msgs != 2 { + t.Fatalf("Expected %d msgs, got %d", 2, state.Msgs) + } + if state.FirstSeq != uint64(numSkips+1) || state.LastSeq != uint64(numSkips+4) { + t.Fatalf("Expected first to be %d and last to be %d. got first %d and last %d", numSkips+1, numSkips+4, state.FirstSeq, state.LastSeq) + } +} + func TestFileStoreMsgLimit(t *testing.T) { storeDir, _ := ioutil.TempDir("", JetStreamStoreDir) os.MkdirAll(storeDir, 0755) diff --git a/server/memstore.go b/server/memstore.go index 67642f00..17063270 100644 --- a/server/memstore.go +++ b/server/memstore.go @@ -140,6 +140,24 @@ func (ms *memStore) StoreMsg(subj string, hdr, msg []byte) (uint64, int64, error return seq, ts, nil } +// SkipMsg will use the next sequence number but not store anything. +func (ms *memStore) SkipMsg() uint64 { + // Grab time. 
+ now := time.Now().UTC() + + ms.mu.Lock() + seq := ms.state.LastSeq + 1 + ms.state.LastSeq = seq + ms.state.LastTime = now + if ms.state.Msgs == 0 { + ms.state.FirstSeq = seq + ms.state.FirstTime = now + } + ms.updateFirstSeq(seq) + ms.mu.Unlock() + return seq +} + // StorageBytesUpdate registers an async callback for updates to storage changes. func (ms *memStore) StorageBytesUpdate(cb func(int64)) { ms.mu.Lock() @@ -297,6 +315,29 @@ func (ms *memStore) EraseMsg(seq uint64) (bool, error) { return removed, nil } +// Performs logic to update first sequence number. +// Lock should be held. +func (ms *memStore) updateFirstSeq(seq uint64) { + if seq != ms.state.FirstSeq { + return + } + var nsm *storedMsg + var ok bool + for nseq := ms.state.FirstSeq + 1; nseq <= ms.state.LastSeq; nseq++ { + if nsm, ok = ms.msgs[nseq]; ok { + break + } + } + if nsm != nil { + ms.state.FirstSeq = nsm.seq + ms.state.FirstTime = time.Unix(0, nsm.ts).UTC() + } else { + // Like purge. 
- ms.state.FirstSeq = ms.state.LastSeq + 1 - ms.state.FirstTime = time.Time{} - } - } + ms.updateFirstSeq(seq) if secure { if len(sm.hdr) > 0 { diff --git a/server/store.go b/server/store.go index 0365030d..3ab78aab 100644 --- a/server/store.go +++ b/server/store.go @@ -54,6 +54,7 @@ var ( type StreamStore interface { StoreMsg(subj string, hdr, msg []byte) (uint64, int64, error) + SkipMsg() uint64 LoadMsg(seq uint64) (subj string, hdr, msg []byte, ts int64, err error) RemoveMsg(seq uint64) (bool, error) EraseMsg(seq uint64) (bool, error) diff --git a/server/stream.go b/server/stream.go index a2b2d288..0e75aa49 100644 --- a/server/stream.go +++ b/server/stream.go @@ -793,39 +793,55 @@ func (mset *Stream) processInboundJetStreamMsg(_ *subscription, pc *client, subj // Header support. var hdr []byte - // Check to see if we are over the account limit. + // Check to see if we are over the max msg size. if maxMsgSize >= 0 && len(msg) > maxMsgSize { response = []byte("-ERR 'message size exceeds maximum allowed'") - } else { - // Headers. - if pc != nil && pc.pa.hdr > 0 { - hdr = msg[:pc.pa.hdr] - msg = msg[pc.pa.hdr:] + if doAck && len(reply) > 0 { + mset.sendq <- &jsPubMsg{reply, _EMPTY_, _EMPTY_, nil, response, nil, 0} } - seq, ts, err = store.StoreMsg(subject, hdr, msg) - if err != nil { - if err != ErrStoreClosed { - c.Errorf("JetStream failed to store a msg on account: %q stream: %q - %v", accName, name, err) - } - response = []byte(fmt.Sprintf("-ERR '%v'", err)) - } else if jsa.limitsExceeded(stype) { - c.Warnf("JetStream resource limits exceeded for account: %q", accName) - response = []byte("-ERR 'resource limits exceeded for account'") - store.RemoveMsg(seq) - seq = 0 - } else if err == nil { - if doAck && len(reply) > 0 { - response = append(pubAck, strconv.FormatUint(seq, 10)...) - response = append(response, '}') - } - // If we have a msgId make sure to save. 
- if msgId != "" { - mset.storeMsgId(&ddentry{msgId, seq, ts}) - } - // If we are interest based retention and have no consumers clean that up here. - if interestRetention && numConsumers == 0 { - store.RemoveMsg(seq) - } + return + } + + // If we are interest based retention and have no consumers then skip. + if interestRetention && numConsumers == 0 { + seq = store.SkipMsg() + if doAck && len(reply) > 0 { + response = append(pubAck, strconv.FormatUint(seq, 10)...) + response = append(response, '}') + mset.sendq <- &jsPubMsg{reply, _EMPTY_, _EMPTY_, nil, response, nil, 0} + } + // If we have a msgId make sure to save. + if msgId != "" { + mset.storeMsgId(&ddentry{msgId, seq, time.Now().UnixNano()}) + } + return + } + + // If here we will attempt to store the message. + // Headers. + if pc != nil && pc.pa.hdr > 0 { + hdr = msg[:pc.pa.hdr] + msg = msg[pc.pa.hdr:] + } + seq, ts, err = store.StoreMsg(subject, hdr, msg) + if err != nil { + if err != ErrStoreClosed { + c.Errorf("JetStream failed to store a msg on account: %q stream: %q - %v", accName, name, err) + } + response = []byte(fmt.Sprintf("-ERR '%v'", err)) + } else if jsa.limitsExceeded(stype) { + c.Warnf("JetStream resource limits exceeded for account: %q", accName) + response = []byte("-ERR 'resource limits exceeded for account'") + store.RemoveMsg(seq) + seq = 0 + } else { + if doAck && len(reply) > 0 { + response = append(pubAck, strconv.FormatUint(seq, 10)...) + response = append(response, '}') + } + // If we have a msgId make sure to save. + if msgId != "" { + mset.storeMsgId(&ddentry{msgId, seq, ts}) } } @@ -834,7 +850,7 @@ func (mset *Stream) processInboundJetStreamMsg(_ *subscription, pc *client, subj mset.sendq <- &jsPubMsg{reply, _EMPTY_, _EMPTY_, nil, response, nil, 0} } - if err == nil && numConsumers > 0 && seq > 0 { + if err == nil && seq > 0 && numConsumers > 0 { var needSignal bool mset.mu.Lock() for _, o := range mset.consumers {