Updating jetstream account settings from jwt

Signed-off-by: Matthias Hanel <mh@synadia.com>
This commit is contained in:
Matthias Hanel
2020-09-04 15:56:08 -04:00
parent 9c210284a4
commit a3c5fd4575
4 changed files with 268 additions and 32 deletions

View File

@@ -2420,6 +2420,16 @@ func (s *Server) updateAccountClaimsWithRefresh(a *Account, ac *jwt.AccountClaim
return clients
}
jsEnabled := s.JetStreamEnabled()
if jsEnabled && a == s.SystemAccount() {
for _, export := range allJsExports {
s.Debugf("Adding jetstream service export %q for %s", export, a.Name)
if err := a.AddServiceExport(export, nil); err != nil {
s.Errorf("Error setting up jetstream service exports: %v", err)
}
}
}
for _, e := range ac.Exports {
switch e.Type {
case jwt.Stream:
@@ -2591,6 +2601,23 @@ func (s *Server) updateAccountClaimsWithRefresh(a *Account, ac *jwt.AccountClaim
for _, i := range incompleteImports {
s.incompleteAccExporterMap.Store(i.Account, struct{}{})
}
if a.srv == nil {
a.srv = s
}
if jsEnabled {
if ac.Limits.JetStreamLimits.DiskStorage != 0 || ac.Limits.JetStreamLimits.MemoryStorage != 0 {
// JetStreamAccountLimits and jwt.JetStreamLimits use same value for unlimited
a.jsLimits = &JetStreamAccountLimits{
MaxMemory: ac.Limits.JetStreamLimits.MemoryStorage,
MaxStore: ac.Limits.JetStreamLimits.DiskStorage,
MaxStreams: int(ac.Limits.JetStreamLimits.Streams),
MaxConsumers: int(ac.Limits.JetStreamLimits.Consumer),
}
} else if a.jsLimits != nil {
// covers failed update followed by disable
a.jsLimits = nil
}
}
a.mu.Unlock()
clients := gatherClients()
@@ -2600,6 +2627,17 @@ func (s *Server) updateAccountClaimsWithRefresh(a *Account, ac *jwt.AccountClaim
return clients[i].start.After(clients[j].start)
})
}
if jsEnabled {
if err := s.configJetStream(a); err != nil {
s.Errorf("Error configuring jetstream for account [%s]: %v", a.Name, err.Error())
a.mu.Lock()
// Absent reload of js server cfg, this is going to be broken until js is disabled
a.incomplete = true
a.mu.Unlock()
}
}
now := time.Now().Unix()
for i, c := range clients {
a.mu.RLock()

View File

@@ -227,51 +227,54 @@ func (a *Account) enableJetStreamInfoServiceImportOnly() error {
return nil
}
func (s *Server) configJetStream(acc *Account) error {
if acc.jsLimits != nil {
// Check if already enabled. This can be during a reload.
if acc.JetStreamEnabled() {
if err := acc.enableAllJetStreamServiceImports(); err != nil {
return err
}
if err := acc.UpdateJetStreamLimits(acc.jsLimits); err != nil {
return err
}
} else if err := acc.EnableJetStream(acc.jsLimits); err != nil {
return err
}
acc.jsLimits = nil
} else if acc != s.SystemAccount() {
if acc.JetStreamEnabled() {
acc.DisableJetStream()
}
// We will setup basic service imports to respond to
// requests if JS is enabled for this account.
if err := acc.enableJetStreamInfoServiceImportOnly(); err != nil {
return err
}
}
return nil
}
// configAllJetStreamAccounts walk all configured accounts and turn on jetstream if requested.
func (s *Server) configAllJetStreamAccounts() error {
var jsAccounts []*Account
// Snapshot into our own list. Might not be needed.
s.mu.Lock()
// Bail if server not enabled. If it was enabled and a reload turns it off
// that will be handled elsewhere.
if s.js == nil {
s.mu.Unlock()
return nil
}
s.accounts.Range(func(k, v interface{}) bool {
jsAccounts = append(jsAccounts, v.(*Account))
return true
})
enabled := s.js != nil
s.mu.Unlock()
// Bail if server not enabled. If it was enabled and a reload turns it off
// that will be handled elsewhere.
if !enabled {
return nil
}
sys := s.SystemAccount()
// Process any jetstream enabled accounts here.
for _, acc := range jsAccounts {
if acc.jsLimits != nil {
// Check if already enabled. This can be during a reload.
if acc.JetStreamEnabled() {
if err := acc.enableAllJetStreamServiceImports(); err != nil {
return err
}
if err := acc.UpdateJetStreamLimits(acc.jsLimits); err != nil {
return err
}
} else if err := acc.EnableJetStream(acc.jsLimits); err != nil {
return err
}
acc.jsLimits = nil
} else if acc != sys {
if acc.JetStreamEnabled() {
acc.DisableJetStream()
}
// We will setup basic service imports to respond to
// requests if JS is enabled for this account.
if err := acc.enableJetStreamInfoServiceImportOnly(); err != nil {
return err
}
if err := s.configJetStream(acc); err != nil {
return err
}
}
return nil

View File

@@ -3836,3 +3836,194 @@ func TestJWTNoOperatorMode(t *testing.T) {
})
}
}
func TestJWTJetStreamLimits(t *testing.T) {
updateJwt := func(url string, creds string, pubKey string, jwt string) {
t.Helper()
c := natsConnect(t, url, nats.UserCredentials(creds))
defer c.Close()
if msg, err := c.Request(fmt.Sprintf(accUpdateEventSubj, pubKey), []byte(jwt), time.Second); err != nil {
t.Fatal("error not expected in this test", err)
} else {
content := make(map[string]interface{})
if err := json.Unmarshal(msg.Data, &content); err != nil {
t.Fatalf("%v", err)
} else if _, ok := content["data"]; !ok {
t.Fatalf("did not get an ok response got: %v", content)
}
}
}
require_IdenticalLimits := func(infoLim JetStreamAccountLimits, lim jwt.JetStreamLimits) {
t.Helper()
if int64(infoLim.MaxConsumers) != lim.Consumer || int64(infoLim.MaxStreams) != lim.Streams ||
infoLim.MaxMemory != lim.MemoryStorage || infoLim.MaxStore != lim.DiskStorage {
t.Fatalf("limits do not match %v != %v", infoLim, lim)
}
}
expect_JSDisabledForAccount := func(c *nats.Conn) {
t.Helper()
if _, err := c.Request("$JS.API.INFO", nil, time.Second); err != nats.ErrTimeout {
t.Fatalf("Unexpected error: %v", err)
}
}
expect_InfoError := func(c *nats.Conn) {
t.Helper()
var info JSApiAccountInfoResponse
if resp, err := c.Request("$JS.API.INFO", nil, time.Second); err != nil {
t.Fatalf("Unexpected error: %v", err)
} else if err = json.Unmarshal(resp.Data, &info); err != nil {
t.Fatalf("response1 %v got error %v", string(resp.Data), err)
} else if info.Error == nil {
t.Fatalf("expected error")
}
}
validate_limits := func(c *nats.Conn, expectedLimits jwt.JetStreamLimits) {
t.Helper()
var info JSApiAccountInfoResponse
if resp, err := c.Request("$JS.API.INFO", nil, time.Second); err != nil {
t.Fatalf("Unexpected error: %v", err)
} else if err = json.Unmarshal(resp.Data, &info); err != nil {
t.Fatalf("response1 %v got error %v", string(resp.Data), err)
} else {
require_IdenticalLimits(info.Limits, expectedLimits)
}
}
// create system account
sysKp, _ := nkeys.CreateAccount()
sysPub, _ := sysKp.PublicKey()
claim := jwt.NewAccountClaims(sysPub)
sysJwt, err := claim.Encode(oKp)
require_NoError(t, err)
sysUKp, _ := nkeys.CreateUser()
sysUSeed, _ := sysUKp.Seed()
uclaim := newJWTTestUserClaims()
uclaim.Subject, _ = sysUKp.PublicKey()
sysUserJwt, err := uclaim.Encode(sysKp)
require_NoError(t, err)
sysKp.Seed()
sysCreds := genCredsFile(t, sysUserJwt, sysUSeed)
// limits to apply and check
limits1 := jwt.JetStreamLimits{MemoryStorage: 1024 * 1024, DiskStorage: 2048 * 1024, Streams: 1, Consumer: 2}
// has valid limits that would fail when incorrectly applied twice
limits2 := jwt.JetStreamLimits{MemoryStorage: 4096 * 1024, DiskStorage: 8192 * 1024, Streams: 3, Consumer: 4}
// limits exceeding actual configured value of DiskStorage
limitsExceeded := jwt.JetStreamLimits{MemoryStorage: 8192 * 1024, DiskStorage: 16384 * 1024, Streams: 5, Consumer: 6}
// create account using jetstream with both limits
akp, _ := nkeys.CreateAccount()
aPub, _ := akp.PublicKey()
claim = jwt.NewAccountClaims(aPub)
claim.Limits.JetStreamLimits = limits1
aJwt1, err := claim.Encode(oKp)
require_NoError(t, err)
claim.Limits.JetStreamLimits = limits2
aJwt2, err := claim.Encode(oKp)
require_NoError(t, err)
claim.Limits.JetStreamLimits = limitsExceeded
aJwtLimitsExceeded, err := claim.Encode(oKp)
require_NoError(t, err)
claim.Limits.JetStreamLimits = jwt.JetStreamLimits{} // disabled
aJwt4, err := claim.Encode(oKp)
require_NoError(t, err)
// account user
uKp, _ := nkeys.CreateUser()
uSeed, _ := uKp.Seed()
uclaim = newJWTTestUserClaims()
uclaim.Subject, _ = uKp.PublicKey()
userJwt, err := uclaim.Encode(akp)
require_NoError(t, err)
userCreds := genCredsFile(t, userJwt, uSeed)
dir, err := ioutil.TempDir("", "srv")
require_NoError(t, err)
defer os.RemoveAll(dir)
conf := createConfFile(t, []byte(fmt.Sprintf(`
listen: -1
jetstream: {max_mem_store: 10Mb, max_file_store: 10Mb}
operator: %s
resolver: {
type: full
dir: %s
}
system_account: %s
`, ojwt, dir, sysPub)))
defer os.Remove(conf)
opts := LoadConfig(conf)
s := RunServer(opts)
defer s.Shutdown()
updateJwt(s.ClientURL(), sysCreds, sysPub, sysJwt)
sys := natsConnect(t, s.ClientURL(), nats.UserCredentials(sysCreds))
expect_InfoError(sys)
sys.Close()
updateJwt(s.ClientURL(), sysCreds, aPub, aJwt1)
c := natsConnect(t, s.ClientURL(), nats.UserCredentials(userCreds), nats.ReconnectWait(200*time.Millisecond))
defer c.Close()
validate_limits(c, limits1)
// keep using the same connection
updateJwt(s.ClientURL(), sysCreds, aPub, aJwt2)
validate_limits(c, limits2)
// keep using the same connection but do NOT CHANGE anything.
// This tests if the jwt is applied a second time (would fail)
updateJwt(s.ClientURL(), sysCreds, aPub, aJwt2)
validate_limits(c, limits2)
// keep using the same connection. This update EXCEEDS LIMITS
updateJwt(s.ClientURL(), sysCreds, aPub, aJwtLimitsExceeded)
validate_limits(c, limits2)
// disable test after failure
updateJwt(s.ClientURL(), sysCreds, aPub, aJwt4)
expect_InfoError(c)
// re enable, again testing with a value that can't be applied twice
updateJwt(s.ClientURL(), sysCreds, aPub, aJwt2)
validate_limits(c, limits2)
// disable test no prior failure
updateJwt(s.ClientURL(), sysCreds, aPub, aJwt4)
expect_InfoError(c)
// Wrong limits form start
updateJwt(s.ClientURL(), sysCreds, aPub, aJwtLimitsExceeded)
expect_JSDisabledForAccount(c)
// enable js but exceed limits. Followed by fix via restart
updateJwt(s.ClientURL(), sysCreds, aPub, aJwt2)
validate_limits(c, limits2)
updateJwt(s.ClientURL(), sysCreds, aPub, aJwtLimitsExceeded)
validate_limits(c, limits2)
s.Shutdown()
conf = createConfFile(t, []byte(fmt.Sprintf(`
listen: %d
jetstream: {max_mem_store: 20Mb, max_file_store: 20Mb}
operator: %s
resolver: {
type: full
dir: %s
}
system_account: %s
`, opts.Port, ojwt, dir, sysPub)))
defer os.Remove(conf)
s = RunServer(LoadConfig(conf))
c.Flush() // force client to discover the disconnect
checkClientsCount(t, s, 1)
validate_limits(c, limitsExceeded)
// disable jetstream test
s.Shutdown()
conf = createConfFile(t, []byte(fmt.Sprintf(`
listen: %d
operator: %s
resolver: {
type: full
dir: %s
}
system_account: %s
`, opts.Port, ojwt, dir, sysPub)))
defer os.Remove(conf)
opts = LoadConfig(conf)
opts.NoLog = false
opts.Debug = true
opts.Trace = true
s = RunServer(opts)
c.Flush() // force client to discover the disconnect
checkClientsCount(t, s, 1)
expect_JSDisabledForAccount(c)
// test that it stays disabled
updateJwt(s.ClientURL(), sysCreds, aPub, aJwt2)
expect_JSDisabledForAccount(c)
}

View File

@@ -721,6 +721,10 @@ func (s *Server) generateRouteInfoJSON() {
func (s *Server) globalAccountOnly() bool {
var hasOthers bool
if len(s.trustedKeys) > 0 {
return false
}
s.mu.Lock()
s.accounts.Range(func(k, v interface{}) bool {
acc := v.(*Account)