Revert changes to processSub()

Based on how the MQTT callback operates, it is safe to finish setup
of the MQTT subscriptions after processSub() returns. So I have
reverted the changes to processSub() which will minimize changes
to non-MQTT related code.

Signed-off-by: Ivan Kozlovic <ivan@synadia.com>
This commit is contained in:
Ivan Kozlovic
2020-12-01 15:38:47 -07:00
parent 3e91ef75ab
commit 4fc04d3f55
6 changed files with 45 additions and 51 deletions

View File

@@ -1726,7 +1726,7 @@ func (a *Account) subscribeInternal(subject string, cb msgHandler) (*subscriptio
return nil, fmt.Errorf("no internal account client")
}
return c.processSub(c.createSub([]byte(subject), nil, []byte(sid), cb), false)
return c.processSub([]byte(subject), nil, []byte(sid), cb, false)
}
// This will add an account subscription that matches the "from" from a service import entry.
@@ -1751,7 +1751,7 @@ func (a *Account) addServiceImportSub(si *serviceImport) error {
cb := func(sub *subscription, c *client, subject, reply string, msg []byte) {
c.processServiceImport(si, a, msg)
}
_, err := c.processSub(c.createSub([]byte(subject), nil, []byte(sid), cb), true)
_, err := c.processSub([]byte(subject), nil, []byte(sid), cb, true)
return err
}
@@ -1951,7 +1951,7 @@ func (a *Account) createRespWildcard() []byte {
a.mu.Unlock()
// Create subscription and internal callback for all the wildcard response subjects.
c.processSub(c.createSub(wcsub, nil, []byte(sid), a.processServiceImportResponse), false)
c.processSub(wcsub, nil, []byte(sid), a.processServiceImportResponse, false)
return pre
}

View File

