diff --git a/server/memstore.go b/server/memstore.go index 848e4e29..036a09b7 100644 --- a/server/memstore.go +++ b/server/memstore.go @@ -27,7 +27,6 @@ type memStore struct { cfg StreamConfig state StreamState msgs map[uint64]*storedMsg - dmap map[uint64]struct{} fss map[string]*SimpleState maxp int64 scb StorageUpdateHandler @@ -53,7 +52,6 @@ func newMemStore(cfg *StreamConfig) (*memStore, error) { ms := &memStore{ msgs: make(map[uint64]*storedMsg), fss: make(map[string]*SimpleState), - dmap: make(map[uint64]struct{}), maxp: cfg.MaxMsgsPer, cfg: *cfg, } @@ -475,7 +473,6 @@ func (ms *memStore) Purge() (uint64, error) { ms.state.Msgs = 0 ms.msgs = make(map[uint64]*storedMsg) ms.fss = make(map[string]*SimpleState) - ms.dmap = make(map[uint64]struct{}) ms.mu.Unlock() if cb != nil { @@ -511,8 +508,6 @@ func (ms *memStore) Compact(seq uint64) (uint64, error) { bytes += memStoreMsgSize(sm.subj, sm.hdr, sm.msg) purged++ delete(ms.msgs, seq) - } else { - delete(ms.dmap, seq) } } ms.state.Msgs -= purged @@ -554,8 +549,6 @@ func (ms *memStore) Truncate(seq uint64) error { purged++ bytes += memStoreMsgSize(sm.subj, sm.hdr, sm.msg) delete(ms.msgs, seq) - } else { - delete(ms.dmap, i) } } // Reset last. @@ -643,13 +636,11 @@ func (ms *memStore) EraseMsg(seq uint64) (bool, error) { func (ms *memStore) updateFirstSeq(seq uint64) { if seq != ms.state.FirstSeq { // Interior delete. - ms.dmap[seq] = struct{}{} return } var nsm *storedMsg var ok bool for nseq := ms.state.FirstSeq + 1; nseq <= ms.state.LastSeq; nseq++ { - delete(ms.dmap, nseq) if nsm, ok = ms.msgs[nseq]; ok { break } @@ -661,7 +652,6 @@ func (ms *memStore) updateFirstSeq(seq uint64) { // Like purge. ms.state.FirstSeq = ms.state.LastSeq + 1 ms.state.FirstTime = time.Time{} - ms.dmap = make(map[uint64]struct{}) } } @@ -755,19 +745,26 @@ func (ms *memStore) FastState(state *StreamState) { func (ms *memStore) State() StreamState { ms.mu.RLock() + defer ms.mu.RUnlock() + state := ms.state state.Consumers = ms.consumers state.Deleted = nil - for seq := range ms.dmap { - state.Deleted = append(state.Deleted, seq) - } - ms.mu.RUnlock() - if len(state.Deleted) > 0 { - sort.Slice(state.Deleted, func(i, j int) bool { - return state.Deleted[i] < state.Deleted[j] - }) - state.NumDeleted = len(state.Deleted) + + // Calculate interior delete details. + if state.LastSeq > state.FirstSeq { + state.NumDeleted = int((state.LastSeq - state.FirstSeq) - state.Msgs + 1) + if state.NumDeleted > 0 { + state.Deleted = make([]uint64, 0, state.NumDeleted) + // TODO(dlc) - Too Simplistic, once state is updated to allow runs etc redo. + for seq := state.FirstSeq + 1; seq < ms.state.LastSeq; seq++ { + if _, ok := ms.msgs[seq]; !ok { + state.Deleted = append(state.Deleted, seq) + } + } + } } + return state } diff --git a/server/norace_test.go b/server/norace_test.go index c8e07acf..9c0e1a10 100644 --- a/server/norace_test.go +++ b/server/norace_test.go @@ -3918,3 +3918,58 @@ func TestNoRaceJetStreamClusterStreamDropCLFS(t *testing.T) { t.Fatalf("Stream was reset") } } + +func TestNoRaceJetStreamMemstoreWithLargeInteriorDeletes(t *testing.T) { + s := RunBasicJetStreamServer() + defer s.Shutdown() + + config := s.JetStreamConfig() + if config != nil { + defer removeDir(t, config.StoreDir) + } + + // Client for API requests. + nc, js := jsClientConnect(t, s) + defer nc.Close() + + _, err := js.AddStream(&nats.StreamConfig{ + Name: "TEST", + Subjects: []string{"foo", "bar"}, + MaxMsgsPerSubject: 1, + Storage: nats.MemoryStorage, + }) + require_NoError(t, err) + + acc, err := s.lookupAccount("$G") + require_NoError(t, err) + mset, err := acc.lookupStream("TEST") + require_NoError(t, err) + + msg := []byte("Hello World!") + if _, err := js.PublishAsync("foo", msg); err != nil { + t.Fatalf("Unexpected publish error: %v", err) + } + for i := 1; i <= 1_000_000; i++ { + if _, err := js.PublishAsync("bar", msg); err != nil { + t.Fatalf("Unexpected publish error: %v", err) + } + } + select { + case <-js.PublishAsyncComplete(): + case <-time.After(5 * time.Second): + t.Fatalf("Did not receive completion signal") + } + + now := time.Now() + ss := mset.stateWithDetail(true) + // Before the fix the snapshot for this test would be > 200ms on my setup. + if elapsed := time.Since(now); elapsed > 50*time.Millisecond { + t.Fatalf("Took too long to snapshot: %v", elapsed) + } + + if ss.Msgs != 2 || ss.FirstSeq != 1 || ss.LastSeq != 1_000_001 || ss.NumDeleted != 999999 { + // To not print out on error. + ss.Deleted = nil + t.Fatalf("Bad State: %+v", ss) + } +} diff --git a/server/stream.go b/server/stream.go index a36c5078..aeebb423 100644 --- a/server/stream.go +++ b/server/stream.go @@ -1058,7 +1058,7 @@ func (mset *stream) update(config *StreamConfig) error { return nil } -// Purge will remove all messages from the stream and underlying store. +// Purge will remove all messages from the stream and underlying store based on the request. func (mset *stream) purge(preq *JSApiStreamPurgeRequest) (purged uint64, err error) { mset.mu.Lock() if mset.client == nil {