diff --git a/gnatsd.go b/gnatsd.go index 6635e9bd..c18e6bc8 100644 --- a/gnatsd.go +++ b/gnatsd.go @@ -58,6 +58,7 @@ func main() { flag.BoolVar(&showVersion, "v", false, "Print version information.") flag.IntVar(&opts.ProfPort, "profile", 0, "Profiling HTTP port") flag.StringVar(&opts.RoutesStr, "routes", "", "Routes to actively solicit a connection.") + flag.StringVar(&opts.ClusterListenStr, "cluster", "", "Cluster url from which members can solicit routes.") flag.StringVar(&opts.ClusterListenStr, "cluster_listen", "", "Cluster url from which members can solicit routes.") flag.BoolVar(&showTlsHelp, "help_tls", false, "TLS help.") flag.BoolVar(&opts.TLS, "tls", false, "Enable TLS.") @@ -203,6 +204,9 @@ func configureTLS(opts *server.Options) { func configureClusterOpts(opts *server.Options) error { if opts.ClusterListenStr == "" { + if opts.RoutesStr != "" { + server.PrintAndDie("Solicited routes require cluster capabilities, e.g. --cluster.") + } return nil } @@ -228,5 +232,10 @@ func configureClusterOpts(opts *server.Options) error { opts.ClusterUsername = user } + // If we have routes but no config file, fill in here. + if opts.RoutesStr != "" && opts.Routes == nil { + opts.Routes = server.RoutesFromStr(opts.RoutesStr) + } + return nil } diff --git a/server/client.go b/server/client.go index e67ef5b6..e246da3c 100644 --- a/server/client.go +++ b/server/client.go @@ -47,6 +47,7 @@ type client struct { ptmr *time.Timer pout int msgb [msgScratchSize]byte + parseState stats @@ -963,7 +964,7 @@ func (c *client) closeConnection() { if rid != "" && srv.remotes[rid] != nil { Debugf("Not attempting reconnect for solicited route, already connected to \"%s\"", rid) return - } else { + } else if c.route.routeType != Implicit { Debugf("Attempting reconnect for solicited route \"%s\"", c.route.url) go srv.reConnectToRoute(c.route.url) } diff --git a/server/const.go b/server/const.go index 89bd6945..3e4e3c8b 100644 --- a/server/const.go +++ b/server/const.go @@ -8,7 +8,7 @@ import ( const ( // VERSION is the current version for the server. - VERSION = "0.7.2" + VERSION = "0.7.3" // DEFAULT_PORT is the deault port for client connections. DEFAULT_PORT = 4222 diff --git a/server/monitor.go b/server/monitor.go index b76685b2..35d076f4 100644 --- a/server/monitor.go +++ b/server/monitor.go @@ -182,18 +182,19 @@ type Routez struct { // RouteInfo has detailed information on a per connection basis. type RouteInfo struct { - Rid uint64 `json:"rid"` - RemoteId string `json:"remote_id"` - DidSolicit bool `json:"did_solicit"` - IP string `json:"ip"` - Port int `json:"port"` - Pending int `json:"pending_size"` - InMsgs int64 `json:"in_msgs"` - OutMsgs int64 `json:"out_msgs"` - InBytes int64 `json:"in_bytes"` - OutBytes int64 `json:"out_bytes"` - NumSubs uint32 `json:"subscriptions"` - Subs []string `json:"subscriptions_list,omitempty"` + Rid uint64 `json:"rid"` + RemoteId string `json:"remote_id"` + DidSolicit bool `json:"did_solicit"` + IsConfigured bool `json:"is_configured"` + IP string `json:"ip"` + Port int `json:"port"` + Pending int `json:"pending_size"` + InMsgs int64 `json:"in_msgs"` + OutMsgs int64 `json:"out_msgs"` + InBytes int64 `json:"in_bytes"` + OutBytes int64 `json:"out_bytes"` + NumSubs uint32 `json:"subscriptions"` + Subs []string `json:"subscriptions_list,omitempty"` } // HandleRoutez process HTTP requests for route information. @@ -207,23 +208,24 @@ func (s *Server) HandleRoutez(w http.ResponseWriter, r *http.Request) { s.mu.Lock() rs.NumRoutes = len(s.routes) - for _, route := range s.routes { + for _, r := range s.routes { ri := &RouteInfo{ - Rid: route.cid, - RemoteId: route.route.remoteID, - DidSolicit: route.route.didSolicit, - InMsgs: route.inMsgs, - OutMsgs: route.outMsgs, - InBytes: route.inBytes, - OutBytes: route.outBytes, - NumSubs: route.subs.Count(), + Rid: r.cid, + RemoteId: r.route.remoteID, + DidSolicit: r.route.didSolicit, + IsConfigured: r.route.routeType == Explicit, + InMsgs: r.inMsgs, + OutMsgs: r.outMsgs, + InBytes: r.inBytes, + OutBytes: r.outBytes, + NumSubs: r.subs.Count(), } if subs == 1 { - ri.Subs = castToSliceString(route.subs.All()) + ri.Subs = castToSliceString(r.subs.All()) } - if ip, ok := route.nc.(*net.TCPConn); ok { + if ip, ok := r.nc.(*net.TCPConn); ok { addr := ip.RemoteAddr().(*net.TCPAddr) ri.Port = addr.Port ri.IP = addr.IP.String() diff --git a/server/monitor_test.go b/server/monitor_test.go index c263a98b..31caeb9e 100644 --- a/server/monitor_test.go +++ b/server/monitor_test.go @@ -23,6 +23,7 @@ var DefaultMonitorOptions = Options{ Host: "localhost", Port: CLIENT_PORT, HTTPPort: MONITOR_PORT, + ClusterHost: "localhost", ClusterPort: CLUSTER_PORT, NoLog: true, NoSigs: true, @@ -680,6 +681,7 @@ func TestConnzWithRoutes(t *testing.T) { var opts = Options{ Host: "localhost", Port: CLIENT_PORT + 1, + ClusterHost: "localhost", ClusterPort: CLUSTER_PORT + 1, NoLog: true, NoSigs: true, diff --git a/server/route.go b/server/route.go index fe279057..c667ad5c 100644 --- a/server/route.go +++ b/server/route.go @@ -11,14 +11,36 @@ import ( "net" "net/url" "regexp" + "strconv" "sync/atomic" "time" ) +// Designate the router type +type RouteType int + +// Type of Route +const ( + // This route we learned from speaking to other routes. + Implicit RouteType = iota + // This route was explicitly configured. + Explicit +) + type route struct { - remoteID string - didSolicit bool - url *url.URL + remoteID string + didSolicit bool + routeType RouteType + url *url.URL + authRequired bool + tlsRequired bool +} + +type RemoteInfo struct { + RemoteID string `json:"id"` + URL string `json:"url"` + AuthRequired bool `json:"auth_required"` + TLSRequired bool `json:"tls_required"` } type connectInfo struct { @@ -30,7 +52,10 @@ type connectInfo struct { Name string `json:"name"` } -const conProto = "CONNECT %s" + _CRLF_ +const ( + ConProto = "CONNECT %s" + _CRLF_ + InfoProto = "INFO %s" + _CRLF_ +) // Lock should be held entering here. func (c *client) sendConnect(tlsRequired bool) { @@ -49,11 +74,11 @@ func (c *client) sendConnect(tlsRequired bool) { } b, err := json.Marshal(cinfo) if err != nil { - Errorf("Error marshalling CONNECT to route: %v\n", err) + c.Errorf("Error marshalling CONNECT to route: %v\n", err) c.closeConnection() return } - c.bw.WriteString(fmt.Sprintf(conProto, b)) + c.bw.WriteString(fmt.Sprintf(ConProto, b)) c.bw.Flush() } @@ -64,7 +89,25 @@ func (c *client) processRouteInfo(info *Info) { c.mu.Unlock() return } + // Copy over important information c.route.remoteID = info.ID + c.route.authRequired = info.AuthRequired + c.route.tlsRequired = info.TLSRequired + + // If we do not know this route's URL, construct one on the fly + // from the information provided. + if c.route.url == nil { + // Add in the URL from host and port + hp := net.JoinHostPort(info.Host, strconv.Itoa(info.Port)) + url, err := url.Parse(fmt.Sprintf("nats-route://%s/", hp)) + if err != nil { + c.Errorf("Error parsing URL from INFO: %v\n", err) + c.mu.Unlock() + c.closeConnection() + return + } + c.route.url = url + } // Check to see if we have this remote already registered. // This can happen when both servers have routes to each other. @@ -75,12 +118,39 @@ func (c *client) processRouteInfo(info *Info) { c.Debugf("Registering remote route %q", info.ID) // Send our local subscriptions to this route. s.sendLocalSubsToRoute(c) + if len(info.Routes) > 0 { + s.processImplicitRoutes(info.Routes) + } } else { c.Debugf("Detected duplicate remote route %q", info.ID) c.closeConnection() } } +// This will process implicit route information from other servers. +// We will check to see if we have configured or are already connected, +// and if so we will ignore. Otherwise we will attempt to connect. +func (s *Server) processImplicitRoutes(routes []RemoteInfo) { + s.mu.Lock() + defer s.mu.Unlock() + for _, ri := range routes { + if _, exists := s.remotes[ri.RemoteID]; exists { + continue + } + // We have a new route that we do not currently know about. + // Process here and solicit a connection. + r, err := url.Parse(ri.URL) + if err != nil { + Debugf("Error parsing URL from Remote INFO: %v\n", err) + continue + } + if ri.AuthRequired { + r.User = url.UserPassword(s.opts.ClusterUsername, s.opts.ClusterPassword) + } + go s.connectToRoute(r) + } +} + // This will send local subscription state to a new route connection. // FIXME(dlc) - This could be a DOS or perf issue with many clients // and large subscription space. Plus buffering in place not a good idea. @@ -89,14 +159,11 @@ func (s *Server) sendLocalSubsToRoute(route *client) { s.mu.Lock() if s.routes[route.cid] == nil { - // We are too early, let createRoute call this function. route.mu.Lock() route.sendLocalSubs = true route.mu.Unlock() - s.mu.Unlock() - return } @@ -125,15 +192,35 @@ func (s *Server) sendLocalSubsToRoute(route *client) { func (s *Server) createRoute(conn net.Conn, rURL *url.URL) *client { didSolicit := rURL != nil r := &route{didSolicit: didSolicit} + for _, route := range s.opts.Routes { + if rURL == route { + r.routeType = Explicit + } + } c := &client{srv: s, nc: conn, opts: clientOpts{}, typ: ROUTER, route: r} - // Grab server variables. + // Grab server variables and clone known routes. s.mu.Lock() - info := s.routeInfoJSON - authRequired := s.routeInfo.AuthRequired - tlsRequired := s.routeInfo.TLSRequired + // copy + info := s.routeInfo + for _, r := range s.routes { + r.mu.Lock() + if r.route.url != nil { + ri := RemoteInfo{ + RemoteID: r.route.remoteID, + URL: fmt.Sprintf("%s", r.route.url), + AuthRequired: r.route.authRequired, + TLSRequired: r.route.tlsRequired, + } + info.Routes = append(info.Routes, ri) + } + r.mu.Unlock() + } s.mu.Unlock() + authRequired := info.AuthRequired + tlsRequired := info.TLSRequired + // Grab lock c.mu.Lock() @@ -202,8 +289,12 @@ func (s *Server) createRoute(conn net.Conn, rURL *url.URL) *client { c.sendConnect(tlsRequired) } + // Add other routes in that are known to the info payload + b, _ := json.Marshal(info) + infoJSON := []byte(fmt.Sprintf(InfoProto, b)) + // Send our info to the other side. - s.sendInfo(c, info) + s.sendInfo(c, infoJSON) // Check for Auth required state for incoming connections. if authRequired && !didSolicit { @@ -418,12 +509,6 @@ func (s *Server) StartRouting() { info.AuthRequired = true } s.routeInfo = info - // Generate the info json - b, err := json.Marshal(info) - if err != nil { - Fatalf("Error marshalling Route INFO JSON: %+v\n", err) - } - s.routeInfoJSON = []byte(fmt.Sprintf("INFO %s %s", b, CR_LF)) // Spin up the accept loop ch := make(chan struct{}) diff --git a/server/server.go b/server/server.go index 4422aa4d..db19ddd2 100644 --- a/server/server.go +++ b/server/server.go @@ -26,16 +26,17 @@ import ( // Info is the information sent to clients to help them understand information // about this server. type Info struct { - ID string `json:"server_id"` - Version string `json:"version"` - GoVersion string `json:"go"` - Host string `json:"host"` - Port int `json:"port"` - AuthRequired bool `json:"auth_required"` - SSLRequired bool `json:"ssl_required"` // ssl json used for older clients - TLSRequired bool `json:"tls_required"` - TLSVerify bool `json:"tls_verify"` - MaxPayload int `json:"max_payload"` + ID string `json:"server_id"` + Version string `json:"version"` + GoVersion string `json:"go"` + Host string `json:"host"` + Port int `json:"port"` + AuthRequired bool `json:"auth_required"` + SSLRequired bool `json:"ssl_required"` // DEPRECATED: ssl json used for older clients + TLSRequired bool `json:"tls_required"` + TLSVerify bool `json:"tls_verify"` + MaxPayload int `json:"max_payload"` + Routes []RemoteInfo `json:"routes,omitempty"` } // Server is our main struct. @@ -62,7 +63,6 @@ type Server struct { routeListener net.Listener routeInfo Info - routeInfoJSON []byte rcQuit chan bool } @@ -707,3 +707,10 @@ func (s *Server) GetListenEndpoint() string { // when the listener is started, due to the use of RANDOM_PORT return net.JoinHostPort(host, strconv.Itoa(s.opts.Port)) } + +// Server's ID +func (s *Server) Id() string { + s.mu.Lock() + defer s.mu.Unlock() + return s.info.ID +} diff --git a/server/usage.go b/server/usage.go index e447a34b..7c46ece6 100644 --- a/server/usage.go +++ b/server/usage.go @@ -38,6 +38,7 @@ TLS Options: Cluster Options: --routes [rurl-1, rurl-2] Routes to solicit and connect + --cluster [cluster url] Cluster URL for solicited routes Common Options: -h, --help Show this message diff --git a/test/configs/auth_seed.conf b/test/configs/auth_seed.conf new file mode 100644 index 00000000..ea80850a --- /dev/null +++ b/test/configs/auth_seed.conf @@ -0,0 +1,19 @@ +# Copyright 2015 Apcera Inc. All rights reserved. + +# Cluster Seed Node + +port: 4222 +net: '127.0.0.1' + +http_port: 8222 + +cluster { + host: '127.0.0.1' + port: 4248 + + authorization { + user: ruser + password: T0PS3cr3T! + timeout: 1 + } +} diff --git a/test/configs/cluster.conf b/test/configs/cluster.conf index ca57aaa7..c9fd0eba 100644 --- a/test/configs/cluster.conf +++ b/test/configs/cluster.conf @@ -1,9 +1,9 @@ -# Copyright 2012-2013 Apcera Inc. All rights reserved. +# Copyright 2012-2015 Apcera Inc. All rights reserved. # Cluster config file +host: "127.0.0.1" port: 4242 -#net: apcera.me # net interface cluster { host: '127.0.0.1' @@ -24,4 +24,3 @@ cluster { nats-route://foo:bar@127.0.0.1:4246 ] } - diff --git a/test/configs/override.conf b/test/configs/override.conf index 6bf4339e..88d351b7 100644 --- a/test/configs/override.conf +++ b/test/configs/override.conf @@ -2,8 +2,8 @@ # Config file to test overrides to client +host: "127.0.0.1" port: 4224 # maximum payload max_payload: 2222 - diff --git a/test/configs/seed.conf b/test/configs/seed.conf new file mode 100644 index 00000000..b0bf45a6 --- /dev/null +++ b/test/configs/seed.conf @@ -0,0 +1,13 @@ +# Copyright 2015 Apcera Inc. All rights reserved. + +# Cluster Seed Node + +port: 4222 +net: '127.0.0.1' + +http_port: 8222 + +cluster { + host: '127.0.0.1' + port: 4248 +} diff --git a/test/configs/srv_a_tls.conf b/test/configs/srv_a_tls.conf index ebe122c0..fc16e5ce 100644 --- a/test/configs/srv_a_tls.conf +++ b/test/configs/srv_a_tls.conf @@ -2,6 +2,7 @@ # Cluster Server A +host: '127.0.0.1' port: 4222 cluster { diff --git a/test/configs/srv_b_tls.conf b/test/configs/srv_b_tls.conf index bcddc5ab..2a9fa707 100644 --- a/test/configs/srv_b_tls.conf +++ b/test/configs/srv_b_tls.conf @@ -2,6 +2,7 @@ # Cluster Server B +host: '127.0.0.1' port: 4224 cluster { diff --git a/test/port_test.go b/test/port_test.go index 96ec20c1..7ae65359 100644 --- a/test/port_test.go +++ b/test/port_test.go @@ -11,7 +11,7 @@ import ( ) func TestResolveRandomPort(t *testing.T) { - opts := &server.Options{Port: server.RANDOM_PORT} + opts := &server.Options{Host: "127.0.0.1", Port: server.RANDOM_PORT} s := RunServer(opts) defer s.Shutdown() diff --git a/test/route_discovery_test.go b/test/route_discovery_test.go new file mode 100644 index 00000000..a0269428 --- /dev/null +++ b/test/route_discovery_test.go @@ -0,0 +1,355 @@ +// Copyright 2015 Apcera Inc. All rights reserved. + +package test + +import ( + "encoding/json" + "fmt" + "io/ioutil" + "net" + "net/http" + "runtime" + "strconv" + "testing" + "time" + + "github.com/nats-io/gnatsd/server" +) + +func runSeedServer(t *testing.T) (*server.Server, *server.Options) { + return RunServerWithConfig("./configs/seed.conf") +} + +func runAuthSeedServer(t *testing.T) (*server.Server, *server.Options) { + return RunServerWithConfig("./configs/auth_seed.conf") +} + +func TestSeedFirstRouteInfo(t *testing.T) { + s, opts := runSeedServer(t) + defer s.Shutdown() + + rc := createRouteConn(t, opts.ClusterHost, opts.ClusterPort) + defer rc.Close() + + _, routeExpect := setupRoute(t, rc, opts) + buf := routeExpect(infoRe) + + info := server.Info{} + if err := json.Unmarshal(buf[4:], &info); err != nil { + t.Fatalf("Could not unmarshal route info: %v", err) + } + + if len(info.Routes) != 0 { + t.Fatalf("Expected len of []Routes to be zero vs %d\n", len(info.Routes)) + } +} + +func TestSeedMultipleRouteInfo(t *testing.T) { + s, opts := runSeedServer(t) + defer s.Shutdown() + + rc1 := createRouteConn(t, opts.ClusterHost, opts.ClusterPort) + defer rc1.Close() + + routeSend1, route1Expect := setupRoute(t, rc1, opts) + route1Expect(infoRe) + + rc1ID := "2222" + rc1Port := 22 + rc1Host := "127.0.0.1" + + hp1 := fmt.Sprintf("nats-route://%s/", net.JoinHostPort(rc1Host, strconv.Itoa(rc1Port))) + + // register ourselves via INFO + r1Info := server.Info{ID: rc1ID, Host: rc1Host, Port: rc1Port} + b, _ := json.Marshal(r1Info) + infoJSON := fmt.Sprintf(server.InfoProto, b) + routeSend1(infoJSON) + routeSend1("PING\r\n") + route1Expect(pongRe) + + rc2 := createRouteConn(t, opts.ClusterHost, opts.ClusterPort) + defer rc2.Close() + + routeSend2, route2Expect := setupRoute(t, rc2, opts) + + rc2ID := "2224" + rc2Port := 24 + rc2Host := "127.0.0.1" + + // hp2 := net.JoinHostPort(rc2Host, strconv.Itoa(rc2Port)) + + // register ourselves via INFO + r2Info := server.Info{ID: rc2ID, Host: rc2Host, Port: rc2Port} + b, _ = json.Marshal(r2Info) + infoJSON = fmt.Sprintf(server.InfoProto, b) + routeSend2(infoJSON) + + // Now read back out the info from the seed route + buf := route2Expect(infoRe) + + info := server.Info{} + if err := json.Unmarshal(buf[4:], &info); err != nil { + t.Fatalf("Could not unmarshal route info: %v", err) + } + + if len(info.Routes) != 1 { + t.Fatalf("Expected len of []Routes to be 1 vs %d\n", len(info.Routes)) + } + + route := info.Routes[0] + if route.RemoteID != rc1ID { + t.Fatalf("Expected RemoteID of \"22\", got %q\n", route.RemoteID) + } + if route.URL == "" { + t.Fatalf("Expected a URL for the implicit route") + } + if route.URL != hp1 { + t.Fatalf("Expected URL Host of %s, got %s\n", hp1, route.URL) + } + + routeSend2("PING\r\n") + route2Expect(pongRe) + + // Now let's do a third. + rc3 := createRouteConn(t, opts.ClusterHost, opts.ClusterPort) + defer rc3.Close() + + routeSend3, route3Expect := setupRoute(t, rc3, opts) + + rc3ID := "2226" + rc3Port := 26 + rc3Host := "127.0.0.1" + + // register ourselves via INFO + r3Info := server.Info{ID: rc3ID, Host: rc3Host, Port: rc3Port} + b, _ = json.Marshal(r3Info) + infoJSON = fmt.Sprintf(server.InfoProto, b) + routeSend3(infoJSON) + + // Now read back out the info from the seed route + buf = route3Expect(infoRe) + + info = server.Info{} + if err := json.Unmarshal(buf[4:], &info); err != nil { + t.Fatalf("Could not unmarshal route info: %v", err) + } + if len(info.Routes) != 2 { + t.Fatalf("Expected len of []Routes to be 2 vs %d\n", len(info.Routes)) + } +} + +func TestSeedSolicitWorks(t *testing.T) { + s1, opts := runSeedServer(t) + defer s1.Shutdown() + + // Create the routes string for others to connect to the seed. + routesStr := fmt.Sprintf("nats-route://%s:%d/", opts.ClusterHost, opts.ClusterPort) + + // Run Server #2 + s2Opts := nextServerOpts(opts) + s2Opts.Routes = server.RoutesFromStr(routesStr) + + s2 := RunServer(s2Opts) + defer s2.Shutdown() + + // Run Server #3 + s3Opts := nextServerOpts(s2Opts) + + s3 := RunServer(s3Opts) + defer s3.Shutdown() + + // Wait for a bit for graph to connect + time.Sleep(500 * time.Millisecond) + + // Grab Routez from monitor ports, make sure we are fully connected + url := fmt.Sprintf("http://%s:%d/", opts.Host, opts.HTTPPort) + rz := readHttpRoutez(t, url) + ris := expectRids(t, rz, []string{s2.Id(), s3.Id()}) + if ris[s2.Id()].IsConfigured == true { + t.Fatalf("Expected server not to be configured\n") + } + if ris[s3.Id()].IsConfigured == true { + t.Fatalf("Expected server not to be configured\n") + } + + url = fmt.Sprintf("http://%s:%d/", s2Opts.Host, s2Opts.HTTPPort) + rz = readHttpRoutez(t, url) + ris = expectRids(t, rz, []string{s1.Id(), s3.Id()}) + if ris[s1.Id()].IsConfigured != true { + t.Fatalf("Expected seed server to be configured\n") + } + if ris[s3.Id()].IsConfigured == true { + t.Fatalf("Expected server not to be configured\n") + } + + url = fmt.Sprintf("http://%s:%d/", s3Opts.Host, s3Opts.HTTPPort) + rz = readHttpRoutez(t, url) + ris = expectRids(t, rz, []string{s1.Id(), s2.Id()}) + if ris[s1.Id()].IsConfigured != true { + t.Fatalf("Expected seed server to be configured\n") + } + if ris[s2.Id()].IsConfigured == true { + t.Fatalf("Expected server not to be configured\n") + } +} + +func TestChainedSolicitWorks(t *testing.T) { + s1, opts := runSeedServer(t) + defer s1.Shutdown() + + // Create the routes string for others to connect to the seed. + routesStr := fmt.Sprintf("nats-route://%s:%d/", opts.ClusterHost, opts.ClusterPort) + + // Run Server #2 + s2Opts := nextServerOpts(opts) + s2Opts.Routes = server.RoutesFromStr(routesStr) + + s2 := RunServer(s2Opts) + defer s2.Shutdown() + + // Run Server #3 + s3Opts := nextServerOpts(s2Opts) + // We will have s3 connect to s2, not the seed. + routesStr = fmt.Sprintf("nats-route://%s:%d/", s2Opts.ClusterHost, s2Opts.ClusterPort) + s3Opts.Routes = server.RoutesFromStr(routesStr) + + s3 := RunServer(s3Opts) + defer s3.Shutdown() + + // Wait for a bit for graph to connect + time.Sleep(500 * time.Millisecond) + + // Grab Routez from monitor ports, make sure we are fully connected + url := fmt.Sprintf("http://%s:%d/", opts.Host, opts.HTTPPort) + rz := readHttpRoutez(t, url) + ris := expectRids(t, rz, []string{s2.Id(), s3.Id()}) + if ris[s2.Id()].IsConfigured == true { + t.Fatalf("Expected server not to be configured\n") + } + if ris[s3.Id()].IsConfigured == true { + t.Fatalf("Expected server not to be configured\n") + } + + url = fmt.Sprintf("http://%s:%d/", s2Opts.Host, s2Opts.HTTPPort) + rz = readHttpRoutez(t, url) + ris = expectRids(t, rz, []string{s1.Id(), s3.Id()}) + if ris[s1.Id()].IsConfigured != true { + t.Fatalf("Expected seed server to be configured\n") + } + if ris[s3.Id()].IsConfigured == true { + t.Fatalf("Expected server not to be configured\n") + } + + url = fmt.Sprintf("http://%s:%d/", s3Opts.Host, s3Opts.HTTPPort) + rz = readHttpRoutez(t, url) + ris = expectRids(t, rz, []string{s1.Id(), s2.Id()}) + if ris[s2.Id()].IsConfigured != true { + t.Fatalf("Expected s2 server to be configured\n") + } + if ris[s1.Id()].IsConfigured == true { + t.Fatalf("Expected seed server not to be configured\n") + } +} + +func TestAuthSeedSolicitWorks(t *testing.T) { + s1, opts := runAuthSeedServer(t) + defer s1.Shutdown() + + // Create the routes string for others to connect to the seed. + routesStr := fmt.Sprintf("nats-route://%s:%s@%s:%d/", opts.ClusterUsername, opts.ClusterPassword, opts.ClusterHost, opts.ClusterPort) + + // Run Server #2 + s2Opts := nextServerOpts(opts) + s2Opts.Routes = server.RoutesFromStr(routesStr) + + s2 := RunServer(s2Opts) + defer s2.Shutdown() + + // Run Server #3 + s3Opts := nextServerOpts(s2Opts) + + s3 := RunServer(s3Opts) + defer s3.Shutdown() + + // Wait for a bit for graph to connect + time.Sleep(500 * time.Millisecond) + + // Grab Routez from monitor ports, make sure we are fully connected + url := fmt.Sprintf("http://%s:%d/", opts.Host, opts.HTTPPort) + rz := readHttpRoutez(t, url) + ris := expectRids(t, rz, []string{s2.Id(), s3.Id()}) + if ris[s2.Id()].IsConfigured == true { + t.Fatalf("Expected server not to be configured\n") + } + if ris[s3.Id()].IsConfigured == true { + t.Fatalf("Expected server not to be configured\n") + } + + url = fmt.Sprintf("http://%s:%d/", s2Opts.Host, s2Opts.HTTPPort) + rz = readHttpRoutez(t, url) + ris = expectRids(t, rz, []string{s1.Id(), s3.Id()}) + if ris[s1.Id()].IsConfigured != true { + t.Fatalf("Expected seed server to be configured\n") + } + if ris[s3.Id()].IsConfigured == true { + t.Fatalf("Expected server not to be configured\n") + } + + url = fmt.Sprintf("http://%s:%d/", s3Opts.Host, s3Opts.HTTPPort) + rz = readHttpRoutez(t, url) + ris = expectRids(t, rz, []string{s1.Id(), s2.Id()}) + if ris[s1.Id()].IsConfigured != true { + t.Fatalf("Expected seed server to be configured\n") + } + if ris[s2.Id()].IsConfigured == true { + t.Fatalf("Expected server not to be configured\n") + } +} + +// Helper to check for correct route memberships +func expectRids(t *testing.T, rz *server.Routez, rids []string) map[string]*server.RouteInfo { + if len(rids) != rz.NumRoutes { + _, fn, line, _ := runtime.Caller(1) + t.Fatalf("[%s:%d] Expecting %d routes, got %d\n", fn, line, len(rids), rz.NumRoutes) + } + set := make(map[string]bool) + for _, v := range rids { + set[v] = true + } + // Make result map for additional checking + ri := make(map[string]*server.RouteInfo) + for _, r := range rz.Routes { + if set[r.RemoteId] != true { + _, fn, line, _ := runtime.Caller(1) + t.Fatalf("[%s:%d] Route with rid %s unexpected, expected %+v\n", fn, line, r.RemoteId, rids) + } + ri[r.RemoteId] = r + } + return ri +} + +// Helper to easily grab routez info. +func readHttpRoutez(t *testing.T, url string) *server.Routez { + resp, err := http.Get(url + "routez") + if err != nil { + t.Fatalf("Expected no error: Got %v\n", err) + } + if resp.StatusCode != 200 { + // Do one retry - FIXME(dlc) - Why does this fail when running the solicit tests b2b? + resp, _ = http.Get(url + "routez") + if resp.StatusCode != 200 { + t.Fatalf("Expected a 200 response, got %d\n", resp.StatusCode) + } + } + defer resp.Body.Close() + body, err := ioutil.ReadAll(resp.Body) + if err != nil { + t.Fatalf("Got an error reading the body: %v\n", err) + } + r := server.Routez{} + if err := json.Unmarshal(body, &r); err != nil { + t.Fatalf("Got an error unmarshalling the body: %v\n", err) + } + return &r +} diff --git a/test/routes_test.go b/test/routes_test.go index b1d0e8f8..61759d45 100644 --- a/test/routes_test.go +++ b/test/routes_test.go @@ -1,4 +1,4 @@ -// Copyright 2012-2014 Apcera Inc. All rights reserved. +// Copyright 2012-2015 Apcera Inc. All rights reserved. package test @@ -15,18 +15,7 @@ import ( ) func runRouteServer(t *testing.T) (*server.Server, *server.Options) { - opts, err := server.ProcessConfigFile("./configs/cluster.conf") - - // Override for running in Go routine. - opts.NoSigs = true - opts.Debug = true - opts.Trace = true - opts.NoLog = true - - if err != nil { - t.Fatalf("Error parsing config file: %v\n", err) - } - return RunServer(opts), opts + return RunServerWithConfig("./configs/cluster.conf") } func TestRouterListeningSocket(t *testing.T) { diff --git a/test/test.go b/test/test.go index cc500549..e7f20ddd 100644 --- a/test/test.go +++ b/test/test.go @@ -423,3 +423,12 @@ func checkForPubSids(t tLogger, matches [][][]byte, sids []string) { } } } + +// Helper function to generate next opts to make sure no port conflicts etc. +func nextServerOpts(opts *server.Options) *server.Options { + nopts := *opts + nopts.Port += 1 + nopts.ClusterPort += 1 + nopts.HTTPPort += 1 + return &nopts +}