diff --git a/server/accounts.go b/server/accounts.go index 02d32028..9a68c032 100644 --- a/server/accounts.go +++ b/server/accounts.go @@ -1726,7 +1726,7 @@ func (a *Account) subscribeInternal(subject string, cb msgHandler) (*subscriptio return nil, fmt.Errorf("no internal account client") } - return c.processSub(c.createSub([]byte(subject), nil, []byte(sid), cb), false) + return c.processSub([]byte(subject), nil, []byte(sid), cb, false) } // This will add an account subscription that matches the "from" from a service import entry. @@ -1751,7 +1751,7 @@ func (a *Account) addServiceImportSub(si *serviceImport) error { cb := func(sub *subscription, c *client, subject, reply string, msg []byte) { c.processServiceImport(si, a, msg) } - _, err := c.processSub(c.createSub([]byte(subject), nil, []byte(sid), cb), true) + _, err := c.processSub([]byte(subject), nil, []byte(sid), cb, true) return err } @@ -1951,7 +1951,7 @@ func (a *Account) createRespWildcard() []byte { a.mu.Unlock() // Create subscription and internal callback for all the wildcard response subjects. - c.processSub(c.createSub(wcsub, nil, []byte(sid), a.processServiceImportResponse), false) + c.processSub(wcsub, nil, []byte(sid), a.processServiceImportResponse, false) return pre } diff --git a/server/client.go b/server/client.go index 3a68b9d2..485a72df 100644 --- a/server/client.go +++ b/server/client.go @@ -2226,30 +2226,32 @@ func (c *client) parseSub(argo []byte, noForward bool) error { arg := make([]byte, len(argo)) copy(arg, argo) args := splitArg(arg) - sub := &subscription{client: c} + var ( + subject []byte + queue []byte + sid []byte + ) switch len(args) { case 2: - sub.subject = args[0] - sub.queue = nil - sub.sid = args[1] + subject = args[0] + queue = nil + sid = args[1] case 3: - sub.subject = args[0] - sub.queue = args[1] - sub.sid = args[2] + subject = args[0] + queue = args[1] + sid = args[2] default: return fmt.Errorf("processSub Parse Error: '%s'", arg) } // If there was an error, it has been sent to the client. We don't return an // error here to not close the connection as a parsing error. - c.processSub(sub, noForward) + c.processSub(subject, queue, sid, nil, noForward) return nil } -func (c *client) createSub(subject, queue, sid []byte, cb msgHandler) *subscription { - return &subscription{client: c, subject: subject, queue: queue, sid: sid, icb: cb} -} - -func (c *client) processSub(sub *subscription, noForward bool) (*subscription, error) { +func (c *client) processSub(subject, queue, bsid []byte, cb msgHandler, noForward bool) (*subscription, error) { + // Create the subscription + sub := &subscription{client: c, subject: subject, queue: queue, sid: bsid, icb: cb} c.mu.Lock() @@ -2266,7 +2268,7 @@ func (c *client) processSub(sub *subscription, noForward bool) (*subscription, e // This check does not apply to SYSTEM or JETSTREAM or ACCOUNT clients (because they don't have a `nc`...) if c.isClosed() && (kind != SYSTEM && kind != JETSTREAM && kind != ACCOUNT) { c.mu.Unlock() - return nil, nil + return nil, ErrConnectionClosed } // Check permissions if applicable. @@ -2313,9 +2315,6 @@ func (c *client) processSub(sub *subscription, noForward bool) (*subscription, e updateGWs = c.srv.gateway.enabled } } - } else if es.mqtt != nil && sub.mqtt != nil { - es.mqtt.prm = sub.mqtt.prm - es.mqtt.qos = sub.mqtt.qos } // Unlocked from here onward c.mu.Unlock() diff --git a/server/events.go b/server/events.go index 3e9c84dc..9d3862d3 100644 --- a/server/events.go +++ b/server/events.go @@ -1411,7 +1411,7 @@ func (s *Server) systemSubscribe(subject, queue string, internalOnly bool, cb ms q = []byte(queue) } // Now create the subscription - return c.processSub(c.createSub([]byte(subject), q, []byte(sid), cb), internalOnly) + return c.processSub([]byte(subject), q, []byte(sid), cb, internalOnly) } func (s *Server) sysUnsubscribe(sub *subscription) { diff --git a/server/jetstream.go b/server/jetstream.go index 66463ab1..9a591471 100644 --- a/server/jetstream.go +++ b/server/jetstream.go @@ -1115,7 +1115,7 @@ func (t *StreamTemplate) createTemplateSubscriptions() error { sid := 1 for _, subject := range t.Config.Subjects { // Now create the subscription - if _, err := c.processSub(c.createSub([]byte(subject), nil, []byte(strconv.Itoa(sid)), t.processInboundTemplateMsg), false); err != nil { + if _, err := c.processSub([]byte(subject), nil, []byte(strconv.Itoa(sid)), t.processInboundTemplateMsg, false); err != nil { c.acc.DeleteStreamTemplate(t.Name) return err } diff --git a/server/mqtt.go b/server/mqtt.go index 8d64ff8c..dcc4a957 100644 --- a/server/mqtt.go +++ b/server/mqtt.go @@ -795,6 +795,16 @@ func (as *mqttAccountSessionManager) processSubs(sess *mqttSession, clientID str sess.cons[sid] = cons } + setupSub := func(sub *subscription, qos byte) { + if sub.mqtt == nil { + sub.mqtt = &mqttSub{} + } + sub.mqtt.qos = qos + if fromSubProto { + as.serializeRetainedMsgsForSub(sess, c, sub, trace) + } + } + subs := make([]*subscription, 0, len(filters)) for _, f := range filters { if f.qos > 1 { @@ -810,17 +820,12 @@ func (as *mqttAccountSessionManager) processSubs(sess *mqttSession, clientID str var jscons *Consumer var jssub *subscription - var err error - sub := c.mqttCreateSub(subject, sid, mqttDeliverMsgCb, f.qos) - if fromSubProto { - as.serializeRetainedMsgsForSub(sess, c, sub, trace) - } // Note that if a subscription already exists on this subject, - // the sub is updated with the new qos/prm and the pointer to - // the existing subscription is returned. - sub, err = c.processSub(sub, false) + // the existing sub is returned. Need to update the qos. + sub, err := c.processSub([]byte(subject), nil, []byte(sid), mqttDeliverMsgCb, false) if err == nil { + setupSub(sub, f.qos) // This will create (if not already exist) a JS consumer for subscriptions // of QoS >= 1. But if a JS consumer already exists and the subscription // for same subject is now a QoS==0, then the JS consumer will be deleted. @@ -836,18 +841,16 @@ func (as *mqttAccountSessionManager) processSubs(sess *mqttSession, clientID str if mqttNeedSubForLevelUp(subject) { var fwjscons *Consumer var fwjssub *subscription + var fwcsub *subscription // Say subject is "foo.>", remove the ".>" so that it becomes "foo" fwcsubject := subject[:len(subject)-2] // Change the sid to "foo fwc" fwcsid := fwcsubject + mqttMultiLevelSidSuffix - fwcsub := c.mqttCreateSub(fwcsubject, fwcsid, mqttDeliverMsgCb, f.qos) - if fromSubProto { - as.serializeRetainedMsgsForSub(sess, c, fwcsub, trace) - } // See note above about existing subscription. - fwcsub, err = c.processSub(fwcsub, false) + fwcsub, err = c.processSub([]byte(fwcsubject), nil, []byte(fwcsid), mqttDeliverMsgCb, false) if err == nil { + setupSub(fwcsub, f.qos) fwjscons, fwjssub, err = c.mqttProcessJSConsumer(sess, as.mstream, fwcsubject, fwcsid, f.qos, fromSubProto) } @@ -1841,10 +1844,6 @@ func mqttSubscribeTrace(filters []*mqttFilter) string { } func mqttDeliverMsgCb(sub *subscription, pc *client, subject, reply string, msg []byte) { - if sub.mqtt == nil { - return - } - var ppFlags byte var pQoS byte var pi uint16 @@ -1859,7 +1858,7 @@ func mqttDeliverMsgCb(sub *subscription, pc *client, subject, reply string, msg // We lock to check some of the subscription's fields and if we need to // keep track of pending acks, etc.. sess.mu.Lock() - if sess.c != cc { + if sess.c != cc || sub.mqtt == nil { sess.mu.Unlock() return } @@ -1960,13 +1959,6 @@ func mqttSerializePublishMsg(w *mqttWriter, pi uint16, dup, retained bool, subje return flags } -// Helper to create an MQTT subscription. -func (c *client) mqttCreateSub(subject, sid string, cb msgHandler, qos byte) *subscription { - sub := c.createSub([]byte(subject), nil, []byte(sid), cb) - sub.mqtt = &mqttSub{qos: qos} - return sub -} - // Process the list of subscriptions and update the given filter // with the QoS that has been accepted (or failure). // @@ -2080,12 +2072,10 @@ func (c *client) mqttProcessJSConsumer(sess *mqttSession, stream *Stream, subjec return nil, nil, err } } - sub := c.mqttCreateSub(inbox, inbox, mqttDeliverMsgCb, qos) - sub.mqtt.jsCons = cons // This is an internal subscription on subject like "$MQTT.sub." that is setup // for the JS durable's deliver subject. I don't think that there is any need to // forward this subscription in the cluster/super cluster. - sub, err = c.processSub(sub, true) + sub, err := c.processSub([]byte(inbox), nil, []byte(inbox), mqttDeliverMsgCb, true) if err != nil { if !exists { cons.Delete() @@ -2093,6 +2083,11 @@ func (c *client) mqttProcessJSConsumer(sess *mqttSession, stream *Stream, subjec c.Errorf("Unable to create subscription for JetStream consumer on %q: %v", subject, err) return nil, nil, err } + if sub.mqtt == nil { + sub.mqtt = &mqttSub{} + } + sub.mqtt.qos = qos + sub.mqtt.jsCons = cons return cons, sub, nil } diff --git a/server/stream.go b/server/stream.go index 029cf391..c3cbea4f 100644 --- a/server/stream.go +++ b/server/stream.go @@ -699,7 +699,7 @@ func (mset *Stream) subscribeInternal(subject string, cb msgHandler) (*subscript mset.sid++ // Now create the subscription - return c.processSub(c.createSub([]byte(subject), nil, []byte(strconv.Itoa(mset.sid)), cb), false) + return c.processSub([]byte(subject), nil, []byte(strconv.Itoa(mset.sid)), cb, false) } // Helper for unlocked stream.