From ff3f102cdd4344f047f36e9c01ac64a8e6a035de Mon Sep 17 00:00:00 2001 From: Derek Collison Date: Sun, 2 Apr 2023 16:30:13 -0700 Subject: [PATCH] Fix for datarace in healthcheck Signed-off-by: Derek Collison --- server/jetstream_cluster.go | 14 ++++++++++---- server/monitor.go | 4 ++-- 2 files changed, 12 insertions(+), 6 deletions(-) diff --git a/server/jetstream_cluster.go b/server/jetstream_cluster.go index 7764619c..9c794c80 100644 --- a/server/jetstream_cluster.go +++ b/server/jetstream_cluster.go @@ -436,8 +436,11 @@ func (cc *jetStreamCluster) isStreamCurrent(account, stream string) bool { // isStreamHealthy will determine if the stream is up to date or very close. // For R1 it will make sure the stream is present on this server. -// Read lock should be held. -func (cc *jetStreamCluster) isStreamHealthy(account, stream string) bool { +func (js *jetStream) isStreamHealthy(account, stream string) bool { + js.mu.RLock() + defer js.mu.RUnlock() + cc := js.cluster + if cc == nil { // Non-clustered mode return true @@ -477,8 +480,11 @@ func (cc *jetStreamCluster) isStreamHealthy(account, stream string) bool { // isConsumerCurrent will determine if the consumer is up to date. // For R1 it will make sure the consunmer is present on this server. -// Read lock should be held. -func (cc *jetStreamCluster) isConsumerCurrent(account, stream, consumer string) bool { +func (js *jetStream) isConsumerCurrent(account, stream, consumer string) bool { + js.mu.RLock() + defer js.mu.RUnlock() + cc := js.cluster + if cc == nil { // Non-clustered mode return true diff --git a/server/monitor.go b/server/monitor.go index 3b3488b0..624091ce 100644 --- a/server/monitor.go +++ b/server/monitor.go @@ -3129,7 +3129,7 @@ func (s *Server) healthz(opts *HealthzOptions) *HealthStatus { for stream, sa := range asa { if sa.Group.isMember(ourID) { // Make sure we can look up - if !cc.isStreamHealthy(acc, stream) { + if !js.isStreamHealthy(acc, stream) { health.Status = na health.Error = fmt.Sprintf("JetStream stream '%s > %s' is not current", acc, stream) return health @@ -3137,7 +3137,7 @@ func (s *Server) healthz(opts *HealthzOptions) *HealthStatus { // Now check consumers. for consumer, ca := range sa.consumers { if ca.Group.isMember(ourID) { - if !cc.isConsumerCurrent(acc, stream, consumer) { + if !js.isConsumerCurrent(acc, stream, consumer) { health.Status = na health.Error = fmt.Sprintf("JetStream consumer '%s > %s > %s' is not current", acc, stream, consumer) return health