Merge pull request #2304 from nats-io/clamp_max_subs_errors

[CHANGED] Reduce print for an account subs limit to every 2 sec
This commit is contained in:
Ivan Kozlovic
2021-06-22 11:39:57 -06:00
committed by GitHub
3 changed files with 81 additions and 1 deletions

View File

@@ -39,6 +39,10 @@ import (
// account will be grouped in the default global account.
const globalAccountName = DEFAULT_GLOBAL_ACCOUNT
const defaultMaxSubLimitReportThreshold = int64(2 * time.Second)
var maxSubLimitReportThreshold = defaultMaxSubLimitReportThreshold
// Account are subject namespace definitions. By default no messages are shared between accounts.
// You can share via Exports and Imports of Streams and Services.
type Account struct {
@@ -84,6 +88,7 @@ type Account struct {
defaultPerms *Permissions
tags jwt.TagList
nameTag string
lastLimErr int64
}
// Account based limits.
@@ -505,6 +510,22 @@ func (a *Account) TotalSubs() int {
return int(a.sl.Count())
}
func (a *Account) shouldLogMaxSubErr() bool {
if a == nil {
return true
}
a.mu.RLock()
last := a.lastLimErr
a.mu.RUnlock()
if now := time.Now().UnixNano(); now-last >= maxSubLimitReportThreshold {
a.mu.Lock()
a.lastLimErr = now
a.mu.Unlock()
return true
}
return false
}
// MapDest is for mapping published subjects for clients.
type MapDest struct {
Subject string `json:"subject"`

View File

@@ -1896,7 +1896,10 @@ func (c *client) maxConnExceeded() {
}
func (c *client) maxSubsExceeded() {
c.sendErrAndErr(ErrTooManySubs.Error())
if c.acc.shouldLogMaxSubErr() {
c.Errorf(ErrTooManySubs.Error())
}
c.sendErr(ErrTooManySubs.Error())
}
func (c *client) maxPayloadViolation(sz int, max int32) {

View File

@@ -21,6 +21,7 @@ import (
"io"
"math"
"net"
"net/url"
"reflect"
"regexp"
"runtime"
@@ -2517,3 +2518,58 @@ func TestClientLimits(t *testing.T) {
})
}
}
func TestClientClampMaxSubsErrReport(t *testing.T) {
maxSubLimitReportThreshold = int64(100 * time.Millisecond)
defer func() { maxSubLimitReportThreshold = defaultMaxSubLimitReportThreshold }()
o1 := DefaultOptions()
o1.MaxSubs = 1
o1.LeafNode.Host = "127.0.0.1"
o1.LeafNode.Port = -1
s1 := RunServer(o1)
defer s1.Shutdown()
l := &captureErrorLogger{errCh: make(chan string, 10)}
s1.SetLogger(l, false, false)
o2 := DefaultOptions()
u, _ := url.Parse(fmt.Sprintf("nats://127.0.0.1:%d", o1.LeafNode.Port))
o2.LeafNode.Remotes = []*RemoteLeafOpts{{URLs: []*url.URL{u}}}
s2 := RunServer(o2)
defer s2.Shutdown()
checkLeafNodeConnected(t, s1)
checkLeafNodeConnected(t, s2)
nc := natsConnect(t, s2.ClientURL())
natsSubSync(t, nc, "foo")
natsSubSync(t, nc, "bar")
// Make sure we receive only 1
check := func() {
t.Helper()
for i := 0; i < 2; i++ {
select {
case errStr := <-l.errCh:
if i > 0 {
t.Fatalf("Should not have logged a second time: %s", errStr)
}
if !strings.Contains(errStr, "maximum subscriptions") {
t.Fatalf("Unexpected error: %s", errStr)
}
case <-time.After(300 * time.Millisecond):
if i == 0 {
t.Fatal("Error should have been logged")
}
}
}
}
check()
// The above will have waited long enough to clear the report threshold.
// So create two new subs and check again that we get only 1 report.
natsSubSync(t, nc, "baz")
natsSubSync(t, nc, "bat")
check()
}