diff --git a/server/accounts.go b/server/accounts.go index 55dcdb30..1a083ec2 100644 --- a/server/accounts.go +++ b/server/accounts.go @@ -2420,6 +2420,16 @@ func (s *Server) updateAccountClaimsWithRefresh(a *Account, ac *jwt.AccountClaim return clients } + jsEnabled := s.JetStreamEnabled() + if jsEnabled && a == s.SystemAccount() { + for _, export := range allJsExports { + s.Debugf("Adding jetstream service export %q for %s", export, a.Name) + if err := a.AddServiceExport(export, nil); err != nil { + s.Errorf("Error setting up jetstream service exports: %v", err) + } + } + } + for _, e := range ac.Exports { switch e.Type { case jwt.Stream: @@ -2591,6 +2601,23 @@ func (s *Server) updateAccountClaimsWithRefresh(a *Account, ac *jwt.AccountClaim for _, i := range incompleteImports { s.incompleteAccExporterMap.Store(i.Account, struct{}{}) } + if a.srv == nil { + a.srv = s + } + if jsEnabled { + if ac.Limits.JetStreamLimits.DiskStorage != 0 || ac.Limits.JetStreamLimits.MemoryStorage != 0 { + // JetStreamAccountLimits and jwt.JetStreamLimits use same value for unlimited + a.jsLimits = &JetStreamAccountLimits{ + MaxMemory: ac.Limits.JetStreamLimits.MemoryStorage, + MaxStore: ac.Limits.JetStreamLimits.DiskStorage, + MaxStreams: int(ac.Limits.JetStreamLimits.Streams), + MaxConsumers: int(ac.Limits.JetStreamLimits.Consumer), + } + } else if a.jsLimits != nil { + // covers failed update followed by disable + a.jsLimits = nil + } + } a.mu.Unlock() clients := gatherClients() @@ -2600,6 +2627,17 @@ func (s *Server) updateAccountClaimsWithRefresh(a *Account, ac *jwt.AccountClaim return clients[i].start.After(clients[j].start) }) } + + if jsEnabled { + if err := s.configJetStream(a); err != nil { + s.Errorf("Error configuring jetstream for account [%s]: %v", a.Name, err.Error()) + a.mu.Lock() + // Absent reload of js server cfg, this is going to be broken until js is disabled + a.incomplete = true + a.mu.Unlock() + } + } + now := time.Now().Unix() for i, c := range clients { a.mu.RLock() diff --git a/server/jetstream.go b/server/jetstream.go index cd7a9b50..da3eeb1c 100644 --- a/server/jetstream.go +++ b/server/jetstream.go @@ -227,51 +227,54 @@ func (a *Account) enableJetStreamInfoServiceImportOnly() error { return nil } +func (s *Server) configJetStream(acc *Account) error { + if acc.jsLimits != nil { + // Check if already enabled. This can be during a reload. + if acc.JetStreamEnabled() { + if err := acc.enableAllJetStreamServiceImports(); err != nil { + return err + } + if err := acc.UpdateJetStreamLimits(acc.jsLimits); err != nil { + return err + } + } else if err := acc.EnableJetStream(acc.jsLimits); err != nil { + return err + } + acc.jsLimits = nil + } else if acc != s.SystemAccount() { + if acc.JetStreamEnabled() { + acc.DisableJetStream() + } + // We will setup basic service imports to respond to + // requests if JS is enabled for this account. + if err := acc.enableJetStreamInfoServiceImportOnly(); err != nil { + return err + } + } + return nil +} + // configAllJetStreamAccounts walk all configured accounts and turn on jetstream if requested. func (s *Server) configAllJetStreamAccounts() error { var jsAccounts []*Account // Snapshot into our own list. Might not be needed. s.mu.Lock() + // Bail if server not enabled. If it was enabled and a reload turns it off + // that will be handled elsewhere. + if s.js == nil { + s.mu.Unlock() + return nil + } s.accounts.Range(func(k, v interface{}) bool { jsAccounts = append(jsAccounts, v.(*Account)) return true }) - enabled := s.js != nil s.mu.Unlock() - - // Bail if server not enabled. If it was enabled and a reload turns it off - // that will be handled elsewhere. - if !enabled { - return nil - } - - sys := s.SystemAccount() - // Process any jetstream enabled accounts here. for _, acc := range jsAccounts { - if acc.jsLimits != nil { - // Check if already enabled. This can be during a reload. - if acc.JetStreamEnabled() { - if err := acc.enableAllJetStreamServiceImports(); err != nil { - return err - } - if err := acc.UpdateJetStreamLimits(acc.jsLimits); err != nil { - return err - } - } else if err := acc.EnableJetStream(acc.jsLimits); err != nil { - return err - } - acc.jsLimits = nil - } else if acc != sys { - if acc.JetStreamEnabled() { - acc.DisableJetStream() - } - // We will setup basic service imports to respond to - // requests if JS is enabled for this account. - if err := acc.enableJetStreamInfoServiceImportOnly(); err != nil { - return err - } + if err := s.configJetStream(acc); err != nil { + return err } } return nil diff --git a/server/jwt_test.go b/server/jwt_test.go index d6e379d8..98773ea1 100644 --- a/server/jwt_test.go +++ b/server/jwt_test.go @@ -3836,3 +3836,194 @@ func TestJWTNoOperatorMode(t *testing.T) { }) } } + +func TestJWTJetStreamLimits(t *testing.T) { + updateJwt := func(url string, creds string, pubKey string, jwt string) { + t.Helper() + c := natsConnect(t, url, nats.UserCredentials(creds)) + defer c.Close() + if msg, err := c.Request(fmt.Sprintf(accUpdateEventSubj, pubKey), []byte(jwt), time.Second); err != nil { + t.Fatal("error not expected in this test", err) + } else { + content := make(map[string]interface{}) + if err := json.Unmarshal(msg.Data, &content); err != nil { + t.Fatalf("%v", err) + } else if _, ok := content["data"]; !ok { + t.Fatalf("did not get an ok response got: %v", content) + } + } + } + require_IdenticalLimits := func(infoLim JetStreamAccountLimits, lim jwt.JetStreamLimits) { + t.Helper() + if int64(infoLim.MaxConsumers) != lim.Consumer || int64(infoLim.MaxStreams) != lim.Streams || + infoLim.MaxMemory != lim.MemoryStorage || infoLim.MaxStore != lim.DiskStorage { + t.Fatalf("limits do not match %v != %v", infoLim, lim) + } + } + expect_JSDisabledForAccount := func(c *nats.Conn) { + t.Helper() + if _, err := c.Request("$JS.API.INFO", nil, time.Second); err != nats.ErrTimeout { + t.Fatalf("Unexpected error: %v", err) + } + } + expect_InfoError := func(c *nats.Conn) { + t.Helper() + var info JSApiAccountInfoResponse + if resp, err := c.Request("$JS.API.INFO", nil, time.Second); err != nil { + t.Fatalf("Unexpected error: %v", err) + } else if err = json.Unmarshal(resp.Data, &info); err != nil { + t.Fatalf("response1 %v got error %v", string(resp.Data), err) + } else if info.Error == nil { + t.Fatalf("expected error") + } + } + validate_limits := func(c *nats.Conn, expectedLimits jwt.JetStreamLimits) { + t.Helper() + var info JSApiAccountInfoResponse + if resp, err := c.Request("$JS.API.INFO", nil, time.Second); err != nil { + t.Fatalf("Unexpected error: %v", err) + } else if err = json.Unmarshal(resp.Data, &info); err != nil { + t.Fatalf("response1 %v got error %v", string(resp.Data), err) + } else { + require_IdenticalLimits(info.Limits, expectedLimits) + } + } + // create system account + sysKp, _ := nkeys.CreateAccount() + sysPub, _ := sysKp.PublicKey() + claim := jwt.NewAccountClaims(sysPub) + sysJwt, err := claim.Encode(oKp) + require_NoError(t, err) + sysUKp, _ := nkeys.CreateUser() + sysUSeed, _ := sysUKp.Seed() + uclaim := newJWTTestUserClaims() + uclaim.Subject, _ = sysUKp.PublicKey() + sysUserJwt, err := uclaim.Encode(sysKp) + require_NoError(t, err) + sysKp.Seed() + sysCreds := genCredsFile(t, sysUserJwt, sysUSeed) + // limits to apply and check + limits1 := jwt.JetStreamLimits{MemoryStorage: 1024 * 1024, DiskStorage: 2048 * 1024, Streams: 1, Consumer: 2} + // has valid limits that would fail when incorrectly applied twice + limits2 := jwt.JetStreamLimits{MemoryStorage: 4096 * 1024, DiskStorage: 8192 * 1024, Streams: 3, Consumer: 4} + // limits exceeding actual configured value of DiskStorage + limitsExceeded := jwt.JetStreamLimits{MemoryStorage: 8192 * 1024, DiskStorage: 16384 * 1024, Streams: 5, Consumer: 6} + // create account using jetstream with both limits + akp, _ := nkeys.CreateAccount() + aPub, _ := akp.PublicKey() + claim = jwt.NewAccountClaims(aPub) + claim.Limits.JetStreamLimits = limits1 + aJwt1, err := claim.Encode(oKp) + require_NoError(t, err) + claim.Limits.JetStreamLimits = limits2 + aJwt2, err := claim.Encode(oKp) + require_NoError(t, err) + claim.Limits.JetStreamLimits = limitsExceeded + aJwtLimitsExceeded, err := claim.Encode(oKp) + require_NoError(t, err) + claim.Limits.JetStreamLimits = jwt.JetStreamLimits{} // disabled + aJwt4, err := claim.Encode(oKp) + require_NoError(t, err) + // account user + uKp, _ := nkeys.CreateUser() + uSeed, _ := uKp.Seed() + uclaim = newJWTTestUserClaims() + uclaim.Subject, _ = uKp.PublicKey() + userJwt, err := uclaim.Encode(akp) + require_NoError(t, err) + userCreds := genCredsFile(t, userJwt, uSeed) + dir, err := ioutil.TempDir("", "srv") + require_NoError(t, err) + defer os.RemoveAll(dir) + conf := createConfFile(t, []byte(fmt.Sprintf(` + listen: -1 + jetstream: {max_mem_store: 10Mb, max_file_store: 10Mb} + operator: %s + resolver: { + type: full + dir: %s + } + system_account: %s + `, ojwt, dir, sysPub))) + defer os.Remove(conf) + opts := LoadConfig(conf) + s := RunServer(opts) + defer s.Shutdown() + + updateJwt(s.ClientURL(), sysCreds, sysPub, sysJwt) + sys := natsConnect(t, s.ClientURL(), nats.UserCredentials(sysCreds)) + expect_InfoError(sys) + sys.Close() + updateJwt(s.ClientURL(), sysCreds, aPub, aJwt1) + c := natsConnect(t, s.ClientURL(), nats.UserCredentials(userCreds), nats.ReconnectWait(200*time.Millisecond)) + defer c.Close() + validate_limits(c, limits1) + // keep using the same connection + updateJwt(s.ClientURL(), sysCreds, aPub, aJwt2) + validate_limits(c, limits2) + // keep using the same connection but do NOT CHANGE anything. + // This tests if the jwt is applied a second time (would fail) + updateJwt(s.ClientURL(), sysCreds, aPub, aJwt2) + validate_limits(c, limits2) + // keep using the same connection. This update EXCEEDS LIMITS + updateJwt(s.ClientURL(), sysCreds, aPub, aJwtLimitsExceeded) + validate_limits(c, limits2) + // disable test after failure + updateJwt(s.ClientURL(), sysCreds, aPub, aJwt4) + expect_InfoError(c) + // re enable, again testing with a value that can't be applied twice + updateJwt(s.ClientURL(), sysCreds, aPub, aJwt2) + validate_limits(c, limits2) + // disable test no prior failure + updateJwt(s.ClientURL(), sysCreds, aPub, aJwt4) + expect_InfoError(c) + // Wrong limits form start + updateJwt(s.ClientURL(), sysCreds, aPub, aJwtLimitsExceeded) + expect_JSDisabledForAccount(c) + // enable js but exceed limits. Followed by fix via restart + updateJwt(s.ClientURL(), sysCreds, aPub, aJwt2) + validate_limits(c, limits2) + updateJwt(s.ClientURL(), sysCreds, aPub, aJwtLimitsExceeded) + validate_limits(c, limits2) + + s.Shutdown() + conf = createConfFile(t, []byte(fmt.Sprintf(` + listen: %d + jetstream: {max_mem_store: 20Mb, max_file_store: 20Mb} + operator: %s + resolver: { + type: full + dir: %s + } + system_account: %s + `, opts.Port, ojwt, dir, sysPub))) + defer os.Remove(conf) + s = RunServer(LoadConfig(conf)) + c.Flush() // force client to discover the disconnect + checkClientsCount(t, s, 1) + validate_limits(c, limitsExceeded) + + // disable jetstream test + s.Shutdown() + conf = createConfFile(t, []byte(fmt.Sprintf(` + listen: %d + operator: %s + resolver: { + type: full + dir: %s + } + system_account: %s + `, opts.Port, ojwt, dir, sysPub))) + defer os.Remove(conf) + opts = LoadConfig(conf) + opts.NoLog = false + opts.Debug = true + opts.Trace = true + s = RunServer(opts) + c.Flush() // force client to discover the disconnect + checkClientsCount(t, s, 1) + expect_JSDisabledForAccount(c) + // test that it stays disabled + updateJwt(s.ClientURL(), sysCreds, aPub, aJwt2) + expect_JSDisabledForAccount(c) +} diff --git a/server/server.go b/server/server.go index aa38273a..00b63980 100644 --- a/server/server.go +++ b/server/server.go @@ -721,6 +721,10 @@ func (s *Server) generateRouteInfoJSON() { func (s *Server) globalAccountOnly() bool { var hasOthers bool + if len(s.trustedKeys) > 0 { + return false + } + s.mu.Lock() s.accounts.Range(func(k, v interface{}) bool { acc := v.(*Account)