diff --git a/server/opts.go b/server/opts.go index 044ce294..2881545a 100644 --- a/server/opts.go +++ b/server/opts.go @@ -1680,6 +1680,40 @@ func parseJetStreamForAccount(v interface{}, acc *Account, errors *[]error, warn return nil } +// takes in a storage size as either an int or a string and returns an int64 value based on the input. +func getStorageSize(v interface{}) (int64, error) { + _, ok := v.(int64) + if ok { + return v.(int64), nil + } + + s, ok := v.(string) + if !ok { + return 0, fmt.Errorf("must be int64 or string") + } + + if s == "" { + return 0, nil + } + + suffix := s[len(s)-1:] + prefix := s[:len(s)-1] + num, err := strconv.ParseInt(prefix, 10, 64) + if err != nil { + return 0, err + } + + suffixMap := map[string]int64{"K": 10, "M": 20, "G": 30, "T": 40} + + mult, ok := suffixMap[suffix] + if !ok { + return 0, fmt.Errorf("sizes defined as strings must end in K, M, G, T") + } + num *= 1 << mult + + return num, nil +} + // Parse enablement of jetstream for a server. func parseJetStream(v interface{}, opts *Options, errors *[]error, warnings *[]error) error { var lt token @@ -1711,10 +1745,18 @@ func parseJetStream(v interface{}, opts *Options, errors *[]error, warnings *[]e } opts.StoreDir = mv.(string) case "max_memory_store", "max_mem_store", "max_mem": - opts.JetStreamMaxMemory = mv.(int64) + s, err := getStorageSize(mv) + if err != nil { + return &configErr{tk, fmt.Sprintf("max_mem_store %s", err)} + } + opts.JetStreamMaxMemory = s opts.maxMemSet = true case "max_file_store", "max_file": - opts.JetStreamMaxStore = mv.(int64) + s, err := getStorageSize(mv) + if err != nil { + return &configErr{tk, fmt.Sprintf("max_file_store %s", err)} + } + opts.JetStreamMaxStore = s opts.maxStoreSet = true case "domain": opts.JetStreamDomain = mv.(string) diff --git a/server/opts_test.go b/server/opts_test.go index 1e73f167..8a579d1f 100644 --- a/server/opts_test.go +++ b/server/opts_test.go @@ -3256,3 +3256,32 @@ func TestMaxSubTokens(t *testing.T) { t.Fatal("Did not get the permissions error") } } + +func TestGetStorageSize(t *testing.T) { + tt := []struct { + input string + want int64 + err bool + }{ + {"1K", 1024, false}, + {"1M", 1048576, false}, + {"1G", 1073741824, false}, + {"1T", 1099511627776, false}, + {"1L", 0, true}, + {"TT", 0, true}, + {"", 0, false}, + } + + for _, v := range tt { + var testErr bool + got, err := getStorageSize(v.input) + if err != nil { + testErr = true + } + + if got != v.want || v.err != testErr { + t.Errorf("Got: %v, want %v with error: %v", got, v.want, testErr) + } + } + +} diff --git a/server/raft.go b/server/raft.go index 65333221..fb7687f9 100644 --- a/server/raft.go +++ b/server/raft.go @@ -485,21 +485,10 @@ func (s *Server) startRaftNode(cfg *RaftConfig) (RaftNode, error) { // outOfResources checks to see if we are out of resources. func (n *raft) outOfResources() bool { - n.RLock() js := n.js if !n.track || js == nil { - n.RUnlock() return false } - n.RUnlock() - - js.mu.RLock() - jsDisabled := js.disabled - js.mu.RUnlock() - if jsDisabled { - return false - } - return js.limitsExceeded(n.wtype) } @@ -1537,6 +1526,8 @@ func (n *raft) runAsFollower() { } case <-n.respc: // Ignore + case <-n.votes: + n.debug("Ignoring old vote response, we have stepped down") case vreq := <-n.reqs: n.processVoteRequest(vreq) case newLeader := <-n.stepdown: @@ -2827,10 +2818,6 @@ func (n *raft) processAppendEntryResponse(ar *appendEntryResponse) { // handleAppendEntryResponse processes responses to append entries. func (n *raft) handleAppendEntryResponse(sub *subscription, c *client, _ *Account, subject, reply string, msg []byte) { - // Ignore if not the leader. - if !n.Leader() { - return - } msg = copyBytes(msg) ar := n.decodeAppendEntryResponse(msg) ar.reply = reply @@ -3140,7 +3127,10 @@ func (n *raft) fileWriter() { n.RLock() copy(buf[0:], n.wtv) n.RUnlock() - if err := ioutil.WriteFile(tvf, buf[:], 0640); err != nil { + <-dios + err := ioutil.WriteFile(tvf, buf[:], 0640) + dios <- struct{}{} + if err != nil { n.setWriteErr(err) n.warn("Error writing term and vote file for %q: %v", n.group, err) } @@ -3148,7 +3138,10 @@ func (n *raft) fileWriter() { n.RLock() buf := copyBytes(n.wps) n.RUnlock() - if err := ioutil.WriteFile(psf, buf, 0640); err != nil { + <-dios + err := ioutil.WriteFile(psf, buf, 0640) + dios <- struct{}{} + if err != nil { n.setWriteErr(err) n.warn("Error writing peer state file for %q: %v", n.group, err) } @@ -3216,10 +3209,6 @@ func (n *raft) handleVoteResponse(sub *subscription, c *client, _ *Account, _, r n.error("Received malformed vote response for %q", n.group) return } - if state := n.State(); state != Candidate && state != Leader { - n.debug("Ignoring old vote response, we have stepped down") - return - } select { case n.votes <- vr: