Added warning if internal sub callback takes too long

Signed-off-by: Ivan Kozlovic <ivan@synadia.com>
This commit is contained in:
Ivan Kozlovic
2022-10-10 14:39:37 -06:00
parent ffc1797867
commit 3358247e6b
2 changed files with 12 additions and 0 deletions

View File

@@ -3187,11 +3187,15 @@ func (c *client) deliverMsg(prodIsMQTT bool, sub *subscription, acc *Account, su
client.mu.Unlock()
// Internal account clients are for service imports and need the '\r\n'.
start := time.Now()
if client.kind == ACCOUNT {
sub.icb(sub, c, acc, string(subject), string(reply), msg)
} else {
sub.icb(sub, c, acc, string(subject), string(reply), msg[:msgSize])
}
if dur := time.Since(start); dur >= readLoopReportThreshold {
srv.Warnf("Internal subscription on %q took too long: %v", subject, dur)
}
return true
}

View File

@@ -760,7 +760,11 @@ func (js *jetStream) apiDispatch(sub *subscription, c *client, acc *Account, sub
// If this is directly from a client connection ok to do in place.
if c.kind != ROUTER && c.kind != GATEWAY {
start := time.Now()
jsub.icb(sub, c, acc, subject, reply, rmsg)
if dur := time.Since(start); dur >= readLoopReportThreshold {
s.Warnf("Internal subscription on %q took too long: %v", subject, dur)
}
return
}
@@ -788,7 +792,11 @@ func (s *Server) processJSAPIRoutedRequests() {
for _, req := range reqs {
r := req.(*jsAPIRoutedReq)
client.pa = r.pa
start := time.Now()
r.jsub.icb(r.sub, client, r.acc, r.subject, r.reply, r.msg)
if dur := time.Since(start); dur >= readLoopReportThreshold {
s.Warnf("Internal subscription on %q took too long: %v", r.subject, dur)
}
}
queue.recycle(&reqs)
case <-s.quitCh: