diff --git a/server/consumer.go b/server/consumer.go index e28f3836..84474370 100644 --- a/server/consumer.go +++ b/server/consumer.go @@ -25,6 +25,7 @@ import ( "strconv" "strings" "sync" + "sync/atomic" "time" "github.com/nats-io/nuid" @@ -198,6 +199,10 @@ var ( // Consumer is a jetstream consumer. type consumer struct { + // Atomic used to notify that we want to process an ack. + // This will be checked in checkPending to abort processing + // and let ack be processed in priority. + awl int64 mu sync.RWMutex js *jetStream mset *stream @@ -270,6 +275,9 @@ type consumer struct { pch chan struct{} phead *proposal ptail *proposal + + // Ack queue + ackMsgs *ipQueue } type proposal struct { @@ -562,6 +570,7 @@ func (mset *stream) addConsumerWithAssignment(config *ConsumerConfig, oname stri maxdc: uint64(config.MaxDeliver), maxp: config.MaxAckPending, created: time.Now().UTC(), + ackMsgs: newIPQueue(), } // Bind internal client to the user account. @@ -795,7 +804,7 @@ func (o *consumer) setLeader(isLeader bool) { } var err error - if o.ackSub, err = o.subscribeInternal(o.ackSubj, o.processAck); err != nil { + if o.ackSub, err = o.subscribeInternal(o.ackSubj, o.pushAck); err != nil { o.mu.Unlock() o.deleteWithoutAdvisory() return @@ -1384,9 +1393,55 @@ func (o *consumer) sendAckReply(subj string) { o.sendAdvisory(subj, nil) } -// Process a message for the ack reply subject delivered with a message. -func (o *consumer) processAck(_ *subscription, c *client, acc *Account, subject, reply string, rmsg []byte) { - _, msg := c.msgParts(rmsg) +type jsAckMsg struct { + subject string + reply string + hdr int + msg []byte +} + +var jsAckMsgPool sync.Pool + +func newJSAckMsg(subj, reply string, hdr int, msg []byte) *jsAckMsg { + var m *jsAckMsg + am := jsAckMsgPool.Get() + if am != nil { + m = am.(*jsAckMsg) + } else { + m = &jsAckMsg{} + } + // When getting something from a pool it is criticical that all fields are + // initialized. Doing this way guarantees that if someone adds a field to + // the structure, the compiler will fail the build if this line is not updated. + (*m) = jsAckMsg{subj, reply, hdr, msg} + return m +} + +func (am *jsAckMsg) returnToPool() { + if am == nil { + return + } + am.subject, am.reply, am.hdr, am.msg = _EMPTY_, _EMPTY_, -1, nil + jsAckMsgPool.Put(am) +} + +// Push the ack message to the consumer's ackMsgs queue +func (o *consumer) pushAck(_ *subscription, c *client, _ *Account, subject, reply string, rmsg []byte) { + atomic.AddInt64(&o.awl, 1) + o.ackMsgs.push(newJSAckMsg(subject, reply, c.pa.hdr, copyBytes(rmsg))) +} + +// Processes a message for the ack reply subject delivered with a message. +func (o *consumer) processAck(subject, reply string, hdr int, rmsg []byte) { + defer atomic.AddInt64(&o.awl, -1) + + var msg []byte + if hdr > 0 { + msg = rmsg[hdr:] + } else { + msg = rmsg + } + sseq, dseq, dc := ackReplyInfo(subject) skipAckReply := sseq == 0 @@ -1396,16 +1451,7 @@ func (o *consumer) processAck(_ *subscription, c *client, acc *Account, subject, o.processAckMsg(sseq, dseq, dc, true) case bytes.HasPrefix(msg, AckNext): o.processAckMsg(sseq, dseq, dc, true) - // processNextMsgReq can be invoked from an internal subscription or from here. - // Therefore, it has to call msgParts(), so we can't simply pass msg[len(AckNext):] - // with current c.pa.hdr because it would cause a panic. We will save the current - // c.pa.hdr value and disable headers before calling processNextMsgReq and then - // restore so that we don't mess with the calling stack in case it is used - // somewhere else. - phdr := c.pa.hdr - c.pa.hdr = -1 - o.processNextMsgReq(nil, c, acc, subject, reply, msg[len(AckNext):]) - c.pa.hdr = phdr + o.processNextMsgRequest(reply, msg[len(AckNext):]) skipAckReply = true case bytes.HasPrefix(msg, AckNak): o.processNak(sseq, dseq, dc, msg) @@ -2382,7 +2428,10 @@ func (o *consumer) processNextMsgReq(_ *subscription, c *client, _ *Account, _, return } _, msg = c.msgParts(msg) + o.processNextMsgRequest(reply, msg) +} +func (o *consumer) processNextMsgRequest(reply string, msg []byte) { o.mu.Lock() defer o.mu.Unlock() @@ -2851,6 +2900,14 @@ func (o *consumer) loopAndGatherMsgs(qch chan struct{}) { o.mu.Unlock() select { + case <-o.ackMsgs.ch: + acks := o.ackMsgs.pop() + for _, acki := range acks { + ack := acki.(*jsAckMsg) + o.processAck(ack.subject, ack.reply, ack.hdr, ack.msg) + ack.returnToPool() + } + o.ackMsgs.recycle(&acks) case interest := <-inch: // inch can be nil on pull-based, but then this will // just block and not fire. @@ -3190,7 +3247,12 @@ func (o *consumer) checkPending() { // Since we can update timestamps, we have to review all pending. // We may want to unlock here or warn if list is big. var expired []uint64 + check := len(o.pending) > 1024 for seq, p := range o.pending { + if check && atomic.LoadInt64(&o.awl) > 0 { + o.ptmr.Reset(100 * time.Millisecond) + return + } // Check if these are no longer valid. if seq < fseq { delete(o.pending, seq) diff --git a/server/jetstream_test.go b/server/jetstream_test.go index c8920522..5dc4467f 100644 --- a/server/jetstream_test.go +++ b/server/jetstream_test.go @@ -5983,9 +5983,12 @@ func TestJetStreamInterestRetentionStream(t *testing.T) { checkNumMsgs := func(numExpected int) { t.Helper() - if state := mset.state(); state.Msgs != uint64(numExpected) { - t.Fatalf("Expected %d messages, got %d", numExpected, state.Msgs) - } + checkFor(t, time.Second, 15*time.Millisecond, func() error { + if state := mset.state(); state.Msgs != uint64(numExpected) { + return fmt.Errorf("Expected %d messages, got %d", numExpected, state.Msgs) + } + return nil + }) } // Since we had no interest this should be 0. @@ -6144,7 +6147,7 @@ func TestJetStreamInterestRetentionStreamWithFilteredConsumers(t *testing.T) { if err != nil { t.Fatalf("Unexpected error getting msg: %v", err) } - m.Ack() + m.AckSync() } checkState := func(expected uint64) {