Merge pull request #1059 from nats-io/mp

MaxPayload async INFO updates
This commit is contained in:
Derek Collison
2019-07-08 16:05:17 -07:00
committed by GitHub
2 changed files with 54 additions and 18 deletions

View File

@@ -493,7 +493,8 @@ func (c *client) applyAccountLimits() {
c.mpay = c.acc.mpay
}
opts := c.srv.getOpts()
s := c.srv
opts := s.getOpts()
// We check here if the server has an option set that is lower than the account limit.
if c.mpay != jwt.NoLimit && opts.MaxPayload != 0 && int32(opts.MaxPayload) < c.acc.mpay {
@@ -529,16 +530,17 @@ func (c *client) RegisterUser(user *User) {
}
c.mu.Lock()
defer c.mu.Unlock()
// Assign permissions.
if user.Permissions == nil {
// Reset perms to nil in case client previously had them.
c.perms = nil
c.mperms = nil
c.mu.Unlock()
return
}
c.setPermissions(user.Permissions)
c.mu.Unlock()
}
// RegisterNkey allows auth to call back into a new nkey
@@ -1425,6 +1427,7 @@ func (c *client) sendPing() {
// Assume lock is held.
func (c *client) generateClientInfoJSON(info Info) []byte {
info.CID = c.cid
info.MaxPayload = c.mpay
// Generate the info json
b, _ := json.Marshal(info)
pcs := [][]byte{[]byte("INFO"), b, []byte(CR_LF)}
@@ -1460,29 +1463,25 @@ func (c *client) processPing() {
c.mu.Unlock()
return
}
c.sendPong()
// Record this to suppress us sending one if this
// is within a given time interval for activity.
c.ping.last = time.Now()
// If not a CLIENT, we are done
if c.kind != CLIENT {
// If not a CLIENT, we are done. Also the CONNECT should
// have been received, but make sure it is so before proceeding
if c.kind != CLIENT || !c.flags.isSet(connectReceived) {
c.mu.Unlock()
return
}
// The CONNECT should have been received, but make sure it
// is so before proceeding
if !c.flags.isSet(connectReceived) {
c.mu.Unlock()
return
}
// If we are here, the CONNECT has been received so we know
// if this client supports async INFO or not.
var (
checkClusterChange bool
srv = c.srv
checkInfoChange bool
srv = c.srv
)
// For older clients, just flip the firstPongSent flag if not already
// set and we are done.
@@ -1491,21 +1490,24 @@ func (c *client) processPing() {
} else {
// This is a client that supports async INFO protocols.
// If this is the first PING (so firstPongSent is not set yet),
// we will need to check if there was a change in cluster topology.
checkClusterChange = !c.flags.isSet(firstPongSent)
// we will need to check if there was a change in cluster topology
// or we have a different max payload. We will send this first before
// pong since most clients do flush after connect call.
checkInfoChange = !c.flags.isSet(firstPongSent)
}
c.mu.Unlock()
if checkClusterChange {
if checkInfoChange {
opts := srv.getOpts()
srv.mu.Lock()
c.mu.Lock()
// Now that we are under both locks, we can flip the flag.
// This prevents sendAsyncInfoToClients() and and code here
// to send a double INFO protocol.
// This prevents sendAsyncInfoToClients() and code here to
// send a double INFO protocol.
c.flags.set(firstPongSent)
// If there was a cluster update since this client was created,
// send an updated INFO protocol now.
if srv.lastCURLsUpdate >= c.start.UnixNano() {
if srv.lastCURLsUpdate >= c.start.UnixNano() || c.mpay != int32(opts.MaxPayload) {
c.sendInfo(c.generateClientInfoJSON(srv.copyInfo()))
}
c.mu.Unlock()

View File

@@ -21,7 +21,9 @@ import (
"testing"
"time"
"github.com/nats-io/jwt"
"github.com/nats-io/nats.go"
"github.com/nats-io/nkeys"
)
func TestMaxPayload(t *testing.T) {
@@ -122,3 +124,35 @@ func TestMaxPayloadOverrun(t *testing.T) {
send("PUB foo 18446744073709551615123\r\n")
expectDisconnect(t, c)
}
func TestAsyncInfoWithSmallerMaxPayload(t *testing.T) {
s, opts := runOperatorServer(t)
defer s.Shutdown()
const testMaxPayload = 522
okp, _ := nkeys.FromSeed(oSeed)
akp, _ := nkeys.CreateAccount()
apub, _ := akp.PublicKey()
nac := jwt.NewAccountClaims(apub)
nac.Limits.Payload = testMaxPayload
ajwt, err := nac.Encode(okp)
if err != nil {
t.Fatalf("Error generating account JWT: %v", err)
}
if err := s.AccountResolver().Store(apub, ajwt); err != nil {
t.Fatalf("Account Resolver returned an error: %v", err)
}
url := fmt.Sprintf("nats://%s:%d", opts.Host, opts.Port)
nc, err := nats.Connect(url, createUserCreds(t, s, akp))
if err != nil {
t.Fatalf("Error on connect: %v", err)
}
nc.Flush()
defer nc.Close()
if mp := nc.MaxPayload(); mp != testMaxPayload {
t.Fatalf("Expected MaxPayload of %d, got %d", testMaxPayload, mp)
}
}