Merge pull request #3221 from nats-io/direct

Made direct get from a stream part of the $JS.API hierarchy vs separate.
This commit is contained in:
Derek Collison
2022-06-28 09:59:44 -07:00
committed by GitHub
5 changed files with 102 additions and 10 deletions

View File

@@ -110,6 +110,9 @@ const (
// For stalling fast producers
stallClientMinDuration = 100 * time.Millisecond
stallClientMaxDuration = time.Second
// Threshold for not knowingly doing a potential blocking operation when internal and on a route or gateway or leafnode.
noBlockThresh = 500 * time.Millisecond
)
var readLoopReportThreshold = readLoopReport
@@ -384,6 +387,9 @@ type readCache struct {
// These are for readcache flags to avoind locks.
flags readCacheFlag
// Capture the time we started processing our readLoop.
start time.Time
}
// set the flag (would be equivalent to set the boolean to true)
@@ -1206,7 +1212,6 @@ func (c *client) readLoop(pre []byte) {
} else {
bufs[0] = b[:n]
}
start := time.Now()
// Check if the account has mappings and if so set the local readcache flag.
// We check here to make sure any changes such as config reload are reflected here.
@@ -1218,6 +1223,8 @@ func (c *client) readLoop(pre []byte) {
}
}
c.in.start = time.Now()
// Clear inbound stats cache
c.in.msgs = 0
c.in.bytes = 0
@@ -1236,7 +1243,7 @@ func (c *client) readLoop(pre []byte) {
// We don't need to do any of the things below, simply return.
return
}
if dur := time.Since(start); dur >= readLoopReportThreshold {
if dur := time.Since(c.in.start); dur >= readLoopReportThreshold {
c.Warnf("Readloop processing time: %v", dur)
}
// Need to call flushClients because some of the clients have been
@@ -1303,7 +1310,7 @@ func (c *client) readLoop(pre []byte) {
return
}
if dur := time.Since(start); dur >= readLoopReportThreshold {
if dur := time.Since(c.in.start); dur >= readLoopReportThreshold {
c.Warnf("Readloop processing time: %v", dur)
}
@@ -1314,7 +1321,7 @@ func (c *client) readLoop(pre []byte) {
return
}
if cpacc && (start.Sub(lpacc)) >= closedSubsCheckInterval {
if cpacc && (c.in.start.Sub(lpacc)) >= closedSubsCheckInterval {
c.pruneClosedSubFromPerAccountCache()
lpacc = time.Now()
}
@@ -3854,15 +3861,21 @@ func (c *client) processServiceImport(si *serviceImport, acc *Account, msg []byt
}
acc.mu.RLock()
var checkJS bool
shouldReturn := si.invalid || acc.sl == nil
checkJSGetNext := !isResponse && si.to == jsAllAPI && strings.HasPrefix(string(c.pa.subject), jsRequestNextPre)
if !shouldReturn && !isResponse && si.to == jsAllAPI {
subj := string(c.pa.subject)
if strings.HasPrefix(subj, jsRequestNextPre) || strings.HasPrefix(subj, jsDirectGetPre) {
checkJS = true
}
}
acc.mu.RUnlock()
// We have a special case where JetStream pulls in all service imports through one export.
// However the GetNext for consumers is a no-op and causes buildups of service imports,
// However the GetNext for consumers and DirectGet for streams are a no-op and causes buildups of service imports,
// response service imports and rrMap entries which all will need to simply expire.
// TODO(dlc) - Come up with something better.
if shouldReturn || (checkJSGetNext && si.se != nil && si.se.acc == c.srv.SystemAccount()) {
if shouldReturn || (checkJS && si.se != nil && si.se.acc == c.srv.SystemAccount()) {
return
}

View File

@@ -2599,7 +2599,20 @@ func (o *consumer) processNextMsgReq(_ *subscription, c *client, _ *Account, _,
return
}
_, msg = c.msgParts(msg)
o.processNextMsgRequest(reply, msg)
inlineOk := c.kind != ROUTER && c.kind != GATEWAY && c.kind != LEAF
if !inlineOk {
// Check how long we have been away from the readloop for the route or gateway or leafnode.
// If too long move to a separate go routine.
if elapsed := time.Since(c.in.start); elapsed < noBlockThresh {
inlineOk = true
}
}
if inlineOk {
o.processNextMsgRequest(reply, msg)
} else {
go o.processNextMsgRequest(reply, copyBytes(msg))
}
}
func (o *consumer) processNextMsgRequest(reply string, msg []byte) {

View File

@@ -121,8 +121,11 @@ const (
// JSDirectMsgGet is the template for non-api layer direct requests for a message by its stream sequence number or last by subject.
// Will return the message similar to how a consumer receives the message, no JSON processing.
// If the message can not be found we will use a status header of 404. If the stream does not exist the client will get a no-responders or timeout.
JSDirectMsgGet = "$JS.DS.GET.*"
JSDirectMsgGetT = "$JS.DS.GET.%s"
JSDirectMsgGet = "$JS.API.DIRECT.GET.*"
JSDirectMsgGetT = "$JS.API.DIRECT.GET.%s"
// jsDirectGetPre
jsDirectGetPre = "$JS.API.DIRECT.GET"
// JSApiConsumerCreate is the endpoint to create ephemeral consumers for streams.
// Will return JSON response.

View File

@@ -4764,6 +4764,49 @@ func TestJetStreamClusterStreamGetMsg(t *testing.T) {
}
}
func TestJetStreamClusterStreamDirectGetMsg(t *testing.T) {
c := createJetStreamClusterExplicit(t, "R3F", 3)
defer c.shutdown()
// Client based API
s := c.randomServer()
nc, _ := jsClientConnect(t, s)
defer nc.Close()
// Do by hand for now.
cfg := &StreamConfig{
Name: "TEST",
Subjects: []string{"foo"},
Storage: MemoryStorage,
Replicas: 3,
MaxMsgsPer: 1,
AllowDirect: true,
}
addStream(t, nc, cfg)
sendStreamMsg(t, nc, "foo", "bar")
getSubj := fmt.Sprintf(JSDirectMsgGetT, "TEST")
getMsg := func(req *JSApiMsgGetRequest) *nats.Msg {
var b []byte
var err error
if req != nil {
b, err = json.Marshal(req)
require_NoError(t, err)
}
m, err := nc.Request(getSubj, b, time.Second)
require_NoError(t, err)
return m
}
m := getMsg(&JSApiMsgGetRequest{LastFor: "foo"})
require_True(t, string(m.Data) == "bar")
require_True(t, m.Header.Get(JSStream) == "TEST")
require_True(t, m.Header.Get(JSSequence) == "1")
require_True(t, m.Header.Get(JSSubject) == "foo")
require_True(t, m.Subject != "foo")
require_True(t, m.Header.Get(JSTimeStamp) != _EMPTY_)
}
func TestJetStreamClusterStreamPerf(t *testing.T) {
// Comment out to run, holding place for now.
skip(t)

View File

@@ -3207,8 +3207,28 @@ func (mset *stream) processDirectGetRequest(_ *subscription, c *client, _ *Accou
return
}
inlineOk := c.kind != ROUTER && c.kind != GATEWAY && c.kind != LEAF
if !inlineOk {
// Check how long we have been away from the readloop for the route or gateway or leafnode.
// If too long move to a separate go routine.
if elapsed := time.Since(c.in.start); elapsed < noBlockThresh {
inlineOk = true
}
}
if inlineOk {
mset.getDirectRequest(&req, reply)
} else {
go mset.getDirectRequest(&req, reply)
}
}
// Do actual work on a direct msg request.
// This could be called in a Go routine if we are inline for a non-client connection.
func (mset *stream) getDirectRequest(req *JSApiMsgGetRequest, reply string) {
var svp StoreMsg
var sm *StoreMsg
var err error
mset.mu.RLock()
store, name := mset.store, mset.cfg.Name