// Copyright 2018-2020 The NATS Authors
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package server

import (
	"bytes"
	"crypto/sha256"
	"crypto/tls"
	"encoding/json"
	"fmt"
	"math/rand"
	"net"
	"net/url"
	"sort"
	"strconv"
	"sync"
	"sync/atomic"
	"time"
)

const (
	defaultSolicitGatewaysDelay         = time.Second
	defaultGatewayConnectDelay          = time.Second
	defaultGatewayReconnectDelay        = time.Second
	defaultGatewayRecentSubExpiration   = 2 * time.Second
	defaultGatewayMaxRUnsubBeforeSwitch = 1000

	// Legacy reply prefix: "$GR.<4 characters cluster hash>.". It is kept for
	// backward compatibility with servers that only know about this prefix.
	oldGWReplyPrefix    = "$GR."
	oldGWReplyPrefixLen = len(oldGWReplyPrefix)
	oldGWReplyStart     = oldGWReplyPrefixLen + 5 // len of prefix above + len of hash (4) + "."

	// The new prefix is "_GR_.<cluster>.<server>." where <cluster> is a 6 characters
	// hash of origin cluster name and <server> is a 6 characters hash of origin server pub key.
	gwReplyPrefix    = "_GR_."
	gwReplyPrefixLen = len(gwReplyPrefix)
	gwHashLen        = 6
	// Byte offsets, within a "_GR_." reply subject, of the cluster hash, the
	// server hash and (presumably) the original reply subject.
	gwClusterOffset = gwReplyPrefixLen
	gwServerOffset  = gwClusterOffset + gwHashLen + 1
	gwSubjectOffset = gwServerOffset + gwHashLen + 1

	// Gateway connections send PINGs regardless of traffic. The interval is
	// either Options.PingInterval or this value, whichever is the smallest.
	gwMaxPingInterval = 15 * time.Second
)

// Package-level tunables initialized from the defaults above.
// gatewaySolicitDelay is accessed with sync/atomic (see
// SetGatewaysSolicitDelay/ResetGatewaysSolicitDelay).
var (
	gatewayConnectDelay          = defaultGatewayConnectDelay
	gatewayReconnectDelay        = defaultGatewayReconnectDelay
	gatewayMaxRUnsubBeforeSwitch = defaultGatewayMaxRUnsubBeforeSwitch
	gatewaySolicitDelay          = int64(defaultSolicitGatewaysDelay)
	gatewayMaxPingInterval       = gwMaxPingInterval
)

// Warning when user configures gateway TLS insecure
const gatewayTLSInsecureWarning = "TLS certificate chain and hostname of solicited gateways will not be verified. DO NOT USE IN PRODUCTION!"

// SetGatewaysSolicitDelay sets the initial delay before gateways
// connections are initiated.
// Used by tests.
func SetGatewaysSolicitDelay(delay time.Duration) {
	atomic.StoreInt64(&gatewaySolicitDelay, int64(delay))
}

// ResetGatewaysSolicitDelay resets the initial delay before gateways
// connections are initiated to its default values.
// Used by tests.
func ResetGatewaysSolicitDelay() {
	atomic.StoreInt64(&gatewaySolicitDelay, int64(defaultSolicitGatewaysDelay))
}

// Commands carried in the GatewayCmd field of a gateway INFO protocol.
const (
	gatewayCmdGossip          byte = 1
	gatewayCmdAllSubsStart    byte = 2
	gatewayCmdAllSubsComplete byte = 3
)

// GatewayInterestMode represents an account interest mode for a gateway connection
type GatewayInterestMode byte

// GatewayInterestMode values
const (
	// Optimistic is the default mode where a cluster will send
	// to a gateway unless it has been told that there is no interest
	// (this is for plain subscribers only).
	Optimistic GatewayInterestMode = iota
	// Transitioning is when a gateway has to send too many
	// no interest on subjects to the remote and decides that it is
	// now time to move to InterestOnly mode (this is on a per account
	// basis).
	Transitioning
	// InterestOnly means that a cluster sends all its subscriptions
	// interest to the gateway, which in return does not send a message
	// unless it knows that there is explicit interest.
	InterestOnly
)

// String returns a human-readable name for the mode, or "Unknown" for a
// value outside of the defined set.
func (im GatewayInterestMode) String() string {
	switch im {
	case Optimistic:
		return "Optimistic"
	case InterestOnly:
		return "Interest-Only"
	case Transitioning:
		return "Transitioning"
	default:
		return "Unknown"
	}
}

// When true, this server does not force accounts into InterestOnly mode
// (the default behavior since v2.9.0). Only meant to be changed by tests
// through GatewayDoNotForceInterestOnlyMode.
var gwDoNotForceInterestOnlyMode bool

// GatewayDoNotForceInterestOnlyMode is used ONLY in tests.
// DO NOT USE in normal code or if you embed the NATS Server.
func GatewayDoNotForceInterestOnlyMode(doNotForce bool) {
	gwDoNotForceInterestOnlyMode = doNotForce
}

// srvGateway holds the server-wide gateway state.
type srvGateway struct {
	// Total number of queue subs in all remote gateways (used with atomic operations).
	// NOTE(review): kept as the first field, presumably so that it stays 64-bit
	// aligned for atomic access on 32-bit platforms — do not move.
	totalQSubs int64
	sync.RWMutex
	enabled  bool                   // Immutable, true if both a name and port are configured
	name     string                 // Name of the Gateway on this server
	out      map[string]*client     // outbound gateways
	outo     []*client              // outbound gateways maintained in an order suitable for sending msgs (currently based on RTT)
	in       map[uint64]*client     // inbound gateways
	remotes  map[string]*gatewayCfg // Config of remote gateways
	URLs     refCountedUrlSet       // Set of all Gateway URLs in the cluster
	URL      string                 // This server gateway URL (after possible random port is resolved)
	info     *Info                  // Gateway Info protocol
	infoJSON []byte                 // Marshal'ed Info protocol
	runknown bool                   // Rejects unknown (not configured) gateway connections
	replyPfx []byte                 // Will be "_GR_.<6:cluster hash>.<6:server hash>." (built in newGateway)

	// For backward compatibility
	oldReplyPfx []byte // Will be "$GR.<4:cluster hash>."
	oldHash     []byte

	// We maintain the interest of subjects and queues per account.
	// For a given account, entries in the map could be something like this:
	// foo.bar {n: 3} // 3 subs on foo.bar
	// foo.> {n: 6} // 6 subs on foo.>
	// foo bar {n: 1, q: true} // 1 qsub on foo, queue bar
	// foo baz {n: 3, q: true} // 3 qsubs on foo, queue baz
	pasi struct {
		// Protect map since accessed from different go-routine and avoid
		// possible race resulting in RS+ being sent before RS- resulting
		// in incorrect interest suppression.
		// Will use while sending QSubs (on GW connection accept) and when
		// switching to the send-all-subs mode.
		sync.Mutex
		m map[string]map[string]*sitally
	}

	// This is to track recent subscriptions for a given account
	rsubs sync.Map

	resolver  netResolver   // Used to resolve host name before calling net.Dial()
	sqbsz     int           // Max buffer size to send queue subs protocol. Used for testing.
	recSubExp time.Duration // For how long do we check if there is a subscription match for a message with reply

	// These are used for routing of mapped replies.
	sIDHash        []byte   // Server ID hash (6 bytes)
	routesIDByHash sync.Map // Route's server ID is hashed (6 bytes) and stored in this map.

	// If a server has its own configuration in the "Gateways" remotes configuration
	// we will keep track of the URLs that are defined in the config so they can
	// be reported in monitoring.
	ownCfgURLs []string
}

// Subject interest tally. Also indicates if the key in the map is a
// queue or not.
type sitally struct {
	n int32 // number of subscriptions directly matching
	q bool  // indicate that this is a queue
}

// gatewayCfg is the configuration (explicit or implicit) of a remote gateway.
type gatewayCfg struct {
	sync.RWMutex
	*RemoteGatewayOpts
	hash           []byte
	oldHash        []byte
	urls           map[string]*url.URL
	connAttempts   int
	tlsName        string
	implicit       bool
	varzUpdateURLs bool // Tells monitoring code to update URLs when varz is inspected.
}

// Struct for client's gateway related fields
type gateway struct {
	name       string
	cfg        *gatewayCfg
	connectURL *url.URL          // Needed when sending CONNECT after receiving INFO from remote
	outsim     *sync.Map         // Per-account subject interest (or no-interest) (outbound conn)
	insim      map[string]*insie // Per-account subject no-interest sent or modeInterestOnly mode (inbound conn)

	// This is an outbound GW connection
	outbound bool
	// Set/check in readLoop without lock. This is to know that an inbound has sent the CONNECT protocol first
	// (processGatewayConnect sets it while holding the client lock).
	connected bool
	// Set to true if outbound is to a server that only knows about the old "$GR." reply
	// prefix (its INFO did not set GatewayNRP).
	useOldPrefix bool
	// If true, it indicates that the inbound side will switch any account to
	// interest-only mode "immediately", so the outbound should disregard
	// the optimistic mode when checking for interest.
	interestOnlyMode bool
}

// Outbound subject interest entry.
type outsie struct {
	sync.RWMutex
	// Interest mode of this account on the outbound side. Presumably switched
	// when the remote signals that it is about to send (and then has sent)
	// all its subscriptions — confirm in gatewayAllSubsReceiveStart/Complete.
	mode GatewayInterestMode
	// If not nil, used for no-interest for plain subs.
	// If a subject is present in this map, it means that
	// the remote is not interested in that subject.
	// When we have received the command that says that
	// the remote has sent all its subs, this is set to nil.
	ni map[string]struct{}
	// Contains queue subscriptions when in optimistic mode,
	// and all subs when in interest-only mode.
	// NOTE(review): the original comment referred to a `pk` field that no
	// longer exists; the interest-only reading is an assumption — confirm.
	sl *Sublist
	// Number of queue subs
	qsubs int
}

// Inbound subject interest entry.
// If `ni` is not nil, it stores the subjects for which an
// RS- was sent to the remote gateway. When a subscription
// is created, this is used to know if we need to send
// an RS+ to clear the no-interest in the remote.
// When an account is switched to modeInterestOnly (we send
// all subs of an account to the remote), then `ni` is nil and
// when all subs have been sent, mode is set to modeInterestOnly
type insie struct {
	ni   map[string]struct{} // Record if RS- was sent for given subject
	mode GatewayInterestMode
}

// A single gateway reply mapping entry.
type gwReplyMap struct {
	ms  string // Mapped subject returned by gwReplyMapping.get
	exp int64  // NOTE(review): presumably the expiration time of the entry — confirm units
}

type gwReplyMapping struct {
	// Indicate if we should check the map or not. Since checking the map is done
	// when processing inbound messages and requires the lock we want to
	// check only when needed. This is set/get using atomic, so needs to
	// be memory aligned.
	check int32
	// To keep track of gateway replies mapping
	mapping map[string]*gwReplyMap
}

// Returns the corresponding gw routed subject, and `true` to indicate that a
// mapping was found. If no entry is found, the passed subject is returned
// as-is and `false` is returned to indicate that no mapping was found.
// Caller is responsible to ensure the locking.
func (g *gwReplyMapping) get(subject []byte) ([]byte, bool) {
	rm, ok := g.mapping[string(subject)]
	if !ok {
		return subject, false
	}
	subj := []byte(rm.ms)
	return subj, true
}

// clone returns a deep copy of the RemoteGatewayOpts object.
// Returns nil for a nil receiver. Note that TLSTimeout is only copied
// when a TLSConfig is present.
func (r *RemoteGatewayOpts) clone() *RemoteGatewayOpts {
	if r == nil {
		return nil
	}
	clone := &RemoteGatewayOpts{
		Name: r.Name,
		URLs: deepCopyURLs(r.URLs),
	}
	if r.TLSConfig != nil {
		clone.TLSConfig = r.TLSConfig.Clone()
		clone.TLSTimeout = r.TLSTimeout
	}
	return clone
}

// Ensure that gateway is properly configured.
// A configuration with neither a name nor a port means "no gateway" and is
// valid. Otherwise both are required, every listed remote needs a name and
// at least one URL, and the pinned certs must be valid.
func validateGatewayOptions(o *Options) error {
	if o.Gateway.Name == "" && o.Gateway.Port == 0 {
		return nil
	}
	if o.Gateway.Name == "" {
		return fmt.Errorf("gateway has no name")
	}
	if o.Gateway.Port == 0 {
		return fmt.Errorf("gateway %q has no port specified (select -1 for random port)", o.Gateway.Name)
	}
	for i, g := range o.Gateway.Gateways {
		if g.Name == "" {
			return fmt.Errorf("gateway in the list %d has no name", i)
		}
		if len(g.URLs) == 0 {
			return fmt.Errorf("gateway %q has no URL", g.Name)
		}
	}
	if err := validatePinnedCerts(o.Gateway.TLSPinnedCerts); err != nil {
		return fmt.Errorf("gateway %q: %v", o.Gateway.Name, err)
	}
	return nil
}

