From aa52c2fecf73d2e9c236330b6a242298f375c00d Mon Sep 17 00:00:00 2001 From: Derek Collison Date: Tue, 25 Oct 2022 16:11:35 -0700 Subject: [PATCH 1/2] Added warning for high message lag into a clustered stream. Signed-off-by: Derek Collison --- server/jetstream_cluster.go | 8 ++++++++ 1 file changed, 8 insertions(+) diff --git a/server/jetstream_cluster.go b/server/jetstream_cluster.go index 3895bafe..e3ff6b61 100644 --- a/server/jetstream_cluster.go +++ b/server/jetstream_cluster.go @@ -6631,6 +6631,14 @@ func (mset *stream) processClusteredInboundMsg(subject, reply string, hdr, msg [ if err != nil && mset.clseq > 0 { mset.clseq-- } + + // Check to see if we are being overrun. + // TODO(dlc) - Make this a limit where we drop messages to protect ourselves, but allow to be configured. + const warnThreshold = 10_000 + if mset.clseq-lseq > warnThreshold { + err := fmt.Errorf("JetStream stream '%s > %s' has high message lag", jsa.acc().Name, mset.cfg.Name) + s.RateLimitWarnf(err.Error()) + } mset.clMu.Unlock() if err != nil { From 2241ad089e2cc03273e1bf7aed7980d0eacdfd38 Mon Sep 17 00:00:00 2001 From: Derek Collison Date: Tue, 25 Oct 2022 16:56:10 -0700 Subject: [PATCH 2/2] Make local error since non-fatal for now. Signed-off-by: Derek Collison --- server/jetstream_cluster.go | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/server/jetstream_cluster.go b/server/jetstream_cluster.go index e3ff6b61..7aadc30e 100644 --- a/server/jetstream_cluster.go +++ b/server/jetstream_cluster.go @@ -6636,8 +6636,8 @@ func (mset *stream) processClusteredInboundMsg(subject, reply string, hdr, msg [ // TODO(dlc) - Make this a limit where we drop messages to protect ourselves, but allow to be configured. const warnThreshold = 10_000 if mset.clseq-lseq > warnThreshold { - err := fmt.Errorf("JetStream stream '%s > %s' has high message lag", jsa.acc().Name, mset.cfg.Name) - s.RateLimitWarnf(err.Error()) + lerr := fmt.Errorf("JetStream stream '%s > %s' has high message lag", jsa.acc().Name, mset.cfg.Name) + s.RateLimitWarnf(lerr.Error()) } mset.clMu.Unlock()