mirror of
https://github.com/gogrlx/nats-server.git
synced 2026-04-02 03:38:42 -07:00
Updates to stream reset logic.
1. When catching up, do not retry forever; if needed, reset cluster state. 2. When checking whether a stream is healthy, also check for node drift. 3. When restarting a stream, make sure the current node is stopped. Signed-off-by: Derek Collison <derek@nats.io>
This commit is contained in:
@@ -456,6 +456,9 @@ func (js *jetStream) restartStream(acc *Account, csa *streamAssignment) {
|
||||
}
|
||||
// Make sure to clear out the raft node if still present in the meta layer.
|
||||
if rg := sa.Group; rg != nil && rg.node != nil {
|
||||
if rg.node.State() != Closed {
|
||||
rg.node.Stop()
|
||||
}
|
||||
rg.node = nil
|
||||
}
|
||||
js.mu.Unlock()
|
||||
@@ -493,7 +496,7 @@ func (js *jetStream) restartStream(acc *Account, csa *streamAssignment) {
|
||||
// For R1 it will make sure the stream is present on this server.
|
||||
func (js *jetStream) isStreamHealthy(acc *Account, sa *streamAssignment) bool {
|
||||
js.mu.Lock()
|
||||
cc := js.cluster
|
||||
s, cc := js.srv, js.cluster
|
||||
if cc == nil {
|
||||
// Non-clustered mode
|
||||
js.mu.Unlock()
|
||||
@@ -523,7 +526,11 @@ func (js *jetStream) isStreamHealthy(acc *Account, sa *streamAssignment) bool {
|
||||
if !mset.isCatchingUp() {
|
||||
return true
|
||||
}
|
||||
} else if node != nil && node != mset.raftNode() {
|
||||
s.Warnf("Detected stream cluster node skew '%s > %s'", acc.GetName(), streamName)
|
||||
mset.resetClusteredState(nil)
|
||||
}
|
||||
|
||||
return false
|
||||
}
|
||||
|
||||
@@ -7590,6 +7597,10 @@ func (mset *stream) processSnapshot(snap *streamSnapshot) (e error) {
|
||||
}
|
||||
}
|
||||
|
||||
// Do not let this go on forever.
|
||||
const maxRetries = 3
|
||||
var numRetries int
|
||||
|
||||
RETRY:
|
||||
// On retry, we need to release the semaphore we got. Call will be no-op
|
||||
// if releaseSem boolean has not been set to true on successfully getting
|
||||
@@ -7606,13 +7617,20 @@ RETRY:
|
||||
sub = nil
|
||||
}
|
||||
|
||||
// Block here if we have too many requests in flight.
|
||||
<-s.syncOutSem
|
||||
releaseSem = true
|
||||
if !s.isRunning() {
|
||||
return ErrServerNotRunning
|
||||
}
|
||||
|
||||
numRetries++
|
||||
if numRetries >= maxRetries {
|
||||
// Force a hard reset here.
|
||||
return errFirstSequenceMismatch
|
||||
}
|
||||
|
||||
// Block here if we have too many requests in flight.
|
||||
<-s.syncOutSem
|
||||
releaseSem = true
|
||||
|
||||
// We may have been blocked for a bit, so the reset needs to ensure that we
|
||||
// consume the already fired timer.
|
||||
if !notActive.Stop() {
|
||||
|
||||
Reference in New Issue
Block a user