diff --git a/main.go b/main.go index bd679e2c..c77b9a64 100644 --- a/main.go +++ b/main.go @@ -50,6 +50,8 @@ TLS Options: Cluster Options: --routes Routes to solicit and connect --cluster Cluster URL for solicited routes + --no_advertise Advertise known cluster IPs to clients + Common Options: -h, --help Show this message @@ -108,6 +110,7 @@ func main() { flag.StringVar(&opts.RoutesStr, "routes", "", "Routes to actively solicit a connection.") flag.StringVar(&opts.ClusterListenStr, "cluster", "", "Cluster url from which members can solicit routes.") flag.StringVar(&opts.ClusterListenStr, "cluster_listen", "", "Cluster url from which members can solicit routes.") + flag.BoolVar(&opts.ClusterNoAdvertise, "no_advertise", false, "Advertise known cluster IPs to clients.") flag.BoolVar(&showTLSHelp, "help_tls", false, "TLS help.") flag.BoolVar(&opts.TLS, "tls", false, "Enable TLS.") flag.BoolVar(&opts.TLSVerify, "tlsverify", false, "Enable TLS with client verification.") diff --git a/server/opts.go b/server/opts.go index a0e61089..80f8815d 100644 --- a/server/opts.go +++ b/server/opts.go @@ -62,6 +62,7 @@ type Options struct { ClusterTLSTimeout float64 `json:"-"` ClusterTLSConfig *tls.Config `json:"-"` ClusterListenStr string `json:"-"` + ClusterNoAdvertise bool `json:"-"` ProfPort int `json:"-"` PidFile string `json:"-"` LogFile string `json:"-"` @@ -310,6 +311,8 @@ func parseCluster(cm map[string]interface{}, opts *Options) error { opts.ClusterTLSConfig.ClientAuth = tls.RequireAndVerifyClientCert opts.ClusterTLSConfig.ClientCAs = opts.ClusterTLSConfig.RootCAs opts.ClusterTLSTimeout = tc.Timeout + case "no_advertise": + opts.ClusterNoAdvertise = mv.(bool) } } return nil @@ -640,6 +643,9 @@ func MergeOptions(fileOpts, flagOpts *Options) *Options { if flagOpts.ClusterListenStr != "" { opts.ClusterListenStr = flagOpts.ClusterListenStr } + if flagOpts.ClusterNoAdvertise { + opts.ClusterNoAdvertise = true + } if flagOpts.RoutesStr != "" { mergeRoutes(&opts, flagOpts) } diff --git a/server/opts_test.go b/server/opts_test.go index 70c1301b..70228a7b 100644 --- a/server/opts_test.go +++ b/server/opts_test.go @@ -168,24 +168,25 @@ func TestTLSConfigFile(t *testing.T) { func TestMergeOverrides(t *testing.T) { golden := &Options{ - Host: "localhost", - Port: 2222, - Username: "derek", - Password: "spooky", - AuthTimeout: 1.0, - Debug: true, - Trace: true, - Logtime: false, - HTTPPort: DEFAULT_HTTP_PORT, - LogFile: "/tmp/gnatsd.log", - PidFile: "/tmp/gnatsd.pid", - ProfPort: 6789, - Syslog: true, - RemoteSyslog: "udp://foo.com:33", - MaxControlLine: 2048, - MaxPayload: 65536, - MaxConn: 100, - MaxPending: 10000000, + Host: "localhost", + Port: 2222, + Username: "derek", + Password: "spooky", + AuthTimeout: 1.0, + Debug: true, + Trace: true, + Logtime: false, + HTTPPort: DEFAULT_HTTP_PORT, + LogFile: "/tmp/gnatsd.log", + PidFile: "/tmp/gnatsd.pid", + ProfPort: 6789, + Syslog: true, + RemoteSyslog: "udp://foo.com:33", + MaxControlLine: 2048, + MaxPayload: 65536, + MaxConn: 100, + MaxPending: 10000000, + ClusterNoAdvertise: true, } fopts, err := ProcessConfigFile("./configs/test.conf") if err != nil { @@ -194,11 +195,12 @@ func TestMergeOverrides(t *testing.T) { // Overrides via flags opts := &Options{ - Port: 2222, - Password: "spooky", - Debug: true, - HTTPPort: DEFAULT_HTTP_PORT, - ProfPort: 6789, + Port: 2222, + Password: "spooky", + Debug: true, + HTTPPort: DEFAULT_HTTP_PORT, + ProfPort: 6789, + ClusterNoAdvertise: true, } merged := MergeOptions(fopts, opts) diff --git a/server/route.go b/server/route.go index a3df2c89..8eaa37b5 100644 --- a/server/route.go +++ b/server/route.go @@ -154,7 +154,7 @@ func (c *client) processRouteInfo(info *Info) { s.forwardNewRouteInfoToKnownServers(info) } // If the server Info did not have these URLs, update and send an INFO - // protocol to all clients that support it. + // protocol to all clients that support it (unless the feature is disabled). if s.updateServerINFO(info.ClientConnectURLs) { s.sendAsyncInfoToClients() } diff --git a/server/server.go b/server/server.go index 890c884c..89333287 100644 --- a/server/server.go +++ b/server/server.go @@ -627,12 +627,18 @@ func (s *Server) createClient(conn net.Conn) *client { // updateServerINFO updates the server's Info object with the given // array of URLs and re-generate the infoJSON byte array, only if the -// given URLs were not already recorded. +// given URLs were not already recorded and if the feature is not +// disabled. // Returns a boolean indicating if server's Info was updated. func (s *Server) updateServerINFO(urls []string) bool { s.mu.Lock() defer s.mu.Unlock() + // Feature disabled, do not update. + if s.opts.ClusterNoAdvertise { + return false + } + // Will be set to true if we alter the server's Info object. wasUpdated := false for _, url := range urls { diff --git a/test/routes_test.go b/test/routes_test.go index 497ee8cb..78737af9 100644 --- a/test/routes_test.go +++ b/test/routes_test.go @@ -662,164 +662,197 @@ func TestRouteConnectOnShutdownRace(t *testing.T) { } func TestRouteSendAsyncINFOToClients(t *testing.T) { - s, opts := runRouteServer(t) - defer s.Shutdown() + f := func(opts *server.Options) { + s := RunServer(opts) + defer s.Shutdown() - clientURL := net.JoinHostPort(opts.Host, strconv.Itoa(opts.Port)) + clientURL := net.JoinHostPort(opts.Host, strconv.Itoa(opts.Port)) - oldClient := createClientConn(t, opts.Host, opts.Port) - defer oldClient.Close() + oldClient := createClientConn(t, opts.Host, opts.Port) + defer oldClient.Close() - oldClientSend, oldClientExpect := setupConn(t, oldClient) - oldClientSend("PING\r\n") - oldClientExpect(pongRe) + oldClientSend, oldClientExpect := setupConn(t, oldClient) + oldClientSend("PING\r\n") + oldClientExpect(pongRe) - newClient := createClientConn(t, opts.Host, opts.Port) - defer newClient.Close() + newClient := createClientConn(t, opts.Host, opts.Port) + defer newClient.Close() - newClientSend, newClientExpect := setupConnWithProto(t, newClient, clientProtoInfo) - newClientSend("PING\r\n") - newClientExpect(pongRe) + newClientSend, newClientExpect := setupConnWithProto(t, newClient, clientProtoInfo) + newClientSend("PING\r\n") + newClientExpect(pongRe) - // Check that even a new client does not receive an async INFO at this point - // since there is no route created yet. - expectNothing(t, newClient) + // Check that even a new client does not receive an async INFO at this point + // since there is no route created yet. + expectNothing(t, newClient) - routeID := "Server-B" + routeID := "Server-B" - createRoute := func() (net.Conn, sendFun, expectFun) { - rc := createRouteConn(t, opts.ClusterHost, opts.ClusterPort) - routeSend, routeExpect := setupRouteEx(t, rc, opts, routeID) + createRoute := func() (net.Conn, sendFun, expectFun) { + rc := createRouteConn(t, opts.ClusterHost, opts.ClusterPort) + routeSend, routeExpect := setupRouteEx(t, rc, opts, routeID) - buf := routeExpect(infoRe) - info := server.Info{} - if err := json.Unmarshal(buf[4:], &info); err != nil { - t.Fatalf("Could not unmarshal route info: %v", err) - } - if len(info.ClientConnectURLs) == 0 { - t.Fatal("Expected a list of URLs, got none") - } - if info.ClientConnectURLs[0] != clientURL { - t.Fatalf("Expected ClientConnectURLs to be %q, got %q", clientURL, info.ClientConnectURLs[0]) + buf := routeExpect(infoRe) + info := server.Info{} + if err := json.Unmarshal(buf[4:], &info); err != nil { + t.Fatalf("Could not unmarshal route info: %v", err) + } + if len(info.ClientConnectURLs) == 0 { + t.Fatal("Expected a list of URLs, got none") + } + if info.ClientConnectURLs[0] != clientURL { + t.Fatalf("Expected ClientConnectURLs to be %q, got %q", clientURL, info.ClientConnectURLs[0]) + } + + return rc, routeSend, routeExpect } - return rc, routeSend, routeExpect - } + sendRouteINFO := func(routeSend sendFun, routeExpect expectFun, urls []string) { + routeInfo := server.Info{} + routeInfo.ID = routeID + routeInfo.Host = "localhost" + routeInfo.Port = 5222 + routeInfo.ClientConnectURLs = urls + b, err := json.Marshal(routeInfo) + if err != nil { + t.Fatalf("Could not marshal test route info: %v", err) + } + infoJSON := fmt.Sprintf("INFO %s\r\n", b) + routeSend(infoJSON) + routeSend("PING\r\n") + routeExpect(pongRe) + } - sendRouteINFO := func(routeSend sendFun, routeExpect expectFun, urls []string) { - routeInfo := server.Info{} - routeInfo.ID = routeID - routeInfo.Host = "localhost" - routeInfo.Port = 5222 - routeInfo.ClientConnectURLs = urls - b, err := json.Marshal(routeInfo) + checkINFOReceived := func(client net.Conn, clientExpect expectFun, expectedURLs []string) { + if opts.ClusterNoAdvertise { + expectNothing(t, client) + return + } + buf := clientExpect(infoRe) + info := server.Info{} + if err := json.Unmarshal(buf[4:], &info); err != nil { + t.Fatalf("Could not unmarshal route info: %v", err) + } + if !reflect.DeepEqual(info.ClientConnectURLs, expectedURLs) { + t.Fatalf("Expected ClientConnectURLs to be %v, got %v", expectedURLs, info.ClientConnectURLs) + } + } + + // Create a route + rc, routeSend, routeExpect := createRoute() + defer rc.Close() + + // Send an INFO with single URL + routeConnectURLs := []string{"localhost:5222"} + sendRouteINFO(routeSend, routeExpect, routeConnectURLs) + + // Expect nothing for old clients + expectNothing(t, oldClient) + + // Expect new client to receive an INFO (unless disabled) + checkINFOReceived(newClient, newClientExpect, routeConnectURLs) + + // Disconnect and reconnect the route. + rc.Close() + rc, routeSend, routeExpect = createRoute() + defer rc.Close() + + // Resend the same route INFO json, since there is no new URL, + // no client should receive an INFO + sendRouteINFO(routeSend, routeExpect, routeConnectURLs) + + // Expect nothing for old clients + expectNothing(t, oldClient) + + // Expect nothing for new clients as well (no real update) + expectNothing(t, newClient) + + // Now stop the route and restart with an additional URL + rc.Close() + rc, routeSend, routeExpect = createRoute() + defer rc.Close() + + // Create a client not sending the CONNECT until after route is added + clientNoConnect := createClientConn(t, opts.Host, opts.Port) + defer clientNoConnect.Close() + + // Create a client that does not send the first PING yet + clientNoPing := createClientConn(t, opts.Host, opts.Port) + defer clientNoPing.Close() + clientNoPingSend, clientNoPingExpect := setupConnWithProto(t, clientNoPing, clientProtoInfo) + + // The route now has an additional URL + routeConnectURLs = append(routeConnectURLs, "localhost:7777") + // This causes the server to add the route and send INFO to clients + sendRouteINFO(routeSend, routeExpect, routeConnectURLs) + + // Expect nothing for old clients + expectNothing(t, oldClient) + + // Expect new client to receive an INFO, and verify content as expected. + checkINFOReceived(newClient, newClientExpect, routeConnectURLs) + + // Expect nothing yet for client that did not send the PING + expectNothing(t, clientNoPing) + + // Now send the first PING + clientNoPingSend("PING\r\n") + // Should receive PONG followed by INFO + // Receive PONG only first + pongBuf := make([]byte, len("PONG\r\n")) + clientNoPing.SetReadDeadline(time.Now().Add(2 * time.Second)) + n, err := clientNoPing.Read(pongBuf) + clientNoPing.SetReadDeadline(time.Time{}) + if n <= 0 && err != nil { + t.Fatalf("Error reading from conn: %v\n", err) + } + if !pongRe.Match(pongBuf) { + t.Fatalf("Response did not match expected: \n\tReceived:'%q'\n\tExpected:'%s'\n", pongBuf, pongRe) + } + checkINFOReceived(clientNoPing, clientNoPingExpect, routeConnectURLs) + + // Have the client that did not send the connect do it now + clientNoConnectSend, clientNoConnectExpect := setupConnWithProto(t, clientNoConnect, clientProtoInfo) + // Send the PING + clientNoConnectSend("PING\r\n") + // Should receive PONG followed by INFO + // Receive PONG only first + clientNoConnect.SetReadDeadline(time.Now().Add(2 * time.Second)) + n, err = clientNoConnect.Read(pongBuf) + clientNoConnect.SetReadDeadline(time.Time{}) + if n <= 0 && err != nil { + t.Fatalf("Error reading from conn: %v\n", err) + } + if !pongRe.Match(pongBuf) { + t.Fatalf("Response did not match expected: \n\tReceived:'%q'\n\tExpected:'%s'\n", pongBuf, pongRe) + } + checkINFOReceived(clientNoConnect, clientNoConnectExpect, routeConnectURLs) + + // Create a client connection and verify content of initial INFO contains array + // (but empty if no advertise option is set) + cli := createClientConn(t, opts.Host, opts.Port) + defer cli.Close() + buf := expectResult(t, cli, infoRe) + js := infoRe.FindAllSubmatch(buf, 1)[0][1] + var sinfo server.Info + err = json.Unmarshal(js, &sinfo) if err != nil { - t.Fatalf("Could not marshal test route info: %v", err) + t.Fatalf("Could not unmarshal INFO json: %v\n", err) } - infoJSON := fmt.Sprintf("INFO %s\r\n", b) - routeSend(infoJSON) - routeSend("PING\r\n") - routeExpect(pongRe) - } - - checkINFOReceived := func(clientExpect expectFun, expectedURLs []string) { - buf := clientExpect(infoRe) - info := server.Info{} - if err := json.Unmarshal(buf[4:], &info); err != nil { - t.Fatalf("Could not unmarshal route info: %v", err) - } - if !reflect.DeepEqual(info.ClientConnectURLs, expectedURLs) { - t.Fatalf("Expected ClientConnectURLs to be %v, got %v", expectedURLs, info.ClientConnectURLs) + if opts.ClusterNoAdvertise { + if len(sinfo.ClientConnectURLs) != 0 { + t.Fatalf("Expected ClientConnectURLs to be empty, got %v", sinfo.ClientConnectURLs) + } + } else if !reflect.DeepEqual(sinfo.ClientConnectURLs, routeConnectURLs) { + t.Fatalf("Expected ClientConnectURLs to be %v, got %v", routeConnectURLs, sinfo.ClientConnectURLs) } } - // Create a route - rc, routeSend, routeExpect := createRoute() - defer rc.Close() - - // Send an INFO with single URL - routeConnectURLs := []string{"localhost:5222"} - sendRouteINFO(routeSend, routeExpect, routeConnectURLs) - - // Expect nothing for old clients - expectNothing(t, oldClient) - - // Expect new client to receive an INFO - checkINFOReceived(newClientExpect, routeConnectURLs) - - // Disconnect and reconnect the route. - rc.Close() - rc, routeSend, routeExpect = createRoute() - defer rc.Close() - - // Resend the same route INFO json, since there is no new URL, - // no client should receive an INFO - sendRouteINFO(routeSend, routeExpect, routeConnectURLs) - - // Expect nothing for old clients - expectNothing(t, oldClient) - - // Expect nothing for new clients as well (no real update) - expectNothing(t, newClient) - - // Now stop the route and restart with an additional URL - rc.Close() - rc, routeSend, routeExpect = createRoute() - defer rc.Close() - - // Create a client not sending the CONNECT until after route is added - clientNoConnect := createClientConn(t, opts.Host, opts.Port) - defer clientNoConnect.Close() - - // Create a client that does not send the first PING yet - clientNoPing := createClientConn(t, opts.Host, opts.Port) - defer clientNoPing.Close() - clientNoPingSend, clientNoPingExpect := setupConnWithProto(t, clientNoPing, clientProtoInfo) - - // The route now has an additional URL - routeConnectURLs = append(routeConnectURLs, "localhost:7777") - // This causes the server to add the route and send INFO to clients - sendRouteINFO(routeSend, routeExpect, routeConnectURLs) - - // Expect nothing for old clients - expectNothing(t, oldClient) - - // Expect new client to receive an INFO, and verify content as expected. - checkINFOReceived(newClientExpect, routeConnectURLs) - - // Expect nothing yet for client that did not send the PING - expectNothing(t, clientNoPing) - - // Now send the first PING - clientNoPingSend("PING\r\n") - // Should receive PONG followed by INFO - // Receive PONG only first - pongBuf := make([]byte, len("PONG\r\n")) - clientNoPing.SetReadDeadline(time.Now().Add(2 * time.Second)) - n, err := clientNoPing.Read(pongBuf) - clientNoPing.SetReadDeadline(time.Time{}) - if n <= 0 && err != nil { - t.Fatalf("Error reading from conn: %v\n", err) + opts := LoadConfig("./configs/cluster.conf") + for i := 0; i < 2; i++ { + if i == 1 { + opts.ClusterNoAdvertise = true + } + f(opts) } - if !pongRe.Match(pongBuf) { - t.Fatalf("Response did not match expected: \n\tReceived:'%q'\n\tExpected:'%s'\n", pongBuf, pongRe) - } - checkINFOReceived(clientNoPingExpect, routeConnectURLs) - - // Have the client that did not send the connect do it now - clientNoConnectSend, clientNoConnectExpect := setupConnWithProto(t, clientNoConnect, clientProtoInfo) - // Send the PING - clientNoConnectSend("PING\r\n") - // Should receive PONG followed by INFO - // Receive PONG only first - clientNoConnect.SetReadDeadline(time.Now().Add(2 * time.Second)) - n, err = clientNoConnect.Read(pongBuf) - clientNoConnect.SetReadDeadline(time.Time{}) - if n <= 0 && err != nil { - t.Fatalf("Error reading from conn: %v\n", err) - } - if !pongRe.Match(pongBuf) { - t.Fatalf("Response did not match expected: \n\tReceived:'%q'\n\tExpected:'%s'\n", pongBuf, pongRe) - } - checkINFOReceived(clientNoConnectExpect, routeConnectURLs) }