Files
nats-server/server/route.go
Ivan Kozlovic d98d51c8cc [FIXED] Possible cluster Authorization Error during config reload
When changing something in the cluster, such as Timeout and doing
a config reload, the route could be closed with an `Authorization
Error` report. Moreover, the route would not try to reconnect,
even if specified as an explicit route.

There were 2 issues:
- When checking if a solicited route is still valid, we need to
  check the Routes' URL against the URL that we try to connect
  to but not compare the pointers, but either do a reflect
  deep equal, or compare their String representation (this is
  what I do in the PR).
- We should check route authorization only if this is an accepted
  route, not an explicit one. The reason is that we a server
  explicitly connect to another server, it does not get the remote
  server's username and password. So the check would always fail.

Note: It is possible that a config reload even without any change
in the cluster triggers the code checking if routes are properly
authorized, and that happens if there is TLS specified. When
the reload code checks if config has changed, the TLSConfig
between the old and new seem to indicate a change, eventhough there
is apparently none. Another reload does not detect a change. I
suspect some internal state in TLSConfig that causes the
reflect.DeepEqual() to report a difference.

Note2: This commit also contains fixes to regex that staticcheck
would otherwise complain about (they did not have any special
character), and I have removed printing the usage on startup when
getting an error. The usage is still correctly printed if passing
a parameter that is unknown.

Resolves #719

Signed-off-by: Ivan Kozlovic <ivan@synadia.com>
2018-08-15 18:20:29 -06:00

1104 lines
29 KiB
Go

