diff --git a/server/jetstream_api.go b/server/jetstream_api.go index 33a953da..7a3b9203 100644 --- a/server/jetstream_api.go +++ b/server/jetstream_api.go @@ -4164,9 +4164,20 @@ func (s *Server) jsConsumerInfoRequest(sub *subscription, c *client, _ *Account, js.mu.RLock() isLeader, sa, ca := cc.isLeader(), js.streamAssignment(acc.Name, streamName), js.consumerAssignment(acc.Name, streamName, consumerName) ourID := cc.meta.ID() - var offline bool + var rg *raftGroup + var offline, isMember bool if ca != nil { - offline = s.allPeersOffline(ca.Group) + if rg = ca.Group; rg != nil { + offline = s.allPeersOffline(rg) + isMember = rg.isMember(ourID) + } + } + // Capture consumer leader here. + isConsumerLeader := cc.isConsumerLeader(acc.Name, streamName, consumerName) + // Also capture if we think there is no meta leader. + var isLeaderLess bool + if !isLeader { + isLeaderLess = cc.meta.GroupLeader() == _EMPTY_ && time.Since(cc.meta.Created()) > lostQuorumIntervalDefault } js.mu.RUnlock() @@ -4189,7 +4200,7 @@ func (s *Server) jsConsumerInfoRequest(sub *subscription, c *client, _ *Account, s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp)) return } else if ca == nil { - if js.isLeaderless() { + if isLeaderLess { resp.Error = NewJSClusterNotAvailError() // Delaying an error response gives the leader a chance to respond before us s.sendDelayedAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp), nil) @@ -4202,38 +4213,35 @@ func (s *Server) jsConsumerInfoRequest(sub *subscription, c *client, _ *Account, } // Check to see if we are a member of the group and if the group has no leader. - if js.isGroupLeaderless(ca.Group) { + if isMember && js.isGroupLeaderless(ca.Group) { resp.Error = NewJSClusterNotAvailError() s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp)) return } // We have the consumer assigned and a leader, so only the consumer leader should answer. - if !acc.JetStreamIsConsumerLeader(streamName, consumerName) { - if js.isLeaderless() { + if !isConsumerLeader { + if isLeaderLess { resp.Error = NewJSClusterNotAvailError() // Delaying an error response gives the leader a chance to respond before us s.sendDelayedAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp), ca.Group) return } + + var node RaftNode + var leaderNotPartOfGroup bool + // We have a consumer assignment. - js.mu.RLock() - var ( - node RaftNode - leaderNotPartOfGroup bool - isMember bool - ) - rg := ca.Group - if rg != nil && rg.isMember(ourID) { - isMember = true + if isMember { + js.mu.RLock() if rg.node != nil { node = rg.node if gl := node.GroupLeader(); gl != _EMPTY_ && !rg.isMember(gl) { leaderNotPartOfGroup = true } } + js.mu.RUnlock() } - js.mu.RUnlock() // Check if we should ignore all together. if node == nil {