mirror of
https://github.com/gogrlx/nats-server.git
synced 2026-04-02 03:38:42 -07:00
[FIXED] JetStream stream info consumers count in clustered mode
In clustering mode, the number of consumers in stream info may be wrong in presence of non durable consumers. Ephemeral are handled by specific nodes. The StreamInfo response would contain only the consumer count that the stream leader is handling. This fix overrides the stream's state consumers count with the number of consumers from the stream assignment record. Resolves #2895 Signed-off-by: Ivan Kozlovic <ivan@synadia.com>
This commit is contained in:
@@ -1645,6 +1645,8 @@ func (s *Server) jsStreamInfoRequest(sub *subscription, c *client, a *Account, s
|
||||
resp.ApiResponse.Type = JSApiStreamCreateResponseType
|
||||
}
|
||||
|
||||
var clusterWideConsCount int
|
||||
|
||||
// If we are in clustered mode we need to be the stream leader to proceed.
|
||||
if s.JetStreamIsClustered() {
|
||||
// Check to make sure the stream is assigned.
|
||||
@@ -1655,6 +1657,9 @@ func (s *Server) jsStreamInfoRequest(sub *subscription, c *client, a *Account, s
|
||||
|
||||
js.mu.RLock()
|
||||
isLeader, sa := cc.isLeader(), js.streamAssignment(acc.Name, streamName)
|
||||
if sa != nil {
|
||||
clusterWideConsCount = len(sa.consumers)
|
||||
}
|
||||
js.mu.RUnlock()
|
||||
|
||||
if isLeader && sa == nil {
|
||||
@@ -1730,6 +1735,9 @@ func (s *Server) jsStreamInfoRequest(sub *subscription, c *client, a *Account, s
|
||||
Domain: s.getOpts().JetStreamDomain,
|
||||
Cluster: js.clusterInfo(mset.raftGroup()),
|
||||
}
|
||||
if clusterWideConsCount > 0 {
|
||||
resp.StreamInfo.State.Consumers = clusterWideConsCount
|
||||
}
|
||||
if mset.isMirror() {
|
||||
resp.StreamInfo.Mirror = mset.mirrorInfo()
|
||||
} else if mset.hasSources() {
|
||||
|
||||
@@ -4250,7 +4250,7 @@ func (s *Server) jsClusteredStreamListRequest(acc *Account, ci *ClientInfo, filt
|
||||
}()
|
||||
|
||||
var missingNames []string
|
||||
sent := map[string]struct{}{}
|
||||
sent := map[string]int{}
|
||||
// Send out our requests here.
|
||||
for _, sa := range streams {
|
||||
if s.allPeersOffline(sa.Group) {
|
||||
@@ -4261,7 +4261,7 @@ func (s *Server) jsClusteredStreamListRequest(acc *Account, ci *ClientInfo, filt
|
||||
} else {
|
||||
isubj := fmt.Sprintf(clusterStreamInfoT, sa.Client.serviceAccount(), sa.Config.Name)
|
||||
s.sendInternalMsgLocked(isubj, inbox, nil, nil)
|
||||
sent[sa.Config.Name] = struct{}{}
|
||||
sent[sa.Config.Name] = len(sa.consumers)
|
||||
}
|
||||
}
|
||||
// Don't hold lock.
|
||||
@@ -4284,6 +4284,10 @@ LOOP:
|
||||
resp.Missing = missingNames
|
||||
break LOOP
|
||||
case si := <-rc:
|
||||
consCount := sent[si.Config.Name]
|
||||
if consCount > 0 {
|
||||
si.State.Consumers = consCount
|
||||
}
|
||||
delete(sent, si.Config.Name)
|
||||
resp.Streams = append(resp.Streams, si)
|
||||
// Check to see if we are done.
|
||||
|
||||
@@ -10736,6 +10736,46 @@ func TestJetStreamClusterInterestRetentionWithFilteredConsumersExtra(t *testing.
|
||||
checkState(0)
|
||||
}
|
||||
|
||||
func TestJetStreamClusterStreamConsumersCount(t *testing.T) {
|
||||
c := createJetStreamClusterExplicit(t, "R3S", 3)
|
||||
defer c.shutdown()
|
||||
|
||||
nc, js := jsClientConnect(t, c.randomServer())
|
||||
defer nc.Close()
|
||||
|
||||
sname := "TEST_STREAM_CONS_COUNT"
|
||||
_, err := js.AddStream(&nats.StreamConfig{Name: sname, Subjects: []string{"foo"}, Replicas: 3})
|
||||
require_NoError(t, err)
|
||||
|
||||
// Create some R1 consumers
|
||||
for i := 0; i < 10; i++ {
|
||||
inbox := nats.NewInbox()
|
||||
natsSubSync(t, nc, inbox)
|
||||
_, err = js.AddConsumer(sname, &nats.ConsumerConfig{DeliverSubject: inbox})
|
||||
require_NoError(t, err)
|
||||
}
|
||||
|
||||
// Now check that the consumer count in stream info/list is 10
|
||||
checkFor(t, 2*time.Second, 15*time.Millisecond, func() error {
|
||||
// Check stream info
|
||||
si, err := js.StreamInfo(sname)
|
||||
if err != nil {
|
||||
return fmt.Errorf("Error getting stream info: %v", err)
|
||||
}
|
||||
if n := si.State.Consumers; n != 10 {
|
||||
return fmt.Errorf("From StreamInfo, expecting 10 consumers, got %v", n)
|
||||
}
|
||||
|
||||
// Now from stream list
|
||||
for si := range js.StreamsInfo() {
|
||||
if n := si.State.Consumers; n != 10 {
|
||||
return fmt.Errorf("From StreamsInfo, expecting 10 consumers, got %v", n)
|
||||
}
|
||||
}
|
||||
return nil
|
||||
})
|
||||
}
|
||||
|
||||
// Support functions
|
||||
|
||||
// Used to setup superclusters for tests.
|
||||
|
||||
Reference in New Issue
Block a user