// Copyright 2013-2018 The NATS Authors
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package server
import (
"bytes"
"crypto/tls"
"encoding/json"
"fmt"
"math/rand"
"net"
"net/url"
"strconv"
"strings"
"sync/atomic"
"time"
"github.com/nats-io/gnatsd/util"
)
// RouteType designates the router type
type RouteType int
// Type of Route
const (
// This route we learned from speaking to other routes.
Implicit RouteType = iota
// This route was explicitly configured.
Explicit
)
type route struct {
remoteID string
didSolicit bool
retry bool
routeType RouteType
url *url.URL
authRequired bool
tlsRequired bool
closed bool
connectURLs []string
}
type connectInfo struct {
Echo bool `json:"echo"`
Verbose bool `json:"verbose"`
Pedantic bool `json:"pedantic"`
User string `json:"user,omitempty"`
Pass string `json:"pass,omitempty"`
TLS bool `json:"tls_required"`
Name string `json:"name"`
}
// Used to hold onto mappings for unsubscribed
// routed queue subscribers.
type rqsub struct {
group []byte
atime time.Time
}
// Route protocol constants
const (
ConProto = "CONNECT %s" + _CRLF_
InfoProto = "INFO %s" + _CRLF_
)
// Clear up the timer and any map held for remote qsubs.
func (s *Server) clearRemoteQSubs() {
s.rqsMu.Lock()
defer s.rqsMu.Unlock()
if s.rqsubsTimer != nil {
s.rqsubsTimer.Stop()
s.rqsubsTimer = nil
}
s.rqsubs = nil
}
// Check to see if we can remove any of the remote qsubs mappings
func (s *Server) purgeRemoteQSubs() {
ri := s.getOpts().RQSubsSweep
s.rqsMu.Lock()
exp := time.Now().Add(-ri)
for k, rqsub := range s.rqsubs {
if exp.After(rqsub.atime) {
delete(s.rqsubs, k)
}
}
if s.rqsubsTimer != nil {
// Reset timer.
s.rqsubsTimer = time.AfterFunc(ri, s.purgeRemoteQSubs)
}
s.rqsMu.Unlock()
}
// Lookup a remote queue group sid.
func (s *Server) lookupRemoteQGroup(sid string) []byte {
s.rqsMu.RLock()
rqsub := s.rqsubs[sid]
s.rqsMu.RUnlock()
return rqsub.group
}
// This will hold onto a remote queue subscriber to allow
// for mapping and handling if we get a message after the
// subscription goes away.
func (s *Server) holdRemoteQSub(sub *subscription) {
// Should not happen, but protect anyway.
if len(sub.queue) == 0 {
return
}
// Add the entry
s.rqsMu.Lock()
// Start timer if needed.
if s.rqsubsTimer == nil {
ri := s.getOpts().RQSubsSweep
s.rqsubsTimer = time.AfterFunc(ri, s.purgeRemoteQSubs)
}
// Create map if needed.
if s.rqsubs == nil {
s.rqsubs = make(map[string]rqsub)
}
group := make([]byte, len(sub.queue))
copy(group, sub.queue)
rqsub := rqsub{group: group, atime: time.Now()}
s.rqsubs[routeSid(sub)] = rqsub
s.rqsMu.Unlock()
}
// This is for when we receive a directed message for a queue subscriber
// that has gone away. We reroute like a new message but scope to only
// the queue subscribers that it was originally intended for. We will
// prefer local clients, but will bounce to another route if needed.
func (c *client) reRouteQMsg(r *SublistResult, msgh, msg, group []byte) {
c.Debugf("Attempting redelivery of message for absent queue subscriber on group '%q'", group)
// We only care about qsubs here. Data structure not setup for optimized
// lookup for our specific group however.
var qsubs []*subscription
for _, qs := range r.qsubs {
if len(qs) != 0 && bytes.Equal(group, qs[0].queue) {
qsubs = qs
break
}
}
// If no match return.
if qsubs == nil {
c.Debugf("Redelivery failed, no queue subscribers for message on group '%q'", group)
return
}
// We have a matched group of queue subscribers.
// We prefer a local subscriber since that was the original target.
// Spin prand if needed.
if c.in.prand == nil {
c.in.prand = rand.New(rand.NewSource(time.Now().UnixNano()))
}
// Hold onto a remote if we come across it to utilize in case no locals exist.
var rsub *subscription
startIndex := c.in.prand.Intn(len(qsubs))
for i := 0; i < len(qsubs); i++ {
index := (startIndex + i) % len(qsubs)
sub := qsubs[index]
if sub == nil {
continue
}
if rsub == nil && bytes.HasPrefix(sub.sid, []byte(QRSID)) {
rsub = sub
continue
}
mh := c.msgHeader(msgh[:], sub)
if c.deliverMsg(sub, mh, msg) {
c.Debugf("Redelivery succeeded for message on group '%q'", group)
return
}
}
// If we are here we failed to find a local, see if we snapshotted a
// remote sub, and if so deliver to that.
if rsub != nil {
mh := c.msgHeader(msgh[:], rsub)
if c.deliverMsg(rsub, mh, msg) {
c.Debugf("Re-routing message on group '%q' to remote server", group)
return
}
}
c.Debugf("Redelivery failed, no queue subscribers for message on group '%q'", group)
}
// processRoutedMsg processes messages inbound from a route.
func (c *client) processRoutedMsg(r *SublistResult, msg []byte) {
// Snapshot server.
srv := c.srv
msgh := c.prepMsgHeader()
si := len(msgh)
// If we have a queue subscription, deliver direct
// since they are sent direct via L2 semantics over routes.
// If the match is a queue subscription, we will return from
// here regardless if we find a sub.
isq, sub, err := srv.routeSidQueueSubscriber(c.pa.sid)
if isq {
if err != nil {
// We got an invalid QRSID, so stop here
c.Errorf("Unable to deliver routed queue message: %v", err)
return
}
didDeliver := false
if sub != nil {
mh := c.msgHeader(msgh[:si], sub)
didDeliver = c.deliverMsg(sub, mh, msg)
}
if !didDeliver && c.srv != nil {
group := c.srv.lookupRemoteQGroup(string(c.pa.sid))
c.reRouteQMsg(r, msgh, msg, group)
}
return
}
// Normal pub/sub message here
// Loop over all normal subscriptions that match.
for _, sub := range r.psubs {
// Check if this is a send to a ROUTER, if so we ignore to
// enforce 1-hop semantics.
if sub.client.typ == ROUTER {
continue
}
sub.client.mu.Lock()
if sub.client.nc == nil {
sub.client.mu.Unlock()
continue
}
sub.client.mu.Unlock()
// Normal delivery
mh := c.msgHeader(msgh[:si], sub)
c.deliverMsg(sub, mh, msg)
}
}
// Lock should be held entering here.
func (c *client) sendConnect(tlsRequired bool) {
var user, pass string
if userInfo := c.route.url.User; userInfo != nil {
user = userInfo.Username()
pass, _ = userInfo.Password()
}
cinfo := connectInfo{
Echo: true,
Verbose: false,
Pedantic: false,
User: user,
Pass: pass,
TLS: tlsRequired,
Name: c.srv.info.ID,
}
b, err := json.Marshal(cinfo)
if err != nil {
c.Errorf("Error marshaling CONNECT to route: %v\n", err)
c.closeConnection(ProtocolViolation)
return
}
c.sendProto([]byte(fmt.Sprintf(ConProto, b)), true)
}
// Process the info message if we are a route.
func (c *client) processRouteInfo(info *Info) {
c.mu.Lock()
// Connection can be closed at any time (by auth timeout, etc).
// Does not make sense to continue here if connection is gone.
if c.route == nil || c.nc == nil {
c.mu.Unlock()
return
}
s := c.srv
remoteID := c.route.remoteID
// We receive an INFO from a server that informs us about another server,
// so the info.ID in the INFO protocol does not match the ID of this route.
if remoteID != "" && remoteID != info.ID {
c.mu.Unlock()
// Process this implicit route. We will check that it is not an explicit
// route and/or that it has not been connected already.
s.processImplicitRoute(info)
return
}
// Need to set this for the detection of the route to self to work
// in closeConnection().
c.route.remoteID = info.ID
// Detect route to self.
if c.route.remoteID == s.info.ID {
c.mu.Unlock()
c.closeConnection(DuplicateRoute)
return
}
// Copy over important information.
c.route.authRequired = info.AuthRequired
c.route.tlsRequired = info.TLSRequired
// If we do not know this route's URL, construct one on the fly
// from the information provided.
if c.route.url == nil {
// Add in the URL from host and port
hp := net.JoinHostPort(info.Host, strconv.Itoa(info.Port))
url, err := url.Parse(fmt.Sprintf("nats-route://%s/", hp))
if err != nil {
c.Errorf("Error parsing URL from INFO: %v\n", err)
c.mu.Unlock()
c.closeConnection(ParseError)
return
}
c.route.url = url
}
// Check to see if we have this remote already registered.
// This can happen when both servers have routes to each other.
c.mu.Unlock()
if added, sendInfo := s.addRoute(c, info); added {
c.Debugf("Registering remote route %q", info.ID)
// Send our local subscriptions to this route.
s.sendLocalSubsToRoute(c)
// sendInfo will be false if the route that we just accepted
// is the only route there is.
if sendInfo {
// The incoming INFO from the route will have IP set
// if it has Cluster.Advertise. In that case, use that
// otherwise contruct it from the remote TCP address.
if info.IP == "" {
// Need to get the remote IP address.
c.mu.Lock()
switch conn := c.nc.(type) {
case *net.TCPConn, *tls.Conn:
addr := conn.RemoteAddr().(*net.TCPAddr)
info.IP = fmt.Sprintf("nats-route://%s/", net.JoinHostPort(addr.IP.String(),
strconv.Itoa(info.Port)))
default:
info.IP = c.route.url.String()
}
c.mu.Unlock()
}
// Now let the known servers know about this new route
s.forwardNewRouteInfoToKnownServers(info)
}
// Unless disabled, possibly update the server's INFO protocol
// and send to clients that know how to handle async INFOs.
if !s.getOpts().Cluster.NoAdvertise {
s.addClientConnectURLsAndSendINFOToClients(info.ClientConnectURLs)
}
} else {
c.Debugf("Detected duplicate remote route %q", info.ID)
c.closeConnection(DuplicateRoute)
}
}
// sendAsyncInfoToClients sends an INFO protocol to all
// connected clients that accept async INFO updates.
// The server lock is held on entry.
func (s *Server) sendAsyncInfoToClients() {
// If there are no clients supporting async INFO protocols, we are done.
// Also don't send if we are shutting down...
if s.cproto == 0 || s.shutdown {
return
}
for _, c := range s.clients {
c.mu.Lock()
// Here, we are going to send only to the clients that are fully
// registered (server has received CONNECT and first PING). For
// clients that are not at this stage, this will happen in the
// processing of the first PING (see client.processPing)
if c.opts.Protocol >= ClientProtoInfo && c.flags.isSet(firstPongSent) {
// sendInfo takes care of checking if the connection is still
// valid or not, so don't duplicate tests here.
c.sendInfo(c.generateClientInfoJSON(s.copyInfo()))
}
c.mu.Unlock()
}
}
// This will process implicit route information received from another server.
// We will check to see if we have configured or are already connected,
// and if so we will ignore. Otherwise we will attempt to connect.
func (s *Server) processImplicitRoute(info *Info) {
remoteID := info.ID
s.mu.Lock()
defer s.mu.Unlock()
// Don't connect to ourself
if remoteID == s.info.ID {
return
}
// Check if this route already exists
if _, exists := s.remotes[remoteID]; exists {
return
}
// Check if we have this route as a configured route
if s.hasThisRouteConfigured(info) {
return
}
// Initiate the connection, using info.IP instead of info.URL here...
r, err := url.Parse(info.IP)
if err != nil {
s.Errorf("Error parsing URL from INFO: %v\n", err)
return
}
// Snapshot server options.
opts := s.getOpts()
if info.AuthRequired {
r.User = url.UserPassword(opts.Cluster.Username, opts.Cluster.Password)
}
s.startGoRoutine(func() { s.connectToRoute(r, false) })
}
// hasThisRouteConfigured returns true if info.Host:info.Port is present
// in the server's opts.Routes, false otherwise.
// Server lock is assumed to be held by caller.
func (s *Server) hasThisRouteConfigured(info *Info) bool {
urlToCheckExplicit := strings.ToLower(net.JoinHostPort(info.Host, strconv.Itoa(info.Port)))
for _, ri := range s.getOpts().Routes {
if strings.ToLower(ri.Host) == urlToCheckExplicit {
return true
}
}
return false
}
// forwardNewRouteInfoToKnownServers sends the INFO protocol of the new route
// to all routes known by this server. In turn, each server will contact this
// new route.
func (s *Server) forwardNewRouteInfoToKnownServers(info *Info) {
s.mu.Lock()
defer s.mu.Unlock()
b, _ := json.Marshal(info)
infoJSON := []byte(fmt.Sprintf(InfoProto, b))
for _, r := range s.routes {
r.mu.Lock()
if r.route.remoteID != info.ID {
r.sendInfo(infoJSON)
}
r.mu.Unlock()
}
}
// canImport is whether or not we will send a SUB for interest to the other side.
// This is for ROUTER connections only.
// Lock is held on entry.
func (c *client) canImport(subject []byte) bool {
// Use pubAllowed() since this checks Publish permissions which
// is what Import maps to.
return c.pubAllowed(subject)
}
// canExport is whether or not we will accept a SUB from the remote for a given subject.
// This is for ROUTER connections only.
// Lock is held on entry
func (c *client) canExport(subject []byte) bool {
// Use canSubscribe() since this checks Subscribe permissions which
// is what Export maps to.
return c.canSubscribe(subject)
}
// Initialize or reset cluster's permissions.
// This is for ROUTER connections only.
// Client lock is held on entry
func (c *client) setRoutePermissions(perms *RoutePermissions) {
// Reset if some were set
if perms == nil {
c.perms = nil
return
}
// Convert route permissions to user permissions.
// The Import permission is mapped to Publish
// and Export permission is mapped to Subscribe.
// For meaning of Import/Export, see canImport and canExport.
p := &Permissions{
Publish: perms.Import,
Subscribe: perms.Export,
}
c.setPermissions(p)
}
// This will send local subscription state to a new route connection.
// FIXME(dlc) - This could be a DOS or perf issue with many clients
// and large subscription space. Plus buffering in place not a good idea.
func (s *Server) sendLocalSubsToRoute(route *client) {
var raw [4096]*subscription
subs := raw[:0]
s.sl.localSubs(&subs)
route.mu.Lock()
for _, sub := range subs {
// Send SUB interest only if subject has a match in import permissions
if !route.canImport(sub.subject) {
continue
}
proto := fmt.Sprintf(subProto, sub.subject, sub.queue, routeSid(sub))
route.queueOutbound([]byte(proto))
if route.out.pb > int64(route.out.sz*2) {
route.flushSignal()
}
}
route.flushSignal()
route.mu.Unlock()
route.Debugf("Sent local subscriptions to route")
}
func (s *Server) createRoute(conn net.Conn, rURL *url.URL) *client {
// Snapshot server options.
opts := s.getOpts()
didSolicit := rURL != nil
r := &route{didSolicit: didSolicit}
for _, route := range opts.Routes {
if rURL != nil && (strings.ToLower(rURL.Host) == strings.ToLower(route.Host)) {
r.routeType = Explicit
}
}
c := &client{srv: s, nc: conn, opts: clientOpts{}, typ: ROUTER, route: r}
// Grab server variables
s.mu.Lock()
infoJSON := s.routeInfoJSON
authRequired := s.routeInfo.AuthRequired
tlsRequired := s.routeInfo.TLSRequired
s.mu.Unlock()
// Grab lock
c.mu.Lock()
// Initialize
c.initClient()
if didSolicit {
// Do this before the TLS code, otherwise, in case of failure
// and if route is explicit, it would try to reconnect to 'nil'...
r.url = rURL
// Set permissions associated with the route user (if applicable).
// No lock needed since we are already under client lock.
c.setRoutePermissions(opts.Cluster.Permissions)
}
// Check for TLS
if tlsRequired {
// Copy off the config to add in ServerName if we
tlsConfig := util.CloneTLSConfig(opts.Cluster.TLSConfig)
// If we solicited, we will act like the client, otherwise the server.
if didSolicit {
c.Debugf("Starting TLS route client handshake")
// Specify the ServerName we are expecting.
host, _, _ := net.SplitHostPort(rURL.Host)
tlsConfig.ServerName = host
c.nc = tls.Client(c.nc, tlsConfig)
} else {
c.Debugf("Starting TLS route server handshake")
c.nc = tls.Server(c.nc, tlsConfig)
}
conn := c.nc.(*tls.Conn)
// Setup the timeout
ttl := secondsToDuration(opts.Cluster.TLSTimeout)
time.AfterFunc(ttl, func() { tlsTimeout(c, conn) })
conn.SetReadDeadline(time.Now().Add(ttl))
c.mu.Unlock()
if err := conn.Handshake(); err != nil {
c.Errorf("TLS route handshake error: %v", err)
c.sendErr("Secure Connection - TLS Required")
c.closeConnection(TLSHandshakeError)
return nil
}
// Reset the read deadline
conn.SetReadDeadline(time.Time{})
// Re-Grab lock
c.mu.Lock()
// Verify that the connection did not go away while we released the lock.
if c.nc == nil {
c.mu.Unlock()
return nil
}
}
// Do final client initialization
// Set the Ping timer
c.setPingTimer()
// For routes, the "client" is added to s.routes only when processing
// the INFO protocol, that is much later.
// In the meantime, if the server shutsdown, there would be no reference
// to the client (connection) to be closed, leaving this readLoop
// uinterrupted, causing the Shutdown() to wait indefinitively.
// We need to store the client in a special map, under a special lock.
s.grMu.Lock()
running := s.grRunning
if running {
s.grTmpClients[c.cid] = c
}
s.grMu.Unlock()
if !running {
c.mu.Unlock()
c.setRouteNoReconnectOnClose()
c.closeConnection(ServerShutdown)
return nil
}
// Check for Auth required state for incoming connections.
// Make sure to do this before spinning up readLoop.
if authRequired && !didSolicit {
ttl := secondsToDuration(opts.Cluster.AuthTimeout)
c.setAuthTimer(ttl)
}
// Spin up the read loop.
s.startGoRoutine(func() { c.readLoop() })
// Spin up the write loop.
s.startGoRoutine(c.writeLoop)
if tlsRequired {
c.Debugf("TLS handshake complete")
cs := c.nc.(*tls.Conn).ConnectionState()
c.Debugf("TLS version %s, cipher suite %s", tlsVersion(cs.Version), tlsCipher(cs.CipherSuite))
}
// Queue Connect proto if we solicited the connection.
if didSolicit {
c.Debugf("Route connect msg sent")
c.sendConnect(tlsRequired)
}
// Send our info to the other side.
c.sendInfo(infoJSON)
c.mu.Unlock()
c.Noticef("Route connection created")
return c
}
const (
_CRLF_ = "\r\n"
_EMPTY_ = ""
)
const (
subProto = "SUB %s %s %s" + _CRLF_
unsubProto = "UNSUB %s" + _CRLF_
)
// FIXME(dlc) - Make these reserved and reject if they come in as a sid
// from a client connection.
// Route constants
const (
RSID = "RSID"
QRSID = "QRSID"
QRSID_LEN = len(QRSID)
)
// Parse the given rsid. If the protocol does not start with QRSID,
// returns false and no subscription nor error.
// If it does start with QRSID, returns true and possibly a subscription
// or an error if the QRSID protocol is malformed.
func (s *Server) routeSidQueueSubscriber(rsid []byte) (bool, *subscription, error) {
if !bytes.HasPrefix(rsid, []byte(QRSID)) {
return false, nil, nil
}
cid, sid, err := parseRouteQueueSid(rsid)
if err != nil {
return true, nil, err
}
s.mu.Lock()
client := s.clients[cid]
s.mu.Unlock()
if client == nil {
return true, nil, nil
}
client.mu.Lock()
sub, ok := client.subs[string(sid)]
client.mu.Unlock()
if ok {
return true, sub, nil
}
return true, nil, nil
}
// Creates a routable sid that can be used
// to reach remote subscriptions.
func routeSid(sub *subscription) string {
var qi string
if len(sub.queue) > 0 {
qi = "Q"
}
return fmt.Sprintf("%s%s:%d:%s", qi, RSID, sub.client.cid, sub.sid)
}
// Parse the given `rsid` knowing that it starts with `QRSID`.
// Returns the cid and sid or an error not a valid QRSID.
func parseRouteQueueSid(rsid []byte) (uint64, []byte, error) {
var (
cid uint64
sid []byte
cidFound bool
sidFound bool
)
// A valid QRSID needs to be at least QRSID:x:y
// First character here should be `:`
if len(rsid) >= QRSID_LEN+4 {
if rsid[QRSID_LEN] == ':' {
for i, count := QRSID_LEN+1, len(rsid); i < count; i++ {
switch rsid[i] {
case ':':
cid = uint64(parseInt64(rsid[QRSID_LEN+1 : i]))
cidFound = true
sid = rsid[i+1:]
}
}
if cidFound {
// We can't assume the content of sid, so as long
// as it is not len 0, we have to say it is a valid one.
if len(rsid) > 0 {
sidFound = true
}
}
}
}
if cidFound && sidFound {
return cid, sid, nil
}
return 0, nil, fmt.Errorf("invalid QRSID: %s", rsid)
}
func (s *Server) addRoute(c *client, info *Info) (bool, bool) {
id := c.route.remoteID
sendInfo := false
s.mu.Lock()
if !s.running {
s.mu.Unlock()
return false, false
}
remote, exists := s.remotes[id]
if !exists {
s.routes[c.cid] = c
s.remotes[id] = c
c.mu.Lock()
c.route.connectURLs = info.ClientConnectURLs
cid := c.cid
c.mu.Unlock()
// Remove from the temporary map
s.grMu.Lock()
delete(s.grTmpClients, cid)
s.grMu.Unlock()
// we don't need to send if the only route is the one we just accepted.
sendInfo = len(s.routes) > 1
}
s.mu.Unlock()
if exists {
var r *route
c.mu.Lock()
// upgrade to solicited?
if c.route.didSolicit {
// Make a copy
rs := *c.route
r = &rs
}
c.mu.Unlock()
remote.mu.Lock()
// r will be not nil if c.route.didSolicit was true
if r != nil {
// If we upgrade to solicited, we still want to keep the remote's
// connectURLs. So transfer those.
r.connectURLs = remote.route.connectURLs
remote.route = r
}
// This is to mitigate the issue where both sides add the route
// on the opposite connection, and therefore end-up with both
// connections being dropped.
remote.route.retry = true
remote.mu.Unlock()
}
return !exists, sendInfo
}
func (s *Server) broadcastInterestToRoutes(sub *subscription, proto string) {
var arg []byte
if atomic.LoadInt32(&s.logging.trace) == 1 {
arg = []byte(proto[:len(proto)-LEN_CR_LF])
}
protoAsBytes := []byte(proto)
s.mu.Lock()
for _, route := range s.routes {
// FIXME(dlc) - Make same logic as deliverMsg
route.mu.Lock()
// The permission of this cluster applies to all routes, and each
// route will have the same `perms`, so check with the first route
// and send SUB interest only if subject has a match in import permissions.
// If there is no match, we stop here.
if !route.canImport(sub.subject) {
route.mu.Unlock()
break
}
route.sendProto(protoAsBytes, true)
route.mu.Unlock()
route.traceOutOp("", arg)
}
s.mu.Unlock()
}
// broadcastSubscribe will forward a client subscription
// to all active routes.
func (s *Server) broadcastSubscribe(sub *subscription) {
if s.numRoutes() == 0 {
return
}
rsid := routeSid(sub)
proto := fmt.Sprintf(subProto, sub.subject, sub.queue, rsid)
s.broadcastInterestToRoutes(sub, proto)
}
// broadcastUnSubscribe will forward a client unsubscribe
// action to all active routes.
func (s *Server) broadcastUnSubscribe(sub *subscription) {
if s.numRoutes() == 0 {
return
}
sub.client.mu.Lock()
// Max has no meaning on the other side of a route, so do not send.
hasMax := sub.max > 0 && sub.nm < sub.max
sub.client.mu.Unlock()
if hasMax {
return
}
rsid := routeSid(sub)
proto := fmt.Sprintf(unsubProto, rsid)
s.broadcastInterestToRoutes(sub, proto)
}
func (s *Server) routeAcceptLoop(ch chan struct{}) {
defer func() {
if ch != nil {
close(ch)
}
}()
// Snapshot server options.
opts := s.getOpts()
// Snapshot server options.
port := opts.Cluster.Port
if port == -1 {
port = 0
}
hp := net.JoinHostPort(opts.Cluster.Host, strconv.Itoa(port))
l, e := net.Listen("tcp", hp)
if e != nil {
s.Fatalf("Error listening on router port: %d - %v", opts.Cluster.Port, e)
return
}
s.Noticef("Listening for route connections on %s",
net.JoinHostPort(opts.Cluster.Host, strconv.Itoa(l.Addr().(*net.TCPAddr).Port)))
s.mu.Lock()
// Check for TLSConfig
tlsReq := opts.Cluster.TLSConfig != nil
info := Info{
ID: s.info.ID,
Version: s.info.Version,
AuthRequired: false,
TLSRequired: tlsReq,
TLSVerify: tlsReq,
MaxPayload: s.info.MaxPayload,
}
// Set this if only if advertise is not disabled
if !opts.Cluster.NoAdvertise {
info.ClientConnectURLs = s.clientConnectURLs
}
// If we have selected a random port...
if port == 0 {
// Write resolved port back to options.
opts.Cluster.Port = l.Addr().(*net.TCPAddr).Port
}
// Keep track of actual listen port. This will be needed in case of
// config reload.
s.clusterActualPort = opts.Cluster.Port
// Check for Auth items
if opts.Cluster.Username != "" {
info.AuthRequired = true
}
s.routeInfo = info
// Possibly override Host/Port and set IP based on Cluster.Advertise
if err := s.setRouteInfoHostPortAndIP(); err != nil {
s.Fatalf("Error setting route INFO with Cluster.Advertise value of %s, err=%v", s.opts.Cluster.Advertise, err)
l.Close()
s.mu.Unlock()
return
}
// Setup state that can enable shutdown
s.routeListener = l
s.mu.Unlock()
// Let them know we are up
close(ch)
ch = nil
tmpDelay := ACCEPT_MIN_SLEEP
for s.isRunning() {
conn, err := l.Accept()
if err != nil {
if ne, ok := err.(net.Error); ok && ne.Temporary() {
s.Debugf("Temporary Route Accept Errorf(%v), sleeping %dms",
ne, tmpDelay/time.Millisecond)
time.Sleep(tmpDelay)
tmpDelay *= 2
if tmpDelay > ACCEPT_MAX_SLEEP {
tmpDelay = ACCEPT_MAX_SLEEP
}
} else if s.isRunning() {
s.Noticef("Accept error: %v", err)
}
continue
}
tmpDelay = ACCEPT_MIN_SLEEP
s.startGoRoutine(func() {
s.createRoute(conn, nil)
s.grWG.Done()
})
}
s.Debugf("Router accept loop exiting..")
s.done <- true
}
// Similar to setInfoHostPortAndGenerateJSON, but for routeInfo.
func (s *Server) setRouteInfoHostPortAndIP() error {
if s.opts.Cluster.Advertise != "" {
advHost, advPort, err := parseHostPort(s.opts.Cluster.Advertise, s.opts.Cluster.Port)
if err != nil {
return err
}
s.routeInfo.Host = advHost
s.routeInfo.Port = advPort
s.routeInfo.IP = fmt.Sprintf("nats-route://%s/", net.JoinHostPort(advHost, strconv.Itoa(advPort)))
} else {
s.routeInfo.Host = s.opts.Cluster.Host
s.routeInfo.Port = s.opts.Cluster.Port
s.routeInfo.IP = ""
}
// (re)generate the routeInfoJSON byte array
s.generateRouteInfoJSON()
return nil
}
// StartRouting will start the accept loop on the cluster host:port
// and will actively try to connect to listed routes.
func (s *Server) StartRouting(clientListenReady chan struct{}) {
defer s.grWG.Done()
// Wait for the client listen port to be opened, and
// the possible ephemeral port to be selected.
<-clientListenReady
// Spin up the accept loop
ch := make(chan struct{})
go s.routeAcceptLoop(ch)
<-ch
// Solicit Routes if needed.
s.solicitRoutes(s.getOpts().Routes)
}
func (s *Server) reConnectToRoute(rURL *url.URL, rtype RouteType) {
tryForEver := rtype == Explicit
// If A connects to B, and B to A (regardless if explicit or
// implicit - due to auto-discovery), and if each server first
// registers the route on the opposite TCP connection, the
// two connections will end-up being closed.
// Add some random delay to reduce risk of repeated failures.
delay := time.Duration(rand.Intn(100)) * time.Millisecond
if tryForEver {
delay += DEFAULT_ROUTE_RECONNECT
}
time.Sleep(delay)
s.connectToRoute(rURL, tryForEver)
}
// Checks to make sure the route is still valid.
func (s *Server) routeStillValid(rURL *url.URL) bool {
for _, ri := range s.getOpts().Routes {
if ri.String() == rURL.String() {
return true
}
}
return false
}
func (s *Server) connectToRoute(rURL *url.URL, tryForEver bool) {
// Snapshot server options.
opts := s.getOpts()
defer s.grWG.Done()
attempts := 0
for s.isRunning() && rURL != nil {
if tryForEver && !s.routeStillValid(rURL) {
return
}
s.Debugf("Trying to connect to route on %s", rURL.Host)
conn, err := net.DialTimeout("tcp", rURL.Host, DEFAULT_ROUTE_DIAL)
if err != nil {
s.Errorf("Error trying to connect to route: %v", err)
if !tryForEver {
if opts.Cluster.ConnectRetries <= 0 {
return
}
attempts++
if attempts > opts.Cluster.ConnectRetries {
return
}
}
select {
case <-s.quitCh:
return
case <-time.After(DEFAULT_ROUTE_CONNECT):
continue
}
}
if tryForEver && !s.routeStillValid(rURL) {
conn.Close()
return
}
// We have a route connection here.
// Go ahead and create it and exit this func.
s.createRoute(conn, rURL)
return
}
}
func (c *client) isSolicitedRoute() bool {
c.mu.Lock()
defer c.mu.Unlock()
return c.typ == ROUTER && c.route != nil && c.route.didSolicit
}
func (s *Server) solicitRoutes(routes []*url.URL) {
for _, r := range routes {
route := r
s.startGoRoutine(func() { s.connectToRoute(route, true) })
}
}
func (s *Server) numRoutes() int {
s.mu.Lock()
defer s.mu.Unlock()
return len(s.routes)
}