mirror of
https://github.com/gogrlx/nats-server.git
synced 2026-04-17 03:24:40 -07:00
Merge pull request #626 from nats-io/notify_clients_when_routes_go_away
[ADDED] Notification to clients when servers leave the cluster
This commit is contained in:
@@ -1326,17 +1326,29 @@ func (c *client) closeConnection() {
|
||||
var (
|
||||
routeClosed bool
|
||||
retryImplicit bool
|
||||
connectURLs []string
|
||||
)
|
||||
if c.route != nil {
|
||||
routeClosed = c.route.closed
|
||||
if !routeClosed {
|
||||
retryImplicit = c.route.retry
|
||||
}
|
||||
connectURLs = c.route.connectURLs
|
||||
}
|
||||
|
||||
c.mu.Unlock()
|
||||
|
||||
if srv != nil {
|
||||
// This is a route that disconnected...
|
||||
if len(connectURLs) > 0 {
|
||||
// Unless disabled, possibly update the server's INFO protcol
|
||||
// and send to clients that know how to handle async INFOs.
|
||||
if !srv.getOpts().Cluster.NoAdvertise {
|
||||
srv.removeClientConnectURLs(connectURLs)
|
||||
srv.sendAsyncInfoToClients()
|
||||
}
|
||||
}
|
||||
|
||||
// Unregister
|
||||
srv.removeClient(c)
|
||||
|
||||
|
||||
@@ -237,6 +237,11 @@ func (c *clusterOption) Apply(server *Server) {
|
||||
server.routeInfo.SSLRequired = tlsRequired
|
||||
server.routeInfo.TLSVerify = tlsRequired
|
||||
server.routeInfo.AuthRequired = c.newValue.Username != ""
|
||||
if c.newValue.NoAdvertise {
|
||||
server.routeInfo.ClientConnectURLs = nil
|
||||
} else {
|
||||
server.routeInfo.ClientConnectURLs = server.clientConnectURLs
|
||||
}
|
||||
server.setRouteInfoHostPortAndIP()
|
||||
server.mu.Unlock()
|
||||
server.Noticef("Reloaded: cluster")
|
||||
|
||||
@@ -1374,6 +1374,15 @@ func TestConfigReloadClusterRoutes(t *testing.T) {
|
||||
}
|
||||
}
|
||||
|
||||
func reloadUpdateConfig(t *testing.T, s *Server, conf, content string) {
|
||||
if err := ioutil.WriteFile(conf, []byte(content), 0666); err != nil {
|
||||
stackFatalf(t, "Error creating config file: %v", err)
|
||||
}
|
||||
if err := s.Reload(); err != nil {
|
||||
stackFatalf(t, "Error on reload: %v", err)
|
||||
}
|
||||
}
|
||||
|
||||
func TestConfigReloadClusterAdvertise(t *testing.T) {
|
||||
conf := "routeadv.conf"
|
||||
if err := ioutil.WriteFile(conf, []byte(`
|
||||
@@ -1395,15 +1404,6 @@ func TestConfigReloadClusterAdvertise(t *testing.T) {
|
||||
|
||||
orgClusterPort := s.ClusterAddr().Port
|
||||
|
||||
updateConfig := func(content string) {
|
||||
if err := ioutil.WriteFile(conf, []byte(content), 0666); err != nil {
|
||||
stackFatalf(t, "Error creating config file: %v", err)
|
||||
}
|
||||
if err := s.Reload(); err != nil {
|
||||
stackFatalf(t, "Error on reload: %v", err)
|
||||
}
|
||||
}
|
||||
|
||||
verify := func(expectedHost string, expectedPort int, expectedIP string) {
|
||||
s.mu.Lock()
|
||||
routeInfo := s.routeInfo
|
||||
@@ -1424,7 +1424,7 @@ func TestConfigReloadClusterAdvertise(t *testing.T) {
|
||||
}
|
||||
|
||||
// Update config with cluster_advertise
|
||||
updateConfig(`
|
||||
reloadUpdateConfig(t, s, conf, `
|
||||
listen: "0.0.0.0:-1"
|
||||
cluster: {
|
||||
listen: "0.0.0.0:-1"
|
||||
@@ -1434,7 +1434,7 @@ func TestConfigReloadClusterAdvertise(t *testing.T) {
|
||||
verify("me", 1, "nats-route://me:1/")
|
||||
|
||||
// Update config with cluster_advertise (no port specified)
|
||||
updateConfig(`
|
||||
reloadUpdateConfig(t, s, conf, `
|
||||
listen: "0.0.0.0:-1"
|
||||
cluster: {
|
||||
listen: "0.0.0.0:-1"
|
||||
@@ -1444,7 +1444,7 @@ func TestConfigReloadClusterAdvertise(t *testing.T) {
|
||||
verify("me", orgClusterPort, fmt.Sprintf("nats-route://me:%d/", orgClusterPort))
|
||||
|
||||
// Update config with cluster_advertise (-1 port specified)
|
||||
updateConfig(`
|
||||
reloadUpdateConfig(t, s, conf, `
|
||||
listen: "0.0.0.0:-1"
|
||||
cluster: {
|
||||
listen: "0.0.0.0:-1"
|
||||
@@ -1454,7 +1454,7 @@ func TestConfigReloadClusterAdvertise(t *testing.T) {
|
||||
verify("me", orgClusterPort, fmt.Sprintf("nats-route://me:%d/", orgClusterPort))
|
||||
|
||||
// Update to remove cluster_advertise
|
||||
updateConfig(`
|
||||
reloadUpdateConfig(t, s, conf, `
|
||||
listen: "0.0.0.0:-1"
|
||||
cluster: {
|
||||
listen: "0.0.0.0:-1"
|
||||
@@ -1463,6 +1463,66 @@ func TestConfigReloadClusterAdvertise(t *testing.T) {
|
||||
verify("0.0.0.0", orgClusterPort, "")
|
||||
}
|
||||
|
||||
func TestConfigReloadClusterNoAdvertise(t *testing.T) {
|
||||
conf := "routeadv.conf"
|
||||
if err := ioutil.WriteFile(conf, []byte(`
|
||||
listen: "0.0.0.0:-1"
|
||||
client_advertise: "me:1"
|
||||
cluster: {
|
||||
listen: "0.0.0.0:-1"
|
||||
}
|
||||
`), 0666); err != nil {
|
||||
t.Fatalf("Error creating config file: %v", err)
|
||||
}
|
||||
defer os.Remove(conf)
|
||||
opts, err := ProcessConfigFile(conf)
|
||||
if err != nil {
|
||||
t.Fatalf("Error processing config file: %v", err)
|
||||
}
|
||||
opts.NoLog = true
|
||||
s := RunServer(opts)
|
||||
defer s.Shutdown()
|
||||
|
||||
s.mu.Lock()
|
||||
ccurls := s.routeInfo.ClientConnectURLs
|
||||
s.mu.Unlock()
|
||||
if len(ccurls) != 1 && ccurls[0] != "me:1" {
|
||||
t.Fatalf("Unexpected routeInfo.ClientConnectURLS: %v", ccurls)
|
||||
}
|
||||
|
||||
// Update config with no_advertise
|
||||
reloadUpdateConfig(t, s, conf, `
|
||||
listen: "0.0.0.0:-1"
|
||||
client_advertise: "me:1"
|
||||
cluster: {
|
||||
listen: "0.0.0.0:-1"
|
||||
no_advertise: true
|
||||
}
|
||||
`)
|
||||
|
||||
s.mu.Lock()
|
||||
ccurls = s.routeInfo.ClientConnectURLs
|
||||
s.mu.Unlock()
|
||||
if len(ccurls) != 0 {
|
||||
t.Fatalf("Unexpected routeInfo.ClientConnectURLS: %v", ccurls)
|
||||
}
|
||||
|
||||
// Update config with cluster_advertise (no port specified)
|
||||
reloadUpdateConfig(t, s, conf, `
|
||||
listen: "0.0.0.0:-1"
|
||||
client_advertise: "me:1"
|
||||
cluster: {
|
||||
listen: "0.0.0.0:-1"
|
||||
}
|
||||
`)
|
||||
s.mu.Lock()
|
||||
ccurls = s.routeInfo.ClientConnectURLs
|
||||
s.mu.Unlock()
|
||||
if len(ccurls) != 1 && ccurls[0] != "me:1" {
|
||||
t.Fatalf("Unexpected routeInfo.ClientConnectURLS: %v", ccurls)
|
||||
}
|
||||
}
|
||||
|
||||
func TestConfigReloadClientAdvertise(t *testing.T) {
|
||||
conf := "clientadv.conf"
|
||||
if err := ioutil.WriteFile(conf, []byte(`listen: "0.0.0.0:-1"`), 0666); err != nil {
|
||||
@@ -1479,15 +1539,6 @@ func TestConfigReloadClientAdvertise(t *testing.T) {
|
||||
|
||||
orgPort := s.Addr().(*net.TCPAddr).Port
|
||||
|
||||
updateConfig := func(content string) {
|
||||
if err := ioutil.WriteFile(conf, []byte(content), 0666); err != nil {
|
||||
stackFatalf(t, "Error creating config file: %v", err)
|
||||
}
|
||||
if err := s.Reload(); err != nil {
|
||||
stackFatalf(t, "Error on reload: %v", err)
|
||||
}
|
||||
}
|
||||
|
||||
verify := func(expectedHost string, expectedPort int) {
|
||||
s.mu.Lock()
|
||||
info := s.info
|
||||
@@ -1508,21 +1559,21 @@ func TestConfigReloadClientAdvertise(t *testing.T) {
|
||||
}
|
||||
|
||||
// Update config with ClientAdvertise (port specified)
|
||||
updateConfig(`
|
||||
reloadUpdateConfig(t, s, conf, `
|
||||
listen: "0.0.0.0:-1"
|
||||
client_advertise: "me:1"
|
||||
`)
|
||||
verify("me", 1)
|
||||
|
||||
// Update config with ClientAdvertise (no port specified)
|
||||
updateConfig(`
|
||||
reloadUpdateConfig(t, s, conf, `
|
||||
listen: "0.0.0.0:-1"
|
||||
client_advertise: "me"
|
||||
`)
|
||||
verify("me", orgPort)
|
||||
|
||||
// Update config with ClientAdvertise (-1 port specified)
|
||||
updateConfig(`
|
||||
reloadUpdateConfig(t, s, conf, `
|
||||
listen: "0.0.0.0:-1"
|
||||
client_advertise: "me:-1"
|
||||
`)
|
||||
@@ -1530,7 +1581,7 @@ func TestConfigReloadClientAdvertise(t *testing.T) {
|
||||
|
||||
// Now remove ClientAdvertise to check that original values
|
||||
// are restored.
|
||||
updateConfig(`listen: "0.0.0.0:-1"`)
|
||||
reloadUpdateConfig(t, s, conf, `listen: "0.0.0.0:-1"`)
|
||||
verify("0.0.0.0", orgPort)
|
||||
}
|
||||
|
||||
|
||||
@@ -39,6 +39,7 @@ type route struct {
|
||||
authRequired bool
|
||||
tlsRequired bool
|
||||
closed bool
|
||||
connectURLs []string
|
||||
}
|
||||
|
||||
type connectInfo struct {
|
||||
@@ -164,10 +165,10 @@ func (c *client) processRouteInfo(info *Info) {
|
||||
// Now let the known servers know about this new route
|
||||
s.forwardNewRouteInfoToKnownServers(info)
|
||||
}
|
||||
// Unless disabled, possibly update the server's INFO protcol
|
||||
// Unless disabled, possibly update the server's INFO protocol
|
||||
// and send to clients that know how to handle async INFOs.
|
||||
if !s.getOpts().Cluster.NoAdvertise {
|
||||
s.updateServerINFO(info.ClientConnectURLs)
|
||||
s.addClientConnectURLs(info.ClientConnectURLs)
|
||||
s.sendAsyncInfoToClients()
|
||||
}
|
||||
} else {
|
||||
@@ -181,7 +182,8 @@ func (c *client) processRouteInfo(info *Info) {
|
||||
func (s *Server) sendAsyncInfoToClients() {
|
||||
s.mu.Lock()
|
||||
// If there are no clients supporting async INFO protocols, we are done.
|
||||
if s.cproto == 0 {
|
||||
// Also don't send if we are shutting down...
|
||||
if s.cproto == 0 || s.shutdown {
|
||||
s.mu.Unlock()
|
||||
return
|
||||
}
|
||||
@@ -538,6 +540,9 @@ func (s *Server) addRoute(c *client, info *Info) (bool, bool) {
|
||||
|
||||
s.routes[c.cid] = c
|
||||
s.remotes[id] = c
|
||||
c.mu.Lock()
|
||||
c.route.connectURLs = info.ClientConnectURLs
|
||||
c.mu.Unlock()
|
||||
|
||||
// we don't need to send if the only route is the one we just accepted.
|
||||
sendInfo = len(s.routes) > 1
|
||||
@@ -627,11 +632,6 @@ func (s *Server) routeAcceptLoop(ch chan struct{}) {
|
||||
// Snapshot server options.
|
||||
opts := s.getOpts()
|
||||
|
||||
// Get all possible URLs (when server listens to 0.0.0.0).
|
||||
// This is going to be sent to other Servers, so that they can let their
|
||||
// clients know about us.
|
||||
clientConnectURLs := s.getClientConnectURLs()
|
||||
|
||||
// Snapshot server options.
|
||||
port := opts.Cluster.Port
|
||||
|
||||
@@ -651,14 +651,17 @@ func (s *Server) routeAcceptLoop(ch chan struct{}) {
|
||||
// Check for TLSConfig
|
||||
tlsReq := opts.Cluster.TLSConfig != nil
|
||||
info := Info{
|
||||
ID: s.info.ID,
|
||||
Version: s.info.Version,
|
||||
AuthRequired: false,
|
||||
TLSRequired: tlsReq,
|
||||
SSLRequired: tlsReq,
|
||||
TLSVerify: tlsReq,
|
||||
MaxPayload: s.info.MaxPayload,
|
||||
ClientConnectURLs: clientConnectURLs,
|
||||
ID: s.info.ID,
|
||||
Version: s.info.Version,
|
||||
AuthRequired: false,
|
||||
TLSRequired: tlsReq,
|
||||
SSLRequired: tlsReq,
|
||||
TLSVerify: tlsReq,
|
||||
MaxPayload: s.info.MaxPayload,
|
||||
}
|
||||
// Set this if only if advertise is not disabled
|
||||
if !opts.Cluster.NoAdvertise {
|
||||
info.ClientConnectURLs = s.clientConnectURLs
|
||||
}
|
||||
// If we have selected a random port...
|
||||
if port == 0 {
|
||||
|
||||
@@ -626,7 +626,6 @@ func TestClientConnectToRoutePort(t *testing.T) {
|
||||
// cluster's Host to localhost so it works on Windows too, since on
|
||||
// Windows, a client can't use 0.0.0.0 in a connect.
|
||||
opts.Cluster.Host = "localhost"
|
||||
opts.Cluster.NoAdvertise = true
|
||||
s := RunServer(opts)
|
||||
defer s.Shutdown()
|
||||
|
||||
@@ -650,6 +649,20 @@ func TestClientConnectToRoutePort(t *testing.T) {
|
||||
t.Fatalf("Expected client to be connected to %v, got %v", clientURL, nc.ConnectedUrl())
|
||||
}
|
||||
}
|
||||
|
||||
s.Shutdown()
|
||||
// Try again with NoAdvertise and this time, the client should fail to connect.
|
||||
opts.Cluster.NoAdvertise = true
|
||||
s = RunServer(opts)
|
||||
defer s.Shutdown()
|
||||
|
||||
for i := 0; i < total; i++ {
|
||||
nc, err := nats.Connect(url)
|
||||
if err == nil {
|
||||
nc.Close()
|
||||
t.Fatal("Expected error on connect, got none")
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
type checkDuplicateRouteLogger struct {
|
||||
|
||||
@@ -86,6 +86,7 @@ type Server struct {
|
||||
trace int32
|
||||
debug int32
|
||||
}
|
||||
clientConnectURLs []string
|
||||
|
||||
// These store the real client/cluster listen ports. They are
|
||||
// required during config reload to reset the Options (after
|
||||
@@ -441,6 +442,8 @@ func (s *Server) AcceptLoop(clr chan struct{}) {
|
||||
s.mu.Unlock()
|
||||
return
|
||||
}
|
||||
// Keep track of client connect URLs. We may need them later.
|
||||
s.clientConnectURLs = s.getClientConnectURLs()
|
||||
s.mu.Unlock()
|
||||
|
||||
// Let the caller know that we are ready
|
||||
@@ -810,24 +813,48 @@ func (s *Server) createClient(conn net.Conn) *client {
|
||||
return c
|
||||
}
|
||||
|
||||
// addClientConnectURLs adds the given array of urls to the server's
|
||||
// INFO.ClientConnectURLs array. The server INFO JSON is regenerated.
|
||||
// Note that a check is made to ensure that given URLs are not
|
||||
// already present. So the INFO JSON is regenerated only if new ULRs
|
||||
// were added.
|
||||
func (s *Server) addClientConnectURLs(urls []string) {
|
||||
s.updateServerINFO(urls, true)
|
||||
}
|
||||
|
||||
// removeClientConnectURLs removes the given array of urls from the server's
|
||||
// INFO.ClientConnectURLs array. The server INFO JSON is regenerated.
|
||||
func (s *Server) removeClientConnectURLs(urls []string) {
|
||||
s.updateServerINFO(urls, false)
|
||||
}
|
||||
|
||||
// updateServerINFO updates the server's Info object with the given
|
||||
// array of URLs and re-generate the infoJSON byte array, only if the
|
||||
// given URLs were not already recorded.
|
||||
func (s *Server) updateServerINFO(urls []string) {
|
||||
// array of URLs and re-generate the infoJSON byte array.
|
||||
func (s *Server) updateServerINFO(urls []string, add bool) {
|
||||
s.mu.Lock()
|
||||
defer s.mu.Unlock()
|
||||
|
||||
// Will be set to true if we alter the server's Info object.
|
||||
wasUpdated := false
|
||||
remove := !add
|
||||
for _, url := range urls {
|
||||
if _, present := s.info.clientConnectURLs[url]; !present {
|
||||
|
||||
_, present := s.info.clientConnectURLs[url]
|
||||
if add && !present {
|
||||
s.info.clientConnectURLs[url] = struct{}{}
|
||||
s.info.ClientConnectURLs = append(s.info.ClientConnectURLs, url)
|
||||
wasUpdated = true
|
||||
} else if remove && present {
|
||||
delete(s.info.clientConnectURLs, url)
|
||||
wasUpdated = true
|
||||
}
|
||||
}
|
||||
if wasUpdated {
|
||||
// Recreate the info.ClientConnectURL array from the map
|
||||
s.info.ClientConnectURLs = s.info.ClientConnectURLs[:0]
|
||||
// Add this server client connect ULRs first...
|
||||
s.info.ClientConnectURLs = append(s.info.ClientConnectURLs, s.clientConnectURLs...)
|
||||
for url := range s.info.clientConnectURLs {
|
||||
s.info.ClientConnectURLs = append(s.info.ClientConnectURLs, url)
|
||||
}
|
||||
s.generateServerInfoJSON()
|
||||
}
|
||||
}
|
||||
@@ -1026,14 +1053,12 @@ func (s *Server) startGoRoutine(f func()) {
|
||||
// getClientConnectURLs returns suitable URLs for clients to connect to the listen
|
||||
// port based on the server options' Host and Port. If the Host corresponds to
|
||||
// "any" interfaces, this call returns the list of resolved IP addresses.
|
||||
// If ClientAdvertise is set, returns the client advertise host and port
|
||||
// If ClientAdvertise is set, returns the client advertise host and port.
|
||||
// The server lock is assumed held on entry.
|
||||
func (s *Server) getClientConnectURLs() []string {
|
||||
// Snapshot server options.
|
||||
opts := s.getOpts()
|
||||
|
||||
s.mu.Lock()
|
||||
defer s.mu.Unlock()
|
||||
|
||||
urls := make([]string, 0, 1)
|
||||
|
||||
// short circuit if client advertise is set
|
||||
|
||||
@@ -153,7 +153,9 @@ func TestGetConnectURLs(t *testing.T) {
|
||||
s := New(opts)
|
||||
defer s.Shutdown()
|
||||
|
||||
s.mu.Lock()
|
||||
urls := s.getClientConnectURLs()
|
||||
s.mu.Unlock()
|
||||
if len(urls) == 0 {
|
||||
t.Fatalf("Expected to get a list of urls, got none for listen addr: %v", opts.Host)
|
||||
}
|
||||
@@ -189,7 +191,9 @@ func TestGetConnectURLs(t *testing.T) {
|
||||
s := New(opts)
|
||||
defer s.Shutdown()
|
||||
|
||||
s.mu.Lock()
|
||||
urls := s.getClientConnectURLs()
|
||||
s.mu.Unlock()
|
||||
if len(urls) != 1 {
|
||||
t.Fatalf("Expected one URL, got %v", urls)
|
||||
}
|
||||
@@ -220,7 +224,9 @@ func TestClientAdvertiseConnectURL(t *testing.T) {
|
||||
s := New(opts)
|
||||
defer s.Shutdown()
|
||||
|
||||
s.mu.Lock()
|
||||
urls := s.getClientConnectURLs()
|
||||
s.mu.Unlock()
|
||||
if len(urls) != 1 {
|
||||
t.Fatalf("Expected to get one url, got none: %v with ClientAdvertise %v",
|
||||
opts.Host, opts.ClientAdvertise)
|
||||
@@ -232,7 +238,9 @@ func TestClientAdvertiseConnectURL(t *testing.T) {
|
||||
|
||||
opts.ClientAdvertise = "nats.example.com:7777"
|
||||
s = New(opts)
|
||||
s.mu.Lock()
|
||||
urls = s.getClientConnectURLs()
|
||||
s.mu.Unlock()
|
||||
if len(urls) != 1 {
|
||||
t.Fatalf("Expected to get one url, got none: %v with ClientAdvertise %v",
|
||||
opts.Host, opts.ClientAdvertise)
|
||||
|
||||
@@ -98,7 +98,7 @@ func TestNoMonitorPort(t *testing.T) {
|
||||
func testEndpointDataRace(endpoint string, t *testing.T) {
|
||||
var doneWg sync.WaitGroup
|
||||
|
||||
url := fmt.Sprintf("http://localhost:%d/", MONITOR_PORT)
|
||||
url := fmt.Sprintf("http://127.0.0.1:%d/", MONITOR_PORT)
|
||||
|
||||
// Poll as fast as we can, while creating connections, publishing,
|
||||
// and subscribing.
|
||||
|
||||
@@ -8,14 +8,12 @@ import (
|
||||
"io/ioutil"
|
||||
"net"
|
||||
"runtime"
|
||||
"strconv"
|
||||
"strings"
|
||||
"sync"
|
||||
"testing"
|
||||
"time"
|
||||
|
||||
"reflect"
|
||||
"strconv"
|
||||
|
||||
"github.com/nats-io/gnatsd/server"
|
||||
)
|
||||
|
||||
@@ -655,13 +653,19 @@ func TestRouteSendAsyncINFOToClients(t *testing.T) {
|
||||
buf := routeExpect(infoRe)
|
||||
info := server.Info{}
|
||||
if err := json.Unmarshal(buf[4:], &info); err != nil {
|
||||
t.Fatalf("Could not unmarshal route info: %v", err)
|
||||
stackFatalf(t, "Could not unmarshal route info: %v", err)
|
||||
}
|
||||
if len(info.ClientConnectURLs) == 0 {
|
||||
t.Fatal("Expected a list of URLs, got none")
|
||||
}
|
||||
if info.ClientConnectURLs[0] != clientURL {
|
||||
t.Fatalf("Expected ClientConnectURLs to be %q, got %q", clientURL, info.ClientConnectURLs[0])
|
||||
if opts.Cluster.NoAdvertise {
|
||||
if len(info.ClientConnectURLs) != 0 {
|
||||
stackFatalf(t, "Expected ClientConnectURLs to be empty, got %v", info.ClientConnectURLs)
|
||||
}
|
||||
} else {
|
||||
if len(info.ClientConnectURLs) == 0 {
|
||||
stackFatalf(t, "Expected a list of URLs, got none")
|
||||
}
|
||||
if info.ClientConnectURLs[0] != clientURL {
|
||||
stackFatalf(t, "Expected ClientConnectURLs to be %q, got %q", clientURL, info.ClientConnectURLs[0])
|
||||
}
|
||||
}
|
||||
|
||||
return rc, routeSend, routeExpect
|
||||
@@ -675,7 +679,7 @@ func TestRouteSendAsyncINFOToClients(t *testing.T) {
|
||||
routeInfo.ClientConnectURLs = urls
|
||||
b, err := json.Marshal(routeInfo)
|
||||
if err != nil {
|
||||
t.Fatalf("Could not marshal test route info: %v", err)
|
||||
stackFatalf(t, "Could not marshal test route info: %v", err)
|
||||
}
|
||||
infoJSON := fmt.Sprintf("INFO %s\r\n", b)
|
||||
routeSend(infoJSON)
|
||||
@@ -683,6 +687,27 @@ func TestRouteSendAsyncINFOToClients(t *testing.T) {
|
||||
routeExpect(pongRe)
|
||||
}
|
||||
|
||||
checkClientConnectURLS := func(urls, expected []string) {
|
||||
// Order of array is not guaranteed.
|
||||
ok := false
|
||||
if len(urls) == len(expected) {
|
||||
m := make(map[string]struct{}, len(expected))
|
||||
for _, url := range expected {
|
||||
m[url] = struct{}{}
|
||||
}
|
||||
ok = true
|
||||
for _, url := range urls {
|
||||
if _, present := m[url]; !present {
|
||||
ok = false
|
||||
break
|
||||
}
|
||||
}
|
||||
}
|
||||
if !ok {
|
||||
stackFatalf(t, "Expected ClientConnectURLs to be %v, got %v", expected, urls)
|
||||
}
|
||||
}
|
||||
|
||||
checkINFOReceived := func(client net.Conn, clientExpect expectFun, expectedURLs []string) {
|
||||
if opts.Cluster.NoAdvertise {
|
||||
expectNothing(t, client)
|
||||
@@ -691,11 +716,9 @@ func TestRouteSendAsyncINFOToClients(t *testing.T) {
|
||||
buf := clientExpect(infoRe)
|
||||
info := server.Info{}
|
||||
if err := json.Unmarshal(buf[4:], &info); err != nil {
|
||||
t.Fatalf("Could not unmarshal route info: %v", err)
|
||||
}
|
||||
if !reflect.DeepEqual(info.ClientConnectURLs, expectedURLs) {
|
||||
t.Fatalf("Expected ClientConnectURLs to be %v, got %v", expectedURLs, info.ClientConnectURLs)
|
||||
stackFatalf(t, "Could not unmarshal route info: %v", err)
|
||||
}
|
||||
checkClientConnectURLS(info.ClientConnectURLs, expectedURLs)
|
||||
}
|
||||
|
||||
// Create a route
|
||||
@@ -703,32 +726,51 @@ func TestRouteSendAsyncINFOToClients(t *testing.T) {
|
||||
defer rc.Close()
|
||||
|
||||
// Send an INFO with single URL
|
||||
routeConnectURLs := []string{"localhost:5222"}
|
||||
sendRouteINFO(routeSend, routeExpect, routeConnectURLs)
|
||||
routeClientConnectURLs := []string{"127.0.0.1:5222"}
|
||||
sendRouteINFO(routeSend, routeExpect, routeClientConnectURLs)
|
||||
|
||||
// Expect nothing for old clients
|
||||
expectNothing(t, oldClient)
|
||||
|
||||
// Expect new client to receive an INFO (unless disabled)
|
||||
checkINFOReceived(newClient, newClientExpect, routeConnectURLs)
|
||||
// We expect to get the one from the server we connect to and the other route.
|
||||
expectedURLs := []string{clientURL, routeClientConnectURLs[0]}
|
||||
|
||||
// Disconnect and reconnect the route.
|
||||
// Expect new client to receive an INFO (unless disabled)
|
||||
checkINFOReceived(newClient, newClientExpect, expectedURLs)
|
||||
|
||||
// Disconnect the route
|
||||
rc.Close()
|
||||
|
||||
// Expect nothing for old clients
|
||||
expectNothing(t, oldClient)
|
||||
|
||||
// Expect new client to receive an INFO (unless disabled).
|
||||
// The content will now have the disconnected route ClientConnectURLs
|
||||
// removed from the INFO. So it should be the one from the server the
|
||||
// client is connected to.
|
||||
checkINFOReceived(newClient, newClientExpect, []string{clientURL})
|
||||
|
||||
// Reconnect the route.
|
||||
rc, routeSend, routeExpect = createRoute()
|
||||
defer rc.Close()
|
||||
|
||||
// Resend the same route INFO json. The server will now send
|
||||
// the INFO even when there is no change.
|
||||
sendRouteINFO(routeSend, routeExpect, routeConnectURLs)
|
||||
// the INFO since the disconnected route ClientConnectURLs was
|
||||
// removed in previous step.
|
||||
sendRouteINFO(routeSend, routeExpect, routeClientConnectURLs)
|
||||
|
||||
// Expect nothing for old clients
|
||||
expectNothing(t, oldClient)
|
||||
|
||||
// Expect new client to receive an INFO (unless disabled)
|
||||
checkINFOReceived(newClient, newClientExpect, routeConnectURLs)
|
||||
checkINFOReceived(newClient, newClientExpect, expectedURLs)
|
||||
|
||||
// Now stop the route and restart with an additional URL
|
||||
rc.Close()
|
||||
// On route disconnect, clients will receive an updated INFO
|
||||
expectNothing(t, oldClient)
|
||||
checkINFOReceived(newClient, newClientExpect, []string{clientURL})
|
||||
|
||||
rc, routeSend, routeExpect = createRoute()
|
||||
defer rc.Close()
|
||||
|
||||
@@ -742,15 +784,16 @@ func TestRouteSendAsyncINFOToClients(t *testing.T) {
|
||||
clientNoPingSend, clientNoPingExpect := setupConnWithProto(t, clientNoPing, clientProtoInfo)
|
||||
|
||||
// The route now has an additional URL
|
||||
routeConnectURLs = append(routeConnectURLs, "localhost:7777")
|
||||
routeClientConnectURLs = append(routeClientConnectURLs, "127.0.0.1:7777")
|
||||
expectedURLs = append(expectedURLs, "127.0.0.1:7777")
|
||||
// This causes the server to add the route and send INFO to clients
|
||||
sendRouteINFO(routeSend, routeExpect, routeConnectURLs)
|
||||
sendRouteINFO(routeSend, routeExpect, routeClientConnectURLs)
|
||||
|
||||
// Expect nothing for old clients
|
||||
expectNothing(t, oldClient)
|
||||
|
||||
// Expect new client to receive an INFO, and verify content as expected.
|
||||
checkINFOReceived(newClient, newClientExpect, routeConnectURLs)
|
||||
checkINFOReceived(newClient, newClientExpect, expectedURLs)
|
||||
|
||||
// Expect nothing yet for client that did not send the PING
|
||||
expectNothing(t, clientNoPing)
|
||||
@@ -769,7 +812,7 @@ func TestRouteSendAsyncINFOToClients(t *testing.T) {
|
||||
if !pongRe.Match(pongBuf) {
|
||||
t.Fatalf("Response did not match expected: \n\tReceived:'%q'\n\tExpected:'%s'\n", pongBuf, pongRe)
|
||||
}
|
||||
checkINFOReceived(clientNoPing, clientNoPingExpect, routeConnectURLs)
|
||||
checkINFOReceived(clientNoPing, clientNoPingExpect, expectedURLs)
|
||||
|
||||
// Have the client that did not send the connect do it now
|
||||
clientNoConnectSend, clientNoConnectExpect := setupConnWithProto(t, clientNoConnect, clientProtoInfo)
|
||||
@@ -786,7 +829,7 @@ func TestRouteSendAsyncINFOToClients(t *testing.T) {
|
||||
if !pongRe.Match(pongBuf) {
|
||||
t.Fatalf("Response did not match expected: \n\tReceived:'%q'\n\tExpected:'%s'\n", pongBuf, pongRe)
|
||||
}
|
||||
checkINFOReceived(clientNoConnect, clientNoConnectExpect, routeConnectURLs)
|
||||
checkINFOReceived(clientNoConnect, clientNoConnectExpect, expectedURLs)
|
||||
|
||||
// Create a client connection and verify content of initial INFO contains array
|
||||
// (but empty if no advertise option is set)
|
||||
@@ -803,12 +846,44 @@ func TestRouteSendAsyncINFOToClients(t *testing.T) {
|
||||
if len(sinfo.ClientConnectURLs) != 0 {
|
||||
t.Fatalf("Expected ClientConnectURLs to be empty, got %v", sinfo.ClientConnectURLs)
|
||||
}
|
||||
} else if !reflect.DeepEqual(sinfo.ClientConnectURLs, routeConnectURLs) {
|
||||
t.Fatalf("Expected ClientConnectURLs to be %v, got %v", routeConnectURLs, sinfo.ClientConnectURLs)
|
||||
} else {
|
||||
checkClientConnectURLS(sinfo.ClientConnectURLs, expectedURLs)
|
||||
}
|
||||
|
||||
// Add a new route
|
||||
routeID = "Server-C"
|
||||
rc2, route2Send, route2Expect := createRoute()
|
||||
defer rc2.Close()
|
||||
|
||||
// Send an INFO with single URL
|
||||
rc2ConnectURLs := []string{"127.0.0.1:8888"}
|
||||
sendRouteINFO(route2Send, route2Expect, rc2ConnectURLs)
|
||||
|
||||
// This is the combined client connect URLs array
|
||||
totalConnectURLs := expectedURLs
|
||||
totalConnectURLs = append(totalConnectURLs, rc2ConnectURLs...)
|
||||
|
||||
// Expect nothing for old clients
|
||||
expectNothing(t, oldClient)
|
||||
|
||||
// Expect new client to receive an INFO (unless disabled)
|
||||
checkINFOReceived(newClient, newClientExpect, totalConnectURLs)
|
||||
|
||||
// Make first route disconnect
|
||||
rc.Close()
|
||||
|
||||
// Expect nothing for old clients
|
||||
expectNothing(t, oldClient)
|
||||
|
||||
// Expect new client to receive an INFO (unless disabled)
|
||||
// The content should be the server client is connected to and the last route
|
||||
checkINFOReceived(newClient, newClientExpect, []string{"127.0.0.1:4242", "127.0.0.1:8888"})
|
||||
}
|
||||
|
||||
opts := LoadConfig("./configs/cluster.conf")
|
||||
// For this test, be explicit about listen spec.
|
||||
opts.Host = "127.0.0.1"
|
||||
opts.Port = 4242
|
||||
for i := 0; i < 2; i++ {
|
||||
if i == 1 {
|
||||
opts.Cluster.NoAdvertise = true
|
||||
|
||||
Reference in New Issue
Block a user