diff --git a/server/consumer.go b/server/consumer.go index b0443956..157ff23a 100644 --- a/server/consumer.go +++ b/server/consumer.go @@ -2022,6 +2022,10 @@ func (o *consumer) deliverMsg(dsubj, subj string, hdr, msg []byte, seq, dc uint6 o.dseq++ pmsg := &jsPubMsg{dsubj, subj, o.ackReply(seq, dseq, dc, ts, o.sgap), hdr, msg, o, seq, nil} + if o.maxpb > 0 { + o.pbytes += pmsg.size() + } + mset := o.mset ap := o.cfg.AckPolicy @@ -2041,11 +2045,8 @@ func (o *consumer) deliverMsg(dsubj, subj string, hdr, msg []byte, seq, dc uint6 } // Flow control. - if o.maxpb > 0 { - o.pbytes += pmsg.size() - if o.needFlowControl() { - o.sendFlowControl() - } + if o.maxpb > 0 && o.needFlowControl() { + o.sendFlowControl() } // FIXME(dlc) - Capture errors? diff --git a/server/jetstream.go b/server/jetstream.go index 036cb9f8..edb2b8ab 100644 --- a/server/jetstream.go +++ b/server/jetstream.go @@ -186,13 +186,13 @@ func (s *Server) enableJetStream(cfg JetStreamConfig) error { s.Warnf("| || | _| | | \\__ \\ | | | / _| / _ \\| |\\/| |") s.Warnf(" \\__/|___| |_| |___/ |_| |_|_\\___/_/ \\_\\_| |_|") s.Warnf("") - s.Warnf(" https://github.com/nats-io/jetstream") + s.Warnf(" https://github.com/nats-io/jetstream") s.Noticef("") - s.Noticef("----------- JETSTREAM -----------") + s.Noticef("---------------- JETSTREAM ----------------") s.Noticef(" Max Memory: %s", friendlyBytes(cfg.MaxMemory)) s.Noticef(" Max Storage: %s", friendlyBytes(cfg.MaxStore)) s.Noticef(" Store Directory: %q", cfg.StoreDir) - s.Noticef("---------------------------------") + s.Noticef("-------------------------------------------") // Setup our internal subscriptions. if err := s.setJetStreamExportSubs(); err != nil { diff --git a/server/jetstream_cluster.go b/server/jetstream_cluster.go index aee7e974..2c753e7b 100644 --- a/server/jetstream_cluster.go +++ b/server/jetstream_cluster.go @@ -1254,8 +1254,8 @@ func (js *jetStream) monitorStream(mset *stream, sa *streamAssignment) { const ( compactInterval = 2 * time.Minute - compactSizeMin = 32 * 1024 * 1024 - compactNumMin = 4 + compactSizeMin = 64 * 1024 * 1024 + compactNumMin = 8192 ) t := time.NewTicker(compactInterval) @@ -1315,8 +1315,8 @@ func (js *jetStream) monitorStream(mset *stream, sa *streamAssignment) { if err := js.applyStreamEntries(mset, ce, isRecovering); err == nil { n.Applied(ce.Index) ne := ce.Index - lastApplied - // If over our compact min and we have at least min entries to compact, go ahead and snapshot/compact. - if _, b := n.Size(); b > compactSizeMin && ne >= compactNumMin { + // If we have at least min entries to compact, go ahead and snapshot/compact. + if ne >= compactNumMin { doSnapshot() } } else { diff --git a/server/jetstream_test.go b/server/jetstream_test.go index 4cb28252..e1f5b6d9 100644 --- a/server/jetstream_test.go +++ b/server/jetstream_test.go @@ -7084,7 +7084,9 @@ func TestJetStreamPushConsumerFlowControl(t *testing.T) { } else if obs := mset.lookupConsumer("dlc"); obs == nil { t.Fatalf("Error looking up stream: %v", err) } else { + obs.mu.Lock() obs.setMaxPendingBytes(16 * 1024) + obs.mu.Unlock() } msgSize := 1024 diff --git a/server/raft.go b/server/raft.go index c01a49f9..281b6a60 100644 --- a/server/raft.go +++ b/server/raft.go @@ -341,9 +341,9 @@ func (s *Server) startRaftNode(cfg *RaftConfig) (RaftNode, error) { reqs: make(chan *voteRequest, 8), votes: make(chan *voteResponse, 32), propc: make(chan *Entry, 8192), - entryc: make(chan *appendEntry, 8192), + entryc: make(chan *appendEntry, 32768), respc: make(chan *appendEntryResponse, 32768), - applyc: make(chan *CommittedEntry, 8192), + applyc: make(chan *CommittedEntry, 32768), leadc: make(chan bool, 8), stepdown: make(chan string, 8), } @@ -1975,6 +1975,7 @@ func (n *raft) trackResponse(ar *appendEntryResponse) { sendHB = len(n.propc) == 0 } } + n.Unlock() if sendHB { diff --git a/server/stream.go b/server/stream.go index 596ec862..23f767b9 100644 --- a/server/stream.go +++ b/server/stream.go @@ -987,12 +987,18 @@ func (mset *stream) mirrorInfo() *StreamSourceInfo { // processClusteredMirrorMsg will propose the inbound mirrored message to the underlying raft group. func (mset *stream) processClusteredMirrorMsg(subject string, hdr, msg []byte, seq uint64, ts int64) error { - mset.mu.Lock() - defer mset.mu.Unlock() + mset.mu.RLock() + node := mset.node + mset.mu.RUnlock() // Do proposal. - return mset.node.Propose(encodeStreamMsg(subject, _EMPTY_, hdr, msg, seq, ts)) + if node == nil { + return nil + } + return node.Propose(encodeStreamMsg(subject, _EMPTY_, hdr, msg, seq, ts)) } +const sourceHealthCheckInterval = 10 * time.Second + // Will run as a Go routine to process messages. func (mset *stream) processMirrorMsgs() { s := mset.srv @@ -1005,6 +1011,9 @@ func (mset *stream) processMirrorMsgs() { qch := mset.qch mset.mu.RUnlock() + t := time.NewTicker(sourceHealthCheckInterval) + defer t.Stop() + for { select { case <-s.quitCh: @@ -1012,12 +1021,15 @@ func (mset *stream) processMirrorMsgs() { case <-qch: return case <-mch: - for im := mset.pending(msgs); im != nil; { + for im := mset.pending(msgs); im != nil; im = im.next { mset.processInboundMirrorMsg(im) - // Do this here to nil out below vs up in for loop. - next := im.next - im.next, im.hdr, im.msg = nil, nil, nil - im = next + } + case <-t.C: + mset.mu.RLock() + stalled := mset.mirror != nil && time.Since(mset.mirror.last) > 3*sourceHealthCheckInterval + mset.mu.RUnlock() + if stalled { + mset.resetMirrorConsumer() } } } @@ -1025,8 +1037,6 @@ func (mset *stream) processMirrorMsgs() { // processInboundMirrorMsg handles processing messages bound for a stream. func (mset *stream) processInboundMirrorMsg(m *inMsg) { - sseq, _, _, ts, pending := replyInfo(m.rply) - mset.mu.Lock() if mset.mirror == nil { mset.mu.Unlock() @@ -1044,6 +1054,8 @@ func (mset *stream) processInboundMirrorMsg(m *inMsg) { return } + sseq, _, _, ts, pending := replyInfo(m.rply) + // Mirror info tracking. olag := mset.mirror.lag mset.mirror.lag = pending @@ -1119,8 +1131,10 @@ func (mset *stream) setupMirrorConsumer() error { return errors.New("outq required") } + isReset := mset.mirror != nil + // Reset - if mset.mirror != nil { + if isReset { if mset.mirror.sub != nil { mset.unsubscribe(mset.mirror.sub) mset.mirror.sub = nil @@ -1139,7 +1153,9 @@ func (mset *stream) setupMirrorConsumer() error { deliverSubject = syncSubject("$JS.M") } - mset.mirror = &sourceInfo{name: mset.cfg.Mirror.Name, msgs: &inbound{mch: make(chan struct{}, 1)}} + if !isReset { + mset.mirror = &sourceInfo{name: mset.cfg.Mirror.Name, msgs: &inbound{mch: make(chan struct{}, 1)}} + } // Process inbound mirror messages from the wire. sub, err := mset.subscribeInternal(deliverSubject, func(sub *subscription, c *client, subject, reply string, rmsg []byte) { @@ -1159,7 +1175,10 @@ func (mset *stream) setupMirrorConsumer() error { } mset.mirror.sub = sub - mset.srv.startGoRoutine(func() { mset.processMirrorMsgs() }) + + if !isReset { + mset.srv.startGoRoutine(func() { mset.processMirrorMsgs() }) + } // Now send off request to create/update our consumer. This will be all API based even in single server mode. // We calculate durable names apriori so we do not need to save them off. @@ -1179,6 +1198,7 @@ func (mset *stream) setupMirrorConsumer() error { FlowControl: true, }, } + // Only use start optionals on first time. if state.Msgs == 0 { if mset.cfg.Mirror.OptStartSeq > 0 { @@ -1229,7 +1249,9 @@ func (mset *stream) setupMirrorConsumer() error { } else { // Capture consumer name. mset.mu.Lock() - mset.mirror.cname = ccr.ConsumerInfo.Name + if mset.mirror != nil { + mset.mirror.cname = ccr.ConsumerInfo.Name + } mset.mu.Unlock() } mset.setMirrorErr(ccr.Error) @@ -1244,10 +1266,6 @@ func (mset *stream) setupMirrorConsumer() error { return nil } -func (mset *stream) sourceDurable(streamName string) string { - return string(getHash(fmt.Sprintf("SOURCE:%s:%s", streamName, mset.cfg.Name))) -} - func (mset *stream) streamSource(sname string) *StreamSource { for _, ssi := range mset.cfg.Sources { if ssi.Name == sname { @@ -1426,6 +1444,9 @@ func (mset *stream) processSourceMsgs(si *sourceInfo) { qch := mset.qch mset.mu.RUnlock() + t := time.NewTicker(sourceHealthCheckInterval) + defer t.Stop() + for { select { case <-s.quitCh: @@ -1433,12 +1454,16 @@ func (mset *stream) processSourceMsgs(si *sourceInfo) { case <-qch: return case <-mch: - for im := mset.pending(msgs); im != nil; { + for im := mset.pending(msgs); im != nil; im = im.next { mset.processInboundSourceMsg(si, im) - // Do this here to nil out below vs up in for loop. - next := im.next - im.next, im.hdr, im.msg = nil, nil, nil - im = next + } + case <-t.C: + mset.mu.RLock() + stalled := time.Since(si.last) > 3*sourceHealthCheckInterval + sname := si.name + mset.mu.RUnlock() + if stalled { + mset.retrySourceConsumer(sname) } } } @@ -1446,11 +1471,6 @@ func (mset *stream) processSourceMsgs(si *sourceInfo) { // processInboundSourceMsg handles processing other stream messages bound for this stream. func (mset *stream) processInboundSourceMsg(si *sourceInfo, m *inMsg) { - sseq, dseq, dc, _, pending := replyInfo(m.rply) - - if dc > 1 { - return - } mset.mu.Lock() @@ -1465,6 +1485,13 @@ func (mset *stream) processInboundSourceMsg(si *sourceInfo, m *inMsg) { return } + sseq, dseq, dc, _, pending := replyInfo(m.rply) + + if dc > 1 { + mset.mu.Unlock() + return + } + // Tracking is done here. if dseq == si.dseq+1 { si.dseq++ @@ -2386,7 +2413,7 @@ func (mset *stream) internalLoop() { for { select { case <-outq.mch: - for pm := outq.pending(); pm != nil; { + for pm := outq.pending(); pm != nil; pm = pm.next { c.pa.subject = []byte(pm.subj) c.pa.deliver = []byte(pm.dsubj) c.pa.size = len(pm.msg) + len(pm.hdr) @@ -2413,25 +2440,16 @@ func (mset *stream) internalLoop() { if pm.o != nil && pm.seq > 0 && !didDeliver { pm.o.didNotDeliver(pm.seq) } - - // Do this here to nil out below vs up in for loop. - next := pm.next - pm.next, pm.hdr, pm.msg = nil, nil, nil - pm = next } c.flushClients(10 * time.Millisecond) case <-mch: - for im := mset.pending(mset.msgs); im != nil; { + for im := mset.pending(mset.msgs); im != nil; im = im.next { // If we are clustered we need to propose this message to the underlying raft group. if isClustered { mset.processClusteredInboundMsg(im.subj, im.rply, im.hdr, im.msg) } else { mset.processJetStreamMsg(im.subj, im.rply, im.hdr, im.msg, 0, 0) } - // Do this here to nil out below vs up in for loop. - next := im.next - im.next, im.hdr, im.msg = nil, nil, nil - im = next } case seq := <-rmch: mset.store.RemoveMsg(seq)