Merge pull request #1084 from nats-io/rcheck

Check response perms more often, make sure we limit memory growth
This commit is contained in:
Derek Collison
2019-07-25 17:07:40 -07:00
committed by GitHub
2 changed files with 69 additions and 6 deletions

View File

@@ -177,7 +177,6 @@ type client struct {
subs map[string]*subscription
perms *permissions
replies map[string]*resp
rcheck time.Time
mperms *msgDeny
darray []string
in readCache
@@ -271,7 +270,6 @@ const (
pruneSize = 32
routeTargetInit = 8
replyPermLimit = 4096
replyCheckMin = 30 * time.Second
)
// Used in readloop to cache hot subject lookups and group statistics.
@@ -613,7 +611,6 @@ func (c *client) setPermissions(perms *Permissions) {
rp := *perms.Response
c.perms.resp = &rp
c.replies = make(map[string]*resp)
c.rcheck = time.Now()
}
// Loop over subscribe permissions
@@ -2189,18 +2186,18 @@ func (c *client) deliverMsg(sub *subscription, mh, msg []byte) bool {
// in our reply cache. We make sure to not check too often.
func (c *client) pruneReplyPerms() {
// Make sure we do not check too often.
if c.perms.resp == nil || time.Since(c.rcheck) < replyCheckMin {
if c.perms.resp == nil {
return
}
mm := c.perms.resp.MaxMsgs
ttl := c.perms.resp.Expires
c.rcheck = time.Now()
now := time.Now()
for k, resp := range c.replies {
if mm > 0 && resp.n >= mm {
delete(c.replies, k)
} else if ttl > 0 && c.rcheck.Sub(resp.t) > ttl {
} else if ttl > 0 && now.Sub(resp.t) > ttl {
delete(c.replies, k)
}
}

View File

@@ -20,6 +20,8 @@ import (
"io/ioutil"
"net"
"os"
"regexp"
"runtime"
"sync"
"testing"
"time"
@@ -204,3 +206,67 @@ func TestNoRaceRouteSendSubs(t *testing.T) {
t.Fatalf("Error on reload: %v", err)
}
}
func TestNoRaceDynamicResponsePermsMemory(t *testing.T) {
srv, opts := RunServerWithConfig("./configs/authorization.conf")
defer srv.Shutdown()
// We will test the timeout to make sure that we are not showing excessive growth
// when a reply subject is not utilized by the responder.
// Alice can do anything, so she will be our requestor
rc := createClientConn(t, opts.Host, opts.Port)
defer rc.Close()
expectAuthRequired(t, rc)
doAuthConnect(t, rc, "", "alice", DefaultPass)
expectResult(t, rc, okRe)
// MY_STREAM_SERVICE has an expiration of 10ms for the response permissions.
c := createClientConn(t, opts.Host, opts.Port)
defer c.Close()
expectAuthRequired(t, c)
doAuthConnect(t, c, "", "svcb", DefaultPass)
expectResult(t, c, okRe)
sendProto(t, c, "SUB my.service.req 1\r\n")
expectResult(t, c, okRe)
var m runtime.MemStats
runtime.GC()
runtime.ReadMemStats(&m)
pta := m.TotalAlloc
// Need this so we do not blow the allocs on expectResult which makes 32k each time.
expBuf := make([]byte, 32768)
expect := func(c net.Conn, re *regexp.Regexp) {
t.Helper()
c.SetReadDeadline(time.Now().Add(2 * time.Second))
n, _ := c.Read(expBuf)
c.SetReadDeadline(time.Time{})
buf := expBuf[:n]
if !re.Match(buf) {
t.Fatalf("Response did not match expected: \n\tReceived:'%q'\n\tExpected:'%s'", buf, re)
}
}
// Now send off some requests. We will not answer them and this will build up reply
// permissions in the server.
for i := 0; i < 10000; i++ {
pub := fmt.Sprintf("PUB my.service.req resp.%d 2\r\nok\r\n", i)
sendProto(t, rc, pub)
expect(rc, okRe)
expect(c, msgRe)
}
const max = 20 * 1024 * 1024 // 20MB
checkFor(t, time.Second, 25*time.Millisecond, func() error {
runtime.GC()
runtime.ReadMemStats(&m)
used := m.TotalAlloc - pta
if used > max {
return fmt.Errorf("Using too much memory, expect < 20MB, got %dMB", used/(1024*1024))
}
return nil
})
}