Merge branch 'main' into pull-consumer

This commit is contained in:
Derek Collison
2022-01-13 09:14:33 -08:00
3 changed files with 83 additions and 23 deletions

View File

@@ -1680,6 +1680,40 @@ func parseJetStreamForAccount(v interface{}, acc *Account, errors *[]error, warn
return nil return nil
} }
// takes in a storage size as either an int or a string and returns an int64 value based on the input.
func getStorageSize(v interface{}) (int64, error) {
_, ok := v.(int64)
if ok {
return v.(int64), nil
}
s, ok := v.(string)
if !ok {
return 0, fmt.Errorf("must be int64 or string")
}
if s == "" {
return 0, nil
}
suffix := s[len(s)-1:]
prefix := s[:len(s)-1]
num, err := strconv.ParseInt(prefix, 10, 64)
if err != nil {
return 0, err
}
suffixMap := map[string]int64{"K": 10, "M": 20, "G": 30, "T": 40}
mult, ok := suffixMap[suffix]
if !ok {
return 0, fmt.Errorf("sizes defined as strings must end in K, M, G, T")
}
num *= 1 << mult
return num, nil
}
// Parse enablement of jetstream for a server. // Parse enablement of jetstream for a server.
func parseJetStream(v interface{}, opts *Options, errors *[]error, warnings *[]error) error { func parseJetStream(v interface{}, opts *Options, errors *[]error, warnings *[]error) error {
var lt token var lt token
@@ -1711,10 +1745,18 @@ func parseJetStream(v interface{}, opts *Options, errors *[]error, warnings *[]e
} }
opts.StoreDir = mv.(string) opts.StoreDir = mv.(string)
case "max_memory_store", "max_mem_store", "max_mem": case "max_memory_store", "max_mem_store", "max_mem":
opts.JetStreamMaxMemory = mv.(int64) s, err := getStorageSize(mv)
if err != nil {
return &configErr{tk, fmt.Sprintf("max_mem_store %s", err)}
}
opts.JetStreamMaxMemory = s
opts.maxMemSet = true opts.maxMemSet = true
case "max_file_store", "max_file": case "max_file_store", "max_file":
opts.JetStreamMaxStore = mv.(int64) s, err := getStorageSize(mv)
if err != nil {
return &configErr{tk, fmt.Sprintf("max_file_store %s", err)}
}
opts.JetStreamMaxStore = s
opts.maxStoreSet = true opts.maxStoreSet = true
case "domain": case "domain":
opts.JetStreamDomain = mv.(string) opts.JetStreamDomain = mv.(string)

View File

@@ -3256,3 +3256,32 @@ func TestMaxSubTokens(t *testing.T) {
t.Fatal("Did not get the permissions error") t.Fatal("Did not get the permissions error")
} }
} }
func TestGetStorageSize(t *testing.T) {
tt := []struct {
input string
want int64
err bool
}{
{"1K", 1024, false},
{"1M", 1048576, false},
{"1G", 1073741824, false},
{"1T", 1099511627776, false},
{"1L", 0, true},
{"TT", 0, true},
{"", 0, false},
}
for _, v := range tt {
var testErr bool
got, err := getStorageSize(v.input)
if err != nil {
testErr = true
}
if got != v.want || v.err != testErr {
t.Errorf("Got: %v, want %v with error: %v", got, v.want, testErr)
}
}
}

View File

@@ -485,21 +485,10 @@ func (s *Server) startRaftNode(cfg *RaftConfig) (RaftNode, error) {
// outOfResources checks to see if we are out of resources. // outOfResources checks to see if we are out of resources.
func (n *raft) outOfResources() bool { func (n *raft) outOfResources() bool {
n.RLock()
js := n.js js := n.js
if !n.track || js == nil { if !n.track || js == nil {
n.RUnlock()
return false return false
} }
n.RUnlock()
js.mu.RLock()
jsDisabled := js.disabled
js.mu.RUnlock()
if jsDisabled {
return false
}
return js.limitsExceeded(n.wtype) return js.limitsExceeded(n.wtype)
} }
@@ -1537,6 +1526,8 @@ func (n *raft) runAsFollower() {
} }
case <-n.respc: case <-n.respc:
// Ignore // Ignore
case <-n.votes:
n.debug("Ignoring old vote response, we have stepped down")
case vreq := <-n.reqs: case vreq := <-n.reqs:
n.processVoteRequest(vreq) n.processVoteRequest(vreq)
case newLeader := <-n.stepdown: case newLeader := <-n.stepdown:
@@ -2827,10 +2818,6 @@ func (n *raft) processAppendEntryResponse(ar *appendEntryResponse) {
// handleAppendEntryResponse processes responses to append entries. // handleAppendEntryResponse processes responses to append entries.
func (n *raft) handleAppendEntryResponse(sub *subscription, c *client, _ *Account, subject, reply string, msg []byte) { func (n *raft) handleAppendEntryResponse(sub *subscription, c *client, _ *Account, subject, reply string, msg []byte) {
// Ignore if not the leader.
if !n.Leader() {
return
}
msg = copyBytes(msg) msg = copyBytes(msg)
ar := n.decodeAppendEntryResponse(msg) ar := n.decodeAppendEntryResponse(msg)
ar.reply = reply ar.reply = reply
@@ -3140,7 +3127,10 @@ func (n *raft) fileWriter() {
n.RLock() n.RLock()
copy(buf[0:], n.wtv) copy(buf[0:], n.wtv)
n.RUnlock() n.RUnlock()
if err := ioutil.WriteFile(tvf, buf[:], 0640); err != nil { <-dios
err := ioutil.WriteFile(tvf, buf[:], 0640)
dios <- struct{}{}
if err != nil {
n.setWriteErr(err) n.setWriteErr(err)
n.warn("Error writing term and vote file for %q: %v", n.group, err) n.warn("Error writing term and vote file for %q: %v", n.group, err)
} }
@@ -3148,7 +3138,10 @@ func (n *raft) fileWriter() {
n.RLock() n.RLock()
buf := copyBytes(n.wps) buf := copyBytes(n.wps)
n.RUnlock() n.RUnlock()
if err := ioutil.WriteFile(psf, buf, 0640); err != nil { <-dios
err := ioutil.WriteFile(psf, buf, 0640)
dios <- struct{}{}
if err != nil {
n.setWriteErr(err) n.setWriteErr(err)
n.warn("Error writing peer state file for %q: %v", n.group, err) n.warn("Error writing peer state file for %q: %v", n.group, err)
} }
@@ -3216,10 +3209,6 @@ func (n *raft) handleVoteResponse(sub *subscription, c *client, _ *Account, _, r
n.error("Received malformed vote response for %q", n.group) n.error("Received malformed vote response for %q", n.group)
return return
} }
if state := n.State(); state != Candidate && state != Leader {
n.debug("Ignoring old vote response, we have stepped down")
return
}
select { select {
case n.votes <- vr: case n.votes <- vr: