From f3553791b11598c3cd049621b22a39ee7eec1b9d Mon Sep 17 00:00:00 2001 From: Derek Collison Date: Wed, 17 May 2023 13:14:33 -0700 Subject: [PATCH] Updates to stream reset logic. 1. When catching up do not try forever and if needed reset cluster state. 2. In checking if a stream is healthy check for node drift. 3. When restarting a stream make sure the current node is stopped. Signed-off-by: Derek Collison --- server/jetstream_cluster.go | 26 ++++++++++++++++++++++---- 1 file changed, 22 insertions(+), 4 deletions(-) diff --git a/server/jetstream_cluster.go b/server/jetstream_cluster.go index 60354874..5972ba5b 100644 --- a/server/jetstream_cluster.go +++ b/server/jetstream_cluster.go @@ -456,6 +456,9 @@ func (js *jetStream) restartStream(acc *Account, csa *streamAssignment) { } // Make sure to clear out the raft node if still present in the meta layer. if rg := sa.Group; rg != nil && rg.node != nil { + if rg.node.State() != Closed { + rg.node.Stop() + } rg.node = nil } js.mu.Unlock() @@ -493,7 +496,7 @@ func (js *jetStream) restartStream(acc *Account, csa *streamAssignment) { // For R1 it will make sure the stream is present on this server. func (js *jetStream) isStreamHealthy(acc *Account, sa *streamAssignment) bool { js.mu.Lock() - cc := js.cluster + s, cc := js.srv, js.cluster if cc == nil { // Non-clustered mode js.mu.Unlock() @@ -523,7 +526,11 @@ func (js *jetStream) isStreamHealthy(acc *Account, sa *streamAssignment) bool { if !mset.isCatchingUp() { return true } + } else if node != nil && node != mset.raftNode() { + s.Warnf("Detected stream cluster node skew '%s > %s'", acc.GetName(), streamName) + mset.resetClusteredState(nil) } + return false } @@ -7590,6 +7597,10 @@ func (mset *stream) processSnapshot(snap *streamSnapshot) (e error) { } } + // Do not let this go on forever. + const maxRetries = 3 + var numRetries int + RETRY: // On retry, we need to release the semaphore we got. Call will be no-op // if releaseSem boolean has not been set to true on successfully getting @@ -7606,13 +7617,20 @@ RETRY: sub = nil } - // Block here if we have too many requests in flight. - <-s.syncOutSem - releaseSem = true if !s.isRunning() { return ErrServerNotRunning } + numRetries++ + if numRetries >= maxRetries { + // Force a hard reset here. + return errFirstSequenceMismatch + } + + // Block here if we have too many requests in flight. + <-s.syncOutSem + releaseSem = true + // We may have been blocked for a bit, so the reset need to ensure that we // consume the already fired timer. if !notActive.Stop() {