diff --git a/server/consumer.go b/server/consumer.go index c397c7f3..4f0353a8 100644 --- a/server/consumer.go +++ b/server/consumer.go @@ -1998,58 +1998,6 @@ func (o *consumer) ackReply(sseq, dseq, dc uint64, ts int64, pending uint64) str return fmt.Sprintf(o.ackReplyT, dc, sseq, dseq, ts, pending) } -// deliverCurrentMsg is the hot path to deliver a message that was just received. -// Will return if the message was delivered or not. -func (o *consumer) deliverCurrentMsg(subj string, hdr, msg []byte, seq uint64, ts int64) bool { - o.mu.Lock() - if seq != o.sseq { - o.mu.Unlock() - return false - } - - // If we are in push mode and not active let's stop sending. - if o.isPushMode() && !o.active { - o.mu.Unlock() - return false - } - - // If we are in pull mode and no one is waiting already break and wait. - if o.isPullMode() && !o.checkWaitingForInterest() { - o.mu.Unlock() - return false - } - - // Since we short circuit the getNextMsg() call where we check for max pending - // we need to do that here as well. - if o.maxp > 0 && len(o.pending) >= o.maxp { - o.mu.Unlock() - return false - } - - // Bump store sequence here. - o.sseq++ - - // If we are partitioned and we do not match, do not consider this a failure. - // Go ahead and return true. - if o.cfg.FilterSubject != _EMPTY_ && !o.isFilteredMatch(subj) { - o.updateSkipped() - o.mu.Unlock() - return true - } - - var dsubj string - if wr := o.waiting.pop(); wr != nil { - dsubj = wr.reply - } else { - dsubj = o.dsubj - } - - o.deliverMsg(dsubj, subj, hdr, msg, seq, 1, ts) - o.mu.Unlock() - - return true -} - // Deliver a msg to the consumer. // Lock should be held and o.mset validated to be non-nil. func (o *consumer) deliverMsg(dsubj, subj string, hdr, msg []byte, seq, dc uint64, ts int64) { @@ -2069,10 +2017,13 @@ func (o *consumer) deliverMsg(dsubj, subj string, hdr, msg []byte, seq, dc uint6 } dseq := o.dseq + o.dseq++ + pmsg := &jsPubMsg{dsubj, subj, o.ackReply(seq, dseq, dc, ts, o.sgap), hdr, msg, o, seq} mset := o.mset ap := o.cfg.AckPolicy sendq := o.sendq + // This needs to be unlocked since the other side may need this lock on a failed delivery. o.mu.Unlock() // Send message. @@ -2087,12 +2038,10 @@ func (o *consumer) deliverMsg(dsubj, subj string, hdr, msg []byte, seq, dc uint6 if ap == AckExplicit || ap == AckAll { o.trackPending(seq, dseq) } else if ap == AckNone { - o.adflr = o.dseq + o.adflr = dseq o.asflr = seq } - o.dseq++ - // FIXME(dlc) - Capture errors? o.updateDelivered(dseq, seq, dc, ts) } @@ -2667,15 +2616,6 @@ func (o *consumer) setInitialPending() { } } -// addStreamPending will add to the stream pending. -func (o *consumer) incStreamPending(sseq uint64, subj string) { - o.mu.Lock() - if o.isFilteredMatch(subj) { - o.sgap++ - } - o.mu.Unlock() -} - func (o *consumer) decStreamPending(sseq uint64, subj string) { o.mu.Lock() // Ignore if we have already seen this one.