mirror of
https://github.com/gogrlx/nats-server.git
synced 2026-04-17 11:24:44 -07:00
Fixed a bug that was not correctly selecting next first because it was not skipping dbit entries.
This could result in lookups failing, e.g. after a change in max msgs per subject to a lower value. Also fixed a bug that would not prperly update psim during compact when throwing away the whole block and a subject had more than one message. Signed-off-by: Derek Collison <derek@nats.io>
This commit is contained in:
@@ -486,7 +486,7 @@ func newFileStoreWithCreated(fcfg FileStoreConfig, cfg StreamConfig, created tim
|
||||
|
||||
// If we have max msgs per subject make sure the is also enforced.
|
||||
if fs.cfg.MaxMsgsPer > 0 {
|
||||
fs.enforceMsgPerSubjectLimit()
|
||||
fs.enforceMsgPerSubjectLimit(false)
|
||||
}
|
||||
|
||||
// Grab first sequence for check below while we have lock.
|
||||
@@ -589,7 +589,7 @@ func (fs *fileStore) UpdateConfig(cfg *StreamConfig) error {
|
||||
}
|
||||
|
||||
if fs.cfg.MaxMsgsPer > 0 && fs.cfg.MaxMsgsPer < old_cfg.MaxMsgsPer {
|
||||
fs.enforceMsgPerSubjectLimit()
|
||||
fs.enforceMsgPerSubjectLimit(true)
|
||||
}
|
||||
fs.mu.Unlock()
|
||||
|
||||
@@ -1907,13 +1907,10 @@ func (fs *fileStore) expireMsgsOnRecover() {
|
||||
}
|
||||
// Make sure we do subject cleanup as well.
|
||||
mb.ensurePerSubjectInfoLoaded()
|
||||
for subj := range mb.fss {
|
||||
fs.removePerSubject(subj)
|
||||
}
|
||||
// Make sure we do subject cleanup as well.
|
||||
mb.ensurePerSubjectInfoLoaded()
|
||||
for subj := range mb.fss {
|
||||
fs.removePerSubject(subj)
|
||||
for subj, ss := range mb.fss {
|
||||
for i := uint64(0); i < ss.Msgs; i++ {
|
||||
fs.removePerSubject(subj)
|
||||
}
|
||||
}
|
||||
mb.dirtyCloseWithRemove(true)
|
||||
deleted++
|
||||
@@ -3190,14 +3187,16 @@ func (fs *fileStore) enforceBytesLimit() {
|
||||
// We will make sure to go through all msg blocks etc. but in practice this
|
||||
// will most likely only be the last one, so can take a more conservative approach.
|
||||
// Lock should be held.
|
||||
func (fs *fileStore) enforceMsgPerSubjectLimit() {
|
||||
func (fs *fileStore) enforceMsgPerSubjectLimit(fireCallback bool) {
|
||||
maxMsgsPer := uint64(fs.cfg.MaxMsgsPer)
|
||||
|
||||
// We want to suppress callbacks from remove during this process
|
||||
// We may want to suppress callbacks from remove during this process
|
||||
// since these should have already been deleted and accounted for.
|
||||
cb := fs.scb
|
||||
fs.scb = nil
|
||||
defer func() { fs.scb = cb }()
|
||||
if !fireCallback {
|
||||
cb := fs.scb
|
||||
fs.scb = nil
|
||||
defer func() { fs.scb = cb }()
|
||||
}
|
||||
|
||||
var numMsgs uint64
|
||||
|
||||
@@ -3251,6 +3250,9 @@ func (fs *fileStore) enforceMsgPerSubjectLimit() {
|
||||
}
|
||||
// Grab the ss entry for this subject in case sparse.
|
||||
mb.mu.Lock()
|
||||
if mb.cacheNotLoaded() {
|
||||
mb.loadMsgsWithLock()
|
||||
}
|
||||
mb.ensurePerSubjectInfoLoaded()
|
||||
ss := mb.fss[subj]
|
||||
if ss != nil && ss.firstNeedsUpdate {
|
||||
@@ -6227,8 +6229,10 @@ func (fs *fileStore) Compact(seq uint64) (uint64, error) {
|
||||
bytes += mb.bytes
|
||||
// Make sure we do subject cleanup as well.
|
||||
mb.ensurePerSubjectInfoLoaded()
|
||||
for subj := range mb.fss {
|
||||
fs.removePerSubject(subj)
|
||||
for subj, ss := range mb.fss {
|
||||
for i := uint64(0); i < ss.Msgs; i++ {
|
||||
fs.removePerSubject(subj)
|
||||
}
|
||||
}
|
||||
// Now close.
|
||||
mb.dirtyCloseWithRemove(true)
|
||||
@@ -6671,7 +6675,12 @@ func (mb *msgBlock) recalculateFirstForSubj(subj string, startSeq uint64, ss *Si
|
||||
|
||||
var le = binary.LittleEndian
|
||||
for slot := startSlot; slot < len(mb.cache.idx); slot++ {
|
||||
li := int(mb.cache.idx[slot]&^hbit) - mb.cache.off
|
||||
bi := mb.cache.idx[slot] &^ hbit
|
||||
if bi == dbit {
|
||||
// delete marker so skip.
|
||||
continue
|
||||
}
|
||||
li := int(bi) - mb.cache.off
|
||||
if li >= len(mb.cache.buf) {
|
||||
ss.First = ss.Last
|
||||
return
|
||||
@@ -6681,10 +6690,7 @@ func (mb *msgBlock) recalculateFirstForSubj(subj string, startSeq uint64, ss *Si
|
||||
slen := int(le.Uint16(hdr[20:]))
|
||||
if subj == string(buf[msgHdrSize:msgHdrSize+slen]) {
|
||||
seq := le.Uint64(hdr[4:])
|
||||
if seq < mb.first.seq || seq&ebit != 0 {
|
||||
continue
|
||||
}
|
||||
if mb.dmap.Exists(seq) {
|
||||
if seq < mb.first.seq || seq&ebit != 0 || mb.dmap.Exists(seq) {
|
||||
continue
|
||||
}
|
||||
ss.First = seq
|
||||
|
||||
@@ -6304,6 +6304,36 @@ func TestFileStoreCompactingBlocksOnSync(t *testing.T) {
|
||||
})
|
||||
}
|
||||
|
||||
// Make sure a call to Compact() updates PSIM correctly.
|
||||
func TestFileStoreCompactAndPSIMWhenDeletingBlocks(t *testing.T) {
|
||||
fs, err := newFileStore(
|
||||
FileStoreConfig{StoreDir: t.TempDir(), BlockSize: 512},
|
||||
StreamConfig{Name: "zzz", Subjects: []string{"*"}, Storage: FileStorage})
|
||||
require_NoError(t, err)
|
||||
defer fs.Stop()
|
||||
|
||||
subj, msg := "A", bytes.Repeat([]byte("ABC"), 33) // ~100bytes
|
||||
|
||||
// Add in 10 As
|
||||
for i := 0; i < 10; i++ {
|
||||
fs.StoreMsg(subj, nil, msg)
|
||||
}
|
||||
require_Equal(t, fs.numMsgBlocks(), 4)
|
||||
|
||||
// Should leave 1.
|
||||
n, err := fs.Compact(10)
|
||||
require_NoError(t, err)
|
||||
require_Equal(t, n, 9)
|
||||
require_Equal(t, fs.numMsgBlocks(), 1)
|
||||
|
||||
fs.mu.RLock()
|
||||
psi := fs.psim[subj]
|
||||
fs.mu.RUnlock()
|
||||
|
||||
require_Equal(t, psi.total, 1)
|
||||
require_Equal(t, psi.fblk, psi.lblk)
|
||||
}
|
||||
|
||||
///////////////////////////////////////////////////////////////////////////
|
||||
// Benchmarks
|
||||
///////////////////////////////////////////////////////////////////////////
|
||||
|
||||
@@ -21931,3 +21931,61 @@ func TestJetStreamFilteredSubjectUsesNewConsumerCreateSubject(t *testing.T) {
|
||||
},
|
||||
})
|
||||
}
|
||||
|
||||
// Make sure when we downgrade history to a smaller number that the account info
|
||||
// is properly updated and all keys are still accessible.
|
||||
// There was a bug calculating next first that was not taking into account the dbit slots.
|
||||
func TestJetStreamKVReductionInHistory(t *testing.T) {
|
||||
s := RunBasicJetStreamServer(t)
|
||||
defer s.Shutdown()
|
||||
|
||||
nc, js := jsClientConnect(t, s)
|
||||
defer nc.Close()
|
||||
|
||||
startHistory := 4
|
||||
kv, err := js.CreateKeyValue(&nats.KeyValueConfig{Bucket: "TEST", History: uint8(startHistory)})
|
||||
require_NoError(t, err)
|
||||
|
||||
numKeys, msg := 1000, bytes.Repeat([]byte("ABC"), 330) // ~1000bytes
|
||||
for {
|
||||
key := fmt.Sprintf("%X", rand.Intn(numKeys)+1)
|
||||
_, err = kv.Put(key, msg)
|
||||
require_NoError(t, err)
|
||||
status, err := kv.Status()
|
||||
require_NoError(t, err)
|
||||
if status.Values() >= uint64(startHistory*numKeys) {
|
||||
break
|
||||
}
|
||||
}
|
||||
info, err := js.AccountInfo()
|
||||
require_NoError(t, err)
|
||||
|
||||
checkAllKeys := func() {
|
||||
// Make sure we can retrieve all of the keys.
|
||||
keys, err := kv.Keys()
|
||||
require_NoError(t, err)
|
||||
require_Equal(t, len(keys), numKeys)
|
||||
for _, key := range keys {
|
||||
_, err := kv.Get(key)
|
||||
require_NoError(t, err)
|
||||
}
|
||||
}
|
||||
|
||||
// Quick sanity check.
|
||||
checkAllKeys()
|
||||
|
||||
si, err := js.StreamInfo("KV_TEST")
|
||||
require_NoError(t, err)
|
||||
// Adjust down to history of 1.
|
||||
cfg := si.Config
|
||||
cfg.MaxMsgsPerSubject = 1
|
||||
_, err = js.UpdateStream(&cfg)
|
||||
require_NoError(t, err)
|
||||
// Make sure the accounting was updated.
|
||||
ninfo, err := js.AccountInfo()
|
||||
require_NoError(t, err)
|
||||
require_True(t, info.Store > ninfo.Store)
|
||||
|
||||
// Make sure all keys still accessible.
|
||||
checkAllKeys()
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user