Improve handling when a snapshot represents state we no longer have.
We would send skip messages for a sync request that was completely below our current state, but this could be more traffic than we might want. Now we only send EOF, and the other side can detect the skip forward and adjust on a successful catchup. We still send skips if we can partially fill the sync request.

Signed-off-by: Derek Collison <derek@nats.io>
commit 5a050fc10b
parent 33c4fec75f
committed by Ivan Kozlovic
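
To make the change concrete, here is a minimal, self-contained Go sketch of the decision each side now makes. The types and helpers (syncRequest, streamState, leaderPlan, followerOnEOF) are hypothetical stand-ins invented for illustration, not the server's actual API; the real logic lives in runCatchup and processSnapshot in the diff below.

package main

import "fmt"

// syncRequest and streamState are simplified stand-ins for the server's
// streamSyncRequest and StreamState.
type syncRequest struct {
    FirstSeq, LastSeq uint64
}

type streamState struct {
    FirstSeq, LastSeq uint64
}

// leaderPlan mirrors the runCatchup change: if the requested range lies
// entirely below what the leader still holds, move FirstSeq forward so the
// send loop is never entered and only the zero-length EOF message goes out.
// It reports whether skip messages are still needed (the partial-fill case).
func leaderPlan(sreq *syncRequest, state streamState) bool {
    if sreq.FirstSeq < state.FirstSeq && state.FirstSeq > sreq.LastSeq {
        // Nothing to send at all: reset and let the EOF signal the skip.
        sreq.FirstSeq = state.FirstSeq
        return false
    }
    // We can at least partially fill the request, so skips are still sent
    // for the leading sequences we no longer have.
    return sreq.FirstSeq < state.FirstSeq
}

// followerOnEOF mirrors checkFinalState: after a clean catchup, if our first
// sequence is not lastRequested+1 the leader skipped us forward, so we
// compact our store up to the expected sequence.
func followerOnEOF(storeFirst, lastRequested uint64, compact func(uint64)) {
    if expected := lastRequested + 1; storeFirst != expected {
        compact(expected)
    }
}

func main() {
    // Leader holds [2001, 3000]; the follower asks for [1001, 2000].
    sreq := &syncRequest{FirstSeq: 1001, LastSeq: 2000}
    state := streamState{FirstSeq: 2001, LastSeq: 3000}

    if !leaderPlan(sreq, state) {
        fmt.Println("leader: send EOF only, no skip msgs")
    }
    // The follower sees the EOF with its store still starting at 1001.
    followerOnEOF(1001, sreq.LastSeq, func(seq uint64) {
        fmt.Printf("follower: compact store to first=%d\n", seq)
    })
}

With a leader holding [2001, 3000] and a request for [1001, 2000], the sketch takes the EOF-only path on the leader and compacts the follower forward to 2001, which is the single catchup message the new test asserts.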
@@ -6599,6 +6599,33 @@ func (mset *stream) processSnapshot(snap *streamSnapshot) (e error) {
    // On exit, we will release our semaphore if we acquired it.
    defer releaseSyncOutSem()

    // Check our final state when we exit cleanly.
    // If this snapshot was for messages no longer held by the leader we want to make sure
    // we are synched for the next message sequence properly.
    lastRequested := sreq.LastSeq
    checkFinalState := func() {
        if mset != nil {
            mset.mu.Lock()
            var state StreamState
            mset.store.FastState(&state)
            var didReset bool
            firstExpected := lastRequested + 1
            if state.FirstSeq != firstExpected {
                // Reset our notion of first.
                mset.store.Compact(firstExpected)
                mset.store.FastState(&state)
                // Make sure last is also correct in case this also moved.
                mset.lseq = state.LastSeq
                didReset = true
            }
            mset.mu.Unlock()
            if didReset {
                s.Warnf("Catchup for stream '%s > %s' resetting first sequence: %d on catchup complete",
                    mset.account(), mset.name(), firstExpected)
            }
        }
    }

RETRY:
    // On retry, we need to release the semaphore we got. Call will be no-op
    // if releaseSem boolean has not been set to true on successfully getting
@@ -6642,6 +6669,8 @@ RETRY:
        if sreq == nil {
            return nil
        }
        // Reset notion of lastRequested
        lastRequested = sreq.LastSeq
    }

    // Used to transfer message from the wire to another Go routine internally.
@@ -6665,8 +6694,11 @@ RETRY:
        err = nil
        goto RETRY
    }
    // Send our sync request.
    b, _ := json.Marshal(sreq)
    s.sendInternalMsgLocked(subject, reply, nil, b)
    // Remember when we sent this out to avoid loop spins on errors below.
    reqSendTime := time.Now()

    // Clear our sync request and capture last.
    last := sreq.LastSeq
@@ -6687,6 +6719,7 @@ RETRY:
        // Check for eof signaling.
        if len(msg) == 0 {
            msgsQ.recycle(&mrecs)
            checkFinalState()
            return nil
        }
        if lseq, err := mset.processCatchupMsg(msg); err == nil {
@@ -6707,6 +6740,19 @@ RETRY:
        } else {
            s.Warnf("Catchup for stream '%s > %s' errored, will retry: %v", mset.account(), mset.name(), err)
            msgsQ.recycle(&mrecs)

            // Make sure we do not spin and make things worse.
            const minRetryWait = 2 * time.Second
            elapsed := time.Since(reqSendTime)
            if elapsed < minRetryWait {
                select {
                case <-s.quitCh:
                    return ErrServerNotRunning
                case <-qch:
                    return ErrStreamStopped
                case <-time.After(minRetryWait - elapsed):
                }
            }
            goto RETRY
        }
        if mrec.reply != _EMPTY_ {
@@ -7087,6 +7133,20 @@ func (mset *stream) runCatchup(sendSubject string, sreq *streamSyncRequest) {
    notActive := time.NewTimer(activityInterval)
    defer notActive.Stop()

    // Grab our state.
    var state StreamState
    mset.mu.RLock()
    mset.store.FastState(&state)
    mset.mu.RUnlock()

    // Reset notion of first if this request wants sequences before our starting sequence
    // and we would have nothing to send. If we have partial messages we still need to send skips for those.
    if sreq.FirstSeq < state.FirstSeq && state.FirstSeq > sreq.LastSeq {
        s.Debugf("Catchup for stream '%s > %s' resetting request first sequence from %d to %d",
            mset.account(), mset.name(), sreq.FirstSeq, state.FirstSeq)
        sreq.FirstSeq = state.FirstSeq
    }

    // Setup sequences to walk through.
    seq, last := sreq.FirstSeq, sreq.LastSeq
    mset.setCatchupPeer(sreq.Peer, last-seq)
@@ -7096,6 +7156,14 @@ func (mset *stream) runCatchup(sendSubject string, sreq *streamSyncRequest) {
    // Update our activity timer.
    notActive.Reset(activityInterval)

    // Check if we know we will not enter the loop because we are done.
    if seq > last {
        s.Noticef("Catchup for stream '%s > %s' complete", mset.account(), mset.name())
        // EOF
        s.sendInternalMsgLocked(sendSubject, _EMPTY_, nil, nil)
        return false
    }

    var smv StoreMsg
    for ; seq <= last && atomic.LoadInt64(&outb) <= maxOutBytes && atomic.LoadInt32(&outm) <= maxOutMsgs && s.gcbTotal() <= maxTotalCatchupOutBytes; seq++ {
        sm, err := mset.store.LoadMsg(seq, &smv)
@@ -12118,3 +12118,84 @@ func TestJetStreamClusterUnknownReplicaOnClusterRestart(t *testing.T) {
        t.Fatalf("Should have had an unknown server name, did not: %+v - %+v", si.Cluster.Replicas[0], si.Cluster.Replicas[1])
    }
}

func TestJetStreamClusterSnapshotBeforePurgeAndCatchup(t *testing.T) {
    c := createJetStreamClusterExplicit(t, "JSC", 3)
    defer c.shutdown()

    nc, js := jsClientConnect(t, c.randomServer())
    defer nc.Close()

    _, err := js.AddStream(&nats.StreamConfig{
        Name:     "TEST",
        Subjects: []string{"foo"},
        MaxAge:   5 * time.Second,
        Replicas: 3,
    })
    require_NoError(t, err)

    sl := c.streamLeader("$G", "TEST")
    nl := c.randomNonStreamLeader("$G", "TEST")

    // Make sure we do not get disconnected when shutting the non-leader down.
    nc, js = jsClientConnect(t, sl)
    defer nc.Close()

    send1k := func() {
        t.Helper()
        for i := 0; i < 1000; i++ {
            js.PublishAsync("foo", []byte("SNAP"))
        }
        select {
        case <-js.PublishAsyncComplete():
        case <-time.After(5 * time.Second):
            t.Fatalf("Did not receive completion signal")
        }
    }

    // Send first 1k to everyone.
    send1k()

    // Now shutdown a non-leader.
    c.waitOnStreamCurrent(nl, "$G", "TEST")
    nl.Shutdown()

    // Send another 1k.
    send1k()

    // Force a snapshot on the leader.
    mset, err := sl.GlobalAccount().lookupStream("TEST")
    require_NoError(t, err)
    err = mset.raftNode().InstallSnapshot(mset.stateSnapshot())
    require_NoError(t, err)

    // Purge the stream.
    err = js.PurgeStream("TEST")
    require_NoError(t, err)

    // Send another 1k.
    send1k()

    // We want to make sure we do not send unnecessary skip msgs when we know we do not have all of these messages.
    nc, _ = jsClientConnect(t, sl, nats.UserInfo("admin", "s3cr3t!"))
    sub, err := nc.SubscribeSync("$JSC.R.>")
    require_NoError(t, err)

    // Now restart the non-leader.
    nl = c.restartServer(nl)
    c.waitOnStreamCurrent(nl, "$G", "TEST")

    // Grab state directly from the non-leader.
    mset, err = nl.GlobalAccount().lookupStream("TEST")
    require_NoError(t, err)

    // 3k messages were published in total and the purge happened at 2k,
    // so the caught-up replica should hold exactly [2001, 3000].
    if state := mset.state(); state.FirstSeq != 2001 || state.LastSeq != 3000 {
        t.Fatalf("Incorrect state: %+v", state)
    }

    // Make sure we only sent 1 sync catchup msg.
    nmsgs, _, _ := sub.Pending()
    if nmsgs != 1 {
        t.Fatalf("Expected only 1 sync catchup msg to be sent signaling eof, but got %d", nmsgs)
    }
}