From 9f1bc5882ca15bf3b2a3bed4d1813ecac20bfd3f Mon Sep 17 00:00:00 2001 From: Derek Collison Date: Wed, 21 Apr 2021 12:13:10 -0700 Subject: [PATCH] Make sure to return no consumer if we are still waiting on a leader to be elected. Signed-off-by: Derek Collison --- server/jetstream_api.go | 12 ++++++++++++ server/jetstream_cluster_test.go | 7 +------ 2 files changed, 13 insertions(+), 6 deletions(-) diff --git a/server/jetstream_api.go b/server/jetstream_api.go index c110203d..d6f6751c 100644 --- a/server/jetstream_api.go +++ b/server/jetstream_api.go @@ -1480,6 +1480,7 @@ func (s *Server) jsStreamNamesRequest(sub *subscription, c *client, subject, rep resp.Total = numStreams resp.Limit = JSApiNamesLimit resp.Offset = offset + s.sendAPIResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(resp)) } @@ -3281,6 +3282,10 @@ func (s *Server) jsConsumerInfoRequest(sub *subscription, c *client, subject, re js.mu.RLock() isLeader, sa, ca := cc.isLeader(), js.streamAssignment(acc.Name, streamName), js.consumerAssignment(acc.Name, streamName, consumerName) + // Ignore pending consumers for now. + if ca != nil && ca.pending { + ca = nil + } js.mu.RUnlock() if isLeader && ca == nil { @@ -3320,6 +3325,13 @@ func (s *Server) jsConsumerInfoRequest(sub *subscription, c *client, subject, re resp.Error = jsClusterNotAvailErr s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp)) } + // Check here if we are a member and this is just a new consumer that does not have a leader yet. + if rg := ca.Group; rg.node != nil && rg.isMember(cc.meta.ID()) { + if rg.node.GroupLeader() == _EMPTY_ && !rg.node.HadPreviousLeader() { + resp.Error = jsNoConsumerErr + s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp)) + } + } return } } diff --git a/server/jetstream_cluster_test.go b/server/jetstream_cluster_test.go index 2c53982d..364fa9ac 100644 --- a/server/jetstream_cluster_test.go +++ b/server/jetstream_cluster_test.go @@ -5839,14 +5839,9 @@ func TestJetStreamClusterCreateConcurrentDurableConsumers(t *testing.T) { defer c.shutdown() // Client for API requests. - nc, _ := jsClientConnect(t, c.randomServer()) + nc, js := jsClientConnect(t, c.randomServer()) defer nc.Close() - js, err := nc.JetStream(nats.MaxWait(10 * time.Second)) - if err != nil { - t.Fatalf("Unexpected error: %v", err) - } - // Create origin stream, muct be R > 1 if _, err := js.AddStream(&nats.StreamConfig{Name: "ORDERS", Replicas: 3}); err != nil { t.Fatalf("Unexpected error: %v", err)