diff --git a/server/client.go b/server/client.go index 7fc01ca2..3dbd290c 100644 --- a/server/client.go +++ b/server/client.go @@ -3157,9 +3157,11 @@ var needFlush = struct{}{} // deliverMsg will deliver a message to a matching subscription and its underlying client. // We process all connection/client types. mh is the part that will be protocol/client specific. func (c *client) deliverMsg(prodIsMQTT bool, sub *subscription, acc *Account, subject, reply, mh, msg []byte, gwrply bool) bool { - if sub.client == nil { + // Check sub client and check echo + if sub.client == nil || c == sub.client && !sub.client.echo { return false } + client := sub.client client.mu.Lock() @@ -4211,7 +4213,7 @@ func (c *client) processMsgResults(acc *Account, r *SublistResult, msg, deliver, // delivery subject for clients var dsubj []byte // Used as scratch if mapping - var _dsubj [64]byte + var _dsubj [128]byte // For stats, we will keep track of the number of messages that have been // delivered and then multiply by the size of that message and update diff --git a/server/consumer.go b/server/consumer.go index 3f4215da..89636643 100644 --- a/server/consumer.go +++ b/server/consumer.go @@ -1799,7 +1799,7 @@ func newJSAckMsg(subj, reply string, hdr int, msg []byte) *jsAckMsg { } else { m = &jsAckMsg{} } - // When getting something from a pool it is criticical that all fields are + // When getting something from a pool it is critical that all fields are // initialized. Doing this way guarantees that if someone adds a field to // the structure, the compiler will fail the build if this line is not updated. 
(*m) = jsAckMsg{subj, reply, hdr, msg} @@ -1899,7 +1899,9 @@ func (o *consumer) loopAndForwardProposals(qch chan struct{}) { const maxBatch = 256 * 1024 var entries []*Entry for sz := 0; proposal != nil; proposal = proposal.next { - entries = append(entries, &Entry{EntryNormal, proposal.data}) + entry := entryPool.Get().(*Entry) + entry.Type, entry.Data = EntryNormal, proposal.data + entries = append(entries, entry) sz += len(proposal.data) if sz > maxBatch { node.ProposeDirect(entries) diff --git a/server/events.go b/server/events.go index d624392d..3e416a9a 100644 --- a/server/events.go +++ b/server/events.go @@ -2300,7 +2300,7 @@ func (s *Server) remoteLatencyUpdate(sub *subscription, _ *client, _ *Account, s if !s.eventsRunning() { return } - rl := remoteLatency{} + var rl remoteLatency if err := json.Unmarshal(msg, &rl); err != nil { s.Errorf("Error unmarshalling remote latency measurement: %v", err) return @@ -2322,25 +2322,22 @@ func (s *Server) remoteLatencyUpdate(sub *subscription, _ *client, _ *Account, s acc.mu.RUnlock() return } - m1 := si.m1 - m2 := rl.M2 - lsub := si.latency.subject acc.mu.RUnlock() + si.acc.mu.Lock() + m1 := si.m1 + m2 := rl.M2 + // So we have not processed the response tracking measurement yet. if m1 == nil { - acc.mu.Lock() - // Double check since could have slipped in. - m1 = si.m1 - if m1 == nil { - // Store our value there for them to pick up. - si.m1 = &m2 - } - acc.mu.Unlock() - if m1 == nil { - return - } + // Store our value there for them to pick up. + si.m1 = &m2 + } + si.acc.mu.Unlock() + + if m1 == nil { + return } // Calculate the correct latencies given M1 and M2. diff --git a/server/filestore.go b/server/filestore.go index 0fa4c7c6..0a245637 100644 --- a/server/filestore.go +++ b/server/filestore.go @@ -4719,31 +4719,50 @@ func (mb *msgBlock) msgFromBuf(buf []byte, sm *StoreMsg, hh hash.Hash64) (*Store return sm, nil } +// Used to intern strings for subjects. 
+// Based on idea from https://github.com/josharian/intern/blob/master/intern.go +var subjPool = sync.Pool{ + New: func() any { + return make(map[string]string) + }, +} + +// Get an interned string from a byte slice. +func subjFromBytes(b []byte) string { + sm := subjPool.Get().(map[string]string) + defer subjPool.Put(sm) + subj, ok := sm[string(b)] + if ok { + return subj + } + s := string(b) + sm[s] = s + return s +} + // Given the `key` byte slice, this function will return the subject -// as a copy of `key` or a configured subject as to minimize memory allocations. +// as an interned string of `key` or a configured subject as to minimize memory allocations. // Lock should be held. func (mb *msgBlock) subjString(skey []byte) string { if len(skey) == 0 { return _EMPTY_ } - key := string(skey) if lsubjs := len(mb.fs.cfg.Subjects); lsubjs > 0 { if lsubjs == 1 { // The cast for the comparison does not make a copy - if key == mb.fs.cfg.Subjects[0] { + if string(skey) == mb.fs.cfg.Subjects[0] { return mb.fs.cfg.Subjects[0] } } else { for _, subj := range mb.fs.cfg.Subjects { - if key == subj { + if string(skey) == subj { return subj } } } } - - return key + return subjFromBytes(skey) } // LoadMsg will lookup the message by sequence number and return it if found. diff --git a/server/jetstream_cluster.go b/server/jetstream_cluster.go index 1f6c163a..ed8c4c62 100644 --- a/server/jetstream_cluster.go +++ b/server/jetstream_cluster.go @@ -1138,6 +1138,7 @@ func (js *jetStream) monitorCluster() { } else if lls := len(lastSnap); nb > uint64(lls*8) && lls > 0 && time.Since(lastSnapTime) > minSnapDelta { doSnapshot() } + ce.ReturnToPool() } } aq.recycle(&ces) @@ -2101,6 +2102,7 @@ func (js *jetStream) monitorStream(mset *stream, sa *streamAssignment, sendSnaps if err := js.applyStreamEntries(mset, ce, isRecovering); err == nil { // Update our applied. 
ne, nb = n.Applied(ce.Index) + ce.ReturnToPool() } else { s.Warnf("Error applying entries to '%s > %s': %v", accName, sa.Config.Name, err) if isClusterResetErr(err) { @@ -2467,39 +2469,39 @@ func (mset *stream) resetClusteredState(err error) bool { // Preserve our current state and messages unless we have a first sequence mismatch. shouldDelete := err == errFirstSequenceMismatch - mset.monitorWg.Wait() - mset.resetAndWaitOnConsumers() - // Stop our stream. - mset.stop(shouldDelete, false) + // Need to do the rest in a separate Go routine. + go func() { + mset.monitorWg.Wait() + mset.resetAndWaitOnConsumers() + // Stop our stream. + mset.stop(shouldDelete, false) - if sa != nil { - js.mu.Lock() - s.Warnf("Resetting stream cluster state for '%s > %s'", sa.Client.serviceAccount(), sa.Config.Name) - // Now wipe groups from assignments. - sa.Group.node = nil - var consumers []*consumerAssignment - if cc := js.cluster; cc != nil && cc.meta != nil { - ourID := cc.meta.ID() - for _, ca := range sa.consumers { - if rg := ca.Group; rg != nil && rg.isMember(ourID) { - rg.node = nil // Erase group raft/node state. - consumers = append(consumers, ca) + if sa != nil { + js.mu.Lock() + s.Warnf("Resetting stream cluster state for '%s > %s'", sa.Client.serviceAccount(), sa.Config.Name) + // Now wipe groups from assignments. + sa.Group.node = nil + var consumers []*consumerAssignment + if cc := js.cluster; cc != nil && cc.meta != nil { + ourID := cc.meta.ID() + for _, ca := range sa.consumers { + if rg := ca.Group; rg != nil && rg.isMember(ourID) { + rg.node = nil // Erase group raft/node state. + consumers = append(consumers, ca) + } } } - } - js.mu.Unlock() + js.mu.Unlock() - // restart in a separate Go routine. - // This will reset the stream and consumers. - go func() { + // This will reset the stream and consumers. // Reset stream. js.processClusterCreateStream(acc, sa) // Reset consumers. 
for _, ca := range consumers { js.processClusterCreateConsumer(ca, nil, false) } - }() - } + } + }() return true } @@ -4272,6 +4274,7 @@ func (js *jetStream) monitorConsumer(o *consumer, ca *consumerAssignment) { } else if !recovering { if err := js.applyConsumerEntries(o, ce, isLeader); err == nil { ne, nb := n.Applied(ce.Index) + ce.ReturnToPool() // If we have at least min entries to compact, go ahead and snapshot/compact. if nb > 0 && ne >= compactNumMin || nb > compactSizeMin { doSnapshot(false) @@ -7150,7 +7153,7 @@ func (mset *stream) processClusteredInboundMsg(subject, reply string, hdr, msg [ // Check to see if we are being overrun. // TODO(dlc) - Make this a limit where we drop messages to protect ourselves, but allow to be configured. if mset.clseq-(lseq+clfs) > streamLagWarnThreshold { - lerr := fmt.Errorf("JetStream stream '%s > %s' has high message lag", jsa.acc().Name, mset.cfg.Name) + lerr := fmt.Errorf("JetStream stream '%s > %s' has high message lag", jsa.acc().Name, name) s.RateLimitWarnf(lerr.Error()) } mset.clMu.Unlock() diff --git a/server/jetstream_cluster_2_test.go b/server/jetstream_cluster_2_test.go index ee3d247d..599a25e0 100644 --- a/server/jetstream_cluster_2_test.go +++ b/server/jetstream_cluster_2_test.go @@ -5238,12 +5238,14 @@ func TestJetStreamClusterDeleteAndRestoreAndRestart(t *testing.T) { nc, js = jsClientConnect(t, c.randomServer()) defer nc.Close() - si, err := js.StreamInfo("TEST") - require_NoError(t, err) - - if si.State.Msgs != 22 { - t.Fatalf("State is not correct after restart") - } + checkFor(t, 2*time.Second, 100*time.Millisecond, func() error { + si, err := js.StreamInfo("TEST") + require_NoError(t, err) + if si.State.Msgs != 22 { + return fmt.Errorf("State is not correct after restart, expected 22 msgs, got %d", si.State.Msgs) + } + return nil + }) ci, err := js.ConsumerInfo("TEST", "dlc") require_NoError(t, err) diff --git a/server/norace_test.go b/server/norace_test.go index 07207e77..4804ce78 100644 --- 
a/server/norace_test.go +++ b/server/norace_test.go @@ -2423,7 +2423,7 @@ func TestNoRaceJetStreamSlowFilteredInititalPendingAndFirstMsg(t *testing.T) { }) // Threshold for taking too long. - const thresh = 50 * time.Millisecond + const thresh = 100 * time.Millisecond var dindex int testConsumerCreate := func(subj string, startSeq, expectedNumPending uint64) { @@ -3799,10 +3799,12 @@ func TestNoRaceJetStreamClusterStreamReset(t *testing.T) { return err }) - // Grab number go routines. - if after := runtime.NumGoroutine(); base > after { - t.Fatalf("Expected %d go routines, got %d", base, after) - } + checkFor(t, 5*time.Second, 200*time.Millisecond, func() error { + if after := runtime.NumGoroutine(); base > after { + return fmt.Errorf("Expected %d go routines, got %d", base, after) + } + return nil + }) // Simulate a low level write error on our consumer and make sure we can recover etc. cl = c.consumerLeader("$G", "TEST", "d1") @@ -6614,11 +6616,11 @@ func TestNoRaceJetStreamClusterF3Setup(t *testing.T) { wg := sync.WaitGroup{} for i := 0; i < numSourceStreams; i++ { sname := fmt.Sprintf("EVENT-%s", nuid.Next()) + sources = append(sources, sname) wg.Add(1) go func(stream string) { defer wg.Done() t.Logf(" %q", stream) - sources = append(sources, stream) subj := fmt.Sprintf("%s.>", stream) _, err := js.AddStream(&nats.StreamConfig{ Name: stream, diff --git a/server/raft.go b/server/raft.go index c3fdd00a..a8641601 100644 --- a/server/raft.go +++ b/server/raft.go @@ -665,7 +665,7 @@ func (n *raft) Propose(data []byte) error { prop := n.prop n.RUnlock() - prop.push(&Entry{EntryNormal, data}) + prop.push(newEntry(EntryNormal, data)) return nil } @@ -717,7 +717,7 @@ func (n *raft) ProposeAddPeer(peer string) error { prop := n.prop n.RUnlock() - prop.push(&Entry{EntryAddPeer, []byte(peer)}) + prop.push(newEntry(EntryAddPeer, []byte(peer))) return nil } @@ -750,7 +750,7 @@ func (n *raft) ProposeRemovePeer(peer string) error { } if isLeader { - 
prop.push(&Entry{EntryRemovePeer, []byte(peer)}) + prop.push(newEntry(EntryRemovePeer, []byte(peer))) n.doRemovePeerAsLeader(peer) return nil } @@ -1090,7 +1090,7 @@ func (n *raft) setupLastSnapshot() { n.pterm = snap.lastTerm n.commit = snap.lastIndex n.applied = snap.lastIndex - n.apply.push(&CommittedEntry{n.commit, []*Entry{{EntrySnapshot, snap.data}}}) + n.apply.push(newCommittedEntry(n.commit, []*Entry{{EntrySnapshot, snap.data}})) if _, err := n.wal.Compact(snap.lastIndex + 1); err != nil { n.setWriteErrLocked(err) } @@ -1351,7 +1351,7 @@ func (n *raft) StepDown(preferred ...string) error { // If we have a new leader selected, transfer over to them. if maybeLeader != noLeader { n.debug("Selected %q for new leader", maybeLeader) - prop.push(&Entry{EntryLeaderTransfer, []byte(maybeLeader)}) + prop.push(newEntry(EntryLeaderTransfer, []byte(maybeLeader))) time.AfterFunc(250*time.Millisecond, func() { stepdown.push(noLeader) }) } else { // Force us to stepdown here. @@ -1817,12 +1817,60 @@ func (n *raft) runAsFollower() { } } +// Pool for CommittedEntry re-use. +var cePool = sync.Pool{ + New: func() any { + return &CommittedEntry{} + }, +} + // CommitEntry is handed back to the user to apply a commit to their FSM. type CommittedEntry struct { Index uint64 Entries []*Entry } +// Create a new CommittedEntry. +func newCommittedEntry(index uint64, entries []*Entry) *CommittedEntry { + ce := cePool.Get().(*CommittedEntry) + ce.Index, ce.Entries = index, entries + return ce +} + +func (ce *CommittedEntry) ReturnToPool() { + if ce == nil { + return + } + if len(ce.Entries) > 0 { + for _, e := range ce.Entries { + entryPool.Put(e) + } + } + ce.Index, ce.Entries = 0, nil + cePool.Put(ce) +} + +// Pool for Entry re-use. +var entryPool = sync.Pool{ + New: func() any { + return &Entry{} + }, +} + +// Helper to create new entries. 
+func newEntry(t EntryType, data []byte) *Entry { + entry := entryPool.Get().(*Entry) + entry.Type, entry.Data = t, data + return entry +} + +// Pool for appendEntry re-use. +var aePool = sync.Pool{ + New: func() any { + return &appendEntry{} + }, +} + // appendEntry is the main struct that is used to sync raft peers. type appendEntry struct { leader string @@ -1837,6 +1885,20 @@ type appendEntry struct { buf []byte } +// Create a new appendEntry. +func newAppendEntry(leader string, term, commit, pterm, pindex uint64, entries []*Entry) *appendEntry { + ae := aePool.Get().(*appendEntry) + ae.leader, ae.term, ae.commit, ae.pterm, ae.pindex, ae.entries = leader, term, commit, pterm, pindex, entries + ae.reply, ae.sub, ae.buf = _EMPTY_, nil, nil + return ae +} + +// Will return this append entry, and its interior entries to their respective pools. +func (ae *appendEntry) returnToPool() { + ae.entries, ae.buf, ae.sub, ae.reply = nil, nil, nil, _EMPTY_ + aePool.Put(ae) +} + type EntryType uint8 const ( @@ -1928,15 +1990,10 @@ func (n *raft) decodeAppendEntry(msg []byte, sub *subscription, reply string) (* } var le = binary.LittleEndian - ae := &appendEntry{ - leader: string(msg[:idLen]), - term: le.Uint64(msg[8:]), - commit: le.Uint64(msg[16:]), - pterm: le.Uint64(msg[24:]), - pindex: le.Uint64(msg[32:]), - sub: sub, - reply: reply, - } + + ae := newAppendEntry(string(msg[:idLen]), le.Uint64(msg[8:]), le.Uint64(msg[16:]), le.Uint64(msg[24:]), le.Uint64(msg[32:]), nil) + ae.reply, ae.sub = reply, sub + // Decode Entries. 
ne, ri := int(le.Uint16(msg[40:])), 42 for i, max := 0, len(msg); i < ne; i++ { @@ -1948,28 +2005,43 @@ func (n *raft) decodeAppendEntry(msg []byte, sub *subscription, reply string) (* if le <= 0 || ri+le > max { return nil, errBadAppendEntry } - etype := EntryType(msg[ri]) - ae.entries = append(ae.entries, &Entry{etype, msg[ri+1 : ri+le]}) + entry := newEntry(EntryType(msg[ri]), msg[ri+1:ri+le]) + ae.entries = append(ae.entries, entry) ri += le } ae.buf = msg return ae, nil } -// appendEntryResponse is our response to a received appendEntry. -type appendEntryResponse struct { - term uint64 - index uint64 - peer string - success bool - // internal - reply string +// Pool for appendEntryResponse re-use. +var arPool = sync.Pool{ + New: func() any { + return &appendEntryResponse{} + }, } // We want to make sure this does not change from system changing length of syshash. const idLen = 8 const appendEntryResponseLen = 24 + 1 +// appendEntryResponse is our response to a received appendEntry. +type appendEntryResponse struct { + term uint64 + index uint64 + peer string + reply string // internal usage. + success bool +} + +// Create a new appendEntryResponse. +func newAppendEntryResponse(term, index uint64, peer string, success bool) *appendEntryResponse { + ar := arPool.Get().(*appendEntryResponse) + ar.term, ar.index, ar.peer, ar.success = term, index, peer, success + // Always empty out. + ar.reply = _EMPTY_ + return ar +} + func (ar *appendEntryResponse) encode(b []byte) []byte { var buf []byte if cap(b) >= appendEntryResponseLen { @@ -1989,16 +2061,25 @@ return buf[:appendEntryResponseLen] } +// Track all peers we may have ever seen to use as string interns for appendEntryResponse decoding. 
+var peers sync.Map + func (n *raft) decodeAppendEntryResponse(msg []byte) *appendEntryResponse { if len(msg) != appendEntryResponseLen { return nil } var le = binary.LittleEndian - ar := &appendEntryResponse{ - term: le.Uint64(msg[0:]), - index: le.Uint64(msg[8:]), - peer: string(msg[16 : 16+idLen]), + ar := arPool.Get().(*appendEntryResponse) + ar.term = le.Uint64(msg[0:]) + ar.index = le.Uint64(msg[8:]) + + peer, ok := peers.Load(string(msg[16 : 16+idLen])) + if !ok { + // We missed so store inline here. + peer = string(msg[16 : 16+idLen]) + peers.Store(peer, peer) } + ar.peer = peer.(string) ar.success = msg[24] == 1 return ar } @@ -2015,8 +2096,6 @@ func (n *raft) handleForwardedRemovePeerProposal(sub *subscription, c *client, _ n.warn("Received invalid peer name for remove proposal: %q", msg) return } - // Need to copy since this is underlying client/route buffer. - peer := string(copyBytes(msg)) n.RLock() prop, werr := n.prop, n.werr @@ -2027,7 +2106,9 @@ func (n *raft) handleForwardedRemovePeerProposal(sub *subscription, c *client, _ return } - prop.push(&Entry{EntryRemovePeer, []byte(peer)}) + // Need to copy since this is underlying client/route buffer. + peer := copyBytes(msg) + prop.push(newEntry(EntryRemovePeer, peer)) } // Called when a peer has forwarded a proposal. 
@@ -2048,7 +2129,7 @@ func (n *raft) handleForwardedProposal(sub *subscription, c *client, _ *Account, return } - prop.push(&Entry{EntryNormal, msg}) + prop.push(newEntry(EntryNormal, msg)) } func (n *raft) runAsLeader() { @@ -2229,6 +2310,7 @@ func (n *raft) runCatchup(ar *appendEntryResponse, indexUpdatesQ *ipQueue[uint64 n.RUnlock() defer s.grWG.Done() + defer arPool.Put(ar) defer func() { n.Lock() @@ -2364,6 +2446,7 @@ func (n *raft) catchupFollower(ar *appendEntryResponse) { if lastIndex, err := n.sendSnapshotToFollower(ar.reply); err != nil { n.error("Error sending snapshot to follower [%s]: %v", ar.peer, err) n.Unlock() + arPool.Put(ar) return } else { start = lastIndex + 1 @@ -2371,6 +2454,7 @@ func (n *raft) catchupFollower(ar *appendEntryResponse) { if state.Msgs == 0 || start > state.LastSeq { n.debug("Finished catching up") n.Unlock() + arPool.Put(ar) return } n.debug("Snapshot sent, reset first catchup entry to %d", lastIndex) @@ -2385,6 +2469,7 @@ func (n *raft) catchupFollower(ar *appendEntryResponse) { if err != nil || ae == nil { n.warn("Could not find a starting entry for catchup request: %v", err) n.Unlock() + arPool.Put(ar) return } if ae.pindex != ar.index || ae.pterm != ar.term { @@ -2460,7 +2545,7 @@ func (n *raft) applyCommit(index uint64) error { committed = append(committed, e) case EntryOldSnapshot: // For old snapshots in our WAL. - committed = append(committed, &Entry{EntrySnapshot, e.Data}) + committed = append(committed, newEntry(EntrySnapshot, e.Data)) case EntrySnapshot: committed = append(committed, e) case EntryPeerState: @@ -2473,6 +2558,9 @@ func (n *raft) applyCommit(index uint64) error { newPeer := string(e.Data) n.debug("Added peer %q", newPeer) + // Store our peer in our global peer map for all peers. + peers.LoadOrStore(newPeer, newPeer) + // If we were on the removed list reverse that here. 
if n.removed != nil { delete(n.removed, newPeer) @@ -2515,6 +2603,9 @@ func (n *raft) applyCommit(index uint64) error { n.stepdown.push(n.selectNextLeader()) } + // Remove from string intern map. + peers.Delete(peer) + // We pass these up as well. committed = append(committed, e) } @@ -2524,11 +2615,13 @@ func (n *raft) applyCommit(index uint64) error { if fpae { delete(n.pae, index) } - n.apply.push(&CommittedEntry{index, committed}) + n.apply.push(newCommittedEntry(index, committed)) } else { // If we processed inline update our applied index. n.applied = index } + // Place back in the pool. + ae.returnToPool() return nil } @@ -2841,9 +2934,10 @@ func (n *raft) processAppendEntry(ae *appendEntry, sub *subscription) { n.stepdown.push(ae.leader) } else { // Let them know we are the leader. - ar := &appendEntryResponse{n.term, n.pindex, n.id, false, _EMPTY_} + ar := newAppendEntryResponse(n.term, n.pindex, n.id, false) n.debug("AppendEntry ignoring old term from another leader") n.sendRPC(ae.reply, _EMPTY_, ar.encode(arbuf)) + arPool.Put(ar) } // Always return here from processing. n.Unlock() @@ -2897,11 +2991,12 @@ func (n *raft) processAppendEntry(ae *appendEntry, sub *subscription) { if n.catchupStalled() { n.debug("Catchup may be stalled, will request again") inbox = n.createCatchup(ae) - ar = &appendEntryResponse{n.pterm, n.pindex, n.id, false, _EMPTY_} + ar = newAppendEntryResponse(n.pterm, n.pindex, n.id, false) } n.Unlock() if ar != nil { n.sendRPC(ae.reply, inbox, ar.encode(arbuf)) + arPool.Put(ar) } // Ignore new while catching up or replaying. return @@ -2949,9 +3044,10 @@ func (n *raft) processAppendEntry(ae *appendEntry, sub *subscription) { n.cancelCatchup() // Create response. 
- ar = &appendEntryResponse{ae.pterm, ae.pindex, n.id, success, _EMPTY_} + ar = newAppendEntryResponse(ae.pterm, ae.pindex, n.id, success) n.Unlock() n.sendRPC(ae.reply, _EMPTY_, ar.encode(arbuf)) + arPool.Put(ar) return } @@ -2999,7 +3095,7 @@ func (n *raft) processAppendEntry(ae *appendEntry, sub *subscription) { } // Now send snapshot to upper levels. Only send the snapshot, not the peerstate entry. - n.apply.push(&CommittedEntry{n.commit, ae.entries[:1]}) + n.apply.push(newCommittedEntry(n.commit, ae.entries[:1])) n.Unlock() return @@ -3010,9 +3106,10 @@ func (n *raft) processAppendEntry(ae *appendEntry, sub *subscription) { if ae.pindex > n.pindex { // Setup our state for catching up. inbox := n.createCatchup(ae) - ar := &appendEntryResponse{n.pterm, n.pindex, n.id, false, _EMPTY_} + ar := newAppendEntryResponse(n.pterm, n.pindex, n.id, false) n.Unlock() n.sendRPC(ae.reply, inbox, ar.encode(arbuf)) + arPool.Put(ar) return } } @@ -3066,6 +3163,8 @@ func (n *raft) processAppendEntry(ae *appendEntry, sub *subscription) { } else { n.peers[newPeer] = &lps{time.Now().UnixNano(), 0, false} } + // Store our peer in our global peer map for all peers. + peers.LoadOrStore(newPeer, newPeer) } } } @@ -3086,13 +3185,14 @@ func (n *raft) processAppendEntry(ae *appendEntry, sub *subscription) { var ar *appendEntryResponse if sub != nil { - ar = &appendEntryResponse{n.pterm, n.pindex, n.id, true, _EMPTY_} + ar = newAppendEntryResponse(n.pterm, n.pindex, n.id, true) } n.Unlock() // Success. Send our response. if ar != nil { n.sendRPC(ae.reply, _EMPTY_, ar.encode(arbuf)) + arPool.Put(ar) } } @@ -3122,6 +3222,7 @@ func (n *raft) processAppendEntryResponse(ar *appendEntryResponse) { if ar.success { n.trackResponse(ar) + arPool.Put(ar) } else if ar.term > n.term { // False here and they have a higher term. 
n.Lock() @@ -3132,6 +3233,7 @@ func (n *raft) processAppendEntryResponse(ar *appendEntryResponse) { n.stepdown.push(noLeader) n.resetWAL() n.Unlock() + arPool.Put(ar) } else if ar.reply != _EMPTY_ { n.catchupFollower(ar) } @@ -3139,14 +3241,13 @@ func (n *raft) processAppendEntryResponse(ar *appendEntryResponse) { // handleAppendEntryResponse processes responses to append entries. func (n *raft) handleAppendEntryResponse(sub *subscription, c *client, _ *Account, subject, reply string, msg []byte) { - msg = copyBytes(msg) ar := n.decodeAppendEntryResponse(msg) ar.reply = reply n.resp.push(ar) } func (n *raft) buildAppendEntry(entries []*Entry) *appendEntry { - return &appendEntry{n.id, n.term, n.commit, n.pterm, n.pindex, entries, _EMPTY_, nil, nil} + return newAppendEntry(n.id, n.term, n.commit, n.pterm, n.pindex, entries) } // Determine if we should store an entry. @@ -3210,7 +3311,8 @@ func (n *raft) sendAppendEntry(entries []*Entry) { } // If we have entries store this in our wal. - if ae.shouldStore() { + shouldStore := ae.shouldStore() + if shouldStore { if err := n.storeToWAL(ae); err != nil { return } @@ -3225,6 +3327,9 @@ func (n *raft) sendAppendEntry(entries []*Entry) { } } n.sendRPC(n.asubj, n.areply, ae.buf) + if !shouldStore { + ae.returnToPool() + } } type extensionState uint16 diff --git a/server/sendq.go b/server/sendq.go index 2c413971..0287c554 100644 --- a/server/sendq.go +++ b/server/sendq.go @@ -1,4 +1,4 @@ -// Copyright 2020-2021 The NATS Authors +// Copyright 2020-2023 The NATS Authors // Licensed under the Apache License, Version 2.0 (the "License"); // you may not use this file except in compliance with the License. // You may obtain a copy of the License at @@ -50,6 +50,14 @@ func (sq *sendq) internalLoop() { defer c.closeConnection(ClientClosed) + // To optimize for not converting a string to a []byte slice. 
+ var ( + subj [256]byte + rply [256]byte + szb [10]byte + hdb [10]byte + ) + for s.isRunning() { select { case <-s.quitCh: @@ -57,14 +65,18 @@ func (sq *sendq) internalLoop() { case <-q.ch: pms := q.pop() for _, pm := range pms { - c.pa.subject = []byte(pm.subj) + c.pa.subject = append(subj[:0], pm.subj...) c.pa.size = len(pm.msg) + len(pm.hdr) - c.pa.szb = []byte(strconv.Itoa(c.pa.size)) - c.pa.reply = []byte(pm.rply) + c.pa.szb = append(szb[:0], strconv.Itoa(c.pa.size)...) + if len(pm.rply) > 0 { + c.pa.reply = append(rply[:0], pm.rply...) + } else { + c.pa.reply = nil + } var msg []byte if len(pm.hdr) > 0 { c.pa.hdr = len(pm.hdr) - c.pa.hdb = []byte(strconv.Itoa(c.pa.hdr)) + c.pa.hdb = append(hdb[:0], strconv.Itoa(c.pa.hdr)...) msg = append(pm.hdr, pm.msg...) msg = append(msg, _CRLF_...) } else { @@ -74,6 +86,7 @@ func (sq *sendq) internalLoop() { } c.processInboundClientMsg(msg) c.pa.szb = nil + outMsgPool.Put(pm) } // TODO: should this be in the for-loop instead? c.flushClients(0) @@ -82,8 +95,20 @@ func (sq *sendq) internalLoop() { } } +var outMsgPool = sync.Pool{ + New: func() any { + return &outMsg{} + }, +} + func (sq *sendq) send(subj, rply string, hdr, msg []byte) { - out := &outMsg{subj, rply, nil, nil} + if sq == nil { + return + } + out := outMsgPool.Get().(*outMsg) + out.subj, out.rply = subj, rply + out.hdr, out.msg = nil, nil + // We will copy these for now. if len(hdr) > 0 { hdr = copyBytes(hdr) diff --git a/server/stream.go b/server/stream.go index 1e77922d..9af331a1 100644 --- a/server/stream.go +++ b/server/stream.go @@ -4351,16 +4351,29 @@ func (mset *stream) internalLoop() { // This should be rarely used now so can be smaller. var _r [1024]byte + // To optimize for not converting a string to a []byte slice. 
+ var ( + subj [256]byte + dsubj [256]byte + rply [256]byte + szb [10]byte + hdb [10]byte + ) + for { select { case <-outq.ch: pms := outq.pop() for _, pm := range pms { - c.pa.subject = []byte(pm.dsubj) - c.pa.deliver = []byte(pm.subj) + c.pa.subject = append(dsubj[:0], pm.dsubj...) + c.pa.deliver = append(subj[:0], pm.subj...) c.pa.size = len(pm.msg) + len(pm.hdr) - c.pa.szb = []byte(strconv.Itoa(c.pa.size)) - c.pa.reply = []byte(pm.reply) + c.pa.szb = append(szb[:0], strconv.Itoa(c.pa.size)...) + if len(pm.reply) > 0 { + c.pa.reply = append(rply[:0], pm.reply...) + } else { + c.pa.reply = nil + } // If we have an underlying buf that is the wire contents for hdr + msg, else construct on the fly. var msg []byte @@ -4383,6 +4396,7 @@ func (mset *stream) internalLoop() { if len(pm.hdr) > 0 { c.pa.hdr = len(pm.hdr) c.pa.hdb = []byte(strconv.Itoa(c.pa.hdr)) + c.pa.hdb = append(hdb[:0], strconv.Itoa(c.pa.hdr)...) } else { c.pa.hdr = -1 c.pa.hdb = nil @@ -5005,13 +5019,13 @@ func (mset *stream) clearPreAck(o *consumer, seq uint64) { // ackMsg is called into from a consumer when we have a WorkQueue or Interest Retention Policy. func (mset *stream) ackMsg(o *consumer, seq uint64) { - if seq == 0 || mset.cfg.Retention == LimitsPolicy { + if seq == 0 { return } // Don't make this RLock(). We need to have only 1 running at a time to gauge interest across all consumers. mset.mu.Lock() - if mset.closed || mset.store == nil { + if mset.closed || mset.store == nil || mset.cfg.Retention == LimitsPolicy { mset.mu.Unlock() return }