From 88203dd5d5eb18a06240a215a795fa109a73e6e5 Mon Sep 17 00:00:00 2001 From: Ivan Kozlovic Date: Wed, 27 Jul 2022 16:42:32 -0600 Subject: [PATCH] Fixed a panic when consumer is closed Panic was: ``` === RUN TestJetStreamClusterDelete panic: runtime error: invalid memory address or nil pointer dereference [signal SIGSEGV: segmentation violation code=0x1 addr=0x0 pc=0xcec8fb] goroutine 1761 [running]: github.com/nats-io/nats-server/v2/server.(*stream).config(0x0) /home/travis/gopath/src/github.com/nats-io/nats-server/server/stream.go:1192 +0x5b github.com/nats-io/nats-server/v2/server.(*consumer).replica(0xc000101400) /home/travis/gopath/src/github.com/nats-io/nats-server/server/jetstream_cluster.go:3580 +0xea github.com/nats-io/nats-server/v2/server.(*jetStream).monitorConsumer(0xc0001d2790, 0xc000101400, 0xc0004df0e0) /home/travis/gopath/src/github.com/nats-io/nats-server/server/jetstream_cluster.go:3733 +0xe06 github.com/nats-io/nats-server/v2/server.(*jetStream).processClusterCreateConsumer.func1() /home/travis/gopath/src/github.com/nats-io/nats-server/server/jetstream_cluster.go:3445 +0x4d created by github.com/nats-io/nats-server/v2/server.(*Server).startGoRoutine /home/travis/gopath/src/github.com/nats-io/nats-server/server/server.go:3057 +0x85 FAIL github.com/nats-io/nats-server/v2/server 9.911s ``` Seem to have been introduced in #3282 Signed-off-by: Ivan Kozlovic --- server/jetstream_cluster.go | 26 +++++++++++++++++++++----- 1 file changed, 21 insertions(+), 5 deletions(-) diff --git a/server/jetstream_cluster.go b/server/jetstream_cluster.go index 3d0a580b..1ce61c92 100644 --- a/server/jetstream_cluster.go +++ b/server/jetstream_cluster.go @@ -3572,13 +3572,18 @@ func (o *consumer) streamAndNode() (*stream, RaftNode) { return o.mset, o.node } -func (o *consumer) replica() int { +// Return the replica count for this consumer. If the consumer has been +// stopped, this will return an error. +func (o *consumer) replica() (int, error) { o.mu.RLock() oCfg := o.cfg mset := o.mset o.mu.RUnlock() + if mset == nil { + return 0, errBadConsumer + } sCfg := mset.config() - return oCfg.replicas(&sCfg) + return oCfg.replicas(&sCfg), nil } func (o *consumer) raftGroup() *raftGroup { @@ -3730,7 +3735,11 @@ func (js *jetStream) monitorConsumer(o *consumer, ca *consumerAssignment) { // Check for migrations (peer count and replica count differ) here. // We set the state on the stream assignment update below. - if isLeader && len(rg.Peers) != o.replica() { + replicas, err := o.replica() + if err != nil { + return + } + if isLeader && len(rg.Peers) != replicas { startMigrationMonitoring() } else { stopMigrationMonitoring() @@ -3744,7 +3753,11 @@ func (js *jetStream) monitorConsumer(o *consumer, ca *consumerAssignment) { // keep peer list up to date with config js.checkPeers(rg) // If we are migrating, monitor for the new peers to be caught up. - if isLeader && len(rg.Peers) != o.replica() { + replicas, err := o.replica() + if err != nil { + return + } + if isLeader && len(rg.Peers) != replicas { startMigrationMonitoring() } else { stopMigrationMonitoring() @@ -3756,7 +3769,10 @@ func (js *jetStream) monitorConsumer(o *consumer, ca *consumerAssignment) { continue } rg := o.raftGroup() - replicas := o.replica() + replicas, err := o.replica() + if err != nil { + return + } if len(rg.Peers) <= replicas { // Migration no longer happening, so not our job anymore stopMigrationMonitoring()