From ec0bc1dbec477c28e31a3d5ce192071ec2435bcf Mon Sep 17 00:00:00 2001 From: Derek Collison Date: Sun, 12 Apr 2020 18:27:53 -0700 Subject: [PATCH] First pass account configuration for jetstream Signed-off-by: Derek Collison --- conf/lex.go | 2 +- conf/parse.go | 12 ++++ server/accounts.go | 7 ++- server/jetstream.go | 26 ++++++++ server/opts.go | 138 ++++++++++++++++++++++++++++++++++------- server/server.go | 1 - test/jetstream_test.go | 79 +++++++++++++++++++++++ 7 files changed, 238 insertions(+), 27 deletions(-) diff --git a/conf/lex.go b/conf/lex.go index 7015290d..b2357702 100644 --- a/conf/lex.go +++ b/conf/lex.go @@ -1132,7 +1132,7 @@ func lexSkip(lx *lexer, nextState stateFn) stateFn { // Tests to see if we have a number suffix func isNumberSuffix(r rune) bool { - return r == 'k' || r == 'K' || r == 'm' || r == 'M' || r == 'g' || r == 'G' + return r == 'k' || r == 'K' || r == 'm' || r == 'M' || r == 'g' || r == 'G' || r == 't' || r == 'T' || r == 'p' || r == 'P' || r == 'e' || r == 'E' } // Tests for both key separators diff --git a/conf/parse.go b/conf/parse.go index 4b4ae449..a22b3e3d 100644 --- a/conf/parse.go +++ b/conf/parse.go @@ -263,6 +263,18 @@ func (p *parser) processItem(it item, fp string) error { setValue(it, num*1000*1000*1000) case "gb": setValue(it, num*1024*1024*1024) + case "t": + setValue(it, num*1000*1000*1000*1000) + case "tb": + setValue(it, num*1024*1024*1024*1024) + case "p": + setValue(it, num*1000*1000*1000*1000*1000) + case "pb": + setValue(it, num*1024*1024*1024*1024*1024) + case "e": + setValue(it, num*1000*1000*1000*1000*1000*1000) + case "eb": + setValue(it, num*1024*1024*1024*1024*1024*1024) } case itemFloat: num, err := strconv.ParseFloat(it.val, 64) diff --git a/server/accounts.go b/server/accounts.go index 0ef6d43e..50c9ab9d 100644 --- a/server/accounts.go +++ b/server/accounts.go @@ -45,7 +45,7 @@ type Account struct { sqmu sync.Mutex sl *Sublist ic *client - isid int + isid uint64 etmr *time.Timer ctmr *time.Timer strack map[string]sconns @@ -63,6 +63,7 @@ type Account struct { imports importMap exports exportMap js *jsAccount + jsLimits *JetStreamAccountLimits limits nae int32 pruning bool @@ -242,6 +243,8 @@ func (a *Account) shallowCopy() *Account { } } } + na.jsLimits = a.jsLimits + return na } @@ -1066,7 +1069,7 @@ func (a *Account) addServiceImportSub(si *serviceImport) error { a.ic.acc = a } c := a.ic - sid := strconv.Itoa(a.isid + 1) + sid := strconv.FormatUint(a.isid+1, 10) a.mu.RUnlock() // This will happen in parsing when the account has not been properly setup. diff --git a/server/jetstream.go b/server/jetstream.go index 95cf8b16..80e453bc 100644 --- a/server/jetstream.go +++ b/server/jetstream.go @@ -166,11 +166,37 @@ func (s *Server) EnableJetStream(config *JetStreamConfig) error { if err := s.GlobalAccount().EnableJetStream(nil); err != nil { return fmt.Errorf("Error enabling jetstream on the global account") } + } else if err := s.enableAllJetStreamAccounts(); err != nil { + return fmt.Errorf("Error enabling jetstream on configured accounts: %v", err) } return nil } +// enableAllJetStreamAccounts walk all configured accounts and turn on jetstream if requested. +func (s *Server) enableAllJetStreamAccounts() error { + var jsAccounts []*Account + + s.mu.Lock() + s.accounts.Range(func(k, v interface{}) bool { + acc := v.(*Account) + if acc.jsLimits != nil { + jsAccounts = append(jsAccounts, acc) + } + return true + }) + s.mu.Unlock() + + // Process any jetstream enabled accounts here. + for _, acc := range jsAccounts { + if err := acc.EnableJetStream(acc.jsLimits); err != nil { + return err + } + acc.jsLimits = nil + } + return nil +} + // JetStreamEnabled reports if jetstream is enabled. func (s *Server) JetStreamEnabled() bool { s.mu.Lock() diff --git a/server/opts.go b/server/opts.go index 801a2740..05fe8327 100644 --- a/server/opts.go +++ b/server/opts.go @@ -1133,38 +1133,124 @@ func parseGateway(v interface{}, o *Options, errors *[]error, warnings *[]error) return nil } +var dynamicJSAccountLimits = &JetStreamAccountLimits{-1, -1, -1, -1} + +// Parses jetstream account limits for an account. Simple setup with boolen is allowed, and we will +// use dynamic account limits. +func parseJetStreamForAccount(v interface{}, acc *Account, errors *[]error, warnings *[]error) error { + var lt token + + tk, v := unwrapValue(v, <) + + // Value here can be bool, or string "enabled" or a map. + switch vv := v.(type) { + case bool: + if vv { + acc.jsLimits = dynamicJSAccountLimits + } + case string: + switch strings.ToLower(vv) { + case "enabled", "enable": + acc.jsLimits = dynamicJSAccountLimits + case "disabled", "disable": + acc.jsLimits = nil + default: + return &configErr{tk, fmt.Sprintf("Expected 'enabled' or 'disabled' for string value, got '%s'", vv)} + } + case map[string]interface{}: + jsLimits := &JetStreamAccountLimits{-1, -1, -1, -1} + for mk, mv := range vv { + tk, mv = unwrapValue(mv, <) + switch strings.ToLower(mk) { + case "max_memory", "max_mem", "mem", "memory": + vv, ok := mv.(int64) + if !ok { + return &configErr{tk, fmt.Sprintf("Expected a parseable size for %q, got %v", mk, mv)} + } + jsLimits.MaxMemory = int64(vv) + case "max_store", "max_file", "max_disk", "store", "disk": + vv, ok := mv.(int64) + if !ok { + return &configErr{tk, fmt.Sprintf("Expected a parseable size for %q, got %v", mk, mv)} + } + jsLimits.MaxStore = int64(vv) + case "max_streams", "streams": + vv, ok := mv.(int64) + if !ok { + return &configErr{tk, fmt.Sprintf("Expected a parseable size for %q, got %v", mk, mv)} + } + jsLimits.MaxStreams = int(vv) + case "max_consumers", "consumers": + vv, ok := mv.(int64) + if !ok { + return &configErr{tk, fmt.Sprintf("Expected a parseable size for %q, got %v", mk, mv)} + } + jsLimits.MaxConsumers = int(vv) + default: + if !tk.IsUsedVariable() { + err := &unknownConfigFieldErr{ + field: mk, + configErr: configErr{ + token: tk, + }, + } + *errors = append(*errors, err) + continue + } + } + } + acc.jsLimits = jsLimits + default: + return &configErr{tk, fmt.Sprintf("Expected map, bool or string to define JetStream, got %T", v)} + } + + return nil +} + +// Parse enablement of jetstream for a server. func parseJetStream(v interface{}, opts *Options, errors *[]error, warnings *[]error) error { var lt token tk, v := unwrapValue(v, <) - cm, ok := v.(map[string]interface{}) - if !ok { - return &configErr{tk, fmt.Sprintf("Expected map to define JetStream, got %T", v)} - } - opts.JetStream = true - - for mk, mv := range cm { - tk, mv = unwrapValue(mv, <) - switch strings.ToLower(mk) { - case "store_dir", "storedir": - opts.StoreDir = mv.(string) - case "max_memory_store", "max_mem_store": - opts.JetStreamMaxMemory = mv.(int64) - case "max_file_store": - opts.JetStreamMaxStore = mv.(int64) + // Value here can be bool, or string "enabled" or a map. + switch vv := v.(type) { + case bool: + opts.JetStream = v.(bool) + case string: + switch strings.ToLower(vv) { + case "enabled", "enable": + opts.JetStream = true + case "disabled", "disable": + opts.JetStream = false default: - if !tk.IsUsedVariable() { - err := &unknownConfigFieldErr{ - field: mk, - configErr: configErr{ - token: tk, - }, + return &configErr{tk, fmt.Sprintf("Expected 'enabled' or 'disabled' for string value, got '%s'", vv)} + } + case map[string]interface{}: + for mk, mv := range vv { + tk, mv = unwrapValue(mv, <) + switch strings.ToLower(mk) { + case "store_dir", "storedir": + opts.StoreDir = mv.(string) + case "max_memory_store", "max_mem_store": + opts.JetStreamMaxMemory = mv.(int64) + case "max_file_store": + opts.JetStreamMaxStore = mv.(int64) + default: + if !tk.IsUsedVariable() { + err := &unknownConfigFieldErr{ + field: mk, + configErr: configErr{ + token: tk, + }, + } + *errors = append(*errors, err) + continue } - *errors = append(*errors, err) - continue } } + default: + return &configErr{tk, fmt.Sprintf("Expected map, bool or string to define JetStream, got %T", v)} } return nil @@ -1687,6 +1773,12 @@ func parseAccounts(v interface{}, opts *Options, errors *[]error, warnings *[]er } exportStreams = append(exportStreams, streams...) exportServices = append(exportServices, services...) + case "jetstream": + err := parseJetStreamForAccount(mv, acc, errors, warnings) + if err != nil { + *errors = append(*errors, err) + continue + } case "users": nkeys, users, err := parseUsers(mv, opts, errors, warnings) if err != nil { diff --git a/server/server.go b/server/server.go index d7df8706..529477ad 100644 --- a/server/server.go +++ b/server/server.go @@ -480,7 +480,6 @@ func (s *Server) configureAccounts() error { ea.approved[sub] = acc } } - s.accounts.Range(func(k, v interface{}) bool { acc := v.(*Account) // Exports diff --git a/test/jetstream_test.go b/test/jetstream_test.go index 0f3f9d75..d71e8449 100644 --- a/test/jetstream_test.go +++ b/test/jetstream_test.go @@ -4922,6 +4922,85 @@ func TestJetStreamSingleInstanceRemoteAccess(t *testing.T) { } } +func clientConnectToServerWithUP(t *testing.T, opts *server.Options, user, pass string) *nats.Conn { + curl := fmt.Sprintf("nats://%s:%s@%s:%d", user, pass, opts.Host, opts.Port) + nc, err := nats.Connect(curl, nats.Name("JS-UP-TEST"), nats.ReconnectWait(5*time.Millisecond), nats.MaxReconnects(-1)) + if err != nil { + t.Fatalf("Failed to create client: %v", err) + } + return nc +} + +func TestJetStreamMultipleAccountsBasics(t *testing.T) { + conf := createConfFile(t, []byte(` + listen: 127.0.0.1:-1 + jetstream: enabled + accounts: { + A: { + jetstream: enabled + users: [ {user: ua, password: pwd} ] + }, + B: { + jetstream: {max_mem: 1GB, max_store: 1TB, max_streams: 10, max_consumers: 1k} + users: [ {user: ub, password: pwd} ] + }, + C: { + users: [ {user: uc, password: pwd} ] + }, + } + `)) + defer os.Remove(conf) + + s, opts := RunServerWithConfig(conf) + defer s.Shutdown() + + if !s.JetStreamEnabled() { + t.Fatalf("Expected JetStream to be enabled") + } + + nc := clientConnectToServerWithUP(t, opts, "ua", "pwd") + defer nc.Close() + + resp, _ := nc.Request(server.JetStreamEnabled, nil, 250*time.Millisecond) + expectOKResponse(t, resp) + + nc = clientConnectToServerWithUP(t, opts, "ub", "pwd") + defer nc.Close() + + resp, _ = nc.Request(server.JetStreamEnabled, nil, 250*time.Millisecond) + expectOKResponse(t, resp) + + resp, err := nc.Request(server.JetStreamInfo, nil, time.Second) + if err != nil { + t.Fatalf("Unexpected error: %v", err) + } + var info server.JetStreamAccountStats + if err := json.Unmarshal(resp.Data, &info); err != nil { + t.Fatalf("Unexpected error: %v", err) + } + limits := info.Limits + if limits.MaxStreams != 10 { + t.Fatalf("Expected 10 for MaxStreams, got %d", limits.MaxStreams) + } + if limits.MaxConsumers != 1000 { + t.Fatalf("Expected MaxConsumers of %d, got %d", 1000, limits.MaxConsumers) + } + gb := int64(1024 * 1024 * 1024) + if limits.MaxMemory != gb { + t.Fatalf("Expected MaxMemory to be 1GB, got %d", limits.MaxMemory) + } + if limits.MaxStore != 1024*gb { + t.Fatalf("Expected MaxStore to be 1TB, got %d", limits.MaxStore) + } + // Check C is not enabled. + nc = clientConnectToServerWithUP(t, opts, "uc", "pwd") + defer nc.Close() + + if _, err = nc.Request(server.JetStreamEnabled, nil, 250*time.Millisecond); err == nil { + t.Fatalf("Expected no response for account c") + } +} + //////////////////////////////////////// // Benchmark placeholders ////////////////////////////////////////