Merge branch 'main' into dev

This commit is contained in:
Derek Collison
2023-04-07 05:32:05 -07:00
7 changed files with 111 additions and 67 deletions

View File

@@ -1,4 +1,4 @@
// Copyright 2013-2022 The NATS Authors
// Copyright 2013-2023 The NATS Authors
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
@@ -3142,33 +3142,54 @@ func (s *Server) healthz(opts *HealthzOptions) *HealthStatus {
// If they are assigned to this server check their status.
ourID := meta.ID()
// TODO(dlc) - Might be better here to not hold the lock the whole time.
// Copy the meta layer so we do not need to hold the js read lock for an extended period of time.
js.mu.RLock()
defer js.mu.RUnlock()
streams := make(map[string]map[string]*streamAssignment, len(cc.streams))
for acc, asa := range cc.streams {
nasa := make(map[string]*streamAssignment)
for stream, sa := range asa {
if sa.Group.isMember(ourID) {
// Make sure we can look up
if !js.isStreamHealthy(acc, stream) {
health.Status = na
health.Error = fmt.Sprintf("JetStream stream '%s > %s' is not current", acc, stream)
return health
}
// Now check consumers.
csa := sa.copyGroup()
csa.consumers = make(map[string]*consumerAssignment)
for consumer, ca := range sa.consumers {
if ca.Group.isMember(ourID) {
if !js.isConsumerCurrent(acc, stream, consumer) {
health.Status = na
health.Error = fmt.Sprintf("JetStream consumer '%s > %s > %s' is not current", acc, stream, consumer)
return health
}
csa.consumers[consumer] = ca.copyGroup()
}
}
nasa[stream] = csa
}
}
streams[acc] = nasa
}
js.mu.RUnlock()
// Use our copy to traverse so we do not need to hold the js lock.
for accName, asa := range streams {
acc, err := s.LookupAccount(accName)
if err != nil && len(asa) > 0 {
health.Status = na
health.Error = fmt.Sprintf("JetStream can not lookup account %q: %v", accName, err)
return health
}
for stream, sa := range asa {
// Make sure we can look up
if !js.isStreamHealthy(acc, sa) {
health.Status = na
health.Error = fmt.Sprintf("JetStream stream '%s > %s' is not current", accName, stream)
return health
}
mset, _ := acc.lookupStream(stream)
// Now check consumers.
for consumer, ca := range sa.consumers {
if !js.isConsumerCurrent(mset, consumer, ca) {
health.Status = na
health.Error = fmt.Sprintf("JetStream consumer '%s > %s > %s' is not current", acc, stream, consumer)
return health
}
}
}
}
// Success.
return health
}