Make sure we can recover an underlying node being stopped.

Do not return healthy if the node is closed, and wait a bit longer for forward progress.

Signed-off-by: Derek Collison <derek@nats.io>
This commit is contained in:
Derek Collison
2023-04-29 07:42:23 -07:00
parent 85f6bfb2ac
commit 546dd0c9ab
3 changed files with 37 additions and 4 deletions

View File

@@ -537,8 +537,10 @@ func (js *jetStream) isConsumerHealthy(mset *stream, consumer string, ca *consum
// Non-clustered mode
return true
}
o := mset.lookupConsumer(consumer)
if o == nil {
// When we try to restart we nil out the node if applicable
// and reprocess the consumer assignment.
restartConsumer := func() {
js.mu.Lock()
if ca.Group != nil {
ca.Group.node = nil
@@ -548,10 +550,19 @@ func (js *jetStream) isConsumerHealthy(mset *stream, consumer string, ca *consum
if !deleted {
js.processConsumerAssignment(ca)
}
}
o := mset.lookupConsumer(consumer)
if o == nil {
restartConsumer()
return false
}
if node := o.raftNode(); node == nil || node.Healthy() {
return true
} else if node != nil && node.State() == Closed {
// We have a consumer, and it should have a running node but it is closed.
o.stop()
restartConsumer()
}
return false
}

View File

@@ -3833,4 +3833,19 @@ func TestJetStreamClusterHealthzCheckForStoppedAssets(t *testing.T) {
}
return nil
})
// Now just stop the raft node from underneath the consumer.
o = mset.lookupConsumer("d")
require_NotNil(t, o)
node := o.raftNode()
require_NotNil(t, node)
node.Stop()
checkFor(t, 5*time.Second, 500*time.Millisecond, func() error {
hs := s.healthz(nil)
if hs.Error != _EMPTY_ {
return errors.New(hs.Error)
}
return nil
})
}

View File

@@ -1172,9 +1172,16 @@ func (n *raft) isCatchingUp() bool {
return n.catchup != nil
}
// Lock should be held. This function may block for up to ~5ms to check
// This function may block for up to ~10ms to check
// forward progress in some cases.
// Lock should be held.
func (n *raft) isCurrent(includeForwardProgress bool) bool {
// Check if we are closed.
if n.state == Closed {
n.debug("Not current, node is closed")
return false
}
// Check whether we've made progress on any state, 0 is invalid so not healthy.
if n.commit == 0 {
n.debug("Not current, no commits")
@@ -1219,7 +1226,7 @@ func (n *raft) isCurrent(includeForwardProgress bool) bool {
if startDelta := n.commit - n.applied; startDelta > 0 {
for i := 0; i < 10; i++ { // 5ms, in 0.5ms increments
n.Unlock()
time.Sleep(time.Millisecond / 2)
time.Sleep(time.Millisecond)
n.Lock()
if n.commit-n.applied < startDelta {
// The gap is getting smaller, so we're making forward progress.