Merge pull request #2898 from nats-io/js_cons_ack_processing

[CHANGED] JetStream: Redeliveries may be delayed if necessary
This commit is contained in:
Ivan Kozlovic
2022-03-17 10:57:22 -06:00
committed by GitHub
2 changed files with 83 additions and 18 deletions

View File

@@ -25,6 +25,7 @@ import (
"strconv"
"strings"
"sync"
"sync/atomic"
"time"
"github.com/nats-io/nuid"
@@ -198,6 +199,10 @@ var (
// Consumer is a jetstream consumer.
type consumer struct {
// Atomic used to notify that we want to process an ack.
// This will be checked in checkPending to abort processing
// and let ack be processed in priority.
awl int64
mu sync.RWMutex
js *jetStream
mset *stream
@@ -270,6 +275,9 @@ type consumer struct {
pch chan struct{}
phead *proposal
ptail *proposal
// Ack queue
ackMsgs *ipQueue
}
type proposal struct {
@@ -562,6 +570,7 @@ func (mset *stream) addConsumerWithAssignment(config *ConsumerConfig, oname stri
maxdc: uint64(config.MaxDeliver),
maxp: config.MaxAckPending,
created: time.Now().UTC(),
ackMsgs: newIPQueue(),
}
// Bind internal client to the user account.
@@ -795,7 +804,7 @@ func (o *consumer) setLeader(isLeader bool) {
}
var err error
if o.ackSub, err = o.subscribeInternal(o.ackSubj, o.processAck); err != nil {
if o.ackSub, err = o.subscribeInternal(o.ackSubj, o.pushAck); err != nil {
o.mu.Unlock()
o.deleteWithoutAdvisory()
return
@@ -1384,9 +1393,55 @@ func (o *consumer) sendAckReply(subj string) {
o.sendAdvisory(subj, nil)
}
// Process a message for the ack reply subject delivered with a message.
func (o *consumer) processAck(_ *subscription, c *client, acc *Account, subject, reply string, rmsg []byte) {
_, msg := c.msgParts(rmsg)
// jsAckMsg captures an inbound ack received on the consumer's ack subject
// so that it can be queued on the consumer's ackMsgs queue and processed
// asynchronously (see pushAck/processAck) instead of inline in the
// client's read loop.
type jsAckMsg struct {
	subject string // Ack reply subject the message arrived on.
	reply   string // Reply subject, used when the ack carries a next-msg request.
	hdr     int    // Header length within msg; <= 0 means no headers present.
	msg     []byte // Copy of the raw message (headers + payload).
}

// Pool of jsAckMsg values to reduce allocations on the ack hot path.
var jsAckMsgPool sync.Pool
// newJSAckMsg returns a jsAckMsg taken from the shared pool (or freshly
// allocated if the pool is empty), initialized with the given values.
func newJSAckMsg(subj, reply string, hdr int, msg []byte) *jsAckMsg {
	m, ok := jsAckMsgPool.Get().(*jsAckMsg)
	if !ok {
		m = &jsAckMsg{}
	}
	// When reusing a pooled value it is critical that every field gets
	// initialized. Assigning a full positional struct literal guarantees
	// that: if a field is ever added to jsAckMsg, this line stops
	// compiling until it is accounted for here.
	*m = jsAckMsg{subj, reply, hdr, msg}
	return m
}
// returnToPool clears the message and places it back into the shared
// pool. Calling it on a nil receiver is a no-op.
func (am *jsAckMsg) returnToPool() {
	if am == nil {
		return
	}
	// Reset every field before pooling so no stale references (in
	// particular the msg buffer) are retained.
	*am = jsAckMsg{subject: _EMPTY_, reply: _EMPTY_, hdr: -1, msg: nil}
	jsAckMsgPool.Put(am)
}
// pushAck is the internal subscription callback for the consumer's ack
// subject. It queues the ack on o.ackMsgs for asynchronous processing
// rather than handling it inline.
func (o *consumer) pushAck(_ *subscription, c *client, _ *Account, subject, reply string, rmsg []byte) {
	// Count acks awaiting processing; checkPending reads this atomically
	// to yield and let ack processing run in priority.
	atomic.AddInt64(&o.awl, 1)
	// rmsg is copied (copyBytes) — presumably the buffer is reused by the
	// caller after this callback returns, so it cannot be retained as-is.
	o.ackMsgs.push(newJSAckMsg(subject, reply, c.pa.hdr, copyBytes(rmsg)))
}
// Processes a message for the ack reply subject delivered with a message.
func (o *consumer) processAck(subject, reply string, hdr int, rmsg []byte) {
defer atomic.AddInt64(&o.awl, -1)
var msg []byte
if hdr > 0 {
msg = rmsg[hdr:]
} else {
msg = rmsg
}
sseq, dseq, dc := ackReplyInfo(subject)
skipAckReply := sseq == 0
@@ -1396,16 +1451,7 @@ func (o *consumer) processAck(_ *subscription, c *client, acc *Account, subject,
o.processAckMsg(sseq, dseq, dc, true)
case bytes.HasPrefix(msg, AckNext):
o.processAckMsg(sseq, dseq, dc, true)
// processNextMsgReq can be invoked from an internal subscription or from here.
// Therefore, it has to call msgParts(), so we can't simply pass msg[len(AckNext):]
// with current c.pa.hdr because it would cause a panic. We will save the current
// c.pa.hdr value and disable headers before calling processNextMsgReq and then
// restore so that we don't mess with the calling stack in case it is used
// somewhere else.
phdr := c.pa.hdr
c.pa.hdr = -1
o.processNextMsgReq(nil, c, acc, subject, reply, msg[len(AckNext):])
c.pa.hdr = phdr
o.processNextMsgRequest(reply, msg[len(AckNext):])
skipAckReply = true
case bytes.HasPrefix(msg, AckNak):
o.processNak(sseq, dseq, dc, msg)
@@ -2382,7 +2428,10 @@ func (o *consumer) processNextMsgReq(_ *subscription, c *client, _ *Account, _,
return
}
_, msg = c.msgParts(msg)
o.processNextMsgRequest(reply, msg)
}
func (o *consumer) processNextMsgRequest(reply string, msg []byte) {
o.mu.Lock()
defer o.mu.Unlock()
@@ -2851,6 +2900,14 @@ func (o *consumer) loopAndGatherMsgs(qch chan struct{}) {
o.mu.Unlock()
select {
case <-o.ackMsgs.ch:
acks := o.ackMsgs.pop()
for _, acki := range acks {
ack := acki.(*jsAckMsg)
o.processAck(ack.subject, ack.reply, ack.hdr, ack.msg)
ack.returnToPool()
}
o.ackMsgs.recycle(&acks)
case interest := <-inch:
// inch can be nil on pull-based, but then this will
// just block and not fire.
@@ -3190,7 +3247,12 @@ func (o *consumer) checkPending() {
// Since we can update timestamps, we have to review all pending.
// We may want to unlock here or warn if list is big.
var expired []uint64
check := len(o.pending) > 1024
for seq, p := range o.pending {
if check && atomic.LoadInt64(&o.awl) > 0 {
o.ptmr.Reset(100 * time.Millisecond)
return
}
// Check if these are no longer valid.
if seq < fseq {
delete(o.pending, seq)

View File

@@ -5983,9 +5983,12 @@ func TestJetStreamInterestRetentionStream(t *testing.T) {
checkNumMsgs := func(numExpected int) {
t.Helper()
if state := mset.state(); state.Msgs != uint64(numExpected) {
t.Fatalf("Expected %d messages, got %d", numExpected, state.Msgs)
}
checkFor(t, time.Second, 15*time.Millisecond, func() error {
if state := mset.state(); state.Msgs != uint64(numExpected) {
return fmt.Errorf("Expected %d messages, got %d", numExpected, state.Msgs)
}
return nil
})
}
// Since we had no interest this should be 0.
@@ -6144,7 +6147,7 @@ func TestJetStreamInterestRetentionStreamWithFilteredConsumers(t *testing.T) {
if err != nil {
t.Fatalf("Unexpected error getting msg: %v", err)
}
m.Ack()
m.AckSync()
}
checkState := func(expected uint64) {