diff --git a/server/stream.go b/server/stream.go index 1089a294..3296a56d 100644 --- a/server/stream.go +++ b/server/stream.go @@ -1292,7 +1292,9 @@ func (mset *stream) processInboundMirrorMsg(m *inMsg) bool { func (mset *stream) setMirrorErr(err *ApiError) { mset.mu.Lock() - mset.mirror.err = err + if mset.mirror != nil { + mset.mirror.err = err + } mset.mu.Unlock() } @@ -1421,16 +1423,22 @@ func (mset *stream) setupMirrorConsumer() error { if ccr.Error != nil || ccr.ConsumerInfo == nil { mset.cancelMirrorConsumer() } else { - // Capture consumer name. mset.mu.Lock() - if mset.mirror != nil { - mset.mirror.cname = ccr.ConsumerInfo.Name + // Mirror config has been removed. + if mset.mirror == nil { + mset.mu.Unlock() + mset.cancelMirrorConsumer() + return } + // Capture consumer name. + mset.mirror.cname = ccr.ConsumerInfo.Name + msgs := mset.mirror.msgs + // Process inbound mirror messages from the wire. sub, err := mset.subscribeInternal(deliverSubject, func(sub *subscription, c *client, subject, reply string, rmsg []byte) { hdr, msg := c.msgParts(append(rmsg[:0:0], rmsg...)) // Need to copy. - mset.queueInbound(mset.mirror.msgs, subject, reply, hdr, msg) + mset.queueInbound(msgs, subject, reply, hdr, msg) }) if err != nil { mset.mirror.err = jsError(err)