diff --git a/server/jetstream_cluster.go b/server/jetstream_cluster.go
index 22a5aa76..5fc58336 100644
--- a/server/jetstream_cluster.go
+++ b/server/jetstream_cluster.go
@@ -408,6 +408,47 @@ func (cc *jetStreamCluster) isStreamCurrent(account, stream string) bool {
 	return false
 }
 
+// isStreamHealthy will determine if the stream is up to date or very close.
+// For R1 it will make sure the stream is present on this server.
+// Read lock should be held.
+func (cc *jetStreamCluster) isStreamHealthy(account, stream string) bool {
+	if cc == nil {
+		// Non-clustered mode
+		return true
+	}
+	as := cc.streams[account]
+	if as == nil {
+		return false
+	}
+	sa := as[stream]
+	if sa == nil {
+		return false
+	}
+	rg := sa.Group
+	if rg == nil {
+		return false
+	}
+
+	if rg.node == nil || rg.node.Healthy() {
+		// Check if we are processing a snapshot and are catching up.
+		acc, err := cc.s.LookupAccount(account)
+		if err != nil {
+			return false
+		}
+		mset, err := acc.lookupStream(stream)
+		if err != nil {
+			return false
+		}
+		if mset.isCatchingUp() {
+			return false
+		}
+		// Success.
+		return true
+	}
+
+	return false
+}
+
 // isConsumerCurrent will determine if the consumer is up to date.
 // For R1 it will make sure the consunmer is present on this server.
 // Read lock should be held.
diff --git a/server/jetstream_cluster_3_test.go b/server/jetstream_cluster_3_test.go
index ce865116..4665b5d1 100644
--- a/server/jetstream_cluster_3_test.go
+++ b/server/jetstream_cluster_3_test.go
@@ -2500,3 +2500,51 @@ func TestJetStreamClusterKVWatchersWithServerDown(t *testing.T) {
 		w.Stop()
 	}
 }
