mirror of
https://github.com/gogrlx/nats-server.git
synced 2026-04-02 03:38:42 -07:00
Make sure to return no consumer if we are still waiting on a leader to be elected.
Signed-off-by: Derek Collison <derek@nats.io>
This commit is contained in:
@@ -1480,6 +1480,7 @@ func (s *Server) jsStreamNamesRequest(sub *subscription, c *client, subject, rep
|
||||
resp.Total = numStreams
|
||||
resp.Limit = JSApiNamesLimit
|
||||
resp.Offset = offset
|
||||
|
||||
s.sendAPIResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(resp))
|
||||
}
|
||||
|
||||
@@ -3281,6 +3282,10 @@ func (s *Server) jsConsumerInfoRequest(sub *subscription, c *client, subject, re
|
||||
|
||||
js.mu.RLock()
|
||||
isLeader, sa, ca := cc.isLeader(), js.streamAssignment(acc.Name, streamName), js.consumerAssignment(acc.Name, streamName, consumerName)
|
||||
// Ignore pending consumers for now.
|
||||
if ca != nil && ca.pending {
|
||||
ca = nil
|
||||
}
|
||||
js.mu.RUnlock()
|
||||
|
||||
if isLeader && ca == nil {
|
||||
@@ -3320,6 +3325,13 @@ func (s *Server) jsConsumerInfoRequest(sub *subscription, c *client, subject, re
|
||||
resp.Error = jsClusterNotAvailErr
|
||||
s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp))
|
||||
}
|
||||
// Check here if we are a member and this is just a new consumer that does not have a leader yet.
|
||||
if rg := ca.Group; rg.node != nil && rg.isMember(cc.meta.ID()) {
|
||||
if rg.node.GroupLeader() == _EMPTY_ && !rg.node.HadPreviousLeader() {
|
||||
resp.Error = jsNoConsumerErr
|
||||
s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp))
|
||||
}
|
||||
}
|
||||
return
|
||||
}
|
||||
}
|
||||
|
||||
@@ -5839,14 +5839,9 @@ func TestJetStreamClusterCreateConcurrentDurableConsumers(t *testing.T) {
|
||||
defer c.shutdown()
|
||||
|
||||
// Client for API requests.
|
||||
nc, _ := jsClientConnect(t, c.randomServer())
|
||||
nc, js := jsClientConnect(t, c.randomServer())
|
||||
defer nc.Close()
|
||||
|
||||
js, err := nc.JetStream(nats.MaxWait(10 * time.Second))
|
||||
if err != nil {
|
||||
t.Fatalf("Unexpected error: %v", err)
|
||||
}
|
||||
|
||||
// Create origin stream, muct be R > 1
|
||||
if _, err := js.AddStream(&nats.StreamConfig{Name: "ORDERS", Replicas: 3}); err != nil {
|
||||
t.Fatalf("Unexpected error: %v", err)
|
||||
|
||||
Reference in New Issue
Block a user