diff --git a/server/jetstream_cluster.go b/server/jetstream_cluster.go index ac989c51..5a16ed11 100644 --- a/server/jetstream_cluster.go +++ b/server/jetstream_cluster.go @@ -436,11 +436,9 @@ func (cc *jetStreamCluster) isStreamCurrent(account, stream string) bool { // isStreamHealthy will determine if the stream is up to date or very close. // For R1 it will make sure the stream is present on this server. +// Read lock should be held. func (js *jetStream) isStreamHealthy(account, stream string) bool { - js.mu.RLock() - defer js.mu.RUnlock() cc := js.cluster - if cc == nil { // Non-clustered mode return true @@ -480,11 +478,9 @@ func (js *jetStream) isStreamHealthy(account, stream string) bool { // isConsumerCurrent will determine if the consumer is up to date. // For R1 it will make sure the consunmer is present on this server. +// Read lock should be held. func (js *jetStream) isConsumerCurrent(account, stream, consumer string) bool { - js.mu.RLock() - defer js.mu.RUnlock() cc := js.cluster - if cc == nil { // Non-clustered mode return true diff --git a/server/monitor.go b/server/monitor.go index 624091ce..4adbffb8 100644 --- a/server/monitor.go +++ b/server/monitor.go @@ -3125,6 +3125,11 @@ func (s *Server) healthz(opts *HealthzOptions) *HealthStatus { // Range across all accounts, the streams assigned to them, and the consumers. // If they are assigned to this server check their status. ourID := meta.ID() + + // TODO(dlc) - Might be better here to not hold the lock the whole time. + js.mu.RLock() + defer js.mu.RUnlock() + for acc, asa := range cc.streams { for stream, sa := range asa { if sa.Group.isMember(ourID) {