diff --git a/server/jetstream_cluster.go b/server/jetstream_cluster.go index 3895bafe..7aadc30e 100644 --- a/server/jetstream_cluster.go +++ b/server/jetstream_cluster.go @@ -6631,6 +6631,14 @@ func (mset *stream) processClusteredInboundMsg(subject, reply string, hdr, msg [ if err != nil && mset.clseq > 0 { mset.clseq-- } + + // Check to see if we are being overrun. + // TODO(dlc) - Make this a limit where we drop messages to protect ourselves, but allow to be configured. + const warnThreshold = 10_000 + if mset.clseq-lseq > warnThreshold { + lerr := fmt.Errorf("JetStream stream '%s > %s' has high message lag", jsa.acc().Name, mset.cfg.Name) + s.RateLimitWarnf(lerr.Error()) + } mset.clMu.Unlock() if err != nil {