mirror of
https://github.com/gogrlx/nats-server.git
synced 2026-04-02 03:38:42 -07:00
[ADDED] Option to not advertise to clients cluster's IPs
By default, a server is now sending to its clients the client URLs of all servers in the cluster. This allows clients to be able to reconnect to any server in the cluster even if those clients were not configured with the list of servers in the cluster. However, there may be cases where it would make sense to disable this feature. This now can be done with this option/command line parameter. Resolves #322
This commit is contained in:
3
main.go
3
main.go
@@ -50,6 +50,8 @@ TLS Options:
|
||||
Cluster Options:
|
||||
--routes <rurl-1, rurl-2> Routes to solicit and connect
|
||||
--cluster <cluster-url> Cluster URL for solicited routes
|
||||
--no_advertise <bool> Advertise known cluster IPs to clients
|
||||
|
||||
|
||||
Common Options:
|
||||
-h, --help Show this message
|
||||
@@ -108,6 +110,7 @@ func main() {
|
||||
flag.StringVar(&opts.RoutesStr, "routes", "", "Routes to actively solicit a connection.")
|
||||
flag.StringVar(&opts.ClusterListenStr, "cluster", "", "Cluster url from which members can solicit routes.")
|
||||
flag.StringVar(&opts.ClusterListenStr, "cluster_listen", "", "Cluster url from which members can solicit routes.")
|
||||
flag.BoolVar(&opts.ClusterNoAdvertise, "no_advertise", false, "Advertise known cluster IPs to clients.")
|
||||
flag.BoolVar(&showTLSHelp, "help_tls", false, "TLS help.")
|
||||
flag.BoolVar(&opts.TLS, "tls", false, "Enable TLS.")
|
||||
flag.BoolVar(&opts.TLSVerify, "tlsverify", false, "Enable TLS with client verification.")
|
||||
|
||||
@@ -62,6 +62,7 @@ type Options struct {
|
||||
ClusterTLSTimeout float64 `json:"-"`
|
||||
ClusterTLSConfig *tls.Config `json:"-"`
|
||||
ClusterListenStr string `json:"-"`
|
||||
ClusterNoAdvertise bool `json:"-"`
|
||||
ProfPort int `json:"-"`
|
||||
PidFile string `json:"-"`
|
||||
LogFile string `json:"-"`
|
||||
@@ -310,6 +311,8 @@ func parseCluster(cm map[string]interface{}, opts *Options) error {
|
||||
opts.ClusterTLSConfig.ClientAuth = tls.RequireAndVerifyClientCert
|
||||
opts.ClusterTLSConfig.ClientCAs = opts.ClusterTLSConfig.RootCAs
|
||||
opts.ClusterTLSTimeout = tc.Timeout
|
||||
case "no_advertise":
|
||||
opts.ClusterNoAdvertise = mv.(bool)
|
||||
}
|
||||
}
|
||||
return nil
|
||||
@@ -640,6 +643,9 @@ func MergeOptions(fileOpts, flagOpts *Options) *Options {
|
||||
if flagOpts.ClusterListenStr != "" {
|
||||
opts.ClusterListenStr = flagOpts.ClusterListenStr
|
||||
}
|
||||
if flagOpts.ClusterNoAdvertise {
|
||||
opts.ClusterNoAdvertise = true
|
||||
}
|
||||
if flagOpts.RoutesStr != "" {
|
||||
mergeRoutes(&opts, flagOpts)
|
||||
}
|
||||
|
||||
@@ -168,24 +168,25 @@ func TestTLSConfigFile(t *testing.T) {
|
||||
|
||||
func TestMergeOverrides(t *testing.T) {
|
||||
golden := &Options{
|
||||
Host: "localhost",
|
||||
Port: 2222,
|
||||
Username: "derek",
|
||||
Password: "spooky",
|
||||
AuthTimeout: 1.0,
|
||||
Debug: true,
|
||||
Trace: true,
|
||||
Logtime: false,
|
||||
HTTPPort: DEFAULT_HTTP_PORT,
|
||||
LogFile: "/tmp/gnatsd.log",
|
||||
PidFile: "/tmp/gnatsd.pid",
|
||||
ProfPort: 6789,
|
||||
Syslog: true,
|
||||
RemoteSyslog: "udp://foo.com:33",
|
||||
MaxControlLine: 2048,
|
||||
MaxPayload: 65536,
|
||||
MaxConn: 100,
|
||||
MaxPending: 10000000,
|
||||
Host: "localhost",
|
||||
Port: 2222,
|
||||
Username: "derek",
|
||||
Password: "spooky",
|
||||
AuthTimeout: 1.0,
|
||||
Debug: true,
|
||||
Trace: true,
|
||||
Logtime: false,
|
||||
HTTPPort: DEFAULT_HTTP_PORT,
|
||||
LogFile: "/tmp/gnatsd.log",
|
||||
PidFile: "/tmp/gnatsd.pid",
|
||||
ProfPort: 6789,
|
||||
Syslog: true,
|
||||
RemoteSyslog: "udp://foo.com:33",
|
||||
MaxControlLine: 2048,
|
||||
MaxPayload: 65536,
|
||||
MaxConn: 100,
|
||||
MaxPending: 10000000,
|
||||
ClusterNoAdvertise: true,
|
||||
}
|
||||
fopts, err := ProcessConfigFile("./configs/test.conf")
|
||||
if err != nil {
|
||||
@@ -194,11 +195,12 @@ func TestMergeOverrides(t *testing.T) {
|
||||
|
||||
// Overrides via flags
|
||||
opts := &Options{
|
||||
Port: 2222,
|
||||
Password: "spooky",
|
||||
Debug: true,
|
||||
HTTPPort: DEFAULT_HTTP_PORT,
|
||||
ProfPort: 6789,
|
||||
Port: 2222,
|
||||
Password: "spooky",
|
||||
Debug: true,
|
||||
HTTPPort: DEFAULT_HTTP_PORT,
|
||||
ProfPort: 6789,
|
||||
ClusterNoAdvertise: true,
|
||||
}
|
||||
merged := MergeOptions(fopts, opts)
|
||||
|
||||
|
||||
@@ -154,7 +154,7 @@ func (c *client) processRouteInfo(info *Info) {
|
||||
s.forwardNewRouteInfoToKnownServers(info)
|
||||
}
|
||||
// If the server Info did not have these URLs, update and send an INFO
|
||||
// protocol to all clients that support it.
|
||||
// protocol to all clients that support it (unless the feature is disabled).
|
||||
if s.updateServerINFO(info.ClientConnectURLs) {
|
||||
s.sendAsyncInfoToClients()
|
||||
}
|
||||
|
||||
@@ -627,12 +627,18 @@ func (s *Server) createClient(conn net.Conn) *client {
|
||||
|
||||
// updateServerINFO updates the server's Info object with the given
|
||||
// array of URLs and re-generate the infoJSON byte array, only if the
|
||||
// given URLs were not already recorded.
|
||||
// given URLs were not already recorded and if the feature is not
|
||||
// disabled.
|
||||
// Returns a boolean indicating if server's Info was updated.
|
||||
func (s *Server) updateServerINFO(urls []string) bool {
|
||||
s.mu.Lock()
|
||||
defer s.mu.Unlock()
|
||||
|
||||
// Feature disabled, do not update.
|
||||
if s.opts.ClusterNoAdvertise {
|
||||
return false
|
||||
}
|
||||
|
||||
// Will be set to true if we alter the server's Info object.
|
||||
wasUpdated := false
|
||||
for _, url := range urls {
|
||||
|
||||
@@ -662,164 +662,197 @@ func TestRouteConnectOnShutdownRace(t *testing.T) {
|
||||
}
|
||||
|
||||
func TestRouteSendAsyncINFOToClients(t *testing.T) {
|
||||
s, opts := runRouteServer(t)
|
||||
defer s.Shutdown()
|
||||
f := func(opts *server.Options) {
|
||||
s := RunServer(opts)
|
||||
defer s.Shutdown()
|
||||
|
||||
clientURL := net.JoinHostPort(opts.Host, strconv.Itoa(opts.Port))
|
||||
clientURL := net.JoinHostPort(opts.Host, strconv.Itoa(opts.Port))
|
||||
|
||||
oldClient := createClientConn(t, opts.Host, opts.Port)
|
||||
defer oldClient.Close()
|
||||
oldClient := createClientConn(t, opts.Host, opts.Port)
|
||||
defer oldClient.Close()
|
||||
|
||||
oldClientSend, oldClientExpect := setupConn(t, oldClient)
|
||||
oldClientSend("PING\r\n")
|
||||
oldClientExpect(pongRe)
|
||||
oldClientSend, oldClientExpect := setupConn(t, oldClient)
|
||||
oldClientSend("PING\r\n")
|
||||
oldClientExpect(pongRe)
|
||||
|
||||
newClient := createClientConn(t, opts.Host, opts.Port)
|
||||
defer newClient.Close()
|
||||
newClient := createClientConn(t, opts.Host, opts.Port)
|
||||
defer newClient.Close()
|
||||
|
||||
newClientSend, newClientExpect := setupConnWithProto(t, newClient, clientProtoInfo)
|
||||
newClientSend("PING\r\n")
|
||||
newClientExpect(pongRe)
|
||||
newClientSend, newClientExpect := setupConnWithProto(t, newClient, clientProtoInfo)
|
||||
newClientSend("PING\r\n")
|
||||
newClientExpect(pongRe)
|
||||
|
||||
// Check that even a new client does not receive an async INFO at this point
|
||||
// since there is no route created yet.
|
||||
expectNothing(t, newClient)
|
||||
// Check that even a new client does not receive an async INFO at this point
|
||||
// since there is no route created yet.
|
||||
expectNothing(t, newClient)
|
||||
|
||||
routeID := "Server-B"
|
||||
routeID := "Server-B"
|
||||
|
||||
createRoute := func() (net.Conn, sendFun, expectFun) {
|
||||
rc := createRouteConn(t, opts.ClusterHost, opts.ClusterPort)
|
||||
routeSend, routeExpect := setupRouteEx(t, rc, opts, routeID)
|
||||
createRoute := func() (net.Conn, sendFun, expectFun) {
|
||||
rc := createRouteConn(t, opts.ClusterHost, opts.ClusterPort)
|
||||
routeSend, routeExpect := setupRouteEx(t, rc, opts, routeID)
|
||||
|
||||
buf := routeExpect(infoRe)
|
||||
info := server.Info{}
|
||||
if err := json.Unmarshal(buf[4:], &info); err != nil {
|
||||
t.Fatalf("Could not unmarshal route info: %v", err)
|
||||
}
|
||||
if len(info.ClientConnectURLs) == 0 {
|
||||
t.Fatal("Expected a list of URLs, got none")
|
||||
}
|
||||
if info.ClientConnectURLs[0] != clientURL {
|
||||
t.Fatalf("Expected ClientConnectURLs to be %q, got %q", clientURL, info.ClientConnectURLs[0])
|
||||
buf := routeExpect(infoRe)
|
||||
info := server.Info{}
|
||||
if err := json.Unmarshal(buf[4:], &info); err != nil {
|
||||
t.Fatalf("Could not unmarshal route info: %v", err)
|
||||
}
|
||||
if len(info.ClientConnectURLs) == 0 {
|
||||
t.Fatal("Expected a list of URLs, got none")
|
||||
}
|
||||
if info.ClientConnectURLs[0] != clientURL {
|
||||
t.Fatalf("Expected ClientConnectURLs to be %q, got %q", clientURL, info.ClientConnectURLs[0])
|
||||
}
|
||||
|
||||
return rc, routeSend, routeExpect
|
||||
}
|
||||
|
||||
return rc, routeSend, routeExpect
|
||||
}
|
||||
sendRouteINFO := func(routeSend sendFun, routeExpect expectFun, urls []string) {
|
||||
routeInfo := server.Info{}
|
||||
routeInfo.ID = routeID
|
||||
routeInfo.Host = "localhost"
|
||||
routeInfo.Port = 5222
|
||||
routeInfo.ClientConnectURLs = urls
|
||||
b, err := json.Marshal(routeInfo)
|
||||
if err != nil {
|
||||
t.Fatalf("Could not marshal test route info: %v", err)
|
||||
}
|
||||
infoJSON := fmt.Sprintf("INFO %s\r\n", b)
|
||||
routeSend(infoJSON)
|
||||
routeSend("PING\r\n")
|
||||
routeExpect(pongRe)
|
||||
}
|
||||
|
||||
sendRouteINFO := func(routeSend sendFun, routeExpect expectFun, urls []string) {
|
||||
routeInfo := server.Info{}
|
||||
routeInfo.ID = routeID
|
||||
routeInfo.Host = "localhost"
|
||||
routeInfo.Port = 5222
|
||||
routeInfo.ClientConnectURLs = urls
|
||||
b, err := json.Marshal(routeInfo)
|
||||
checkINFOReceived := func(client net.Conn, clientExpect expectFun, expectedURLs []string) {
|
||||
if opts.ClusterNoAdvertise {
|
||||
expectNothing(t, client)
|
||||
return
|
||||
}
|
||||
buf := clientExpect(infoRe)
|
||||
info := server.Info{}
|
||||
if err := json.Unmarshal(buf[4:], &info); err != nil {
|
||||
t.Fatalf("Could not unmarshal route info: %v", err)
|
||||
}
|
||||
if !reflect.DeepEqual(info.ClientConnectURLs, expectedURLs) {
|
||||
t.Fatalf("Expected ClientConnectURLs to be %v, got %v", expectedURLs, info.ClientConnectURLs)
|
||||
}
|
||||
}
|
||||
|
||||
// Create a route
|
||||
rc, routeSend, routeExpect := createRoute()
|
||||
defer rc.Close()
|
||||
|
||||
// Send an INFO with single URL
|
||||
routeConnectURLs := []string{"localhost:5222"}
|
||||
sendRouteINFO(routeSend, routeExpect, routeConnectURLs)
|
||||
|
||||
// Expect nothing for old clients
|
||||
expectNothing(t, oldClient)
|
||||
|
||||
// Expect new client to receive an INFO (unless disabled)
|
||||
checkINFOReceived(newClient, newClientExpect, routeConnectURLs)
|
||||
|
||||
// Disconnect and reconnect the route.
|
||||
rc.Close()
|
||||
rc, routeSend, routeExpect = createRoute()
|
||||
defer rc.Close()
|
||||
|
||||
// Resend the same route INFO json, since there is no new URL,
|
||||
// no client should receive an INFO
|
||||
sendRouteINFO(routeSend, routeExpect, routeConnectURLs)
|
||||
|
||||
// Expect nothing for old clients
|
||||
expectNothing(t, oldClient)
|
||||
|
||||
// Expect nothing for new clients as well (no real update)
|
||||
expectNothing(t, newClient)
|
||||
|
||||
// Now stop the route and restart with an additional URL
|
||||
rc.Close()
|
||||
rc, routeSend, routeExpect = createRoute()
|
||||
defer rc.Close()
|
||||
|
||||
// Create a client not sending the CONNECT until after route is added
|
||||
clientNoConnect := createClientConn(t, opts.Host, opts.Port)
|
||||
defer clientNoConnect.Close()
|
||||
|
||||
// Create a client that does not send the first PING yet
|
||||
clientNoPing := createClientConn(t, opts.Host, opts.Port)
|
||||
defer clientNoPing.Close()
|
||||
clientNoPingSend, clientNoPingExpect := setupConnWithProto(t, clientNoPing, clientProtoInfo)
|
||||
|
||||
// The route now has an additional URL
|
||||
routeConnectURLs = append(routeConnectURLs, "localhost:7777")
|
||||
// This causes the server to add the route and send INFO to clients
|
||||
sendRouteINFO(routeSend, routeExpect, routeConnectURLs)
|
||||
|
||||
// Expect nothing for old clients
|
||||
expectNothing(t, oldClient)
|
||||
|
||||
// Expect new client to receive an INFO, and verify content as expected.
|
||||
checkINFOReceived(newClient, newClientExpect, routeConnectURLs)
|
||||
|
||||
// Expect nothing yet for client that did not send the PING
|
||||
expectNothing(t, clientNoPing)
|
||||
|
||||
// Now send the first PING
|
||||
clientNoPingSend("PING\r\n")
|
||||
// Should receive PONG followed by INFO
|
||||
// Receive PONG only first
|
||||
pongBuf := make([]byte, len("PONG\r\n"))
|
||||
clientNoPing.SetReadDeadline(time.Now().Add(2 * time.Second))
|
||||
n, err := clientNoPing.Read(pongBuf)
|
||||
clientNoPing.SetReadDeadline(time.Time{})
|
||||
if n <= 0 && err != nil {
|
||||
t.Fatalf("Error reading from conn: %v\n", err)
|
||||
}
|
||||
if !pongRe.Match(pongBuf) {
|
||||
t.Fatalf("Response did not match expected: \n\tReceived:'%q'\n\tExpected:'%s'\n", pongBuf, pongRe)
|
||||
}
|
||||
checkINFOReceived(clientNoPing, clientNoPingExpect, routeConnectURLs)
|
||||
|
||||
// Have the client that did not send the connect do it now
|
||||
clientNoConnectSend, clientNoConnectExpect := setupConnWithProto(t, clientNoConnect, clientProtoInfo)
|
||||
// Send the PING
|
||||
clientNoConnectSend("PING\r\n")
|
||||
// Should receive PONG followed by INFO
|
||||
// Receive PONG only first
|
||||
clientNoConnect.SetReadDeadline(time.Now().Add(2 * time.Second))
|
||||
n, err = clientNoConnect.Read(pongBuf)
|
||||
clientNoConnect.SetReadDeadline(time.Time{})
|
||||
if n <= 0 && err != nil {
|
||||
t.Fatalf("Error reading from conn: %v\n", err)
|
||||
}
|
||||
if !pongRe.Match(pongBuf) {
|
||||
t.Fatalf("Response did not match expected: \n\tReceived:'%q'\n\tExpected:'%s'\n", pongBuf, pongRe)
|
||||
}
|
||||
checkINFOReceived(clientNoConnect, clientNoConnectExpect, routeConnectURLs)
|
||||
|
||||
// Create a client connection and verify content of initial INFO contains array
|
||||
// (but empty if no advertise option is set)
|
||||
cli := createClientConn(t, opts.Host, opts.Port)
|
||||
defer cli.Close()
|
||||
buf := expectResult(t, cli, infoRe)
|
||||
js := infoRe.FindAllSubmatch(buf, 1)[0][1]
|
||||
var sinfo server.Info
|
||||
err = json.Unmarshal(js, &sinfo)
|
||||
if err != nil {
|
||||
t.Fatalf("Could not marshal test route info: %v", err)
|
||||
t.Fatalf("Could not unmarshal INFO json: %v\n", err)
|
||||
}
|
||||
infoJSON := fmt.Sprintf("INFO %s\r\n", b)
|
||||
routeSend(infoJSON)
|
||||
routeSend("PING\r\n")
|
||||
routeExpect(pongRe)
|
||||
}
|
||||
|
||||
checkINFOReceived := func(clientExpect expectFun, expectedURLs []string) {
|
||||
buf := clientExpect(infoRe)
|
||||
info := server.Info{}
|
||||
if err := json.Unmarshal(buf[4:], &info); err != nil {
|
||||
t.Fatalf("Could not unmarshal route info: %v", err)
|
||||
}
|
||||
if !reflect.DeepEqual(info.ClientConnectURLs, expectedURLs) {
|
||||
t.Fatalf("Expected ClientConnectURLs to be %v, got %v", expectedURLs, info.ClientConnectURLs)
|
||||
if opts.ClusterNoAdvertise {
|
||||
if len(sinfo.ClientConnectURLs) != 0 {
|
||||
t.Fatalf("Expected ClientConnectURLs to be empty, got %v", sinfo.ClientConnectURLs)
|
||||
}
|
||||
} else if !reflect.DeepEqual(sinfo.ClientConnectURLs, routeConnectURLs) {
|
||||
t.Fatalf("Expected ClientConnectURLs to be %v, got %v", routeConnectURLs, sinfo.ClientConnectURLs)
|
||||
}
|
||||
}
|
||||
|
||||
// Create a route
|
||||
rc, routeSend, routeExpect := createRoute()
|
||||
defer rc.Close()
|
||||
|
||||
// Send an INFO with single URL
|
||||
routeConnectURLs := []string{"localhost:5222"}
|
||||
sendRouteINFO(routeSend, routeExpect, routeConnectURLs)
|
||||
|
||||
// Expect nothing for old clients
|
||||
expectNothing(t, oldClient)
|
||||
|
||||
// Expect new client to receive an INFO
|
||||
checkINFOReceived(newClientExpect, routeConnectURLs)
|
||||
|
||||
// Disconnect and reconnect the route.
|
||||
rc.Close()
|
||||
rc, routeSend, routeExpect = createRoute()
|
||||
defer rc.Close()
|
||||
|
||||
// Resend the same route INFO json, since there is no new URL,
|
||||
// no client should receive an INFO
|
||||
sendRouteINFO(routeSend, routeExpect, routeConnectURLs)
|
||||
|
||||
// Expect nothing for old clients
|
||||
expectNothing(t, oldClient)
|
||||
|
||||
// Expect nothing for new clients as well (no real update)
|
||||
expectNothing(t, newClient)
|
||||
|
||||
// Now stop the route and restart with an additional URL
|
||||
rc.Close()
|
||||
rc, routeSend, routeExpect = createRoute()
|
||||
defer rc.Close()
|
||||
|
||||
// Create a client not sending the CONNECT until after route is added
|
||||
clientNoConnect := createClientConn(t, opts.Host, opts.Port)
|
||||
defer clientNoConnect.Close()
|
||||
|
||||
// Create a client that does not send the first PING yet
|
||||
clientNoPing := createClientConn(t, opts.Host, opts.Port)
|
||||
defer clientNoPing.Close()
|
||||
clientNoPingSend, clientNoPingExpect := setupConnWithProto(t, clientNoPing, clientProtoInfo)
|
||||
|
||||
// The route now has an additional URL
|
||||
routeConnectURLs = append(routeConnectURLs, "localhost:7777")
|
||||
// This causes the server to add the route and send INFO to clients
|
||||
sendRouteINFO(routeSend, routeExpect, routeConnectURLs)
|
||||
|
||||
// Expect nothing for old clients
|
||||
expectNothing(t, oldClient)
|
||||
|
||||
// Expect new client to receive an INFO, and verify content as expected.
|
||||
checkINFOReceived(newClientExpect, routeConnectURLs)
|
||||
|
||||
// Expect nothing yet for client that did not send the PING
|
||||
expectNothing(t, clientNoPing)
|
||||
|
||||
// Now send the first PING
|
||||
clientNoPingSend("PING\r\n")
|
||||
// Should receive PONG followed by INFO
|
||||
// Receive PONG only first
|
||||
pongBuf := make([]byte, len("PONG\r\n"))
|
||||
clientNoPing.SetReadDeadline(time.Now().Add(2 * time.Second))
|
||||
n, err := clientNoPing.Read(pongBuf)
|
||||
clientNoPing.SetReadDeadline(time.Time{})
|
||||
if n <= 0 && err != nil {
|
||||
t.Fatalf("Error reading from conn: %v\n", err)
|
||||
opts := LoadConfig("./configs/cluster.conf")
|
||||
for i := 0; i < 2; i++ {
|
||||
if i == 1 {
|
||||
opts.ClusterNoAdvertise = true
|
||||
}
|
||||
f(opts)
|
||||
}
|
||||
if !pongRe.Match(pongBuf) {
|
||||
t.Fatalf("Response did not match expected: \n\tReceived:'%q'\n\tExpected:'%s'\n", pongBuf, pongRe)
|
||||
}
|
||||
checkINFOReceived(clientNoPingExpect, routeConnectURLs)
|
||||
|
||||
// Have the client that did not send the connect do it now
|
||||
clientNoConnectSend, clientNoConnectExpect := setupConnWithProto(t, clientNoConnect, clientProtoInfo)
|
||||
// Send the PING
|
||||
clientNoConnectSend("PING\r\n")
|
||||
// Should receive PONG followed by INFO
|
||||
// Receive PONG only first
|
||||
clientNoConnect.SetReadDeadline(time.Now().Add(2 * time.Second))
|
||||
n, err = clientNoConnect.Read(pongBuf)
|
||||
clientNoConnect.SetReadDeadline(time.Time{})
|
||||
if n <= 0 && err != nil {
|
||||
t.Fatalf("Error reading from conn: %v\n", err)
|
||||
}
|
||||
if !pongRe.Match(pongBuf) {
|
||||
t.Fatalf("Response did not match expected: \n\tReceived:'%q'\n\tExpected:'%s'\n", pongBuf, pongRe)
|
||||
}
|
||||
checkINFOReceived(clientNoConnectExpect, routeConnectURLs)
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user