Merge pull request #2752 from nats-io/memstore_lid

Improved memstore tracking of interior deletes.
This commit is contained in:
Derek Collison
2021-12-20 18:06:06 -08:00
committed by GitHub
4 changed files with 76 additions and 22 deletions

View File

@@ -4868,8 +4868,9 @@ func (mset *stream) processSnapshot(snap *streamSnapshot) error {
mset.processSnapshotDeletes(snap)
mset.mu.Lock()
var state StreamState
mset.clfs = snap.Failed
state := mset.store.State()
mset.store.FastState(&state)
sreq := mset.calculateSyncRequest(&state, snap)
s, js, subject, n := mset.srv, mset.js, mset.sa.Sync, mset.node
mset.mu.Unlock()
@@ -4931,7 +4932,8 @@ RETRY:
// Grab sync request again on failures.
if sreq == nil {
mset.mu.Lock()
state := mset.store.State()
var state StreamState
mset.store.FastState(&state)
sreq = mset.calculateSyncRequest(&state, snap)
mset.mu.Unlock()
if sreq == nil {

View File

@@ -27,7 +27,6 @@ type memStore struct {
cfg StreamConfig
state StreamState
msgs map[uint64]*storedMsg
dmap map[uint64]struct{}
fss map[string]*SimpleState
maxp int64
scb StorageUpdateHandler
@@ -53,7 +52,6 @@ func newMemStore(cfg *StreamConfig) (*memStore, error) {
ms := &memStore{
msgs: make(map[uint64]*storedMsg),
fss: make(map[string]*SimpleState),
dmap: make(map[uint64]struct{}),
maxp: cfg.MaxMsgsPer,
cfg: *cfg,
}
@@ -475,7 +473,6 @@ func (ms *memStore) Purge() (uint64, error) {
ms.state.Msgs = 0
ms.msgs = make(map[uint64]*storedMsg)
ms.fss = make(map[string]*SimpleState)
ms.dmap = make(map[uint64]struct{})
ms.mu.Unlock()
if cb != nil {
@@ -511,8 +508,6 @@ func (ms *memStore) Compact(seq uint64) (uint64, error) {
bytes += memStoreMsgSize(sm.subj, sm.hdr, sm.msg)
purged++
delete(ms.msgs, seq)
} else {
delete(ms.dmap, seq)
}
}
ms.state.Msgs -= purged
@@ -554,8 +549,6 @@ func (ms *memStore) Truncate(seq uint64) error {
purged++
bytes += memStoreMsgSize(sm.subj, sm.hdr, sm.msg)
delete(ms.msgs, seq)
} else {
delete(ms.dmap, i)
}
}
// Reset last.
@@ -643,13 +636,11 @@ func (ms *memStore) EraseMsg(seq uint64) (bool, error) {
func (ms *memStore) updateFirstSeq(seq uint64) {
if seq != ms.state.FirstSeq {
// Interior delete.
ms.dmap[seq] = struct{}{}
return
}
var nsm *storedMsg
var ok bool
for nseq := ms.state.FirstSeq + 1; nseq <= ms.state.LastSeq; nseq++ {
delete(ms.dmap, nseq)
if nsm, ok = ms.msgs[nseq]; ok {
break
}
@@ -661,7 +652,6 @@ func (ms *memStore) updateFirstSeq(seq uint64) {
// Like purge.
ms.state.FirstSeq = ms.state.LastSeq + 1
ms.state.FirstTime = time.Time{}
ms.dmap = make(map[uint64]struct{})
}
}
@@ -755,19 +745,26 @@ func (ms *memStore) FastState(state *StreamState) {
func (ms *memStore) State() StreamState {
ms.mu.RLock()
defer ms.mu.RUnlock()
state := ms.state
state.Consumers = ms.consumers
state.Deleted = nil
for seq := range ms.dmap {
state.Deleted = append(state.Deleted, seq)
}
ms.mu.RUnlock()
if len(state.Deleted) > 0 {
sort.Slice(state.Deleted, func(i, j int) bool {
return state.Deleted[i] < state.Deleted[j]
})
state.NumDeleted = len(state.Deleted)
// Calculate interior delete details.
if state.LastSeq > state.FirstSeq {
state.NumDeleted = int((state.LastSeq - state.FirstSeq) - state.Msgs + 1)
if state.NumDeleted > 0 {
state.Deleted = make([]uint64, 0, state.NumDeleted)
// TODO(dlc) - Too Simplistic, once state is updated to allow runs etc redo.
for seq := state.FirstSeq + 1; seq < ms.state.LastSeq; seq++ {
if _, ok := ms.msgs[seq]; !ok {
state.Deleted = append(state.Deleted, seq)
}
}
}
}
return state
}

View File

@@ -3918,3 +3918,58 @@ func TestNoRaceJetStreamClusterStreamDropCLFS(t *testing.T) {
t.Fatalf("Stream was reset")
}
}
// TestNoRaceJetStreamMemstoreWithLargeInteriorDeletes verifies that taking a
// state snapshot of a memory-backed stream with ~1M interior deletes is fast.
// This guards the change in this commit where memstore stopped maintaining an
// explicit delete map (dmap) and instead derives interior deletes on demand.
func TestNoRaceJetStreamMemstoreWithLargeInteriorDeletes(t *testing.T) {
s := RunBasicJetStreamServer()
defer s.Shutdown()
// Clean up the store directory if the server created one.
config := s.JetStreamConfig()
if config != nil {
defer removeDir(t, config.StoreDir)
}
// Client for API requests.
nc, js := jsClientConnect(t, s)
defer nc.Close()
// MaxMsgsPerSubject: 1 means each new publish to "bar" removes the previous
// "bar" message, so the loop below produces a long run of interior deletes.
_, err := js.AddStream(&nats.StreamConfig{
Name: "TEST",
Subjects: []string{"foo", "bar"},
MaxMsgsPerSubject: 1,
Storage: nats.MemoryStorage,
})
require_NoError(t, err)
// Grab the server-side stream handle so we can snapshot its state directly.
acc, err := s.lookupAccount("$G")
require_NoError(t, err)
mset, err := acc.lookupStream("TEST")
require_NoError(t, err)
msg := []byte("Hello World!")
// Seq 1 on "foo" stays put; it anchors FirstSeq at 1 for the whole test.
if _, err := js.PublishAsync("foo", msg); err != nil {
t.Fatalf("Unexpected publish error: %v", err)
}
// Publish 1M messages to "bar"; only the last survives, leaving interior
// deletes for sequences 2 .. 1_000_000.
for i := 1; i <= 1_000_000; i++ {
if _, err := js.PublishAsync("bar", msg); err != nil {
t.Fatalf("Unexpected publish error: %v", err)
}
}
select {
case <-js.PublishAsyncComplete():
case <-time.After(5 * time.Second):
t.Fatalf("Did not receive completion signal")
}
// Time the detailed state snapshot, which must enumerate all deletes.
now := time.Now()
ss := mset.stateWithDetail(true)
// Before the fix the snapshot for this test would be > 200ms on my setup.
if elapsed := time.Since(now); elapsed > 50*time.Millisecond {
t.Fatalf("Took too long to snapshot: %v", elapsed)
}
// Expect the "foo" msg (seq 1) and the last "bar" msg (seq 1_000_001) to
// remain, with all 999,999 sequences in between counted as deleted.
if ss.Msgs != 2 || ss.FirstSeq != 1 || ss.LastSeq != 1_000_001 || ss.NumDeleted != 999999 {
// To not print out on error.
ss.Deleted = nil
t.Fatalf("Bad State: %+v", ss)
}
}

View File

@@ -1058,7 +1058,7 @@ func (mset *stream) update(config *StreamConfig) error {
return nil
}
// Purge will remove all messages from the stream and underlying store.
// Purge will remove all messages from the stream and underlying store based on the request.
func (mset *stream) purge(preq *JSApiStreamPurgeRequest) (purged uint64, err error) {
mset.mu.Lock()
if mset.client == nil {