mirror of
https://github.com/gogrlx/nats-server.git
synced 2026-04-17 11:24:44 -07:00
Under certain scenarios we have witnessed healthz() never return healthy due to a stream or consumer being missing or stopped.
This will now allow the healthy call to attempt to restart those assets. Signed-off-by: Derek Collison <derek@nats.io>
This commit is contained in:
@@ -434,24 +434,89 @@ func (cc *jetStreamCluster) isStreamCurrent(account, stream string) bool {
|
||||
return false
|
||||
}
|
||||
|
||||
// Restart the stream in question.
|
||||
// Should only be called when the stream is know in a bad state.
|
||||
func (js *jetStream) restartStream(acc *Account, csa *streamAssignment) {
|
||||
js.mu.Lock()
|
||||
cc := js.cluster
|
||||
if cc == nil {
|
||||
js.mu.Unlock()
|
||||
return
|
||||
}
|
||||
// Need to lookup the one directly from the meta layer, what we get handed is a copy if coming from isStreamHealthy.
|
||||
asa := cc.streams[acc.Name]
|
||||
if asa == nil {
|
||||
js.mu.Unlock()
|
||||
return
|
||||
}
|
||||
sa := asa[csa.Config.Name]
|
||||
if sa == nil {
|
||||
js.mu.Unlock()
|
||||
return
|
||||
}
|
||||
// Make sure to clear out the raft node if still present in the meta layer.
|
||||
if rg := sa.Group; rg != nil && rg.node != nil {
|
||||
rg.node = nil
|
||||
}
|
||||
js.mu.Unlock()
|
||||
|
||||
// Process stream assignment to recreate.
|
||||
js.processStreamAssignment(sa)
|
||||
|
||||
// If we had consumers assigned to this server they will be present in the copy, csa.
|
||||
// They also need to be processed. The csa consumers is a copy of only our consumers,
|
||||
// those assigned to us, but the consumer assignment's there are direct from the meta
|
||||
// layer to make this part much easier and avoid excessive lookups.
|
||||
for _, cca := range csa.consumers {
|
||||
if cca.deleted {
|
||||
continue
|
||||
}
|
||||
// Need to look up original as well here to make sure node is nil.
|
||||
js.mu.Lock()
|
||||
ca := sa.consumers[cca.Name]
|
||||
if ca != nil && ca.Group != nil {
|
||||
// Make sure node is wiped.
|
||||
ca.Group.node = nil
|
||||
}
|
||||
js.mu.Unlock()
|
||||
if ca != nil {
|
||||
js.processConsumerAssignment(ca)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// isStreamHealthy will determine if the stream is up to date or very close.
|
||||
// For R1 it will make sure the stream is present on this server.
|
||||
func (js *jetStream) isStreamHealthy(acc *Account, sa *streamAssignment) bool {
|
||||
js.mu.RLock()
|
||||
defer js.mu.RUnlock()
|
||||
|
||||
js.mu.Lock()
|
||||
cc := js.cluster
|
||||
if cc == nil {
|
||||
// Non-clustered mode
|
||||
js.mu.Unlock()
|
||||
return true
|
||||
}
|
||||
|
||||
// Pull the group out.
|
||||
rg := sa.Group
|
||||
if rg == nil {
|
||||
js.mu.Unlock()
|
||||
return false
|
||||
}
|
||||
if rg := sa.Group; rg != nil && (rg.node == nil || rg.node.Healthy()) {
|
||||
|
||||
streamName := sa.Config.Name
|
||||
node := rg.node
|
||||
js.mu.Unlock()
|
||||
|
||||
// First lookup stream and make sure its there.
|
||||
mset, err := acc.lookupStream(streamName)
|
||||
if err != nil {
|
||||
js.restartStream(acc, sa)
|
||||
return false
|
||||
}
|
||||
|
||||
if node == nil || node.Healthy() {
|
||||
// Check if we are processing a snapshot and are catching up.
|
||||
if mset, err := acc.lookupStream(sa.Config.Name); err == nil && !mset.isCatchingUp() {
|
||||
if !mset.isCatchingUp() {
|
||||
return true
|
||||
}
|
||||
}
|
||||
@@ -460,23 +525,35 @@ func (js *jetStream) isStreamHealthy(acc *Account, sa *streamAssignment) bool {
|
||||
|
||||
// isConsumerCurrent will determine if the consumer is up to date.
|
||||
// For R1 it will make sure the consunmer is present on this server.
|
||||
func (js *jetStream) isConsumerCurrent(mset *stream, consumer string, ca *consumerAssignment) bool {
|
||||
func (js *jetStream) isConsumerHealthy(mset *stream, consumer string, ca *consumerAssignment) bool {
|
||||
if mset == nil {
|
||||
return false
|
||||
}
|
||||
js.mu.RLock()
|
||||
defer js.mu.RUnlock()
|
||||
|
||||
cc := js.cluster
|
||||
js.mu.RUnlock()
|
||||
|
||||
if cc == nil {
|
||||
// Non-clustered mode
|
||||
return true
|
||||
}
|
||||
o := mset.lookupConsumer(consumer)
|
||||
if o == nil {
|
||||
js.mu.Lock()
|
||||
if ca.Group != nil {
|
||||
ca.Group.node = nil
|
||||
}
|
||||
deleted := ca.deleted
|
||||
js.mu.Unlock()
|
||||
if !deleted {
|
||||
js.processConsumerAssignment(ca)
|
||||
}
|
||||
return false
|
||||
}
|
||||
if n := o.raftNode(); n != nil && !n.Current() {
|
||||
return false
|
||||
if node := o.raftNode(); node == nil || node.Healthy() {
|
||||
return true
|
||||
}
|
||||
return true
|
||||
return false
|
||||
}
|
||||
|
||||
// subjectsOverlap checks all existing stream assignments for the account cross-cluster for subject overlap
|
||||
|
||||
@@ -3763,3 +3763,74 @@ func TestJetStreamClusterConsumerInfoForJszForFollowers(t *testing.T) {
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// Under certain scenarios we have seen consumers become stopped and cause healthz to fail.
|
||||
// The specific scneario is heavy loads, and stream resets on upgrades that could orphan consumers.
|
||||
func TestJetStreamClusterHealthzCheckForStoppedAssets(t *testing.T) {
|
||||
c := createJetStreamClusterExplicit(t, "NATS", 3)
|
||||
defer c.shutdown()
|
||||
|
||||
nc, js := jsClientConnect(t, c.randomServer())
|
||||
defer nc.Close()
|
||||
|
||||
_, err := js.AddStream(&nats.StreamConfig{
|
||||
Name: "TEST",
|
||||
Subjects: []string{"*"},
|
||||
Replicas: 3,
|
||||
})
|
||||
require_NoError(t, err)
|
||||
|
||||
for i := 0; i < 1000; i++ {
|
||||
sendStreamMsg(t, nc, "foo", "HELLO")
|
||||
}
|
||||
|
||||
sub, err := js.PullSubscribe("foo", "d")
|
||||
require_NoError(t, err)
|
||||
|
||||
fetch, ack := 122, 22
|
||||
msgs, err := sub.Fetch(fetch, nats.MaxWait(10*time.Second))
|
||||
require_NoError(t, err)
|
||||
require_True(t, len(msgs) == fetch)
|
||||
for _, m := range msgs[:ack] {
|
||||
m.AckSync()
|
||||
}
|
||||
// Let acks propagate.
|
||||
time.Sleep(100 * time.Millisecond)
|
||||
|
||||
// We will now stop a stream on a given server.
|
||||
s := c.randomServer()
|
||||
mset, err := s.GlobalAccount().lookupStream("TEST")
|
||||
require_NoError(t, err)
|
||||
// Stop the stream
|
||||
mset.stop(false, false)
|
||||
|
||||
// Wait for exit.
|
||||
time.Sleep(100 * time.Millisecond)
|
||||
|
||||
checkFor(t, 5*time.Second, 500*time.Millisecond, func() error {
|
||||
hs := s.healthz(nil)
|
||||
if hs.Error != _EMPTY_ {
|
||||
return errors.New(hs.Error)
|
||||
}
|
||||
return nil
|
||||
})
|
||||
|
||||
// Now take out the consumer.
|
||||
mset, err = s.GlobalAccount().lookupStream("TEST")
|
||||
require_NoError(t, err)
|
||||
|
||||
o := mset.lookupConsumer("d")
|
||||
require_NotNil(t, o)
|
||||
|
||||
o.stop()
|
||||
// Wait for exit.
|
||||
time.Sleep(100 * time.Millisecond)
|
||||
|
||||
checkFor(t, 5*time.Second, 500*time.Millisecond, func() error {
|
||||
hs := s.healthz(nil)
|
||||
if hs.Error != _EMPTY_ {
|
||||
return errors.New(hs.Error)
|
||||
}
|
||||
return nil
|
||||
})
|
||||
}
|
||||
|
||||
@@ -3144,7 +3144,8 @@ func (s *Server) healthz(opts *HealthzOptions) *HealthStatus {
|
||||
csa.consumers = make(map[string]*consumerAssignment)
|
||||
for consumer, ca := range sa.consumers {
|
||||
if ca.Group.isMember(ourID) {
|
||||
csa.consumers[consumer] = ca.copyGroup()
|
||||
// Use original here. Not a copy.
|
||||
csa.consumers[consumer] = ca
|
||||
}
|
||||
}
|
||||
nasa[stream] = csa
|
||||
@@ -3173,7 +3174,7 @@ func (s *Server) healthz(opts *HealthzOptions) *HealthStatus {
|
||||
mset, _ := acc.lookupStream(stream)
|
||||
// Now check consumers.
|
||||
for consumer, ca := range sa.consumers {
|
||||
if !js.isConsumerCurrent(mset, consumer, ca) {
|
||||
if !js.isConsumerHealthy(mset, consumer, ca) {
|
||||
health.Status = na
|
||||
health.Error = fmt.Sprintf("JetStream consumer '%s > %s > %s' is not current", acc, stream, consumer)
|
||||
return health
|
||||
|
||||
Reference in New Issue
Block a user