diff --git a/server/jetstream_api.go b/server/jetstream_api.go index c110203d..805de08a 100644 --- a/server/jetstream_api.go +++ b/server/jetstream_api.go @@ -1480,6 +1480,7 @@ func (s *Server) jsStreamNamesRequest(sub *subscription, c *client, subject, rep resp.Total = numStreams resp.Limit = JSApiNamesLimit resp.Offset = offset + s.sendAPIResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(resp)) } @@ -3281,6 +3282,10 @@ func (s *Server) jsConsumerInfoRequest(sub *subscription, c *client, subject, re js.mu.RLock() isLeader, sa, ca := cc.isLeader(), js.streamAssignment(acc.Name, streamName), js.consumerAssignment(acc.Name, streamName, consumerName) + // Ignore pending consumers for now. + if ca != nil && ca.pending { + ca = nil + } js.mu.RUnlock() if isLeader && ca == nil { @@ -3319,6 +3324,12 @@ func (s *Server) jsConsumerInfoRequest(sub *subscription, c *client, subject, re if js.isLeaderless() { resp.Error = jsClusterNotAvailErr s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp)) + } else if rg := ca.Group; rg != nil && rg.node != nil && rg.isMember(cc.meta.ID()) { + // Check here if we are a member and this is just a new consumer that does not have a leader yet. + if rg.node.GroupLeader() == _EMPTY_ && !rg.node.HadPreviousLeader() { + resp.Error = jsNoConsumerErr + s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp)) + } } return } diff --git a/server/jetstream_cluster_test.go b/server/jetstream_cluster_test.go index 2c53982d..364fa9ac 100644 --- a/server/jetstream_cluster_test.go +++ b/server/jetstream_cluster_test.go @@ -5839,14 +5839,9 @@ func TestJetStreamClusterCreateConcurrentDurableConsumers(t *testing.T) { defer c.shutdown() // Client for API requests. - nc, _ := jsClientConnect(t, c.randomServer()) + nc, js := jsClientConnect(t, c.randomServer()) defer nc.Close() - js, err := nc.JetStream(nats.MaxWait(10 * time.Second)) - if err != nil { - t.Fatalf("Unexpected error: %v", err) - } - // Create origin stream, muct be R > 1 if _, err := js.AddStream(&nats.StreamConfig{Name: "ORDERS", Replicas: 3}); err != nil { t.Fatalf("Unexpected error: %v", err)