Add store.SkipMsg() and update no interest retention streams

Signed-off-by: Derek Collison <derek@nats.io>
This commit is contained in:
Derek Collison
2020-10-22 19:35:28 -07:00
parent 63477acb61
commit ad247d1853
5 changed files with 226 additions and 53 deletions

View File

@@ -639,6 +639,26 @@ func (fs *fileStore) StoreMsg(subj string, hdr, msg []byte) (uint64, int64, erro
return seq, ts, nil
}
// SkipMsg will use the next sequence number but not store anything.
func (fs *fileStore) SkipMsg() uint64 {
// Grab time.
now := time.Now().UTC()
fs.mu.Lock()
seq := fs.state.LastSeq + 1
fs.state.LastSeq = seq
fs.state.LastTime = now
if fs.state.Msgs == 0 {
fs.state.FirstSeq = seq
fs.state.FirstTime = now
}
if seq == fs.state.FirstSeq {
fs.state.FirstSeq = seq + 1
fs.state.FirstTime = now
}
fs.mu.Unlock()
return seq
}
// Will check the msg limit and drop firstSeq msg if needed.
// Lock should be held.
func (fs *fileStore) enforceMsgLimit() {
@@ -744,6 +764,11 @@ func (mb *msgBlock) kickWriteFlusher() {
}
}
// Lock should be held.
func (mb *msgBlock) isEmpty() bool {
return mb.first.seq > mb.last.seq
}
// Lock should be held.
func (mb *msgBlock) selectNextFirst() {
var seq uint64
@@ -757,6 +782,11 @@ func (mb *msgBlock) selectNextFirst() {
}
// Set new first sequence.
mb.first.seq = seq
// Check if we are empty..
if mb.isEmpty() {
mb.first.ts = 0
return
}
// Need to get the timestamp.
// We will try the cache direct and fallback if needed.
@@ -774,6 +804,21 @@ func (mb *msgBlock) selectNextFirst() {
}
}
// Select the next FirstSeq
func (fs *fileStore) selectNextFirst() {
if len(fs.blks) > 0 {
mb := fs.blks[0]
mb.mu.RLock()
fs.state.FirstSeq = mb.first.seq
fs.state.FirstTime = time.Unix(0, mb.first.ts).UTC()
mb.mu.RUnlock()
} else {
// Could not find anything, so treat like purge
fs.state.FirstSeq = fs.state.LastSeq + 1
fs.state.FirstTime = time.Time{}
}
}
func (fs *fileStore) deleteMsgFromBlock(mb *msgBlock, seq uint64, sm *fileStoredMsg, secure bool) error {
// Update global accounting.
msz := fileStoreMsgSize(sm.subj, sm.hdr, sm.msg)
@@ -825,18 +870,20 @@ func (fs *fileStore) deleteMsgFromBlock(mb *msgBlock, seq uint64, sm *fileStored
atomic.AddUint64(&mb.cgenid, 1)
var shouldWriteIndex bool
var firstSeqNeedsUpdate bool
// Optimize for FIFO case.
if seq == mb.first.seq {
mb.selectNextFirst()
if seq == fs.state.FirstSeq {
fs.state.FirstSeq = mb.first.seq // new one.
fs.state.FirstTime = time.Unix(0, mb.first.ts).UTC()
}
if mb.first.seq > mb.last.seq {
if mb.isEmpty() {
fs.removeMsgBlock(mb)
firstSeqNeedsUpdate = seq == fs.state.FirstSeq
} else {
shouldWriteIndex = true
if seq == fs.state.FirstSeq {
fs.state.FirstSeq = mb.first.seq // new one.
fs.state.FirstTime = time.Unix(0, mb.first.ts).UTC()
}
}
} else {
// Out of order delete.
@@ -864,6 +911,13 @@ func (fs *fileStore) deleteMsgFromBlock(mb *msgBlock, seq uint64, sm *fileStored
}
}
mb.mu.Unlock()
// If we emptied the current message block and the seq was state.First.Seq
// then we need to jump message blocks.
if firstSeqNeedsUpdate {
fs.selectNextFirst()
}
fs.mu.Unlock()
if fs.scb != nil {

View File

@@ -204,6 +204,83 @@ func TestFileStoreBasicWriteMsgsAndRestore(t *testing.T) {
}
}
func TestFileStoreSelectNextFirst(t *testing.T) {
storeDir, _ := ioutil.TempDir("", JetStreamStoreDir)
os.MkdirAll(storeDir, 0755)
defer os.RemoveAll(storeDir)
fs, err := newFileStore(FileStoreConfig{StoreDir: storeDir, BlockSize: 256}, StreamConfig{Name: "zzz", Storage: FileStorage})
if err != nil {
t.Fatalf("Unexpected error: %v", err)
}
defer fs.Stop()
numMsgs := 10
subj, msg := "zzz", []byte("Hello World")
for i := 0; i < numMsgs; i++ {
fs.StoreMsg(subj, nil, msg)
}
if state := fs.State(); state.Msgs != uint64(numMsgs) {
t.Fatalf("Expected %d msgs, got %d", numMsgs, state.Msgs)
}
// Note the 256 block size is tied to the msg size below to give us 5 messages per block.
if fmb := fs.selectMsgBlock(1); fmb.msgs != 5 {
t.Fatalf("Expected 5 messages per block, but got %d", fmb.msgs)
}
// Delete 2-7, this will cross message blocks.
for i := 2; i <= 7; i++ {
fs.RemoveMsg(uint64(i))
}
if state := fs.State(); state.Msgs != 4 || state.FirstSeq != 1 {
t.Fatalf("Expected 4 msgs, first seq of 11, got msgs of %d and first seq of %d", state.Msgs, state.FirstSeq)
}
// Now close the gap which will force the system to jump underlying message blocks to find the right sequence.
fs.RemoveMsg(1)
if state := fs.State(); state.Msgs != 3 || state.FirstSeq != 8 {
t.Fatalf("Expected 3 msgs, first seq of 8, got msgs of %d and first seq of %d", state.Msgs, state.FirstSeq)
}
}
func TestFileStoreSkipMsg(t *testing.T) {
storeDir, _ := ioutil.TempDir("", JetStreamStoreDir)
os.MkdirAll(storeDir, 0755)
defer os.RemoveAll(storeDir)
fs, err := newFileStore(FileStoreConfig{StoreDir: storeDir, BlockSize: 256}, StreamConfig{Name: "zzz", Storage: FileStorage})
if err != nil {
t.Fatalf("Unexpected error: %v", err)
}
defer fs.Stop()
numSkips := 10
for i := 0; i < numSkips; i++ {
fs.SkipMsg()
}
state := fs.State()
if state.Msgs != 0 {
t.Fatalf("Expected %d msgs, got %d", 0, state.Msgs)
}
if state.FirstSeq != uint64(numSkips+1) || state.LastSeq != uint64(numSkips) {
t.Fatalf("Expected first to be %d and last to be %d. got first %d and last %d", numSkips+1, numSkips, state.FirstSeq, state.LastSeq)
}
fs.StoreMsg("zzz", nil, []byte("Hello World!"))
fs.SkipMsg()
fs.SkipMsg()
fs.StoreMsg("zzz", nil, []byte("Hello World!"))
state = fs.State()
if state.Msgs != 2 {
t.Fatalf("Expected %d msgs, got %d", 2, state.Msgs)
}
if state.FirstSeq != uint64(numSkips+1) || state.LastSeq != uint64(numSkips+4) {
t.Fatalf("Expected first to be %d and last to be %d. got first %d and last %d", numSkips+1, numSkips+4, state.FirstSeq, state.LastSeq)
}
}
func TestFileStoreMsgLimit(t *testing.T) {
storeDir, _ := ioutil.TempDir("", JetStreamStoreDir)
os.MkdirAll(storeDir, 0755)

View File

@@ -140,6 +140,24 @@ func (ms *memStore) StoreMsg(subj string, hdr, msg []byte) (uint64, int64, error
return seq, ts, nil
}
// SkipMsg will use the next sequence number but not store anything.
func (ms *memStore) SkipMsg() uint64 {
// Grab time.
now := time.Now().UTC()
ms.mu.Lock()
seq := ms.state.LastSeq + 1
ms.state.LastSeq = seq
ms.state.LastTime = now
if ms.state.Msgs == 0 {
ms.state.FirstSeq = seq
ms.state.FirstTime = now
}
ms.updateFirstSeq(seq)
ms.mu.Unlock()
return seq
}
// StorageBytesUpdate registers an async callback for updates to storage changes.
func (ms *memStore) StorageBytesUpdate(cb func(int64)) {
ms.mu.Lock()
@@ -297,6 +315,29 @@ func (ms *memStore) EraseMsg(seq uint64) (bool, error) {
return removed, nil
}
// Performs logic tp update first sequence number.
// Lock should be held.
func (ms *memStore) updateFirstSeq(seq uint64) {
if seq != ms.state.FirstSeq {
return
}
var nsm *storedMsg
var ok bool
for nseq := ms.state.FirstSeq + 1; nseq <= ms.state.LastSeq; nseq++ {
if nsm, ok = ms.msgs[nseq]; ok {
break
}
}
if nsm != nil {
ms.state.FirstSeq = nsm.seq
ms.state.FirstTime = time.Unix(0, nsm.ts).UTC()
} else {
// Like purge.
ms.state.FirstSeq = ms.state.LastSeq + 1
ms.state.FirstTime = time.Time{}
}
}
// Removes the message referenced by seq.
func (ms *memStore) removeMsg(seq uint64, secure bool) bool {
var ss uint64
@@ -309,23 +350,7 @@ func (ms *memStore) removeMsg(seq uint64, secure bool) bool {
ms.state.Msgs--
ss = memStoreMsgSize(sm.subj, sm.hdr, sm.msg)
ms.state.Bytes -= ss
if seq == ms.state.FirstSeq {
var nsm *storedMsg
var ok bool
for nseq := ms.state.FirstSeq + 1; nseq <= ms.state.LastSeq; nseq++ {
if nsm, ok = ms.msgs[nseq]; ok {
break
}
}
if nsm != nil {
ms.state.FirstSeq = nsm.seq
ms.state.FirstTime = time.Unix(0, nsm.ts).UTC()
} else {
// Like purge.
ms.state.FirstSeq = ms.state.LastSeq + 1
ms.state.FirstTime = time.Time{}
}
}
ms.updateFirstSeq(seq)
if secure {
if len(sm.hdr) > 0 {

View File

@@ -54,6 +54,7 @@ var (
type StreamStore interface {
StoreMsg(subj string, hdr, msg []byte) (uint64, int64, error)
SkipMsg() uint64
LoadMsg(seq uint64) (subj string, hdr, msg []byte, ts int64, err error)
RemoveMsg(seq uint64) (bool, error)
EraseMsg(seq uint64) (bool, error)

View File

@@ -793,39 +793,55 @@ func (mset *Stream) processInboundJetStreamMsg(_ *subscription, pc *client, subj
// Header support.
var hdr []byte
// Check to see if we are over the account limit.
// Check to see if we are over the max msg size.
if maxMsgSize >= 0 && len(msg) > maxMsgSize {
response = []byte("-ERR 'message size exceeds maximum allowed'")
} else {
// Headers.
if pc != nil && pc.pa.hdr > 0 {
hdr = msg[:pc.pa.hdr]
msg = msg[pc.pa.hdr:]
if doAck && len(reply) > 0 {
mset.sendq <- &jsPubMsg{reply, _EMPTY_, _EMPTY_, nil, response, nil, 0}
}
seq, ts, err = store.StoreMsg(subject, hdr, msg)
if err != nil {
if err != ErrStoreClosed {
c.Errorf("JetStream failed to store a msg on account: %q stream: %q - %v", accName, name, err)
}
response = []byte(fmt.Sprintf("-ERR '%v'", err))
} else if jsa.limitsExceeded(stype) {
c.Warnf("JetStream resource limits exceeded for account: %q", accName)
response = []byte("-ERR 'resource limits exceeded for account'")
store.RemoveMsg(seq)
seq = 0
} else if err == nil {
if doAck && len(reply) > 0 {
response = append(pubAck, strconv.FormatUint(seq, 10)...)
response = append(response, '}')
}
// If we have a msgId make sure to save.
if msgId != "" {
mset.storeMsgId(&ddentry{msgId, seq, ts})
}
// If we are interest based retention and have no consumers clean that up here.
if interestRetention && numConsumers == 0 {
store.RemoveMsg(seq)
}
return
}
// If we are interest based retention and have no consumers then skip.
if interestRetention && numConsumers == 0 {
seq = store.SkipMsg()
if doAck && len(reply) > 0 {
response = append(pubAck, strconv.FormatUint(seq, 10)...)
response = append(response, '}')
mset.sendq <- &jsPubMsg{reply, _EMPTY_, _EMPTY_, nil, response, nil, 0}
}
// If we have a msgId make sure to save.
if msgId != "" {
mset.storeMsgId(&ddentry{msgId, seq, time.Now().UnixNano()})
}
return
}
// If here we will attempt to store the message.
// Headers.
if pc != nil && pc.pa.hdr > 0 {
hdr = msg[:pc.pa.hdr]
msg = msg[pc.pa.hdr:]
}
seq, ts, err = store.StoreMsg(subject, hdr, msg)
if err != nil {
if err != ErrStoreClosed {
c.Errorf("JetStream failed to store a msg on account: %q stream: %q - %v", accName, name, err)
}
response = []byte(fmt.Sprintf("-ERR '%v'", err))
} else if jsa.limitsExceeded(stype) {
c.Warnf("JetStream resource limits exceeded for account: %q", accName)
response = []byte("-ERR 'resource limits exceeded for account'")
store.RemoveMsg(seq)
seq = 0
} else {
if doAck && len(reply) > 0 {
response = append(pubAck, strconv.FormatUint(seq, 10)...)
response = append(response, '}')
}
// If we have a msgId make sure to save.
if msgId != "" {
mset.storeMsgId(&ddentry{msgId, seq, ts})
}
}
@@ -834,7 +850,7 @@ func (mset *Stream) processInboundJetStreamMsg(_ *subscription, pc *client, subj
mset.sendq <- &jsPubMsg{reply, _EMPTY_, _EMPTY_, nil, response, nil, 0}
}
if err == nil && numConsumers > 0 && seq > 0 {
if err == nil && seq > 0 && numConsumers > 0 {
var needSignal bool
mset.mu.Lock()
for _, o := range mset.consumers {