Don't mark a clustered stream as unhealthy if making forward progress, add TestJetStreamClusterCurrentVsHealth

This commit is contained in:
Neil Twigg
2023-01-24 11:50:08 +00:00
parent 461aad17a5
commit 83932b4be6
4 changed files with 138 additions and 10 deletions

View File

@@ -408,6 +408,47 @@ func (cc *jetStreamCluster) isStreamCurrent(account, stream string) bool {
return false
}
// isStreamHealthy will determine if the stream is up to date or very close.
// For R1 it will make sure the stream is present on this server.
// Read lock should be held.
func (cc *jetStreamCluster) isStreamHealthy(account, stream string) bool {
	if cc == nil {
		// Non-clustered mode is always considered healthy.
		return true
	}
	// Indexing a nil map is safe in Go and yields the zero value, so the
	// two lookups can be chained; a missing account or stream both end up
	// as a nil assignment here.
	sa := cc.streams[account][stream]
	if sa == nil || sa.Group == nil {
		return false
	}
	// For replicated streams the Raft node must report itself healthy.
	// An R1 stream has no node, which is fine.
	if node := sa.Group.node; node != nil && !node.Healthy() {
		return false
	}
	// Raft state looks good; now make sure the stream is actually present
	// on this server and not still applying a catchup snapshot.
	acc, err := cc.s.LookupAccount(account)
	if err != nil {
		return false
	}
	mset, err := acc.lookupStream(stream)
	if err != nil {
		return false
	}
	return !mset.isCatchingUp()
}
// isConsumerCurrent will determine if the consumer is up to date.
// For R1 it will make sure the consumer is present on this server.
// Read lock should be held.

View File

@@ -2500,3 +2500,51 @@ func TestJetStreamClusterKVWatchersWithServerDown(t *testing.T) {
w.Stop()
}
}
// TestJetStreamClusterCurrentVsHealth is designed to show the
// difference between "current" and "healthy" when async publishes
// outpace the rate at which they can be applied.
func TestJetStreamClusterCurrentVsHealth(t *testing.T) {
	c := createJetStreamClusterExplicit(t, "R3S", 3)
	defer c.shutdown()
	c.waitOnLeader()

	nc, js := jsClientConnect(t, c.randomNonLeader())
	defer nc.Close()

	_, err := js.AddStream(&nats.StreamConfig{
		Name:     "TEST",
		Subjects: []string{"foo"},
		Replicas: 3,
	})
	require_NoError(t, err)

	// Observe the Raft state on a follower for the stream's group.
	srv := c.randomNonStreamLeader(globalAccountName, "TEST")
	mset, err := srv.GlobalAccount().lookupStream("TEST")
	require_NoError(t, err)
	// rn avoids shadowing the raft type name.
	rn, ok := mset.raftGroup().node.(*raft)
	require_True(t, ok)

	for i := 0; i < 1000; i++ {
		_, err := js.PublishAsync("foo", []byte("bar"))
		require_NoError(t, err)

		rn.RLock()
		commit, applied := rn.commit, rn.applied
		rn.RUnlock()

		current, healthy := rn.Current(), rn.Healthy()
		if !current || !healthy || commit != applied {
			// Informational only: shows when "current" and "healthy"
			// diverge while applies lag behind commits.
			t.Logf(
				"%d | Current %v, healthy %v, commit %d, applied %d, pending %d",
				i, current, healthy, commit, applied, commit-applied,
			)
		}
	}
}

View File

@@ -3105,7 +3105,7 @@ func (s *Server) healthz(opts *HealthzOptions) *HealthStatus {
for stream, sa := range asa {
if sa.Group.isMember(ourID) {
// Make sure we can look up
if !cc.isStreamCurrent(acc, stream) {
if !cc.isStreamHealthy(acc, stream) {
health.Status = na
health.Error = fmt.Sprintf("JetStream stream '%s > %s' is not current", acc, stream)
return health

View File

@@ -47,6 +47,7 @@ type RaftNode interface {
Leader() bool
Quorum() bool
Current() bool
Healthy() bool
Term() uint64
GroupLeader() string
HadPreviousLeader() bool
@@ -1160,13 +1161,15 @@ func (n *raft) isCatchingUp() bool {
return n.catchup != nil
}
// Lock should be held.
func (n *raft) isCurrent() bool {
// First check if have had activity and we match commit and applied.
if n.commit == 0 || n.commit != n.applied {
n.debug("Not current, commit %d != applied %d", n.commit, n.applied)
// Lock should be held. This function may block for up to ~5ms to check
// forward progress in some cases.
func (n *raft) isCurrent(includeForwardProgress bool) bool {
// Check whether we've made progress on any state, 0 is invalid so not healthy.
if n.commit == 0 {
n.debug("Not current, no commits")
return false
}
// Make sure we are the leader or we know we have heard from the leader recently.
if n.state == Leader {
return true
@@ -1181,14 +1184,40 @@ func (n *raft) isCurrent() bool {
if n.leader != noLeader && n.leader != n.id && n.catchup == nil {
okInterval := int64(hbInterval) * 2
ts := time.Now().UnixNano()
if ps := n.peers[n.leader]; ps != nil && ps.ts > 0 && (ts-ps.ts) <= okInterval {
return true
if ps := n.peers[n.leader]; ps == nil || ps.ts == 0 && (ts-ps.ts) > okInterval {
n.debug("Not current, no recent leader contact")
return false
}
n.debug("Not current, no recent leader contact")
}
if cs := n.catchup; cs != nil {
n.debug("Not current, still catching up pindex=%d, cindex=%d", n.pindex, cs.cindex)
}
if n.commit == n.applied {
// At this point if we are current, we can return saying so.
return true
} else if !includeForwardProgress {
// Otherwise, if we aren't allowed to include forward progress
// (i.e. we are checking "current" instead of "healthy") then
// give up now.
return false
}
// Otherwise, wait for a short period of time and see if we are making any
// forward progress.
if startDelta := n.commit - n.applied; startDelta > 0 {
for i := 0; i < 10; i++ { // 5ms, in 0.5ms increments
n.RUnlock()
time.Sleep(time.Millisecond / 2)
n.RLock()
if n.commit-n.applied < startDelta {
// The gap is getting smaller, so we're making forward progress.
return true
}
}
}
n.warn("Falling behind in health check, commit %d != applied %d", n.commit, n.applied)
return false
}
@@ -1199,7 +1228,17 @@ func (n *raft) Current() bool {
}
n.RLock()
defer n.RUnlock()
return n.isCurrent()
return n.isCurrent(false)
}
// Healthy returns if we are the leader for our group and nearly up-to-date.
func (n *raft) Healthy() bool {
	if n == nil {
		return false
	}
	n.RLock()
	defer n.RUnlock()
	// Passing true lets isCurrent also count recent forward progress
	// on applies, which is the distinction from Current.
	ok := n.isCurrent(true)
	return ok
}
// HadPreviousLeader indicates if this group ever had a leader.