Replaced catchup and stream restore channels

Signed-off-by: Ivan Kozlovic <ivan@synadia.com>
This commit is contained in:
Ivan Kozlovic
2021-12-24 19:02:31 -07:00
parent 645a9a14b7
commit 62a07adeb9
3 changed files with 47 additions and 63 deletions

View File

@@ -2724,7 +2724,7 @@ func (s *Server) processStreamRestore(ci *ClientInfo, acc *Account, cfg *StreamC
// For signaling to upper layers.
resultCh := make(chan result, 1)
activeCh := make(chan int, 32)
activeQ := newIPQueue() // of int
var total int
@@ -2776,7 +2776,7 @@ func (s *Server) processStreamRestore(ci *ClientInfo, acc *Account, cfg *StreamC
return
}
activeCh <- len(msg)
activeQ.push(len(msg))
s.sendInternalAccountMsg(acc, reply, nil)
}
@@ -2862,7 +2862,8 @@ func (s *Server) processStreamRestore(ci *ClientInfo, acc *Account, cfg *StreamC
// Signal to the upper layers.
doneCh <- err
return
case n := <-activeCh:
case <-activeQ.ch:
n := activeQ.popOne().(int)
total += n
notActive.Reset(activityInterval)
case <-notActive.C:

View File

@@ -5074,21 +5074,14 @@ RETRY:
reply string
}
sz := int(sreq.LastSeq-sreq.FirstSeq) + 1
msgsC := make(chan *im, sz)
msgsQ := newIPQueue() // of *im
// Send our catchup request here.
reply := syncReplySubject()
sub, err = s.sysSubscribe(reply, func(_ *subscription, _ *client, _ *Account, _, reply string, msg []byte) {
// Make copies
// TODO(dlc) - Since we are using a buffer from the inbound client/route.
select {
case msgsC <- &im{copyBytes(msg), reply}:
default:
s.Warnf("Failed to place catchup message onto internal channel: %d pending", len(msgsC))
return
}
msgsQ.push(&im{copyBytes(msg), reply})
})
if err != nil {
s.Errorf("Could not subscribe to stream catchup: %v", err)
@@ -5105,34 +5098,40 @@ RETRY:
// Run our own select loop here.
for qch, lch := n.QuitC(), n.LeadChangeC(); ; {
select {
case mrec := <-msgsC:
case <-msgsQ.ch:
notActive.Reset(activityInterval)
msg := mrec.msg
// Check for eof signaling.
if len(msg) == 0 {
return nil
}
if lseq, err := mset.processCatchupMsg(msg); err == nil {
if lseq >= last {
mrecs := msgsQ.pop()
for _, mreci := range mrecs {
mrec := mreci.(*im)
msg := mrec.msg
// Check for eof signaling.
if len(msg) == 0 {
return nil
}
} else if isOutOfSpaceErr(err) {
return err
} else if err == NewJSInsufficientResourcesError() {
if mset.js.limitsExceeded(mset.cfg.Storage) {
s.resourcesExeededError()
if lseq, err := mset.processCatchupMsg(msg); err == nil {
if lseq >= last {
return nil
}
} else if isOutOfSpaceErr(err) {
return err
} else if err == NewJSInsufficientResourcesError() {
if mset.js.limitsExceeded(mset.cfg.Storage) {
s.resourcesExeededError()
} else {
s.Warnf("Catchup for stream '%s > %s' errored, account resources exceeded: %v", mset.account(), mset.name(), err)
}
return err
} else {
s.Warnf("Catchup for stream '%s > %s' errored, account resources exceeded: %v", mset.account(), mset.name(), err)
s.Warnf("Catchup for stream '%s > %s' errored, will retry: %v", mset.account(), mset.name(), err)
goto RETRY
}
if mrec.reply != _EMPTY_ {
s.sendInternalMsgLocked(mrec.reply, _EMPTY_, nil, nil)
}
return err
} else {
s.Warnf("Catchup for stream '%s > %s' errored, will retry: %v", mset.account(), mset.name(), err)
goto RETRY
}
if mrec.reply != _EMPTY_ {
s.sendInternalMsgLocked(mrec.reply, _EMPTY_, nil, nil)
}
msgsQ.recycle(&mrecs)
case <-notActive.C:
s.Warnf("Catchup for stream '%s > %s' stalled", mset.account(), mset.name())
notActive.Reset(activityInterval)

View File

@@ -181,22 +181,22 @@ type raft struct {
catchup *catchupState
// For leader or server catching up a follower.
progress map[string]chan uint64
progress map[string]*ipQueue // of uint64
// For when we have paused our applyC.
paused bool
hcommit uint64
// Channels
// Queues and Channels
propc *ipQueue // of *Entry
entryc *ipQueue // of *appendEntry
respc *ipQueue // of *appendEntryResponse
applyc *ipQueue // of *CommittedEntry
quit chan struct{}
reqs *ipQueue // of *voteRequest
votes *ipQueue // of *voteResponse
leadc chan bool
stepdown *ipQueue // of string
quit chan struct{}
}
// cacthupState structure that holds our subscription, and catchup term and index
@@ -1932,7 +1932,7 @@ func (n *raft) loadFirstEntry() (ae *appendEntry, err error) {
return n.loadEntry(state.FirstSeq)
}
func (n *raft) runCatchup(ar *appendEntryResponse, indexUpdatesC <-chan uint64) {
func (n *raft) runCatchup(ar *appendEntryResponse, indexUpdatesQ *ipQueue /* of uint64 */) {
n.RLock()
s, reply := n.s, n.areply
peer, subj, last := ar.peer, ar.reply, n.pindex
@@ -2004,7 +2004,8 @@ func (n *raft) runCatchup(ar *appendEntryResponse, indexUpdatesC <-chan uint64)
case <-timeout.C:
n.debug("Catching up for %q stalled", peer)
return
case index := <-indexUpdatesC:
case <-indexUpdatesQ.ch:
index := indexUpdatesQ.popOne().(uint64)
// Update our activity timer.
timeout.Reset(activityInterval)
// Update outstanding total.
@@ -2051,15 +2052,11 @@ func (n *raft) catchupFollower(ar *appendEntryResponse) {
n.debug("Being asked to catch up follower: %q", ar.peer)
n.Lock()
if n.progress == nil {
n.progress = make(map[string]chan uint64)
} else if ch, ok := n.progress[ar.peer]; ok {
n.progress = make(map[string]*ipQueue)
} else if q, ok := n.progress[ar.peer]; ok {
n.debug("Will cancel existing entry for catching up %q", ar.peer)
delete(n.progress, ar.peer)
// Try to pop them out but make sure to not block.
select {
case ch <- n.pindex:
default:
}
q.push(n.pindex)
}
// Check to make sure we have this entry.
start := ar.index + 1
@@ -2096,18 +2093,9 @@ func (n *raft) catchupFollower(ar *appendEntryResponse) {
if ae.pindex != ar.index || ae.pterm != ar.term {
n.debug("Our first entry does not match request from follower")
}
// Create a chan for delivering updates from responses.
isz := 256
if ae.pindex > ar.index && ae.pindex-ar.index > uint64(isz) {
isz = int(ae.pindex - ar.index)
}
// Check if we already have one in place and its bigger.
if preCh, ok := n.progress[ar.peer]; ok && cap(preCh) > isz {
isz = cap(preCh)
}
indexUpdates := make(chan uint64, isz)
indexUpdates <- ae.pindex
// Create a queue for delivering updates from responses.
indexUpdates := newIPQueue() // of uint64
indexUpdates.push(ae.pindex)
n.progress[ar.peer] = indexUpdates
n.Unlock()
@@ -2256,12 +2244,8 @@ func (n *raft) trackResponse(ar *appendEntryResponse) {
}
// If we are tracking this peer as a catchup follower, update that here.
if indexUpdateC := n.progress[ar.peer]; indexUpdateC != nil {
select {
case indexUpdateC <- ar.index:
default:
n.warn("TrackResponse failed to place progress update on internal channel")
}
if indexUpdateQ := n.progress[ar.peer]; indexUpdateQ != nil {
indexUpdateQ.push(ar.index)
}
// Ignore items already committed.