Fixed a panic when a consumer is closed

Panic was:
```
=== RUN   TestJetStreamClusterDelete
panic: runtime error: invalid memory address or nil pointer dereference
[signal SIGSEGV: segmentation violation code=0x1 addr=0x0 pc=0xcec8fb]
goroutine 1761 [running]:
github.com/nats-io/nats-server/v2/server.(*stream).config(0x0)
	/home/travis/gopath/src/github.com/nats-io/nats-server/server/stream.go:1192 +0x5b
github.com/nats-io/nats-server/v2/server.(*consumer).replica(0xc000101400)
	/home/travis/gopath/src/github.com/nats-io/nats-server/server/jetstream_cluster.go:3580 +0xea
github.com/nats-io/nats-server/v2/server.(*jetStream).monitorConsumer(0xc0001d2790, 0xc000101400, 0xc0004df0e0)
	/home/travis/gopath/src/github.com/nats-io/nats-server/server/jetstream_cluster.go:3733 +0xe06
github.com/nats-io/nats-server/v2/server.(*jetStream).processClusterCreateConsumer.func1()
	/home/travis/gopath/src/github.com/nats-io/nats-server/server/jetstream_cluster.go:3445 +0x4d
created by github.com/nats-io/nats-server/v2/server.(*Server).startGoRoutine
	/home/travis/gopath/src/github.com/nats-io/nats-server/server/server.go:3057 +0x85
FAIL	github.com/nats-io/nats-server/v2/server	9.911s
```

Seems to have been introduced in #3282
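
For context, the crash comes from `replica()` calling `mset.config()` while `mset` is already nil after the consumer has been stopped. Below is a minimal, self-contained sketch of that failure mode and the guard-and-return-error approach the fix takes; the types and fields (`streamCfg`, `Replicas`, etc.) are simplified stand-ins, not the server's actual definitions:

```go
package main

import (
	"errors"
	"fmt"
	"sync"
)

var errBadConsumer = errors.New("consumer not valid")

type streamCfg struct{ Replicas int }

type stream struct{ cfg streamCfg }

// config mirrors the shape of the method in the trace: calling it through a
// nil *stream dereferences s and panics.
func (s *stream) config() streamCfg { return s.cfg }

type consumer struct {
	mu   sync.RWMutex
	mset *stream // cleared when the consumer is stopped/closed
}

// replica guards against a stopped consumer instead of blindly calling
// mset.config(), which is the essence of the fix in this commit.
func (c *consumer) replica() (int, error) {
	c.mu.RLock()
	mset := c.mset
	c.mu.RUnlock()
	if mset == nil {
		return 0, errBadConsumer
	}
	return mset.config().Replicas, nil
}

func main() {
	c := &consumer{} // simulates a consumer whose stream was already torn down
	if _, err := c.replica(); err != nil {
		fmt.Println("bail out instead of panicking:", err)
	}
}
```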

Signed-off-by: Ivan Kozlovic <ivan@synadia.com>
Author: Ivan Kozlovic
Date: 2022-07-27 16:42:32 -06:00
Parent: 23c475e0d2
Commit: 88203dd5d5

server/jetstream_cluster.go

```diff
@@ -3572,13 +3572,18 @@ func (o *consumer) streamAndNode() (*stream, RaftNode) {
 	return o.mset, o.node
 }
 
-func (o *consumer) replica() int {
+// Return the replica count for this consumer. If the consumer has been
+// stopped, this will return an error.
+func (o *consumer) replica() (int, error) {
 	o.mu.RLock()
 	oCfg := o.cfg
 	mset := o.mset
 	o.mu.RUnlock()
+	if mset == nil {
+		return 0, errBadConsumer
+	}
 	sCfg := mset.config()
-	return oCfg.replicas(&sCfg)
+	return oCfg.replicas(&sCfg), nil
 }
 
 func (o *consumer) raftGroup() *raftGroup {
@@ -3730,7 +3735,11 @@ func (js *jetStream) monitorConsumer(o *consumer, ca *consumerAssignment) {
 
 			// Check for migrations (peer count and replica count differ) here.
 			// We set the state on the stream assignment update below.
-			if isLeader && len(rg.Peers) != o.replica() {
+			replicas, err := o.replica()
+			if err != nil {
+				return
+			}
+			if isLeader && len(rg.Peers) != replicas {
 				startMigrationMonitoring()
 			} else {
 				stopMigrationMonitoring()
@@ -3744,7 +3753,11 @@ func (js *jetStream) monitorConsumer(o *consumer, ca *consumerAssignment) {
 			// keep peer list up to date with config
 			js.checkPeers(rg)
 			// If we are migrating, monitor for the new peers to be caught up.
-			if isLeader && len(rg.Peers) != o.replica() {
+			replicas, err := o.replica()
+			if err != nil {
+				return
+			}
+			if isLeader && len(rg.Peers) != replicas {
 				startMigrationMonitoring()
 			} else {
 				stopMigrationMonitoring()
@@ -3756,7 +3769,10 @@ func (js *jetStream) monitorConsumer(o *consumer, ca *consumerAssignment) {
 				continue
 			}
 			rg := o.raftGroup()
-			replicas := o.replica()
+			replicas, err := o.replica()
+			if err != nil {
+				return
+			}
 			if len(rg.Peers) <= replicas {
 				// Migration no longer happening, so not our job anymore
 				stopMigrationMonitoring()
```