From d10c306585d818229b0af94f0a8779427f03769b Mon Sep 17 00:00:00 2001 From: John Hooks Date: Tue, 11 Jan 2022 21:06:26 -0500 Subject: [PATCH 1/6] Check if value is string or int64 --- server/opts.go | 26 +++++++++++++++++++++++++- 1 file changed, 25 insertions(+), 1 deletion(-) diff --git a/server/opts.go b/server/opts.go index 044ce294..9afe35c9 100644 --- a/server/opts.go +++ b/server/opts.go @@ -1680,6 +1680,25 @@ func parseJetStreamForAccount(v interface{}, acc *Account, errors *[]error, warn return nil } +func getInterfaceValue(v interface{}) (int64, error) { + _, ok := v.(int64) + if ok { + return v.(int64), nil + } + + _, ok = v.(string) + if !ok { + return 0, fmt.Errorf("must be int64 or string") + } + + i, err := strconv.Atoi(v.(string)) + if err != nil { + return 0, fmt.Errorf("must represent an int64") + } + + return int64(i), nil +} + // Parse enablement of jetstream for a server. func parseJetStream(v interface{}, opts *Options, errors *[]error, warnings *[]error) error { var lt token @@ -1714,7 +1733,12 @@ func parseJetStream(v interface{}, opts *Options, errors *[]error, warnings *[]e opts.JetStreamMaxMemory = mv.(int64) opts.maxMemSet = true case "max_file_store", "max_file": - opts.JetStreamMaxStore = mv.(int64) + // need to define error to avoid non-name on left side of := + var err error + opts.JetStreamMaxStore, err = getInterfaceValue(mv) + if err != nil { + return &configErr{tk, fmt.Sprintf("max_file_store %s", err)} + } opts.maxStoreSet = true case "domain": opts.JetStreamDomain = mv.(string) From 12f8179fd8921c143ee28b055aa8b62df4e4c344 Mon Sep 17 00:00:00 2001 From: John Hooks Date: Tue, 11 Jan 2022 22:13:20 -0500 Subject: [PATCH 2/6] Add parsing of string for sizes Checks the suffix of the string to create the size of the int64 --- server/opts.go | 34 +++++++++++++++++++++++++++++++--- server/opts_test.go | 28 ++++++++++++++++++++++++++++ 2 files changed, 59 insertions(+), 3 deletions(-) diff --git a/server/opts.go b/server/opts.go index 9afe35c9..0c54ca06 100644 --- a/server/opts.go +++ b/server/opts.go @@ -1691,12 +1691,40 @@ func getInterfaceValue(v interface{}) (int64, error) { return 0, fmt.Errorf("must be int64 or string") } - i, err := strconv.Atoi(v.(string)) + num, err := stringStorageSize(v.(string)) if err != nil { - return 0, fmt.Errorf("must represent an int64") + return 0, err } - return int64(i), nil + return int64(num), nil +} + +func stringStorageSize(s string) (int, error) { + suffix := strings.ToUpper(s[len(s)-1:]) + prefix := s[:len(s)-1] + var num int + var err error + switch suffix { + case "K": + num, err = strconv.Atoi(prefix) + num = num * 1024 + case "M": + num, err = strconv.Atoi(prefix) + num = num * 1048576 + case "G": + num, err = strconv.Atoi(prefix) + num = num * 1073741824 + case "T": + num, err = strconv.Atoi(prefix) + num = num * 1099511627776 + default: + return 0, fmt.Errorf("sizes defined as strings must end in K, M, G, T") + } + if err != nil { + return 0, err + } + + return num, nil } // Parse enablement of jetstream for a server. diff --git a/server/opts_test.go b/server/opts_test.go index 1e73f167..7d513808 100644 --- a/server/opts_test.go +++ b/server/opts_test.go @@ -3256,3 +3256,31 @@ func TestMaxSubTokens(t *testing.T) { t.Fatal("Did not get the permissions error") } } + +func TestStringStoreSize(t *testing.T) { + tt := []struct { + input string + want int64 + err bool + }{ + {"1K", 1024, false}, + {"1M", 1048576, false}, + {"1G", 1073741824, false}, + {"1T", 1099511627776, false}, + {"1L", 0, true}, + {"TT", 0, true}, + } + + for _, v := range tt { + var testErr bool + got, err := getInterfaceValue(v.input) + if err != nil { + testErr = true + } + + if got != v.want || v.err != testErr { + t.Errorf("Got: %v, want %v with error: %v", got, v.want, testErr) + } + } + +} From d956d6d39879baca35da10c3f71cf76b45ac077f Mon Sep 17 00:00:00 2001 From: John Hooks Date: Wed, 12 Jan 2022 17:55:57 -0500 Subject: [PATCH 3/6] Review Rename function More easily read math merged functions together Changed from predefining error Fix empty string issue use same function for max mem store --- server/opts.go | 33 ++++++++++++++++----------------- server/opts_test.go | 5 +++-- 2 files changed, 19 insertions(+), 19 deletions(-) diff --git a/server/opts.go b/server/opts.go index 0c54ca06..0a8a3ca6 100644 --- a/server/opts.go +++ b/server/opts.go @@ -1680,26 +1680,22 @@ func parseJetStreamForAccount(v interface{}, acc *Account, errors *[]error, warn return nil } -func getInterfaceValue(v interface{}) (int64, error) { +// takes in a storage size as either an int or a string and returns an int64 value based on the input. +func getStorageSize(v interface{}) (int64, error) { _, ok := v.(int64) if ok { return v.(int64), nil } - _, ok = v.(string) + s, ok := v.(string) if !ok { return 0, fmt.Errorf("must be int64 or string") } - num, err := stringStorageSize(v.(string)) - if err != nil { - return 0, err + if s == "" { + return 0, nil } - return int64(num), nil -} - -func stringStorageSize(s string) (int, error) { suffix := strings.ToUpper(s[len(s)-1:]) prefix := s[:len(s)-1] var num int @@ -1710,13 +1706,13 @@ func stringStorageSize(s string) (int, error) { num = num * 1024 case "M": num, err = strconv.Atoi(prefix) - num = num * 1048576 + num = num * 1024 * 1024 case "G": num, err = strconv.Atoi(prefix) - num = num * 1073741824 + num = num * 1024 * 1024 * 1024 case "T": num, err = strconv.Atoi(prefix) - num = num * 1099511627776 + num = num * 1024 * 1024 * 1024 * 1024 default: return 0, fmt.Errorf("sizes defined as strings must end in K, M, G, T") } @@ -1724,7 +1720,7 @@ func stringStorageSize(s string) (int, error) { return 0, err } - return num, nil + return int64(num), nil } // Parse enablement of jetstream for a server. @@ -1758,15 +1754,18 @@ func parseJetStream(v interface{}, opts *Options, errors *[]error, warnings *[]e } opts.StoreDir = mv.(string) case "max_memory_store", "max_mem_store", "max_mem": - opts.JetStreamMaxMemory = mv.(int64) + s, err := getStorageSize(mv) + if err != nil { + return &configErr{tk, fmt.Sprintf("max_mem_store %s", err)} + } + opts.JetStreamMaxMemory = s opts.maxMemSet = true case "max_file_store", "max_file": - // need to define error to avoid non-name on left side of := - var err error - opts.JetStreamMaxStore, err = getInterfaceValue(mv) + s, err := getStorageSize(mv) if err != nil { return &configErr{tk, fmt.Sprintf("max_file_store %s", err)} } + opts.JetStreamMaxStore, err = getStorageSize(s) opts.maxStoreSet = true case "domain": opts.JetStreamDomain = mv.(string) diff --git a/server/opts_test.go b/server/opts_test.go index 7d513808..8a579d1f 100644 --- a/server/opts_test.go +++ b/server/opts_test.go @@ -3257,7 +3257,7 @@ func TestMaxSubTokens(t *testing.T) { } } -func TestStringStoreSize(t *testing.T) { +func TestGetStorageSize(t *testing.T) { tt := []struct { input string want int64 @@ -3269,11 +3269,12 @@ func TestStringStoreSize(t *testing.T) { {"1T", 1099511627776, false}, {"1L", 0, true}, {"TT", 0, true}, + {"", 0, false}, } for _, v := range tt { var testErr bool - got, err := getInterfaceValue(v.input) + got, err := getStorageSize(v.input) if err != nil { testErr = true } From 4aa6c62bbca631957d268b287f7bf08ad6faac9c Mon Sep 17 00:00:00 2001 From: John Hooks Date: Wed, 12 Jan 2022 18:03:24 -0500 Subject: [PATCH 4/6] Fix non used error --- server/opts.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/server/opts.go b/server/opts.go index 0a8a3ca6..b60e5a3d 100644 --- a/server/opts.go +++ b/server/opts.go @@ -1765,7 +1765,7 @@ func parseJetStream(v interface{}, opts *Options, errors *[]error, warnings *[]e if err != nil { return &configErr{tk, fmt.Sprintf("max_file_store %s", err)} } - opts.JetStreamMaxStore, err = getStorageSize(s) + opts.JetStreamMaxStore = s opts.maxStoreSet = true case "domain": opts.JetStreamDomain = mv.(string) From 0b77d519053d6b17d6efd8cacbfd3c6487ac628b Mon Sep 17 00:00:00 2001 From: John Hooks Date: Wed, 12 Jan 2022 21:07:07 -0500 Subject: [PATCH 5/6] Remove switch Removes switch statement and uses map for finding correct multiple Also uses ParseInt instead of Atoi for getting string integer. --- server/opts.go | 31 +++++++++++-------------------- 1 file changed, 11 insertions(+), 20 deletions(-) diff --git a/server/opts.go b/server/opts.go index b60e5a3d..2881545a 100644 --- a/server/opts.go +++ b/server/opts.go @@ -1696,31 +1696,22 @@ func getStorageSize(v interface{}) (int64, error) { return 0, nil } - suffix := strings.ToUpper(s[len(s)-1:]) + suffix := s[len(s)-1:] prefix := s[:len(s)-1] - var num int - var err error - switch suffix { - case "K": - num, err = strconv.Atoi(prefix) - num = num * 1024 - case "M": - num, err = strconv.Atoi(prefix) - num = num * 1024 * 1024 - case "G": - num, err = strconv.Atoi(prefix) - num = num * 1024 * 1024 * 1024 - case "T": - num, err = strconv.Atoi(prefix) - num = num * 1024 * 1024 * 1024 * 1024 - default: - return 0, fmt.Errorf("sizes defined as strings must end in K, M, G, T") - } + num, err := strconv.ParseInt(prefix, 10, 64) if err != nil { return 0, err } - return int64(num), nil + suffixMap := map[string]int64{"K": 10, "M": 20, "G": 30, "T": 40} + + mult, ok := suffixMap[suffix] + if !ok { + return 0, fmt.Errorf("sizes defined as strings must end in K, M, G, T") + } + num *= 1 << mult + + return num, nil } // Parse enablement of jetstream for a server. From ffe50d8573f655855963e9e686338cddd41b4568 Mon Sep 17 00:00:00 2001 From: Ivan Kozlovic Date: Wed, 12 Jan 2022 20:37:00 -0700 Subject: [PATCH 6/6] [IMPROVED] JetStream clustering with lots of streams/consumers Some operations could cause the route to block due to lock being held during store operations. On macOS, having lots of streams/consumers and restarting the cluster would cause lots of concurrent IO that would cause lock to be held for too long, causing head-of-line blocking in processing of messages from a route. Signed-off-by: Ivan Kozlovic --- server/raft.go | 31 ++++++++++--------------------- 1 file changed, 10 insertions(+), 21 deletions(-) diff --git a/server/raft.go b/server/raft.go index 65333221..fb7687f9 100644 --- a/server/raft.go +++ b/server/raft.go @@ -485,21 +485,10 @@ func (s *Server) startRaftNode(cfg *RaftConfig) (RaftNode, error) { // outOfResources checks to see if we are out of resources. func (n *raft) outOfResources() bool { - n.RLock() js := n.js if !n.track || js == nil { - n.RUnlock() return false } - n.RUnlock() - - js.mu.RLock() - jsDisabled := js.disabled - js.mu.RUnlock() - if jsDisabled { - return false - } - return js.limitsExceeded(n.wtype) } @@ -1537,6 +1526,8 @@ func (n *raft) runAsFollower() { } case <-n.respc: // Ignore + case <-n.votes: + n.debug("Ignoring old vote response, we have stepped down") case vreq := <-n.reqs: n.processVoteRequest(vreq) case newLeader := <-n.stepdown: @@ -2827,10 +2818,6 @@ func (n *raft) processAppendEntryResponse(ar *appendEntryResponse) { // handleAppendEntryResponse processes responses to append entries. func (n *raft) handleAppendEntryResponse(sub *subscription, c *client, _ *Account, subject, reply string, msg []byte) { - // Ignore if not the leader. - if !n.Leader() { - return - } msg = copyBytes(msg) ar := n.decodeAppendEntryResponse(msg) ar.reply = reply @@ -3140,7 +3127,10 @@ func (n *raft) fileWriter() { n.RLock() copy(buf[0:], n.wtv) n.RUnlock() - if err := ioutil.WriteFile(tvf, buf[:], 0640); err != nil { + <-dios + err := ioutil.WriteFile(tvf, buf[:], 0640) + dios <- struct{}{} + if err != nil { n.setWriteErr(err) n.warn("Error writing term and vote file for %q: %v", n.group, err) } @@ -3148,7 +3138,10 @@ func (n *raft) fileWriter() { n.RLock() buf := copyBytes(n.wps) n.RUnlock() - if err := ioutil.WriteFile(psf, buf, 0640); err != nil { + <-dios + err := ioutil.WriteFile(psf, buf, 0640) + dios <- struct{}{} + if err != nil { n.setWriteErr(err) n.warn("Error writing peer state file for %q: %v", n.group, err) } @@ -3216,10 +3209,6 @@ func (n *raft) handleVoteResponse(sub *subscription, c *client, _ *Account, _, r n.error("Received malformed vote response for %q", n.group) return } - if state := n.State(); state != Candidate && state != Leader { - n.debug("Ignoring old vote response, we have stepped down") - return - } select { case n.votes <- vr: