Changed stream sendq to linked list outq.

Made consumer share streams outq.

Signed-off-by: Derek Collison <derek@nats.io>
This commit is contained in:
Derek Collison
2021-03-04 17:19:50 -08:00
parent e70e46ea4a
commit 207ebd3b3d
5 changed files with 144 additions and 181 deletions

View File

@@ -187,7 +187,7 @@ type consumer struct {
ackSubj string
nextMsgSubj string
maxp int
sendq chan *jsPubMsg
outq *jsOutQ
pending map[uint64]*Pending
ptmr *time.Timer
rdq []uint64
@@ -623,7 +623,7 @@ func (o *consumer) setLeader(isLeader bool) {
}
mset.mu.RLock()
s, jsa, stream := mset.srv, mset.jsa, mset.cfg.Name
s, jsa, stream, outq := mset.srv, mset.jsa, mset.cfg.Name, mset.outq
mset.mu.RUnlock()
o.mu.Lock()
@@ -670,8 +670,9 @@ func (o *consumer) setLeader(isLeader bool) {
o.replay = true
}
// Recreate internal sendq
o.sendq = make(chan *jsPubMsg, msetSendQSize)
// We borrow this from the mset.
o.outq = outq
// Recreate quit channel.
o.qch = make(chan struct{})
qch := o.qch
@@ -679,8 +680,6 @@ func (o *consumer) setLeader(isLeader bool) {
// Now start up Go routine to deliver msgs.
go o.loopAndGatherMsgs(qch)
// Startup our deliver loop.
go o.loopAndDeliverMsgs(qch)
} else {
// Shutdown the go routines and the subscriptions.
@@ -693,7 +692,7 @@ func (o *consumer) setLeader(isLeader bool) {
o.srv.sysUnsubscribe(o.infoSub)
o.infoSub = nil
}
o.sendq = nil
o.outq = nil
if o.qch != nil {
close(o.qch)
o.qch = nil
@@ -737,14 +736,14 @@ func (o *consumer) unsubscribe(sub *subscription) {
o.client.processUnsub(sub.sid)
}
// We need to make sure we protect access to the sendq.
// We need to make sure we protect access to the outq.
// Do all advisory sends here.
// Lock should be held on entry but will be released.
func (o *consumer) sendAdvisory(subj string, msg []byte) {
sendq := o.sendq
outq := o.outq
o.mu.Unlock()
if sendq != nil {
sendq <- &jsPubMsg{subj, subj, _EMPTY_, nil, msg, nil, 0}
if outq != nil {
outq.send(&jsPubMsg{subj, subj, _EMPTY_, nil, msg, nil, 0, nil})
}
o.mu.Lock()
}
@@ -1230,73 +1229,6 @@ func (o *consumer) writeStoreState() error {
return o.store.Update(&state)
}
// loopAndDeliverMsgs() will loop and deliver messages and watch for interest changes.
func (o *consumer) loopAndDeliverMsgs(qch chan struct{}) {
o.mu.Lock()
inch, sendq := o.inch, o.sendq
s, acc := o.acc.srv, o.acc
o.mu.Unlock()
// Create our client used to send messages.
c := s.createInternalJetStreamClient()
// Bind to the account.
c.registerWithAccount(acc)
// Clean up on exit.
defer c.closeConnection(ClientClosed)
// Warn when internal send queue is backed up past 75%
warnThresh := cap(sendq) * 3 / 4
warnFreq := time.Second
last := time.Now().Add(-warnFreq)
for {
if len(sendq) > warnThresh && time.Since(last) >= warnFreq {
s.Warnf("Jetstream internal consumer send queue > 75%% for account: %q consumer: %q", acc.Name, o)
last = time.Now()
}
select {
case <-qch:
return
case interest := <-inch:
// inch can be nil on pull-based, but then this will
// just block and not fire.
o.updateDeliveryInterest(interest)
case pm := <-sendq:
if pm == nil {
return
}
c.pa.subject = []byte(pm.subj)
c.pa.deliver = []byte(pm.dsubj)
c.pa.size = len(pm.msg) + len(pm.hdr)
c.pa.szb = []byte(strconv.Itoa(c.pa.size))
c.pa.reply = []byte(pm.reply)
var msg []byte
if len(pm.hdr) > 0 {
c.pa.hdr = len(pm.hdr)
c.pa.hdb = []byte(strconv.Itoa(c.pa.hdr))
msg = append(pm.hdr, pm.msg...)
msg = append(msg, _CRLF_...)
} else {
c.pa.hdr = -1
c.pa.hdb = nil
msg = append(pm.msg, _CRLF_...)
}
didDeliver, _ := c.processInboundClientMsg(msg)
c.pa.szb = nil
c.flushClients(0)
// Check to see if this is a delivery for an observable and
// we failed to deliver the message. If so alert the observable.
if !didDeliver && pm.o != nil && pm.seq > 0 {
pm.o.didNotDeliver(pm.seq)
}
}
}
}
// Info returns our current consumer state.
func (o *consumer) info() *ConsumerInfo {
o.mu.RLock()
@@ -1645,18 +1577,17 @@ func (o *consumer) processNextMsgReq(_ *subscription, c *client, _, reply string
defer o.mu.Unlock()
s, mset, js := o.srv, o.mset, o.js
if mset == nil || o.sendq == nil {
if mset == nil {
return
}
sendq := o.sendq
outq := o.outq
sendErr := func(status int, description string) {
// Needs to be unlocked to send err.
o.mu.Unlock()
defer o.mu.Lock()
hdr := []byte(fmt.Sprintf("NATS/1.0 %d %s\r\n\r\n", status, description))
pmsg := &jsPubMsg{reply, reply, _EMPTY_, hdr, nil, nil, 0}
sendq <- pmsg // Send error message.
outq.send(&jsPubMsg{reply, reply, _EMPTY_, hdr, nil, nil, 0, nil})
}
if o.isPushMode() {
@@ -1846,11 +1777,10 @@ func (o *consumer) forceExpireFirstWaiting() *waitingRequest {
return wr
}
// If we are expiring this and we think there is still interest, alert.
if rr := o.acc.sl.Match(wr.reply); len(rr.psubs)+len(rr.qsubs) > 0 && o.mset != nil && o.sendq != nil {
if rr := o.acc.sl.Match(wr.reply); len(rr.psubs)+len(rr.qsubs) > 0 && o.mset != nil && o.outq != nil {
// We still appear to have interest, so send alert as courtesy.
hdr := []byte("NATS/1.0 408 Request Timeout\r\n\r\n")
pmsg := &jsPubMsg{wr.reply, wr.reply, _EMPTY_, hdr, nil, nil, 0}
o.sendq <- pmsg // Send message.
o.outq.send(&jsPubMsg{wr.reply, wr.reply, _EMPTY_, hdr, nil, nil, 0, nil})
}
return wr
}
@@ -1903,6 +1833,7 @@ func (o *consumer) loopAndGatherMsgs(qch chan struct{}) {
}
lseq = o.mset.state().LastSeq
}
inch := o.inch
o.mu.Unlock()
// Deliver all the msgs we have now, once done or on a condition, we wait for new ones.
@@ -2000,9 +1931,14 @@ func (o *consumer) loopAndGatherMsgs(qch chan struct{}) {
o.mu.Unlock()
select {
case interest := <-inch:
// inch can be nil on pull-based, but then this will
// just block and not fire.
o.updateDeliveryInterest(interest)
case <-qch:
return
case <-mch:
// Messages are waiting.
}
}
}
@@ -2014,7 +1950,7 @@ func (o *consumer) ackReply(sseq, dseq, dc uint64, ts int64, pending uint64) str
// Deliver a msg to the consumer.
// Lock should be held and o.mset validated to be non-nil.
func (o *consumer) deliverMsg(dsubj, subj string, hdr, msg []byte, seq, dc uint64, ts int64) {
if o.mset == nil || o.sendq == nil {
if o.mset == nil || o.outq == nil {
return
}
// Update pending on first attempt
@@ -2032,15 +1968,16 @@ func (o *consumer) deliverMsg(dsubj, subj string, hdr, msg []byte, seq, dc uint6
dseq := o.dseq
o.dseq++
pmsg := &jsPubMsg{dsubj, subj, o.ackReply(seq, dseq, dc, ts, o.sgap), hdr, msg, o, seq}
pmsg := &jsPubMsg{dsubj, subj, o.ackReply(seq, dseq, dc, ts, o.sgap), hdr, msg, o, seq, nil}
mset := o.mset
ap := o.cfg.AckPolicy
sendq := o.sendq
outq := o.outq
// This needs to be unlocked since the other side may need this lock on a failed delivery.
o.mu.Unlock()
// Send message.
sendq <- pmsg
outq.send(pmsg)
// If we are ack none and mset is interest only we should make sure stream removes interest.
if ap == AckNone && mset.cfg.Retention == InterestPolicy && !mset.checkInterest(seq, o) {
mset.store.RemoveMsg(seq)

View File

@@ -2789,7 +2789,7 @@ func (s *Server) streamSnapshot(ci *ClientInfo, acc *Account, mset *stream, sr *
chunk = chunk[:n]
if err != nil {
if n > 0 {
mset.sendq <- &jsPubMsg{reply, _EMPTY_, _EMPTY_, nil, chunk, nil, 0}
mset.outq.send(&jsPubMsg{reply, _EMPTY_, _EMPTY_, nil, chunk, nil, 0, nil})
}
break
}
@@ -2804,15 +2804,14 @@ func (s *Server) streamSnapshot(ci *ClientInfo, acc *Account, mset *stream, sr *
case <-time.After(10 * time.Millisecond):
}
}
// TODO(dlc) - Might want these moved off sendq if we have contention.
ackReply := fmt.Sprintf("%s.%d.%d", ackSubj, len(chunk), index)
mset.sendq <- &jsPubMsg{reply, _EMPTY_, ackReply, nil, chunk, nil, 0}
mset.outq.send(&jsPubMsg{reply, _EMPTY_, ackReply, nil, chunk, nil, 0, nil})
atomic.AddInt32(&out, int32(len(chunk)))
}
done:
// Send last EOF
// TODO(dlc) - place hash in header
mset.sendq <- &jsPubMsg{reply, _EMPTY_, _EMPTY_, nil, nil, nil, 0}
mset.outq.send(&jsPubMsg{reply, _EMPTY_, _EMPTY_, nil, nil, nil, 0, nil})
}
// Request to create a durable consumer.

View File

@@ -3891,7 +3891,7 @@ func (mset *stream) processClusteredInboundMsg(subject, reply string, hdr, msg [
mset.mu.RLock()
canRespond := !mset.cfg.NoAck && len(reply) > 0
s, jsa, st, rf, sendq := mset.srv, mset.jsa, mset.cfg.Storage, mset.cfg.Replicas, mset.sendq
s, jsa, st, rf, outq := mset.srv, mset.jsa, mset.cfg.Storage, mset.cfg.Replicas, mset.outq
maxMsgSize := int(mset.cfg.MaxMsgSize)
msetName := mset.cfg.Name
mset.mu.RUnlock()
@@ -3920,7 +3920,7 @@ func (mset *stream) processClusteredInboundMsg(subject, reply string, hdr, msg [
var resp = &JSPubAckResponse{PubAck: &PubAck{Stream: mset.name()}}
resp.Error = &ApiError{Code: 400, Description: "resource limits exceeded for account"}
response, _ = json.Marshal(resp)
sendq <- &jsPubMsg{reply, _EMPTY_, _EMPTY_, nil, response, nil, 0}
outq.send(&jsPubMsg{reply, _EMPTY_, _EMPTY_, nil, response, nil, 0, nil})
}
return err
}
@@ -3933,7 +3933,7 @@ func (mset *stream) processClusteredInboundMsg(subject, reply string, hdr, msg [
var resp = &JSPubAckResponse{PubAck: &PubAck{Stream: mset.name()}}
resp.Error = &ApiError{Code: 400, Description: "message size exceeds maximum allowed"}
response, _ = json.Marshal(resp)
sendq <- &jsPubMsg{reply, _EMPTY_, _EMPTY_, nil, response, nil, 0}
outq.send(&jsPubMsg{reply, _EMPTY_, _EMPTY_, nil, response, nil, 0, nil})
}
return err
}
@@ -3962,7 +3962,7 @@ func (mset *stream) processClusteredInboundMsg(subject, reply string, hdr, msg [
// If we errored out respond here.
if err != nil && canRespond {
sendq <- &jsPubMsg{reply, _EMPTY_, _EMPTY_, nil, response, nil, 0}
outq.send(&jsPubMsg{reply, _EMPTY_, _EMPTY_, nil, response, nil, 0, nil})
}
if err != nil && isOutOfSpaceErr(err) {

View File

@@ -108,19 +108,19 @@ func (sq *sendq) send(subj, rply string, hdr, msg []byte) {
msg = append(msg[:0:0], msg...)
out.msg = msg
}
sq.mu.Lock()
var doKick bool
sq.mu.Lock()
var notify bool
if sq.head == nil {
sq.head = out
doKick = true
notify = true
} else {
sq.tail.next = out
}
sq.tail = out
sq.mu.Unlock()
if doKick {
if notify {
select {
case sq.mch <- struct{}{}:
default:

View File

@@ -135,7 +135,7 @@ type stream struct {
sysc *client
sid int
pubAck []byte
sendq chan *jsPubMsg
outq *jsOutQ
mch chan struct{}
msgs *inbound
store StreamStore
@@ -576,10 +576,10 @@ func (mset *stream) sendCreateAdvisory() {
mset.mu.Lock()
name := mset.cfg.Name
template := mset.cfg.Template
sendq := mset.sendq
outq := mset.outq
mset.mu.Unlock()
if sendq == nil {
if outq == nil {
return
}
@@ -601,11 +601,11 @@ func (mset *stream) sendCreateAdvisory() {
}
subj := JSAdvisoryStreamCreatedPre + "." + name
sendq <- &jsPubMsg{subj, subj, _EMPTY_, nil, j, nil, 0}
outq.send(&jsPubMsg{subj, subj, _EMPTY_, nil, j, nil, 0, nil})
}
func (mset *stream) sendDeleteAdvisoryLocked() {
if mset.sendq == nil {
if mset.outq == nil {
return
}
@@ -623,12 +623,12 @@ func (mset *stream) sendDeleteAdvisoryLocked() {
j, err := json.Marshal(m)
if err == nil {
subj := JSAdvisoryStreamDeletedPre + "." + mset.cfg.Name
mset.sendq <- &jsPubMsg{subj, subj, _EMPTY_, nil, j, nil, 0}
mset.outq.send(&jsPubMsg{subj, subj, _EMPTY_, nil, j, nil, 0, nil})
}
}
func (mset *stream) sendUpdateAdvisoryLocked() {
if mset.sendq == nil {
if mset.outq == nil {
return
}
@@ -645,7 +645,7 @@ func (mset *stream) sendUpdateAdvisoryLocked() {
j, err := json.Marshal(m)
if err == nil {
subj := JSAdvisoryStreamUpdatedPre + "." + mset.cfg.Name
mset.sendq <- &jsPubMsg{subj, subj, _EMPTY_, nil, j, nil, 0}
mset.outq.send(&jsPubMsg{subj, subj, _EMPTY_, nil, j, nil, 0, nil})
}
}
@@ -1117,8 +1117,8 @@ func (mset *stream) mirrorDurable() string {
// Setup our mirror consumer.
// Lock should be held.
func (mset *stream) setupMirrorConsumer() error {
if mset.sendq == nil {
return errors.New("sendq required")
if mset.outq == nil {
return errors.New("outq required")
}
// Reset
@@ -1176,7 +1176,7 @@ func (mset *stream) setupMirrorConsumer() error {
subject = strings.Replace(subject, JSApiPrefix, ext.ApiPrefix, 1)
subject = strings.ReplaceAll(subject, "..", ".")
}
mset.sendq <- &jsPubMsg{subject, _EMPTY_, _EMPTY_, nil, nil, nil, 0}
mset.outq.send(&jsPubMsg{subject, _EMPTY_, _EMPTY_, nil, nil, nil, 0, nil})
req := &CreateConsumerRequest{
Stream: mset.cfg.Mirror.Name,
@@ -1227,7 +1227,7 @@ func (mset *stream) setupMirrorConsumer() error {
subject = strings.ReplaceAll(subject, "..", ".")
}
mset.sendq <- &jsPubMsg{subject, _EMPTY_, reply, nil, b, nil, 0}
mset.outq.send(&jsPubMsg{subject, _EMPTY_, reply, nil, b, nil, 0, nil})
go func() {
var shouldRetry bool
@@ -1318,7 +1318,7 @@ func (mset *stream) setSourceConsumer(sname string, seq uint64) {
subject = strings.Replace(subject, JSApiPrefix, ext.ApiPrefix, 1)
subject = strings.ReplaceAll(subject, "..", ".")
}
mset.sendq <- &jsPubMsg{subject, _EMPTY_, _EMPTY_, nil, nil, nil, 0}
mset.outq.send(&jsPubMsg{subject, _EMPTY_, _EMPTY_, nil, nil, nil, 0, nil})
if ext != nil {
deliverSubject = strings.ReplaceAll(ext.DeliverPrefix+syncSubject(".S"), "..", ".")
@@ -1396,7 +1396,7 @@ func (mset *stream) setSourceConsumer(sname string, seq uint64) {
subject = strings.ReplaceAll(subject, "..", ".")
}
mset.sendq <- &jsPubMsg{subject, _EMPTY_, reply, nil, b, nil, 0}
mset.outq.send(&jsPubMsg{subject, _EMPTY_, reply, nil, b, nil, 0, nil})
go func() {
var shouldRetry bool
@@ -1619,8 +1619,8 @@ func (mset *stream) startingSequenceForSources() {
// Setup our source consumers.
// Lock should be held.
func (mset *stream) setupSourceConsumers() error {
if mset.sendq == nil {
return errors.New("sendq required")
if mset.outq == nil {
return errors.New("outq required")
}
// Reset if needed.
for _, si := range mset.sources {
@@ -1676,7 +1676,7 @@ func (mset *stream) stopSourceConsumers() {
}
// Need to delete the old one.
subject := fmt.Sprintf(JSApiConsumerDeleteT, si.name, mset.sourceDurable(si.name))
mset.sendq <- &jsPubMsg{subject, _EMPTY_, _EMPTY_, nil, nil, nil, 0}
mset.outq.send(&jsPubMsg{subject, _EMPTY_, _EMPTY_, nil, nil, nil, 0, nil})
}
}
@@ -1692,7 +1692,7 @@ func (mset *stream) unsubscribeToStream() error {
}
durable := mset.mirrorDurable()
subject := fmt.Sprintf(JSApiConsumerDeleteT, mset.cfg.Mirror.Name, durable)
mset.sendq <- &jsPubMsg{subject, _EMPTY_, _EMPTY_, nil, nil, nil, 0}
mset.outq.send(&jsPubMsg{subject, _EMPTY_, _EMPTY_, nil, nil, nil, 0, nil})
mset.mirror = nil
}
@@ -1949,17 +1949,17 @@ func (mset *stream) queueInboundMsg(subj, rply string, hdr, msg []byte) {
}
mset.mu.Lock()
var doKick bool
var notify bool
if mset.msgs.head == nil {
mset.msgs.head = m
doKick = true
notify = true
} else {
mset.msgs.tail.next = m
}
mset.msgs.tail = m
mset.mu.Unlock()
if doKick {
if notify {
select {
case mset.mch <- struct{}{}:
default:
@@ -2037,13 +2037,13 @@ func (mset *stream) processJetStreamMsg(subject, reply string, hdr, msg []byte,
}
}
if isMisMatch {
sendq := mset.sendq
outq := mset.outq
mset.mu.Unlock()
if canRespond && sendq != nil {
if canRespond && outq != nil {
resp.PubAck = &PubAck{Stream: name}
resp.Error = &ApiError{Code: 503, Description: "expected stream sequence does not match"}
b, _ := json.Marshal(resp)
sendq <- &jsPubMsg{reply, _EMPTY_, _EMPTY_, nil, b, nil, 0}
outq.send(&jsPubMsg{reply, _EMPTY_, _EMPTY_, nil, b, nil, 0, nil})
}
return errLastSeqMismatch
}
@@ -2059,14 +2059,14 @@ func (mset *stream) processJetStreamMsg(subject, reply string, hdr, msg []byte,
var msgId string
if len(hdr) > 0 {
msgId = getMsgId(hdr)
sendq := mset.sendq
outq := mset.outq
if dde := mset.checkMsgId(msgId); dde != nil {
mset.clfs++
mset.mu.Unlock()
if canRespond {
response := append(pubAck, strconv.FormatUint(dde.seq, 10)...)
response = append(response, ",\"duplicate\": true}"...)
sendq <- &jsPubMsg{reply, _EMPTY_, _EMPTY_, nil, response, nil, 0}
outq.send(&jsPubMsg{reply, _EMPTY_, _EMPTY_, nil, response, nil, 0, nil})
}
return errors.New("msgid is duplicate")
}
@@ -2079,7 +2079,7 @@ func (mset *stream) processJetStreamMsg(subject, reply string, hdr, msg []byte,
resp.PubAck = &PubAck{Stream: name}
resp.Error = &ApiError{Code: 400, Description: "expected stream does not match"}
b, _ := json.Marshal(resp)
sendq <- &jsPubMsg{reply, _EMPTY_, _EMPTY_, nil, b, nil, 0}
outq.send(&jsPubMsg{reply, _EMPTY_, _EMPTY_, nil, b, nil, 0, nil})
}
return errors.New("expected stream does not match")
}
@@ -2092,7 +2092,7 @@ func (mset *stream) processJetStreamMsg(subject, reply string, hdr, msg []byte,
resp.PubAck = &PubAck{Stream: name}
resp.Error = &ApiError{Code: 400, Description: fmt.Sprintf("wrong last sequence: %d", mlseq)}
b, _ := json.Marshal(resp)
sendq <- &jsPubMsg{reply, _EMPTY_, _EMPTY_, nil, b, nil, 0}
outq.send(&jsPubMsg{reply, _EMPTY_, _EMPTY_, nil, b, nil, 0, nil})
}
return fmt.Errorf("last sequence mismatch: %d vs %d", seq, mlseq)
}
@@ -2105,7 +2105,7 @@ func (mset *stream) processJetStreamMsg(subject, reply string, hdr, msg []byte,
resp.PubAck = &PubAck{Stream: name}
resp.Error = &ApiError{Code: 400, Description: fmt.Sprintf("wrong last msg ID: %s", last)}
b, _ := json.Marshal(resp)
sendq <- &jsPubMsg{reply, _EMPTY_, _EMPTY_, nil, b, nil, 0}
outq.send(&jsPubMsg{reply, _EMPTY_, _EMPTY_, nil, b, nil, 0, nil})
}
return fmt.Errorf("last msgid mismatch: %q vs %q", lmsgId, last)
}
@@ -2126,7 +2126,7 @@ func (mset *stream) processJetStreamMsg(subject, reply string, hdr, msg []byte,
resp.PubAck = &PubAck{Stream: name}
resp.Error = &ApiError{Code: 400, Description: "message size exceeds maximum allowed"}
b, _ := json.Marshal(resp)
mset.sendq <- &jsPubMsg{reply, _EMPTY_, _EMPTY_, nil, b, nil, 0}
mset.outq.send(&jsPubMsg{reply, _EMPTY_, _EMPTY_, nil, b, nil, 0, nil})
}
return ErrMaxPayload
}
@@ -2163,7 +2163,7 @@ func (mset *stream) processJetStreamMsg(subject, reply string, hdr, msg []byte,
if canRespond {
response = append(pubAck, strconv.FormatUint(mset.lseq, 10)...)
response = append(response, '}')
mset.sendq <- &jsPubMsg{reply, _EMPTY_, _EMPTY_, nil, response, nil, 0}
mset.outq.send(&jsPubMsg{reply, _EMPTY_, _EMPTY_, nil, response, nil, 0, nil})
}
// If we have a msgId make sure to save.
if msgId != _EMPTY_ {
@@ -2233,7 +2233,7 @@ func (mset *stream) processJetStreamMsg(subject, reply string, hdr, msg []byte,
// Send response here.
if canRespond {
mset.sendq <- &jsPubMsg{reply, _EMPTY_, _EMPTY_, nil, response, nil, 0}
mset.outq.send(&jsPubMsg{reply, _EMPTY_, _EMPTY_, nil, response, nil, 0, nil})
}
if err == nil && seq > 0 && numConsumers > 0 {
@@ -2263,6 +2263,43 @@ type jsPubMsg struct {
msg []byte
o *consumer
seq uint64
next *jsPubMsg
}
// Forms a linked list for sending internal system messages.
type jsOutQ struct {
mu sync.Mutex
mch chan struct{}
head *jsPubMsg
tail *jsPubMsg
}
func (q *jsOutQ) pending() *jsPubMsg {
q.mu.Lock()
head := q.head
q.head, q.tail = nil, nil
q.mu.Unlock()
return head
}
func (q *jsOutQ) send(msg *jsPubMsg) {
q.mu.Lock()
var notify bool
if q.head == nil {
q.head = msg
notify = true
} else {
q.tail.next = msg
}
q.tail = msg
q.mu.Unlock()
if notify {
select {
case q.mch <- struct{}{}:
default:
}
}
}
// StoredMsg is for raw access to messages in a stream.
@@ -2274,18 +2311,15 @@ type StoredMsg struct {
Time time.Time `json:"time"`
}
// TODO(dlc) - Maybe look at onering instead of chan - https://github.com/pltr/onering
const msetSendQSize = 65536
// This is similar to system semantics but did not want to overload the single system sendq,
// or require system account when doing simple setup with jetstream.
func (mset *stream) setupSendCapabilities() {
mset.mu.Lock()
defer mset.mu.Unlock()
if mset.sendq != nil {
if mset.outq != nil {
return
}
mset.sendq = make(chan *jsPubMsg, msetSendQSize)
mset.outq = &jsOutQ{mch: make(chan struct{}, 1)}
go mset.internalLoop()
}
@@ -2305,53 +2339,48 @@ func (mset *stream) internalLoop() {
c := s.createInternalJetStreamClient()
c.registerWithAccount(mset.acc)
defer c.closeConnection(ClientClosed)
sendq, mch := mset.sendq, mset.mch
name := mset.cfg.Name
outq, qch, mch := mset.outq, mset.qch, mset.mch
isClustered := mset.node != nil
mset.mu.RUnlock()
// Warn when internal send queue is backed up past 75%
warnThresh := 3 * msetSendQSize / 4
warnFreq := time.Second
last := time.Now().Add(-warnFreq)
for {
if len(sendq) > warnThresh && time.Since(last) >= warnFreq {
s.Warnf("Jetstream internal send queue > 75%% for account: %q stream: %q", c.acc.Name, name)
last = time.Now()
}
select {
case pm := <-sendq:
if pm == nil {
return
}
c.pa.subject = []byte(pm.subj)
c.pa.deliver = []byte(pm.dsubj)
c.pa.size = len(pm.msg) + len(pm.hdr)
c.pa.szb = []byte(strconv.Itoa(c.pa.size))
c.pa.reply = []byte(pm.reply)
case <-outq.mch:
for pm := outq.pending(); pm != nil; {
c.pa.subject = []byte(pm.subj)
c.pa.deliver = []byte(pm.dsubj)
c.pa.size = len(pm.msg) + len(pm.hdr)
c.pa.szb = []byte(strconv.Itoa(c.pa.size))
c.pa.reply = []byte(pm.reply)
var msg []byte
if len(pm.hdr) > 0 {
c.pa.hdr = len(pm.hdr)
c.pa.hdb = []byte(strconv.Itoa(c.pa.hdr))
msg = append(pm.hdr, pm.msg...)
msg = append(msg, _CRLF_...)
} else {
c.pa.hdr = -1
c.pa.hdb = nil
msg = append(pm.msg, _CRLF_...)
}
var msg []byte
if len(pm.hdr) > 0 {
c.pa.hdr = len(pm.hdr)
c.pa.hdb = []byte(strconv.Itoa(c.pa.hdr))
msg = append(pm.hdr, pm.msg...)
msg = append(msg, _CRLF_...)
} else {
c.pa.hdr = -1
c.pa.hdb = nil
msg = append(pm.msg, _CRLF_...)
}
didDeliver, _ := c.processInboundClientMsg(msg)
c.pa.szb = nil
c.flushClients(0)
didDeliver, _ := c.processInboundClientMsg(msg)
c.pa.szb = nil
// Check to see if this is a delivery for an observable and
// we failed to deliver the message. If so alert the observable.
if pm.o != nil && pm.seq > 0 && !didDeliver {
pm.o.didNotDeliver(pm.seq)
// Check to see if this is a delivery for an observable and
// we failed to deliver the message. If so alert the observable.
if pm.o != nil && pm.seq > 0 && !didDeliver {
pm.o.didNotDeliver(pm.seq)
}
// Do this here to nil out below vs up in for loop.
next := pm.next
pm.next, pm.hdr, pm.msg = nil, nil, nil
pm = next
}
c.flushClients(10 * time.Millisecond)
case <-mch:
for im := mset.pending(); im != nil; {
// If we are clustered we need to propose this message to the underlying raft group.
@@ -2365,6 +2394,8 @@ func (mset *stream) internalLoop() {
im.next, im.hdr, im.msg = nil, nil, nil
im = next
}
case <-qch:
return
case <-s.quitCh:
return
}
@@ -2432,7 +2463,7 @@ func (mset *stream) stop(deleteFlag, advisory bool) error {
if deleteFlag {
for _, ssi := range mset.cfg.Sources {
subject := fmt.Sprintf(JSApiConsumerDeleteT, ssi.Name, mset.sourceDurable(ssi.Name))
mset.sendq <- &jsPubMsg{subject, _EMPTY_, _EMPTY_, nil, nil, nil, 0}
mset.outq.send(&jsPubMsg{subject, _EMPTY_, _EMPTY_, nil, nil, nil, 0, nil})
}
}
@@ -2441,10 +2472,6 @@ func (mset *stream) stop(deleteFlag, advisory bool) error {
mset.sendDeleteAdvisoryLocked()
}
if mset.sendq != nil {
mset.sendq <- nil
}
c := mset.client
mset.client = nil
if c == nil {