From 804ce102ac8c6b8d161ee396fef7e123b1a598e5 Mon Sep 17 00:00:00 2001 From: Ivan Kozlovic Date: Wed, 2 Mar 2022 16:58:18 -0700 Subject: [PATCH 1/2] [CHANGED] JetStream: Redeliveries may be delayed if necessary We have seen situations where, when a lot of pending messages accumulate, there is contention between the processing of the ACKs and the checking of the pending map. Decision is made to abort checking of pending list if processing of ack(s) would be delayed because of that. The result is that a redelivery may be postponed. Internally, the ACKs are also now using a queue to prevent processing of them from the network handler, which could cause head-of-line blocking, especially bad for routes. Signed-off-by: Ivan Kozlovic --- server/consumer.go | 90 +++++++++++++++++++++++++++++++++------- server/jetstream_test.go | 9 ++-- 2 files changed, 82 insertions(+), 17 deletions(-) diff --git a/server/consumer.go b/server/consumer.go index 00515fc1..07079f61 100644 --- a/server/consumer.go +++ b/server/consumer.go @@ -25,6 +25,7 @@ import ( "strconv" "strings" "sync" + "sync/atomic" "time" "github.com/nats-io/nuid" @@ -198,6 +199,10 @@ var ( // Consumer is a jetstream consumer. type consumer struct { + // Atomic used to notify that we want to process an ack. + // This will be checked in checkPending to abort processing + // and let ack be processed in priority. + awl int64 mu sync.RWMutex js *jetStream mset *stream @@ -270,6 +275,9 @@ type consumer struct { pch chan struct{} phead *proposal ptail *proposal + + // Ack queue + ackMsgs *ipQueue } type proposal struct { @@ -562,6 +570,7 @@ func (mset *stream) addConsumerWithAssignment(config *ConsumerConfig, oname stri maxdc: uint64(config.MaxDeliver), maxp: config.MaxAckPending, created: time.Now().UTC(), + ackMsgs: newIPQueue(), } // Bind internal client to the user account. 
@@ -785,7 +794,7 @@ func (o *consumer) setLeader(isLeader bool) { } var err error - if o.ackSub, err = o.subscribeInternal(o.ackSubj, o.processAck); err != nil { + if o.ackSub, err = o.subscribeInternal(o.ackSubj, o.pushAck); err != nil { o.mu.Unlock() o.deleteWithoutAdvisory() return @@ -1375,9 +1384,55 @@ func (o *consumer) sendAckReply(subj string) { o.sendAdvisory(subj, nil) } -// Process a message for the ack reply subject delivered with a message. -func (o *consumer) processAck(_ *subscription, c *client, acc *Account, subject, reply string, rmsg []byte) { - _, msg := c.msgParts(rmsg) +type jsAckMsg struct { + subject string + reply string + hdr int + msg []byte +} + +var jsAckMsgPool sync.Pool + +func newJSAckMsg(subj, reply string, hdr int, msg []byte) *jsAckMsg { + var m *jsAckMsg + am := jsAckMsgPool.Get() + if am != nil { + m = am.(*jsAckMsg) + } else { + m = &jsAckMsg{} + } + // When getting something from a pool it is critical that all fields are + // initialized. Doing it this way guarantees that if someone adds a field to + // the structure, the compiler will fail the build if this line is not updated. + (*m) = jsAckMsg{subj, reply, hdr, msg} + return m +} + +func (am *jsAckMsg) returnToPool() { + if am == nil { + return + } + am.subject, am.reply, am.hdr, am.msg = _EMPTY_, _EMPTY_, -1, nil + jsAckMsgPool.Put(am) +} + +// Push the ack message to the consumer's ackMsgs queue +func (o *consumer) pushAck(_ *subscription, c *client, _ *Account, subject, reply string, rmsg []byte) { + atomic.AddInt64(&o.awl, 1) + o.ackMsgs.push(newJSAckMsg(subject, reply, c.pa.hdr, copyBytes(rmsg))) +} + +// Processes a message for the ack reply subject delivered with a message. 
+func (o *consumer) processAck(subject, reply string, hdr int, rmsg []byte) { + defer atomic.AddInt64(&o.awl, -1) + + var msg []byte + if hdr > 0 { + msg = rmsg[hdr:] + } else { + msg = rmsg + } + sseq, dseq, dc := ackReplyInfo(subject) skipAckReply := sseq == 0 @@ -1387,16 +1442,7 @@ func (o *consumer) processAck(_ *subscription, c *client, acc *Account, subject, o.processAckMsg(sseq, dseq, dc, true) case bytes.HasPrefix(msg, AckNext): o.processAckMsg(sseq, dseq, dc, true) - // processNextMsgReq can be invoked from an internal subscription or from here. - // Therefore, it has to call msgParts(), so we can't simply pass msg[len(AckNext):] - // with current c.pa.hdr because it would cause a panic. We will save the current - // c.pa.hdr value and disable headers before calling processNextMsgReq and then - // restore so that we don't mess with the calling stack in case it is used - // somewhere else. - phdr := c.pa.hdr - c.pa.hdr = -1 - o.processNextMsgReq(nil, c, acc, subject, reply, msg[len(AckNext):]) - c.pa.hdr = phdr + o.processNextMsgRequest(reply, msg[len(AckNext):]) skipAckReply = true case bytes.HasPrefix(msg, AckNak): o.processNak(sseq, dseq, dc, msg) @@ -2373,7 +2419,10 @@ func (o *consumer) processNextMsgReq(_ *subscription, c *client, _ *Account, _, return } _, msg = c.msgParts(msg) + o.processNextMsgRequest(reply, msg) +} +func (o *consumer) processNextMsgRequest(reply string, msg []byte) { o.mu.Lock() defer o.mu.Unlock() @@ -2842,6 +2891,14 @@ func (o *consumer) loopAndGatherMsgs(qch chan struct{}) { o.mu.Unlock() select { + case <-o.ackMsgs.ch: + acks := o.ackMsgs.pop() + for _, acki := range acks { + ack := acki.(*jsAckMsg) + o.processAck(ack.subject, ack.reply, ack.hdr, ack.msg) + ack.returnToPool() + } + o.ackMsgs.recycle(&acks) case interest := <-inch: // inch can be nil on pull-based, but then this will // just block and not fire. 
@@ -3181,7 +3238,12 @@ func (o *consumer) checkPending() { // Since we can update timestamps, we have to review all pending. // We may want to unlock here or warn if list is big. var expired []uint64 + check := len(o.pending) > 1024 for seq, p := range o.pending { + if check && atomic.LoadInt64(&o.awl) > 0 { + o.ptmr.Reset(100 * time.Millisecond) + return + } // Check if these are no longer valid. if seq < fseq { delete(o.pending, seq) diff --git a/server/jetstream_test.go b/server/jetstream_test.go index 96b03da8..c6b3d509 100644 --- a/server/jetstream_test.go +++ b/server/jetstream_test.go @@ -6009,9 +6009,12 @@ func TestJetStreamInterestRetentionStream(t *testing.T) { checkNumMsgs := func(numExpected int) { t.Helper() - if state := mset.state(); state.Msgs != uint64(numExpected) { - t.Fatalf("Expected %d messages, got %d", numExpected, state.Msgs) - } + checkFor(t, time.Second, 15*time.Millisecond, func() error { + if state := mset.state(); state.Msgs != uint64(numExpected) { + return fmt.Errorf("Expected %d messages, got %d", numExpected, state.Msgs) + } + return nil + }) } // Since we had no interest this should be 0. From 97612c0fac1fadb8ba1d15b8c84ae46fc20b9265 Mon Sep 17 00:00:00 2001 From: Ivan Kozlovic Date: Thu, 3 Mar 2022 15:09:01 -0700 Subject: [PATCH 2/2] Fix test by using AckSync() to avoid flapping Signed-off-by: Ivan Kozlovic --- server/jetstream_test.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/server/jetstream_test.go b/server/jetstream_test.go index c6b3d509..2a43e3bd 100644 --- a/server/jetstream_test.go +++ b/server/jetstream_test.go @@ -6173,7 +6173,7 @@ func TestJetStreamInterestRetentionStreamWithFilteredConsumers(t *testing.T) { if err != nil { t.Fatalf("Unexpected error getting msg: %v", err) } - m.Ack() + m.AckSync() } checkState := func(expected uint64) {