When creating replicated mirrors where the source stream had a very large starting sequence number, the server would use excessive CPU and Memory.

This is due to the mirroring functionality trying to skip messages when it detects a gap. In a replicated stream this puts excessive stress on the raft system.
This step is not needed at all if the mirror stream has no messages, we can simply jump ahead.

Signed-off-by: Derek Collison <derek@nats.io>
This commit is contained in:
Derek Collison
2023-06-15 17:20:15 -07:00
parent 367d857612
commit 087a28a13e
2 changed files with 76 additions and 3 deletions

View File

@@ -8138,3 +8138,63 @@ func TestNoRaceCheckAckFloorWithVeryLargeFirstSeqAndNewConsumers(t *testing.T) {
t.Fatalf("Check ack floor taking too long!")
}
}
func TestNoRaceReplicatedMirrorWithLargeStartingSequenceOverLeafnode(t *testing.T) {
// Cluster B
tmpl := strings.Replace(jsClusterTempl, "store_dir:", "domain: B, store_dir:", 1)
c := createJetStreamCluster(t, tmpl, "B", _EMPTY_, 3, 22020, true)
defer c.shutdown()
// Cluster A
// Domain is "A'
lc := c.createLeafNodesWithStartPortAndDomain("A", 3, 22110, "A")
defer lc.shutdown()
lc.waitOnClusterReady()
// Create a stream on B (HUB/CLOUD) and set its starting sequence very high.
nc, js := jsClientConnect(t, c.randomServer())
defer nc.Close()
_, err := js.AddStream(&nats.StreamConfig{
Name: "TEST",
Subjects: []string{"foo"},
Replicas: 3,
})
require_NoError(t, err)
err = js.PurgeStream("TEST", &nats.StreamPurgeRequest{Sequence: 1_000_000_000})
require_NoError(t, err)
// Send in a small amount of messages.
for i := 0; i < 1000; i++ {
sendStreamMsg(t, nc, "foo", "Hello")
}
si, err := js.StreamInfo("TEST")
require_NoError(t, err)
require_True(t, si.State.FirstSeq == 1_000_000_000)
// Now try to create a replicated mirror on the leaf cluster.
lnc, ljs := jsClientConnect(t, lc.randomServer())
defer lnc.Close()
_, err = ljs.AddStream(&nats.StreamConfig{
Name: "TEST",
Mirror: &nats.StreamSource{
Name: "TEST",
Domain: "B",
},
})
require_NoError(t, err)
// Make sure we sync quickly.
checkFor(t, time.Second, 200*time.Millisecond, func() error {
si, err = ljs.StreamInfo("TEST")
require_NoError(t, err)
if si.State.Msgs == 1000 && si.State.FirstSeq == 1_000_000_000 {
return nil
}
return fmt.Errorf("Mirror state not correct: %+v", si.State)
})
}

View File

@@ -1671,7 +1671,7 @@ func (mset *stream) purge(preq *JSApiStreamPurgeRequest) (purged uint64, err err
mset.mu.RUnlock()
return 0, errors.New("sealed stream")
}
store := mset.store
store, mlseq := mset.store, mset.lseq
mset.mu.RUnlock()
if preq != nil {
@@ -1683,11 +1683,17 @@ func (mset *stream) purge(preq *JSApiStreamPurgeRequest) (purged uint64, err err
return purged, err
}
// Purge consumers.
// Grab our stream state.
var state StreamState
store.FastState(&state)
fseq, lseq := state.FirstSeq, state.LastSeq
// Check if our last has moved past what our original last sequence was, if so reset.
if lseq > mlseq {
mset.setLastSeq(lseq)
}
// Purge consumers.
// Check for filtered purge.
if preq != nil && preq.Subject != _EMPTY_ {
ss := store.FilteredState(state.FirstSeq, preq.Subject)
@@ -2399,7 +2405,14 @@ func (mset *stream) setupMirrorConsumer() error {
// Check if we need to skip messages.
if state.LastSeq != ccr.ConsumerInfo.Delivered.Stream {
mset.skipMsgs(state.LastSeq+1, ccr.ConsumerInfo.Delivered.Stream)
// Check to see if delivered is past our last and we have no msgs. This will help the
// case when mirroring a stream that has a very high starting sequence number.
if state.Msgs == 0 && ccr.ConsumerInfo.Delivered.Stream > state.LastSeq {
mset.store.PurgeEx(_EMPTY_, ccr.ConsumerInfo.Delivered.Stream+1, 0)
mset.lseq = ccr.ConsumerInfo.Delivered.Stream
} else {
mset.skipMsgs(state.LastSeq+1, ccr.ConsumerInfo.Delivered.Stream)
}
}
// Capture consumer name.