+
+// TestJetStreamClusterCurrentVsHealth is designed to show the
+// difference between "current" and "healthy" when async publishes
+// outpace the rate at which they can be applied.
+func TestJetStreamClusterCurrentVsHealth(t *testing.T) {
+	c := createJetStreamClusterExplicit(t, "R3S", 3)
+	defer c.shutdown()
+
+	c.waitOnLeader()
+	server := c.randomNonLeader()
+
+	nc, js := jsClientConnect(t, server)
+	defer nc.Close()
+
+	_, err := js.AddStream(&nats.StreamConfig{
+		Name:     "TEST",
+		Subjects: []string{"foo"},
+		Replicas: 3,
+	})
+	require_NoError(t, err)
+
+	server = c.randomNonStreamLeader(globalAccountName, "TEST")
+	stream, err := server.GlobalAccount().lookupStream("TEST")
+	require_NoError(t, err)
+
+	raft, ok := stream.raftGroup().node.(*raft)
+	require_True(t, ok)
+
+	for i := 0; i < 1000; i++ {
+		_, err := js.PublishAsync("foo", []byte("bar"))
+		require_NoError(t, err)
+
+		raft.RLock()
+		commit := raft.commit
+		applied := raft.applied
+		raft.RUnlock()
+
+		current := raft.Current()
+		healthy := raft.Healthy()
+
+		if !current || !healthy || commit != applied {
+			t.Logf(
+				"%d | Current %v, healthy %v, commit %d, applied %d, pending %d",
+				i, current, healthy, commit, applied, commit-applied,
+			)
+		}
+	}
+}
diff --git a/server/monitor.go b/server/monitor.go
index d2aadfc6..3322a3ee 100644
--- a/server/monitor.go
+++ b/server/monitor.go
@@ -3105,7 +3105,7 @@ func (s *Server) healthz(opts *HealthzOptions) *HealthStatus {
 		for stream, sa := range asa {
 			if sa.Group.isMember(ourID) {
 				// Make sure we can look up
-				if !cc.isStreamCurrent(acc, stream) {
+				if !cc.isStreamHealthy(acc, stream) {
 					health.Status = na
 					health.Error = fmt.Sprintf("JetStream stream '%s > %s' is not current", acc, stream)
 					return health
diff --git a/server/raft.go b/server/raft.go
index 88077360..1c95aeef 100644
--- a/server/raft.go
+++ b/server/raft.go
@@ -47,6 +47,7 @@ type RaftNode interface {
 	Leader() bool
 	Quorum() bool
 	Current() bool
+	Healthy() bool
 	Term() uint64
 	GroupLeader() string
 	HadPreviousLeader() bool
@@ -1160,13 +1161,15 @@ func (n *raft) isCatchingUp() bool {
 	return n.catchup != nil
 }
 
-// Lock should be held.
-func (n *raft) isCurrent() bool {
-	// First check if have had activity and we match commit and applied.
-	if n.commit == 0 || n.commit != n.applied {
-		n.debug("Not current, commit %d != applied %d", n.commit, n.applied)
+// Lock should be held; with includeForwardProgress it must be the read lock, which is
+// briefly released while we wait (up to ~5ms) to observe forward progress.
+func (n *raft) isCurrent(includeForwardProgress bool) bool {
+	// Check whether we've made progress on any state, 0 is invalid so not healthy.
+	if n.commit == 0 {
+		n.debug("Not current, no commits")
 		return false
 	}
+
 	// Make sure we are the leader or we know we have heard from the leader recently.
 	if n.state == Leader {
 		return true
@@ -1181,14 +1184,42 @@ func (n *raft) isCurrent() bool {
 	if n.leader != noLeader && n.leader != n.id && n.catchup == nil {
 		okInterval := int64(hbInterval) * 2
 		ts := time.Now().UnixNano()
-		if ps := n.peers[n.leader]; ps != nil && ps.ts > 0 && (ts-ps.ts) <= okInterval {
-			return true
+		if ps := n.peers[n.leader]; ps == nil || ps.ts == 0 || (ts-ps.ts) > okInterval {
+			n.debug("Not current, no recent leader contact")
+			return false
 		}
-		n.debug("Not current, no recent leader contact")
 	}
 	if cs := n.catchup; cs != nil {
 		n.debug("Not current, still catching up pindex=%d, cindex=%d", n.pindex, cs.cindex)
+		// Still catching up, so we can not be current even if commit == applied.
+		return false
 	}
+
+	if n.commit == n.applied {
+		// At this point if we are current, we can return saying so.
+		return true
+	} else if !includeForwardProgress {
+		// Otherwise, if we aren't allowed to include forward progress
+		// (i.e. we are checking "current" instead of "healthy") then
+		// give up now.
+		return false
+	}
+
+	// Otherwise, wait for a short period of time and see if we are making any
+	// forward progress.
+	if startDelta := n.commit - n.applied; startDelta > 0 {
+		for i := 0; i < 10; i++ { // 5ms, in 0.5ms increments
+			n.RUnlock()
+			time.Sleep(time.Millisecond / 2)
+			n.RLock()
+			if n.commit-n.applied < startDelta {
+				// The gap is getting smaller, so we're making forward progress.
+				return true
+			}
+		}
+	}
+
+	n.warn("Falling behind in health check, commit %d != applied %d", n.commit, n.applied)
 	return false
 }
 
@@ -1199,7 +1230,17 @@ func (n *raft) Current() bool {
 	}
 	n.RLock()
 	defer n.RUnlock()
-	return n.isCurrent()
+	return n.isCurrent(false)
+}
+
+// Healthy returns if we are current, or are behind on applying commits but making forward progress.
+func (n *raft) Healthy() bool {
+	if n == nil {
+		return false
+	}
+	n.RLock()
+	defer n.RUnlock()
+	return n.isCurrent(true)
 }
 
 // HadPreviousLeader indicates if this group ever had a leader.