diff --git a/server/filestore.go b/server/filestore.go index a9033526..0d5c7246 100644 --- a/server/filestore.go +++ b/server/filestore.go @@ -522,7 +522,7 @@ func (fs *fileStore) UpdateConfig(cfg *StreamConfig) error { fs.ageChk = nil } - if cfg.MaxMsgsPer > 0 && cfg.MaxMsgsPer < old_cfg.MaxMsgsPer { + if fs.cfg.MaxMsgsPer > 0 && fs.cfg.MaxMsgsPer < old_cfg.MaxMsgsPer { fs.enforceMsgPerSubjectLimit() } fs.mu.Unlock() @@ -5081,20 +5081,17 @@ func (fs *fileStore) LoadNextMsg(filter string, wc bool, start uint64, sm *Store start = fs.state.FirstSeq } - // TODO(dlc) - If num blocks gets large maybe use selectMsgBlock but have it return index b/c - // we need to keep walking if no match found in first mb. - for _, mb := range fs.blks { - // Skip blocks that are less than our starting sequence. - if start > atomic.LoadUint64(&mb.last.seq) { - continue - } - if sm, expireOk, err := mb.firstMatching(filter, wc, start, sm); err == nil { - if expireOk && mb != fs.lmb { - mb.tryForceExpireCache() + if bi, _ := fs.selectMsgBlockWithIndex(start); bi >= 0 { + for i := bi; i < len(fs.blks); i++ { + mb := fs.blks[i] + if sm, expireOk, err := mb.firstMatching(filter, wc, start, sm); err == nil { + if expireOk && mb != fs.lmb { + mb.tryForceExpireCache() + } + return sm, sm.seq, nil + } else if err != ErrStoreMsgNotFound { + return nil, 0, err } - return sm, sm.seq, nil - } else if err != ErrStoreMsgNotFound { - return nil, 0, err } } @@ -6237,7 +6234,7 @@ func (mb *msgBlock) recalculateFirstForSubj(subj string, startSeq uint64, ss *Si slen := int(le.Uint16(hdr[20:])) if subj == string(buf[msgHdrSize:msgHdrSize+slen]) { seq := le.Uint64(hdr[4:]) - if seq&ebit != 0 { + if seq < mb.first.seq || seq&ebit != 0 { continue } if mb.dmap.Exists(seq) { diff --git a/server/jetstream_test.go b/server/jetstream_test.go index 2e246dec..59ba472c 100644 --- a/server/jetstream_test.go +++ b/server/jetstream_test.go @@ -21639,3 +21639,64 @@ func TestJetStreamUsageSyncDeadlock(t *testing.T) { sendStreamMsg(t, nc, "foo", "hello") } + +// https://github.com/nats-io/nats.go/issues/1382 +// https://github.com/nats-io/nats-server/issues/4445 +func TestJetStreamChangeMaxMessagesPerSubject(t *testing.T) { + s := RunBasicJetStreamServer(t) + defer s.Shutdown() + + nc, js := jsClientConnect(t, s) + + _, err := js.AddStream(&nats.StreamConfig{ + Name: "TEST", + Subjects: []string{"one.>"}, + MaxMsgsPerSubject: 5, + }) + require_NoError(t, err) + + for i := 0; i < 10; i++ { + sendStreamMsg(t, nc, "one.data", "data") + } + + expectMsgs := func(num int32) error { + t.Helper() + + var msgs atomic.Int32 + sub, err := js.Subscribe("one.>", func(msg *nats.Msg) { + msgs.Add(1) + msg.Ack() + }) + require_NoError(t, err) + defer sub.Unsubscribe() + + checkFor(t, 5*time.Second, 100*time.Millisecond, func() error { + if nm := msgs.Load(); nm != num { + return fmt.Errorf("expected to get %v messages, got %v instead", num, nm) + } + return nil + }) + return nil + } + + require_NoError(t, expectMsgs(5)) + + js.UpdateStream(&nats.StreamConfig{ + Name: "TEST", + Subjects: []string{"one.>"}, + MaxMsgsPerSubject: 3, + }) + + info, err := js.StreamInfo("TEST") + require_NoError(t, err) + require_True(t, info.Config.MaxMsgsPerSubject == 3) + require_True(t, info.State.Msgs == 3) + + require_NoError(t, expectMsgs(3)) + + for i := 0; i < 10; i++ { + sendStreamMsg(t, nc, "one.data", "data") + } + + require_NoError(t, expectMsgs(3)) +}