From dd735f4a18688833c79d7f9c6c99224d8a14aa9c Mon Sep 17 00:00:00 2001 From: Matthias Hanel Date: Wed, 8 Dec 2021 18:21:51 -0500 Subject: [PATCH 1/2] Adding missing entry to stream/consumer list Signed-off-by: Matthias Hanel --- server/jetstream_api.go | 2 ++ server/jetstream_cluster.go | 20 ++++++++++++++++++-- 2 files changed, 20 insertions(+), 2 deletions(-) diff --git a/server/jetstream_api.go b/server/jetstream_api.go index a41c64c1..d41d8b57 100644 --- a/server/jetstream_api.go +++ b/server/jetstream_api.go @@ -363,6 +363,7 @@ type JSApiStreamListResponse struct { ApiResponse ApiPaged Streams []*StreamInfo `json:"streams"` + Missing []string `json:"missing_stream_names,omitempty"` } const JSApiStreamListResponseType = "io.nats.jetstream.api.v1.stream_list_response" @@ -560,6 +561,7 @@ type JSApiConsumerListResponse struct { ApiResponse ApiPaged Consumers []*ConsumerInfo `json:"consumers"` + Missing []string `json:"missing_consumer_names,omitempty"` } const JSApiConsumerListResponseType = "io.nats.jetstream.api.v1.consumer_list_response" diff --git a/server/jetstream_cluster.go b/server/jetstream_cluster.go index d0dbe93f..77d56622 100644 --- a/server/jetstream_cluster.go +++ b/server/jetstream_cluster.go @@ -3973,15 +3973,19 @@ func (s *Server) jsClusteredStreamListRequest(acc *Account, ci *ClientInfo, filt s.mu.Unlock() }() + var missingNames []string + sent := map[string]struct{}{} // Send out our requests here. for _, sa := range streams { if s.allPeersOffline(sa.Group) { // Place offline onto our results by hand here. si := &StreamInfo{Config: *sa.Config, Created: sa.Created, Cluster: js.offlineClusterInfo(sa.Group)} resp.Streams = append(resp.Streams, si) + missingNames = append(missingNames, sa.Config.Name) } else { isubj := fmt.Sprintf(clusterStreamInfoT, sa.Client.serviceAccount(), sa.Config.Name) s.sendInternalMsgLocked(isubj, inbox, nil, nil) + sent[sa.Config.Name] = struct{}{} } } // Don't hold lock. @@ -3998,9 +4002,13 @@ LOOP: return case <-notActive.C: s.Warnf("Did not receive all stream info results for %q", acc) - resp.Error = NewJSClusterIncompleteError() + for sName := range sent { + missingNames = append(missingNames, sName) + } + resp.Missing = missingNames break LOOP case si := <-rc: + delete(sent, si.Config.Name) resp.Streams = append(resp.Streams, si) // Check to see if we are done. if len(resp.Streams) == len(streams) { @@ -4104,14 +4112,18 @@ func (s *Server) jsClusteredConsumerListRequest(acc *Account, ci *ClientInfo, of s.mu.Unlock() }() + var missingNames []string + sent := map[string]struct{}{} for _, ca := range consumers { if s.allPeersOffline(ca.Group) { // Place offline onto our results by hand here. ci := &ConsumerInfo{Config: ca.Config, Created: ca.Created, Cluster: js.offlineClusterInfo(ca.Group)} resp.Consumers = append(resp.Consumers, ci) + missingNames = append(missingNames, ci.Name) } else { isubj := fmt.Sprintf(clusterConsumerInfoT, ca.Client.serviceAccount(), stream, ca.Name) s.sendInternalMsgLocked(isubj, inbox, nil, nil) + sent[ca.Name] = struct{}{} } } js.mu.Unlock() @@ -4127,9 +4139,13 @@ LOOP: return case <-notActive.C: s.Warnf("Did not receive all consumer info results for %q", acc) - resp.Error = NewJSClusterIncompleteError() + for cName := range sent { + missingNames = append(missingNames, cName) + } + resp.Missing = missingNames break LOOP case ci := <-rc: + delete(sent, ci.Name) resp.Consumers = append(resp.Consumers, ci) // Check to see if we are done. if len(resp.Consumers) == len(consumers) { From 0ba2544c5aae61b2cbddacbc811b2e9d2eb74f70 Mon Sep 17 00:00:00 2001 From: Matthias Hanel Date: Wed, 8 Dec 2021 19:33:35 -0500 Subject: [PATCH 2/2] removed suffix from "missing" list Signed-off-by: Matthias Hanel --- server/jetstream_api.go | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/server/jetstream_api.go b/server/jetstream_api.go index d41d8b57..3e6471fb 100644 --- a/server/jetstream_api.go +++ b/server/jetstream_api.go @@ -363,7 +363,7 @@ type JSApiStreamListResponse struct { ApiResponse ApiPaged Streams []*StreamInfo `json:"streams"` - Missing []string `json:"missing_stream_names,omitempty"` + Missing []string `json:"missing,omitempty"` } const JSApiStreamListResponseType = "io.nats.jetstream.api.v1.stream_list_response" @@ -561,7 +561,7 @@ type JSApiConsumerListResponse struct { ApiResponse ApiPaged Consumers []*ConsumerInfo `json:"consumers"` - Missing []string `json:"missing_consumer_names,omitempty"` + Missing []string `json:"missing,omitempty"` } const JSApiConsumerListResponseType = "io.nats.jetstream.api.v1.consumer_list_response"