mirror of
https://github.com/gogrlx/nats-server.git
synced 2026-04-02 03:38:42 -07:00
Merge pull request #2739 from nats-io/list-missing
Adding missing entry to stream/consumer list
This commit is contained in:
@@ -363,6 +363,7 @@ type JSApiStreamListResponse struct {
|
||||
ApiResponse
|
||||
ApiPaged
|
||||
Streams []*StreamInfo `json:"streams"`
|
||||
Missing []string `json:"missing,omitempty"`
|
||||
}
|
||||
|
||||
const JSApiStreamListResponseType = "io.nats.jetstream.api.v1.stream_list_response"
|
||||
@@ -560,6 +561,7 @@ type JSApiConsumerListResponse struct {
|
||||
ApiResponse
|
||||
ApiPaged
|
||||
Consumers []*ConsumerInfo `json:"consumers"`
|
||||
Missing []string `json:"missing,omitempty"`
|
||||
}
|
||||
|
||||
const JSApiConsumerListResponseType = "io.nats.jetstream.api.v1.consumer_list_response"
|
||||
|
||||
@@ -3973,15 +3973,19 @@ func (s *Server) jsClusteredStreamListRequest(acc *Account, ci *ClientInfo, filt
|
||||
s.mu.Unlock()
|
||||
}()
|
||||
|
||||
var missingNames []string
|
||||
sent := map[string]struct{}{}
|
||||
// Send out our requests here.
|
||||
for _, sa := range streams {
|
||||
if s.allPeersOffline(sa.Group) {
|
||||
// Place offline onto our results by hand here.
|
||||
si := &StreamInfo{Config: *sa.Config, Created: sa.Created, Cluster: js.offlineClusterInfo(sa.Group)}
|
||||
resp.Streams = append(resp.Streams, si)
|
||||
missingNames = append(missingNames, sa.Config.Name)
|
||||
} else {
|
||||
isubj := fmt.Sprintf(clusterStreamInfoT, sa.Client.serviceAccount(), sa.Config.Name)
|
||||
s.sendInternalMsgLocked(isubj, inbox, nil, nil)
|
||||
sent[sa.Config.Name] = struct{}{}
|
||||
}
|
||||
}
|
||||
// Don't hold lock.
|
||||
@@ -3998,9 +4002,13 @@ LOOP:
|
||||
return
|
||||
case <-notActive.C:
|
||||
s.Warnf("Did not receive all stream info results for %q", acc)
|
||||
resp.Error = NewJSClusterIncompleteError()
|
||||
for sName := range sent {
|
||||
missingNames = append(missingNames, sName)
|
||||
}
|
||||
resp.Missing = missingNames
|
||||
break LOOP
|
||||
case si := <-rc:
|
||||
delete(sent, si.Config.Name)
|
||||
resp.Streams = append(resp.Streams, si)
|
||||
// Check to see if we are done.
|
||||
if len(resp.Streams) == len(streams) {
|
||||
@@ -4104,14 +4112,18 @@ func (s *Server) jsClusteredConsumerListRequest(acc *Account, ci *ClientInfo, of
|
||||
s.mu.Unlock()
|
||||
}()
|
||||
|
||||
var missingNames []string
|
||||
sent := map[string]struct{}{}
|
||||
for _, ca := range consumers {
|
||||
if s.allPeersOffline(ca.Group) {
|
||||
// Place offline onto our results by hand here.
|
||||
ci := &ConsumerInfo{Config: ca.Config, Created: ca.Created, Cluster: js.offlineClusterInfo(ca.Group)}
|
||||
resp.Consumers = append(resp.Consumers, ci)
|
||||
missingNames = append(missingNames, ci.Name)
|
||||
} else {
|
||||
isubj := fmt.Sprintf(clusterConsumerInfoT, ca.Client.serviceAccount(), stream, ca.Name)
|
||||
s.sendInternalMsgLocked(isubj, inbox, nil, nil)
|
||||
sent[ca.Name] = struct{}{}
|
||||
}
|
||||
}
|
||||
js.mu.Unlock()
|
||||
@@ -4127,9 +4139,13 @@ LOOP:
|
||||
return
|
||||
case <-notActive.C:
|
||||
s.Warnf("Did not receive all consumer info results for %q", acc)
|
||||
resp.Error = NewJSClusterIncompleteError()
|
||||
for cName := range sent {
|
||||
missingNames = append(missingNames, cName)
|
||||
}
|
||||
resp.Missing = missingNames
|
||||
break LOOP
|
||||
case ci := <-rc:
|
||||
delete(sent, ci.Name)
|
||||
resp.Consumers = append(resp.Consumers, ci)
|
||||
// Check to see if we are done.
|
||||
if len(resp.Consumers) == len(consumers) {
|
||||
|
||||
Reference in New Issue
Block a user