mirror of
https://github.com/gogrlx/nats-server.git
synced 2026-04-02 03:38:42 -07:00
Js leaf deny (#2693)
Along a leaf node connection, unless the system account is shared AND the JetStream domain name is identical, the default JetStream traffic (without a domain set) will be denied. As a consequence, all clients that wants to access a domain that is not the one in the server they are connected to, a domain name must be specified. Affected from this change are setups where: a leaf node had no local JetStream OR the server the leaf node connected to had no local JetStream. One of the two accounts that are connected via a leaf node remote, must have no JetStream enabled. The side that does not have JetStream enabled, will loose JetStream access and it's clients must set `nats.Domain` manually. For workarounds on how to restore the old behavior, look at: https://github.com/nats-io/nats-server/pull/2693#issuecomment-996212582 New config values added: `default_js_domain` is a mapping from account to domain, settable when JetStream is not enabled in an account. `extension_hint` are hints for non clustered server to start in clustered mode (and be usable to extend) `js_domain` is a way to set the JetStream domain to use for mqtt. Signed-off-by: Matthias Hanel <mh@synadia.com>
This commit is contained in:
@@ -246,14 +246,16 @@ type sessPersistRecord struct {
|
||||
}
|
||||
|
||||
type mqttJSA struct {
|
||||
mu sync.Mutex
|
||||
id string
|
||||
c *client
|
||||
sendq chan *mqttJSPubMsg
|
||||
rplyr string
|
||||
replies sync.Map
|
||||
nuid *nuid.NUID
|
||||
quitCh chan struct{}
|
||||
mu sync.Mutex
|
||||
id string
|
||||
c *client
|
||||
sendq chan *mqttJSPubMsg
|
||||
rplyr string
|
||||
replies sync.Map
|
||||
nuid *nuid.NUID
|
||||
quitCh chan struct{}
|
||||
domain string // Domain or possibly empty. This is added to session subject.
|
||||
domainSet bool // covers if domain was set, even to empty
|
||||
}
|
||||
|
||||
type mqttJSPubMsg struct {
|
||||
@@ -931,7 +933,6 @@ func (s *Server) mqttCreateAccountSessionManager(acc *Account, quitCh chan struc
|
||||
|
||||
id := string(getHash(s.Name()))
|
||||
replicas := s.mqttDetermineReplicas()
|
||||
s.Noticef("Creating MQTT streams/consumers with replicas %v for account %q", replicas, accName)
|
||||
as := &mqttAccountSessionManager{
|
||||
sessions: make(map[string]*mqttSession),
|
||||
sessByHash: make(map[string]*mqttSession),
|
||||
@@ -950,10 +951,39 @@ func (s *Server) mqttCreateAccountSessionManager(acc *Account, quitCh chan struc
|
||||
ch: make(chan struct{}, 1),
|
||||
},
|
||||
}
|
||||
// TODO record domain name in as here
|
||||
|
||||
// The domain to communicate with may be required for JS calls.
|
||||
// Search from specific (per account setting) to generic (mqtt setting)
|
||||
opts := s.getOpts()
|
||||
if opts.JsAccDefaultDomain != nil {
|
||||
if d, ok := opts.JsAccDefaultDomain[accName]; ok {
|
||||
if d != _EMPTY_ {
|
||||
as.jsa.domain = d
|
||||
}
|
||||
as.jsa.domainSet = true
|
||||
}
|
||||
// in case domain was set to empty, check if there are more generic domain overwrites
|
||||
}
|
||||
if as.jsa.domain == _EMPTY_ {
|
||||
if d := opts.MQTT.JsDomain; d != _EMPTY_ {
|
||||
as.jsa.domain = d
|
||||
as.jsa.domainSet = true
|
||||
}
|
||||
}
|
||||
// We need to include the domain in the subject prefix used to store sessions in the $MQTT_sess stream.
|
||||
if d := s.getOpts().JetStreamDomain; d != _EMPTY_ {
|
||||
if as.jsa.domainSet {
|
||||
if as.jsa.domain != _EMPTY_ {
|
||||
as.domainTk = as.jsa.domain + "."
|
||||
}
|
||||
} else if d := s.getOpts().JetStreamDomain; d != _EMPTY_ {
|
||||
as.domainTk = d + "."
|
||||
}
|
||||
if as.jsa.domainSet {
|
||||
s.Noticef("Creating MQTT streams/consumers with replicas %v for account %q in domain %q", replicas, accName, as.jsa.domain)
|
||||
} else {
|
||||
s.Noticef("Creating MQTT streams/consumers with replicas %v for account %q", replicas, accName)
|
||||
}
|
||||
|
||||
var subs []*subscription
|
||||
var success bool
|
||||
@@ -1156,6 +1186,16 @@ func (jsa *mqttJSA) newRequest(kind, subject string, hdr int, msg []byte) (inter
|
||||
return jsa.newRequestEx(kind, subject, hdr, msg, mqttJSAPITimeout)
|
||||
}
|
||||
|
||||
func (jsa *mqttJSA) prefixDomain(subject string) string {
|
||||
if jsa.domain != _EMPTY_ {
|
||||
// rewrite js api prefix with domain
|
||||
if sub := strings.TrimPrefix(subject, JSApiPrefix+"."); sub != subject {
|
||||
subject = fmt.Sprintf("$JS.%s.API.%s", jsa.domain, sub)
|
||||
}
|
||||
}
|
||||
return subject
|
||||
}
|
||||
|
||||
func (jsa *mqttJSA) newRequestEx(kind, subject string, hdr int, msg []byte, timeout time.Duration) (interface{}, error) {
|
||||
jsa.mu.Lock()
|
||||
// Either we use nuid.Next() which uses a global lock, or our own nuid object, but
|
||||
@@ -1172,6 +1212,7 @@ func (jsa *mqttJSA) newRequestEx(kind, subject string, hdr int, msg []byte, time
|
||||
ch := make(chan interface{}, 1)
|
||||
jsa.replies.Store(reply, ch)
|
||||
|
||||
subject = jsa.prefixDomain(subject)
|
||||
jsa.sendq <- &mqttJSPubMsg{
|
||||
subj: subject,
|
||||
reply: reply,
|
||||
@@ -1300,14 +1341,15 @@ func (jsa *mqttJSA) storeMsgWithKind(kind, subject string, headers int, msg []by
|
||||
func (jsa *mqttJSA) deleteMsg(stream string, seq uint64, wait bool) error {
|
||||
dreq := JSApiMsgDeleteRequest{Seq: seq, NoErase: true}
|
||||
req, _ := json.Marshal(dreq)
|
||||
subj := jsa.prefixDomain(fmt.Sprintf(JSApiMsgDeleteT, stream))
|
||||
if !wait {
|
||||
jsa.sendq <- &mqttJSPubMsg{
|
||||
subj: fmt.Sprintf(JSApiMsgDeleteT, stream),
|
||||
subj: subj,
|
||||
msg: req,
|
||||
}
|
||||
return nil
|
||||
}
|
||||
dmi, err := jsa.newRequest(mqttJSAMsgDelete, fmt.Sprintf(JSApiMsgDeleteT, stream), 0, req)
|
||||
dmi, err := jsa.newRequest(mqttJSAMsgDelete, subj, 0, req)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
@@ -1609,7 +1651,8 @@ func (as *mqttAccountSessionManager) createSubscription(subject string, cb msgHa
|
||||
// No lock held on entry.
|
||||
func (as *mqttAccountSessionManager) sendJSAPIrequests(s *Server, c *client, accName string, closeCh chan struct{}) {
|
||||
var cluster string
|
||||
if s.JetStreamEnabled() {
|
||||
if s.JetStreamEnabled() && !as.jsa.domainSet {
|
||||
// Only request the own cluster when it is clear that
|
||||
cluster = s.cachedClusterName()
|
||||
}
|
||||
as.mu.RLock()
|
||||
@@ -2227,7 +2270,7 @@ func (sess *mqttSession) clear() error {
|
||||
sess.mu.Unlock()
|
||||
|
||||
for _, dur := range durs {
|
||||
sess.jsa.sendq <- &mqttJSPubMsg{subj: fmt.Sprintf(JSApiConsumerDeleteT, mqttStreamName, dur)}
|
||||
sess.jsa.sendq <- &mqttJSPubMsg{subj: sess.jsa.prefixDomain(fmt.Sprintf(JSApiConsumerDeleteT, mqttStreamName, dur))}
|
||||
}
|
||||
if seq > 0 {
|
||||
if err := sess.jsa.deleteMsg(mqttSessStreamName, seq, true); err != nil {
|
||||
@@ -2380,7 +2423,7 @@ func (sess *mqttSession) deleteConsumer(cc *ConsumerConfig) {
|
||||
sess.mu.Lock()
|
||||
sess.tmaxack -= cc.MaxAckPending
|
||||
sess.mu.Unlock()
|
||||
sess.jsa.sendq <- &mqttJSPubMsg{subj: fmt.Sprintf(JSApiConsumerDeleteT, mqttStreamName, cc.Durable)}
|
||||
sess.jsa.sendq <- &mqttJSPubMsg{subj: sess.jsa.prefixDomain(fmt.Sprintf(JSApiConsumerDeleteT, mqttStreamName, cc.Durable))}
|
||||
}
|
||||
|
||||
//////////////////////////////////////////////////////////////////////////////
|
||||
|
||||
Reference in New Issue
Block a user