From 8bfe14bbfd75f2f9f8fedd76662f523837ce0a7f Mon Sep 17 00:00:00 2001 From: Derek Collison Date: Thu, 25 Jul 2019 16:53:54 -0700 Subject: [PATCH] check response perms more often, make sure we limit memory growth Signed-off-by: Derek Collison --- server/client.go | 9 +++---- test/norace_test.go | 66 +++++++++++++++++++++++++++++++++++++++++++++ 2 files changed, 69 insertions(+), 6 deletions(-) diff --git a/server/client.go b/server/client.go index 6baae0ab..5cdb8845 100644 --- a/server/client.go +++ b/server/client.go @@ -177,7 +177,6 @@ type client struct { subs map[string]*subscription perms *permissions replies map[string]*resp - rcheck time.Time mperms *msgDeny darray []string in readCache @@ -271,7 +270,6 @@ const ( pruneSize = 32 routeTargetInit = 8 replyPermLimit = 4096 - replyCheckMin = 30 * time.Second ) // Used in readloop to cache hot subject lookups and group statistics. @@ -613,7 +611,6 @@ func (c *client) setPermissions(perms *Permissions) { rp := *perms.Response c.perms.resp = &rp c.replies = make(map[string]*resp) - c.rcheck = time.Now() } // Loop over subscribe permissions @@ -2189,18 +2186,18 @@ func (c *client) deliverMsg(sub *subscription, mh, msg []byte) bool { // in our reply cache. We make sure to not check too often. func (c *client) pruneReplyPerms() { // Make sure we do not check too often. - if c.perms.resp == nil || time.Since(c.rcheck) < replyCheckMin { + if c.perms.resp == nil { return } mm := c.perms.resp.MaxMsgs ttl := c.perms.resp.Expires - c.rcheck = time.Now() + now := time.Now() for k, resp := range c.replies { if mm > 0 && resp.n >= mm { delete(c.replies, k) - } else if ttl > 0 && c.rcheck.Sub(resp.t) > ttl { + } else if ttl > 0 && now.Sub(resp.t) > ttl { delete(c.replies, k) } } diff --git a/test/norace_test.go b/test/norace_test.go index 7c19e714..413ff40c 100644 --- a/test/norace_test.go +++ b/test/norace_test.go @@ -20,6 +20,8 @@ import ( "io/ioutil" "net" "os" + "regexp" + "runtime" "sync" "testing" "time" @@ -204,3 +206,67 @@ func TestNoRaceRouteSendSubs(t *testing.T) { t.Fatalf("Error on reload: %v", err) } } + +func TestNoRaceDynamicResponsePermsMemory(t *testing.T) { + srv, opts := RunServerWithConfig("./configs/authorization.conf") + defer srv.Shutdown() + + // We will test the timeout to make sure that we are not showing excessive growth + // when a reply subject is not utilized by the responder. + + // Alice can do anything, so she will be our requestor + rc := createClientConn(t, opts.Host, opts.Port) + defer rc.Close() + expectAuthRequired(t, rc) + doAuthConnect(t, rc, "", "alice", DefaultPass) + expectResult(t, rc, okRe) + + // MY_STREAM_SERVICE has an expiration of 10ms for the response permissions. + c := createClientConn(t, opts.Host, opts.Port) + defer c.Close() + expectAuthRequired(t, c) + doAuthConnect(t, c, "", "svcb", DefaultPass) + expectResult(t, c, okRe) + + sendProto(t, c, "SUB my.service.req 1\r\n") + expectResult(t, c, okRe) + + var m runtime.MemStats + + runtime.GC() + runtime.ReadMemStats(&m) + pta := m.TotalAlloc + + // Need this so we do not blow the allocs on expectResult which makes 32k each time. + expBuf := make([]byte, 32768) + expect := func(c net.Conn, re *regexp.Regexp) { + t.Helper() + c.SetReadDeadline(time.Now().Add(2 * time.Second)) + n, _ := c.Read(expBuf) + c.SetReadDeadline(time.Time{}) + buf := expBuf[:n] + if !re.Match(buf) { + t.Fatalf("Response did not match expected: \n\tReceived:'%q'\n\tExpected:'%s'", buf, re) + } + } + + // Now send off some requests. We will not answer them and this will build up reply + // permissions in the server. + for i := 0; i < 10000; i++ { + pub := fmt.Sprintf("PUB my.service.req resp.%d 2\r\nok\r\n", i) + sendProto(t, rc, pub) + expect(rc, okRe) + expect(c, msgRe) + } + + const max = 20 * 1024 * 1024 // 20MB + checkFor(t, time.Second, 25*time.Millisecond, func() error { + runtime.GC() + runtime.ReadMemStats(&m) + used := m.TotalAlloc - pta + if used > max { + return fmt.Errorf("Using too much memory, expect < 20MB, got %dMB", used/(1024*1024)) + } + return nil + }) +}