diff --git a/server/jetstream_cluster.go b/server/jetstream_cluster.go index 606c8e27..8101fb80 100644 --- a/server/jetstream_cluster.go +++ b/server/jetstream_cluster.go @@ -6599,6 +6599,33 @@ func (mset *stream) processSnapshot(snap *streamSnapshot) (e error) { // On exit, we will release our semaphore if we acquired it. defer releaseSyncOutSem() + // Check our final state when we exit cleanly. + // If this snapshot was for messages no longer held by the leader we want to make sure + // we are synched for the next message sequence properly. + lastRequested := sreq.LastSeq + checkFinalState := func() { + if mset != nil { + mset.mu.Lock() + var state StreamState + mset.store.FastState(&state) + var didReset bool + firstExpected := lastRequested + 1 + if state.FirstSeq != firstExpected { + // Reset our notion of first. + mset.store.Compact(firstExpected) + mset.store.FastState(&state) + // Make sure last is also correct in case this also moved. + mset.lseq = state.LastSeq + didReset = true + } + mset.mu.Unlock() + if didReset { + s.Warnf("Catchup for stream '%s > %s' resetting first sequence: %d on catchup complete", + mset.account(), mset.name(), firstExpected) + } + } + } + RETRY: // On retry, we need to release the semaphore we got. Call will be no-op // if releaseSem boolean has not been set to true on successfully getting @@ -6642,6 +6669,8 @@ RETRY: if sreq == nil { return nil } + // Reset notion of lastRequested + lastRequested = sreq.LastSeq } // Used to transfer message from the wire to another Go routine internally. @@ -6665,8 +6694,11 @@ RETRY: err = nil goto RETRY } + // Send our sync request. b, _ := json.Marshal(sreq) s.sendInternalMsgLocked(subject, reply, nil, b) + // Remember when we sent this out to avoid loop spins on errors below. + reqSendTime := time.Now() // Clear our sync request and capture last. last := sreq.LastSeq @@ -6687,6 +6719,7 @@ RETRY: // Check for eof signaling. 
if len(msg) == 0 { msgsQ.recycle(&mrecs) + checkFinalState() return nil } if lseq, err := mset.processCatchupMsg(msg); err == nil { @@ -6707,6 +6740,19 @@ RETRY: } else { s.Warnf("Catchup for stream '%s > %s' errored, will retry: %v", mset.account(), mset.name(), err) msgsQ.recycle(&mrecs) + + // Make sure we do not spin and make things worse. + const minRetryWait = 2 * time.Second + elapsed := time.Since(reqSendTime) + if elapsed < minRetryWait { + select { + case <-s.quitCh: + return ErrServerNotRunning + case <-qch: + return ErrStreamStopped + case <-time.After(minRetryWait - elapsed): + } + } goto RETRY } if mrec.reply != _EMPTY_ { @@ -7087,6 +7133,20 @@ func (mset *stream) runCatchup(sendSubject string, sreq *streamSyncRequest) { notActive := time.NewTimer(activityInterval) defer notActive.Stop() + // Grab our state. + var state StreamState + mset.mu.RLock() + mset.store.FastState(&state) + mset.mu.RUnlock() + + // Reset notion of first if this request wants sequences before our starting sequence + // and we would have nothing to send. If we have partial messages still need to send skips for those. + if sreq.FirstSeq < state.FirstSeq && state.FirstSeq > sreq.LastSeq { + s.Debugf("Catchup for stream '%s > %s' resetting request first sequence from %d to %d", + mset.account(), mset.name(), sreq.FirstSeq, state.FirstSeq) + sreq.FirstSeq = state.FirstSeq + } + // Setup sequences to walk through. seq, last := sreq.FirstSeq, sreq.LastSeq mset.setCatchupPeer(sreq.Peer, last-seq) @@ -7096,6 +7156,14 @@ func (mset *stream) runCatchup(sendSubject string, sreq *streamSyncRequest) { // Update our activity timer. notActive.Reset(activityInterval) + // Check if we know we will not enter the loop because we are done. 
+ if seq > last { + s.Noticef("Catchup for stream '%s > %s' complete", mset.account(), mset.name()) + // EOF + s.sendInternalMsgLocked(sendSubject, _EMPTY_, nil, nil) + return false + } + var smv StoreMsg for ; seq <= last && atomic.LoadInt64(&outb) <= maxOutBytes && atomic.LoadInt32(&outm) <= maxOutMsgs && s.gcbTotal() <= maxTotalCatchupOutBytes; seq++ { sm, err := mset.store.LoadMsg(seq, &smv) diff --git a/server/jetstream_cluster_test.go b/server/jetstream_cluster_test.go index 16fd5cb2..901b73c7 100644 --- a/server/jetstream_cluster_test.go +++ b/server/jetstream_cluster_test.go @@ -12118,3 +12118,84 @@ func TestJetStreamClusterUnknownReplicaOnClusterRestart(t *testing.T) { t.Fatalf("Should have had an unknown server name, did not: %+v - %+v", si.Cluster.Replicas[0], si.Cluster.Replicas[1]) } } + +func TestJetStreamClusterSnapshotBeforePurgeAndCatchup(t *testing.T) { + c := createJetStreamClusterExplicit(t, "JSC", 3) + defer c.shutdown() + + nc, js := jsClientConnect(t, c.randomServer()) + defer nc.Close() + + _, err := js.AddStream(&nats.StreamConfig{ + Name: "TEST", + Subjects: []string{"foo"}, + MaxAge: 5 * time.Second, + Replicas: 3, + }) + require_NoError(t, err) + + sl := c.streamLeader("$G", "TEST") + nl := c.randomNonStreamLeader("$G", "TEST") + + // Make sure we do not get disconnected when shutting the non-leader down. + nc, js = jsClientConnect(t, sl) + defer nc.Close() + + send1k := func() { + t.Helper() + for i := 0; i < 1000; i++ { + js.PublishAsync("foo", []byte("SNAP")) + } + select { + case <-js.PublishAsyncComplete(): + case <-time.After(5 * time.Second): + t.Fatalf("Did not receive completion signal") + } + } + + // Send first 1000 to everyone. + send1k() + + // Now shutdown a non-leader. + c.waitOnStreamCurrent(nl, "$G", "TEST") + nl.Shutdown() + + // Send another 1000. + send1k() + + // Force snapshot on the leader. 
+ mset, err := sl.GlobalAccount().lookupStream("TEST") + require_NoError(t, err) + err = mset.raftNode().InstallSnapshot(mset.stateSnapshot()) + require_NoError(t, err) + + // Purge + err = js.PurgeStream("TEST") + require_NoError(t, err) + + // Send another 1000. + send1k() + + // We want to make sure we do not send unnecessary skip msgs when we know we do not have all of these messages. + nc, _ = jsClientConnect(t, sl, nats.UserInfo("admin", "s3cr3t!")) + sub, err := nc.SubscribeSync("$JSC.R.>") + require_NoError(t, err) + + // Now restart non-leader. + nl = c.restartServer(nl) + c.waitOnStreamCurrent(nl, "$G", "TEST") + + // Grab state directly from non-leader. + mset, err = nl.GlobalAccount().lookupStream("TEST") + require_NoError(t, err) + + if state := mset.state(); state.FirstSeq != 2001 || state.LastSeq != 3000 { + t.Fatalf("Incorrect state: %+v", state) + } + + // Make sure we only sent 1 sync catchup msg. + nmsgs, _, _ := sub.Pending() + if nmsgs != 1 { + t.Fatalf("Expected only 1 sync catchup msg to be sent signaling eof, but got %d", nmsgs) + } +}