mirror of
https://github.com/gogrlx/nats-server.git
synced 2026-04-02 11:48:43 -07:00
Inline flow in clustered mode was flaky, removed that path.
Changed up accounting. Signed-off-by: Derek Collison <derek@nats.io>
This commit is contained in:
@@ -1998,58 +1998,6 @@ func (o *consumer) ackReply(sseq, dseq, dc uint64, ts int64, pending uint64) str
|
||||
return fmt.Sprintf(o.ackReplyT, dc, sseq, dseq, ts, pending)
|
||||
}
|
||||
|
||||
// deliverCurrentMsg is the hot path to deliver a message that was just received.
|
||||
// Will return if the message was delivered or not.
|
||||
func (o *consumer) deliverCurrentMsg(subj string, hdr, msg []byte, seq uint64, ts int64) bool {
|
||||
o.mu.Lock()
|
||||
if seq != o.sseq {
|
||||
o.mu.Unlock()
|
||||
return false
|
||||
}
|
||||
|
||||
// If we are in push mode and not active let's stop sending.
|
||||
if o.isPushMode() && !o.active {
|
||||
o.mu.Unlock()
|
||||
return false
|
||||
}
|
||||
|
||||
// If we are in pull mode and no one is waiting already break and wait.
|
||||
if o.isPullMode() && !o.checkWaitingForInterest() {
|
||||
o.mu.Unlock()
|
||||
return false
|
||||
}
|
||||
|
||||
// Since we short circuit the getNextMsg() call where we check for max pending
|
||||
// we need to do that here as well.
|
||||
if o.maxp > 0 && len(o.pending) >= o.maxp {
|
||||
o.mu.Unlock()
|
||||
return false
|
||||
}
|
||||
|
||||
// Bump store sequence here.
|
||||
o.sseq++
|
||||
|
||||
// If we are partitioned and we do not match, do not consider this a failure.
|
||||
// Go ahead and return true.
|
||||
if o.cfg.FilterSubject != _EMPTY_ && !o.isFilteredMatch(subj) {
|
||||
o.updateSkipped()
|
||||
o.mu.Unlock()
|
||||
return true
|
||||
}
|
||||
|
||||
var dsubj string
|
||||
if wr := o.waiting.pop(); wr != nil {
|
||||
dsubj = wr.reply
|
||||
} else {
|
||||
dsubj = o.dsubj
|
||||
}
|
||||
|
||||
o.deliverMsg(dsubj, subj, hdr, msg, seq, 1, ts)
|
||||
o.mu.Unlock()
|
||||
|
||||
return true
|
||||
}
|
||||
|
||||
// Deliver a msg to the consumer.
|
||||
// Lock should be held and o.mset validated to be non-nil.
|
||||
func (o *consumer) deliverMsg(dsubj, subj string, hdr, msg []byte, seq, dc uint64, ts int64) {
|
||||
@@ -2069,10 +2017,13 @@ func (o *consumer) deliverMsg(dsubj, subj string, hdr, msg []byte, seq, dc uint6
|
||||
}
|
||||
|
||||
dseq := o.dseq
|
||||
o.dseq++
|
||||
|
||||
pmsg := &jsPubMsg{dsubj, subj, o.ackReply(seq, dseq, dc, ts, o.sgap), hdr, msg, o, seq}
|
||||
mset := o.mset
|
||||
ap := o.cfg.AckPolicy
|
||||
sendq := o.sendq
|
||||
|
||||
// This needs to be unlocked since the other side may need this lock on a failed delivery.
|
||||
o.mu.Unlock()
|
||||
// Send message.
|
||||
@@ -2087,12 +2038,10 @@ func (o *consumer) deliverMsg(dsubj, subj string, hdr, msg []byte, seq, dc uint6
|
||||
if ap == AckExplicit || ap == AckAll {
|
||||
o.trackPending(seq, dseq)
|
||||
} else if ap == AckNone {
|
||||
o.adflr = o.dseq
|
||||
o.adflr = dseq
|
||||
o.asflr = seq
|
||||
}
|
||||
|
||||
o.dseq++
|
||||
|
||||
// FIXME(dlc) - Capture errors?
|
||||
o.updateDelivered(dseq, seq, dc, ts)
|
||||
}
|
||||
@@ -2667,15 +2616,6 @@ func (o *consumer) setInitialPending() {
|
||||
}
|
||||
}
|
||||
|
||||
// addStreamPending will add to the stream pending.
|
||||
func (o *consumer) incStreamPending(sseq uint64, subj string) {
|
||||
o.mu.Lock()
|
||||
if o.isFilteredMatch(subj) {
|
||||
o.sgap++
|
||||
}
|
||||
o.mu.Unlock()
|
||||
}
|
||||
|
||||
func (o *consumer) decStreamPending(sseq uint64, subj string) {
|
||||
o.mu.Lock()
|
||||
// Ignore if we have already seen this one.
|
||||
|
||||
Reference in New Issue
Block a user