mirror of
https://github.com/gogrlx/nats-server.git
synced 2026-04-17 03:24:40 -07:00
First pass at account config for jetstream server reload support
Signed-off-by: Derek Collison <derek@nats.io>
This commit is contained in:
@@ -166,33 +166,98 @@ func (s *Server) EnableJetStream(config *JetStreamConfig) error {
|
||||
if err := s.GlobalAccount().EnableJetStream(nil); err != nil {
|
||||
return fmt.Errorf("Error enabling jetstream on the global account")
|
||||
}
|
||||
} else if err := s.enableAllJetStreamAccounts(); err != nil {
|
||||
} else if err := s.configAllJetStreamAccounts(); err != nil {
|
||||
return fmt.Errorf("Error enabling jetstream on configured accounts: %v", err)
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
// enableAllJetStreamAccounts walk all configured accounts and turn on jetstream if requested.
|
||||
func (s *Server) enableAllJetStreamAccounts() error {
|
||||
// enableAllJetStreamServiceImports turns on all service imports for jetstream for this account.
|
||||
func (a *Account) enableAllJetStreamServiceImports() error {
|
||||
a.mu.RLock()
|
||||
s := a.srv
|
||||
a.mu.RUnlock()
|
||||
|
||||
if s == nil {
|
||||
return fmt.Errorf("jetstream account not registered")
|
||||
}
|
||||
|
||||
// In case the enabled import exists here.
|
||||
a.removeServiceImport(JetStreamEnabled)
|
||||
|
||||
sys := s.SystemAccount()
|
||||
for _, export := range allJsExports {
|
||||
if err := a.AddServiceImport(sys, export, _EMPTY_); err != nil {
|
||||
return fmt.Errorf("Error setting up jetstream service imports for account: %v", err)
|
||||
}
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
// enableJetStreamEnabledServiceImportOnly will enable the single service import responder.
|
||||
// Should we do them all regardless?
|
||||
func (a *Account) enableJetStreamEnabledServiceImportOnly() error {
|
||||
a.mu.RLock()
|
||||
s := a.srv
|
||||
a.mu.RUnlock()
|
||||
|
||||
if s == nil {
|
||||
return fmt.Errorf("jetstream account not registered")
|
||||
}
|
||||
sys := s.SystemAccount()
|
||||
if err := a.AddServiceImport(sys, JetStreamEnabled, _EMPTY_); err != nil {
|
||||
return fmt.Errorf("Error setting up jetstream service imports for account: %v", err)
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
// configAllJetStreamAccounts walk all configured accounts and turn on jetstream if requested.
|
||||
func (s *Server) configAllJetStreamAccounts() error {
|
||||
var jsAccounts []*Account
|
||||
|
||||
// Snapshot into our own list. Might not be needed.
|
||||
s.mu.Lock()
|
||||
s.accounts.Range(func(k, v interface{}) bool {
|
||||
acc := v.(*Account)
|
||||
if acc.jsLimits != nil {
|
||||
jsAccounts = append(jsAccounts, acc)
|
||||
}
|
||||
jsAccounts = append(jsAccounts, v.(*Account))
|
||||
return true
|
||||
})
|
||||
enabled := s.js != nil
|
||||
s.mu.Unlock()
|
||||
|
||||
// Bail if server not enabled. If it was enabled and a reload turns it off
|
||||
// that will be handled elsewhere.
|
||||
if !enabled {
|
||||
return nil
|
||||
}
|
||||
|
||||
sys := s.SystemAccount()
|
||||
|
||||
// Process any jetstream enabled accounts here.
|
||||
for _, acc := range jsAccounts {
|
||||
if err := acc.EnableJetStream(acc.jsLimits); err != nil {
|
||||
return err
|
||||
if acc.jsLimits != nil {
|
||||
// Check if already enabled. This can be during a reload.
|
||||
if acc.JetStreamEnabled() {
|
||||
if err := acc.enableAllJetStreamServiceImports(); err != nil {
|
||||
return err
|
||||
}
|
||||
if err := acc.UpdateJetStreamLimits(acc.jsLimits); err != nil {
|
||||
return err
|
||||
}
|
||||
} else if err := acc.EnableJetStream(acc.jsLimits); err != nil {
|
||||
return err
|
||||
}
|
||||
acc.jsLimits = nil
|
||||
} else if acc != sys {
|
||||
if acc.JetStreamEnabled() {
|
||||
acc.DisableJetStream()
|
||||
}
|
||||
// We will setup basic service imports to respond to
|
||||
// requests if JS is enabled for this account.
|
||||
if err := acc.enableJetStreamEnabledServiceImportOnly(); err != nil {
|
||||
return err
|
||||
}
|
||||
}
|
||||
acc.jsLimits = nil
|
||||
}
|
||||
return nil
|
||||
}
|
||||
@@ -314,11 +379,8 @@ func (a *Account) EnableJetStream(limits *JetStreamAccountLimits) error {
|
||||
a.mu.Unlock()
|
||||
|
||||
// Create the proper imports here.
|
||||
sys := s.SystemAccount()
|
||||
for _, export := range allJsExports {
|
||||
if err := a.AddServiceImport(sys, export, _EMPTY_); err != nil {
|
||||
return fmt.Errorf("Error setting up jetstream service imports for account: %v", err)
|
||||
}
|
||||
if err := a.enableAllJetStreamServiceImports(); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
s.Debugf("Enabled JetStream for account %q", a.Name)
|
||||
@@ -538,17 +600,16 @@ func (a *Account) LookupStream(name string) (*Stream, error) {
|
||||
func (a *Account) UpdateJetStreamLimits(limits *JetStreamAccountLimits) error {
|
||||
a.mu.RLock()
|
||||
s := a.srv
|
||||
jsa := a.js
|
||||
a.mu.RUnlock()
|
||||
|
||||
if s == nil {
|
||||
return fmt.Errorf("jetstream account not registered")
|
||||
}
|
||||
|
||||
js := s.getJetStream()
|
||||
if js == nil {
|
||||
return fmt.Errorf("jetstream not enabled")
|
||||
}
|
||||
|
||||
jsa := js.lookupAccount(a)
|
||||
if jsa == nil {
|
||||
return fmt.Errorf("jetstream not enabled for account")
|
||||
}
|
||||
|
||||
@@ -53,6 +53,10 @@ type option interface {
|
||||
// IsClusterPermsChange indicates if this option requires reloading
|
||||
// cluster permissions.
|
||||
IsClusterPermsChange() bool
|
||||
|
||||
// IsJetStreamChange inidicates a change in the servers config for JetStream.
|
||||
// Account changes will be handled separately in reloadAuthorization.
|
||||
IsJetStreamChange() bool
|
||||
}
|
||||
|
||||
// noopOption is a base struct that provides default no-op behaviors.
|
||||
@@ -74,6 +78,10 @@ func (n noopOption) IsClusterPermsChange() bool {
|
||||
return false
|
||||
}
|
||||
|
||||
func (n noopOption) IsJetStreamChange() bool {
|
||||
return false
|
||||
}
|
||||
|
||||
// loggingOption is a base struct that provides default option behaviors for
|
||||
// logging-related options.
|
||||
type loggingOption struct {
|
||||
@@ -531,6 +539,20 @@ func (a *accountsOption) Apply(s *Server) {
|
||||
s.Noticef("Reloaded: accounts")
|
||||
}
|
||||
|
||||
// For changes to a server's config.
|
||||
type jetStreamOption struct {
|
||||
noopOption
|
||||
newValue bool
|
||||
}
|
||||
|
||||
func (a *jetStreamOption) Apply(s *Server) {
|
||||
s.Noticef("Reloaded: jetstream")
|
||||
}
|
||||
|
||||
func (jso jetStreamOption) IsJetStreamChange() bool {
|
||||
return true
|
||||
}
|
||||
|
||||
// connectErrorReports implements the option interface for the `connect_error_reports`
|
||||
// setting.
|
||||
type connectErrorReports struct {
|
||||
@@ -872,6 +894,18 @@ func (s *Server) diffOptions(newOpts *Options) ([]option, error) {
|
||||
return nil, fmt.Errorf("config reload not supported for %s: old=%v, new=%v",
|
||||
field.Name, oldValue, newValue)
|
||||
}
|
||||
case "storedir":
|
||||
return nil, fmt.Errorf("config reload not supported for jetstream storage directory")
|
||||
case "jetstream":
|
||||
new := newValue.(bool)
|
||||
old := oldValue.(bool)
|
||||
if new != old {
|
||||
diffOpts = append(diffOpts, &jetStreamOption{newValue: new})
|
||||
}
|
||||
case "jetstreammaxmemory":
|
||||
return nil, fmt.Errorf("config reload not supported for jetstream max memory")
|
||||
case "jetstreammaxstore":
|
||||
return nil, fmt.Errorf("config reload not supported for jetstream max storage")
|
||||
case "connecterrorreports":
|
||||
diffOpts = append(diffOpts, &connectErrorReports{newValue: newValue.(int)})
|
||||
case "reconnecterrorreports":
|
||||
@@ -1002,10 +1036,11 @@ func (s *Server) reloadAuthorization() {
|
||||
// This map will contain the names of accounts that have their streams
|
||||
// import configuration changed.
|
||||
awcsti := make(map[string]struct{})
|
||||
|
||||
checkJetStream := false
|
||||
s.mu.Lock()
|
||||
|
||||
// This can not be changed for now so ok to check server's trustedKeys
|
||||
// This can not be changed for now so ok to check server's trustedKeys unlocked.
|
||||
// If plain configured accounts, process here.
|
||||
if s.trustedKeys == nil {
|
||||
// We need to drain the old accounts here since we have something
|
||||
// new configured. We do not want s.accounts to change since that would
|
||||
@@ -1040,6 +1075,7 @@ func (s *Server) reloadAuthorization() {
|
||||
newAcc.sl = acc.sl
|
||||
newAcc.rm = acc.rm
|
||||
newAcc.respMap = acc.respMap
|
||||
newAcc.js = acc.js
|
||||
acc.mu.RUnlock()
|
||||
|
||||
// Check if current and new config of this account are same
|
||||
@@ -1054,6 +1090,8 @@ func (s *Server) reloadAuthorization() {
|
||||
}
|
||||
return true
|
||||
})
|
||||
// Double check any JetStream configs.
|
||||
checkJetStream = true
|
||||
} else if s.opts.AccountResolver != nil {
|
||||
s.configureResolver()
|
||||
if _, ok := s.accResolver.(*MemAccResolver); ok {
|
||||
@@ -1147,6 +1185,11 @@ func (s *Server) reloadAuthorization() {
|
||||
route.authViolation()
|
||||
}
|
||||
}
|
||||
|
||||
// We will double check all JetStream configs on a reload.
|
||||
if checkJetStream {
|
||||
s.configAllJetStreamAccounts()
|
||||
}
|
||||
}
|
||||
|
||||
// Returns true if given client current account has changed (or user
|
||||
|
||||
@@ -16,6 +16,7 @@ package test
|
||||
import (
|
||||
"encoding/json"
|
||||
"fmt"
|
||||
"io/ioutil"
|
||||
"log"
|
||||
"math/rand"
|
||||
"net/url"
|
||||
@@ -4934,7 +4935,7 @@ func clientConnectToServerWithUP(t *testing.T, opts *server.Options, user, pass
|
||||
func TestJetStreamMultipleAccountsBasics(t *testing.T) {
|
||||
conf := createConfFile(t, []byte(`
|
||||
listen: 127.0.0.1:-1
|
||||
jetstream: enabled
|
||||
jetstream: {max_mem_store: 64GB, max_file_store: 10TB}
|
||||
accounts: {
|
||||
A: {
|
||||
jetstream: enabled
|
||||
@@ -4958,19 +4959,19 @@ func TestJetStreamMultipleAccountsBasics(t *testing.T) {
|
||||
t.Fatalf("Expected JetStream to be enabled")
|
||||
}
|
||||
|
||||
nc := clientConnectToServerWithUP(t, opts, "ua", "pwd")
|
||||
defer nc.Close()
|
||||
nca := clientConnectToServerWithUP(t, opts, "ua", "pwd")
|
||||
defer nca.Close()
|
||||
|
||||
resp, _ := nc.Request(server.JetStreamEnabled, nil, 250*time.Millisecond)
|
||||
resp, _ := nca.Request(server.JetStreamEnabled, nil, 250*time.Millisecond)
|
||||
expectOKResponse(t, resp)
|
||||
|
||||
nc = clientConnectToServerWithUP(t, opts, "ub", "pwd")
|
||||
defer nc.Close()
|
||||
ncb := clientConnectToServerWithUP(t, opts, "ub", "pwd")
|
||||
defer ncb.Close()
|
||||
|
||||
resp, _ = nc.Request(server.JetStreamEnabled, nil, 250*time.Millisecond)
|
||||
resp, _ = ncb.Request(server.JetStreamEnabled, nil, 250*time.Millisecond)
|
||||
expectOKResponse(t, resp)
|
||||
|
||||
resp, err := nc.Request(server.JetStreamInfo, nil, time.Second)
|
||||
resp, err := ncb.Request(server.JetStreamInfo, nil, time.Second)
|
||||
if err != nil {
|
||||
t.Fatalf("Unexpected error: %v", err)
|
||||
}
|
||||
@@ -4992,12 +4993,102 @@ func TestJetStreamMultipleAccountsBasics(t *testing.T) {
|
||||
if limits.MaxStore != 1024*gb {
|
||||
t.Fatalf("Expected MaxStore to be 1TB, got %d", limits.MaxStore)
|
||||
}
|
||||
// Check C is not enabled.
|
||||
nc = clientConnectToServerWithUP(t, opts, "uc", "pwd")
|
||||
defer nc.Close()
|
||||
|
||||
if _, err = nc.Request(server.JetStreamEnabled, nil, 250*time.Millisecond); err == nil {
|
||||
t.Fatalf("Expected no response for account c")
|
||||
ncc := clientConnectToServerWithUP(t, opts, "uc", "pwd")
|
||||
defer ncc.Close()
|
||||
|
||||
expectNotEnabled := func(resp *nats.Msg, err error) {
|
||||
t.Helper()
|
||||
if err != nil {
|
||||
t.Fatalf("Unexpected error requesting enabled status")
|
||||
}
|
||||
if resp == nil {
|
||||
t.Fatalf("No response, possible timeout?")
|
||||
}
|
||||
if string(resp.Data) != "-ERR 'jetstream not enabled for account'" {
|
||||
t.Fatalf("Expected to get a response indicating jetstream is not enabled for this account, got %q", resp.Data)
|
||||
}
|
||||
}
|
||||
|
||||
// Check C is not enabled. We expect a negative response, not a timeout.
|
||||
expectNotEnabled(ncc.Request(server.JetStreamEnabled, nil, 250*time.Millisecond))
|
||||
|
||||
// Now do simple reload and check that we do the right thing. Testing enable and disable and also change in limits
|
||||
newConf := []byte(`
|
||||
listen: 127.0.0.1:-1
|
||||
jetstream: {max_mem_store: 64GB, max_file_store: 10TB}
|
||||
accounts: {
|
||||
A: {
|
||||
jetstream: disabled
|
||||
users: [ {user: ua, password: pwd} ]
|
||||
},
|
||||
B: {
|
||||
jetstream: {max_mem: 32GB, max_store: 512GB, max_streams: 100, max_consumers: 4k}
|
||||
users: [ {user: ub, password: pwd} ]
|
||||
},
|
||||
C: {
|
||||
jetstream: {max_mem: 1GB, max_store: 1TB, max_streams: 10, max_consumers: 1k}
|
||||
users: [ {user: uc, password: pwd} ]
|
||||
},
|
||||
}
|
||||
`)
|
||||
if err := ioutil.WriteFile(conf, newConf, 0600); err != nil {
|
||||
t.Fatalf("Error rewriting server's config file: %v", err)
|
||||
}
|
||||
if err := s.Reload(); err != nil {
|
||||
t.Fatalf("Error on server reload: %v", err)
|
||||
}
|
||||
expectNotEnabled(nca.Request(server.JetStreamEnabled, nil, 250*time.Millisecond))
|
||||
|
||||
resp, _ = ncb.Request(server.JetStreamEnabled, nil, 250*time.Millisecond)
|
||||
expectOKResponse(t, resp)
|
||||
|
||||
resp, _ = ncc.Request(server.JetStreamEnabled, nil, 250*time.Millisecond)
|
||||
expectOKResponse(t, resp)
|
||||
|
||||
// Now check that limits have been updated.
|
||||
// Account B
|
||||
resp, err = ncb.Request(server.JetStreamInfo, nil, time.Second)
|
||||
if err != nil {
|
||||
t.Fatalf("Unexpected error: %v", err)
|
||||
}
|
||||
if err := json.Unmarshal(resp.Data, &info); err != nil {
|
||||
t.Fatalf("Unexpected error: %v", err)
|
||||
}
|
||||
limits = info.Limits
|
||||
if limits.MaxStreams != 100 {
|
||||
t.Fatalf("Expected 100 for MaxStreams, got %d", limits.MaxStreams)
|
||||
}
|
||||
if limits.MaxConsumers != 4000 {
|
||||
t.Fatalf("Expected MaxConsumers of %d, got %d", 4000, limits.MaxConsumers)
|
||||
}
|
||||
if limits.MaxMemory != 32*gb {
|
||||
t.Fatalf("Expected MaxMemory to be 32GB, got %d", limits.MaxMemory)
|
||||
}
|
||||
if limits.MaxStore != 512*gb {
|
||||
t.Fatalf("Expected MaxStore to be 512GB, got %d", limits.MaxStore)
|
||||
}
|
||||
|
||||
// Account C
|
||||
resp, err = ncc.Request(server.JetStreamInfo, nil, time.Second)
|
||||
if err != nil {
|
||||
t.Fatalf("Unexpected error: %v", err)
|
||||
}
|
||||
if err := json.Unmarshal(resp.Data, &info); err != nil {
|
||||
t.Fatalf("Unexpected error: %v", err)
|
||||
}
|
||||
limits = info.Limits
|
||||
if limits.MaxStreams != 10 {
|
||||
t.Fatalf("Expected 10 for MaxStreams, got %d", limits.MaxStreams)
|
||||
}
|
||||
if limits.MaxConsumers != 1000 {
|
||||
t.Fatalf("Expected MaxConsumers of %d, got %d", 1000, limits.MaxConsumers)
|
||||
}
|
||||
if limits.MaxMemory != gb {
|
||||
t.Fatalf("Expected MaxMemory to be 1GB, got %d", limits.MaxMemory)
|
||||
}
|
||||
if limits.MaxStore != 1024*gb {
|
||||
t.Fatalf("Expected MaxStore to be 1TB, got %d", limits.MaxStore)
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
Reference in New Issue
Block a user