Fix for consumer names list in clustered mode

Signed-off-by: Derek Collison <derek@nats.io>
This commit is contained in:
Derek Collison
2021-01-23 10:05:11 -08:00
parent cb2433c2fc
commit d1d2d5b24e
2 changed files with 141 additions and 22 deletions

View File

@@ -937,7 +937,6 @@ func (s *Server) jsStreamNamesRequest(sub *subscription, c *client, subject, rep
// TODO(dlc) - If this list is long maybe do this in a Go routine?
var numStreams int
if s.JetStreamIsClustered() {
// FIXME(dlc) - Do this or scatter/gather?
js, cc := s.getJetStreamCluster()
if js == nil || cc == nil {
// TODO(dlc) - Debug or Warn?
@@ -1854,7 +1853,7 @@ func (s *Server) jsConsumerCreate(sub *subscription, c *client, subject, reply s
// Request for the list of all consumer names.
func (s *Server) jsConsumerNamesRequest(sub *subscription, c *client, subject, reply string, rmsg []byte) {
if c == nil {
if c == nil || !s.JetStreamIsLeader() {
return
}
ci, acc, _, msg, err := s.getRequestInfo(c, rmsg)
@@ -1867,7 +1866,6 @@ func (s *Server) jsConsumerNamesRequest(sub *subscription, c *client, subject, r
ApiResponse: ApiResponse{Type: JSApiConsumerNamesResponseType},
Consumers: []string{},
}
if !acc.JetStreamEnabled() {
resp.Error = jsNotEnabledErr
s.sendAPIResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp))
@@ -1886,30 +1884,68 @@ func (s *Server) jsConsumerNamesRequest(sub *subscription, c *client, subject, r
}
streamName := streamNameFromSubject(subject)
mset, err := acc.LookupStream(streamName)
if err != nil {
resp.Error = jsNotFoundError(err)
s.sendAPIResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp))
return
}
var numConsumers int
obs := mset.Consumers()
sort.Slice(obs, func(i, j int) bool {
return strings.Compare(obs[i].name, obs[j].name) < 0
})
if s.JetStreamIsClustered() {
js, cc := s.getJetStreamCluster()
if js == nil || cc == nil {
// TODO(dlc) - Debug or Warn?
return
}
js.mu.RLock()
sas := cc.streams[acc.Name]
if sas == nil {
js.mu.RUnlock()
resp.Error = jsNotFoundError(ErrJetStreamNotEnabled)
s.sendAPIResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp))
return
}
sa := sas[streamName]
if sa == nil || sa.err != nil {
js.mu.RUnlock()
resp.Error = jsNotFoundError(ErrJetStreamStreamNotFound)
s.sendAPIResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp))
return
}
for consumer := range sa.consumers {
resp.Consumers = append(resp.Consumers, consumer)
}
if len(resp.Consumers) > 1 {
sort.Slice(resp.Consumers, func(i, j int) bool { return strings.Compare(resp.Consumers[i], resp.Consumers[j]) < 0 })
}
numConsumers = len(resp.Consumers)
if offset > numConsumers {
offset = numConsumers
resp.Consumers = resp.Consumers[:offset]
}
js.mu.RUnlock()
ocnt := len(obs)
if offset > ocnt {
offset = ocnt
}
} else {
mset, err := acc.LookupStream(streamName)
if err != nil {
resp.Error = jsNotFoundError(err)
s.sendAPIResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp))
return
}
for _, o := range obs[offset:] {
resp.Consumers = append(resp.Consumers, o.Name())
if len(resp.Consumers) >= JSApiNamesLimit {
break
obs := mset.Consumers()
sort.Slice(obs, func(i, j int) bool {
return strings.Compare(obs[i].name, obs[j].name) < 0
})
numConsumers = len(obs)
if offset > numConsumers {
offset = numConsumers
}
for _, o := range obs[offset:] {
resp.Consumers = append(resp.Consumers, o.Name())
if len(resp.Consumers) >= JSApiNamesLimit {
break
}
}
}
resp.Total = ocnt
resp.Total = numConsumers
resp.Limit = JSApiNamesLimit
resp.Offset = offset
s.sendAPIResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(resp))

View File

@@ -1434,6 +1434,89 @@ func TestJetStreamClusterExtendedStreamInfo(t *testing.T) {
}
}
func TestJetStreamClusterExtendedStreamInfoSingleReplica(t *testing.T) {
c := createJetStreamClusterExplicit(t, "R3S", 3)
defer c.shutdown()
// Client based API
s := c.randomServer()
nc, js := jsClientConnect(t, s)
defer nc.Close()
_, err := js.AddStream(&nats.StreamConfig{
Name: "TEST",
Subjects: []string{"foo"},
})
if err != nil {
t.Fatalf("Unexpected error: %v", err)
}
toSend := 50
for i := 0; i < toSend; i++ {
if _, err = js.Publish("foo", []byte("OK")); err != nil {
t.Fatalf("Unexpected publish error: %v", err)
}
}
leader := c.streamLeader("$G", "TEST").Name()
si, err := js.StreamInfo("TEST")
if err != nil {
t.Fatalf("Unexpected error: %v", err)
}
if si.Cluster == nil {
t.Fatalf("Expected cluster info")
}
if si.Cluster.Name != c.name {
t.Fatalf("Expected cluster name of %q, got %q", c.name, si.Cluster.Name)
}
if si.Cluster.Leader != leader {
t.Fatalf("Expected leader of %q, got %q", leader, si.Cluster.Leader)
}
if len(si.Cluster.Replicas) != 0 {
t.Fatalf("Expected no replicas but got %d", len(si.Cluster.Replicas))
}
// Make sure we can grab consumer lists from any server.
cl := js.NewConsumerLister("TEST")
if !cl.Next() {
t.Fatalf("Unexpected error: %v", cl.Err())
}
p := cl.Page()
if len(p) != 0 {
t.Fatalf("ConsumerInfo expected no paged results, got %d", len(p))
}
// Now add in a consumer.
cfg := &nats.ConsumerConfig{Durable: "dlc", AckPolicy: nats.AckExplicitPolicy}
if _, err := js.AddConsumer("TEST", cfg); err != nil {
t.Fatalf("Unexpected error: %v", err)
}
cl = js.NewConsumerLister("TEST")
if !cl.Next() {
t.Fatalf("Unexpected error: %v", cl.Err())
}
p = cl.Page()
if len(p) != 1 {
t.Fatalf("ConsumerInfo expected 1 result, got %d", len(p))
}
// Now do direct names list as well.
resp, err := nc.Request(fmt.Sprintf(server.JSApiConsumersT, "TEST"), nil, time.Second)
if err != nil {
t.Fatalf("Unexpected error: %v", err)
}
var clResponse server.JSApiConsumerNamesResponse
if err = json.Unmarshal(resp.Data, &clResponse); err != nil {
t.Fatalf("Unexpected error: %v", err)
}
if len(clResponse.Consumers) != 1 {
t.Fatalf("Expected only 1 consumer but got %d", len(clResponse.Consumers))
}
}
func TestJetStreamClusterUserSnapshotAndRestore(t *testing.T) {
c := createJetStreamClusterExplicit(t, "R3S", 3)
defer c.shutdown()