diff --git a/server/jetstream_api.go b/server/jetstream_api.go index a41c64c1..3e6471fb 100644 --- a/server/jetstream_api.go +++ b/server/jetstream_api.go @@ -363,6 +363,7 @@ type JSApiStreamListResponse struct { ApiResponse ApiPaged Streams []*StreamInfo `json:"streams"` + Missing []string `json:"missing,omitempty"` } const JSApiStreamListResponseType = "io.nats.jetstream.api.v1.stream_list_response" @@ -560,6 +561,7 @@ type JSApiConsumerListResponse struct { ApiResponse ApiPaged Consumers []*ConsumerInfo `json:"consumers"` + Missing []string `json:"missing,omitempty"` } const JSApiConsumerListResponseType = "io.nats.jetstream.api.v1.consumer_list_response" diff --git a/server/jetstream_cluster.go b/server/jetstream_cluster.go index d0dbe93f..77d56622 100644 --- a/server/jetstream_cluster.go +++ b/server/jetstream_cluster.go @@ -3973,15 +3973,19 @@ func (s *Server) jsClusteredStreamListRequest(acc *Account, ci *ClientInfo, filt s.mu.Unlock() }() + var missingNames []string + sent := map[string]struct{}{} // Send out our requests here. for _, sa := range streams { if s.allPeersOffline(sa.Group) { // Place offline onto our results by hand here. si := &StreamInfo{Config: *sa.Config, Created: sa.Created, Cluster: js.offlineClusterInfo(sa.Group)} resp.Streams = append(resp.Streams, si) + missingNames = append(missingNames, sa.Config.Name) } else { isubj := fmt.Sprintf(clusterStreamInfoT, sa.Client.serviceAccount(), sa.Config.Name) s.sendInternalMsgLocked(isubj, inbox, nil, nil) + sent[sa.Config.Name] = struct{}{} } } // Don't hold lock. @@ -3998,9 +4002,13 @@ LOOP: return case <-notActive.C: s.Warnf("Did not receive all stream info results for %q", acc) - resp.Error = NewJSClusterIncompleteError() + for sName := range sent { + missingNames = append(missingNames, sName) + } + resp.Missing = missingNames break LOOP case si := <-rc: + delete(sent, si.Config.Name) resp.Streams = append(resp.Streams, si) // Check to see if we are done. if len(resp.Streams) == len(streams) { @@ -4104,14 +4112,18 @@ func (s *Server) jsClusteredConsumerListRequest(acc *Account, ci *ClientInfo, of s.mu.Unlock() }() + var missingNames []string + sent := map[string]struct{}{} for _, ca := range consumers { if s.allPeersOffline(ca.Group) { // Place offline onto our results by hand here. ci := &ConsumerInfo{Config: ca.Config, Created: ca.Created, Cluster: js.offlineClusterInfo(ca.Group)} resp.Consumers = append(resp.Consumers, ci) + missingNames = append(missingNames, ci.Name) } else { isubj := fmt.Sprintf(clusterConsumerInfoT, ca.Client.serviceAccount(), stream, ca.Name) s.sendInternalMsgLocked(isubj, inbox, nil, nil) + sent[ca.Name] = struct{}{} } } js.mu.Unlock() @@ -4127,9 +4139,13 @@ LOOP: return case <-notActive.C: s.Warnf("Did not receive all consumer info results for %q", acc) - resp.Error = NewJSClusterIncompleteError() + for cName := range sent { + missingNames = append(missingNames, cName) + } + resp.Missing = missingNames break LOOP case ci := <-rc: + delete(sent, ci.Name) resp.Consumers = append(resp.Consumers, ci) // Check to see if we are done. if len(resp.Consumers) == len(consumers) {