diff --git a/server/jetstream_cluster.go b/server/jetstream_cluster.go index 3d0a580b..1ce61c92 100644 --- a/server/jetstream_cluster.go +++ b/server/jetstream_cluster.go @@ -3572,13 +3572,18 @@ func (o *consumer) streamAndNode() (*stream, RaftNode) { return o.mset, o.node } -func (o *consumer) replica() int { +// Return the replica count for this consumer. If the consumer has been +// stopped, this will return an error. +func (o *consumer) replica() (int, error) { o.mu.RLock() oCfg := o.cfg mset := o.mset o.mu.RUnlock() + if mset == nil { + return 0, errBadConsumer + } sCfg := mset.config() - return oCfg.replicas(&sCfg) + return oCfg.replicas(&sCfg), nil } func (o *consumer) raftGroup() *raftGroup { @@ -3730,7 +3735,11 @@ func (js *jetStream) monitorConsumer(o *consumer, ca *consumerAssignment) { // Check for migrations (peer count and replica count differ) here. // We set the state on the stream assignment update below. - if isLeader && len(rg.Peers) != o.replica() { + replicas, err := o.replica() + if err != nil { + return + } + if isLeader && len(rg.Peers) != replicas { startMigrationMonitoring() } else { stopMigrationMonitoring() @@ -3744,7 +3753,11 @@ func (js *jetStream) monitorConsumer(o *consumer, ca *consumerAssignment) { // keep peer list up to date with config js.checkPeers(rg) // If we are migrating, monitor for the new peers to be caught up. - if isLeader && len(rg.Peers) != o.replica() { + replicas, err := o.replica() + if err != nil { + return + } + if isLeader && len(rg.Peers) != replicas { startMigrationMonitoring() } else { stopMigrationMonitoring() @@ -3756,7 +3769,10 @@ func (js *jetStream) monitorConsumer(o *consumer, ca *consumerAssignment) { continue } rg := o.raftGroup() - replicas := o.replica() + replicas, err := o.replica() + if err != nil { + return + } if len(rg.Peers) <= replicas { // Migration no longer happening, so not our job anymore stopMigrationMonitoring()