From bee149b45805cffda219ca1c85fa37031bea2d2c Mon Sep 17 00:00:00 2001 From: Derek Collison Date: Mon, 27 Feb 2023 17:49:00 -0800 Subject: [PATCH] Only need server's rlock here. Signed-off-by: Derek Collison --- server/jetstream.go | 4 ++-- server/jetstream_api.go | 20 +++++++++++--------- server/jetstream_cluster.go | 27 ++++++++++++++++++++++----- 3 files changed, 35 insertions(+), 16 deletions(-) diff --git a/server/jetstream.go b/server/jetstream.go index 78f4e00c..fd7926f0 100644 --- a/server/jetstream.go +++ b/server/jetstream.go @@ -784,9 +784,9 @@ func (js *jetStream) setJetStreamStandAlone(isStandAlone bool) { // JetStreamEnabled reports if jetstream is enabled for this server. func (s *Server) JetStreamEnabled() bool { var js *jetStream - s.mu.Lock() + s.mu.RLock() js = s.js - s.mu.Unlock() + s.mu.RUnlock() return js.isEnabled() } diff --git a/server/jetstream_api.go b/server/jetstream_api.go index cfe06442..890b41a3 100644 --- a/server/jetstream_api.go +++ b/server/jetstream_api.go @@ -1591,9 +1591,14 @@ func (s *Server) jsStreamNamesRequest(sub *subscription, c *client, _ *Account, // Request for the list of all detailed stream info. // TODO(dlc) - combine with above long term func (s *Server) jsStreamListRequest(sub *subscription, c *client, _ *Account, subject, reply string, rmsg []byte) { - if c == nil || !s.JetStreamEnabled() { + if c == nil { return } + js, cc := s.getJetStreamCluster() + if js == nil { + return + } + ci, acc, _, msg, err := s.getRequestInfo(c, rmsg) if err != nil { s.Warnf(badAPIRequestT, msg) @@ -1606,18 +1611,15 @@ func (s *Server) jsStreamListRequest(sub *subscription, c *client, _ *Account, s } // Determine if we should proceed here when we are in clustered mode. - if s.JetStreamIsClustered() { - js, cc := s.getJetStreamCluster() - if js == nil || cc == nil { - return - } - if js.isLeaderless() { + if cc != nil { + leader, leaderless := js.leaderStatus() + if leaderless { resp.Error = NewJSClusterNotAvailError() s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp)) return } // Make sure we are meta leader. - if !s.JetStreamIsLeader() { + if !leader { return } } @@ -1647,7 +1649,7 @@ func (s *Server) jsStreamListRequest(sub *subscription, c *client, _ *Account, s } // Clustered mode will invoke a scatter and gather. - if s.JetStreamIsClustered() { + if cc != nil { // Need to copy these off before sending.. don't move this inside startGoRoutine!!! msg = copyBytes(msg) s.startGoRoutine(func() { s.jsClusteredStreamListRequest(acc, ci, filter, offset, subject, reply, msg) }) diff --git a/server/jetstream_cluster.go b/server/jetstream_cluster.go index 268030a3..57dd718e 100644 --- a/server/jetstream_cluster.go +++ b/server/jetstream_cluster.go @@ -206,11 +206,7 @@ func (s *Server) getJetStreamCluster() (*jetStream, *jetStreamCluster) { } // Only set once, do not need a lock. - cc := js.cluster - if cc == nil { - return nil, nil - } - return js, cc + return js, js.cluster } func (s *Server) JetStreamIsClustered() bool { @@ -717,6 +713,27 @@ func (js *jetStream) server() *Server { return s } +// Returns leader status, both whether or not we are the leader and if the group is leaderless. +func (js *jetStream) leaderStatus() (bool, bool) { + js.mu.RLock() + defer js.mu.RUnlock() + + cc := js.cluster + if cc == nil || cc.meta == nil { + return false, true + } + isLeader := cc.meta.Leader() + if isLeader { + return true, false + } + // If we don't have a leader. + // Make sure we have been running for enough time. + if cc.meta.GroupLeader() == _EMPTY_ && time.Since(cc.meta.Created()) > lostQuorumIntervalDefault { + return false, true + } + return false, false +} + // Will respond if we do not think we have a metacontroller leader. func (js *jetStream) isLeaderless() bool { js.mu.RLock()