From b44e9e01b6b45067c4da6682046f92a166310f22 Mon Sep 17 00:00:00 2001 From: Ivan Kozlovic Date: Wed, 22 Dec 2021 11:35:15 -0700 Subject: [PATCH] Replaced MQTT's send queue Signed-off-by: Ivan Kozlovic --- server/mqtt.go | 155 ++++++++++++++++++++----------------------------- 1 file changed, 62 insertions(+), 93 deletions(-) diff --git a/server/mqtt.go b/server/mqtt.go index 63118893..6bbf76b5 100644 --- a/server/mqtt.go +++ b/server/mqtt.go @@ -229,27 +229,15 @@ type mqttAccountSessionManager struct { replicas int rrmLastSeq uint64 // Restore retained messages expected last sequence rrmDoneCh chan struct{} // To notify the caller that all retained messages have been loaded - sp sessPersist // Used for cluster-wide processing of session records being persisted + sp *ipQueue // of uint64. Used for cluster-wide processing of session records being persisted domainTk string // Domain (with trailing "."), or possibly empty. This is added to session subject. } -type sessPersist struct { - mu sync.Mutex - ch chan struct{} - head *sessPersistRecord - tail *sessPersistRecord -} - -type sessPersistRecord struct { - seq uint64 - next *sessPersistRecord -} - type mqttJSA struct { mu sync.Mutex id string c *client - sendq chan *mqttJSPubMsg + sendq *ipQueue // of *mqttJSPubMsg rplyr string replies sync.Map nuid *nuid.NUID @@ -943,13 +931,11 @@ func (s *Server) mqttCreateAccountSessionManager(acc *Account, quitCh chan struc id: id, c: c, rplyr: mqttJSARepliesPrefix + id + ".", - sendq: make(chan *mqttJSPubMsg, 8192), + sendq: newIPQueue(), // of *mqttJSPubMsg nuid: nuid.New(), quitCh: quitCh, }, - sp: sessPersist{ - ch: make(chan struct{}, 1), - }, + sp: newIPQueue(), // of uint64 } // TODO record domain name in as here @@ -1213,12 +1199,12 @@ func (jsa *mqttJSA) newRequestEx(kind, subject string, hdr int, msg []byte, time jsa.replies.Store(reply, ch) subject = jsa.prefixDomain(subject) - jsa.sendq <- &mqttJSPubMsg{ + jsa.sendq.push(&mqttJSPubMsg{ subj: subject, reply: reply, hdr: hdr, msg: msg, - } + }) var i interface{} // We don't want to use time.After() which causes memory growth because the timer @@ -1343,10 +1329,10 @@ func (jsa *mqttJSA) deleteMsg(stream string, seq uint64, wait bool) error { req, _ := json.Marshal(dreq) subj := jsa.prefixDomain(fmt.Sprintf(JSApiMsgDeleteT, stream)) if !wait { - jsa.sendq <- &mqttJSPubMsg{ + jsa.sendq.push(&mqttJSPubMsg{ subj: subj, msg: req, - } + }) return nil } dmi, err := jsa.newRequest(mqttJSAMsgDelete, subj, 0, req) @@ -1516,27 +1502,11 @@ func (as *mqttAccountSessionManager) processSessionPersist(_ *subscription, pc * // We would need to lookup the message that that is a request/reply // that we can do in place here. So move that to a long-running routine // that will process the session persist record. - as.mu.RLock() - sp := &as.sp - as.mu.RUnlock() - - spr := &sessPersistRecord{seq: par.Sequence} - sp.mu.Lock() - if sp.tail != nil { - sp.tail.next = spr - } else { - sp.head = spr - select { - case sp.ch <- struct{}{}: - default: - } - } - sp.tail = spr - sp.mu.Unlock() + as.sp.push(par.Sequence) } -func (as *mqttAccountSessionManager) processSessPersistRecord(spr *sessPersistRecord) { - smsg, err := as.jsa.loadMsg(mqttSessStreamName, spr.seq) +func (as *mqttAccountSessionManager) processSessPersistRecord(seq uint64) { + smsg, err := as.jsa.loadMsg(mqttSessStreamName, seq) if err != nil { return } @@ -1551,7 +1521,7 @@ func (as *mqttAccountSessionManager) processSessPersistRecord(spr *sessPersistRe // If our current session's stream sequence is higher, it means that this // update is stale, so we don't do anything here. sess.mu.Lock() - ignore := spr.seq < sess.seq + ignore := seq < sess.seq sess.mu.Unlock() if ignore { return @@ -1573,22 +1543,18 @@ func (as *mqttAccountSessionManager) processSessPersistRecord(spr *sessPersistRe func (as *mqttAccountSessionManager) sessPersistProcessing(closeCh chan struct{}) { as.mu.RLock() - sp := &as.sp + sp := as.sp quitCh := as.jsa.quitCh as.mu.RUnlock() for { select { case <-sp.ch: - sp.mu.Lock() - l := sp.head - sp.head, sp.tail = nil, nil - sp.mu.Unlock() - - for spr := l; spr != nil; spr = l.next { - l = spr - as.processSessPersistRecord(spr) + seqs := sp.pop() + for _, seq := range seqs { + as.processSessPersistRecord(seq.(uint64)) } + sp.recycle(&seqs) case <-closeCh: return case <-quitCh: @@ -1683,44 +1649,49 @@ func (as *mqttAccountSessionManager) sendJSAPIrequests(s *Server, c *client, acc for { select { - case r := <-sendq: + case <-sendq.ch: + pmis := sendq.pop() + for _, pmi := range pmis { + r := pmi.(*mqttJSPubMsg) - var nsize int + var nsize int - msg := r.msg - // If r.hdr is set to -1, it means that there is no need for any header. - if r.hdr != -1 { - bb := bytes.Buffer{} - if r.hdr > 0 { - // This means that the header has been set by the caller and is - // already part of `msg`, so simply set c.pa.hdr to the given value. - c.pa.hdr = r.hdr + msg := r.msg + // If r.hdr is set to -1, it means that there is no need for any header. + if r.hdr != -1 { + bb := bytes.Buffer{} + if r.hdr > 0 { + // This means that the header has been set by the caller and is + // already part of `msg`, so simply set c.pa.hdr to the given value. + c.pa.hdr = r.hdr + nsize = len(msg) + msg = append(msg, _CRLF_...) + } else { + // We need the ClientInfo header, so add it here. + bb.Write(hdrb) + c.pa.hdr = bb.Len() + bb.Write(r.msg) + nsize = bb.Len() + bb.WriteString(_CRLF_) + msg = bb.Bytes() + } + c.pa.hdb = []byte(strconv.Itoa(c.pa.hdr)) + } else { + c.pa.hdr = -1 + c.pa.hdb = nil nsize = len(msg) msg = append(msg, _CRLF_...) - } else { - // We need the ClientInfo header, so add it here. - bb.Write(hdrb) - c.pa.hdr = bb.Len() - bb.Write(r.msg) - nsize = bb.Len() - bb.WriteString(_CRLF_) - msg = bb.Bytes() } - c.pa.hdb = []byte(strconv.Itoa(c.pa.hdr)) - } else { - c.pa.hdr = -1 - c.pa.hdb = nil - nsize = len(msg) - msg = append(msg, _CRLF_...) + + c.pa.subject = []byte(r.subj) + c.pa.reply = []byte(r.reply) + c.pa.size = nsize + c.pa.szb = []byte(strconv.Itoa(nsize)) + + c.processInboundClientMsg(msg) + c.flushClients(0) } - - c.pa.subject = []byte(r.subj) - c.pa.reply = []byte(r.reply) - c.pa.size = nsize - c.pa.szb = []byte(strconv.Itoa(nsize)) - - c.processInboundClientMsg(msg) - c.flushClients(0) + sendq.recycle(&pmis) case <-closeCh: return @@ -2125,10 +2096,10 @@ func (as *mqttAccountSessionManager) notifyRetainedMsgDeleted(subject string, se } b, _ := json.Marshal(&req) jsa := &as.jsa - jsa.sendq <- &mqttJSPubMsg{ + jsa.sendq.push(&mqttJSPubMsg{ subj: jsa.rplyr + mqttJSARetainedMsgDel, msg: b, - } + }) } func (as *mqttAccountSessionManager) transferUniqueSessStreamsToMuxed(log *Server) { @@ -2267,11 +2238,10 @@ func (sess *mqttSession) clear() error { } } sess.subs, sess.pending, sess.cpending, sess.seq, sess.tmaxack = nil, nil, nil, 0, 0 - sess.mu.Unlock() - for _, dur := range durs { - sess.jsa.sendq <- &mqttJSPubMsg{subj: sess.jsa.prefixDomain(fmt.Sprintf(JSApiConsumerDeleteT, mqttStreamName, dur))} + sess.jsa.sendq.push(&mqttJSPubMsg{subj: sess.jsa.prefixDomain(fmt.Sprintf(JSApiConsumerDeleteT, mqttStreamName, dur))}) } + sess.mu.Unlock() if seq > 0 { if err := sess.jsa.deleteMsg(mqttSessStreamName, seq, true); err != nil { return fmt.Errorf("unable to delete session %q record at sequence %v", id, seq) @@ -2422,8 +2392,8 @@ func (sess *mqttSession) createConsumer(consConfig *ConsumerConfig) error { func (sess *mqttSession) deleteConsumer(cc *ConsumerConfig) { sess.mu.Lock() sess.tmaxack -= cc.MaxAckPending + sess.jsa.sendq.push(&mqttJSPubMsg{subj: sess.jsa.prefixDomain(fmt.Sprintf(JSApiConsumerDeleteT, mqttStreamName, cc.Durable))}) sess.mu.Unlock() - sess.jsa.sendq <- &mqttJSPubMsg{subj: sess.jsa.prefixDomain(fmt.Sprintf(JSApiConsumerDeleteT, mqttStreamName, cc.Durable))} } ////////////////////////////////////////////////////////////////////////////// @@ -3168,14 +3138,13 @@ func (c *client) mqttProcessPubAck(pi uint16) { } ackSubject = ack.subject } - sess.mu.Unlock() - - // Send the ack if applicable, this is done outside of the session lock. + // Send the ack if applicable. if ackSubject != _EMPTY_ { // We pass -1 for the hdr so that the send loop does not need to // add the "client info" header. This is not a JS API request per se. - sess.jsa.sendq <- &mqttJSPubMsg{subj: ackSubject, hdr: -1} + sess.jsa.sendq.push(&mqttJSPubMsg{subj: ackSubject, hdr: -1}) } + sess.mu.Unlock() } // Return the QoS from the given PUBLISH protocol's flags