Memory store tracked interior deletes for stream state, but under KV semantics this could be very large.

Actually faster to not track at all and generate on the fly. Saves lots of memory too.

When we update the stream state to include runs, etc., we will update this as well.

Signed-off-by: Derek Collison <derek@nats.io>
This commit is contained in:
Derek Collison
2021-12-20 17:37:16 -08:00
parent 490acf5f29
commit af4d7dbe52
3 changed files with 72 additions and 20 deletions

View File

@@ -27,7 +27,6 @@ type memStore struct {
cfg StreamConfig
state StreamState
msgs map[uint64]*storedMsg
dmap map[uint64]struct{}
fss map[string]*SimpleState
maxp int64
scb StorageUpdateHandler
@@ -53,7 +52,6 @@ func newMemStore(cfg *StreamConfig) (*memStore, error) {
ms := &memStore{
msgs: make(map[uint64]*storedMsg),
fss: make(map[string]*SimpleState),
dmap: make(map[uint64]struct{}),
maxp: cfg.MaxMsgsPer,
cfg: *cfg,
}
@@ -475,7 +473,6 @@ func (ms *memStore) Purge() (uint64, error) {
ms.state.Msgs = 0
ms.msgs = make(map[uint64]*storedMsg)
ms.fss = make(map[string]*SimpleState)
ms.dmap = make(map[uint64]struct{})
ms.mu.Unlock()
if cb != nil {
@@ -511,8 +508,6 @@ func (ms *memStore) Compact(seq uint64) (uint64, error) {
bytes += memStoreMsgSize(sm.subj, sm.hdr, sm.msg)
purged++
delete(ms.msgs, seq)
} else {
delete(ms.dmap, seq)
}
}
ms.state.Msgs -= purged
@@ -554,8 +549,6 @@ func (ms *memStore) Truncate(seq uint64) error {
purged++
bytes += memStoreMsgSize(sm.subj, sm.hdr, sm.msg)
delete(ms.msgs, seq)
} else {
delete(ms.dmap, i)
}
}
// Reset last.
@@ -643,13 +636,11 @@ func (ms *memStore) EraseMsg(seq uint64) (bool, error) {
func (ms *memStore) updateFirstSeq(seq uint64) {
if seq != ms.state.FirstSeq {
// Interior delete.
ms.dmap[seq] = struct{}{}
return
}
var nsm *storedMsg
var ok bool
for nseq := ms.state.FirstSeq + 1; nseq <= ms.state.LastSeq; nseq++ {
delete(ms.dmap, nseq)
if nsm, ok = ms.msgs[nseq]; ok {
break
}
@@ -661,7 +652,6 @@ func (ms *memStore) updateFirstSeq(seq uint64) {
// Like purge.
ms.state.FirstSeq = ms.state.LastSeq + 1
ms.state.FirstTime = time.Time{}
ms.dmap = make(map[uint64]struct{})
}
}
@@ -755,19 +745,26 @@ func (ms *memStore) FastState(state *StreamState) {
func (ms *memStore) State() StreamState {
ms.mu.RLock()
defer ms.mu.RUnlock()
state := ms.state
state.Consumers = ms.consumers
state.Deleted = nil
for seq := range ms.dmap {
state.Deleted = append(state.Deleted, seq)
}
ms.mu.RUnlock()
if len(state.Deleted) > 0 {
sort.Slice(state.Deleted, func(i, j int) bool {
return state.Deleted[i] < state.Deleted[j]
})
state.NumDeleted = len(state.Deleted)
// Calculate interior delete details.
if state.LastSeq > state.FirstSeq {
state.NumDeleted = int((state.LastSeq - state.FirstSeq) - state.Msgs + 1)
if state.NumDeleted > 0 {
state.Deleted = make([]uint64, 0, state.NumDeleted)
// TODO(dlc) - Too Simplistic, once state is updated to allow runs etc redo.
for seq := state.FirstSeq + 1; seq < ms.state.LastSeq; seq++ {
if _, ok := ms.msgs[seq]; !ok {
state.Deleted = append(state.Deleted, seq)
}
}
}
}
return state
}

View File

@@ -3918,3 +3918,58 @@ func TestNoRaceJetStreamClusterStreamDropCLFS(t *testing.T) {
t.Fatalf("Stream was reset")
}
}
// TestNoRaceJetStreamMemstoreWithLargeInteriorDeletes checks that taking a
// detailed state snapshot of a memory-backed stream with ~1M interior deletes
// is fast, now that deleted sequences are generated on the fly rather than
// tracked in a map.
func TestNoRaceJetStreamMemstoreWithLargeInteriorDeletes(t *testing.T) {
	s := RunBasicJetStreamServer()
	defer s.Shutdown()

	if config := s.JetStreamConfig(); config != nil {
		defer removeDir(t, config.StoreDir)
	}

	// Client for API requests.
	nc, js := jsClientConnect(t, s)
	defer nc.Close()

	// MaxMsgsPerSubject=1 means every publish to "bar" displaces the previous
	// one, leaving a large run of interior deletes between "foo" and the tail.
	_, err := js.AddStream(&nats.StreamConfig{
		Name:              "TEST",
		Subjects:          []string{"foo", "bar"},
		MaxMsgsPerSubject: 1,
		Storage:           nats.MemoryStorage,
	})
	require_NoError(t, err)

	acc, err := s.lookupAccount("$G")
	require_NoError(t, err)
	mset, err := acc.lookupStream("TEST")
	require_NoError(t, err)

	payload := []byte("Hello World!")
	if _, err := js.PublishAsync("foo", payload); err != nil {
		t.Fatalf("Unexpected publish error: %v", err)
	}
	for i := 0; i < 1_000_000; i++ {
		if _, err := js.PublishAsync("bar", payload); err != nil {
			t.Fatalf("Unexpected publish error: %v", err)
		}
	}
	select {
	case <-js.PublishAsyncComplete():
	case <-time.After(5 * time.Second):
		t.Fatalf("Did not receive completion signal")
	}

	start := time.Now()
	state := mset.stateWithDetail(true)
	// Before the fix the snapshot for this test would be > 200ms on my setup.
	if elapsed := time.Since(start); elapsed > 50*time.Millisecond {
		t.Fatalf("Took too long to snapshot: %v", elapsed)
	}
	if state.Msgs != 2 || state.FirstSeq != 1 || state.LastSeq != 1_000_001 || state.NumDeleted != 999_999 {
		// To not print out on error.
		state.Deleted = nil
		t.Fatalf("Bad State: %+v", state)
	}
}

View File

@@ -1058,7 +1058,7 @@ func (mset *stream) update(config *StreamConfig) error {
return nil
}
// Purge will remove all messages from the stream and underlying store.
// Purge will remove all messages from the stream and underlying store based on the request.
func (mset *stream) purge(preq *JSApiStreamPurgeRequest) (purged uint64, err error) {
mset.mu.Lock()
if mset.client == nil {