mirror of
https://github.com/gogrlx/nats-server.git
synced 2026-04-02 03:38:42 -07:00
Only need server's rlock here.
Signed-off-by: Derek Collison <derek@nats.io>
This commit is contained in:
@@ -784,9 +784,9 @@ func (js *jetStream) setJetStreamStandAlone(isStandAlone bool) {
|
||||
// JetStreamEnabled reports if jetstream is enabled for this server.
|
||||
func (s *Server) JetStreamEnabled() bool {
|
||||
var js *jetStream
|
||||
s.mu.Lock()
|
||||
s.mu.RLock()
|
||||
js = s.js
|
||||
s.mu.Unlock()
|
||||
s.mu.RUnlock()
|
||||
return js.isEnabled()
|
||||
}
|
||||
|
||||
|
||||
@@ -1591,9 +1591,14 @@ func (s *Server) jsStreamNamesRequest(sub *subscription, c *client, _ *Account,
|
||||
// Request for the list of all detailed stream info.
|
||||
// TODO(dlc) - combine with above long term
|
||||
func (s *Server) jsStreamListRequest(sub *subscription, c *client, _ *Account, subject, reply string, rmsg []byte) {
|
||||
if c == nil || !s.JetStreamEnabled() {
|
||||
if c == nil {
|
||||
return
|
||||
}
|
||||
js, cc := s.getJetStreamCluster()
|
||||
if js == nil {
|
||||
return
|
||||
}
|
||||
|
||||
ci, acc, _, msg, err := s.getRequestInfo(c, rmsg)
|
||||
if err != nil {
|
||||
s.Warnf(badAPIRequestT, msg)
|
||||
@@ -1606,18 +1611,15 @@ func (s *Server) jsStreamListRequest(sub *subscription, c *client, _ *Account, s
|
||||
}
|
||||
|
||||
// Determine if we should proceed here when we are in clustered mode.
|
||||
if s.JetStreamIsClustered() {
|
||||
js, cc := s.getJetStreamCluster()
|
||||
if js == nil || cc == nil {
|
||||
return
|
||||
}
|
||||
if js.isLeaderless() {
|
||||
if cc != nil {
|
||||
leader, leaderless := js.leaderStatus()
|
||||
if leaderless {
|
||||
resp.Error = NewJSClusterNotAvailError()
|
||||
s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp))
|
||||
return
|
||||
}
|
||||
// Make sure we are meta leader.
|
||||
if !s.JetStreamIsLeader() {
|
||||
if !leader {
|
||||
return
|
||||
}
|
||||
}
|
||||
@@ -1647,7 +1649,7 @@ func (s *Server) jsStreamListRequest(sub *subscription, c *client, _ *Account, s
|
||||
}
|
||||
|
||||
// Clustered mode will invoke a scatter and gather.
|
||||
if s.JetStreamIsClustered() {
|
||||
if cc != nil {
|
||||
// Need to copy these off before sending.. don't move this inside startGoRoutine!!!
|
||||
msg = copyBytes(msg)
|
||||
s.startGoRoutine(func() { s.jsClusteredStreamListRequest(acc, ci, filter, offset, subject, reply, msg) })
|
||||
|
||||
@@ -206,11 +206,7 @@ func (s *Server) getJetStreamCluster() (*jetStream, *jetStreamCluster) {
|
||||
}
|
||||
|
||||
// Only set once, do not need a lock.
|
||||
cc := js.cluster
|
||||
if cc == nil {
|
||||
return nil, nil
|
||||
}
|
||||
return js, cc
|
||||
return js, js.cluster
|
||||
}
|
||||
|
||||
func (s *Server) JetStreamIsClustered() bool {
|
||||
@@ -717,6 +713,27 @@ func (js *jetStream) server() *Server {
|
||||
return s
|
||||
}
|
||||
|
||||
// Returns leader status, both whether or not we are the leader and if the group is leaderless.
|
||||
func (js *jetStream) leaderStatus() (bool, bool) {
|
||||
js.mu.RLock()
|
||||
defer js.mu.RUnlock()
|
||||
|
||||
cc := js.cluster
|
||||
if cc == nil || cc.meta == nil {
|
||||
return false, true
|
||||
}
|
||||
isLeader := cc.meta.Leader()
|
||||
if isLeader {
|
||||
return true, false
|
||||
}
|
||||
// If we don't have a leader.
|
||||
// Make sure we have been running for enough time.
|
||||
if cc.meta.GroupLeader() == _EMPTY_ && time.Since(cc.meta.Created()) > lostQuorumIntervalDefault {
|
||||
return false, true
|
||||
}
|
||||
return false, false
|
||||
}
|
||||
|
||||
// Will respond if we do not think we have a metacontroller leader.
|
||||
func (js *jetStream) isLeaderless() bool {
|
||||
js.mu.RLock()
|
||||
|
||||
Reference in New Issue
Block a user