// Computes a hash of 6 characters for the name.
// This will be used for routing of replies.
func getGWHash(name string) []byte {
	return []byte(getHashSize(name, gwHashLen))
}

// Returns the first 4 characters of the hex-encoded SHA-256 of name.
// Used for the legacy "$GR." reply prefix.
func getOldHash(name string) []byte {
	sha := sha256.New()
	sha.Write([]byte(name))
	fullHash := []byte(fmt.Sprintf("%x", sha.Sum(nil)))
	return fullHash[:4]
}

// Initialize the s.gateway structure. We do this even if the server
// does not have a gateway configured.
In some part of the code, the // server will check the number of outbound gateways, etc.. and so // we don't have to check if s.gateway is nil or not. func (s *Server) newGateway(opts *Options) error { gateway := &srvGateway{ name: opts.Gateway.Name, out: make(map[string]*client), outo: make([]*client, 0, 4), in: make(map[uint64]*client), remotes: make(map[string]*gatewayCfg), URLs: make(refCountedUrlSet), resolver: opts.Gateway.resolver, runknown: opts.Gateway.RejectUnknown, oldHash: getOldHash(opts.Gateway.Name), } gateway.Lock() defer gateway.Unlock() gateway.sIDHash = getGWHash(s.info.ID) clusterHash := getGWHash(opts.Gateway.Name) prefix := make([]byte, 0, gwSubjectOffset) prefix = append(prefix, gwReplyPrefix...) prefix = append(prefix, clusterHash...) prefix = append(prefix, '.') prefix = append(prefix, gateway.sIDHash...) prefix = append(prefix, '.') gateway.replyPfx = prefix prefix = make([]byte, 0, oldGWReplyStart) prefix = append(prefix, oldGWReplyPrefix...) prefix = append(prefix, gateway.oldHash...) prefix = append(prefix, '.') gateway.oldReplyPfx = prefix gateway.pasi.m = make(map[string]map[string]*sitally) if gateway.resolver == nil { gateway.resolver = netResolver(net.DefaultResolver) } // Create remote gateways for _, rgo := range opts.Gateway.Gateways { // Ignore if there is a remote gateway with our name. 
if rgo.Name == gateway.name { gateway.ownCfgURLs = getURLsAsString(rgo.URLs) continue } cfg := &gatewayCfg{ RemoteGatewayOpts: rgo.clone(), hash: getGWHash(rgo.Name), oldHash: getOldHash(rgo.Name), urls: make(map[string]*url.URL, len(rgo.URLs)), } if opts.Gateway.TLSConfig != nil && cfg.TLSConfig == nil { cfg.TLSConfig = opts.Gateway.TLSConfig.Clone() } if cfg.TLSTimeout == 0 { cfg.TLSTimeout = opts.Gateway.TLSTimeout } for _, u := range rgo.URLs { // For TLS, look for a hostname that we can use for TLSConfig.ServerName cfg.saveTLSHostname(u) cfg.urls[u.Host] = u } gateway.remotes[cfg.Name] = cfg } gateway.sqbsz = opts.Gateway.sendQSubsBufSize if gateway.sqbsz == 0 { gateway.sqbsz = maxBufSize } gateway.recSubExp = defaultGatewayRecentSubExpiration gateway.enabled = opts.Gateway.Name != "" && opts.Gateway.Port != 0 s.gateway = gateway return nil } // Update remote gateways TLS configurations after a config reload. func (g *srvGateway) updateRemotesTLSConfig(opts *Options) { g.Lock() defer g.Unlock() for _, ro := range opts.Gateway.Gateways { if ro.Name == g.name { continue } if cfg, ok := g.remotes[ro.Name]; ok { cfg.Lock() // If TLS config is in remote, use that one, otherwise, // use the TLS config from the main block. if ro.TLSConfig != nil { cfg.TLSConfig = ro.TLSConfig.Clone() } else if opts.Gateway.TLSConfig != nil { cfg.TLSConfig = opts.Gateway.TLSConfig.Clone() } cfg.Unlock() } } } // Returns if this server rejects connections from gateways that are not // explicitly configured. func (g *srvGateway) rejectUnknown() bool { g.RLock() reject := g.runknown g.RUnlock() return reject } // Starts the gateways accept loop and solicit explicit gateways // after an initial delay. This delay is meant to give a chance to // the cluster to form and this server gathers gateway URLs for this // cluster in order to send that as part of the connect/info process. 
func (s *Server) startGateways() {
	s.startGatewayAcceptLoop()

	// Delay start of creation of gateways to give a chance
	// to the local cluster to form.
	s.startGoRoutine(func() {
		defer s.grWG.Done()

		// Options.gatewaysSolicitDelay wins when set, otherwise use the
		// package-level (test-tunable) delay.
		dur := s.getOpts().gatewaysSolicitDelay
		if dur == 0 {
			dur = time.Duration(atomic.LoadInt64(&gatewaySolicitDelay))
		}

		select {
		case <-time.After(dur):
			s.solicitGateways()
		case <-s.quitCh:
			return
		}
	})
}

// This starts the gateway accept loop in a go routine, unless it
// is detected that the server has already been shutdown.
// It creates the listener, builds this server's gateway INFO (through
// setGatewayInfoHostPort) and warns when TLS verification is disabled for the
// gateway block or any remote. Listener or advertise errors are reported with
// Fatalf.
func (s *Server) startGatewayAcceptLoop() {
	if s.isShuttingDown() {
		return
	}

	// Snapshot server options.
	opts := s.getOpts()

	// A configured port of -1 means "pick a random port".
	port := opts.Gateway.Port
	if port == -1 {
		port = 0
	}

	s.mu.Lock()
	hp := net.JoinHostPort(opts.Gateway.Host, strconv.Itoa(port))
	l, e := natsListen("tcp", hp)
	s.gatewayListenerErr = e
	if e != nil {
		s.mu.Unlock()
		s.Fatalf("Error listening on gateway port: %d - %v", opts.Gateway.Port, e)
		return
	}
	s.Noticef("Gateway name is %s", s.getGatewayName())
	s.Noticef("Listening for gateways connections on %s",
		net.JoinHostPort(opts.Gateway.Host, strconv.Itoa(l.Addr().(*net.TCPAddr).Port)))

	tlsReq := opts.Gateway.TLSConfig != nil
	authRequired := opts.Gateway.Username != ""
	info := &Info{
		ID:           s.info.ID,
		Name:         opts.ServerName,
		Version:      s.info.Version,
		AuthRequired: authRequired,
		TLSRequired:  tlsReq,
		TLSVerify:    tlsReq,
		MaxPayload:   s.info.MaxPayload,
		Gateway:      opts.Gateway.Name,
		GatewayNRP:   true,
		Headers:      s.supportsHeaders(),
	}
	// Unless in some tests we want to keep the old behavior, we now
	// (since v2.9.0) indicate that this server will switch all accounts
	// to InterestOnly mode when accepting an inbound or when a new
	// account is fetched.
	if !gwDoNotForceInterestOnlyMode {
		info.GatewayIOM = true
	}

	// If we have selected a random port...
	if port == 0 {
		// Write resolved port back to options.
		opts.Gateway.Port = l.Addr().(*net.TCPAddr).Port
	}

	// Possibly override Host/Port based on Gateway.Advertise
	if err := s.setGatewayInfoHostPort(info, opts); err != nil {
		s.Fatalf("Error setting gateway INFO with Gateway.Advertise value of %s, err=%v", opts.Gateway.Advertise, err)
		l.Close()
		s.mu.Unlock()
		return
	}

	// Setup state that can enable shutdown
	s.gatewayListener = l

	// Warn if insecure is configured in the main Gateway configuration
	// or any of the RemoteGateway's. This means that we need to check
	// remotes even if TLS would not be configured for the accept.
	warn := tlsReq && opts.Gateway.TLSConfig.InsecureSkipVerify
	if !warn {
		for _, g := range opts.Gateway.Gateways {
			if g.TLSConfig != nil && g.TLSConfig.InsecureSkipVerify {
				warn = true
				break
			}
		}
	}
	if warn {
		s.Warnf(gatewayTLSInsecureWarning)
	}
	// Accepted connections are created as inbound gateways (nil cfg and URL).
	go s.acceptConnections(l, "Gateway", func(conn net.Conn) { s.createGateway(nil, nil, conn) }, nil)
	s.mu.Unlock()
}

// Similar to setInfoHostPortAndGenerateJSON, but for gatewayInfo.
// Sets info.Host/info.Port from Gateway.Advertise when configured, otherwise
// from the listen host/port (resolving "any" addresses to a non-local IP when
// possible). It then replaces this server's URL in the cluster URL set, stores
// info and regenerates the INFO JSON, all under the gateway lock.
func (s *Server) setGatewayInfoHostPort(info *Info, o *Options) error {
	gw := s.gateway
	gw.Lock()
	defer gw.Unlock()
	// Drop the previous URL of this server (if any) before computing the new one.
	gw.URLs.removeUrl(gw.URL)
	if o.Gateway.Advertise != "" {
		advHost, advPort, err := parseHostPort(o.Gateway.Advertise, o.Gateway.Port)
		if err != nil {
			return err
		}
		info.Host = advHost
		info.Port = advPort
	} else {
		info.Host = o.Gateway.Host
		info.Port = o.Gateway.Port
		// If the host is "0.0.0.0" or "::" we need to resolve to a public IP.
		// This will return at most 1 IP.
		hostIsIPAny, ips, err := s.getNonLocalIPsIfHostIsIPAny(info.Host, false)
		if err != nil {
			return err
		}
		if hostIsIPAny {
			if len(ips) == 0 {
				// TODO(ik): Should we fail here (prevent starting)? If not, we
				// are going to "advertise" the 0.0.0.0:<gw port> url, which means
				// that remotes are going to try to connect to 0.0.0.0:<gw port>,
				// which means a connect to loopback address, which is going
				// to fail with either TLS error, conn refused if the remote
				// is using different gateway port than this one, or error
				// saying that it tried to connect to itself.
				s.Errorf("Could not find any non-local IP for gateway %q with listen specification %q",
					gw.name, info.Host)
			} else {
				// Take the first from the list...
				info.Host = ips[0]
			}
		}
	}
	gw.URL = net.JoinHostPort(info.Host, strconv.Itoa(info.Port))
	if o.Gateway.Advertise != "" {
		s.Noticef("Advertise address for gateway %q is set to %s", gw.name, gw.URL)
	} else {
		s.Noticef("Address for gateway %q is %s", gw.name, gw.URL)
	}
	gw.URLs[gw.URL]++
	gw.info = info
	info.GatewayURL = gw.URL
	// (re)generate the gatewayInfoJSON byte array
	gw.generateInfoJSON()
	return nil
}

// Generates the Gateway INFO protocol.
// The gateway lock is held on entry
func (g *srvGateway) generateInfoJSON() {
	// We could be here when processing a route INFO that has a gateway URL,
	// but this server is not configured for gateways, so simply ignore here.
	// The configuration mismatch is reported somewhere else.
	if !g.enabled || g.info == nil {
		return
	}
	g.info.GatewayURLs = g.URLs.getAsStringSlice()
	b, err := json.Marshal(g.info)
	if err != nil {
		panic(err)
	}
	g.infoJSON = []byte(fmt.Sprintf(InfoProto, b))
}

// Goes through the list of registered gateways and try to connect to those.
// The list (remotes) is initially containing the explicit remote gateways,
// but the list is augmented with any implicit (discovered) gateway. Therefore,
// this function only solicit explicit ones.
// Each explicit gateway is solicited from its own go routine.
func (s *Server) solicitGateways() {
	gw := s.gateway
	gw.RLock()
	defer gw.RUnlock()
	for _, cfg := range gw.remotes {
		// Since we delay the creation of gateways, it is
		// possible that server starts to receive inbound from
		// other clusters and in turn create outbounds. So here
		// we create only the ones that are configured.
		if !cfg.isImplicit() {
			cfg := cfg // Create new instance for the goroutine.
			s.startGoRoutine(func() {
				s.solicitGateway(cfg, true)
				s.grWG.Done()
			})
		}
	}
}

// Reconnect to the gateway after a little wait period (a random delay of
// less than 100ms). For explicit gateways, we also wait for the default
// reconnect time. Returns without reconnecting if the server quits meanwhile.
func (s *Server) reconnectGateway(cfg *gatewayCfg) {
	defer s.grWG.Done()

	delay := time.Duration(rand.Intn(100)) * time.Millisecond
	if !cfg.isImplicit() {
		delay += gatewayReconnectDelay
	}
	select {
	case <-time.After(delay):
	case <-s.quitCh:
		return
	}
	s.solicitGateway(cfg, false)
}

// This function will loop trying to connect to any URL attached
// to the given Gateway. It will return once a connection has been created.
// It also returns when the server stops running, when the gateway has no
// URLs, or, for an implicit gateway, once retries are exhausted and no inbound
// from that gateway exists (the gateway is then removed from the remotes).
func (s *Server) solicitGateway(cfg *gatewayCfg, firstConnect bool) {
	var (
		opts       = s.getOpts()
		isImplicit = cfg.isImplicit()
		attempts   int
		typeStr    string
	)
	if isImplicit {
		typeStr = "implicit"
	} else {
		typeStr = "explicit"
	}

	const connFmt = "Connecting to %s gateway %q (%s) at %s (attempt %v)"
	const connErrFmt = "Error connecting to %s gateway %q (%s) at %s (attempt %v): %v"

	for s.isRunning() {
		urls := cfg.getURLs()
		if len(urls) == 0 {
			break
		}
		attempts++
		report := s.shouldReportConnectErr(firstConnect, attempts)
		// Iteration is random (presumably getURLs returns the URLs shuffled).
		for _, u := range urls {
			address, err := s.getRandomIP(s.gateway.resolver, u.Host, nil)
			if err != nil {
				s.Errorf("Error getting IP for %s gateway %q (%s): %v", typeStr, cfg.Name, u.Host, err)
				continue
			}
			if report {
				s.Noticef(connFmt, typeStr, cfg.Name, u.Host, address, attempts)
			} else {
				s.Debugf(connFmt, typeStr, cfg.Name, u.Host, address, attempts)
			}
			conn, err := natsDialTimeout("tcp", address, DEFAULT_ROUTE_DIAL)
			if err == nil {
				// We could connect, create the gateway connection and return.
				s.createGateway(cfg, u, conn)
				return
			}
			if report {
				s.Errorf(connErrFmt, typeStr, cfg.Name, u.Host, address, attempts, err)
			} else {
				s.Debugf(connErrFmt, typeStr, cfg.Name, u.Host, address, attempts, err)
			}
			// Break this loop if server is being shutdown...
			if !s.isRunning() {
				break
			}
		}
		if isImplicit {
			if opts.Gateway.ConnectRetries == 0 || attempts > opts.Gateway.ConnectRetries {
				s.gateway.Lock()
				// We could have just accepted an inbound for this remote gateway.
				// So if there is an inbound, let's try again to connect.
				if s.gateway.hasInbound(cfg.Name) {
					s.gateway.Unlock()
					continue
				}
				delete(s.gateway.remotes, cfg.Name)
				s.gateway.Unlock()
				return
			}
		}
		select {
		case <-s.quitCh:
			return
		case <-time.After(gatewayConnectDelay):
			continue
		}
	}
}

// Returns true if there is an inbound for the given `name`.
// Lock held on entry.
func (g *srvGateway) hasInbound(name string) bool {
	for _, ig := range g.in {
		ig.mu.Lock()
		igname := ig.gw.name
		ig.mu.Unlock()
		if igname == name {
			return true
		}
	}
	return false
}

// Called when a gateway connection is either accepted or solicited.
// If accepted, the gateway is marked as inbound.
// If solicited, the gateway is marked as outbound.
// cfg and url are nil for accepted connections (see startGatewayAcceptLoop).
// Performs the TLS handshake when required, registers the client in the temp
// clients map and starts its read and write loops.
func (s *Server) createGateway(cfg *gatewayCfg, url *url.URL, conn net.Conn) {
	// Snapshot server options.
	opts := s.getOpts()

	now := time.Now()
	c := &client{srv: s, nc: conn, start: now, last: now, kind: GATEWAY}

	// Are we creating the gateway based on the configuration
	solicit := cfg != nil
	var tlsRequired bool

	s.gateway.RLock()
	infoJSON := s.gateway.infoJSON
	s.gateway.RUnlock()

	// Perform some initialization under the client lock
	c.mu.Lock()
	c.initClient()
	c.gw = &gateway{}
	if solicit {
		// This is an outbound gateway connection
		cfg.RLock()
		tlsRequired = cfg.TLSConfig != nil
		cfgName := cfg.Name
		cfg.RUnlock()
		c.gw.outbound = true
		c.gw.name = cfgName
		c.gw.cfg = cfg
		cfg.bumpConnAttempts()
		// Since we are delaying the connect until after receiving
		// the remote's INFO protocol, save the URL we need to connect to.
		c.gw.connectURL = url

		c.Noticef("Creating outbound gateway connection to %q", cfgName)
	} else {
		c.flags.set(expectConnect)
		// Inbound gateway connection
		c.Noticef("Processing inbound gateway connection")
		// Check if TLS is required for inbound GW connections.
		tlsRequired = opts.Gateway.TLSConfig != nil
		// We expect a CONNECT from the accepted connection.
		c.setAuthTimer(secondsToDuration(opts.Gateway.AuthTimeout))
	}

	// Check for TLS
	if tlsRequired {
		var tlsConfig *tls.Config
		var tlsName string
		var timeout float64

		if solicit {
			cfg.RLock()
			tlsName = cfg.tlsName
			tlsConfig = cfg.TLSConfig.Clone()
			timeout = cfg.TLSTimeout
			cfg.RUnlock()
		} else {
			tlsConfig = opts.Gateway.TLSConfig
			timeout = opts.Gateway.TLSTimeout
		}

		// Perform (either server or client side) TLS handshake.
		if resetTLSName, err := c.doTLSHandshake("gateway", solicit, url, tlsConfig, tlsName, timeout, opts.Gateway.TLSPinnedCerts); err != nil {
			// NOTE(review): cfg is nil for accepted connections; this relies on
			// doTLSHandshake never returning resetTLSName == true when
			// solicit is false — confirm.
			if resetTLSName {
				cfg.Lock()
				cfg.tlsName = _EMPTY_
				cfg.Unlock()
			}
			c.mu.Unlock()
			return
		}
	}

	// Do final client initialization
	c.in.pacache = make(map[string]*perAccountCache)
	if solicit {
		// This is an outbound gateway connection
		c.gw.outsim = &sync.Map{}
	} else {
		// Inbound gateway connection
		c.gw.insim = make(map[string]*insie)
	}

	// Register in temp map for now until gateway properly registered
	// in out or in gateways.
	if !s.addToTempClients(c.cid, c) {
		c.mu.Unlock()
		c.closeConnection(ServerShutdown)
		return
	}

	// Only send if we accept a connection. Will send CONNECT+INFO as an
	// outbound only after processing peer's INFO protocol.
	if !solicit {
		c.enqueueProto(infoJSON)
	}

	// Spin up the read loop.
	s.startGoRoutine(func() { c.readLoop(nil) })

	// Spin up the write loop.
	s.startGoRoutine(func() { c.writeLoop() })

	if tlsRequired {
		c.Debugf("TLS handshake complete")
		cs := c.nc.(*tls.Conn).ConnectionState()
		c.Debugf("TLS version %s, cipher suite %s", tlsVersion(cs.Version), tlsCipher(cs.CipherSuite))
	}

	c.mu.Unlock()

	// Announce ourselves again to new connections.
	if solicit && s.EventsEnabled() {
		s.sendStatszUpdate()
	}
}

// Builds and sends the CONNECT protocol for a gateway.
// Client lock held on entry.
// Credentials come from the connect URL's user info when present, otherwise
// from the gateway options. The saved connect URL is consumed (set to nil), so
// this is expected to be called only once per outbound connection.
// NOTE(review): c.gw.cfg.TLSConfig is read without the cfg lock.
func (c *client) sendGatewayConnect(opts *Options) {
	tlsRequired := c.gw.cfg.TLSConfig != nil
	url := c.gw.connectURL
	c.gw.connectURL = nil
	var user, pass string
	if userInfo := url.User; userInfo != nil {
		user = userInfo.Username()
		pass, _ = userInfo.Password()
	} else if opts != nil {
		user = opts.Gateway.Username
		pass = opts.Gateway.Password
	}
	cinfo := connectInfo{
		Verbose:  false,
		Pedantic: false,
		User:     user,
		Pass:     pass,
		TLS:      tlsRequired,
		Name:     c.srv.info.ID,
		Gateway:  c.srv.gateway.name,
	}
	b, err := json.Marshal(cinfo)
	if err != nil {
		panic(err)
	}
	c.enqueueProto([]byte(fmt.Sprintf(ConProto, b)))
}

// Process the CONNECT protocol from a gateway connection.
// Returns an error to the connection if the CONNECT is not from a gateway
// (for instance a client or route connecting to the gateway port), or, when
// this server rejects unknown gateways, if the remote gateway is not
// configured here. On success, marks the connection as connected and
// sets the first PING timer.
func (c *client) processGatewayConnect(arg []byte) error {
	connect := &connectInfo{}
	if err := json.Unmarshal(arg, connect); err != nil {
		return err
	}

	// Coming from a client or a route, reject
	if connect.Gateway == "" {
		c.sendErrAndErr(ErrClientOrRouteConnectedToGatewayPort.Error())
		c.closeConnection(WrongPort)
		return ErrClientOrRouteConnectedToGatewayPort
	}

	c.mu.Lock()
	s := c.srv
	c.mu.Unlock()

	// If we reject unknown gateways, make sure we have it configured,
	// otherwise return an error.
	if s.gateway.rejectUnknown() && s.getRemoteGateway(connect.Gateway) == nil {
		c.Errorf("Rejecting connection from gateway %q", connect.Gateway)
		c.sendErr(fmt.Sprintf("Connection to gateway %q rejected", s.getGatewayName()))
		c.closeConnection(WrongGateway)
		return ErrWrongGateway
	}

	c.mu.Lock()
	c.gw.connected = true
	// Set the Ping timer after sending connect and info.
	c.setFirstPingTimer()
	c.mu.Unlock()

	return nil
}

// Process the INFO protocol from a gateway connection.
//
// If the gateway connection is an outbound (this server initiated the connection),
// this function checks that the incoming INFO contains the Gateway field. If empty,
// it means that this is a response from an older server or that this server connected
// to the wrong port.
// The outbound gateway may also receive a gossip INFO protocol from the remote gateway,
// indicating other gateways that the remote knows about. This server will try to connect
// to those gateways (if not explicitly configured or already implicitly connected).
// In both cases (explicit or implicit), the local cluster is notified about the existence
// of this new gateway. This allows servers in the cluster to ensure that they have an
// outbound connection to this gateway.
//
// For an inbound gateway, on the first INFO the connection is registered, our
// queue subscriptions are sent, an outbound connection to the remote is
// initiated if needed, the known gateways are gossiped back, and accounts may be
// switched to interest-only mode. Later INFOs on an inbound connection are ignored.
func (c *client) processGatewayInfo(info *Info) {
	var (
		gwName string
		cfg    *gatewayCfg
	)
	c.mu.Lock()
	s := c.srv
	cid := c.cid

	// Check if this is the first INFO. (this call sets the flag if not already set).
	isFirstINFO := c.flags.setIfNotSet(infoReceived)

	isOutbound := c.gw.outbound
	if isOutbound {
		gwName = c.gw.name
		cfg = c.gw.cfg
	} else if isFirstINFO {
		c.gw.name = info.Gateway
	}
	if isFirstINFO {
		c.opts.Name = info.ID
	}
	c.mu.Unlock()

	// For an outbound connection...
	if isOutbound {
		// Check content of INFO for fields indicating that it comes from a gateway.
		// If we incorrectly connect to the wrong port (client or route), we won't
		// have the Gateway field set.
		if info.Gateway == "" {
			c.sendErrAndErr(fmt.Sprintf("Attempt to connect to gateway %q using wrong port", gwName))
			c.closeConnection(WrongPort)
			return
		}
		// Check that the gateway name we got is what we expect
		if info.Gateway != gwName {
			// Unless this is the very first INFO, it may be ok if this is
			// a gossip request to connect to other gateways.
			if !isFirstINFO && info.GatewayCmd == gatewayCmdGossip {
				// If we are configured to reject unknown, do not attempt to
				// connect to one that we don't have configured.
				if s.gateway.rejectUnknown() && s.getRemoteGateway(info.Gateway) == nil {
					return
				}
				s.processImplicitGateway(info)
				return
			}
			// Otherwise, this is a failure...
			// We are reporting this error in the log...
			c.Errorf("Failing connection to gateway %q, remote gateway name is %q",
				gwName, info.Gateway)
			// ...and sending this back to the remote so that the error
			// makes more sense in the remote server's log.
			c.sendErr(fmt.Sprintf("Connection from %q rejected, wanted to connect to %q, this is %q",
				s.getGatewayName(), gwName, info.Gateway))
			c.closeConnection(WrongGateway)
			return
		}

		// Check for duplicate server name with servers in our cluster
		if s.isDuplicateServerName(info.Name) {
			c.Errorf("Remote server has a duplicate name: %q", info.Name)
			c.closeConnection(DuplicateServerName)
			return
		}

		// Possibly add URLs that we get from the INFO protocol.
		if len(info.GatewayURLs) > 0 {
			cfg.updateURLs(info.GatewayURLs)
		}

		// If this is the first INFO, send our connect
		if isFirstINFO {
			s.gateway.RLock()
			infoJSON := s.gateway.infoJSON
			s.gateway.RUnlock()

			supportsHeaders := s.supportsHeaders()
			opts := s.getOpts()

			// Note, if we want to support NKeys, then we would get the nonce
			// from this INFO protocol and can sign it in the CONNECT we are
			// going to send now.
			c.mu.Lock()
			c.gw.interestOnlyMode = info.GatewayIOM
			c.sendGatewayConnect(opts)
			c.Debugf("Gateway connect protocol sent to %q", gwName)
			// Send INFO too
			c.enqueueProto(infoJSON)
			// Remotes that do not advertise GatewayNRP only know the old reply prefix.
			c.gw.useOldPrefix = !info.GatewayNRP
			c.headers = supportsHeaders && info.Headers
			c.mu.Unlock()

			// Register as an outbound gateway. If we had a protocol to ack our connect,
			// then we should do that when processing that ack.
			if s.registerOutboundGatewayConnection(gwName, c) {
				c.Noticef("Outbound gateway connection to %q (%s) registered", gwName, info.ID)
				// Now that the outbound gateway is registered, we can remove from temp map.
				s.removeFromTempClients(cid)
				// Set the Ping timer after sending connect and info.
				c.mu.Lock()
				c.setFirstPingTimer()
				c.mu.Unlock()
			} else {
				// There was a bug that would cause a connection to possibly
				// be called twice resulting in reconnection of twice the
				// same outbound connection. The issue is fixed, but this is
				// defensive code: if we did not register this connection
				// because we already have an outbound for this name, then
				// close this connection (and make sure it does not try to reconnect)
				c.mu.Lock()
				c.flags.set(noReconnect)
				c.mu.Unlock()
				c.closeConnection(WrongGateway)
				return
			}
		} else if info.GatewayCmd > 0 {
			// Not the first INFO: handle the interest-mode commands; none
			// of them is forwarded to the local cluster.
			switch info.GatewayCmd {
			case gatewayCmdAllSubsStart:
				c.gatewayAllSubsReceiveStart(info)
				return
			case gatewayCmdAllSubsComplete:
				c.gatewayAllSubsReceiveComplete(info)
				return
			default:
				s.Warnf("Received unknown command %v from gateway %q", info.GatewayCmd, gwName)
				return
			}
		}

		// Flood local cluster with information about this gateway.
		// Servers in this cluster will ensure that they have (or otherwise create)
		// an outbound connection to this gateway.
		s.forwardNewGatewayToLocalCluster(info)

	} else if isFirstINFO {
		// This is the first INFO of an inbound connection...
		// Check for duplicate server name with servers in our cluster
		if s.isDuplicateServerName(info.Name) {
			c.Errorf("Remote server has a duplicate name: %q", info.Name)
			c.closeConnection(DuplicateServerName)
			return
		}

		s.registerInboundGatewayConnection(cid, c)
		c.Noticef("Inbound gateway connection from %q (%s) registered", info.Gateway, info.ID)

		// Now that it is registered, we can remove from temp map.
		s.removeFromTempClients(cid)

		// Send our QSubs.
		s.sendQueueSubsToGateway(c)

		// Initiate outbound connection. This function will behave correctly if
		// we have already one.
		s.processImplicitGateway(info)

		// Send back to the server that initiated this gateway connection the
		// list of all remote gateways known on this server.
		s.gossipGatewaysToInboundGateway(info.Gateway, c)

		// Now make sure if we have any knowledge of connected leafnodes that we resend the
		// connect events to switch those accounts into interest only mode.
		s.mu.Lock()
		s.ensureGWsInterestOnlyForLeafNodes()
		s.mu.Unlock()
		js := s.js.Load()

		// If running in some tests, maintain the original behavior.
		if gwDoNotForceInterestOnlyMode && js != nil {
			// Switch JetStream accounts to interest-only mode.
			// Collect the names under js.mu, then switch without holding it.
			var accounts []string
			js.mu.Lock()
			if len(js.accounts) > 0 {
				accounts = make([]string, 0, len(js.accounts))
				for accName := range js.accounts {
					accounts = append(accounts, accName)
				}
			}
			js.mu.Unlock()
			for _, accName := range accounts {
				if acc, err := s.LookupAccount(accName); err == nil && acc != nil {
					if acc.JetStreamEnabled() {
						s.switchAccountToInterestMode(acc.GetName())
					}
				}
			}
		} else if !gwDoNotForceInterestOnlyMode {
			// Starting 2.9.0, we are phasing out the optimistic mode, so change
			// all accounts to interest-only mode, unless instructed not to do so
			// in some tests.
			s.accounts.Range(func(_, v interface{}) bool {
				acc := v.(*Account)
				s.switchAccountToInterestMode(acc.GetName())
				return true
			})
		}
	}
}

// Sends to the given inbound gateway connection a gossip INFO protocol
// for each gateway known by this server. This allows for a "full mesh"
// of gateways.
func (s *Server) gossipGatewaysToInboundGateway(gwName string, c *client) { gw := s.gateway gw.RLock() defer gw.RUnlock() for gwCfgName, cfg := range gw.remotes { // Skip the gateway that we just created if gwCfgName == gwName { continue } info := Info{ ID: s.info.ID, GatewayCmd: gatewayCmdGossip, } urls := cfg.getURLsAsStrings() if len(urls) > 0 { info.Gateway = gwCfgName info.GatewayURLs = urls b, _ := json.Marshal(&info) c.mu.Lock() c.enqueueProto([]byte(fmt.Sprintf(InfoProto, b))) c.mu.Unlock() } } } // Sends the INFO protocol of a gateway to all routes known by this server. func (s *Server) forwardNewGatewayToLocalCluster(oinfo *Info) { // Need to protect s.routes here, so use server's lock s.mu.Lock() defer s.mu.Unlock() // We don't really need the ID to be set, but, we need to make sure // that it is not set to the server ID so that if we were to connect // to an older server that does not expect a "gateway" INFO, it // would think that it needs to create an implicit route (since info.ID // would not match the route's remoteID), but will fail to do so because // the sent protocol will not have host/port defined. info := &Info{ ID: "GW" + s.info.ID, Name: s.getOpts().ServerName, Gateway: oinfo.Gateway, GatewayURLs: oinfo.GatewayURLs, GatewayCmd: gatewayCmdGossip, } b, _ := json.Marshal(info) infoJSON := []byte(fmt.Sprintf(InfoProto, b)) s.forEachRemote(func(r *client) { r.mu.Lock() r.enqueueProto(infoJSON) r.mu.Unlock() }) } // Sends queue subscriptions interest to remote gateway. // This is sent from the inbound side, that is, the side that receives // messages from the remote's outbound connection. This side is // the one sending the subscription interest. func (s *Server) sendQueueSubsToGateway(c *client) { s.sendSubsToGateway(c, nil) } // Sends all subscriptions for the given account to the remove gateway // This is sent from the inbound side, that is, the side that receives // messages from the remote's outbound connection. 
This side is // the one sending the subscription interest. func (s *Server) sendAccountSubsToGateway(c *client, accName []byte) { s.sendSubsToGateway(c, accName) } func gwBuildSubProto(buf *bytes.Buffer, accName []byte, acc map[string]*sitally, doQueues bool) { for saq, si := range acc { if doQueues && si.q || !doQueues && !si.q { buf.Write(rSubBytes) buf.Write(accName) buf.WriteByte(' ') // For queue subs (si.q is true), saq will be // subject + ' ' + queue, for plain subs, this is // just the subject. buf.WriteString(saq) if doQueues { buf.WriteString(" 1") } buf.WriteString(CR_LF) } } } // Sends subscriptions to remote gateway. func (s *Server) sendSubsToGateway(c *client, accountName []byte) { var ( bufa = [32 * 1024]byte{} bbuf = bytes.NewBuffer(bufa[:0]) ) gw := s.gateway // This needs to run under this lock for the whole duration gw.pasi.Lock() defer gw.pasi.Unlock() // If account is specified... if accountName != nil { // Simply send all plain subs (no queues) for this specific account gwBuildSubProto(bbuf, accountName, gw.pasi.m[string(accountName)], false) // Instruct to send all subs (RS+/-) for this account from now on. c.mu.Lock() e := c.gw.insim[string(accountName)] if e == nil { e = &insie{} c.gw.insim[string(accountName)] = e } e.mode = InterestOnly c.mu.Unlock() } else { // Send queues for all accounts for accName, acc := range gw.pasi.m { gwBuildSubProto(bbuf, []byte(accName), acc, true) } } buf := bbuf.Bytes() // Nothing to send. if len(buf) == 0 { return } if len(buf) > cap(bufa) { s.Debugf("Sending subscriptions to %q, buffer size: %v", c.gw.name, len(buf)) } // Send c.mu.Lock() c.enqueueProto(buf) c.Debugf("Sent queue subscriptions to gateway") c.mu.Unlock() } // This is invoked when getting an INFO protocol for gateway on the ROUTER port. // This function will then execute appropriate function based on the command // contained in the protocol. 
// func (s *Server) processGatewayInfoFromRoute(info *Info, routeSrvID string, route *client) { switch info.GatewayCmd { case gatewayCmdGossip: s.processImplicitGateway(info) default: s.Errorf("Unknown command %d from server %v", info.GatewayCmd, routeSrvID) } } // Sends INFO protocols to the given route connection for each known Gateway. // These will be processed by the route and delegated to the gateway code to // invoke processImplicitGateway. func (s *Server) sendGatewayConfigsToRoute(route *client) { gw := s.gateway gw.RLock() // Send only to gateways for which we have actual outbound connection to. if len(gw.out) == 0 { gw.RUnlock() return } // Collect gateway configs for which we have an outbound connection. gwCfgsa := [16]*gatewayCfg{} gwCfgs := gwCfgsa[:0] for _, c := range gw.out { c.mu.Lock() if c.gw.cfg != nil { gwCfgs = append(gwCfgs, c.gw.cfg) } c.mu.Unlock() } gw.RUnlock() if len(gwCfgs) == 0 { return } // Check forwardNewGatewayToLocalCluster() as to why we set ID this way. info := Info{ ID: "GW" + s.info.ID, GatewayCmd: gatewayCmdGossip, } for _, cfg := range gwCfgs { urls := cfg.getURLsAsStrings() if len(urls) > 0 { info.Gateway = cfg.Name info.GatewayURLs = urls b, _ := json.Marshal(&info) route.mu.Lock() route.enqueueProto([]byte(fmt.Sprintf(InfoProto, b))) route.mu.Unlock() } } } // Initiates a gateway connection using the info contained in the INFO protocol. // If a gateway with the same name is already registered (either because explicitly // configured, or already implicitly connected), this function will augmment the // remote URLs with URLs present in the info protocol and return. // Otherwise, this function will register this remote (to prevent multiple connections // to the same remote) and call solicitGateway (which will run in a different go-routine). func (s *Server) processImplicitGateway(info *Info) { s.gateway.Lock() defer s.gateway.Unlock() // Name of the gateway to connect to is the Info.Gateway field. 
gwName := info.Gateway // If this is our name, bail. if gwName == s.gateway.name { return } // Check if we already have this config, and if so, we are done cfg := s.gateway.remotes[gwName] if cfg != nil { // However, possibly augment the list of URLs with the given // info.GatewayURLs content. cfg.Lock() cfg.addURLs(info.GatewayURLs) cfg.Unlock() return } opts := s.getOpts() cfg = &gatewayCfg{ RemoteGatewayOpts: &RemoteGatewayOpts{Name: gwName}, hash: getGWHash(gwName), oldHash: getOldHash(gwName), urls: make(map[string]*url.URL, len(info.GatewayURLs)), implicit: true, } if opts.Gateway.TLSConfig != nil { cfg.TLSConfig = opts.Gateway.TLSConfig.Clone() cfg.TLSTimeout = opts.Gateway.TLSTimeout } // Since we know we don't have URLs (no config, so just based on what we // get from INFO), directly call addURLs(). We don't need locking since // we just created that structure and no one else has access to it yet. cfg.addURLs(info.GatewayURLs) // If there is no URL, we can't proceed. if len(cfg.urls) == 0 { return } s.gateway.remotes[gwName] = cfg s.startGoRoutine(func() { s.solicitGateway(cfg, true) s.grWG.Done() }) } // NumOutboundGateways is public here mostly for testing. 
func (s *Server) NumOutboundGateways() int { return s.numOutboundGateways() } // Returns the number of outbound gateway connections func (s *Server) numOutboundGateways() int { s.gateway.RLock() n := len(s.gateway.out) s.gateway.RUnlock() return n } // Returns the number of inbound gateway connections func (s *Server) numInboundGateways() int { s.gateway.RLock() n := len(s.gateway.in) s.gateway.RUnlock() return n } // Returns the remoteGateway (if any) that has the given `name` func (s *Server) getRemoteGateway(name string) *gatewayCfg { s.gateway.RLock() cfg := s.gateway.remotes[name] s.gateway.RUnlock() return cfg } // Used in tests func (g *gatewayCfg) bumpConnAttempts() { g.Lock() g.connAttempts++ g.Unlock() } // Used in tests func (g *gatewayCfg) getConnAttempts() int { g.Lock() ca := g.connAttempts g.Unlock() return ca } // Used in tests func (g *gatewayCfg) resetConnAttempts() { g.Lock() g.connAttempts = 0 g.Unlock() } // Returns if this remote gateway is implicit or not. func (g *gatewayCfg) isImplicit() bool { g.RLock() ii := g.implicit g.RUnlock() return ii } // getURLs returns an array of URLs in random order suitable for // an iteration to try to connect. func (g *gatewayCfg) getURLs() []*url.URL { g.RLock() a := make([]*url.URL, 0, len(g.urls)) for _, u := range g.urls { a = append(a, u) } g.RUnlock() // Map iteration is random, but not that good with small maps. rand.Shuffle(len(a), func(i, j int) { a[i], a[j] = a[j], a[i] }) return a } // Similar to getURLs but returns the urls as an array of strings. func (g *gatewayCfg) getURLsAsStrings() []string { g.RLock() a := make([]string, 0, len(g.urls)) for _, u := range g.urls { a = append(a, u.Host) } g.RUnlock() return a } // updateURLs creates the urls map with the content of the config's URLs array // and the given array that we get from the INFO protocol. func (g *gatewayCfg) updateURLs(infoURLs []string) { g.Lock() // Clear the map... 
g.urls = make(map[string]*url.URL, len(g.URLs)+len(infoURLs)) // Add the urls from the config URLs array. for _, u := range g.URLs { g.urls[u.Host] = u } // Then add the ones from the infoURLs array we got. g.addURLs(infoURLs) // The call above will set varzUpdateURLs only when finding ULRs in infoURLs // that are not present in the config. That does not cover the case where // previously "discovered" URLs are now gone. We could check "before" size // of g.urls and if bigger than current size, set the boolean to true. // Not worth it... simply set this to true to allow a refresh of gateway // URLs in varz. g.varzUpdateURLs = true g.Unlock() } // Saves the hostname of the given URL (if not already done). // This may be used as the ServerName of the TLSConfig when initiating a // TLS connection. // Write lock held on entry. func (g *gatewayCfg) saveTLSHostname(u *url.URL) { if g.TLSConfig != nil && g.tlsName == "" && net.ParseIP(u.Hostname()) == nil { g.tlsName = u.Hostname() } } // add URLs from the given array to the urls map only if not already present. // remoteGateway write lock is assumed to be held on entry. // Write lock is held on entry. func (g *gatewayCfg) addURLs(infoURLs []string) { var scheme string if g.TLSConfig != nil { scheme = "tls" } else { scheme = "nats" } for _, iu := range infoURLs { if _, present := g.urls[iu]; !present { // Urls in Info.GatewayURLs come without scheme. Add it to parse // the url (otherwise it fails). if u, err := url.Parse(fmt.Sprintf("%s://%s", scheme, iu)); err == nil { // Also, if a tlsName has not been set yet and we are dealing // with a hostname and not a bare IP, save the hostname. g.saveTLSHostname(u) // Use u.Host for the key. g.urls[u.Host] = u // Signal that we have updated the list. Used by monitoring code. g.varzUpdateURLs = true } } } } // Adds this URL to the set of Gateway URLs. // Returns true if the URL has been added, false otherwise. 
// Server lock held on entry func (s *Server) addGatewayURL(urlStr string) bool { s.gateway.Lock() added := s.gateway.URLs.addUrl(urlStr) if added { s.gateway.generateInfoJSON() } s.gateway.Unlock() return added } // Removes this URL from the set of gateway URLs. // Returns true if the URL has been removed, false otherwise. // Server lock held on entry func (s *Server) removeGatewayURL(urlStr string) bool { if s.isShuttingDown() { return false } s.gateway.Lock() removed := s.gateway.URLs.removeUrl(urlStr) if removed { s.gateway.generateInfoJSON() } s.gateway.Unlock() return removed } // Sends a Gateway's INFO to all inbound GW connections. // Server lock is held on entry func (s *Server) sendAsyncGatewayInfo() { s.gateway.RLock() for _, ig := range s.gateway.in { ig.mu.Lock() ig.enqueueProto(s.gateway.infoJSON) ig.mu.Unlock() } s.gateway.RUnlock() } // This returns the URL of the Gateway listen spec, or empty string // if the server has no gateway configured. func (s *Server) getGatewayURL() string { s.gateway.RLock() url := s.gateway.URL s.gateway.RUnlock() return url } // Returns this server gateway name. // Same than calling s.gateway.getName() func (s *Server) getGatewayName() string { // This is immutable return s.gateway.name } // All gateway connections (outbound and inbound) are put in the given map. func (s *Server) getAllGatewayConnections(conns map[uint64]*client) { gw := s.gateway gw.RLock() for _, c := range gw.out { c.mu.Lock() cid := c.cid c.mu.Unlock() conns[cid] = c } for cid, c := range gw.in { conns[cid] = c } gw.RUnlock() } // Register the given gateway connection (*client) in the inbound gateways // map. The key is the connection ID (like for clients and routes). func (s *Server) registerInboundGatewayConnection(cid uint64, gwc *client) { s.gateway.Lock() s.gateway.in[cid] = gwc s.gateway.Unlock() } // Register the given gateway connection (*client) in the outbound gateways // map with the given name as the key. 
func (s *Server) registerOutboundGatewayConnection(name string, gwc *client) bool { s.gateway.Lock() if _, exist := s.gateway.out[name]; exist { s.gateway.Unlock() return false } s.gateway.out[name] = gwc s.gateway.outo = append(s.gateway.outo, gwc) s.gateway.orderOutboundConnectionsLocked() s.gateway.Unlock() return true } // Returns the outbound gateway connection (*client) with the given name, // or nil if not found func (s *Server) getOutboundGatewayConnection(name string) *client { s.gateway.RLock() gwc := s.gateway.out[name] s.gateway.RUnlock() return gwc } // Returns all outbound gateway connections in the provided array. // The order of the gateways is suited for the sending of a message. // Current ordering is based on individual gateway's RTT value. func (s *Server) getOutboundGatewayConnections(a *[]*client) { s.gateway.RLock() for i := 0; i < len(s.gateway.outo); i++ { *a = append(*a, s.gateway.outo[i]) } s.gateway.RUnlock() } // Orders the array of outbound connections. // Current ordering is by lowest RTT. // Gateway write lock is held on entry func (g *srvGateway) orderOutboundConnectionsLocked() { // Order the gateways by lowest RTT sort.Slice(g.outo, func(i, j int) bool { return g.outo[i].getRTTValue() < g.outo[j].getRTTValue() }) } // Orders the array of outbound connections. // Current ordering is by lowest RTT. func (g *srvGateway) orderOutboundConnections() { g.Lock() g.orderOutboundConnectionsLocked() g.Unlock() } // Returns all inbound gateway connections in the provided array func (s *Server) getInboundGatewayConnections(a *[]*client) { s.gateway.RLock() for _, gwc := range s.gateway.in { *a = append(*a, gwc) } s.gateway.RUnlock() } // This is invoked when a gateway connection is closed and the server // is removing this connection from its state. 
func (s *Server) removeRemoteGatewayConnection(c *client) { c.mu.Lock() cid := c.cid isOutbound := c.gw.outbound gwName := c.gw.name c.mu.Unlock() gw := s.gateway gw.Lock() if isOutbound { delete(gw.out, gwName) louto := len(gw.outo) reorder := false for i := 0; i < len(gw.outo); i++ { if gw.outo[i] == c { // If last, simply remove and no need to reorder if i != louto-1 { gw.outo[i] = gw.outo[louto-1] reorder = true } gw.outo = gw.outo[:louto-1] } } if reorder { gw.orderOutboundConnectionsLocked() } } else { delete(gw.in, cid) } gw.Unlock() s.removeFromTempClients(cid) if isOutbound { // Update number of totalQSubs for this gateway qSubsRemoved := int64(0) c.mu.Lock() for _, sub := range c.subs { if sub.queue != nil { qSubsRemoved++ } } c.mu.Unlock() // Update total count of qsubs in remote gateways. atomic.AddInt64(&c.srv.gateway.totalQSubs, -qSubsRemoved) } else { var subsa [1024]*subscription var subs = subsa[:0] // For inbound GW connection, if we have subs, those are // local subs on "_R_." subjects. c.mu.Lock() for _, sub := range c.subs { subs = append(subs, sub) } c.mu.Unlock() for _, sub := range subs { c.removeReplySub(sub) } } } // GatewayAddr returns the net.Addr object for the gateway listener. func (s *Server) GatewayAddr() *net.TCPAddr { s.mu.Lock() defer s.mu.Unlock() if s.gatewayListener == nil { return nil } return s.gatewayListener.Addr().(*net.TCPAddr) } // A- protocol received from the remote after sending messages // on an account that it has no interest in. Mark this account // with a "no interest" marker to prevent further messages send. // func (c *client) processGatewayAccountUnsub(accName string) { // Just to indicate activity around "subscriptions" events. c.in.subs++ // This account may have an entry because of queue subs. // If that's the case, we can reset the no-interest map, // but not set the entry to nil. 
setToNil := true if ei, ok := c.gw.outsim.Load(accName); ei != nil { e := ei.(*outsie) e.Lock() // Reset the no-interest map if we have queue subs // and don't set the entry to nil. if e.qsubs > 0 { e.ni = make(map[string]struct{}) setToNil = false } e.Unlock() } else if ok { // Already set to nil, so skip setToNil = false } if setToNil { c.gw.outsim.Store(accName, nil) } } // A+ protocol received from remote gateway if it had previously // sent an A-. Clear the "no interest" marker for this account. // func (c *client) processGatewayAccountSub(accName string) error { // Just to indicate activity around "subscriptions" events. c.in.subs++ // If this account has an entry because of queue subs, we // can't delete the entry. remove := true if ei, ok := c.gw.outsim.Load(accName); ei != nil { e := ei.(*outsie) e.Lock() if e.qsubs > 0 { remove = false } e.Unlock() } else if !ok { // There is no entry, so skip remove = false } if remove { c.gw.outsim.Delete(accName) } return nil } // RS- protocol received from the remote after sending messages // on a subject that it has no interest in (but knows about the // account). Mark this subject with a "no interest" marker to // prevent further messages being sent. // If in modeInterestOnly or for a queue sub, remove from // the sublist if present. // func (c *client) processGatewayRUnsub(arg []byte) error { accName, subject, queue, err := c.parseUnsubProto(arg) if err != nil { return fmt.Errorf("processGatewaySubjectUnsub %s", err.Error()) } var ( e *outsie useSl bool newe bool callUpdate bool srv *Server sub *subscription ) // Possibly execute this on exit after all locks have been released. // If callUpdate is true, srv and sub will be not nil. 
defer func() { if callUpdate { srv.updateInterestForAccountOnGateway(accName, sub, -1) } }() c.mu.Lock() if c.gw.outsim == nil { c.Errorf("Received RS- from gateway on inbound connection") c.mu.Unlock() c.closeConnection(ProtocolViolation) return nil } defer c.mu.Unlock() ei, _ := c.gw.outsim.Load(accName) if ei != nil { e = ei.(*outsie) e.Lock() defer e.Unlock() // If there is an entry, for plain sub we need // to know if we should store the sub useSl = queue != nil || e.mode != Optimistic } else if queue != nil { // should not even happen... c.Debugf("Received RS- without prior RS+ for subject %q, queue %q", subject, queue) return nil } else { // Plain sub, assume optimistic sends, create entry. e = &outsie{ni: make(map[string]struct{}), sl: NewSublistWithCache()} newe = true } // This is when a sub or queue sub is supposed to be in // the sublist. Look for it and remove. if useSl { var ok bool key := arg // m[string()] does not cause mem allocation sub, ok = c.subs[string(key)] // if RS- for a sub that we don't have, just ignore. if !ok { return nil } if e.sl.Remove(sub) == nil { delete(c.subs, string(key)) if queue != nil { e.qsubs-- atomic.AddInt64(&c.srv.gateway.totalQSubs, -1) } // If last, we can remove the whole entry only // when in optimistic mode and there is no element // in the `ni` map. if e.sl.Count() == 0 && e.mode == Optimistic && len(e.ni) == 0 { c.gw.outsim.Delete(accName) } } // We are going to call updateInterestForAccountOnGateway on exit. srv = c.srv callUpdate = true } else { e.ni[string(subject)] = struct{}{} if newe { c.gw.outsim.Store(accName, e) } } return nil } // For plain subs, RS+ protocol received from remote gateway if it // had previously sent a RS-. Clear the "no interest" marker for // this subject (under this account). // For queue subs, or if in modeInterestOnly, register interest // from remote gateway. // func (c *client) processGatewayRSub(arg []byte) error { // Indicate activity. 
c.in.subs++ var ( queue []byte qw int32 ) args := splitArg(arg) switch len(args) { case 2: case 4: queue = args[2] qw = int32(parseSize(args[3])) default: return fmt.Errorf("processGatewaySubjectSub Parse Error: '%s'", arg) } accName := args[0] subject := args[1] var ( e *outsie useSl bool newe bool callUpdate bool srv *Server sub *subscription ) // Possibly execute this on exit after all locks have been released. // If callUpdate is true, srv and sub will be not nil. defer func() { if callUpdate { srv.updateInterestForAccountOnGateway(string(accName), sub, 1) } }() c.mu.Lock() if c.gw.outsim == nil { c.Errorf("Received RS+ from gateway on inbound connection") c.mu.Unlock() c.closeConnection(ProtocolViolation) return nil } defer c.mu.Unlock() ei, _ := c.gw.outsim.Load(string(accName)) // We should always have an existing entry for plain subs because // in optimistic mode we would have received RS- first, and // in full knowledge, we are receiving RS+ for an account after // getting many RS- from the remote.. if ei != nil { e = ei.(*outsie) e.Lock() defer e.Unlock() useSl = queue != nil || e.mode != Optimistic } else if queue == nil { return nil } else { e = &outsie{ni: make(map[string]struct{}), sl: NewSublistWithCache()} newe = true useSl = true } if useSl { var key []byte // We store remote subs by account/subject[/queue]. // For queue, remove the trailing weight if queue != nil { key = arg[:len(arg)-len(args[3])-1] } else { key = arg } // If RS+ for a sub that we already have, ignore. // (m[string()] does not allocate memory) if _, ok := c.subs[string(key)]; ok { return nil } // new subscription. copy subject (and queue) to // not reference the underlying possibly big buffer. var csubject []byte var cqueue []byte if queue != nil { // make single allocation and use different slices // to point to subject and queue name. 
cbuf := make([]byte, len(subject)+1+len(queue)) copy(cbuf, key[len(accName)+1:]) csubject = cbuf[:len(subject)] cqueue = cbuf[len(subject)+1:] } else { csubject = make([]byte, len(subject)) copy(csubject, subject) } sub = &subscription{client: c, subject: csubject, queue: cqueue, qw: qw} // If no error inserting in sublist... if e.sl.Insert(sub) == nil { c.subs[string(key)] = sub if queue != nil { e.qsubs++ atomic.AddInt64(&c.srv.gateway.totalQSubs, 1) } if newe { c.gw.outsim.Store(string(accName), e) } } // We are going to call updateInterestForAccountOnGateway on exit. srv = c.srv callUpdate = true } else { subj := string(subject) // If this is an RS+ for a wc subject, then // remove from the no interest map all subjects // that are a subset of this wc subject. if subjectHasWildcard(subj) { for k := range e.ni { if subjectIsSubsetMatch(k, subj) { delete(e.ni, k) } } } else { delete(e.ni, subj) } } return nil } // Returns true if this gateway has possible interest in the // given account/subject (which means, it does not have a registered // no-interest on the account and/or subject) and the sublist result // for queue subscriptions. // func (c *client) gatewayInterest(acc, subj string) (bool, *SublistResult) { ei, accountInMap := c.gw.outsim.Load(acc) // If there is an entry for this account and ei is nil, // it means that the remote is not interested at all in // this account and we could not possibly have queue subs. if accountInMap && ei == nil { return false, nil } // Assume interest if account not in map, unless we support // only interest-only mode. psi := !accountInMap && !c.gw.interestOnlyMode var r *SublistResult if accountInMap { // If in map, check for subs interest with sublist. e := ei.(*outsie) e.RLock() // Unless each side has agreed on interest-only mode, // we may be in transition to modeInterestOnly // but until e.ni is nil, use it to know if we // should suppress interest or not. 
if !c.gw.interestOnlyMode && e.ni != nil { if _, inMap := e.ni[subj]; !inMap { psi = true } } // If we are in modeInterestOnly (e.ni will be nil) // or if we have queue subs, we also need to check sl.Match. if e.ni == nil || e.qsubs > 0 { r = e.sl.Match(subj) if len(r.psubs) > 0 { psi = true } } e.RUnlock() // Since callers may just check if the sublist result is nil or not, // make sure that if what is returned by sl.Match() is the emptyResult, then // we return nil to the caller. if r == emptyResult { r = nil } } return psi, r } // switchAccountToInterestMode will switch an account over to interestMode. // Lock should NOT be held. func (s *Server) switchAccountToInterestMode(accName string) { gwsa := [16]*client{} gws := gwsa[:0] s.getInboundGatewayConnections(&gws) for _, gin := range gws { var e *insie var ok bool gin.mu.Lock() if e, ok = gin.gw.insim[accName]; !ok || e == nil { e = &insie{} gin.gw.insim[accName] = e } // Do it only if we are in Optimistic mode if e.mode == Optimistic { gin.gatewaySwitchAccountToSendAllSubs(e, accName) } gin.mu.Unlock() } } // This is invoked when registering (or unregistering) the first // (or last) subscription on a given account/subject. For each // GWs inbound connections, we will check if we need to send an RS+ or A+ // protocol. func (s *Server) maybeSendSubOrUnsubToGateways(accName string, sub *subscription, added bool) { if sub.queue != nil { return } gwsa := [16]*client{} gws := gwsa[:0] s.getInboundGatewayConnections(&gws) if len(gws) == 0 { return } var ( rsProtoa [512]byte rsProto []byte accProtoa [256]byte accProto []byte proto []byte subject = string(sub.subject) hasWc = subjectHasWildcard(subject) ) for _, c := range gws { proto = nil c.mu.Lock() e, inMap := c.gw.insim[accName] // If there is a inbound subject interest entry... if e != nil { sendProto := false // In optimistic mode, we care only about possibly sending RS+ (or A+) // so if e.ni is not nil we do things only when adding a new subscription. 
if e.ni != nil && added { // For wildcard subjects, we will remove from our no-interest // map, all subjects that are a subset of this wc subject, but we // still send the wc subject and let the remote do its own cleanup. if hasWc { for enis := range e.ni { if subjectIsSubsetMatch(enis, subject) { delete(e.ni, enis) sendProto = true } } } else if _, noInterest := e.ni[subject]; noInterest { delete(e.ni, subject) sendProto = true } } else if e.mode == InterestOnly { // We are in the mode where we always send RS+/- protocols. sendProto = true } if sendProto { if rsProto == nil { // Construct the RS+/- only once proto = rsProtoa[:0] if added { proto = append(proto, rSubBytes...) } else { proto = append(proto, rUnsubBytes...) } proto = append(proto, accName...) proto = append(proto, ' ') proto = append(proto, sub.subject...) proto = append(proto, CR_LF...) rsProto = proto } else { // Point to the already constructed RS+/- proto = rsProto } } } else if added && inMap { // Here, we have a `nil` entry for this account in // the map, which means that we have previously sent // an A-. We have a new subscription, so we need to // send an A+ and delete the entry from the map so // that we do this only once. delete(c.gw.insim, accName) if accProto == nil { // Construct the A+ only once proto = accProtoa[:0] proto = append(proto, aSubBytes...) proto = append(proto, accName...) proto = append(proto, CR_LF...) accProto = proto } else { // Point to the already constructed A+ proto = accProto } } if proto != nil { c.enqueueProto(proto) if c.trace { c.traceOutOp("", proto[:len(proto)-LEN_CR_LF]) } } c.mu.Unlock() } } // This is invoked when the first (or last) queue subscription on a // given subject/group is registered (or unregistered). Sent to all // inbound gateways. 
func (s *Server) sendQueueSubOrUnsubToGateways(accName string, qsub *subscription, added bool) { if qsub.queue == nil { return } gwsa := [16]*client{} gws := gwsa[:0] s.getInboundGatewayConnections(&gws) if len(gws) == 0 { return } var protoa [512]byte var proto []byte for _, c := range gws { if proto == nil { proto = protoa[:0] if added { proto = append(proto, rSubBytes...) } else { proto = append(proto, rUnsubBytes...) } proto = append(proto, accName...) proto = append(proto, ' ') proto = append(proto, qsub.subject...) proto = append(proto, ' ') proto = append(proto, qsub.queue...) if added { // For now, just use 1 for the weight proto = append(proto, ' ', '1') } proto = append(proto, CR_LF...) } c.mu.Lock() // If we add a queue sub, and we had previously sent an A-, // we don't need to send an A+ here, but we need to clear // the fact that we did sent the A- so that we don't send // an A+ when we will get the first non-queue sub registered. if added { if ei, ok := c.gw.insim[accName]; ok && ei == nil { delete(c.gw.insim, accName) } } c.enqueueProto(proto) if c.trace { c.traceOutOp("", proto[:len(proto)-LEN_CR_LF]) } c.mu.Unlock() } } // This is invoked when a subscription (plain or queue) is // added/removed locally or in our cluster. We use ref counting // to know when to update the inbound gateways. // func (s *Server) gatewayUpdateSubInterest(accName string, sub *subscription, change int32) { if sub.si { return } var ( keya [1024]byte key = keya[:0] entry *sitally isNew bool ) s.gateway.pasi.Lock() defer s.gateway.pasi.Unlock() accMap := s.gateway.pasi.m // First see if we have the account st := accMap[accName] if st == nil { // Ignore remove of something we don't have if change < 0 { return } st = make(map[string]*sitally) accMap[accName] = st isNew = true } // Lookup: build the key as subject[+' '+queue] key = append(key, sub.subject...) if sub.queue != nil { key = append(key, ' ') key = append(key, sub.queue...) 
} if !isNew { entry = st[string(key)] } first := false last := false if entry == nil { // Ignore remove of something we don't have if change < 0 { return } entry = &sitally{n: 1, q: sub.queue != nil} st[string(key)] = entry first = true } else { entry.n += change if entry.n <= 0 { delete(st, string(key)) last = true if len(st) == 0 { delete(accMap, accName) } } } if sub.client != nil { rsubs := &s.gateway.rsubs acc := sub.client.acc sli, _ := rsubs.Load(acc) if change > 0 { var sl *Sublist if sli == nil { sl = NewSublistNoCache() rsubs.Store(acc, sl) } else { sl = sli.(*Sublist) } sl.Insert(sub) time.AfterFunc(s.gateway.recSubExp, func() { sl.Remove(sub) }) } else if sli != nil { sl := sli.(*Sublist) sl.Remove(sub) if sl.Count() == 0 { rsubs.Delete(acc) } } } if first || last { if entry.q { s.sendQueueSubOrUnsubToGateways(accName, sub, first) } else { s.maybeSendSubOrUnsubToGateways(accName, sub, first) } } } // Returns true if the given subject is a GW routed reply subject, // that is, starts with $GNR and is long enough to contain cluster/server hash // and subject. func isGWRoutedReply(subj []byte) bool { return len(subj) > gwSubjectOffset && string(subj[:gwReplyPrefixLen]) == gwReplyPrefix } // Same than isGWRoutedReply but accepts the old prefix $GR and returns // a boolean indicating if this is the old prefix func isGWRoutedSubjectAndIsOldPrefix(subj []byte) (bool, bool) { if isGWRoutedReply(subj) { return true, false } if len(subj) > oldGWReplyStart && string(subj[:oldGWReplyPrefixLen]) == oldGWReplyPrefix { return true, true } return false, false } // Returns true if subject starts with "$GNR.". This is to check that // clients can't publish on this subject. func hasGWRoutedReplyPrefix(subj []byte) bool { return len(subj) > gwReplyPrefixLen && string(subj[:gwReplyPrefixLen]) == gwReplyPrefix } // Evaluates if the given reply should be mapped or not. 
func (g *srvGateway) shouldMapReplyForGatewaySend(acc *Account, reply []byte) bool { // If for this account there is a recent matching subscription interest // then we will map. sli, _ := g.rsubs.Load(acc) if sli == nil { return false } sl := sli.(*Sublist) if sl.Count() > 0 { if r := sl.Match(string(reply)); len(r.psubs)+len(r.qsubs) > 0 { return true } } return false } var subPool = &sync.Pool{ New: func() interface{} { return &subscription{} }, } // May send a message to all outbound gateways. It is possible // that the message is not sent to a given gateway if for instance // it is known that this gateway has no interest in the account or // subject, etc.. // func (c *client) sendMsgToGateways(acc *Account, msg, subject, reply []byte, qgroups [][]byte) bool { // We had some times when we were sending across a GW with no subject, and the other side would break // due to parser error. These need to be fixed upstream but also double check here. if len(subject) == 0 { return false } gwsa := [16]*client{} gws := gwsa[:0] // This is in fast path, so avoid calling functions when possible. // Get the outbound connections in place instead of calling // getOutboundGatewayConnections(). srv := c.srv gw := srv.gateway gw.RLock() for i := 0; i < len(gw.outo); i++ { gws = append(gws, gw.outo[i]) } thisClusterReplyPrefix := gw.replyPfx thisClusterOldReplyPrefix := gw.oldReplyPfx gw.RUnlock() if len(gws) == 0 { return false } var ( subj = string(subject) queuesa = [512]byte{} queues = queuesa[:0] accName = acc.Name mreplya [256]byte mreply []byte dstHash []byte checkReply = len(reply) > 0 didDeliver bool prodIsMQTT = c.isMqtt() dlvMsgs int64 ) // Get a subscription from the pool sub := subPool.Get().(*subscription) // Check if the subject is on the reply prefix, if so, we // need to send that message directly to the origin cluster. 
directSend, old := isGWRoutedSubjectAndIsOldPrefix(subject) if directSend { if old { dstHash = subject[oldGWReplyPrefixLen : oldGWReplyStart-1] } else { dstHash = subject[gwClusterOffset : gwClusterOffset+gwHashLen] } } for i := 0; i < len(gws); i++ { gwc := gws[i] if directSend { gwc.mu.Lock() var ok bool if gwc.gw.cfg != nil { if old { ok = bytes.Equal(dstHash, gwc.gw.cfg.oldHash) } else { ok = bytes.Equal(dstHash, gwc.gw.cfg.hash) } } gwc.mu.Unlock() if !ok { continue } } else { // Plain sub interest and queue sub results for this account/subject psi, qr := gwc.gatewayInterest(accName, subj) if !psi && qr == nil { continue } queues = queuesa[:0] if qr != nil { for i := 0; i < len(qr.qsubs); i++ { qsubs := qr.qsubs[i] if len(qsubs) > 0 { queue := qsubs[0].queue add := true for _, qn := range qgroups { if bytes.Equal(queue, qn) { add = false break } } if add { qgroups = append(qgroups, queue) queues = append(queues, queue...) queues = append(queues, ' ') } } } } if !psi && len(queues) == 0 { continue } } if checkReply { // Check/map only once checkReply = false // Assume we will use original mreply = reply // Decide if we should map. if gw.shouldMapReplyForGatewaySend(acc, reply) { mreply = mreplya[:0] gwc.mu.Lock() useOldPrefix := gwc.gw.useOldPrefix gwc.mu.Unlock() if useOldPrefix { mreply = append(mreply, thisClusterOldReplyPrefix...) } else { mreply = append(mreply, thisClusterReplyPrefix...) } mreply = append(mreply, reply...) } } // Setup the message header. // Make sure we are an 'R' proto by default c.msgb[0] = 'R' mh := c.msgb[:msgHeadProtoLen] mh = append(mh, accName...) mh = append(mh, ' ') mh = append(mh, subject...) mh = append(mh, ' ') if len(queues) > 0 { if reply != nil { mh = append(mh, "+ "...) // Signal that there is a reply. mh = append(mh, mreply...) mh = append(mh, ' ') } else { mh = append(mh, "| "...) // Only queues } mh = append(mh, queues...) } else if len(reply) > 0 { mh = append(mh, mreply...) 
mh = append(mh, ' ') } // Headers hasHeader := c.pa.hdr > 0 canReceiveHeader := gwc.headers if hasHeader { if canReceiveHeader { mh[0] = 'H' mh = append(mh, c.pa.hdb...) mh = append(mh, ' ') mh = append(mh, c.pa.szb...) } else { // If we are here we need to truncate the payload size nsz := strconv.Itoa(c.pa.size - c.pa.hdr) mh = append(mh, nsz...) } } else { mh = append(mh, c.pa.szb...) } mh = append(mh, CR_LF...) // We reuse the subscription object that we pass to deliverMsg. // So set/reset important fields. sub.nm, sub.max = 0, 0 sub.client = gwc sub.subject = subject if c.deliverMsg(prodIsMQTT, sub, acc, subject, mreply, mh, msg, false) { // We don't count internal deliveries so count only if sub.icb is nil if sub.icb == nil { dlvMsgs++ } didDeliver = true } } if dlvMsgs > 0 { totalBytes := dlvMsgs * int64(len(msg)) // For non MQTT producers, remove the CR_LF * number of messages if !prodIsMQTT { totalBytes -= dlvMsgs * int64(LEN_CR_LF) } if acc != nil { atomic.AddInt64(&acc.outMsgs, dlvMsgs) atomic.AddInt64(&acc.outBytes, totalBytes) } atomic.AddInt64(&srv.outMsgs, dlvMsgs) atomic.AddInt64(&srv.outBytes, totalBytes) } // Done with subscription, put back to pool. We don't need // to reset content since we explicitly set when using it. subPool.Put(sub) return didDeliver } // Possibly sends an A- to the remote gateway `c`. // Invoked when processing an inbound message and the account is not found. // A check under a lock that protects processing of SUBs and UNSUBs is // done to make sure that we don't send the A- if a subscription has just // been created at the same time, which would otherwise results in the // remote never sending messages on this account until a new subscription // is created. func (s *Server) gatewayHandleAccountNoInterest(c *client, accName []byte) { // Check and possibly send the A- under this lock. 
s.gateway.pasi.Lock() defer s.gateway.pasi.Unlock() si, inMap := s.gateway.pasi.m[string(accName)] if inMap && si != nil && len(si) > 0 { return } c.sendAccountUnsubToGateway(accName) } // Helper that sends an A- to this remote gateway if not already done. // This function should not be invoked directly but instead be invoked // by functions holding the gateway.pasi's Lock. func (c *client) sendAccountUnsubToGateway(accName []byte) { // Check if we have sent the A- or not. c.mu.Lock() e, sent := c.gw.insim[string(accName)] if e != nil || !sent { // Add a nil value to indicate that we have sent an A- // so that we know to send A+ when needed. c.gw.insim[string(accName)] = nil var protoa [256]byte proto := protoa[:0] proto = append(proto, aUnsubBytes...) proto = append(proto, accName...) proto = append(proto, CR_LF...) c.enqueueProto(proto) if c.trace { c.traceOutOp("", proto[:len(proto)-LEN_CR_LF]) } } c.mu.Unlock() } // Possibly sends an A- for this account or RS- for this subject. // Invoked when processing an inbound message and the account is found // but there is no interest on this subject. // A test is done under a lock that protects processing of SUBs and UNSUBs // and if there is no subscription at this time, we send an A-. If there // is at least a subscription, but no interest on this subject, we send // an RS- for this subject (if not already done). func (s *Server) gatewayHandleSubjectNoInterest(c *client, acc *Account, accName, subject []byte) { s.gateway.pasi.Lock() defer s.gateway.pasi.Unlock() // If there is no subscription for this account, we would normally // send an A-, however, if this account has the internal subscription // for service reply, send a specific RS- for the subject instead. // Need to grab the lock here since sublist can change during reload. 
acc.mu.RLock() hasSubs := acc.sl.Count() > 0 || acc.siReply != nil acc.mu.RUnlock() // If there is at least a subscription, possibly send RS- if hasSubs { sendProto := false c.mu.Lock() // Send an RS- protocol if not already done and only if // not in the modeInterestOnly. e := c.gw.insim[string(accName)] if e == nil { e = &insie{ni: make(map[string]struct{})} e.ni[string(subject)] = struct{}{} c.gw.insim[string(accName)] = e sendProto = true } else if e.ni != nil { // If we are not in modeInterestOnly, check if we // have already sent an RS- if _, alreadySent := e.ni[string(subject)]; !alreadySent { // TODO(ik): pick some threshold as to when // we need to switch mode if len(e.ni) >= gatewayMaxRUnsubBeforeSwitch { // If too many RS-, switch to all-subs-mode. c.gatewaySwitchAccountToSendAllSubs(e, string(accName)) } else { e.ni[string(subject)] = struct{}{} sendProto = true } } } if sendProto { var ( protoa = [512]byte{} proto = protoa[:0] ) proto = append(proto, rUnsubBytes...) proto = append(proto, accName...) proto = append(proto, ' ') proto = append(proto, subject...) proto = append(proto, CR_LF...) c.enqueueProto(proto) if c.trace { c.traceOutOp("", proto[:len(proto)-LEN_CR_LF]) } } c.mu.Unlock() } else { // There is not a single subscription, send an A- (if not already done). c.sendAccountUnsubToGateway([]byte(acc.Name)) } } // Returns the cluster hash from the gateway reply prefix func (g *srvGateway) getClusterHash() []byte { g.RLock() clusterHash := g.replyPfx[gwClusterOffset : gwClusterOffset+gwHashLen] g.RUnlock() return clusterHash } // Store this route in map with the key being the remote server's name hash // and the remote server's ID hash used by gateway replies mapping routing. func (s *Server) storeRouteByHash(srvIDHash string, c *client) { if !s.gateway.enabled { return } s.gateway.routesIDByHash.Store(srvIDHash, c) } // Remove the route with the given keys from the map. 
func (s *Server) removeRouteByHash(srvIDHash string) { if !s.gateway.enabled { return } s.gateway.routesIDByHash.Delete(srvIDHash) } // Returns the route with given hash or nil if not found. // This is for gateways only. func (s *Server) getRouteByHash(hash, accName []byte) (*client, bool) { id := string(hash) var perAccount bool if v, ok := s.accRouteByHash.Load(string(accName)); ok { if v == nil { id += string(accName) perAccount = true } else { id += strconv.Itoa(v.(int)) } } if v, ok := s.gateway.routesIDByHash.Load(id); ok { return v.(*client), perAccount } else if !perAccount { // Check if we have a "no pool" connection at index 0. if v, ok := s.gateway.routesIDByHash.Load(string(hash) + "0"); ok { if r := v.(*client); r != nil { r.mu.Lock() noPool := r.route.noPool r.mu.Unlock() if noPool { return r, false } } } } return nil, perAccount } // Returns the subject from the routed reply func getSubjectFromGWRoutedReply(reply []byte, isOldPrefix bool) []byte { if isOldPrefix { return reply[oldGWReplyStart:] } return reply[gwSubjectOffset:] } // This should be invoked only from processInboundGatewayMsg() or // processInboundRoutedMsg() and is checking if the subject // (c.pa.subject) has the _GR_ prefix. If so, this is processed // as a GW reply and `true` is returned to indicate to the caller // that it should stop processing. // If gateway is not enabled on this server or if the subject // does not start with _GR_, `false` is returned and caller should // process message as usual. func (c *client) handleGatewayReply(msg []byte) (processed bool) { // Do not handle GW prefixed messages if this server does not have // gateway enabled or if the subject does not start with the previx. 
if !c.srv.gateway.enabled { return false } isGWPrefix, oldPrefix := isGWRoutedSubjectAndIsOldPrefix(c.pa.subject) if !isGWPrefix { return false } // Save original subject (in case we have to forward) orgSubject := c.pa.subject var clusterHash []byte var srvHash []byte var subject []byte if oldPrefix { clusterHash = c.pa.subject[oldGWReplyPrefixLen : oldGWReplyStart-1] // Check if this reply is intended for our cluster. if !bytes.Equal(clusterHash, c.srv.gateway.oldHash) { // We could report, for now, just drop. return true } subject = c.pa.subject[oldGWReplyStart:] } else { clusterHash = c.pa.subject[gwClusterOffset : gwClusterOffset+gwHashLen] // Check if this reply is intended for our cluster. if !bytes.Equal(clusterHash, c.srv.gateway.getClusterHash()) { // We could report, for now, just drop. return true } srvHash = c.pa.subject[gwServerOffset : gwServerOffset+gwHashLen] subject = c.pa.subject[gwSubjectOffset:] } var route *client var perAccount bool // If the origin is not this server, get the route this should be sent to. if c.kind == GATEWAY && srvHash != nil && !bytes.Equal(srvHash, c.srv.gateway.sIDHash) { route, perAccount = c.srv.getRouteByHash(srvHash, c.pa.account) // This will be possibly nil, and in this case we will try to process // the interest from this server. } // Adjust the subject c.pa.subject = subject // Use a stack buffer to rewrite c.pa.cache since we only need it for // getAccAndResultFromCache() var _pacache [256]byte pacache := _pacache[:0] // For routes that are dedicated to an account, do not put the account // name in the pacache. if c.kind == GATEWAY || (c.kind == ROUTER && c.route != nil && len(c.route.accName) == 0) { pacache = append(pacache, c.pa.account...) pacache = append(pacache, ' ') } pacache = append(pacache, c.pa.subject...) 
c.pa.pacache = pacache acc, r := c.getAccAndResultFromCache() if acc == nil { typeConn := "routed" if c.kind == GATEWAY { typeConn = "gateway" } c.Debugf("Unknown account %q for %s message on subject: %q", c.pa.account, typeConn, c.pa.subject) if c.kind == GATEWAY { c.srv.gatewayHandleAccountNoInterest(c, c.pa.account) } return true } // If route is nil, we will process the incoming message locally. if route == nil { // Check if this is a service reply subject (_R_) isServiceReply := isServiceReply(c.pa.subject) var queues [][]byte if len(r.psubs)+len(r.qsubs) > 0 { flags := pmrCollectQueueNames | pmrIgnoreEmptyQueueFilter // If this message came from a ROUTE, allow to pick queue subs // only if the message was directly sent by the "gateway" server // in our cluster that received it. if c.kind == ROUTER { flags |= pmrAllowSendFromRouteToRoute } _, queues = c.processMsgResults(acc, r, msg, nil, c.pa.subject, c.pa.reply, flags) } // Since this was a reply that made it to the origin cluster, // we now need to send the message with the real subject to // gateways in case they have interest on that reply subject. if !isServiceReply { c.sendMsgToGateways(acc, msg, c.pa.subject, c.pa.reply, queues) } } else if c.kind == GATEWAY { // Only if we are a gateway connection should we try to route // to the server where the request originated. var bufa [256]byte var buf = bufa[:0] buf = append(buf, msgHeadProto...) if !perAccount { buf = append(buf, acc.Name...) buf = append(buf, ' ') } buf = append(buf, orgSubject...) buf = append(buf, ' ') if len(c.pa.reply) > 0 { buf = append(buf, c.pa.reply...) buf = append(buf, ' ') } szb := c.pa.szb if c.pa.hdr >= 0 { if route.headers { buf[0] = 'H' buf = append(buf, c.pa.hdb...) buf = append(buf, ' ') } else { szb = []byte(strconv.Itoa(c.pa.size - c.pa.hdr)) msg = msg[c.pa.hdr:] } } buf = append(buf, szb...) mhEnd := len(buf) buf = append(buf, _CRLF_...) buf = append(buf, msg...) 
route.mu.Lock() route.enqueueProto(buf) if route.trace { route.traceOutOp("", buf[:mhEnd]) } route.mu.Unlock() } return true } // Process a message coming from a remote gateway. Send to any sub/qsub // in our cluster that is matching. When receiving a message for an // account or subject for which there is no interest in this cluster // an A-/RS- protocol may be send back. // func (c *client) processInboundGatewayMsg(msg []byte) { // Update statistics c.in.msgs++ // The msg includes the CR_LF, so pull back out for accounting. c.in.bytes += int32(len(msg) - LEN_CR_LF) if c.opts.Verbose { c.sendOK() } // Mostly under testing scenarios. if c.srv == nil { return } // If the subject (c.pa.subject) has the gateway prefix, this function will // handle it. if c.handleGatewayReply(msg) { // We are done here. return } acc, r := c.getAccAndResultFromCache() if acc == nil { c.Debugf("Unknown account %q for gateway message on subject: %q", c.pa.account, c.pa.subject) c.srv.gatewayHandleAccountNoInterest(c, c.pa.account) return } // Check if this is a service reply subject (_R_) noInterest := len(r.psubs) == 0 checkNoInterest := true if acc.NumServiceImports() > 0 { if isServiceReply(c.pa.subject) { checkNoInterest = false } else { // We need to eliminate the subject interest from the service imports here to // make sure we send the proper no interest if the service import is the only interest. noInterest = true for _, sub := range r.psubs { // sub.si indicates that this is a subscription for service import, and is immutable. // So sub.si is false, then this is a subscription for something else, so there is // actually proper interest. if !sub.si { noInterest = false break } } } } if checkNoInterest && noInterest { // If there is no interest on plain subs, possibly send an RS-, // even if there is qsubs interest. c.srv.gatewayHandleSubjectNoInterest(c, acc, c.pa.account, c.pa.subject) // If there is also no queue filter, then no point in continuing // (even if r.qsubs i > 0). 
if len(c.pa.queues) == 0 { return } } c.processMsgResults(acc, r, msg, nil, c.pa.subject, c.pa.reply, pmrNoFlag) } // Indicates that the remote which we are sending messages to // has decided to send us all its subs interest so that we // stop doing optimistic sends. // func (c *client) gatewayAllSubsReceiveStart(info *Info) { account := getAccountFromGatewayCommand(c, info, "start") if account == "" { return } c.Debugf("Gateway %q: switching account %q to %s mode", info.Gateway, account, InterestOnly) // Since the remote would send us this start command // only after sending us too many RS- for this account, // we should always have an entry here. // TODO(ik): Should we close connection with protocol violation // error if that happens? ei, _ := c.gw.outsim.Load(account) if ei != nil { e := ei.(*outsie) e.Lock() e.mode = Transitioning e.Unlock() } else { e := &outsie{sl: NewSublistWithCache()} e.mode = Transitioning c.mu.Lock() c.gw.outsim.Store(account, e) c.mu.Unlock() } } // Indicates that the remote has finished sending all its // subscriptions and we should now not send unless we know // there is explicit interest. // func (c *client) gatewayAllSubsReceiveComplete(info *Info) { account := getAccountFromGatewayCommand(c, info, "complete") if account == "" { return } // Done receiving all subs from remote. Set the `ni` // map to nil so that gatewayInterest() no longer // uses it. ei, _ := c.gw.outsim.Load(string(account)) if ei != nil { e := ei.(*outsie) // Needs locking here since `ni` is checked by // many go-routines calling gatewayInterest() e.Lock() e.ni = nil e.mode = InterestOnly e.Unlock() c.Debugf("Gateway %q: switching account %q to %s mode complete", info.Gateway, account, InterestOnly) } } // small helper to get the account name from the INFO command. 
//
// If the INFO has no GatewayCmdPayload, an error naming the command ("start"
// or "complete") is sent, the connection is closed with ProtocolViolation,
// and "" is returned. Otherwise the payload is returned as the account name.
func getAccountFromGatewayCommand(c *client, info *Info, cmd string) string {
	if info.GatewayCmdPayload == nil {
		c.sendErrAndErr(fmt.Sprintf("Account absent from receive-all-subscriptions-%s command", cmd))
		c.closeConnection(ProtocolViolation)
		return ""
	}
	return string(info.GatewayCmdPayload)
}

// gatewaySwitchAccountToSendAllSubs switches the given gateway and account to
// send-all-subs mode.
// This is invoked when processing an inbound message and we
// reach a point where we had to send a lot of RS- for this
// account. We will send an INFO protocol to indicate that we
// start sending all our subs (for this account), followed by
// all subs (RS+) and finally an INFO to indicate the end of it.
// The remote will then send messages only if it finds explicit
// interest in the sublist created based on all RS+ that we just
// sent.
// The client's lock is held on entry. The start INFO is therefore enqueued
// without locking, while the subs and the complete INFO are sent from a
// separate go-routine that acquires the lock itself.
//
func (c *client) gatewaySwitchAccountToSendAllSubs(e *insie, accName string) {
	// Set this map to nil so that the no-interest is no longer checked.
	e.ni = nil
	// Switch mode to transitioning to prevent switchAccountToInterestMode
	// to possibly call this function multiple times.
	e.mode = Transitioning
	s := c.srv

	remoteGWName := c.gw.name
	c.Debugf("Gateway %q: switching account %q to %s mode",
		remoteGWName, accName, InterestOnly)

	// Function that will create an INFO protocol
	// and set proper command. useLock tells whether the client lock
	// must be acquired around enqueueProto (false when already held).
	sendCmd := func(cmd byte, useLock bool) {
		// Use bare server info and simply set the
		// gateway name and command
		info := Info{
			Gateway:           s.gateway.name,
			GatewayCmd:        cmd,
			GatewayCmdPayload: []byte(accName),
		}

		// NOTE(review): the marshal error is ignored; Info contains only
		// plain fields here, so a failure is not expected.
		b, _ := json.Marshal(&info)
		infoJSON := []byte(fmt.Sprintf(InfoProto, b))
		if useLock {
			c.mu.Lock()
		}
		c.enqueueProto(infoJSON)
		if useLock {
			c.mu.Unlock()
		}
	}
	// Send the start command. When remote receives this,
	// it may continue to send optimistic messages, but
	// it will start to register RS+/RS- in sublist instead
	// of noInterest map.
	sendCmd(gatewayCmdAllSubsStart, false)

	// Execute this in separate go-routine as to not block
	// the readLoop (which may cause the otherside to close
	// the connection due to slow consumer)
	s.startGoRoutine(func() {
		defer s.grWG.Done()

		s.sendAccountSubsToGateway(c, []byte(accName))
		// Send the complete command. When the remote receives
		// this, it will not send a message unless it has a
		// matching sub from us.
		sendCmd(gatewayCmdAllSubsComplete, true)

		c.Debugf("Gateway %q: switching account %q to %s mode complete",
			remoteGWName, accName, InterestOnly)
	})
}

// trackGWReply keeps track of the routed reply to be used when/if the
// application sends back a message on the reply without the prefix.
// If `acc` is nil, the mapping is stored in the client's gwReplyMapping
// structure, and the client lock is held on entry.
// If `acc` is not nil, the mapping is stored in the account's gwReplyMapping
// structure. Account lock will be explicitly acquired.
// This is a server receiver because we use a timer interval that is avail in
// Server.gateway object.
//
// Entries expire after s.gateway.recSubExp. When the mapping goes from empty
// to non-empty, it is registered in s.gwrm.m (with the lock protecting it),
// and the expiration go-routine is woken up with the ttl if it was idle.
func (s *Server) trackGWReply(c *client, acc *Account, reply, routedReply []byte) {
	var l sync.Locker
	var g *gwReplyMapping
	if acc != nil {
		acc.mu.Lock()
		defer acc.mu.Unlock()
		g = &acc.gwReplyMapping
		l = &acc.mu
	} else {
		g = &c.gwReplyMapping
		l = &c.mu
	}
	ttl := s.gateway.recSubExp
	wasEmpty := len(g.mapping) == 0
	if g.mapping == nil {
		g.mapping = make(map[string]*gwReplyMap)
	}
	// The reason we pass both `reply` and `routedReply`, is that in some cases,
	// `routedReply` may have a deliver subject appended, something like:
	// "_GR_.xxx.yyy.$JS.ACK.$MQTT_msgs.someid.1.1.1.1620086713306484000.0@$MQTT.msgs.foo"
	// but `reply` has already been cleaned up (delivery subject removed from tail):
	// "$JS.ACK.$MQTT_msgs.someid.1.1.1.1620086713306484000.0"
	// So we will use that knowledge so we don't have to make any cleaning here.
	// NOTE(review): this assumes routedReply uses the new "_GR_." prefix layout
	// (subject at gwSubjectOffset) and is at least gwSubjectOffset+len(reply) long.
	routedReply = routedReply[:gwSubjectOffset+len(reply)]

	// We need to make a copy so that we don't reference the underlying
	// read buffer.
	ms := string(routedReply)

	grm := &gwReplyMap{ms: ms, exp: time.Now().Add(ttl).UnixNano()}
	// If we are here with the same key but different mapped replies
	// (say _GR_.cluA.srv1.bar and then _GR_.cluB.srv2.bar), we need to
	// store it otherwise we would take the risk of the reply not
	// making it back.
	g.mapping[ms[gwSubjectOffset:]] = grm
	if wasEmpty {
		atomic.StoreInt32(&g.check, 1)
		s.gwrm.m.Store(g, l)
		// Only the caller that flips the "working" flag from 0 to 1 notifies
		// the expiration go-routine; the send is non-blocking.
		if atomic.CompareAndSwapInt32(&s.gwrm.w, 0, 1) {
			select {
			case s.gwrm.ch <- ttl:
			default:
			}
		}
	}
}

// startGWReplyMapExpiration starts a long lived go routine that is
// responsible for removing GW reply mappings that have expired.
//
// While idle (ttl == 0), the timer just re-arms for an hour. Receiving a ttl
// on s.gwrm.ch (see trackGWReply) re-arms the timer with that ttl. On each
// tick, expired entries are removed from every registered mapping, and a
// mapping that becomes empty is unregistered. The go-routine goes idle again
// once a tick finds no registered mapping and it can reset the working flag
// from 1 to 0. It exits when s.quitCh is signaled.
func (s *Server) startGWReplyMapExpiration() {
	s.mu.Lock()
	// Buffered so that trackGWReply's non-blocking send is not lost.
	s.gwrm.ch = make(chan time.Duration, 1)
	s.mu.Unlock()
	s.startGoRoutine(func() {
		defer s.grWG.Done()

		t := time.NewTimer(time.Hour)
		var ttl time.Duration
		for {
			select {
			case <-t.C:
				if ttl == 0 {
					t.Reset(time.Hour)
					continue
				}
				now := time.Now().UnixNano()
				mapEmpty := true
				s.gwrm.m.Range(func(k, v interface{}) bool {
					g := k.(*gwReplyMapping)
					l := v.(sync.Locker)
					l.Lock()
					for k, grm := range g.mapping {
						if grm.exp <= now {
							delete(g.mapping, k)
							if len(g.mapping) == 0 {
								atomic.StoreInt32(&g.check, 0)
								s.gwrm.m.Delete(g)
							}
						}
					}
					l.Unlock()
					// Any registered mapping (even one just emptied) keeps
					// us active for at least one more tick.
					mapEmpty = false
					return true
				})
				if mapEmpty && atomic.CompareAndSwapInt32(&s.gwrm.w, 1, 0) {
					ttl = 0
					t.Reset(time.Hour)
				} else {
					t.Reset(ttl)
				}
			case cttl := <-s.gwrm.ch:
				ttl = cttl
				// Stop and drain the timer before resetting it.
				if !t.Stop() {
					select {
					case <-t.C:
					default:
					}
				}
				t.Reset(ttl)
			case <-s.quitCh:
				return
			}
		}
	})
}