Don't need to release locks now with outq. Also borrow once

Signed-off-by: Derek Collison <derek@nats.io>
This commit is contained in:
Derek Collison
2021-03-04 17:58:01 -08:00
parent 207ebd3b3d
commit 4cdfb0ab6e

View File

@@ -428,6 +428,7 @@ func (mset *stream) addConsumerWithAssignment(config *ConsumerConfig, oname stri
sysc: s.createInternalJetStreamClient(),
cfg: *config,
dsubj: config.DeliverSubject,
outq: mset.outq,
active: true,
qch: make(chan struct{}),
mch: make(chan struct{}, 1),
@@ -623,7 +624,7 @@ func (o *consumer) setLeader(isLeader bool) {
}
mset.mu.RLock()
s, jsa, stream, outq := mset.srv, mset.jsa, mset.cfg.Name, mset.outq
s, jsa, stream := mset.srv, mset.jsa, mset.cfg.Name
mset.mu.RUnlock()
o.mu.Lock()
@@ -670,9 +671,6 @@ func (o *consumer) setLeader(isLeader bool) {
o.replay = true
}
// We borrow this from the mset.
o.outq = outq
// Recreate quit channel.
o.qch = make(chan struct{})
qch := o.qch
@@ -692,7 +690,6 @@ func (o *consumer) setLeader(isLeader bool) {
o.srv.sysUnsubscribe(o.infoSub)
o.infoSub = nil
}
o.outq = nil
if o.qch != nil {
close(o.qch)
o.qch = nil
@@ -738,14 +735,8 @@ func (o *consumer) unsubscribe(sub *subscription) {
// We need to make sure we protect access to the outq.
// Do all advisory sends here.
// Lock should be held on entry but will be released.
func (o *consumer) sendAdvisory(subj string, msg []byte) {
outq := o.outq
o.mu.Unlock()
if outq != nil {
outq.send(&jsPubMsg{subj, subj, _EMPTY_, nil, msg, nil, 0, nil})
}
o.mu.Lock()
o.outq.send(&jsPubMsg{subj, subj, _EMPTY_, nil, msg, nil, 0, nil})
}
func (o *consumer) sendDeleteAdvisoryLocked() {
@@ -1580,14 +1571,10 @@ func (o *consumer) processNextMsgReq(_ *subscription, c *client, _, reply string
if mset == nil {
return
}
outq := o.outq
sendErr := func(status int, description string) {
// Needs to be unlocked to send err.
o.mu.Unlock()
defer o.mu.Lock()
hdr := []byte(fmt.Sprintf("NATS/1.0 %d %s\r\n\r\n", status, description))
outq.send(&jsPubMsg{reply, reply, _EMPTY_, hdr, nil, nil, 0, nil})
o.outq.send(&jsPubMsg{reply, reply, _EMPTY_, hdr, nil, nil, 0, nil})
}
if o.isPushMode() {
@@ -1777,7 +1764,7 @@ func (o *consumer) forceExpireFirstWaiting() *waitingRequest {
return wr
}
// If we are expiring this and we think there is still interest, alert.
if rr := o.acc.sl.Match(wr.reply); len(rr.psubs)+len(rr.qsubs) > 0 && o.mset != nil && o.outq != nil {
if rr := o.acc.sl.Match(wr.reply); len(rr.psubs)+len(rr.qsubs) > 0 && o.mset != nil {
// We still appear to have interest, so send alert as courtesy.
hdr := []byte("NATS/1.0 408 Request Timeout\r\n\r\n")
o.outq.send(&jsPubMsg{wr.reply, wr.reply, _EMPTY_, hdr, nil, nil, 0, nil})
@@ -1950,7 +1937,7 @@ func (o *consumer) ackReply(sseq, dseq, dc uint64, ts int64, pending uint64) str
// Deliver a msg to the consumer.
// Lock should be held and o.mset validated to be non-nil.
func (o *consumer) deliverMsg(dsubj, subj string, hdr, msg []byte, seq, dc uint64, ts int64) {
if o.mset == nil || o.outq == nil {
if o.mset == nil {
return
}
// Update pending on first attempt
@@ -1971,19 +1958,14 @@ func (o *consumer) deliverMsg(dsubj, subj string, hdr, msg []byte, seq, dc uint6
pmsg := &jsPubMsg{dsubj, subj, o.ackReply(seq, dseq, dc, ts, o.sgap), hdr, msg, o, seq, nil}
mset := o.mset
ap := o.cfg.AckPolicy
outq := o.outq
// This needs to be unlocked since the other side may need this lock on a failed delivery.
o.mu.Unlock()
// Send message.
outq.send(pmsg)
o.outq.send(pmsg)
// If we are ack none and mset is interest only we should make sure stream removes interest.
if ap == AckNone && mset.cfg.Retention == InterestPolicy && !mset.checkInterest(seq, o) {
mset.store.RemoveMsg(seq)
}
// Re-acquire lock.
o.mu.Lock()
if ap == AckExplicit || ap == AckAll {
o.trackPending(seq, dseq)