Lock access updates.

Signed-off-by: Derek Collison <derek@nats.io>
This commit is contained in:
Derek Collison
2020-10-29 13:42:59 -07:00
parent cce7195a2c
commit 6232e4b150

View File

@@ -905,9 +905,9 @@ func (mset *Stream) processInboundJetStreamMsg(_ *subscription, pc *client, subj
return
}
}
mset.mu.Unlock()
if c == nil {
mset.mu.Unlock()
return
}
@@ -924,6 +924,7 @@ func (mset *Stream) processInboundJetStreamMsg(_ *subscription, pc *client, subj
// Check to see if we are over the max msg size.
if maxMsgSize >= 0 && len(msg) > maxMsgSize {
mset.mu.Unlock()
response = []byte("-ERR 'message size exceeds maximum allowed'")
if doAck && len(reply) > 0 {
mset.sendq <- &jsPubMsg{reply, _EMPTY_, _EMPTY_, nil, response, nil, 0}
@@ -931,10 +932,10 @@ func (mset *Stream) processInboundJetStreamMsg(_ *subscription, pc *client, subj
return
}
// If we are interest based retention and have no consumers then skip.
if interestRetention {
var noInterest bool
var noInterest bool
// If we are interest based retention and have no consumers then we can skip.
if interestRetention {
if numConsumers == 0 {
noInterest = true
} else if mset.numFilter > 0 {
@@ -947,20 +948,22 @@ func (mset *Stream) processInboundJetStreamMsg(_ *subscription, pc *client, subj
}
}
}
}
mset.mu.Unlock()
if noInterest {
seq = store.SkipMsg()
if doAck && len(reply) > 0 {
response = append(pubAck, strconv.FormatUint(seq, 10)...)
response = append(response, '}')
mset.sendq <- &jsPubMsg{reply, _EMPTY_, _EMPTY_, nil, response, nil, 0}
}
// If we have a msgId make sure to save.
if msgId != _EMPTY_ {
mset.storeMsgId(&ddentry{msgId, seq, time.Now().UnixNano()})
}
return
// Skip here.
if noInterest {
seq = store.SkipMsg()
if doAck && len(reply) > 0 {
response = append(pubAck, strconv.FormatUint(seq, 10)...)
response = append(response, '}')
mset.sendq <- &jsPubMsg{reply, _EMPTY_, _EMPTY_, nil, response, nil, 0}
}
// If we have a msgId make sure to save.
if msgId != _EMPTY_ {
mset.storeMsgId(&ddentry{msgId, seq, time.Now().UnixNano()})
}
return
}
// If here we will attempt to store the message.