Use sync.Map for peers vs internal storage for appendEntryResponses

Signed-off-by: Derek Collison <derek@nats.io>
This commit is contained in:
Derek Collison
2023-04-07 08:16:42 -07:00
parent 1caa56a34f
commit 2ff6f18ccd

View File

@@ -1982,8 +1982,7 @@ type appendEntryResponse struct {
term uint64
index uint64
peer string
reply string // internal usage.
_pb_ [idLen]byte // internal bytes from wire for peer name.
reply string // internal usage.
success bool
}
@@ -2015,6 +2014,9 @@ func (ar *appendEntryResponse) encode(b []byte) []byte {
return buf[:appendEntryResponseLen]
}
// Track all peers we may have ever seen to use an string interns for appendEntryResponse decoding.
var peers sync.Map
func (n *raft) decodeAppendEntryResponse(msg []byte) *appendEntryResponse {
if len(msg) != appendEntryResponseLen {
return nil
@@ -2023,8 +2025,14 @@ func (n *raft) decodeAppendEntryResponse(msg []byte) *appendEntryResponse {
ar := arPool.Get().(*appendEntryResponse)
ar.term = le.Uint64(msg[0:])
ar.index = le.Uint64(msg[8:])
copy(ar._pb_[:idLen], msg[16:16+idLen])
ar.peer = string(ar._pb_[:])
peer, ok := peers.Load(string(msg[16 : 16+idLen]))
if !ok {
// We missed so store inline here.
peer = string(msg[16 : 16+idLen])
peers.Store(peer, peer)
}
ar.peer = peer.(string)
ar.success = msg[24] == 1
return ar
}
@@ -2500,6 +2508,9 @@ func (n *raft) applyCommit(index uint64) error {
newPeer := string(e.Data)
n.debug("Added peer %q", newPeer)
// Store our peer in our global peer map for all peers.
peers.LoadOrStore(newPeer, newPeer)
// If we were on the removed list reverse that here.
if n.removed != nil {
delete(n.removed, newPeer)
@@ -3099,6 +3110,8 @@ func (n *raft) processAppendEntry(ae *appendEntry, sub *subscription) {
} else {
n.peers[newPeer] = &lps{time.Now().UnixNano(), 0, false}
}
// Store our peer in our global peer map for all peers.
peers.LoadOrStore(newPeer, newPeer)
}
}
}