From af4d7dbe523d96e25329be75cb16d19ee0050ea5 Mon Sep 17 00:00:00 2001 From: Derek Collison Date: Mon, 20 Dec 2021 17:37:16 -0800 Subject: [PATCH] Memory store tracked interior deletes for stream state, but under KV semantics this could be very large. Actually faster to not track at all and generate on the fly. Saves lots of memory too. When we update the stream state to include runs, etc will update this as well. Signed-off-by: Derek Collison --- server/memstore.go | 35 +++++++++++++-------------- server/norace_test.go | 55 +++++++++++++++++++++++++++++++++++++++++++ server/stream.go | 2 +- 3 files changed, 72 insertions(+), 20 deletions(-) diff --git a/server/memstore.go b/server/memstore.go index 848e4e29..036a09b7 100644 --- a/server/memstore.go +++ b/server/memstore.go @@ -27,7 +27,6 @@ type memStore struct { cfg StreamConfig state StreamState msgs map[uint64]*storedMsg - dmap map[uint64]struct{} fss map[string]*SimpleState maxp int64 scb StorageUpdateHandler @@ -53,7 +52,6 @@ func newMemStore(cfg *StreamConfig) (*memStore, error) { ms := &memStore{ msgs: make(map[uint64]*storedMsg), fss: make(map[string]*SimpleState), - dmap: make(map[uint64]struct{}), maxp: cfg.MaxMsgsPer, cfg: *cfg, } @@ -475,7 +473,6 @@ func (ms *memStore) Purge() (uint64, error) { ms.state.Msgs = 0 ms.msgs = make(map[uint64]*storedMsg) ms.fss = make(map[string]*SimpleState) - ms.dmap = make(map[uint64]struct{}) ms.mu.Unlock() if cb != nil { @@ -511,8 +508,6 @@ func (ms *memStore) Compact(seq uint64) (uint64, error) { bytes += memStoreMsgSize(sm.subj, sm.hdr, sm.msg) purged++ delete(ms.msgs, seq) - } else { - delete(ms.dmap, seq) } } ms.state.Msgs -= purged @@ -554,8 +549,6 @@ func (ms *memStore) Truncate(seq uint64) error { purged++ bytes += memStoreMsgSize(sm.subj, sm.hdr, sm.msg) delete(ms.msgs, seq) - } else { - delete(ms.dmap, i) } } // Reset last. @@ -643,13 +636,11 @@ func (ms *memStore) EraseMsg(seq uint64) (bool, error) { func (ms *memStore) updateFirstSeq(seq uint64) { if seq != ms.state.FirstSeq { // Interior delete. - ms.dmap[seq] = struct{}{} return } var nsm *storedMsg var ok bool for nseq := ms.state.FirstSeq + 1; nseq <= ms.state.LastSeq; nseq++ { - delete(ms.dmap, nseq) if nsm, ok = ms.msgs[nseq]; ok { break } @@ -661,7 +652,6 @@ func (ms *memStore) updateFirstSeq(seq uint64) { // Like purge. ms.state.FirstSeq = ms.state.LastSeq + 1 ms.state.FirstTime = time.Time{} - ms.dmap = make(map[uint64]struct{}) } } @@ -755,19 +745,26 @@ func (ms *memStore) FastState(state *StreamState) { func (ms *memStore) State() StreamState { ms.mu.RLock() + defer ms.mu.RUnlock() + state := ms.state state.Consumers = ms.consumers state.Deleted = nil - for seq := range ms.dmap { - state.Deleted = append(state.Deleted, seq) - } - ms.mu.RUnlock() - if len(state.Deleted) > 0 { - sort.Slice(state.Deleted, func(i, j int) bool { - return state.Deleted[i] < state.Deleted[j] - }) - state.NumDeleted = len(state.Deleted) + + // Calculate interior delete details. + if state.LastSeq > state.FirstSeq { + state.NumDeleted = int((state.LastSeq - state.FirstSeq) - state.Msgs + 1) + if state.NumDeleted > 0 { + state.Deleted = make([]uint64, 0, state.NumDeleted) + // TODO(dlc) - Too Simplistic, once state is updated to allow runs etc redo. + for seq := state.FirstSeq + 1; seq < ms.state.LastSeq; seq++ { + if _, ok := ms.msgs[seq]; !ok { + state.Deleted = append(state.Deleted, seq) + } + } + } } + return state } diff --git a/server/norace_test.go b/server/norace_test.go index c8e07acf..9c0e1a10 100644 --- a/server/norace_test.go +++ b/server/norace_test.go @@ -3918,3 +3918,58 @@ func TestNoRaceJetStreamClusterStreamDropCLFS(t *testing.T) { t.Fatalf("Stream was reset") } } + +func TestNoRaceJetStreamMemstoreWithLargeInteriorDeletes(t *testing.T) { + s := RunBasicJetStreamServer() + defer s.Shutdown() + + config := s.JetStreamConfig() + if config != nil { + defer removeDir(t, config.StoreDir) + } + + // Client for API requests. + nc, js := jsClientConnect(t, s) + defer nc.Close() + + _, err := js.AddStream(&nats.StreamConfig{ + Name: "TEST", + Subjects: []string{"foo", "bar"}, + MaxMsgsPerSubject: 1, + Storage: nats.MemoryStorage, + }) + require_NoError(t, err) + + acc, err := s.lookupAccount("$G") + require_NoError(t, err) + mset, err := acc.lookupStream("TEST") + require_NoError(t, err) + + msg := []byte("Hello World!") + if _, err := js.PublishAsync("foo", msg); err != nil { + t.Fatalf("Unexpected publish error: %v", err) + } + for i := 1; i <= 1_000_000; i++ { + if _, err := js.PublishAsync("bar", msg); err != nil { + t.Fatalf("Unexpected publish error: %v", err) + } + } + select { + case <-js.PublishAsyncComplete(): + case <-time.After(5 * time.Second): + t.Fatalf("Did not receive completion signal") + } + + now := time.Now() + ss := mset.stateWithDetail(true) + // Before the fix the snapshot for this test would be > 200ms on my setup. + if elapsed := time.Since(now); elapsed > 50*time.Millisecond { + t.Fatalf("Took too long to snapshot: %v", elapsed) + } + + if ss.Msgs != 2 || ss.FirstSeq != 1 || ss.LastSeq != 1_000_001 || ss.NumDeleted != 999999 { + // To not print out on error. + ss.Deleted = nil + t.Fatalf("Bad State: %+v", ss) + } +} diff --git a/server/stream.go b/server/stream.go index a36c5078..aeebb423 100644 --- a/server/stream.go +++ b/server/stream.go @@ -1058,7 +1058,7 @@ func (mset *stream) update(config *StreamConfig) error { return nil } -// Purge will remove all messages from the stream and underlying store. +// Purge will remove all messages from the stream and underlying store based on the request. func (mset *stream) purge(preq *JSApiStreamPurgeRequest) (purged uint64, err error) { mset.mu.Lock() if mset.client == nil {