From d1d2d5b24e8d24ba147b858dafa8f926acab8147 Mon Sep 17 00:00:00 2001 From: Derek Collison Date: Sat, 23 Jan 2021 10:05:11 -0800 Subject: [PATCH] Fix for consumer names list in clustered mode Signed-off-by: Derek Collison --- server/jetstream_api.go | 80 +++++++++++++++++++++++--------- test/jetstream_cluster_test.go | 83 ++++++++++++++++++++++++++++++++++ 2 files changed, 141 insertions(+), 22 deletions(-) diff --git a/server/jetstream_api.go b/server/jetstream_api.go index d77055ee..d2fc9be2 100644 --- a/server/jetstream_api.go +++ b/server/jetstream_api.go @@ -937,7 +937,6 @@ func (s *Server) jsStreamNamesRequest(sub *subscription, c *client, subject, rep // TODO(dlc) - If this list is long maybe do this in a Go routine? var numStreams int if s.JetStreamIsClustered() { - // FIXME(dlc) - Do this or scatter/gather? js, cc := s.getJetStreamCluster() if js == nil || cc == nil { // TODO(dlc) - Debug or Warn? @@ -1854,7 +1853,7 @@ func (s *Server) jsConsumerCreate(sub *subscription, c *client, subject, reply s // Request for the list of all consumer names. func (s *Server) jsConsumerNamesRequest(sub *subscription, c *client, subject, reply string, rmsg []byte) { - if c == nil { + if c == nil || !s.JetStreamIsLeader() { return } ci, acc, _, msg, err := s.getRequestInfo(c, rmsg) @@ -1867,7 +1866,6 @@ func (s *Server) jsConsumerNamesRequest(sub *subscription, c *client, subject, r ApiResponse: ApiResponse{Type: JSApiConsumerNamesResponseType}, Consumers: []string{}, } - if !acc.JetStreamEnabled() { resp.Error = jsNotEnabledErr s.sendAPIResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp)) @@ -1886,30 +1884,68 @@ func (s *Server) jsConsumerNamesRequest(sub *subscription, c *client, subject, r } streamName := streamNameFromSubject(subject) - mset, err := acc.LookupStream(streamName) - if err != nil { - resp.Error = jsNotFoundError(err) - s.sendAPIResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp)) - return - } + var numConsumers int - obs := mset.Consumers() - sort.Slice(obs, func(i, j int) bool { - return strings.Compare(obs[i].name, obs[j].name) < 0 - }) + if s.JetStreamIsClustered() { + js, cc := s.getJetStreamCluster() + if js == nil || cc == nil { + // TODO(dlc) - Debug or Warn? + return + } + js.mu.RLock() + sas := cc.streams[acc.Name] + if sas == nil { + js.mu.RUnlock() + resp.Error = jsNotFoundError(ErrJetStreamNotEnabled) + s.sendAPIResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp)) + return + } + sa := sas[streamName] + if sa == nil || sa.err != nil { + js.mu.RUnlock() + resp.Error = jsNotFoundError(ErrJetStreamStreamNotFound) + s.sendAPIResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp)) + return + } + for consumer := range sa.consumers { + resp.Consumers = append(resp.Consumers, consumer) + } + if len(resp.Consumers) > 1 { + sort.Slice(resp.Consumers, func(i, j int) bool { return strings.Compare(resp.Consumers[i], resp.Consumers[j]) < 0 }) + } + numConsumers = len(resp.Consumers) + if offset > numConsumers { + offset = numConsumers + resp.Consumers = resp.Consumers[:offset] + } + js.mu.RUnlock() - ocnt := len(obs) - if offset > ocnt { - offset = ocnt - } + } else { + mset, err := acc.LookupStream(streamName) + if err != nil { + resp.Error = jsNotFoundError(err) + s.sendAPIResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp)) + return + } - for _, o := range obs[offset:] { - resp.Consumers = append(resp.Consumers, o.Name()) - if len(resp.Consumers) >= JSApiNamesLimit { - break + obs := mset.Consumers() + sort.Slice(obs, func(i, j int) bool { + return strings.Compare(obs[i].name, obs[j].name) < 0 + }) + + numConsumers = len(obs) + if offset > numConsumers { + offset = numConsumers + } + + for _, o := range obs[offset:] { + resp.Consumers = append(resp.Consumers, o.Name()) + if len(resp.Consumers) >= JSApiNamesLimit { + break + } } } - resp.Total = ocnt + resp.Total = numConsumers resp.Limit = JSApiNamesLimit resp.Offset = offset s.sendAPIResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(resp)) diff --git a/test/jetstream_cluster_test.go b/test/jetstream_cluster_test.go index 14a76750..02bed003 100644 --- a/test/jetstream_cluster_test.go +++ b/test/jetstream_cluster_test.go @@ -1434,6 +1434,89 @@ func TestJetStreamClusterExtendedStreamInfo(t *testing.T) { } } +func TestJetStreamClusterExtendedStreamInfoSingleReplica(t *testing.T) { + c := createJetStreamClusterExplicit(t, "R3S", 3) + defer c.shutdown() + + // Client based API + s := c.randomServer() + nc, js := jsClientConnect(t, s) + defer nc.Close() + + _, err := js.AddStream(&nats.StreamConfig{ + Name: "TEST", + Subjects: []string{"foo"}, + }) + if err != nil { + t.Fatalf("Unexpected error: %v", err) + } + + toSend := 50 + for i := 0; i < toSend; i++ { + if _, err = js.Publish("foo", []byte("OK")); err != nil { + t.Fatalf("Unexpected publish error: %v", err) + } + } + + leader := c.streamLeader("$G", "TEST").Name() + + si, err := js.StreamInfo("TEST") + if err != nil { + t.Fatalf("Unexpected error: %v", err) + } + if si.Cluster == nil { + t.Fatalf("Expected cluster info") + } + if si.Cluster.Name != c.name { + t.Fatalf("Expected cluster name of %q, got %q", c.name, si.Cluster.Name) + } + if si.Cluster.Leader != leader { + t.Fatalf("Expected leader of %q, got %q", leader, si.Cluster.Leader) + } + if len(si.Cluster.Replicas) != 0 { + t.Fatalf("Expected no replicas but got %d", len(si.Cluster.Replicas)) + } + + // Make sure we can grab consumer lists from any server. + cl := js.NewConsumerLister("TEST") + if !cl.Next() { + t.Fatalf("Unexpected error: %v", cl.Err()) + } + p := cl.Page() + if len(p) != 0 { + t.Fatalf("ConsumerInfo expected no paged results, got %d", len(p)) + } + + // Now add in a consumer. + cfg := &nats.ConsumerConfig{Durable: "dlc", AckPolicy: nats.AckExplicitPolicy} + if _, err := js.AddConsumer("TEST", cfg); err != nil { + t.Fatalf("Unexpected error: %v", err) + } + + cl = js.NewConsumerLister("TEST") + if !cl.Next() { + t.Fatalf("Unexpected error: %v", cl.Err()) + } + p = cl.Page() + if len(p) != 1 { + t.Fatalf("ConsumerInfo expected 1 result, got %d", len(p)) + } + + // Now do direct names list as well. + resp, err := nc.Request(fmt.Sprintf(server.JSApiConsumersT, "TEST"), nil, time.Second) + if err != nil { + t.Fatalf("Unexpected error: %v", err) + } + var clResponse server.JSApiConsumerNamesResponse + if err = json.Unmarshal(resp.Data, &clResponse); err != nil { + t.Fatalf("Unexpected error: %v", err) + } + if len(clResponse.Consumers) != 1 { + t.Fatalf("Expected only 1 consumer but got %d", len(clResponse.Consumers)) + } + +} + func TestJetStreamClusterUserSnapshotAndRestore(t *testing.T) { c := createJetStreamClusterExplicit(t, "R3S", 3) defer c.shutdown()