From 2da50512e27eaa69d94ee4b4214c16d0c65d0d2a Mon Sep 17 00:00:00 2001 From: Derek Collison Date: Thu, 6 Apr 2023 07:50:57 -0700 Subject: [PATCH] Optimize non-inline direct gets to not use simple go routines Signed-off-by: Derek Collison --- server/client.go | 3 --- server/stream.go | 61 +++++++++++++++++++++++++++++------------------- 2 files changed, 37 insertions(+), 27 deletions(-) diff --git a/server/client.go b/server/client.go index ccae5068..cb01533e 100644 --- a/server/client.go +++ b/server/client.go @@ -110,9 +110,6 @@ const ( // For stalling fast producers stallClientMinDuration = 100 * time.Millisecond stallClientMaxDuration = time.Second - - // Threshold for not knowingly doing a potential blocking operation when internal and on a route or gateway or leafnode. - noBlockThresh = 500 * time.Millisecond ) var readLoopReportThreshold = readLoopReport diff --git a/server/stream.go b/server/stream.go index 5aaffff4..75132d60 100644 --- a/server/stream.go +++ b/server/stream.go @@ -193,6 +193,7 @@ type stream struct { pubAck []byte outq *jsOutQ msgs *ipQueue[*inMsg] + gets *ipQueue[*directGetReq] store StreamStore ackq *ipQueue[uint64] lseq uint64 @@ -460,6 +461,7 @@ func (a *Account) addStreamWithAssignment(config *StreamConfig, fsConfig *FileSt stype: cfg.Storage, consumers: make(map[string]*consumer), msgs: newIPQueue[*inMsg](s, qpfx+"messages"), + gets: newIPQueue[*directGetReq](s, qpfx+"direct gets"), qch: make(chan struct{}), uch: make(chan struct{}, 4), sch: make(chan struct{}, 1), @@ -3528,12 +3530,25 @@ func (mset *stream) queueInboundMsg(subj, rply string, hdr, msg []byte) { mset.queueInbound(mset.msgs, subj, rply, hdr, msg) } +var dgPool = sync.Pool{ + New: func() interface{} { + return &directGetReq{} + }, +} + +// For when we need to not inline the request. +type directGetReq struct { + // Copy of this is correct for this. + req JSApiMsgGetRequest + reply string +} + // processDirectGetRequest handles direct get request for stream messages. func (mset *stream) processDirectGetRequest(_ *subscription, c *client, _ *Account, subject, reply string, rmsg []byte) { - _, msg := c.msgParts(rmsg) if len(reply) == 0 { return } + _, msg := c.msgParts(rmsg) if len(msg) == 0 { hdr := []byte("NATS/1.0 408 Empty Request\r\n\r\n") mset.outq.send(newJSPubMsg(reply, _EMPTY_, _EMPTY_, hdr, nil, nil, 0)) @@ -3566,26 +3581,20 @@ func (mset *stream) processDirectGetRequest(_ *subscription, c *client, _ *Accou inlineOk := c.kind != ROUTER && c.kind != GATEWAY && c.kind != LEAF if !inlineOk { - // Check how long we have been away from the readloop for the route or gateway or leafnode. - // If too long move to a separate go routine. - if elapsed := time.Since(c.in.start); elapsed < noBlockThresh { - inlineOk = true - } - } - - if inlineOk { - mset.getDirectRequest(&req, reply) + dg := dgPool.Get().(*directGetReq) + dg.req, dg.reply = req, reply + mset.gets.push(dg) } else { - go mset.getDirectRequest(&req, reply) + mset.getDirectRequest(&req, reply) } } // This is for direct get by last subject which is part of the subject itself. func (mset *stream) processDirectGetLastBySubjectRequest(_ *subscription, c *client, _ *Account, subject, reply string, rmsg []byte) { - _, msg := c.msgParts(rmsg) if len(reply) == 0 { return } + _, msg := c.msgParts(rmsg) // This version expects no payload. if len(msg) != 0 { hdr := []byte("NATS/1.0 408 Bad Request\r\n\r\n") @@ -3611,19 +3620,15 @@ func (mset *stream) processDirectGetLastBySubjectRequest(_ *subscription, c *cli return } + req := JSApiMsgGetRequest{LastFor: key} + inlineOk := c.kind != ROUTER && c.kind != GATEWAY && c.kind != LEAF if !inlineOk { - // Check how long we have been away from the readloop for the route or gateway or leafnode. - // If too long move to a separate go routine. - if elapsed := time.Since(c.in.start); elapsed < noBlockThresh { - inlineOk = true - } - } - - if inlineOk { - mset.getDirectRequest(&JSApiMsgGetRequest{LastFor: key}, reply) + dg := dgPool.Get().(*directGetReq) + dg.req, dg.reply = req, reply + mset.gets.push(dg) } else { - go mset.getDirectRequest(&JSApiMsgGetRequest{LastFor: key}, reply) + mset.getDirectRequest(&req, reply) } } @@ -4332,7 +4337,7 @@ func (mset *stream) internalLoop() { c := s.createInternalJetStreamClient() c.registerWithAccount(mset.acc) defer c.closeConnection(ClientClosed) - outq, qch, msgs := mset.outq, mset.qch, mset.msgs + outq, qch, msgs, gets := mset.outq, mset.qch, mset.msgs, mset.gets // For the ack msgs queue for interest retention. var ( @@ -4413,6 +4418,14 @@ func (mset *stream) internalLoop() { } } msgs.recycle(&ims) + case <-gets.ch: + dgs := gets.pop() + for _, dg := range dgs { + mset.getDirectRequest(&dg.req, dg.reply) + dgPool.Put(dg) + } + gets.recycle(&dgs) + case <-amch: seqs := ackq.pop() for _, seq := range seqs { @@ -5099,7 +5112,7 @@ func (a *Account) RestoreStream(ncfg *StreamConfig, r io.Reader) (*stream, error if err != nil { return nil, err } - if hdr.Typeflag != tar.TypeReg && hdr.Typeflag != tar.TypeRegA { + if hdr.Typeflag != tar.TypeReg { return nil, logAndReturnError() } fpath := filepath.Join(sdir, filepath.Clean(hdr.Name))