mirror of
https://github.com/gogrlx/nats-server.git
synced 2026-04-02 11:48:43 -07:00
Replaced MQTT's send queue
Signed-off-by: Ivan Kozlovic <ivan@synadia.com>
This commit is contained in:
155
server/mqtt.go
155
server/mqtt.go
@@ -229,27 +229,15 @@ type mqttAccountSessionManager struct {
|
||||
replicas int
|
||||
rrmLastSeq uint64 // Restore retained messages expected last sequence
|
||||
rrmDoneCh chan struct{} // To notify the caller that all retained messages have been loaded
|
||||
sp sessPersist // Used for cluster-wide processing of session records being persisted
|
||||
sp *ipQueue // of uint64. Used for cluster-wide processing of session records being persisted
|
||||
domainTk string // Domain (with trailing "."), or possibly empty. This is added to session subject.
|
||||
}
|
||||
|
||||
type sessPersist struct {
|
||||
mu sync.Mutex
|
||||
ch chan struct{}
|
||||
head *sessPersistRecord
|
||||
tail *sessPersistRecord
|
||||
}
|
||||
|
||||
type sessPersistRecord struct {
|
||||
seq uint64
|
||||
next *sessPersistRecord
|
||||
}
|
||||
|
||||
type mqttJSA struct {
|
||||
mu sync.Mutex
|
||||
id string
|
||||
c *client
|
||||
sendq chan *mqttJSPubMsg
|
||||
sendq *ipQueue // of *mqttJSPubMsg
|
||||
rplyr string
|
||||
replies sync.Map
|
||||
nuid *nuid.NUID
|
||||
@@ -943,13 +931,11 @@ func (s *Server) mqttCreateAccountSessionManager(acc *Account, quitCh chan struc
|
||||
id: id,
|
||||
c: c,
|
||||
rplyr: mqttJSARepliesPrefix + id + ".",
|
||||
sendq: make(chan *mqttJSPubMsg, 8192),
|
||||
sendq: newIPQueue(), // of *mqttJSPubMsg
|
||||
nuid: nuid.New(),
|
||||
quitCh: quitCh,
|
||||
},
|
||||
sp: sessPersist{
|
||||
ch: make(chan struct{}, 1),
|
||||
},
|
||||
sp: newIPQueue(), // of uint64
|
||||
}
|
||||
// TODO record domain name in as here
|
||||
|
||||
@@ -1213,12 +1199,12 @@ func (jsa *mqttJSA) newRequestEx(kind, subject string, hdr int, msg []byte, time
|
||||
jsa.replies.Store(reply, ch)
|
||||
|
||||
subject = jsa.prefixDomain(subject)
|
||||
jsa.sendq <- &mqttJSPubMsg{
|
||||
jsa.sendq.push(&mqttJSPubMsg{
|
||||
subj: subject,
|
||||
reply: reply,
|
||||
hdr: hdr,
|
||||
msg: msg,
|
||||
}
|
||||
})
|
||||
|
||||
var i interface{}
|
||||
// We don't want to use time.After() which causes memory growth because the timer
|
||||
@@ -1343,10 +1329,10 @@ func (jsa *mqttJSA) deleteMsg(stream string, seq uint64, wait bool) error {
|
||||
req, _ := json.Marshal(dreq)
|
||||
subj := jsa.prefixDomain(fmt.Sprintf(JSApiMsgDeleteT, stream))
|
||||
if !wait {
|
||||
jsa.sendq <- &mqttJSPubMsg{
|
||||
jsa.sendq.push(&mqttJSPubMsg{
|
||||
subj: subj,
|
||||
msg: req,
|
||||
}
|
||||
})
|
||||
return nil
|
||||
}
|
||||
dmi, err := jsa.newRequest(mqttJSAMsgDelete, subj, 0, req)
|
||||
@@ -1516,27 +1502,11 @@ func (as *mqttAccountSessionManager) processSessionPersist(_ *subscription, pc *
|
||||
// We would need to lookup the message that that is a request/reply
|
||||
// that we can do in place here. So move that to a long-running routine
|
||||
// that will process the session persist record.
|
||||
as.mu.RLock()
|
||||
sp := &as.sp
|
||||
as.mu.RUnlock()
|
||||
|
||||
spr := &sessPersistRecord{seq: par.Sequence}
|
||||
sp.mu.Lock()
|
||||
if sp.tail != nil {
|
||||
sp.tail.next = spr
|
||||
} else {
|
||||
sp.head = spr
|
||||
select {
|
||||
case sp.ch <- struct{}{}:
|
||||
default:
|
||||
}
|
||||
}
|
||||
sp.tail = spr
|
||||
sp.mu.Unlock()
|
||||
as.sp.push(par.Sequence)
|
||||
}
|
||||
|
||||
func (as *mqttAccountSessionManager) processSessPersistRecord(spr *sessPersistRecord) {
|
||||
smsg, err := as.jsa.loadMsg(mqttSessStreamName, spr.seq)
|
||||
func (as *mqttAccountSessionManager) processSessPersistRecord(seq uint64) {
|
||||
smsg, err := as.jsa.loadMsg(mqttSessStreamName, seq)
|
||||
if err != nil {
|
||||
return
|
||||
}
|
||||
@@ -1551,7 +1521,7 @@ func (as *mqttAccountSessionManager) processSessPersistRecord(spr *sessPersistRe
|
||||
// If our current session's stream sequence is higher, it means that this
|
||||
// update is stale, so we don't do anything here.
|
||||
sess.mu.Lock()
|
||||
ignore := spr.seq < sess.seq
|
||||
ignore := seq < sess.seq
|
||||
sess.mu.Unlock()
|
||||
if ignore {
|
||||
return
|
||||
@@ -1573,22 +1543,18 @@ func (as *mqttAccountSessionManager) processSessPersistRecord(spr *sessPersistRe
|
||||
|
||||
func (as *mqttAccountSessionManager) sessPersistProcessing(closeCh chan struct{}) {
|
||||
as.mu.RLock()
|
||||
sp := &as.sp
|
||||
sp := as.sp
|
||||
quitCh := as.jsa.quitCh
|
||||
as.mu.RUnlock()
|
||||
|
||||
for {
|
||||
select {
|
||||
case <-sp.ch:
|
||||
sp.mu.Lock()
|
||||
l := sp.head
|
||||
sp.head, sp.tail = nil, nil
|
||||
sp.mu.Unlock()
|
||||
|
||||
for spr := l; spr != nil; spr = l.next {
|
||||
l = spr
|
||||
as.processSessPersistRecord(spr)
|
||||
seqs := sp.pop()
|
||||
for _, seq := range seqs {
|
||||
as.processSessPersistRecord(seq.(uint64))
|
||||
}
|
||||
sp.recycle(&seqs)
|
||||
case <-closeCh:
|
||||
return
|
||||
case <-quitCh:
|
||||
@@ -1683,44 +1649,49 @@ func (as *mqttAccountSessionManager) sendJSAPIrequests(s *Server, c *client, acc
|
||||
|
||||
for {
|
||||
select {
|
||||
case r := <-sendq:
|
||||
case <-sendq.ch:
|
||||
pmis := sendq.pop()
|
||||
for _, pmi := range pmis {
|
||||
r := pmi.(*mqttJSPubMsg)
|
||||
|
||||
var nsize int
|
||||
var nsize int
|
||||
|
||||
msg := r.msg
|
||||
// If r.hdr is set to -1, it means that there is no need for any header.
|
||||
if r.hdr != -1 {
|
||||
bb := bytes.Buffer{}
|
||||
if r.hdr > 0 {
|
||||
// This means that the header has been set by the caller and is
|
||||
// already part of `msg`, so simply set c.pa.hdr to the given value.
|
||||
c.pa.hdr = r.hdr
|
||||
msg := r.msg
|
||||
// If r.hdr is set to -1, it means that there is no need for any header.
|
||||
if r.hdr != -1 {
|
||||
bb := bytes.Buffer{}
|
||||
if r.hdr > 0 {
|
||||
// This means that the header has been set by the caller and is
|
||||
// already part of `msg`, so simply set c.pa.hdr to the given value.
|
||||
c.pa.hdr = r.hdr
|
||||
nsize = len(msg)
|
||||
msg = append(msg, _CRLF_...)
|
||||
} else {
|
||||
// We need the ClientInfo header, so add it here.
|
||||
bb.Write(hdrb)
|
||||
c.pa.hdr = bb.Len()
|
||||
bb.Write(r.msg)
|
||||
nsize = bb.Len()
|
||||
bb.WriteString(_CRLF_)
|
||||
msg = bb.Bytes()
|
||||
}
|
||||
c.pa.hdb = []byte(strconv.Itoa(c.pa.hdr))
|
||||
} else {
|
||||
c.pa.hdr = -1
|
||||
c.pa.hdb = nil
|
||||
nsize = len(msg)
|
||||
msg = append(msg, _CRLF_...)
|
||||
} else {
|
||||
// We need the ClientInfo header, so add it here.
|
||||
bb.Write(hdrb)
|
||||
c.pa.hdr = bb.Len()
|
||||
bb.Write(r.msg)
|
||||
nsize = bb.Len()
|
||||
bb.WriteString(_CRLF_)
|
||||
msg = bb.Bytes()
|
||||
}
|
||||
c.pa.hdb = []byte(strconv.Itoa(c.pa.hdr))
|
||||
} else {
|
||||
c.pa.hdr = -1
|
||||
c.pa.hdb = nil
|
||||
nsize = len(msg)
|
||||
msg = append(msg, _CRLF_...)
|
||||
|
||||
c.pa.subject = []byte(r.subj)
|
||||
c.pa.reply = []byte(r.reply)
|
||||
c.pa.size = nsize
|
||||
c.pa.szb = []byte(strconv.Itoa(nsize))
|
||||
|
||||
c.processInboundClientMsg(msg)
|
||||
c.flushClients(0)
|
||||
}
|
||||
|
||||
c.pa.subject = []byte(r.subj)
|
||||
c.pa.reply = []byte(r.reply)
|
||||
c.pa.size = nsize
|
||||
c.pa.szb = []byte(strconv.Itoa(nsize))
|
||||
|
||||
c.processInboundClientMsg(msg)
|
||||
c.flushClients(0)
|
||||
sendq.recycle(&pmis)
|
||||
|
||||
case <-closeCh:
|
||||
return
|
||||
@@ -2125,10 +2096,10 @@ func (as *mqttAccountSessionManager) notifyRetainedMsgDeleted(subject string, se
|
||||
}
|
||||
b, _ := json.Marshal(&req)
|
||||
jsa := &as.jsa
|
||||
jsa.sendq <- &mqttJSPubMsg{
|
||||
jsa.sendq.push(&mqttJSPubMsg{
|
||||
subj: jsa.rplyr + mqttJSARetainedMsgDel,
|
||||
msg: b,
|
||||
}
|
||||
})
|
||||
}
|
||||
|
||||
func (as *mqttAccountSessionManager) transferUniqueSessStreamsToMuxed(log *Server) {
|
||||
@@ -2267,11 +2238,10 @@ func (sess *mqttSession) clear() error {
|
||||
}
|
||||
}
|
||||
sess.subs, sess.pending, sess.cpending, sess.seq, sess.tmaxack = nil, nil, nil, 0, 0
|
||||
sess.mu.Unlock()
|
||||
|
||||
for _, dur := range durs {
|
||||
sess.jsa.sendq <- &mqttJSPubMsg{subj: sess.jsa.prefixDomain(fmt.Sprintf(JSApiConsumerDeleteT, mqttStreamName, dur))}
|
||||
sess.jsa.sendq.push(&mqttJSPubMsg{subj: sess.jsa.prefixDomain(fmt.Sprintf(JSApiConsumerDeleteT, mqttStreamName, dur))})
|
||||
}
|
||||
sess.mu.Unlock()
|
||||
if seq > 0 {
|
||||
if err := sess.jsa.deleteMsg(mqttSessStreamName, seq, true); err != nil {
|
||||
return fmt.Errorf("unable to delete session %q record at sequence %v", id, seq)
|
||||
@@ -2422,8 +2392,8 @@ func (sess *mqttSession) createConsumer(consConfig *ConsumerConfig) error {
|
||||
func (sess *mqttSession) deleteConsumer(cc *ConsumerConfig) {
|
||||
sess.mu.Lock()
|
||||
sess.tmaxack -= cc.MaxAckPending
|
||||
sess.jsa.sendq.push(&mqttJSPubMsg{subj: sess.jsa.prefixDomain(fmt.Sprintf(JSApiConsumerDeleteT, mqttStreamName, cc.Durable))})
|
||||
sess.mu.Unlock()
|
||||
sess.jsa.sendq <- &mqttJSPubMsg{subj: sess.jsa.prefixDomain(fmt.Sprintf(JSApiConsumerDeleteT, mqttStreamName, cc.Durable))}
|
||||
}
|
||||
|
||||
//////////////////////////////////////////////////////////////////////////////
|
||||
@@ -3168,14 +3138,13 @@ func (c *client) mqttProcessPubAck(pi uint16) {
|
||||
}
|
||||
ackSubject = ack.subject
|
||||
}
|
||||
sess.mu.Unlock()
|
||||
|
||||
// Send the ack if applicable, this is done outside of the session lock.
|
||||
// Send the ack if applicable.
|
||||
if ackSubject != _EMPTY_ {
|
||||
// We pass -1 for the hdr so that the send loop does not need to
|
||||
// add the "client info" header. This is not a JS API request per se.
|
||||
sess.jsa.sendq <- &mqttJSPubMsg{subj: ackSubject, hdr: -1}
|
||||
sess.jsa.sendq.push(&mqttJSPubMsg{subj: ackSubject, hdr: -1})
|
||||
}
|
||||
sess.mu.Unlock()
|
||||
}
|
||||
|
||||
// Return the QoS from the given PUBLISH protocol's flags
|
||||
|
||||
Reference in New Issue
Block a user