mirror of
https://github.com/gogrlx/nats-server.git
synced 2026-04-14 10:10:42 -07:00
Do not request streaminfo from streams that are completely offline.
Signed-off-by: Derek Collison <derek@nats.io>
This commit is contained in:
@@ -3371,6 +3371,21 @@ func (s *Server) jsClusteredStreamRestoreRequest(ci *ClientInfo, acc *Account, r
|
||||
cc.meta.Propose(encodeAddStreamAssignment(sa))
|
||||
}
|
||||
|
||||
func (s *Server) allPeersOffline(rg *raftGroup) bool {
|
||||
if rg == nil {
|
||||
return false
|
||||
}
|
||||
// Check to see if this stream has any servers online to respond.
|
||||
for _, peer := range rg.Peers {
|
||||
if si, ok := s.nodeToInfo.Load(peer); ok && si != nil {
|
||||
if !si.(nodeInfo).offline {
|
||||
return false
|
||||
}
|
||||
}
|
||||
}
|
||||
return true
|
||||
}
|
||||
|
||||
// This will do a scatter and gather operation for all streams for this account. This is only called from metadata leader.
|
||||
// This will be running in a separate Go routine.
|
||||
func (s *Server) jsClusteredStreamListRequest(acc *Account, ci *ClientInfo, offset int, subject, reply string, rmsg []byte) {
|
||||
@@ -3387,7 +3402,7 @@ func (s *Server) jsClusteredStreamListRequest(acc *Account, ci *ClientInfo, offs
|
||||
for _, sa := range cc.streams[acc.Name] {
|
||||
streams = append(streams, sa)
|
||||
}
|
||||
// Needs to be sorted.
|
||||
// Needs to be sorted for offsets etc.
|
||||
if len(streams) > 1 {
|
||||
sort.Slice(streams, func(i, j int) bool {
|
||||
return strings.Compare(streams[i].Config.Name, streams[j].Config.Name) < 0
|
||||
@@ -3449,8 +3464,14 @@ func (s *Server) jsClusteredStreamListRequest(acc *Account, ci *ClientInfo, offs
|
||||
|
||||
// Send out our requests here.
|
||||
for _, sa := range streams {
|
||||
isubj := fmt.Sprintf(clusterStreamInfoT, sa.Client.serviceAccount(), sa.Config.Name)
|
||||
s.sendInternalMsgLocked(isubj, inbox, nil, nil)
|
||||
if s.allPeersOffline(sa.Group) {
|
||||
// Place offline onto our results by hand here.
|
||||
si := &StreamInfo{Config: *sa.Config, Created: sa.Created, Cluster: js.offlineClusterInfo(sa.Group)}
|
||||
resp.Streams = append(resp.Streams, si)
|
||||
} else {
|
||||
isubj := fmt.Sprintf(clusterStreamInfoT, sa.Client.serviceAccount(), sa.Config.Name)
|
||||
s.sendInternalMsgLocked(isubj, inbox, nil, nil)
|
||||
}
|
||||
}
|
||||
// Don't hold lock.
|
||||
js.mu.Unlock()
|
||||
@@ -3573,8 +3594,14 @@ func (s *Server) jsClusteredConsumerListRequest(acc *Account, ci *ClientInfo, of
|
||||
}()
|
||||
|
||||
for _, ca := range consumers {
|
||||
isubj := fmt.Sprintf(clusterConsumerInfoT, ca.Client.serviceAccount(), stream, ca.Name)
|
||||
s.sendInternalMsgLocked(isubj, inbox, nil, nil)
|
||||
if s.allPeersOffline(ca.Group) {
|
||||
// Place offline onto our results by hand here.
|
||||
ci := &ConsumerInfo{Config: ca.Config, Created: ca.Created, Cluster: js.offlineClusterInfo(ca.Group)}
|
||||
resp.Consumers = append(resp.Consumers, ci)
|
||||
} else {
|
||||
isubj := fmt.Sprintf(clusterConsumerInfoT, ca.Client.serviceAccount(), stream, ca.Name)
|
||||
s.sendInternalMsgLocked(isubj, inbox, nil, nil)
|
||||
}
|
||||
}
|
||||
js.mu.Unlock()
|
||||
|
||||
@@ -4340,6 +4367,21 @@ func (mset *stream) handleClusterSyncRequest(sub *subscription, c *client, subje
|
||||
mset.srv.startGoRoutine(func() { mset.runCatchup(reply, &sreq) })
|
||||
}
|
||||
|
||||
// Lock should be held.
|
||||
func (js *jetStream) offlineClusterInfo(rg *raftGroup) *ClusterInfo {
|
||||
s := js.srv
|
||||
|
||||
ci := &ClusterInfo{Name: s.ClusterName()}
|
||||
for _, peer := range rg.Peers {
|
||||
if sir, ok := s.nodeToInfo.Load(peer); ok && sir != nil {
|
||||
si := sir.(nodeInfo)
|
||||
pi := &PeerInfo{Name: si.name, Current: false, Offline: true}
|
||||
ci.Replicas = append(ci.Replicas, pi)
|
||||
}
|
||||
}
|
||||
return ci
|
||||
}
|
||||
|
||||
// clusterInfo will report on the status of the raft group.
|
||||
func (js *jetStream) clusterInfo(rg *raftGroup) *ClusterInfo {
|
||||
if js == nil {
|
||||
@@ -4347,8 +4389,8 @@ func (js *jetStream) clusterInfo(rg *raftGroup) *ClusterInfo {
|
||||
}
|
||||
js.mu.RLock()
|
||||
defer js.mu.RUnlock()
|
||||
s := js.srv
|
||||
|
||||
s := js.srv
|
||||
if rg == nil || rg.node == nil {
|
||||
return &ClusterInfo{
|
||||
Name: s.ClusterName(),
|
||||
|
||||
@@ -1497,6 +1497,7 @@ func (mset *stream) setSourceConsumer(sname string, seq uint64) {
|
||||
}
|
||||
if si.sub != nil {
|
||||
mset.unsubscribe(si.sub)
|
||||
si.sub = nil
|
||||
}
|
||||
// Need to delete the old one.
|
||||
mset.removeInternalConsumer(si)
|
||||
|
||||
Reference in New Issue
Block a user