Optimize non-inline direct gets to not use simple go routines

Signed-off-by: Derek Collison <derek@nats.io>
This commit is contained in:
Derek Collison
2023-04-06 07:50:57 -07:00
parent a5326c97ef
commit 2da50512e2
2 changed files with 37 additions and 27 deletions

View File

@@ -110,9 +110,6 @@ const (
// For stalling fast producers
stallClientMinDuration = 100 * time.Millisecond
stallClientMaxDuration = time.Second
// Threshold for not knowingly doing a potential blocking operation when internal and on a route or gateway or leafnode.
noBlockThresh = 500 * time.Millisecond
)
var readLoopReportThreshold = readLoopReport

View File

@@ -193,6 +193,7 @@ type stream struct {
pubAck []byte
outq *jsOutQ
msgs *ipQueue[*inMsg]
gets *ipQueue[*directGetReq]
store StreamStore
ackq *ipQueue[uint64]
lseq uint64
@@ -460,6 +461,7 @@ func (a *Account) addStreamWithAssignment(config *StreamConfig, fsConfig *FileSt
stype: cfg.Storage,
consumers: make(map[string]*consumer),
msgs: newIPQueue[*inMsg](s, qpfx+"messages"),
gets: newIPQueue[*directGetReq](s, qpfx+"direct gets"),
qch: make(chan struct{}),
uch: make(chan struct{}, 4),
sch: make(chan struct{}, 1),
@@ -3528,12 +3530,25 @@ func (mset *stream) queueInboundMsg(subj, rply string, hdr, msg []byte) {
mset.queueInbound(mset.msgs, subj, rply, hdr, msg)
}
var dgPool = sync.Pool{
New: func() interface{} {
return &directGetReq{}
},
}
// For when we need to not inline the request.
type directGetReq struct {
// Copy of this is correct for this.
req JSApiMsgGetRequest
reply string
}
// processDirectGetRequest handles direct get request for stream messages.
func (mset *stream) processDirectGetRequest(_ *subscription, c *client, _ *Account, subject, reply string, rmsg []byte) {
_, msg := c.msgParts(rmsg)
if len(reply) == 0 {
return
}
_, msg := c.msgParts(rmsg)
if len(msg) == 0 {
hdr := []byte("NATS/1.0 408 Empty Request\r\n\r\n")
mset.outq.send(newJSPubMsg(reply, _EMPTY_, _EMPTY_, hdr, nil, nil, 0))
@@ -3566,26 +3581,20 @@ func (mset *stream) processDirectGetRequest(_ *subscription, c *client, _ *Accou
inlineOk := c.kind != ROUTER && c.kind != GATEWAY && c.kind != LEAF
if !inlineOk {
// Check how long we have been away from the readloop for the route or gateway or leafnode.
// If too long move to a separate go routine.
if elapsed := time.Since(c.in.start); elapsed < noBlockThresh {
inlineOk = true
}
}
if inlineOk {
mset.getDirectRequest(&req, reply)
dg := dgPool.Get().(*directGetReq)
dg.req, dg.reply = req, reply
mset.gets.push(dg)
} else {
go mset.getDirectRequest(&req, reply)
mset.getDirectRequest(&req, reply)
}
}
// This is for direct get by last subject which is part of the subject itself.
func (mset *stream) processDirectGetLastBySubjectRequest(_ *subscription, c *client, _ *Account, subject, reply string, rmsg []byte) {
_, msg := c.msgParts(rmsg)
if len(reply) == 0 {
return
}
_, msg := c.msgParts(rmsg)
// This version expects no payload.
if len(msg) != 0 {
hdr := []byte("NATS/1.0 408 Bad Request\r\n\r\n")
@@ -3611,19 +3620,15 @@ func (mset *stream) processDirectGetLastBySubjectRequest(_ *subscription, c *cli
return
}
req := JSApiMsgGetRequest{LastFor: key}
inlineOk := c.kind != ROUTER && c.kind != GATEWAY && c.kind != LEAF
if !inlineOk {
// Check how long we have been away from the readloop for the route or gateway or leafnode.
// If too long move to a separate go routine.
if elapsed := time.Since(c.in.start); elapsed < noBlockThresh {
inlineOk = true
}
}
if inlineOk {
mset.getDirectRequest(&JSApiMsgGetRequest{LastFor: key}, reply)
dg := dgPool.Get().(*directGetReq)
dg.req, dg.reply = req, reply
mset.gets.push(dg)
} else {
go mset.getDirectRequest(&JSApiMsgGetRequest{LastFor: key}, reply)
mset.getDirectRequest(&req, reply)
}
}
@@ -4332,7 +4337,7 @@ func (mset *stream) internalLoop() {
c := s.createInternalJetStreamClient()
c.registerWithAccount(mset.acc)
defer c.closeConnection(ClientClosed)
outq, qch, msgs := mset.outq, mset.qch, mset.msgs
outq, qch, msgs, gets := mset.outq, mset.qch, mset.msgs, mset.gets
// For the ack msgs queue for interest retention.
var (
@@ -4413,6 +4418,14 @@ func (mset *stream) internalLoop() {
}
}
msgs.recycle(&ims)
case <-gets.ch:
dgs := gets.pop()
for _, dg := range dgs {
mset.getDirectRequest(&dg.req, dg.reply)
dgPool.Put(dg)
}
gets.recycle(&dgs)
case <-amch:
seqs := ackq.pop()
for _, seq := range seqs {
@@ -5099,7 +5112,7 @@ func (a *Account) RestoreStream(ncfg *StreamConfig, r io.Reader) (*stream, error
if err != nil {
return nil, err
}
if hdr.Typeflag != tar.TypeReg && hdr.Typeflag != tar.TypeRegA {
if hdr.Typeflag != tar.TypeReg {
return nil, logAndReturnError()
}
fpath := filepath.Join(sdir, filepath.Clean(hdr.Name))