mirror of
https://github.com/gogrlx/nats-server.git
synced 2026-04-02 11:48:43 -07:00
Merge branch 'discovery'
This commit is contained in:
@@ -58,6 +58,7 @@ func main() {
|
||||
flag.BoolVar(&showVersion, "v", false, "Print version information.")
|
||||
flag.IntVar(&opts.ProfPort, "profile", 0, "Profiling HTTP port")
|
||||
flag.StringVar(&opts.RoutesStr, "routes", "", "Routes to actively solicit a connection.")
|
||||
flag.StringVar(&opts.ClusterListenStr, "cluster", "", "Cluster url from which members can solicit routes.")
|
||||
flag.StringVar(&opts.ClusterListenStr, "cluster_listen", "", "Cluster url from which members can solicit routes.")
|
||||
flag.BoolVar(&showTlsHelp, "help_tls", false, "TLS help.")
|
||||
flag.BoolVar(&opts.TLS, "tls", false, "Enable TLS.")
|
||||
@@ -203,6 +204,9 @@ func configureTLS(opts *server.Options) {
|
||||
|
||||
func configureClusterOpts(opts *server.Options) error {
|
||||
if opts.ClusterListenStr == "" {
|
||||
if opts.RoutesStr != "" {
|
||||
server.PrintAndDie("Solicited routes require cluster capabilities, e.g. --cluster.")
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
@@ -228,5 +232,10 @@ func configureClusterOpts(opts *server.Options) error {
|
||||
opts.ClusterUsername = user
|
||||
}
|
||||
|
||||
// If we have routes but no config file, fill in here.
|
||||
if opts.RoutesStr != "" && opts.Routes == nil {
|
||||
opts.Routes = server.RoutesFromStr(opts.RoutesStr)
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
@@ -47,6 +47,7 @@ type client struct {
|
||||
ptmr *time.Timer
|
||||
pout int
|
||||
msgb [msgScratchSize]byte
|
||||
|
||||
parseState
|
||||
stats
|
||||
|
||||
@@ -963,7 +964,7 @@ func (c *client) closeConnection() {
|
||||
if rid != "" && srv.remotes[rid] != nil {
|
||||
Debugf("Not attempting reconnect for solicited route, already connected to \"%s\"", rid)
|
||||
return
|
||||
} else {
|
||||
} else if c.route.routeType != Implicit {
|
||||
Debugf("Attempting reconnect for solicited route \"%s\"", c.route.url)
|
||||
go srv.reConnectToRoute(c.route.url)
|
||||
}
|
||||
|
||||
@@ -8,7 +8,7 @@ import (
|
||||
|
||||
const (
|
||||
// VERSION is the current version for the server.
|
||||
VERSION = "0.7.2"
|
||||
VERSION = "0.7.3"
|
||||
|
||||
// DEFAULT_PORT is the deault port for client connections.
|
||||
DEFAULT_PORT = 4222
|
||||
|
||||
@@ -182,18 +182,19 @@ type Routez struct {
|
||||
|
||||
// RouteInfo has detailed information on a per connection basis.
|
||||
type RouteInfo struct {
|
||||
Rid uint64 `json:"rid"`
|
||||
RemoteId string `json:"remote_id"`
|
||||
DidSolicit bool `json:"did_solicit"`
|
||||
IP string `json:"ip"`
|
||||
Port int `json:"port"`
|
||||
Pending int `json:"pending_size"`
|
||||
InMsgs int64 `json:"in_msgs"`
|
||||
OutMsgs int64 `json:"out_msgs"`
|
||||
InBytes int64 `json:"in_bytes"`
|
||||
OutBytes int64 `json:"out_bytes"`
|
||||
NumSubs uint32 `json:"subscriptions"`
|
||||
Subs []string `json:"subscriptions_list,omitempty"`
|
||||
Rid uint64 `json:"rid"`
|
||||
RemoteId string `json:"remote_id"`
|
||||
DidSolicit bool `json:"did_solicit"`
|
||||
IsConfigured bool `json:"is_configured"`
|
||||
IP string `json:"ip"`
|
||||
Port int `json:"port"`
|
||||
Pending int `json:"pending_size"`
|
||||
InMsgs int64 `json:"in_msgs"`
|
||||
OutMsgs int64 `json:"out_msgs"`
|
||||
InBytes int64 `json:"in_bytes"`
|
||||
OutBytes int64 `json:"out_bytes"`
|
||||
NumSubs uint32 `json:"subscriptions"`
|
||||
Subs []string `json:"subscriptions_list,omitempty"`
|
||||
}
|
||||
|
||||
// HandleRoutez process HTTP requests for route information.
|
||||
@@ -207,23 +208,24 @@ func (s *Server) HandleRoutez(w http.ResponseWriter, r *http.Request) {
|
||||
s.mu.Lock()
|
||||
rs.NumRoutes = len(s.routes)
|
||||
|
||||
for _, route := range s.routes {
|
||||
for _, r := range s.routes {
|
||||
ri := &RouteInfo{
|
||||
Rid: route.cid,
|
||||
RemoteId: route.route.remoteID,
|
||||
DidSolicit: route.route.didSolicit,
|
||||
InMsgs: route.inMsgs,
|
||||
OutMsgs: route.outMsgs,
|
||||
InBytes: route.inBytes,
|
||||
OutBytes: route.outBytes,
|
||||
NumSubs: route.subs.Count(),
|
||||
Rid: r.cid,
|
||||
RemoteId: r.route.remoteID,
|
||||
DidSolicit: r.route.didSolicit,
|
||||
IsConfigured: r.route.routeType == Explicit,
|
||||
InMsgs: r.inMsgs,
|
||||
OutMsgs: r.outMsgs,
|
||||
InBytes: r.inBytes,
|
||||
OutBytes: r.outBytes,
|
||||
NumSubs: r.subs.Count(),
|
||||
}
|
||||
|
||||
if subs == 1 {
|
||||
ri.Subs = castToSliceString(route.subs.All())
|
||||
ri.Subs = castToSliceString(r.subs.All())
|
||||
}
|
||||
|
||||
if ip, ok := route.nc.(*net.TCPConn); ok {
|
||||
if ip, ok := r.nc.(*net.TCPConn); ok {
|
||||
addr := ip.RemoteAddr().(*net.TCPAddr)
|
||||
ri.Port = addr.Port
|
||||
ri.IP = addr.IP.String()
|
||||
|
||||
@@ -23,6 +23,7 @@ var DefaultMonitorOptions = Options{
|
||||
Host: "localhost",
|
||||
Port: CLIENT_PORT,
|
||||
HTTPPort: MONITOR_PORT,
|
||||
ClusterHost: "localhost",
|
||||
ClusterPort: CLUSTER_PORT,
|
||||
NoLog: true,
|
||||
NoSigs: true,
|
||||
@@ -680,6 +681,7 @@ func TestConnzWithRoutes(t *testing.T) {
|
||||
var opts = Options{
|
||||
Host: "localhost",
|
||||
Port: CLIENT_PORT + 1,
|
||||
ClusterHost: "localhost",
|
||||
ClusterPort: CLUSTER_PORT + 1,
|
||||
NoLog: true,
|
||||
NoSigs: true,
|
||||
|
||||
125
server/route.go
125
server/route.go
@@ -11,14 +11,36 @@ import (
|
||||
"net"
|
||||
"net/url"
|
||||
"regexp"
|
||||
"strconv"
|
||||
"sync/atomic"
|
||||
"time"
|
||||
)
|
||||
|
||||
// Designate the router type
|
||||
type RouteType int
|
||||
|
||||
// Type of Route
|
||||
const (
|
||||
// This route we learned from speaking to other routes.
|
||||
Implicit RouteType = iota
|
||||
// This route was explicitly configured.
|
||||
Explicit
|
||||
)
|
||||
|
||||
type route struct {
|
||||
remoteID string
|
||||
didSolicit bool
|
||||
url *url.URL
|
||||
remoteID string
|
||||
didSolicit bool
|
||||
routeType RouteType
|
||||
url *url.URL
|
||||
authRequired bool
|
||||
tlsRequired bool
|
||||
}
|
||||
|
||||
type RemoteInfo struct {
|
||||
RemoteID string `json:"id"`
|
||||
URL string `json:"url"`
|
||||
AuthRequired bool `json:"auth_required"`
|
||||
TLSRequired bool `json:"tls_required"`
|
||||
}
|
||||
|
||||
type connectInfo struct {
|
||||
@@ -30,7 +52,10 @@ type connectInfo struct {
|
||||
Name string `json:"name"`
|
||||
}
|
||||
|
||||
const conProto = "CONNECT %s" + _CRLF_
|
||||
const (
|
||||
ConProto = "CONNECT %s" + _CRLF_
|
||||
InfoProto = "INFO %s" + _CRLF_
|
||||
)
|
||||
|
||||
// Lock should be held entering here.
|
||||
func (c *client) sendConnect(tlsRequired bool) {
|
||||
@@ -49,11 +74,11 @@ func (c *client) sendConnect(tlsRequired bool) {
|
||||
}
|
||||
b, err := json.Marshal(cinfo)
|
||||
if err != nil {
|
||||
Errorf("Error marshalling CONNECT to route: %v\n", err)
|
||||
c.Errorf("Error marshalling CONNECT to route: %v\n", err)
|
||||
c.closeConnection()
|
||||
return
|
||||
}
|
||||
c.bw.WriteString(fmt.Sprintf(conProto, b))
|
||||
c.bw.WriteString(fmt.Sprintf(ConProto, b))
|
||||
c.bw.Flush()
|
||||
}
|
||||
|
||||
@@ -64,7 +89,25 @@ func (c *client) processRouteInfo(info *Info) {
|
||||
c.mu.Unlock()
|
||||
return
|
||||
}
|
||||
// Copy over important information
|
||||
c.route.remoteID = info.ID
|
||||
c.route.authRequired = info.AuthRequired
|
||||
c.route.tlsRequired = info.TLSRequired
|
||||
|
||||
// If we do not know this route's URL, construct one on the fly
|
||||
// from the information provided.
|
||||
if c.route.url == nil {
|
||||
// Add in the URL from host and port
|
||||
hp := net.JoinHostPort(info.Host, strconv.Itoa(info.Port))
|
||||
url, err := url.Parse(fmt.Sprintf("nats-route://%s/", hp))
|
||||
if err != nil {
|
||||
c.Errorf("Error parsing URL from INFO: %v\n", err)
|
||||
c.mu.Unlock()
|
||||
c.closeConnection()
|
||||
return
|
||||
}
|
||||
c.route.url = url
|
||||
}
|
||||
|
||||
// Check to see if we have this remote already registered.
|
||||
// This can happen when both servers have routes to each other.
|
||||
@@ -75,12 +118,39 @@ func (c *client) processRouteInfo(info *Info) {
|
||||
c.Debugf("Registering remote route %q", info.ID)
|
||||
// Send our local subscriptions to this route.
|
||||
s.sendLocalSubsToRoute(c)
|
||||
if len(info.Routes) > 0 {
|
||||
s.processImplicitRoutes(info.Routes)
|
||||
}
|
||||
} else {
|
||||
c.Debugf("Detected duplicate remote route %q", info.ID)
|
||||
c.closeConnection()
|
||||
}
|
||||
}
|
||||
|
||||
// This will process implicit route information from other servers.
|
||||
// We will check to see if we have configured or are already connected,
|
||||
// and if so we will ignore. Otherwise we will attempt to connect.
|
||||
func (s *Server) processImplicitRoutes(routes []RemoteInfo) {
|
||||
s.mu.Lock()
|
||||
defer s.mu.Unlock()
|
||||
for _, ri := range routes {
|
||||
if _, exists := s.remotes[ri.RemoteID]; exists {
|
||||
continue
|
||||
}
|
||||
// We have a new route that we do not currently know about.
|
||||
// Process here and solicit a connection.
|
||||
r, err := url.Parse(ri.URL)
|
||||
if err != nil {
|
||||
Debugf("Error parsing URL from Remote INFO: %v\n", err)
|
||||
continue
|
||||
}
|
||||
if ri.AuthRequired {
|
||||
r.User = url.UserPassword(s.opts.ClusterUsername, s.opts.ClusterPassword)
|
||||
}
|
||||
go s.connectToRoute(r)
|
||||
}
|
||||
}
|
||||
|
||||
// This will send local subscription state to a new route connection.
|
||||
// FIXME(dlc) - This could be a DOS or perf issue with many clients
|
||||
// and large subscription space. Plus buffering in place not a good idea.
|
||||
@@ -89,14 +159,11 @@ func (s *Server) sendLocalSubsToRoute(route *client) {
|
||||
|
||||
s.mu.Lock()
|
||||
if s.routes[route.cid] == nil {
|
||||
|
||||
// We are too early, let createRoute call this function.
|
||||
route.mu.Lock()
|
||||
route.sendLocalSubs = true
|
||||
route.mu.Unlock()
|
||||
|
||||
s.mu.Unlock()
|
||||
|
||||
return
|
||||
}
|
||||
|
||||
@@ -125,15 +192,35 @@ func (s *Server) sendLocalSubsToRoute(route *client) {
|
||||
func (s *Server) createRoute(conn net.Conn, rURL *url.URL) *client {
|
||||
didSolicit := rURL != nil
|
||||
r := &route{didSolicit: didSolicit}
|
||||
for _, route := range s.opts.Routes {
|
||||
if rURL == route {
|
||||
r.routeType = Explicit
|
||||
}
|
||||
}
|
||||
c := &client{srv: s, nc: conn, opts: clientOpts{}, typ: ROUTER, route: r}
|
||||
|
||||
// Grab server variables.
|
||||
// Grab server variables and clone known routes.
|
||||
s.mu.Lock()
|
||||
info := s.routeInfoJSON
|
||||
authRequired := s.routeInfo.AuthRequired
|
||||
tlsRequired := s.routeInfo.TLSRequired
|
||||
// copy
|
||||
info := s.routeInfo
|
||||
for _, r := range s.routes {
|
||||
r.mu.Lock()
|
||||
if r.route.url != nil {
|
||||
ri := RemoteInfo{
|
||||
RemoteID: r.route.remoteID,
|
||||
URL: fmt.Sprintf("%s", r.route.url),
|
||||
AuthRequired: r.route.authRequired,
|
||||
TLSRequired: r.route.tlsRequired,
|
||||
}
|
||||
info.Routes = append(info.Routes, ri)
|
||||
}
|
||||
r.mu.Unlock()
|
||||
}
|
||||
s.mu.Unlock()
|
||||
|
||||
authRequired := info.AuthRequired
|
||||
tlsRequired := info.TLSRequired
|
||||
|
||||
// Grab lock
|
||||
c.mu.Lock()
|
||||
|
||||
@@ -202,8 +289,12 @@ func (s *Server) createRoute(conn net.Conn, rURL *url.URL) *client {
|
||||
c.sendConnect(tlsRequired)
|
||||
}
|
||||
|
||||
// Add other routes in that are known to the info payload
|
||||
b, _ := json.Marshal(info)
|
||||
infoJSON := []byte(fmt.Sprintf(InfoProto, b))
|
||||
|
||||
// Send our info to the other side.
|
||||
s.sendInfo(c, info)
|
||||
s.sendInfo(c, infoJSON)
|
||||
|
||||
// Check for Auth required state for incoming connections.
|
||||
if authRequired && !didSolicit {
|
||||
@@ -418,12 +509,6 @@ func (s *Server) StartRouting() {
|
||||
info.AuthRequired = true
|
||||
}
|
||||
s.routeInfo = info
|
||||
// Generate the info json
|
||||
b, err := json.Marshal(info)
|
||||
if err != nil {
|
||||
Fatalf("Error marshalling Route INFO JSON: %+v\n", err)
|
||||
}
|
||||
s.routeInfoJSON = []byte(fmt.Sprintf("INFO %s %s", b, CR_LF))
|
||||
|
||||
// Spin up the accept loop
|
||||
ch := make(chan struct{})
|
||||
|
||||
@@ -26,16 +26,17 @@ import (
|
||||
// Info is the information sent to clients to help them understand information
|
||||
// about this server.
|
||||
type Info struct {
|
||||
ID string `json:"server_id"`
|
||||
Version string `json:"version"`
|
||||
GoVersion string `json:"go"`
|
||||
Host string `json:"host"`
|
||||
Port int `json:"port"`
|
||||
AuthRequired bool `json:"auth_required"`
|
||||
SSLRequired bool `json:"ssl_required"` // ssl json used for older clients
|
||||
TLSRequired bool `json:"tls_required"`
|
||||
TLSVerify bool `json:"tls_verify"`
|
||||
MaxPayload int `json:"max_payload"`
|
||||
ID string `json:"server_id"`
|
||||
Version string `json:"version"`
|
||||
GoVersion string `json:"go"`
|
||||
Host string `json:"host"`
|
||||
Port int `json:"port"`
|
||||
AuthRequired bool `json:"auth_required"`
|
||||
SSLRequired bool `json:"ssl_required"` // DEPRECATED: ssl json used for older clients
|
||||
TLSRequired bool `json:"tls_required"`
|
||||
TLSVerify bool `json:"tls_verify"`
|
||||
MaxPayload int `json:"max_payload"`
|
||||
Routes []RemoteInfo `json:"routes,omitempty"`
|
||||
}
|
||||
|
||||
// Server is our main struct.
|
||||
@@ -62,7 +63,6 @@ type Server struct {
|
||||
|
||||
routeListener net.Listener
|
||||
routeInfo Info
|
||||
routeInfoJSON []byte
|
||||
rcQuit chan bool
|
||||
}
|
||||
|
||||
@@ -707,3 +707,10 @@ func (s *Server) GetListenEndpoint() string {
|
||||
// when the listener is started, due to the use of RANDOM_PORT
|
||||
return net.JoinHostPort(host, strconv.Itoa(s.opts.Port))
|
||||
}
|
||||
|
||||
// Server's ID
|
||||
func (s *Server) Id() string {
|
||||
s.mu.Lock()
|
||||
defer s.mu.Unlock()
|
||||
return s.info.ID
|
||||
}
|
||||
|
||||
@@ -38,6 +38,7 @@ TLS Options:
|
||||
|
||||
Cluster Options:
|
||||
--routes [rurl-1, rurl-2] Routes to solicit and connect
|
||||
--cluster [cluster url] Cluster URL for solicited routes
|
||||
|
||||
Common Options:
|
||||
-h, --help Show this message
|
||||
|
||||
19
test/configs/auth_seed.conf
Normal file
19
test/configs/auth_seed.conf
Normal file
@@ -0,0 +1,19 @@
|
||||
# Copyright 2015 Apcera Inc. All rights reserved.
|
||||
|
||||
# Cluster Seed Node
|
||||
|
||||
port: 4222
|
||||
net: '127.0.0.1'
|
||||
|
||||
http_port: 8222
|
||||
|
||||
cluster {
|
||||
host: '127.0.0.1'
|
||||
port: 4248
|
||||
|
||||
authorization {
|
||||
user: ruser
|
||||
password: T0PS3cr3T!
|
||||
timeout: 1
|
||||
}
|
||||
}
|
||||
@@ -1,9 +1,9 @@
|
||||
# Copyright 2012-2013 Apcera Inc. All rights reserved.
|
||||
# Copyright 2012-2015 Apcera Inc. All rights reserved.
|
||||
|
||||
# Cluster config file
|
||||
|
||||
host: "127.0.0.1"
|
||||
port: 4242
|
||||
#net: apcera.me # net interface
|
||||
|
||||
cluster {
|
||||
host: '127.0.0.1'
|
||||
@@ -24,4 +24,3 @@ cluster {
|
||||
nats-route://foo:bar@127.0.0.1:4246
|
||||
]
|
||||
}
|
||||
|
||||
|
||||
@@ -2,8 +2,8 @@
|
||||
|
||||
# Config file to test overrides to client
|
||||
|
||||
host: "127.0.0.1"
|
||||
port: 4224
|
||||
|
||||
# maximum payload
|
||||
max_payload: 2222
|
||||
|
||||
|
||||
13
test/configs/seed.conf
Normal file
13
test/configs/seed.conf
Normal file
@@ -0,0 +1,13 @@
|
||||
# Copyright 2015 Apcera Inc. All rights reserved.
|
||||
|
||||
# Cluster Seed Node
|
||||
|
||||
port: 4222
|
||||
net: '127.0.0.1'
|
||||
|
||||
http_port: 8222
|
||||
|
||||
cluster {
|
||||
host: '127.0.0.1'
|
||||
port: 4248
|
||||
}
|
||||
@@ -2,6 +2,7 @@
|
||||
|
||||
# Cluster Server A
|
||||
|
||||
host: '127.0.0.1'
|
||||
port: 4222
|
||||
|
||||
cluster {
|
||||
|
||||
@@ -2,6 +2,7 @@
|
||||
|
||||
# Cluster Server B
|
||||
|
||||
host: '127.0.0.1'
|
||||
port: 4224
|
||||
|
||||
cluster {
|
||||
|
||||
@@ -11,7 +11,7 @@ import (
|
||||
)
|
||||
|
||||
func TestResolveRandomPort(t *testing.T) {
|
||||
opts := &server.Options{Port: server.RANDOM_PORT}
|
||||
opts := &server.Options{Host: "127.0.0.1", Port: server.RANDOM_PORT}
|
||||
s := RunServer(opts)
|
||||
defer s.Shutdown()
|
||||
|
||||
|
||||
355
test/route_discovery_test.go
Normal file
355
test/route_discovery_test.go
Normal file
@@ -0,0 +1,355 @@
|
||||
// Copyright 2015 Apcera Inc. All rights reserved.
|
||||
|
||||
package test
|
||||
|
||||
import (
|
||||
"encoding/json"
|
||||
"fmt"
|
||||
"io/ioutil"
|
||||
"net"
|
||||
"net/http"
|
||||
"runtime"
|
||||
"strconv"
|
||||
"testing"
|
||||
"time"
|
||||
|
||||
"github.com/nats-io/gnatsd/server"
|
||||
)
|
||||
|
||||
func runSeedServer(t *testing.T) (*server.Server, *server.Options) {
|
||||
return RunServerWithConfig("./configs/seed.conf")
|
||||
}
|
||||
|
||||
func runAuthSeedServer(t *testing.T) (*server.Server, *server.Options) {
|
||||
return RunServerWithConfig("./configs/auth_seed.conf")
|
||||
}
|
||||
|
||||
func TestSeedFirstRouteInfo(t *testing.T) {
|
||||
s, opts := runSeedServer(t)
|
||||
defer s.Shutdown()
|
||||
|
||||
rc := createRouteConn(t, opts.ClusterHost, opts.ClusterPort)
|
||||
defer rc.Close()
|
||||
|
||||
_, routeExpect := setupRoute(t, rc, opts)
|
||||
buf := routeExpect(infoRe)
|
||||
|
||||
info := server.Info{}
|
||||
if err := json.Unmarshal(buf[4:], &info); err != nil {
|
||||
t.Fatalf("Could not unmarshal route info: %v", err)
|
||||
}
|
||||
|
||||
if len(info.Routes) != 0 {
|
||||
t.Fatalf("Expected len of []Routes to be zero vs %d\n", len(info.Routes))
|
||||
}
|
||||
}
|
||||
|
||||
func TestSeedMultipleRouteInfo(t *testing.T) {
|
||||
s, opts := runSeedServer(t)
|
||||
defer s.Shutdown()
|
||||
|
||||
rc1 := createRouteConn(t, opts.ClusterHost, opts.ClusterPort)
|
||||
defer rc1.Close()
|
||||
|
||||
routeSend1, route1Expect := setupRoute(t, rc1, opts)
|
||||
route1Expect(infoRe)
|
||||
|
||||
rc1ID := "2222"
|
||||
rc1Port := 22
|
||||
rc1Host := "127.0.0.1"
|
||||
|
||||
hp1 := fmt.Sprintf("nats-route://%s/", net.JoinHostPort(rc1Host, strconv.Itoa(rc1Port)))
|
||||
|
||||
// register ourselves via INFO
|
||||
r1Info := server.Info{ID: rc1ID, Host: rc1Host, Port: rc1Port}
|
||||
b, _ := json.Marshal(r1Info)
|
||||
infoJSON := fmt.Sprintf(server.InfoProto, b)
|
||||
routeSend1(infoJSON)
|
||||
routeSend1("PING\r\n")
|
||||
route1Expect(pongRe)
|
||||
|
||||
rc2 := createRouteConn(t, opts.ClusterHost, opts.ClusterPort)
|
||||
defer rc2.Close()
|
||||
|
||||
routeSend2, route2Expect := setupRoute(t, rc2, opts)
|
||||
|
||||
rc2ID := "2224"
|
||||
rc2Port := 24
|
||||
rc2Host := "127.0.0.1"
|
||||
|
||||
// hp2 := net.JoinHostPort(rc2Host, strconv.Itoa(rc2Port))
|
||||
|
||||
// register ourselves via INFO
|
||||
r2Info := server.Info{ID: rc2ID, Host: rc2Host, Port: rc2Port}
|
||||
b, _ = json.Marshal(r2Info)
|
||||
infoJSON = fmt.Sprintf(server.InfoProto, b)
|
||||
routeSend2(infoJSON)
|
||||
|
||||
// Now read back out the info from the seed route
|
||||
buf := route2Expect(infoRe)
|
||||
|
||||
info := server.Info{}
|
||||
if err := json.Unmarshal(buf[4:], &info); err != nil {
|
||||
t.Fatalf("Could not unmarshal route info: %v", err)
|
||||
}
|
||||
|
||||
if len(info.Routes) != 1 {
|
||||
t.Fatalf("Expected len of []Routes to be 1 vs %d\n", len(info.Routes))
|
||||
}
|
||||
|
||||
route := info.Routes[0]
|
||||
if route.RemoteID != rc1ID {
|
||||
t.Fatalf("Expected RemoteID of \"22\", got %q\n", route.RemoteID)
|
||||
}
|
||||
if route.URL == "" {
|
||||
t.Fatalf("Expected a URL for the implicit route")
|
||||
}
|
||||
if route.URL != hp1 {
|
||||
t.Fatalf("Expected URL Host of %s, got %s\n", hp1, route.URL)
|
||||
}
|
||||
|
||||
routeSend2("PING\r\n")
|
||||
route2Expect(pongRe)
|
||||
|
||||
// Now let's do a third.
|
||||
rc3 := createRouteConn(t, opts.ClusterHost, opts.ClusterPort)
|
||||
defer rc3.Close()
|
||||
|
||||
routeSend3, route3Expect := setupRoute(t, rc3, opts)
|
||||
|
||||
rc3ID := "2226"
|
||||
rc3Port := 26
|
||||
rc3Host := "127.0.0.1"
|
||||
|
||||
// register ourselves via INFO
|
||||
r3Info := server.Info{ID: rc3ID, Host: rc3Host, Port: rc3Port}
|
||||
b, _ = json.Marshal(r3Info)
|
||||
infoJSON = fmt.Sprintf(server.InfoProto, b)
|
||||
routeSend3(infoJSON)
|
||||
|
||||
// Now read back out the info from the seed route
|
||||
buf = route3Expect(infoRe)
|
||||
|
||||
info = server.Info{}
|
||||
if err := json.Unmarshal(buf[4:], &info); err != nil {
|
||||
t.Fatalf("Could not unmarshal route info: %v", err)
|
||||
}
|
||||
if len(info.Routes) != 2 {
|
||||
t.Fatalf("Expected len of []Routes to be 2 vs %d\n", len(info.Routes))
|
||||
}
|
||||
}
|
||||
|
||||
func TestSeedSolicitWorks(t *testing.T) {
|
||||
s1, opts := runSeedServer(t)
|
||||
defer s1.Shutdown()
|
||||
|
||||
// Create the routes string for others to connect to the seed.
|
||||
routesStr := fmt.Sprintf("nats-route://%s:%d/", opts.ClusterHost, opts.ClusterPort)
|
||||
|
||||
// Run Server #2
|
||||
s2Opts := nextServerOpts(opts)
|
||||
s2Opts.Routes = server.RoutesFromStr(routesStr)
|
||||
|
||||
s2 := RunServer(s2Opts)
|
||||
defer s2.Shutdown()
|
||||
|
||||
// Run Server #3
|
||||
s3Opts := nextServerOpts(s2Opts)
|
||||
|
||||
s3 := RunServer(s3Opts)
|
||||
defer s3.Shutdown()
|
||||
|
||||
// Wait for a bit for graph to connect
|
||||
time.Sleep(500 * time.Millisecond)
|
||||
|
||||
// Grab Routez from monitor ports, make sure we are fully connected
|
||||
url := fmt.Sprintf("http://%s:%d/", opts.Host, opts.HTTPPort)
|
||||
rz := readHttpRoutez(t, url)
|
||||
ris := expectRids(t, rz, []string{s2.Id(), s3.Id()})
|
||||
if ris[s2.Id()].IsConfigured == true {
|
||||
t.Fatalf("Expected server not to be configured\n")
|
||||
}
|
||||
if ris[s3.Id()].IsConfigured == true {
|
||||
t.Fatalf("Expected server not to be configured\n")
|
||||
}
|
||||
|
||||
url = fmt.Sprintf("http://%s:%d/", s2Opts.Host, s2Opts.HTTPPort)
|
||||
rz = readHttpRoutez(t, url)
|
||||
ris = expectRids(t, rz, []string{s1.Id(), s3.Id()})
|
||||
if ris[s1.Id()].IsConfigured != true {
|
||||
t.Fatalf("Expected seed server to be configured\n")
|
||||
}
|
||||
if ris[s3.Id()].IsConfigured == true {
|
||||
t.Fatalf("Expected server not to be configured\n")
|
||||
}
|
||||
|
||||
url = fmt.Sprintf("http://%s:%d/", s3Opts.Host, s3Opts.HTTPPort)
|
||||
rz = readHttpRoutez(t, url)
|
||||
ris = expectRids(t, rz, []string{s1.Id(), s2.Id()})
|
||||
if ris[s1.Id()].IsConfigured != true {
|
||||
t.Fatalf("Expected seed server to be configured\n")
|
||||
}
|
||||
if ris[s2.Id()].IsConfigured == true {
|
||||
t.Fatalf("Expected server not to be configured\n")
|
||||
}
|
||||
}
|
||||
|
||||
func TestChainedSolicitWorks(t *testing.T) {
|
||||
s1, opts := runSeedServer(t)
|
||||
defer s1.Shutdown()
|
||||
|
||||
// Create the routes string for others to connect to the seed.
|
||||
routesStr := fmt.Sprintf("nats-route://%s:%d/", opts.ClusterHost, opts.ClusterPort)
|
||||
|
||||
// Run Server #2
|
||||
s2Opts := nextServerOpts(opts)
|
||||
s2Opts.Routes = server.RoutesFromStr(routesStr)
|
||||
|
||||
s2 := RunServer(s2Opts)
|
||||
defer s2.Shutdown()
|
||||
|
||||
// Run Server #3
|
||||
s3Opts := nextServerOpts(s2Opts)
|
||||
// We will have s3 connect to s2, not the seed.
|
||||
routesStr = fmt.Sprintf("nats-route://%s:%d/", s2Opts.ClusterHost, s2Opts.ClusterPort)
|
||||
s3Opts.Routes = server.RoutesFromStr(routesStr)
|
||||
|
||||
s3 := RunServer(s3Opts)
|
||||
defer s3.Shutdown()
|
||||
|
||||
// Wait for a bit for graph to connect
|
||||
time.Sleep(500 * time.Millisecond)
|
||||
|
||||
// Grab Routez from monitor ports, make sure we are fully connected
|
||||
url := fmt.Sprintf("http://%s:%d/", opts.Host, opts.HTTPPort)
|
||||
rz := readHttpRoutez(t, url)
|
||||
ris := expectRids(t, rz, []string{s2.Id(), s3.Id()})
|
||||
if ris[s2.Id()].IsConfigured == true {
|
||||
t.Fatalf("Expected server not to be configured\n")
|
||||
}
|
||||
if ris[s3.Id()].IsConfigured == true {
|
||||
t.Fatalf("Expected server not to be configured\n")
|
||||
}
|
||||
|
||||
url = fmt.Sprintf("http://%s:%d/", s2Opts.Host, s2Opts.HTTPPort)
|
||||
rz = readHttpRoutez(t, url)
|
||||
ris = expectRids(t, rz, []string{s1.Id(), s3.Id()})
|
||||
if ris[s1.Id()].IsConfigured != true {
|
||||
t.Fatalf("Expected seed server to be configured\n")
|
||||
}
|
||||
if ris[s3.Id()].IsConfigured == true {
|
||||
t.Fatalf("Expected server not to be configured\n")
|
||||
}
|
||||
|
||||
url = fmt.Sprintf("http://%s:%d/", s3Opts.Host, s3Opts.HTTPPort)
|
||||
rz = readHttpRoutez(t, url)
|
||||
ris = expectRids(t, rz, []string{s1.Id(), s2.Id()})
|
||||
if ris[s2.Id()].IsConfigured != true {
|
||||
t.Fatalf("Expected s2 server to be configured\n")
|
||||
}
|
||||
if ris[s1.Id()].IsConfigured == true {
|
||||
t.Fatalf("Expected seed server not to be configured\n")
|
||||
}
|
||||
}
|
||||
|
||||
func TestAuthSeedSolicitWorks(t *testing.T) {
|
||||
s1, opts := runAuthSeedServer(t)
|
||||
defer s1.Shutdown()
|
||||
|
||||
// Create the routes string for others to connect to the seed.
|
||||
routesStr := fmt.Sprintf("nats-route://%s:%s@%s:%d/", opts.ClusterUsername, opts.ClusterPassword, opts.ClusterHost, opts.ClusterPort)
|
||||
|
||||
// Run Server #2
|
||||
s2Opts := nextServerOpts(opts)
|
||||
s2Opts.Routes = server.RoutesFromStr(routesStr)
|
||||
|
||||
s2 := RunServer(s2Opts)
|
||||
defer s2.Shutdown()
|
||||
|
||||
// Run Server #3
|
||||
s3Opts := nextServerOpts(s2Opts)
|
||||
|
||||
s3 := RunServer(s3Opts)
|
||||
defer s3.Shutdown()
|
||||
|
||||
// Wait for a bit for graph to connect
|
||||
time.Sleep(500 * time.Millisecond)
|
||||
|
||||
// Grab Routez from monitor ports, make sure we are fully connected
|
||||
url := fmt.Sprintf("http://%s:%d/", opts.Host, opts.HTTPPort)
|
||||
rz := readHttpRoutez(t, url)
|
||||
ris := expectRids(t, rz, []string{s2.Id(), s3.Id()})
|
||||
if ris[s2.Id()].IsConfigured == true {
|
||||
t.Fatalf("Expected server not to be configured\n")
|
||||
}
|
||||
if ris[s3.Id()].IsConfigured == true {
|
||||
t.Fatalf("Expected server not to be configured\n")
|
||||
}
|
||||
|
||||
url = fmt.Sprintf("http://%s:%d/", s2Opts.Host, s2Opts.HTTPPort)
|
||||
rz = readHttpRoutez(t, url)
|
||||
ris = expectRids(t, rz, []string{s1.Id(), s3.Id()})
|
||||
if ris[s1.Id()].IsConfigured != true {
|
||||
t.Fatalf("Expected seed server to be configured\n")
|
||||
}
|
||||
if ris[s3.Id()].IsConfigured == true {
|
||||
t.Fatalf("Expected server not to be configured\n")
|
||||
}
|
||||
|
||||
url = fmt.Sprintf("http://%s:%d/", s3Opts.Host, s3Opts.HTTPPort)
|
||||
rz = readHttpRoutez(t, url)
|
||||
ris = expectRids(t, rz, []string{s1.Id(), s2.Id()})
|
||||
if ris[s1.Id()].IsConfigured != true {
|
||||
t.Fatalf("Expected seed server to be configured\n")
|
||||
}
|
||||
if ris[s2.Id()].IsConfigured == true {
|
||||
t.Fatalf("Expected server not to be configured\n")
|
||||
}
|
||||
}
|
||||
|
||||
// Helper to check for correct route memberships
|
||||
func expectRids(t *testing.T, rz *server.Routez, rids []string) map[string]*server.RouteInfo {
|
||||
if len(rids) != rz.NumRoutes {
|
||||
_, fn, line, _ := runtime.Caller(1)
|
||||
t.Fatalf("[%s:%d] Expecting %d routes, got %d\n", fn, line, len(rids), rz.NumRoutes)
|
||||
}
|
||||
set := make(map[string]bool)
|
||||
for _, v := range rids {
|
||||
set[v] = true
|
||||
}
|
||||
// Make result map for additional checking
|
||||
ri := make(map[string]*server.RouteInfo)
|
||||
for _, r := range rz.Routes {
|
||||
if set[r.RemoteId] != true {
|
||||
_, fn, line, _ := runtime.Caller(1)
|
||||
t.Fatalf("[%s:%d] Route with rid %s unexpected, expected %+v\n", fn, line, r.RemoteId, rids)
|
||||
}
|
||||
ri[r.RemoteId] = r
|
||||
}
|
||||
return ri
|
||||
}
|
||||
|
||||
// Helper to easily grab routez info.
|
||||
func readHttpRoutez(t *testing.T, url string) *server.Routez {
|
||||
resp, err := http.Get(url + "routez")
|
||||
if err != nil {
|
||||
t.Fatalf("Expected no error: Got %v\n", err)
|
||||
}
|
||||
if resp.StatusCode != 200 {
|
||||
// Do one retry - FIXME(dlc) - Why does this fail when running the solicit tests b2b?
|
||||
resp, _ = http.Get(url + "routez")
|
||||
if resp.StatusCode != 200 {
|
||||
t.Fatalf("Expected a 200 response, got %d\n", resp.StatusCode)
|
||||
}
|
||||
}
|
||||
defer resp.Body.Close()
|
||||
body, err := ioutil.ReadAll(resp.Body)
|
||||
if err != nil {
|
||||
t.Fatalf("Got an error reading the body: %v\n", err)
|
||||
}
|
||||
r := server.Routez{}
|
||||
if err := json.Unmarshal(body, &r); err != nil {
|
||||
t.Fatalf("Got an error unmarshalling the body: %v\n", err)
|
||||
}
|
||||
return &r
|
||||
}
|
||||
@@ -1,4 +1,4 @@
|
||||
// Copyright 2012-2014 Apcera Inc. All rights reserved.
|
||||
// Copyright 2012-2015 Apcera Inc. All rights reserved.
|
||||
|
||||
package test
|
||||
|
||||
@@ -15,18 +15,7 @@ import (
|
||||
)
|
||||
|
||||
func runRouteServer(t *testing.T) (*server.Server, *server.Options) {
|
||||
opts, err := server.ProcessConfigFile("./configs/cluster.conf")
|
||||
|
||||
// Override for running in Go routine.
|
||||
opts.NoSigs = true
|
||||
opts.Debug = true
|
||||
opts.Trace = true
|
||||
opts.NoLog = true
|
||||
|
||||
if err != nil {
|
||||
t.Fatalf("Error parsing config file: %v\n", err)
|
||||
}
|
||||
return RunServer(opts), opts
|
||||
return RunServerWithConfig("./configs/cluster.conf")
|
||||
}
|
||||
|
||||
func TestRouterListeningSocket(t *testing.T) {
|
||||
|
||||
@@ -423,3 +423,12 @@ func checkForPubSids(t tLogger, matches [][][]byte, sids []string) {
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// Helper function to generate next opts to make sure no port conflicts etc.
|
||||
func nextServerOpts(opts *server.Options) *server.Options {
|
||||
nopts := *opts
|
||||
nopts.Port += 1
|
||||
nopts.ClusterPort += 1
|
||||
nopts.HTTPPort += 1
|
||||
return &nopts
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user