@@ -2226,30 +2226,32 @@ func (c *client) parseSub(argo []byte, noForward bool) error {
arg := make([]byte, len(argo))
copy(arg, argo)
args := splitArg(arg)
sub := &subscription{client: c}
var (
subject []byte
queue []byte
sid []byte
)
switch len(args) {
case 2:
sub.subject = args[0]
sub.queue = nil
sub.sid = args[1]
subject = args[0]
queue = nil
sid = args[1]
case 3:
sub.subject = args[0]
sub.queue = args[1]
sub.sid = args[2]
subject = args[0]
queue = args[1]
sid = args[2]
default:
return fmt.Errorf("processSub Parse Error: '%s'", arg)
}
// If there was an error, it has been sent to the client. We don't return an
// error here to not close the connection as a parsing error.
c.processSub(sub, noForward)
c.processSub(subject, queue, sid, nil, noForward)
return nil
}
func (c *client) createSub(subject, queue, sid []byte, cb msgHandler) *subscription {
return &subscription{client: c, subject: subject, queue: queue, sid: sid, icb: cb}
}
func (c *client) processSub(sub *subscription, noForward bool) (*subscription, error) {
func (c *client) processSub(subject, queue, bsid []byte, cb msgHandler, noForward bool) (*subscription, error) {
// Create the subscription
sub := &subscription{client: c, subject: subject, queue: queue, sid: bsid, icb: cb}
c.mu.Lock()
@@ -2266,7 +2268,7 @@ func (c *client) processSub(sub *subscription, noForward bool) (*subscription, e
// This check does not apply to SYSTEM or JETSTREAM or ACCOUNT clients (because they don't have a `nc`...)
if c.isClosed() && (kind != SYSTEM && kind != JETSTREAM && kind != ACCOUNT) {
c.mu.Unlock()
return nil, nil
return nil, ErrConnectionClosed
}
// Check permissions if applicable.
@@ -2313,9 +2315,6 @@ func (c *client) processSub(sub *subscription, noForward bool) (*subscription, e
updateGWs = c.srv.gateway.enabled
}
}
} else if es.mqtt != nil && sub.mqtt != nil {
es.mqtt.prm = sub.mqtt.prm
es.mqtt.qos = sub.mqtt.qos
}
// Unlocked from here onward
c.mu.Unlock()

View File

@@ -1411,7 +1411,7 @@ func (s *Server) systemSubscribe(subject, queue string, internalOnly bool, cb ms
q = []byte(queue)
}
// Now create the subscription
return c.processSub(c.createSub([]byte(subject), q, []byte(sid), cb), internalOnly)
return c.processSub([]byte(subject), q, []byte(sid), cb, internalOnly)
}
func (s *Server) sysUnsubscribe(sub *subscription) {

View File

@@ -1115,7 +1115,7 @@ func (t *StreamTemplate) createTemplateSubscriptions() error {
sid := 1
for _, subject := range t.Config.Subjects {
// Now create the subscription
if _, err := c.processSub(c.createSub([]byte(subject), nil, []byte(strconv.Itoa(sid)), t.processInboundTemplateMsg), false); err != nil {
if _, err := c.processSub([]byte(subject), nil, []byte(strconv.Itoa(sid)), t.processInboundTemplateMsg, false); err != nil {
c.acc.DeleteStreamTemplate(t.Name)
return err
}

View File

@@ -795,6 +795,16 @@ func (as *mqttAccountSessionManager) processSubs(sess *mqttSession, clientID str
sess.cons[sid] = cons
}
setupSub := func(sub *subscription, qos byte) {
if sub.mqtt == nil {
sub.mqtt = &mqttSub{}
}
sub.mqtt.qos = qos
if fromSubProto {
as.serializeRetainedMsgsForSub(sess, c, sub, trace)
}
}
subs := make([]*subscription, 0, len(filters))
for _, f := range filters {
if f.qos > 1 {
@@ -810,17 +820,12 @@ func (as *mqttAccountSessionManager) processSubs(sess *mqttSession, clientID str
var jscons *Consumer
var jssub *subscription
var err error
sub := c.mqttCreateSub(subject, sid, mqttDeliverMsgCb, f.qos)
if fromSubProto {
as.serializeRetainedMsgsForSub(sess, c, sub, trace)
}
// Note that if a subscription already exists on this subject,
// the sub is updated with the new qos/prm and the pointer to
// the existing subscription is returned.
sub, err = c.processSub(sub, false)
// the existing sub is returned. Need to update the qos.
sub, err := c.processSub([]byte(subject), nil, []byte(sid), mqttDeliverMsgCb, false)
if err == nil {
setupSub(sub, f.qos)
// This will create (if not already exist) a JS consumer for subscriptions
// of QoS >= 1. But if a JS consumer already exists and the subscription
// for same subject is now a QoS==0, then the JS consumer will be deleted.
@@ -836,18 +841,16 @@ func (as *mqttAccountSessionManager) processSubs(sess *mqttSession, clientID str
if mqttNeedSubForLevelUp(subject) {
var fwjscons *Consumer
var fwjssub *subscription
var fwcsub *subscription
// Say subject is "foo.>", remove the ".>" so that it becomes "foo"
fwcsubject := subject[:len(subject)-2]
// Change the sid to "foo fwc"
fwcsid := fwcsubject + mqttMultiLevelSidSuffix
fwcsub := c.mqttCreateSub(fwcsubject, fwcsid, mqttDeliverMsgCb, f.qos)
if fromSubProto {
as.serializeRetainedMsgsForSub(sess, c, fwcsub, trace)
}
// See note above about existing subscription.
fwcsub, err = c.processSub(fwcsub, false)
fwcsub, err = c.processSub([]byte(fwcsubject), nil, []byte(fwcsid), mqttDeliverMsgCb, false)
if err == nil {
setupSub(fwcsub, f.qos)
fwjscons, fwjssub, err = c.mqttProcessJSConsumer(sess, as.mstream,
fwcsubject, fwcsid, f.qos, fromSubProto)
}
@@ -1841,10 +1844,6 @@ func mqttSubscribeTrace(filters []*mqttFilter) string {
}
func mqttDeliverMsgCb(sub *subscription, pc *client, subject, reply string, msg []byte) {
if sub.mqtt == nil {
return
}
var ppFlags byte
var pQoS byte
var pi uint16
@@ -1859,7 +1858,7 @@ func mqttDeliverMsgCb(sub *subscription, pc *client, subject, reply string, msg
// We lock to check some of the subscription's fields and if we need to
// keep track of pending acks, etc..
sess.mu.Lock()
if sess.c != cc {
if sess.c != cc || sub.mqtt == nil {
sess.mu.Unlock()
return
}
@@ -1960,13 +1959,6 @@ func mqttSerializePublishMsg(w *mqttWriter, pi uint16, dup, retained bool, subje
return flags
}
// Helper to create an MQTT subscription.
func (c *client) mqttCreateSub(subject, sid string, cb msgHandler, qos byte) *subscription {
sub := c.createSub([]byte(subject), nil, []byte(sid), cb)
sub.mqtt = &mqttSub{qos: qos}
return sub
}
// Process the list of subscriptions and update the given filter
// with the QoS that has been accepted (or failure).
//
@@ -2080,12 +2072,10 @@ func (c *client) mqttProcessJSConsumer(sess *mqttSession, stream *Stream, subjec
return nil, nil, err
}
}
sub := c.mqttCreateSub(inbox, inbox, mqttDeliverMsgCb, qos)
sub.mqtt.jsCons = cons
// This is an internal subscription on subject like "$MQTT.sub.<nuid>" that is setup
// for the JS durable's deliver subject. I don't think that there is any need to
// forward this subscription in the cluster/super cluster.
sub, err = c.processSub(sub, true)
sub, err := c.processSub([]byte(inbox), nil, []byte(inbox), mqttDeliverMsgCb, true)
if err != nil {
if !exists {
cons.Delete()
@@ -2093,6 +2083,11 @@ func (c *client) mqttProcessJSConsumer(sess *mqttSession, stream *Stream, subjec
c.Errorf("Unable to create subscription for JetStream consumer on %q: %v", subject, err)
return nil, nil, err
}
if sub.mqtt == nil {
sub.mqtt = &mqttSub{}
}
sub.mqtt.qos = qos
sub.mqtt.jsCons = cons
return cons, sub, nil
}

View File

@@ -699,7 +699,7 @@ func (mset *Stream) subscribeInternal(subject string, cb msgHandler) (*subscript
mset.sid++
// Now create the subscription
return c.processSub(c.createSub([]byte(subject), nil, []byte(strconv.Itoa(mset.sid)), cb), false)
return c.processSub([]byte(subject), nil, []byte(strconv.Itoa(mset.sid)), cb, false)
}
// Helper for unlocked stream.