From a627db9fc832c2abdcfdebdb6f38f7ee7d209a94 Mon Sep 17 00:00:00 2001 From: Derek Collison Date: Wed, 24 Mar 2021 10:26:09 -0700 Subject: [PATCH] Do not request streaminfo from streams that are completely offline. Signed-off-by: Derek Collison --- server/jetstream_cluster.go | 54 ++++++++++++++++++++++++++++++++----- server/stream.go | 1 + 2 files changed, 49 insertions(+), 6 deletions(-) diff --git a/server/jetstream_cluster.go b/server/jetstream_cluster.go index 3cdb5758..f512511d 100644 --- a/server/jetstream_cluster.go +++ b/server/jetstream_cluster.go @@ -3371,6 +3371,21 @@ func (s *Server) jsClusteredStreamRestoreRequest(ci *ClientInfo, acc *Account, r cc.meta.Propose(encodeAddStreamAssignment(sa)) } +func (s *Server) allPeersOffline(rg *raftGroup) bool { + if rg == nil { + return false + } + // Check to see if this stream has any servers online to respond. + for _, peer := range rg.Peers { + if si, ok := s.nodeToInfo.Load(peer); ok && si != nil { + if !si.(nodeInfo).offline { + return false + } + } + } + return true +} + // This will do a scatter and gather operation for all streams for this account. This is only called from metadata leader. // This will be running in a separate Go routine. func (s *Server) jsClusteredStreamListRequest(acc *Account, ci *ClientInfo, offset int, subject, reply string, rmsg []byte) { @@ -3387,7 +3402,7 @@ func (s *Server) jsClusteredStreamListRequest(acc *Account, ci *ClientInfo, offs for _, sa := range cc.streams[acc.Name] { streams = append(streams, sa) } - // Needs to be sorted. + // Needs to be sorted for offsets etc. if len(streams) > 1 { sort.Slice(streams, func(i, j int) bool { return strings.Compare(streams[i].Config.Name, streams[j].Config.Name) < 0 @@ -3449,8 +3464,14 @@ func (s *Server) jsClusteredStreamListRequest(acc *Account, ci *ClientInfo, offs // Send out our requests here. for _, sa := range streams { - isubj := fmt.Sprintf(clusterStreamInfoT, sa.Client.serviceAccount(), sa.Config.Name) - s.sendInternalMsgLocked(isubj, inbox, nil, nil) + if s.allPeersOffline(sa.Group) { + // Place offline onto our results by hand here. + si := &StreamInfo{Config: *sa.Config, Created: sa.Created, Cluster: js.offlineClusterInfo(sa.Group)} + resp.Streams = append(resp.Streams, si) + } else { + isubj := fmt.Sprintf(clusterStreamInfoT, sa.Client.serviceAccount(), sa.Config.Name) + s.sendInternalMsgLocked(isubj, inbox, nil, nil) + } } // Don't hold lock. js.mu.Unlock() @@ -3573,8 +3594,14 @@ func (s *Server) jsClusteredConsumerListRequest(acc *Account, ci *ClientInfo, of }() for _, ca := range consumers { - isubj := fmt.Sprintf(clusterConsumerInfoT, ca.Client.serviceAccount(), stream, ca.Name) - s.sendInternalMsgLocked(isubj, inbox, nil, nil) + if s.allPeersOffline(ca.Group) { + // Place offline onto our results by hand here. + ci := &ConsumerInfo{Config: ca.Config, Created: ca.Created, Cluster: js.offlineClusterInfo(ca.Group)} + resp.Consumers = append(resp.Consumers, ci) + } else { + isubj := fmt.Sprintf(clusterConsumerInfoT, ca.Client.serviceAccount(), stream, ca.Name) + s.sendInternalMsgLocked(isubj, inbox, nil, nil) + } } js.mu.Unlock() @@ -4340,6 +4367,21 @@ func (mset *stream) handleClusterSyncRequest(sub *subscription, c *client, subje mset.srv.startGoRoutine(func() { mset.runCatchup(reply, &sreq) }) } +// Lock should be held. +func (js *jetStream) offlineClusterInfo(rg *raftGroup) *ClusterInfo { + s := js.srv + + ci := &ClusterInfo{Name: s.ClusterName()} + for _, peer := range rg.Peers { + if sir, ok := s.nodeToInfo.Load(peer); ok && sir != nil { + si := sir.(nodeInfo) + pi := &PeerInfo{Name: si.name, Current: false, Offline: true} + ci.Replicas = append(ci.Replicas, pi) + } + } + return ci +} + // clusterInfo will report on the status of the raft group. func (js *jetStream) clusterInfo(rg *raftGroup) *ClusterInfo { if js == nil { @@ -4347,8 +4389,8 @@ func (js *jetStream) clusterInfo(rg *raftGroup) *ClusterInfo { } js.mu.RLock() defer js.mu.RUnlock() - s := js.srv + s := js.srv if rg == nil || rg.node == nil { return &ClusterInfo{ Name: s.ClusterName(), diff --git a/server/stream.go b/server/stream.go index 90cfd8eb..f351236f 100644 --- a/server/stream.go +++ b/server/stream.go @@ -1497,6 +1497,7 @@ func (mset *stream) setSourceConsumer(sname string, seq uint64) { } if si.sub != nil { mset.unsubscribe(si.sub) + si.sub = nil } // Need to delete the old one. mset.removeInternalConsumer(si)