diff --git a/server/client.go b/server/client.go index f6d6239d..a7aa6356 100644 --- a/server/client.go +++ b/server/client.go @@ -3187,11 +3187,15 @@ func (c *client) deliverMsg(prodIsMQTT bool, sub *subscription, acc *Account, su client.mu.Unlock() // Internal account clients are for service imports and need the '\r\n'. + start := time.Now() if client.kind == ACCOUNT { sub.icb(sub, c, acc, string(subject), string(reply), msg) } else { sub.icb(sub, c, acc, string(subject), string(reply), msg[:msgSize]) } + if dur := time.Since(start); dur >= readLoopReportThreshold { + srv.Warnf("Internal subscription on %q took too long: %v", subject, dur) + } return true } diff --git a/server/jetstream_api.go b/server/jetstream_api.go index 6f90ba61..fc9fde03 100644 --- a/server/jetstream_api.go +++ b/server/jetstream_api.go @@ -760,7 +760,11 @@ func (js *jetStream) apiDispatch(sub *subscription, c *client, acc *Account, sub // If this is directly from a client connection ok to do in place. if c.kind != ROUTER && c.kind != GATEWAY { + start := time.Now() jsub.icb(sub, c, acc, subject, reply, rmsg) + if dur := time.Since(start); dur >= readLoopReportThreshold { + s.Warnf("Internal subscription on %q took too long: %v", subject, dur) + } return } @@ -788,7 +792,11 @@ func (s *Server) processJSAPIRoutedRequests() { for _, req := range reqs { r := req.(*jsAPIRoutedReq) client.pa = r.pa + start := time.Now() r.jsub.icb(r.sub, client, r.acc, r.subject, r.reply, r.msg) + if dur := time.Since(start); dur >= readLoopReportThreshold { + s.Warnf("Internal subscription on %q took too long: %v", r.subject, dur) + } } queue.recycle(&reqs) case <-s.quitCh: