Merge pull request #2879 from nats-io/some_updates

Various changes
This commit is contained in:
Ivan Kozlovic
2022-02-23 09:01:57 -07:00
committed by GitHub
2 changed files with 22 additions and 8 deletions

View File

@@ -101,7 +101,7 @@ func (q *ipQueue) push(e interface{}) int {
}
q.elts = append(q.elts, e)
l++
if l >= q.lt && q.logger != nil {
if l >= q.lt && q.logger != nil && (l <= q.lt+10 || q.lt%10000 == 0) {
q.logger.log(q.name, l)
}
q.Unlock()

View File

@@ -197,6 +197,9 @@ type raft struct {
stepdown *ipQueue // of string
leadc chan bool
quit chan struct{}
// Random generator, used to generate inboxes for instance
prand *rand.Rand
}
// cacthupState structure that holds our subscription, and catchup term and index
@@ -340,6 +343,7 @@ func (s *Server) startRaftNode(cfg *RaftConfig) (RaftNode, error) {
sq := s.sys.sq
sacc := s.sys.account
hash := s.sys.shash
pub := s.info.ID
s.mu.Unlock()
ps, err := readPeerState(cfg.Store)
@@ -351,6 +355,12 @@ func (s *Server) startRaftNode(cfg *RaftConfig) (RaftNode, error) {
}
qpfx := fmt.Sprintf("RAFT [%s - %s] ", hash[:idLen], cfg.Name)
rsrc := time.Now().UnixNano()
if len(pub) >= 32 {
if h, _ := highwayhash.New64([]byte(pub[:32])); h != nil {
rsrc += int64(h.Sum64())
}
}
n := &raft{
created: time.Now(),
id: hash[:idLen],
@@ -383,6 +393,7 @@ func (s *Server) startRaftNode(cfg *RaftConfig) (RaftNode, error) {
leadc: make(chan bool, 1),
observer: cfg.Observer,
extSt: ps.domainExt,
prand: rand.New(rand.NewSource(rsrc)),
}
n.c.registerWithAccount(sacc)
@@ -444,12 +455,6 @@ func (s *Server) startRaftNode(cfg *RaftConfig) (RaftNode, error) {
// Send nil entry to signal the upper layers we are done doing replay/restore.
n.apply.push(nil)
// Setup our internal subscriptions.
if err := n.createInternalSubs(); err != nil {
n.shutdown(true)
return nil, err
}
// Make sure to track ourselves.
n.trackPeer(n.id)
// Track known peers
@@ -460,6 +465,12 @@ func (s *Server) startRaftNode(cfg *RaftConfig) (RaftNode, error) {
}
}
// Setup our internal subscriptions.
if err := n.createInternalSubs(); err != nil {
n.shutdown(true)
return nil, err
}
n.debug("Started")
n.Lock()
@@ -1344,9 +1355,10 @@ func (n *raft) shutdown(shouldDelete bool) {
}
}
// Lock should be held (due to use of random generator)
func (n *raft) newInbox() string {
var b [replySuffixLen]byte
rn := rand.Int63()
rn := n.prand.Int63()
for i, l := 0, rn; i < len(b); i++ {
b[i] = digits[l%base]
l /= base
@@ -1377,6 +1389,8 @@ func (n *raft) unsubscribe(sub *subscription) {
}
func (n *raft) createInternalSubs() error {
n.Lock()
defer n.Unlock()
n.vsubj, n.vreply = fmt.Sprintf(raftVoteSubj, n.group), n.newInbox()
n.asubj, n.areply = fmt.Sprintf(raftAppendSubj, n.group), n.newInbox()
n.psubj = fmt.Sprintf(raftPropSubj, n.group)