mirror of
https://github.com/gogrlx/nats-server.git
synced 2026-04-02 03:38:42 -07:00
Modified flow control for clustered mode.
Set channels into and out of RAFT layers to block. Signed-off-by: Derek Collison <derek@nats.io>
This commit is contained in:
@@ -233,7 +233,7 @@ const (
|
||||
JsDeleteWaitTimeDefault = 5 * time.Second
|
||||
// JsFlowControlMaxPending specifies default pending bytes during flow control that can be
|
||||
// outstanding.
|
||||
JsFlowControlMaxPending = 64 * 1024 * 1024
|
||||
JsFlowControlMaxPending = 32 * 1024 * 1024
|
||||
)
|
||||
|
||||
func (mset *stream) addConsumer(config *ConsumerConfig) (*consumer, error) {
|
||||
@@ -572,7 +572,7 @@ func (mset *stream) addConsumerWithAssignment(config *ConsumerConfig, oname stri
|
||||
if !o.hasDeliveryInterest(len(r.psubs)+len(r.qsubs) > 0) {
|
||||
mset.mu.Unlock()
|
||||
o.deleteWithoutAdvisory()
|
||||
return nil, fmt.Errorf("consumer requires interest for delivery subject when ephemeral")
|
||||
return nil, errNoInterest
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -1734,8 +1734,11 @@ func (o *consumer) isFilteredMatch(subj string) bool {
|
||||
return subjectIsSubsetMatch(subj, o.cfg.FilterSubject)
|
||||
}
|
||||
|
||||
var errMaxAckPending = errors.New("max ack pending reached")
|
||||
var errBadConsumer = errors.New("consumer not valid")
|
||||
var (
|
||||
errMaxAckPending = errors.New("max ack pending reached")
|
||||
errBadConsumer = errors.New("consumer not valid")
|
||||
errNoInterest = errors.New("consumer requires interest for delivery subject when ephemeral")
|
||||
)
|
||||
|
||||
// Get next available message from underlying store.
|
||||
// Is partition aware and redeliver aware.
|
||||
|
||||
@@ -1309,7 +1309,7 @@ func (js *jetStream) monitorStream(mset *stream, sa *streamAssignment) {
|
||||
continue
|
||||
}
|
||||
// Apply our entries.
|
||||
//TODO mset may be nil see doSnapshot(). applyStreamEntries is sensitive to this
|
||||
// TODO mset may be nil see doSnapshot(). applyStreamEntries is sensitive to this.
|
||||
if err := js.applyStreamEntries(mset, ce, isRecovering); err == nil {
|
||||
ne, nb := n.Applied(ce.Index)
|
||||
// If we have at least min entries to compact, go ahead and snapshot/compact.
|
||||
@@ -1463,6 +1463,17 @@ func (js *jetStream) applyStreamEntries(mset *stream, ce *CommittedEntry, isReco
|
||||
return err
|
||||
}
|
||||
}
|
||||
|
||||
// Check for flowcontrol here.
|
||||
mset.mu.Lock()
|
||||
if mset.fcr != nil {
|
||||
if rply := mset.fcr[ce.Index]; rply != _EMPTY_ {
|
||||
delete(mset.fcr, ce.Index)
|
||||
mset.outq.send(&jsPubMsg{rply, _EMPTY_, _EMPTY_, nil, nil, nil, 0, nil})
|
||||
}
|
||||
}
|
||||
mset.mu.Unlock()
|
||||
|
||||
case deleteMsgOp:
|
||||
md, err := decodeMsgDelete(buf[1:])
|
||||
if err != nil {
|
||||
@@ -2378,6 +2389,10 @@ func (js *jetStream) processClusterCreateConsumer(ca *consumerAssignment) {
|
||||
Response: &JSApiConsumerCreateResponse{ApiResponse: ApiResponse{Type: JSApiConsumerCreateResponseType}},
|
||||
}
|
||||
result.Response.Error = jsError(err)
|
||||
} else if err == errNoInterest {
|
||||
// This is a stranded ephemeral, let's clean this one up.
|
||||
subject := fmt.Sprintf(JSApiConsumerDeleteT, ca.Stream, ca.Name)
|
||||
mset.outq.send(&jsPubMsg{subject, _EMPTY_, _EMPTY_, nil, nil, nil, 0, nil})
|
||||
}
|
||||
js.mu.Unlock()
|
||||
|
||||
@@ -3949,19 +3964,22 @@ func (mset *stream) processClusteredInboundMsg(subject, reply string, hdr, msg [
|
||||
if mset.clseq == 0 {
|
||||
mset.clseq = mset.lseq
|
||||
}
|
||||
esm := encodeStreamMsg(subject, reply, hdr, msg, mset.clseq, time.Now().UnixNano())
|
||||
mset.clseq++
|
||||
|
||||
// Do proposal.
|
||||
err := mset.node.Propose(encodeStreamMsg(subject, reply, hdr, msg, mset.clseq, time.Now().UnixNano()))
|
||||
mset.mu.Unlock()
|
||||
err := mset.node.Propose(esm)
|
||||
if err != nil {
|
||||
mset.mu.Lock()
|
||||
mset.clseq--
|
||||
mset.mu.Unlock()
|
||||
if canRespond {
|
||||
var resp = &JSPubAckResponse{PubAck: &PubAck{Stream: mset.cfg.Name}}
|
||||
resp.Error = &ApiError{Code: 503, Description: err.Error()}
|
||||
response, _ = json.Marshal(resp)
|
||||
}
|
||||
} else {
|
||||
mset.clseq++
|
||||
}
|
||||
mset.mu.Unlock()
|
||||
|
||||
// If we errored out respond here.
|
||||
if err != nil && canRespond {
|
||||
|
||||
@@ -226,7 +226,6 @@ var (
|
||||
errUnknownPeer = errors.New("raft: unknown peer")
|
||||
errCorruptPeers = errors.New("raft: corrupt peer state")
|
||||
errStepdownFailed = errors.New("raft: stepdown failed")
|
||||
errFailedToApply = errors.New("raft: could not place apply entry")
|
||||
errEntryLoadFailed = errors.New("raft: could not load entry from WAL")
|
||||
errNodeClosed = errors.New("raft: node is closed")
|
||||
errBadSnapName = errors.New("raft: snapshot name could not be parsed")
|
||||
@@ -344,9 +343,9 @@ func (s *Server) startRaftNode(cfg *RaftConfig) (RaftNode, error) {
|
||||
reqs: make(chan *voteRequest, 8),
|
||||
votes: make(chan *voteResponse, 32),
|
||||
propc: make(chan *Entry, 8192),
|
||||
entryc: make(chan *appendEntry, 32768),
|
||||
entryc: make(chan *appendEntry, 8192),
|
||||
respc: make(chan *appendEntryResponse, 32768),
|
||||
applyc: make(chan *CommittedEntry, 32768),
|
||||
applyc: make(chan *CommittedEntry, 8192),
|
||||
leadc: make(chan bool, 8),
|
||||
stepdown: make(chan string, 8),
|
||||
}
|
||||
@@ -566,12 +565,9 @@ func (n *raft) Propose(data []byte) error {
|
||||
propc := n.propc
|
||||
n.RUnlock()
|
||||
|
||||
select {
|
||||
case propc <- &Entry{EntryNormal, data}:
|
||||
default:
|
||||
n.warn("Propose failed to be placed on internal channel")
|
||||
return errProposalFailed
|
||||
}
|
||||
// For entering and exiting the system, proposals and apply we
|
||||
// will block.
|
||||
propc <- &Entry{EntryNormal, data}
|
||||
return nil
|
||||
}
|
||||
|
||||
@@ -583,11 +579,8 @@ func (n *raft) ForwardProposal(entry []byte) error {
|
||||
if n.Leader() {
|
||||
return n.Propose(entry)
|
||||
}
|
||||
n.RLock()
|
||||
subj := n.psubj
|
||||
n.RUnlock()
|
||||
|
||||
n.sendRPC(subj, _EMPTY_, entry)
|
||||
n.sendRPC(n.psubj, _EMPTY_, entry)
|
||||
return nil
|
||||
}
|
||||
|
||||
@@ -690,6 +683,8 @@ func (n *raft) Compact(index uint64) error {
|
||||
// byte size that could be removed with a snapshot/compact.
|
||||
func (n *raft) Applied(index uint64) (entries uint64, bytes uint64) {
|
||||
n.Lock()
|
||||
defer n.Unlock()
|
||||
|
||||
// Ignore if already applied.
|
||||
if index > n.applied {
|
||||
n.applied = index
|
||||
@@ -700,7 +695,6 @@ func (n *raft) Applied(index uint64) (entries uint64, bytes uint64) {
|
||||
if state.Msgs > 0 {
|
||||
bytes = entries * state.Bytes / state.Msgs
|
||||
}
|
||||
n.Unlock()
|
||||
return entries, bytes
|
||||
}
|
||||
|
||||
@@ -879,6 +873,8 @@ func (n *raft) setupLastSnapshot() {
|
||||
|
||||
// Set latest snapshot we have.
|
||||
n.Lock()
|
||||
defer n.Unlock()
|
||||
|
||||
n.snapfile = latest
|
||||
snap, err := n.loadLastSnapshot()
|
||||
if err != nil {
|
||||
@@ -893,7 +889,6 @@ func (n *raft) setupLastSnapshot() {
|
||||
n.setWriteErrLocked(err)
|
||||
}
|
||||
}
|
||||
n.Unlock()
|
||||
}
|
||||
|
||||
// loadLastSnapshot will load and return our last snapshot.
|
||||
@@ -946,8 +941,9 @@ func (n *raft) Leader() bool {
|
||||
return false
|
||||
}
|
||||
n.RLock()
|
||||
defer n.RUnlock()
|
||||
return n.state == Leader
|
||||
isLeader := n.state == Leader
|
||||
n.RUnlock()
|
||||
return isLeader
|
||||
}
|
||||
|
||||
// Lock should be held.
|
||||
@@ -1531,7 +1527,7 @@ func (n *raft) handleForwardedRemovePeerProposal(sub *subscription, c *client, _
|
||||
select {
|
||||
case propc <- &Entry{EntryRemovePeer, []byte(peer)}:
|
||||
default:
|
||||
n.warn("Failed to place peer removal proposal onto propose chan")
|
||||
n.warn("Failed to place peer removal proposal onto propose channel")
|
||||
}
|
||||
}
|
||||
|
||||
@@ -1772,8 +1768,7 @@ func (n *raft) runCatchup(ar *appendEntryResponse, indexUpdatesC <-chan uint64)
|
||||
next = index
|
||||
}
|
||||
// Check if we are done.
|
||||
finished := index > last
|
||||
if finished || sendNext() {
|
||||
if index > last || sendNext() {
|
||||
n.debug("Finished catching up")
|
||||
return
|
||||
}
|
||||
@@ -1878,6 +1873,8 @@ func (n *raft) applyCommit(index uint64) error {
|
||||
delete(n.acks, index)
|
||||
}
|
||||
|
||||
var fpae bool
|
||||
|
||||
ae := n.pae[index]
|
||||
if ae == nil {
|
||||
var state StreamState
|
||||
@@ -1896,8 +1893,10 @@ func (n *raft) applyCommit(index uint64) error {
|
||||
n.commit = original
|
||||
return errEntryLoadFailed
|
||||
}
|
||||
} else {
|
||||
fpae = true
|
||||
}
|
||||
delete(n.pae, index)
|
||||
|
||||
ae.buf = nil
|
||||
|
||||
var committed []*Entry
|
||||
@@ -1946,13 +1945,14 @@ func (n *raft) applyCommit(index uint64) error {
|
||||
}
|
||||
// Pass to the upper layers if we have normal entries.
|
||||
if len(committed) > 0 {
|
||||
select {
|
||||
case n.applyc <- &CommittedEntry{index, committed}:
|
||||
default:
|
||||
n.debug("Failed to place committed entry onto our apply channel")
|
||||
n.commit = original
|
||||
return errFailedToApply
|
||||
if fpae {
|
||||
delete(n.pae, index)
|
||||
}
|
||||
// For entering and exiting the system, proposals and apply we
|
||||
// will block.
|
||||
n.Unlock()
|
||||
n.applyc <- &CommittedEntry{index, committed}
|
||||
n.Lock()
|
||||
} else {
|
||||
// If we processed inline update our applied index.
|
||||
n.applied = index
|
||||
@@ -2971,6 +2971,9 @@ func (n *raft) switchState(state RaftState) {
|
||||
if n.state == Leader && state != Leader {
|
||||
n.updateLeadChange(false)
|
||||
} else if state == Leader && n.state != Leader {
|
||||
if len(n.pae) > 0 {
|
||||
n.pae = make(map[uint64]*appendEntry)
|
||||
}
|
||||
n.updateLeadChange(true)
|
||||
}
|
||||
|
||||
|
||||
104
server/stream.go
104
server/stream.go
@@ -159,6 +159,9 @@ type stream struct {
|
||||
// Sources
|
||||
sources map[string]*sourceInfo
|
||||
|
||||
// For flowcontrol processing for source and mirror internal consumers.
|
||||
fcr map[uint64]string
|
||||
|
||||
// TODO(dlc) - Hide everything below behind two pointers.
|
||||
// Clustered mode.
|
||||
sa *streamAssignment
|
||||
@@ -987,30 +990,16 @@ func (mset *stream) mirrorInfo() *StreamSourceInfo {
|
||||
return mset.sourceInfo(mset.mirror)
|
||||
}
|
||||
|
||||
// processClusteredMirrorMsg will propose the inbound mirrored message to the underlying raft group.
|
||||
func (mset *stream) processClusteredMirrorMsg(subject string, hdr, msg []byte, seq uint64, ts int64) error {
|
||||
mset.mu.RLock()
|
||||
node := mset.node
|
||||
mset.mu.RUnlock()
|
||||
// Do proposal.
|
||||
if node == nil {
|
||||
return nil
|
||||
}
|
||||
return node.Propose(encodeStreamMsg(subject, _EMPTY_, hdr, msg, seq, ts))
|
||||
}
|
||||
|
||||
const sourceHealthCheckInterval = 10 * time.Second
|
||||
|
||||
// Will run as a Go routine to process messages.
|
||||
// Will run as a Go routine to process mirror consumer messages.
|
||||
func (mset *stream) processMirrorMsgs() {
|
||||
s := mset.srv
|
||||
defer s.grWG.Done()
|
||||
|
||||
// Grab stream quit channel.
|
||||
mset.mu.RLock()
|
||||
msgs := mset.mirror.msgs
|
||||
mch := msgs.mch
|
||||
qch := mset.qch
|
||||
msgs, mch, qch := mset.mirror.msgs, mset.mirror.msgs.mch, mset.qch
|
||||
mset.mu.RUnlock()
|
||||
|
||||
t := time.NewTicker(sourceHealthCheckInterval)
|
||||
@@ -1044,30 +1033,54 @@ func (mset *stream) processInboundMirrorMsg(m *inMsg) {
|
||||
mset.mu.Unlock()
|
||||
return
|
||||
}
|
||||
if !mset.isLeader() {
|
||||
mset.mu.Unlock()
|
||||
mset.cancelMirrorConsumer()
|
||||
return
|
||||
}
|
||||
|
||||
mset.mirror.last = time.Now()
|
||||
node := mset.node
|
||||
|
||||
// Check for heartbeats and flow control messages.
|
||||
if len(m.msg) == 0 && len(m.hdr) > 0 && bytes.HasPrefix(m.hdr, []byte("NATS/1.0 100 ")) {
|
||||
// Flow controls have reply subjects.
|
||||
if m.rply != _EMPTY_ {
|
||||
mset.outq.send(&jsPubMsg{m.rply, _EMPTY_, _EMPTY_, nil, nil, nil, 0, nil})
|
||||
// If we are clustered we want to delay signaling back to the upstream consumer.
|
||||
if node != nil {
|
||||
index, _, _ := node.Progress()
|
||||
if mset.fcr == nil {
|
||||
mset.fcr = make(map[uint64]string)
|
||||
}
|
||||
mset.fcr[index] = m.rply
|
||||
} else {
|
||||
mset.outq.send(&jsPubMsg{m.rply, _EMPTY_, _EMPTY_, nil, nil, nil, 0, nil})
|
||||
}
|
||||
}
|
||||
mset.mu.Unlock()
|
||||
return
|
||||
}
|
||||
|
||||
sseq, _, _, ts, pending := replyInfo(m.rply)
|
||||
sseq, _, dc, ts, pending := replyInfo(m.rply)
|
||||
|
||||
if dc > 1 {
|
||||
mset.mu.Unlock()
|
||||
return
|
||||
}
|
||||
|
||||
// Mirror info tracking.
|
||||
olag := mset.mirror.lag
|
||||
mset.mirror.lag = pending
|
||||
isClustered := mset.node != nil
|
||||
if pending == 0 {
|
||||
mset.mirror.lag = 0
|
||||
} else {
|
||||
mset.mirror.lag = pending - 1
|
||||
}
|
||||
mset.mu.Unlock()
|
||||
|
||||
s := mset.srv
|
||||
var err error
|
||||
if isClustered {
|
||||
err = mset.processClusteredMirrorMsg(m.subj, m.hdr, m.msg, sseq-1, ts)
|
||||
if node != nil {
|
||||
err = node.Propose(encodeStreamMsg(m.subj, _EMPTY_, m.hdr, m.msg, sseq-1, ts))
|
||||
} else {
|
||||
err = mset.processJetStreamMsg(m.subj, _EMPTY_, m.hdr, m.msg, sseq-1, ts)
|
||||
}
|
||||
@@ -1240,12 +1253,10 @@ func (mset *stream) setupMirrorConsumer() error {
|
||||
mset.outq.send(&jsPubMsg{subject, _EMPTY_, reply, nil, b, nil, 0, nil})
|
||||
|
||||
go func() {
|
||||
var shouldRetry bool
|
||||
select {
|
||||
case ccr := <-respCh:
|
||||
if ccr.Error != nil {
|
||||
mset.cancelMirrorConsumer()
|
||||
shouldRetry = false
|
||||
// We will retry every 10 seconds or so
|
||||
time.AfterFunc(10*time.Second, mset.retryMirrorConsumer)
|
||||
} else {
|
||||
@@ -1258,9 +1269,6 @@ func (mset *stream) setupMirrorConsumer() error {
|
||||
}
|
||||
mset.setMirrorErr(ccr.Error)
|
||||
case <-time.After(2 * time.Second):
|
||||
shouldRetry = true
|
||||
}
|
||||
if shouldRetry {
|
||||
mset.resetMirrorConsumer()
|
||||
}
|
||||
}()
|
||||
@@ -1398,7 +1406,6 @@ func (mset *stream) setSourceConsumer(sname string, seq uint64) {
|
||||
mset.outq.send(&jsPubMsg{subject, _EMPTY_, reply, nil, b, nil, 0, nil})
|
||||
|
||||
go func() {
|
||||
var shouldRetry bool
|
||||
select {
|
||||
case ccr := <-respCh:
|
||||
mset.mu.Lock()
|
||||
@@ -1407,10 +1414,9 @@ func (mset *stream) setSourceConsumer(sname string, seq uint64) {
|
||||
if ccr.Error != nil {
|
||||
mset.srv.Warnf("JetStream error response for create source consumer: %+v", ccr.Error)
|
||||
si.err = ccr.Error
|
||||
shouldRetry = false
|
||||
// We will retry every 5 seconds or so
|
||||
// We will retry every 10 seconds or so
|
||||
mset.cancelSourceConsumer(sname)
|
||||
time.AfterFunc(5*time.Second, func() { mset.retrySourceConsumer(sname) })
|
||||
time.AfterFunc(10*time.Second, func() { mset.retrySourceConsumer(sname) })
|
||||
} else {
|
||||
// Capture consumer name.
|
||||
si.cname = ccr.ConsumerInfo.Name
|
||||
@@ -1418,9 +1424,6 @@ func (mset *stream) setSourceConsumer(sname string, seq uint64) {
|
||||
}
|
||||
mset.mu.Unlock()
|
||||
case <-time.After(2 * time.Second):
|
||||
shouldRetry = true
|
||||
}
|
||||
if shouldRetry {
|
||||
// Make sure things have not changed.
|
||||
mset.mu.Lock()
|
||||
if si := mset.sources[sname]; si != nil {
|
||||
@@ -1441,9 +1444,7 @@ func (mset *stream) processSourceMsgs(si *sourceInfo) {
|
||||
|
||||
// Grab stream quit channel.
|
||||
mset.mu.RLock()
|
||||
msgs := si.msgs
|
||||
mch := msgs.mch
|
||||
qch := mset.qch
|
||||
msgs, mch, qch := si.msgs, si.msgs.mch, mset.qch
|
||||
mset.mu.RUnlock()
|
||||
|
||||
t := time.NewTicker(sourceHealthCheckInterval)
|
||||
@@ -1473,15 +1474,31 @@ func (mset *stream) processSourceMsgs(si *sourceInfo) {
|
||||
|
||||
// processInboundSourceMsg handles processing other stream messages bound for this stream.
|
||||
func (mset *stream) processInboundSourceMsg(si *sourceInfo, m *inMsg) {
|
||||
|
||||
mset.mu.Lock()
|
||||
|
||||
if !mset.isLeader() {
|
||||
mset.mu.Unlock()
|
||||
mset.cancelSourceConsumer(si.name)
|
||||
return
|
||||
}
|
||||
|
||||
si.last = time.Now()
|
||||
node := mset.node
|
||||
|
||||
// Check for heartbeats and flow control messages.
|
||||
if len(m.msg) == 0 && len(m.hdr) > 0 && bytes.HasPrefix(m.hdr, []byte("NATS/1.0 100 ")) {
|
||||
// Flow controls have reply subjects.
|
||||
if m.rply != _EMPTY_ {
|
||||
mset.outq.send(&jsPubMsg{m.rply, _EMPTY_, _EMPTY_, nil, nil, nil, 0, nil})
|
||||
// If we are clustered we want to delay signaling back to the upstream consumer.
|
||||
if node != nil {
|
||||
index, _, _ := node.Progress()
|
||||
if mset.fcr == nil {
|
||||
mset.fcr = make(map[uint64]string)
|
||||
}
|
||||
mset.fcr[index] = m.rply
|
||||
} else {
|
||||
mset.outq.send(&jsPubMsg{m.rply, _EMPTY_, _EMPTY_, nil, nil, nil, 0, nil})
|
||||
}
|
||||
}
|
||||
mset.mu.Unlock()
|
||||
return
|
||||
@@ -1503,6 +1520,7 @@ func (mset *stream) processInboundSourceMsg(si *sourceInfo, m *inMsg) {
|
||||
}
|
||||
} else {
|
||||
if dseq > si.dseq {
|
||||
// FIXME(dlc) - No rapid fire.
|
||||
mset.setSourceConsumer(si.name, si.sseq+1)
|
||||
}
|
||||
mset.mu.Unlock()
|
||||
@@ -1514,7 +1532,6 @@ func (mset *stream) processInboundSourceMsg(si *sourceInfo, m *inMsg) {
|
||||
} else {
|
||||
si.lag = pending - 1
|
||||
}
|
||||
isClustered := mset.node != nil
|
||||
mset.mu.Unlock()
|
||||
|
||||
hdr, msg := m.hdr, m.msg
|
||||
@@ -1528,11 +1545,12 @@ func (mset *stream) processInboundSourceMsg(si *sourceInfo, m *inMsg) {
|
||||
|
||||
var err error
|
||||
// If we are clustered we need to propose this message to the underlying raft group.
|
||||
if isClustered {
|
||||
if node != nil {
|
||||
err = mset.processClusteredInboundMsg(m.subj, _EMPTY_, hdr, msg)
|
||||
} else {
|
||||
err = mset.processJetStreamMsg(m.subj, _EMPTY_, hdr, msg, 0, 0)
|
||||
}
|
||||
|
||||
if err != nil {
|
||||
s := mset.srv
|
||||
if err == errLastSeqMismatch {
|
||||
@@ -2405,8 +2423,8 @@ func (mset *stream) name() string {
|
||||
if mset == nil {
|
||||
return _EMPTY_
|
||||
}
|
||||
mset.mu.Lock()
|
||||
defer mset.mu.Unlock()
|
||||
mset.mu.RLock()
|
||||
defer mset.mu.RUnlock()
|
||||
return mset.cfg.Name
|
||||
}
|
||||
|
||||
|
||||
Reference in New Issue
Block a user