From 207ebd3b3dbee17e762b55601dba14262cd3e2c2 Mon Sep 17 00:00:00 2001 From: Derek Collison Date: Thu, 4 Mar 2021 17:19:50 -0800 Subject: [PATCH] Changed stream sendq to linked list outq. Made consumer share streams outq. Signed-off-by: Derek Collison --- server/consumer.go | 115 +++++----------------- server/jetstream_api.go | 7 +- server/jetstream_cluster.go | 8 +- server/sendq.go | 8 +- server/stream.go | 187 +++++++++++++++++++++--------------- 5 files changed, 144 insertions(+), 181 deletions(-) diff --git a/server/consumer.go b/server/consumer.go index bf12981f..1baca1ac 100644 --- a/server/consumer.go +++ b/server/consumer.go @@ -187,7 +187,7 @@ type consumer struct { ackSubj string nextMsgSubj string maxp int - sendq chan *jsPubMsg + outq *jsOutQ pending map[uint64]*Pending ptmr *time.Timer rdq []uint64 @@ -623,7 +623,7 @@ func (o *consumer) setLeader(isLeader bool) { } mset.mu.RLock() - s, jsa, stream := mset.srv, mset.jsa, mset.cfg.Name + s, jsa, stream, outq := mset.srv, mset.jsa, mset.cfg.Name, mset.outq mset.mu.RUnlock() o.mu.Lock() @@ -670,8 +670,9 @@ func (o *consumer) setLeader(isLeader bool) { o.replay = true } - // Recreate internal sendq - o.sendq = make(chan *jsPubMsg, msetSendQSize) + // We borrow this from the mset. + o.outq = outq + // Recreate quit channel. o.qch = make(chan struct{}) qch := o.qch @@ -679,8 +680,6 @@ func (o *consumer) setLeader(isLeader bool) { // Now start up Go routine to deliver msgs. go o.loopAndGatherMsgs(qch) - // Startup our deliver loop. - go o.loopAndDeliverMsgs(qch) } else { // Shutdown the go routines and the subscriptions. @@ -693,7 +692,7 @@ func (o *consumer) setLeader(isLeader bool) { o.srv.sysUnsubscribe(o.infoSub) o.infoSub = nil } - o.sendq = nil + o.outq = nil if o.qch != nil { close(o.qch) o.qch = nil @@ -737,14 +736,14 @@ func (o *consumer) unsubscribe(sub *subscription) { o.client.processUnsub(sub.sid) } -// We need to make sure we protect access to the sendq. +// We need to make sure we protect access to the outq. // Do all advisory sends here. // Lock should be held on entry but will be released. func (o *consumer) sendAdvisory(subj string, msg []byte) { - sendq := o.sendq + outq := o.outq o.mu.Unlock() - if sendq != nil { - sendq <- &jsPubMsg{subj, subj, _EMPTY_, nil, msg, nil, 0} + if outq != nil { + outq.send(&jsPubMsg{subj, subj, _EMPTY_, nil, msg, nil, 0, nil}) } o.mu.Lock() } @@ -1230,73 +1229,6 @@ func (o *consumer) writeStoreState() error { return o.store.Update(&state) } -// loopAndDeliverMsgs() will loop and deliver messages and watch for interest changes. -func (o *consumer) loopAndDeliverMsgs(qch chan struct{}) { - o.mu.Lock() - inch, sendq := o.inch, o.sendq - s, acc := o.acc.srv, o.acc - o.mu.Unlock() - - // Create our client used to send messages. - c := s.createInternalJetStreamClient() - // Bind to the account. - c.registerWithAccount(acc) - // Clean up on exit. - defer c.closeConnection(ClientClosed) - - // Warn when internal send queue is backed up past 75% - warnThresh := cap(sendq) * 3 / 4 - warnFreq := time.Second - last := time.Now().Add(-warnFreq) - - for { - if len(sendq) > warnThresh && time.Since(last) >= warnFreq { - s.Warnf("Jetstream internal consumer send queue > 75%% for account: %q consumer: %q", acc.Name, o) - last = time.Now() - } - - select { - case <-qch: - return - case interest := <-inch: - // inch can be nil on pull-based, but then this will - // just block and not fire. - o.updateDeliveryInterest(interest) - case pm := <-sendq: - if pm == nil { - return - } - c.pa.subject = []byte(pm.subj) - c.pa.deliver = []byte(pm.dsubj) - c.pa.size = len(pm.msg) + len(pm.hdr) - c.pa.szb = []byte(strconv.Itoa(c.pa.size)) - c.pa.reply = []byte(pm.reply) - - var msg []byte - if len(pm.hdr) > 0 { - c.pa.hdr = len(pm.hdr) - c.pa.hdb = []byte(strconv.Itoa(c.pa.hdr)) - msg = append(pm.hdr, pm.msg...) - msg = append(msg, _CRLF_...) - } else { - c.pa.hdr = -1 - c.pa.hdb = nil - msg = append(pm.msg, _CRLF_...) - } - - didDeliver, _ := c.processInboundClientMsg(msg) - c.pa.szb = nil - c.flushClients(0) - - // Check to see if this is a delivery for an observable and - // we failed to deliver the message. If so alert the observable. - if !didDeliver && pm.o != nil && pm.seq > 0 { - pm.o.didNotDeliver(pm.seq) - } - } - } -} - // Info returns our current consumer state. func (o *consumer) info() *ConsumerInfo { o.mu.RLock() @@ -1645,18 +1577,17 @@ func (o *consumer) processNextMsgReq(_ *subscription, c *client, _, reply string defer o.mu.Unlock() s, mset, js := o.srv, o.mset, o.js - if mset == nil || o.sendq == nil { + if mset == nil { return } - sendq := o.sendq + outq := o.outq sendErr := func(status int, description string) { // Needs to be unlocked to send err. o.mu.Unlock() defer o.mu.Lock() hdr := []byte(fmt.Sprintf("NATS/1.0 %d %s\r\n\r\n", status, description)) - pmsg := &jsPubMsg{reply, reply, _EMPTY_, hdr, nil, nil, 0} - sendq <- pmsg // Send error message. + outq.send(&jsPubMsg{reply, reply, _EMPTY_, hdr, nil, nil, 0, nil}) } if o.isPushMode() { @@ -1846,11 +1777,10 @@ func (o *consumer) forceExpireFirstWaiting() *waitingRequest { return wr } // If we are expiring this and we think there is still interest, alert. - if rr := o.acc.sl.Match(wr.reply); len(rr.psubs)+len(rr.qsubs) > 0 && o.mset != nil && o.sendq != nil { + if rr := o.acc.sl.Match(wr.reply); len(rr.psubs)+len(rr.qsubs) > 0 && o.mset != nil && o.outq != nil { // We still appear to have interest, so send alert as courtesy. hdr := []byte("NATS/1.0 408 Request Timeout\r\n\r\n") - pmsg := &jsPubMsg{wr.reply, wr.reply, _EMPTY_, hdr, nil, nil, 0} - o.sendq <- pmsg // Send message. + o.outq.send(&jsPubMsg{wr.reply, wr.reply, _EMPTY_, hdr, nil, nil, 0, nil}) } return wr } @@ -1903,6 +1833,7 @@ func (o *consumer) loopAndGatherMsgs(qch chan struct{}) { } lseq = o.mset.state().LastSeq } + inch := o.inch o.mu.Unlock() // Deliver all the msgs we have now, once done or on a condition, we wait for new ones. @@ -2000,9 +1931,14 @@ func (o *consumer) loopAndGatherMsgs(qch chan struct{}) { o.mu.Unlock() select { + case interest := <-inch: + // inch can be nil on pull-based, but then this will + // just block and not fire. + o.updateDeliveryInterest(interest) case <-qch: return case <-mch: + // Messages are waiting. } } } @@ -2014,7 +1950,7 @@ func (o *consumer) ackReply(sseq, dseq, dc uint64, ts int64, pending uint64) str // Deliver a msg to the consumer. // Lock should be held and o.mset validated to be non-nil. func (o *consumer) deliverMsg(dsubj, subj string, hdr, msg []byte, seq, dc uint64, ts int64) { - if o.mset == nil || o.sendq == nil { + if o.mset == nil || o.outq == nil { return } // Update pending on first attempt @@ -2032,15 +1968,16 @@ func (o *consumer) deliverMsg(dsubj, subj string, hdr, msg []byte, seq, dc uint6 dseq := o.dseq o.dseq++ - pmsg := &jsPubMsg{dsubj, subj, o.ackReply(seq, dseq, dc, ts, o.sgap), hdr, msg, o, seq} + pmsg := &jsPubMsg{dsubj, subj, o.ackReply(seq, dseq, dc, ts, o.sgap), hdr, msg, o, seq, nil} mset := o.mset ap := o.cfg.AckPolicy - sendq := o.sendq + outq := o.outq // This needs to be unlocked since the other side may need this lock on a failed delivery. o.mu.Unlock() // Send message. - sendq <- pmsg + outq.send(pmsg) + // If we are ack none and mset is interest only we should make sure stream removes interest. if ap == AckNone && mset.cfg.Retention == InterestPolicy && !mset.checkInterest(seq, o) { mset.store.RemoveMsg(seq) diff --git a/server/jetstream_api.go b/server/jetstream_api.go index f8c11bcb..c66357e0 100644 --- a/server/jetstream_api.go +++ b/server/jetstream_api.go @@ -2789,7 +2789,7 @@ func (s *Server) streamSnapshot(ci *ClientInfo, acc *Account, mset *stream, sr * chunk = chunk[:n] if err != nil { if n > 0 { - mset.sendq <- &jsPubMsg{reply, _EMPTY_, _EMPTY_, nil, chunk, nil, 0} + mset.outq.send(&jsPubMsg{reply, _EMPTY_, _EMPTY_, nil, chunk, nil, 0, nil}) } break } @@ -2804,15 +2804,14 @@ func (s *Server) streamSnapshot(ci *ClientInfo, acc *Account, mset *stream, sr * case <-time.After(10 * time.Millisecond): } } - // TODO(dlc) - Might want these moved off sendq if we have contention. ackReply := fmt.Sprintf("%s.%d.%d", ackSubj, len(chunk), index) - mset.sendq <- &jsPubMsg{reply, _EMPTY_, ackReply, nil, chunk, nil, 0} + mset.outq.send(&jsPubMsg{reply, _EMPTY_, ackReply, nil, chunk, nil, 0, nil}) atomic.AddInt32(&out, int32(len(chunk))) } done: // Send last EOF // TODO(dlc) - place hash in header - mset.sendq <- &jsPubMsg{reply, _EMPTY_, _EMPTY_, nil, nil, nil, 0} + mset.outq.send(&jsPubMsg{reply, _EMPTY_, _EMPTY_, nil, nil, nil, 0, nil}) } // Request to create a durable consumer. diff --git a/server/jetstream_cluster.go b/server/jetstream_cluster.go index 16e7b7ea..43a951d8 100644 --- a/server/jetstream_cluster.go +++ b/server/jetstream_cluster.go @@ -3891,7 +3891,7 @@ func (mset *stream) processClusteredInboundMsg(subject, reply string, hdr, msg [ mset.mu.RLock() canRespond := !mset.cfg.NoAck && len(reply) > 0 - s, jsa, st, rf, sendq := mset.srv, mset.jsa, mset.cfg.Storage, mset.cfg.Replicas, mset.sendq + s, jsa, st, rf, outq := mset.srv, mset.jsa, mset.cfg.Storage, mset.cfg.Replicas, mset.outq maxMsgSize := int(mset.cfg.MaxMsgSize) msetName := mset.cfg.Name mset.mu.RUnlock() @@ -3920,7 +3920,7 @@ func (mset *stream) processClusteredInboundMsg(subject, reply string, hdr, msg [ var resp = &JSPubAckResponse{PubAck: &PubAck{Stream: mset.name()}} resp.Error = &ApiError{Code: 400, Description: "resource limits exceeded for account"} response, _ = json.Marshal(resp) - sendq <- &jsPubMsg{reply, _EMPTY_, _EMPTY_, nil, response, nil, 0} + outq.send(&jsPubMsg{reply, _EMPTY_, _EMPTY_, nil, response, nil, 0, nil}) } return err } @@ -3933,7 +3933,7 @@ func (mset *stream) processClusteredInboundMsg(subject, reply string, hdr, msg [ var resp = &JSPubAckResponse{PubAck: &PubAck{Stream: mset.name()}} resp.Error = &ApiError{Code: 400, Description: "message size exceeds maximum allowed"} response, _ = json.Marshal(resp) - sendq <- &jsPubMsg{reply, _EMPTY_, _EMPTY_, nil, response, nil, 0} + outq.send(&jsPubMsg{reply, _EMPTY_, _EMPTY_, nil, response, nil, 0, nil}) } return err } @@ -3962,7 +3962,7 @@ func (mset *stream) processClusteredInboundMsg(subject, reply string, hdr, msg [ // If we errored out respond here. if err != nil && canRespond { - sendq <- &jsPubMsg{reply, _EMPTY_, _EMPTY_, nil, response, nil, 0} + outq.send(&jsPubMsg{reply, _EMPTY_, _EMPTY_, nil, response, nil, 0, nil}) } if err != nil && isOutOfSpaceErr(err) { diff --git a/server/sendq.go b/server/sendq.go index 1e16066b..66109062 100644 --- a/server/sendq.go +++ b/server/sendq.go @@ -108,19 +108,19 @@ func (sq *sendq) send(subj, rply string, hdr, msg []byte) { msg = append(msg[:0:0], msg...) out.msg = msg } - sq.mu.Lock() - var doKick bool + sq.mu.Lock() + var notify bool if sq.head == nil { sq.head = out - doKick = true + notify = true } else { sq.tail.next = out } sq.tail = out sq.mu.Unlock() - if doKick { + if notify { select { case sq.mch <- struct{}{}: default: diff --git a/server/stream.go b/server/stream.go index 838fa593..afe58809 100644 --- a/server/stream.go +++ b/server/stream.go @@ -135,7 +135,7 @@ type stream struct { sysc *client sid int pubAck []byte - sendq chan *jsPubMsg + outq *jsOutQ mch chan struct{} msgs *inbound store StreamStore @@ -576,10 +576,10 @@ func (mset *stream) sendCreateAdvisory() { mset.mu.Lock() name := mset.cfg.Name template := mset.cfg.Template - sendq := mset.sendq + outq := mset.outq mset.mu.Unlock() - if sendq == nil { + if outq == nil { return } @@ -601,11 +601,11 @@ func (mset *stream) sendCreateAdvisory() { } subj := JSAdvisoryStreamCreatedPre + "." + name - sendq <- &jsPubMsg{subj, subj, _EMPTY_, nil, j, nil, 0} + outq.send(&jsPubMsg{subj, subj, _EMPTY_, nil, j, nil, 0, nil}) } func (mset *stream) sendDeleteAdvisoryLocked() { - if mset.sendq == nil { + if mset.outq == nil { return } @@ -623,12 +623,12 @@ func (mset *stream) sendDeleteAdvisoryLocked() { j, err := json.Marshal(m) if err == nil { subj := JSAdvisoryStreamDeletedPre + "." + mset.cfg.Name - mset.sendq <- &jsPubMsg{subj, subj, _EMPTY_, nil, j, nil, 0} + mset.outq.send(&jsPubMsg{subj, subj, _EMPTY_, nil, j, nil, 0, nil}) } } func (mset *stream) sendUpdateAdvisoryLocked() { - if mset.sendq == nil { + if mset.outq == nil { return } @@ -645,7 +645,7 @@ func (mset *stream) sendUpdateAdvisoryLocked() { j, err := json.Marshal(m) if err == nil { subj := JSAdvisoryStreamUpdatedPre + "." + mset.cfg.Name - mset.sendq <- &jsPubMsg{subj, subj, _EMPTY_, nil, j, nil, 0} + mset.outq.send(&jsPubMsg{subj, subj, _EMPTY_, nil, j, nil, 0, nil}) } } @@ -1117,8 +1117,8 @@ func (mset *stream) mirrorDurable() string { // Setup our mirror consumer. // Lock should be held. func (mset *stream) setupMirrorConsumer() error { - if mset.sendq == nil { - return errors.New("sendq required") + if mset.outq == nil { + return errors.New("outq required") } // Reset @@ -1176,7 +1176,7 @@ func (mset *stream) setupMirrorConsumer() error { subject = strings.Replace(subject, JSApiPrefix, ext.ApiPrefix, 1) subject = strings.ReplaceAll(subject, "..", ".") } - mset.sendq <- &jsPubMsg{subject, _EMPTY_, _EMPTY_, nil, nil, nil, 0} + mset.outq.send(&jsPubMsg{subject, _EMPTY_, _EMPTY_, nil, nil, nil, 0, nil}) req := &CreateConsumerRequest{ Stream: mset.cfg.Mirror.Name, @@ -1227,7 +1227,7 @@ func (mset *stream) setupMirrorConsumer() error { subject = strings.ReplaceAll(subject, "..", ".") } - mset.sendq <- &jsPubMsg{subject, _EMPTY_, reply, nil, b, nil, 0} + mset.outq.send(&jsPubMsg{subject, _EMPTY_, reply, nil, b, nil, 0, nil}) go func() { var shouldRetry bool @@ -1318,7 +1318,7 @@ func (mset *stream) setSourceConsumer(sname string, seq uint64) { subject = strings.Replace(subject, JSApiPrefix, ext.ApiPrefix, 1) subject = strings.ReplaceAll(subject, "..", ".") } - mset.sendq <- &jsPubMsg{subject, _EMPTY_, _EMPTY_, nil, nil, nil, 0} + mset.outq.send(&jsPubMsg{subject, _EMPTY_, _EMPTY_, nil, nil, nil, 0, nil}) if ext != nil { deliverSubject = strings.ReplaceAll(ext.DeliverPrefix+syncSubject(".S"), "..", ".") @@ -1396,7 +1396,7 @@ func (mset *stream) setSourceConsumer(sname string, seq uint64) { subject = strings.ReplaceAll(subject, "..", ".") } - mset.sendq <- &jsPubMsg{subject, _EMPTY_, reply, nil, b, nil, 0} + mset.outq.send(&jsPubMsg{subject, _EMPTY_, reply, nil, b, nil, 0, nil}) go func() { var shouldRetry bool @@ -1619,8 +1619,8 @@ func (mset *stream) startingSequenceForSources() { // Setup our source consumers. // Lock should be held. func (mset *stream) setupSourceConsumers() error { - if mset.sendq == nil { - return errors.New("sendq required") + if mset.outq == nil { + return errors.New("outq required") } // Reset if needed. for _, si := range mset.sources { @@ -1676,7 +1676,7 @@ func (mset *stream) stopSourceConsumers() { } // Need to delete the old one. subject := fmt.Sprintf(JSApiConsumerDeleteT, si.name, mset.sourceDurable(si.name)) - mset.sendq <- &jsPubMsg{subject, _EMPTY_, _EMPTY_, nil, nil, nil, 0} + mset.outq.send(&jsPubMsg{subject, _EMPTY_, _EMPTY_, nil, nil, nil, 0, nil}) } } @@ -1692,7 +1692,7 @@ func (mset *stream) unsubscribeToStream() error { } durable := mset.mirrorDurable() subject := fmt.Sprintf(JSApiConsumerDeleteT, mset.cfg.Mirror.Name, durable) - mset.sendq <- &jsPubMsg{subject, _EMPTY_, _EMPTY_, nil, nil, nil, 0} + mset.outq.send(&jsPubMsg{subject, _EMPTY_, _EMPTY_, nil, nil, nil, 0, nil}) mset.mirror = nil } @@ -1949,17 +1949,17 @@ func (mset *stream) queueInboundMsg(subj, rply string, hdr, msg []byte) { } mset.mu.Lock() - var doKick bool + var notify bool if mset.msgs.head == nil { mset.msgs.head = m - doKick = true + notify = true } else { mset.msgs.tail.next = m } mset.msgs.tail = m mset.mu.Unlock() - if doKick { + if notify { select { case mset.mch <- struct{}{}: default: @@ -2037,13 +2037,13 @@ func (mset *stream) processJetStreamMsg(subject, reply string, hdr, msg []byte, } } if isMisMatch { - sendq := mset.sendq + outq := mset.outq mset.mu.Unlock() - if canRespond && sendq != nil { + if canRespond && outq != nil { resp.PubAck = &PubAck{Stream: name} resp.Error = &ApiError{Code: 503, Description: "expected stream sequence does not match"} b, _ := json.Marshal(resp) - sendq <- &jsPubMsg{reply, _EMPTY_, _EMPTY_, nil, b, nil, 0} + outq.send(&jsPubMsg{reply, _EMPTY_, _EMPTY_, nil, b, nil, 0, nil}) } return errLastSeqMismatch } @@ -2059,14 +2059,14 @@ func (mset *stream) processJetStreamMsg(subject, reply string, hdr, msg []byte, var msgId string if len(hdr) > 0 { msgId = getMsgId(hdr) - sendq := mset.sendq + outq := mset.outq if dde := mset.checkMsgId(msgId); dde != nil { mset.clfs++ mset.mu.Unlock() if canRespond { response := append(pubAck, strconv.FormatUint(dde.seq, 10)...) response = append(response, ",\"duplicate\": true}"...) - sendq <- &jsPubMsg{reply, _EMPTY_, _EMPTY_, nil, response, nil, 0} + outq.send(&jsPubMsg{reply, _EMPTY_, _EMPTY_, nil, response, nil, 0, nil}) } return errors.New("msgid is duplicate") } @@ -2079,7 +2079,7 @@ func (mset *stream) processJetStreamMsg(subject, reply string, hdr, msg []byte, resp.PubAck = &PubAck{Stream: name} resp.Error = &ApiError{Code: 400, Description: "expected stream does not match"} b, _ := json.Marshal(resp) - sendq <- &jsPubMsg{reply, _EMPTY_, _EMPTY_, nil, b, nil, 0} + outq.send(&jsPubMsg{reply, _EMPTY_, _EMPTY_, nil, b, nil, 0, nil}) } return errors.New("expected stream does not match") } @@ -2092,7 +2092,7 @@ func (mset *stream) processJetStreamMsg(subject, reply string, hdr, msg []byte, resp.PubAck = &PubAck{Stream: name} resp.Error = &ApiError{Code: 400, Description: fmt.Sprintf("wrong last sequence: %d", mlseq)} b, _ := json.Marshal(resp) - sendq <- &jsPubMsg{reply, _EMPTY_, _EMPTY_, nil, b, nil, 0} + outq.send(&jsPubMsg{reply, _EMPTY_, _EMPTY_, nil, b, nil, 0, nil}) } return fmt.Errorf("last sequence mismatch: %d vs %d", seq, mlseq) } @@ -2105,7 +2105,7 @@ func (mset *stream) processJetStreamMsg(subject, reply string, hdr, msg []byte, resp.PubAck = &PubAck{Stream: name} resp.Error = &ApiError{Code: 400, Description: fmt.Sprintf("wrong last msg ID: %s", last)} b, _ := json.Marshal(resp) - sendq <- &jsPubMsg{reply, _EMPTY_, _EMPTY_, nil, b, nil, 0} + outq.send(&jsPubMsg{reply, _EMPTY_, _EMPTY_, nil, b, nil, 0, nil}) } return fmt.Errorf("last msgid mismatch: %q vs %q", lmsgId, last) } @@ -2126,7 +2126,7 @@ func (mset *stream) processJetStreamMsg(subject, reply string, hdr, msg []byte, resp.PubAck = &PubAck{Stream: name} resp.Error = &ApiError{Code: 400, Description: "message size exceeds maximum allowed"} b, _ := json.Marshal(resp) - mset.sendq <- &jsPubMsg{reply, _EMPTY_, _EMPTY_, nil, b, nil, 0} + mset.outq.send(&jsPubMsg{reply, _EMPTY_, _EMPTY_, nil, b, nil, 0, nil}) } return ErrMaxPayload } @@ -2163,7 +2163,7 @@ func (mset *stream) processJetStreamMsg(subject, reply string, hdr, msg []byte, if canRespond { response = append(pubAck, strconv.FormatUint(mset.lseq, 10)...) response = append(response, '}') - mset.sendq <- &jsPubMsg{reply, _EMPTY_, _EMPTY_, nil, response, nil, 0} + mset.outq.send(&jsPubMsg{reply, _EMPTY_, _EMPTY_, nil, response, nil, 0, nil}) } // If we have a msgId make sure to save. if msgId != _EMPTY_ { @@ -2233,7 +2233,7 @@ func (mset *stream) processJetStreamMsg(subject, reply string, hdr, msg []byte, // Send response here. if canRespond { - mset.sendq <- &jsPubMsg{reply, _EMPTY_, _EMPTY_, nil, response, nil, 0} + mset.outq.send(&jsPubMsg{reply, _EMPTY_, _EMPTY_, nil, response, nil, 0, nil}) } if err == nil && seq > 0 && numConsumers > 0 { @@ -2263,6 +2263,43 @@ type jsPubMsg struct { msg []byte o *consumer seq uint64 + next *jsPubMsg +} + +// Forms a linked list for sending internal system messages. +type jsOutQ struct { + mu sync.Mutex + mch chan struct{} + head *jsPubMsg + tail *jsPubMsg +} + +func (q *jsOutQ) pending() *jsPubMsg { + q.mu.Lock() + head := q.head + q.head, q.tail = nil, nil + q.mu.Unlock() + return head +} + +func (q *jsOutQ) send(msg *jsPubMsg) { + q.mu.Lock() + var notify bool + if q.head == nil { + q.head = msg + notify = true + } else { + q.tail.next = msg + } + q.tail = msg + q.mu.Unlock() + + if notify { + select { + case q.mch <- struct{}{}: + default: + } + } } // StoredMsg is for raw access to messages in a stream. @@ -2274,18 +2311,15 @@ type StoredMsg struct { Time time.Time `json:"time"` } -// TODO(dlc) - Maybe look at onering instead of chan - https://github.com/pltr/onering -const msetSendQSize = 65536 - // This is similar to system semantics but did not want to overload the single system sendq, // or require system account when doing simple setup with jetstream. func (mset *stream) setupSendCapabilities() { mset.mu.Lock() defer mset.mu.Unlock() - if mset.sendq != nil { + if mset.outq != nil { return } - mset.sendq = make(chan *jsPubMsg, msetSendQSize) + mset.outq = &jsOutQ{mch: make(chan struct{}, 1)} go mset.internalLoop() } @@ -2305,53 +2339,48 @@ func (mset *stream) internalLoop() { c := s.createInternalJetStreamClient() c.registerWithAccount(mset.acc) defer c.closeConnection(ClientClosed) - sendq, mch := mset.sendq, mset.mch - name := mset.cfg.Name + outq, qch, mch := mset.outq, mset.qch, mset.mch isClustered := mset.node != nil mset.mu.RUnlock() - // Warn when internal send queue is backed up past 75% - warnThresh := 3 * msetSendQSize / 4 - warnFreq := time.Second - last := time.Now().Add(-warnFreq) - for { - if len(sendq) > warnThresh && time.Since(last) >= warnFreq { - s.Warnf("Jetstream internal send queue > 75%% for account: %q stream: %q", c.acc.Name, name) - last = time.Now() - } select { - case pm := <-sendq: - if pm == nil { - return - } - c.pa.subject = []byte(pm.subj) - c.pa.deliver = []byte(pm.dsubj) - c.pa.size = len(pm.msg) + len(pm.hdr) - c.pa.szb = []byte(strconv.Itoa(c.pa.size)) - c.pa.reply = []byte(pm.reply) + case <-outq.mch: + for pm := outq.pending(); pm != nil; { + c.pa.subject = []byte(pm.subj) + c.pa.deliver = []byte(pm.dsubj) + c.pa.size = len(pm.msg) + len(pm.hdr) + c.pa.szb = []byte(strconv.Itoa(c.pa.size)) + c.pa.reply = []byte(pm.reply) - var msg []byte - if len(pm.hdr) > 0 { - c.pa.hdr = len(pm.hdr) - c.pa.hdb = []byte(strconv.Itoa(c.pa.hdr)) - msg = append(pm.hdr, pm.msg...) - msg = append(msg, _CRLF_...) - } else { - c.pa.hdr = -1 - c.pa.hdb = nil - msg = append(pm.msg, _CRLF_...) - } + var msg []byte + if len(pm.hdr) > 0 { + c.pa.hdr = len(pm.hdr) + c.pa.hdb = []byte(strconv.Itoa(c.pa.hdr)) + msg = append(pm.hdr, pm.msg...) + msg = append(msg, _CRLF_...) + } else { + c.pa.hdr = -1 + c.pa.hdb = nil + msg = append(pm.msg, _CRLF_...) + } - didDeliver, _ := c.processInboundClientMsg(msg) - c.pa.szb = nil - c.flushClients(0) + didDeliver, _ := c.processInboundClientMsg(msg) + c.pa.szb = nil - // Check to see if this is a delivery for an observable and - // we failed to deliver the message. If so alert the observable. - if pm.o != nil && pm.seq > 0 && !didDeliver { - pm.o.didNotDeliver(pm.seq) + // Check to see if this is a delivery for an observable and + // we failed to deliver the message. If so alert the observable. + if pm.o != nil && pm.seq > 0 && !didDeliver { + pm.o.didNotDeliver(pm.seq) + } + + // Do this here to nil out below vs up in for loop. + next := pm.next + pm.next, pm.hdr, pm.msg = nil, nil, nil + pm = next } + c.flushClients(10 * time.Millisecond) + case <-mch: for im := mset.pending(); im != nil; { // If we are clustered we need to propose this message to the underlying raft group. @@ -2365,6 +2394,8 @@ func (mset *stream) internalLoop() { im.next, im.hdr, im.msg = nil, nil, nil im = next } + case <-qch: + return case <-s.quitCh: return } @@ -2432,7 +2463,7 @@ func (mset *stream) stop(deleteFlag, advisory bool) error { if deleteFlag { for _, ssi := range mset.cfg.Sources { subject := fmt.Sprintf(JSApiConsumerDeleteT, ssi.Name, mset.sourceDurable(ssi.Name)) - mset.sendq <- &jsPubMsg{subject, _EMPTY_, _EMPTY_, nil, nil, nil, 0} + mset.outq.send(&jsPubMsg{subject, _EMPTY_, _EMPTY_, nil, nil, nil, 0, nil}) } } @@ -2441,10 +2472,6 @@ func (mset *stream) stop(deleteFlag, advisory bool) error { mset.sendDeleteAdvisoryLocked() } - if mset.sendq != nil { - mset.sendq <- nil - } - c := mset.client mset.client = nil if c == nil {