From 673543c180fc856322477debc1a3685e14ee2ea1 Mon Sep 17 00:00:00 2001 From: Derek Collison Date: Mon, 8 Mar 2021 12:49:57 -0600 Subject: [PATCH] Modified flow control for clustered mode. Set channels into and out of RAFT layers to block. Signed-off-by: Derek Collison --- server/consumer.go | 11 ++-- server/jetstream_cluster.go | 28 ++++++++-- server/raft.go | 57 ++++++++++---------- server/stream.go | 104 +++++++++++++++++++++--------------- 4 files changed, 121 insertions(+), 79 deletions(-) diff --git a/server/consumer.go b/server/consumer.go index 157ff23a..c067cd20 100644 --- a/server/consumer.go +++ b/server/consumer.go @@ -233,7 +233,7 @@ const ( JsDeleteWaitTimeDefault = 5 * time.Second // JsFlowControlMaxPending specifies default pending bytes during flow control that can be // outstanding. - JsFlowControlMaxPending = 64 * 1024 * 1024 + JsFlowControlMaxPending = 32 * 1024 * 1024 ) func (mset *stream) addConsumer(config *ConsumerConfig) (*consumer, error) { @@ -572,7 +572,7 @@ func (mset *stream) addConsumerWithAssignment(config *ConsumerConfig, oname stri if !o.hasDeliveryInterest(len(r.psubs)+len(r.qsubs) > 0) { mset.mu.Unlock() o.deleteWithoutAdvisory() - return nil, fmt.Errorf("consumer requires interest for delivery subject when ephemeral") + return nil, errNoInterest } } } @@ -1734,8 +1734,11 @@ func (o *consumer) isFilteredMatch(subj string) bool { return subjectIsSubsetMatch(subj, o.cfg.FilterSubject) } -var errMaxAckPending = errors.New("max ack pending reached") -var errBadConsumer = errors.New("consumer not valid") +var ( + errMaxAckPending = errors.New("max ack pending reached") + errBadConsumer = errors.New("consumer not valid") + errNoInterest = errors.New("consumer requires interest for delivery subject when ephemeral") +) // Get next available message from underlying store. // Is partition aware and redeliver aware. 
diff --git a/server/jetstream_cluster.go b/server/jetstream_cluster.go index ff86d6c2..61107fb8 100644 --- a/server/jetstream_cluster.go +++ b/server/jetstream_cluster.go @@ -1309,7 +1309,7 @@ func (js *jetStream) monitorStream(mset *stream, sa *streamAssignment) { continue } // Apply our entries. - //TODO mset may be nil see doSnapshot(). applyStreamEntries is sensitive to this + // TODO mset may be nil see doSnapshot(). applyStreamEntries is sensitive to this. if err := js.applyStreamEntries(mset, ce, isRecovering); err == nil { ne, nb := n.Applied(ce.Index) // If we have at least min entries to compact, go ahead and snapshot/compact. @@ -1463,6 +1463,17 @@ func (js *jetStream) applyStreamEntries(mset *stream, ce *CommittedEntry, isReco return err } } + + // Check for flowcontrol here. + mset.mu.Lock() + if mset.fcr != nil { + if rply := mset.fcr[ce.Index]; rply != _EMPTY_ { + delete(mset.fcr, ce.Index) + mset.outq.send(&jsPubMsg{rply, _EMPTY_, _EMPTY_, nil, nil, nil, 0, nil}) + } + } + mset.mu.Unlock() + case deleteMsgOp: md, err := decodeMsgDelete(buf[1:]) if err != nil { @@ -2378,6 +2389,10 @@ func (js *jetStream) processClusterCreateConsumer(ca *consumerAssignment) { Response: &JSApiConsumerCreateResponse{ApiResponse: ApiResponse{Type: JSApiConsumerCreateResponseType}}, } result.Response.Error = jsError(err) + } else if err == errNoInterest { + // This is a stranded ephemeral, let's clean this one up. + subject := fmt.Sprintf(JSApiConsumerDeleteT, ca.Stream, ca.Name) + mset.outq.send(&jsPubMsg{subject, _EMPTY_, _EMPTY_, nil, nil, nil, 0, nil}) } js.mu.Unlock() @@ -3949,19 +3964,22 @@ func (mset *stream) processClusteredInboundMsg(subject, reply string, hdr, msg [ if mset.clseq == 0 { mset.clseq = mset.lseq } + esm := encodeStreamMsg(subject, reply, hdr, msg, mset.clseq, time.Now().UnixNano()) + mset.clseq++ // Do proposal. 
- err := mset.node.Propose(encodeStreamMsg(subject, reply, hdr, msg, mset.clseq, time.Now().UnixNano())) + mset.mu.Unlock() + err := mset.node.Propose(esm) if err != nil { + mset.mu.Lock() + mset.clseq-- + mset.mu.Unlock() if canRespond { var resp = &JSPubAckResponse{PubAck: &PubAck{Stream: mset.cfg.Name}} resp.Error = &ApiError{Code: 503, Description: err.Error()} response, _ = json.Marshal(resp) } - } else { - mset.clseq++ } - mset.mu.Unlock() // If we errored out respond here. if err != nil && canRespond { diff --git a/server/raft.go b/server/raft.go index 91dbf726..a367bcb7 100644 --- a/server/raft.go +++ b/server/raft.go @@ -226,7 +226,6 @@ var ( errUnknownPeer = errors.New("raft: unknown peer") errCorruptPeers = errors.New("raft: corrupt peer state") errStepdownFailed = errors.New("raft: stepdown failed") - errFailedToApply = errors.New("raft: could not place apply entry") errEntryLoadFailed = errors.New("raft: could not load entry from WAL") errNodeClosed = errors.New("raft: node is closed") errBadSnapName = errors.New("raft: snapshot name could not be parsed") @@ -344,9 +343,9 @@ func (s *Server) startRaftNode(cfg *RaftConfig) (RaftNode, error) { reqs: make(chan *voteRequest, 8), votes: make(chan *voteResponse, 32), propc: make(chan *Entry, 8192), - entryc: make(chan *appendEntry, 32768), + entryc: make(chan *appendEntry, 8192), respc: make(chan *appendEntryResponse, 32768), - applyc: make(chan *CommittedEntry, 32768), + applyc: make(chan *CommittedEntry, 8192), leadc: make(chan bool, 8), stepdown: make(chan string, 8), } @@ -566,12 +565,9 @@ func (n *raft) Propose(data []byte) error { propc := n.propc n.RUnlock() - select { - case propc <- &Entry{EntryNormal, data}: - default: - n.warn("Propose failed to be placed on internal channel") - return errProposalFailed - } + // For entering and exiting the system, proposals and apply we + // will block. 
+ propc <- &Entry{EntryNormal, data} return nil } @@ -583,11 +579,8 @@ func (n *raft) ForwardProposal(entry []byte) error { if n.Leader() { return n.Propose(entry) } - n.RLock() - subj := n.psubj - n.RUnlock() - n.sendRPC(subj, _EMPTY_, entry) + n.sendRPC(n.psubj, _EMPTY_, entry) return nil } @@ -690,6 +683,8 @@ func (n *raft) Compact(index uint64) error { // byte size that could be removed with a snapshot/compact. func (n *raft) Applied(index uint64) (entries uint64, bytes uint64) { n.Lock() + defer n.Unlock() + // Ignore if already applied. if index > n.applied { n.applied = index @@ -700,7 +695,6 @@ func (n *raft) Applied(index uint64) (entries uint64, bytes uint64) { if state.Msgs > 0 { bytes = entries * state.Bytes / state.Msgs } - n.Unlock() return entries, bytes } @@ -879,6 +873,8 @@ func (n *raft) setupLastSnapshot() { // Set latest snapshot we have. n.Lock() + defer n.Unlock() + n.snapfile = latest snap, err := n.loadLastSnapshot() if err != nil { @@ -893,7 +889,6 @@ func (n *raft) setupLastSnapshot() { n.setWriteErrLocked(err) } } - n.Unlock() } // loadLastSnapshot will load and return our last snapshot. @@ -946,8 +941,9 @@ func (n *raft) Leader() bool { return false } n.RLock() - defer n.RUnlock() - return n.state == Leader + isLeader := n.state == Leader + n.RUnlock() + return isLeader } // Lock should be held. @@ -1531,7 +1527,7 @@ func (n *raft) handleForwardedRemovePeerProposal(sub *subscription, c *client, _ select { case propc <- &Entry{EntryRemovePeer, []byte(peer)}: default: - n.warn("Failed to place peer removal proposal onto propose chan") + n.warn("Failed to place peer removal proposal onto propose channel") } } @@ -1772,8 +1768,7 @@ func (n *raft) runCatchup(ar *appendEntryResponse, indexUpdatesC <-chan uint64) next = index } // Check if we are done. 
- finished := index > last - if finished || sendNext() { + if index > last || sendNext() { n.debug("Finished catching up") return } @@ -1878,6 +1873,8 @@ func (n *raft) applyCommit(index uint64) error { delete(n.acks, index) } + var fpae bool + ae := n.pae[index] if ae == nil { var state StreamState @@ -1896,8 +1893,10 @@ func (n *raft) applyCommit(index uint64) error { n.commit = original return errEntryLoadFailed } + } else { + fpae = true } - delete(n.pae, index) + ae.buf = nil var committed []*Entry @@ -1946,13 +1945,14 @@ func (n *raft) applyCommit(index uint64) error { } // Pass to the upper layers if we have normal entries. if len(committed) > 0 { - select { - case n.applyc <- &CommittedEntry{index, committed}: - default: - n.debug("Failed to place committed entry onto our apply channel") - n.commit = original - return errFailedToApply + if fpae { + delete(n.pae, index) } + // For entering and exiting the system, proposals and apply we + // will block. + n.Unlock() + n.applyc <- &CommittedEntry{index, committed} + n.Lock() } else { // If we processed inline update our applied index. n.applied = index @@ -2971,6 +2971,9 @@ func (n *raft) switchState(state RaftState) { if n.state == Leader && state != Leader { n.updateLeadChange(false) } else if state == Leader && n.state != Leader { + if len(n.pae) > 0 { + n.pae = make(map[uint64]*appendEntry) + } n.updateLeadChange(true) } diff --git a/server/stream.go b/server/stream.go index dc295dc3..3dd5caf9 100644 --- a/server/stream.go +++ b/server/stream.go @@ -159,6 +159,9 @@ type stream struct { // Sources sources map[string]*sourceInfo + // For flowcontrol processing for source and mirror internal consumers. + fcr map[uint64]string + // TODO(dlc) - Hide everything below behind two pointers. // Clustered mode. 
sa *streamAssignment @@ -987,30 +990,16 @@ func (mset *stream) mirrorInfo() *StreamSourceInfo { return mset.sourceInfo(mset.mirror) } -// processClusteredMirrorMsg will propose the inbound mirrored message to the underlying raft group. -func (mset *stream) processClusteredMirrorMsg(subject string, hdr, msg []byte, seq uint64, ts int64) error { - mset.mu.RLock() - node := mset.node - mset.mu.RUnlock() - // Do proposal. - if node == nil { - return nil - } - return node.Propose(encodeStreamMsg(subject, _EMPTY_, hdr, msg, seq, ts)) -} - const sourceHealthCheckInterval = 10 * time.Second -// Will run as a Go routine to process messages. +// Will run as a Go routine to process mirror consumer messages. func (mset *stream) processMirrorMsgs() { s := mset.srv defer s.grWG.Done() // Grab stream quit channel. mset.mu.RLock() - msgs := mset.mirror.msgs - mch := msgs.mch - qch := mset.qch + msgs, mch, qch := mset.mirror.msgs, mset.mirror.msgs.mch, mset.qch mset.mu.RUnlock() t := time.NewTicker(sourceHealthCheckInterval) @@ -1044,30 +1033,54 @@ func (mset *stream) processInboundMirrorMsg(m *inMsg) { mset.mu.Unlock() return } + if !mset.isLeader() { + mset.mu.Unlock() + mset.cancelMirrorConsumer() + return + } mset.mirror.last = time.Now() + node := mset.node // Check for heartbeats and flow control messages. if len(m.msg) == 0 && len(m.hdr) > 0 && bytes.HasPrefix(m.hdr, []byte("NATS/1.0 100 ")) { + // Flow controls have reply subjects. if m.rply != _EMPTY_ { - mset.outq.send(&jsPubMsg{m.rply, _EMPTY_, _EMPTY_, nil, nil, nil, 0, nil}) + // If we are clustered we want to delay signaling back to the upstream consumer. 
+ if node != nil { + index, _, _ := node.Progress() + if mset.fcr == nil { + mset.fcr = make(map[uint64]string) + } + mset.fcr[index] = m.rply + } else { + mset.outq.send(&jsPubMsg{m.rply, _EMPTY_, _EMPTY_, nil, nil, nil, 0, nil}) + } } mset.mu.Unlock() return } - sseq, _, _, ts, pending := replyInfo(m.rply) + sseq, _, dc, ts, pending := replyInfo(m.rply) + + if dc > 1 { + mset.mu.Unlock() + return + } // Mirror info tracking. olag := mset.mirror.lag - mset.mirror.lag = pending - isClustered := mset.node != nil + if pending == 0 { + mset.mirror.lag = 0 + } else { + mset.mirror.lag = pending - 1 + } mset.mu.Unlock() s := mset.srv var err error - if isClustered { - err = mset.processClusteredMirrorMsg(m.subj, m.hdr, m.msg, sseq-1, ts) + if node != nil { + err = node.Propose(encodeStreamMsg(m.subj, _EMPTY_, m.hdr, m.msg, sseq-1, ts)) } else { err = mset.processJetStreamMsg(m.subj, _EMPTY_, m.hdr, m.msg, sseq-1, ts) } @@ -1240,12 +1253,10 @@ func (mset *stream) setupMirrorConsumer() error { mset.outq.send(&jsPubMsg{subject, _EMPTY_, reply, nil, b, nil, 0, nil}) go func() { - var shouldRetry bool select { case ccr := <-respCh: if ccr.Error != nil { mset.cancelMirrorConsumer() - shouldRetry = false // We will retry every 10 seconds or so time.AfterFunc(10*time.Second, mset.retryMirrorConsumer) } else { @@ -1258,9 +1269,6 @@ func (mset *stream) setupMirrorConsumer() error { } mset.setMirrorErr(ccr.Error) case <-time.After(2 * time.Second): - shouldRetry = true - } - if shouldRetry { mset.resetMirrorConsumer() } }() @@ -1398,7 +1406,6 @@ func (mset *stream) setSourceConsumer(sname string, seq uint64) { mset.outq.send(&jsPubMsg{subject, _EMPTY_, reply, nil, b, nil, 0, nil}) go func() { - var shouldRetry bool select { case ccr := <-respCh: mset.mu.Lock() @@ -1407,10 +1414,9 @@ func (mset *stream) setSourceConsumer(sname string, seq uint64) { if ccr.Error != nil { mset.srv.Warnf("JetStream error response for create source consumer: %+v", ccr.Error) si.err = ccr.Error - 
shouldRetry = false - // We will retry every 5 seconds or so + // We will retry every 10 seconds or so mset.cancelSourceConsumer(sname) - time.AfterFunc(5*time.Second, func() { mset.retrySourceConsumer(sname) }) + time.AfterFunc(10*time.Second, func() { mset.retrySourceConsumer(sname) }) } else { // Capture consumer name. si.cname = ccr.ConsumerInfo.Name } mset.mu.Unlock() case <-time.After(2 * time.Second): - shouldRetry = true - } - if shouldRetry { // Make sure things have not changed. mset.mu.Lock() if si := mset.sources[sname]; si != nil { @@ -1441,9 +1444,7 @@ func (mset *stream) processSourceMsgs(si *sourceInfo) { // Grab stream quit channel. mset.mu.RLock() - msgs := si.msgs - mch := msgs.mch - qch := mset.qch + msgs, mch, qch := si.msgs, si.msgs.mch, mset.qch mset.mu.RUnlock() t := time.NewTicker(sourceHealthCheckInterval) @@ -1473,15 +1474,31 @@ func (mset *stream) processSourceMsgs(si *sourceInfo) { // processInboundSourceMsg handles processing other stream messages bound for this stream. func (mset *stream) processInboundSourceMsg(si *sourceInfo, m *inMsg) { - mset.mu.Lock() + if !mset.isLeader() { + mset.mu.Unlock() + mset.cancelSourceConsumer(si.name) + return + } + si.last = time.Now() + node := mset.node // Check for heartbeats and flow control messages. if len(m.msg) == 0 && len(m.hdr) > 0 && bytes.HasPrefix(m.hdr, []byte("NATS/1.0 100 ")) { + // Flow controls have reply subjects. if m.rply != _EMPTY_ { - mset.outq.send(&jsPubMsg{m.rply, _EMPTY_, _EMPTY_, nil, nil, nil, 0, nil}) + // If we are clustered we want to delay signaling back to the upstream consumer. 
+ if node != nil { + index, _, _ := node.Progress() + if mset.fcr == nil { + mset.fcr = make(map[uint64]string) + } + mset.fcr[index] = m.rply + } else { + mset.outq.send(&jsPubMsg{m.rply, _EMPTY_, _EMPTY_, nil, nil, nil, 0, nil}) + } } mset.mu.Unlock() return @@ -1503,6 +1520,7 @@ func (mset *stream) processInboundSourceMsg(si *sourceInfo, m *inMsg) { } } else { if dseq > si.dseq { + // FIXME(dlc) - No rapid fire. mset.setSourceConsumer(si.name, si.sseq+1) } mset.mu.Unlock() @@ -1514,7 +1532,6 @@ func (mset *stream) processInboundSourceMsg(si *sourceInfo, m *inMsg) { } else { si.lag = pending - 1 } - isClustered := mset.node != nil mset.mu.Unlock() hdr, msg := m.hdr, m.msg @@ -1528,11 +1545,12 @@ func (mset *stream) processInboundSourceMsg(si *sourceInfo, m *inMsg) { var err error // If we are clustered we need to propose this message to the underlying raft group. - if isClustered { + if node != nil { err = mset.processClusteredInboundMsg(m.subj, _EMPTY_, hdr, msg) } else { err = mset.processJetStreamMsg(m.subj, _EMPTY_, hdr, msg, 0, 0) } + if err != nil { s := mset.srv if err == errLastSeqMismatch { @@ -2405,8 +2423,8 @@ func (mset *stream) name() string { if mset == nil { return _EMPTY_ } - mset.mu.Lock() - defer mset.mu.Unlock() + mset.mu.RLock() + defer mset.mu.RUnlock() return mset.cfg.Name }