Merge pull request #634 from nats-io/fix_connect_urls

Additional fix to #631
This commit is contained in:
Ivan Kozlovic
2018-03-06 11:32:51 -07:00
committed by GitHub
4 changed files with 194 additions and 126 deletions

View File

@@ -55,7 +55,6 @@ type clientFlag byte
const (
connectReceived clientFlag = 1 << iota // The CONNECT proto has been received
firstPongSent // The first PONG has been sent
infoUpdated // The server's Info object has changed before first PONG was sent
handshakeComplete // For TLS clients, indicate that the handshake is complete
)
@@ -80,11 +79,6 @@ func (cf *clientFlag) setIfNotSet(c clientFlag) bool {
return false
}
// clear unset the flag (would be equivalent to set the boolean to false)
func (cf *clientFlag) clear(c clientFlag) {
*cf &= ^c
}
type client struct {
// Here first because of use of atomics, and memory alignment.
stats
@@ -579,37 +573,50 @@ func (c *client) processPing() {
return
}
c.traceOutOp("PONG", nil)
err := c.sendProto([]byte("PONG\r\n"), true)
if err != nil {
if err := c.sendProto([]byte("PONG\r\n"), true); err != nil {
c.clearConnection()
c.Debugf("Error on Flush, error %s", err.Error())
c.mu.Unlock()
return
}
srv := c.srv
sendUpdateINFO := false
// Check if this is the first PONG, if so...
if c.flags.setIfNotSet(firstPongSent) {
// Check if server should send an async INFO protocol to the client
if c.opts.Protocol >= ClientProtoInfo &&
srv != nil && c.flags.isSet(infoUpdated) {
sendUpdateINFO = true
}
// We can now clear the flag
c.flags.clear(infoUpdated)
// The CONNECT should have been received, but make sure it
// is so before proceeding
if !c.flags.isSet(connectReceived) {
c.mu.Unlock()
return
}
// If we are here, the CONNECT has been received so we know
// if this client supports async INFO or not.
var (
checkClusterChange bool
srv = c.srv
)
// For older clients, just flip the firstPongSent flag if not already
// set and we are done.
if c.opts.Protocol < ClientProtoInfo || srv == nil {
c.flags.setIfNotSet(firstPongSent)
} else {
// This is a client that supports async INFO protocols.
// If this is the first PING (so firstPongSent is not set yet),
// we will need to check if there was a change in cluster topology.
checkClusterChange = !c.flags.isSet(firstPongSent)
}
c.mu.Unlock()
// Some clients send an initial PING as part of the synchronous connect process.
// They can't be receiving anything until the first PONG is received.
// So we delay the possible updated INFO after this point.
if sendUpdateINFO {
if checkClusterChange {
srv.mu.Lock()
// Use the cached protocol
proto := srv.infoJSON
srv.mu.Unlock()
c.mu.Lock()
c.sendInfo(proto)
// Now that we are under both locks, we can flip the flag.
// This prevents sendAsyncInfoToClients() and and code here
// to send a double INFO protocol.
c.flags.set(firstPongSent)
// If there was a cluster update since this client was created,
// send an updated INFO protocol now.
if srv.lastCURLsUpdate >= c.start.UnixNano() {
c.sendInfo(srv.infoJSON)
}
c.mu.Unlock()
srv.mu.Unlock()
}
}
@@ -1344,8 +1351,7 @@ func (c *client) closeConnection() {
// Unless disabled, possibly update the server's INFO protcol
// and send to clients that know how to handle async INFOs.
if !srv.getOpts().Cluster.NoAdvertise {
srv.removeClientConnectURLs(connectURLs)
srv.sendAsyncInfoToClients()
srv.removeClientConnectURLsAndSendINFOToClients(connectURLs)
}
}

View File

@@ -168,8 +168,7 @@ func (c *client) processRouteInfo(info *Info) {
// Unless disabled, possibly update the server's INFO protocol
// and send to clients that know how to handle async INFOs.
if !s.getOpts().Cluster.NoAdvertise {
s.addClientConnectURLs(info.ClientConnectURLs)
s.sendAsyncInfoToClients()
s.addClientConnectURLsAndSendINFOToClients(info.ClientConnectURLs)
}
} else {
c.Debugf("Detected duplicate remote route %q", info.ID)
@@ -179,46 +178,24 @@ func (c *client) processRouteInfo(info *Info) {
// sendAsyncInfoToClients sends an INFO protocol to all
// connected clients that accept async INFO updates.
// The server lock is held on entry.
func (s *Server) sendAsyncInfoToClients() {
s.mu.Lock()
// If there are no clients supporting async INFO protocols, we are done.
// Also don't send if we are shutting down...
if s.cproto == 0 || s.shutdown {
s.mu.Unlock()
return
}
// Capture under lock
proto := s.infoJSON
// Make a copy of ALL clients so we can release server lock while
// sending the protocol to clients. We could check the conditions
// (proto support, first PONG sent) here and so have potentially
// a limited number of clients, but that would mean grabbing the
// client's lock here, which we don't want since we would still
// need it in the second loop.
clients := make([]*client, 0, len(s.clients))
for _, c := range s.clients {
clients = append(clients, c)
}
s.mu.Unlock()
for _, c := range clients {
c.mu.Lock()
// If server did not yet receive the CONNECT protocol, check later
// when sending the first PONG.
if !c.flags.isSet(connectReceived) {
c.flags.set(infoUpdated)
} else if c.opts.Protocol >= ClientProtoInfo {
// Send only if first PONG was sent
if c.flags.isSet(firstPongSent) {
// sendInfo takes care of checking if the connection is still
// valid or not, so don't duplicate tests here.
c.sendInfo(proto)
} else {
// Otherwise, notify that INFO has changed and check later.
c.flags.set(infoUpdated)
}
// Here, we are going to send only to the clients that are fully
// registered (server has received CONNECT and first PING). For
// clients that are not at this stage, this will happen in the
// processing of the first PING (see client.processPing)
if c.opts.Protocol >= ClientProtoInfo && c.flags.isSet(firstPongSent) {
// sendInfo takes care of checking if the connection is still
// valid or not, so don't duplicate tests here.
c.sendInfo(s.infoJSON)
}
c.mu.Unlock()
}

View File

@@ -729,57 +729,132 @@ func TestRoutesToEachOther(t *testing.T) {
}
}
func TestConnectULRsWithRoutesToEachOther(t *testing.T) {
optsA := DefaultOptions()
optsA.Host = "127.0.0.1"
optsA.Cluster.Port = 7246
optsA.Routes = RoutesFromStr("nats://127.0.0.1:7247")
optsB := DefaultOptions()
optsB.Host = "127.0.0.1"
optsB.Cluster.Port = 7247
optsB.Routes = RoutesFromStr("nats://127.0.0.1:7246")
// Start servers with go routines to increase change of
// each server connecting to each other at the same time.
srvA := New(optsA)
defer srvA.Shutdown()
srvB := New(optsB)
defer srvB.Shutdown()
go srvA.Start()
go srvB.Start()
// Wait for cluster to be formed
checkClusterFormed(t, srvA, srvB)
// Connect to serverB
url := fmt.Sprintf("nats://%s", srvB.Addr().String())
nc, err := nats.Connect(url)
if err != nil {
t.Fatalf("Error on connect: %v", err)
}
defer nc.Close()
ds := nc.Servers()
if len(ds) != 2 {
t.Fatalf("Expected 2 servers, got %v", ds)
}
// Shutdown server A and make sure that we are notfied
// that server A is no longer running.
srvA.Shutdown()
timeout := time.Now().Add(5 * time.Second)
ok := false
for time.Now().Before(timeout) {
ds = nc.Servers()
if len(ds) == 1 {
ok = true
break
}
time.Sleep(50 * time.Millisecond)
}
if !ok {
t.Fatalf("List of servers should be only 1, got %v", ds)
func wait(ch chan bool) error {
select {
case <-ch:
return nil
case <-time.After(5 * time.Second):
}
return fmt.Errorf("timeout")
}
func TestServerPoolUpdatedWhenRouteGoesAway(t *testing.T) {
s1Opts := DefaultOptions()
s1Opts.Host = "127.0.0.1"
s1Opts.Port = 4222
s1Opts.Cluster.Host = "127.0.0.1"
s1Opts.Cluster.Port = 6222
s1Opts.Routes = RoutesFromStr("nats://127.0.0.1:6223,nats://127.0.0.1:6224")
s1 := RunServer(s1Opts)
defer s1.Shutdown()
s1Url := "nats://127.0.0.1:4222"
s2Url := "nats://127.0.0.1:4223"
s3Url := "nats://127.0.0.1:4224"
ch := make(chan bool, 1)
chch := make(chan bool, 1)
connHandler := func(_ *nats.Conn) {
chch <- true
}
nc, err := nats.Connect(s1Url,
nats.ReconnectHandler(connHandler),
nats.DiscoveredServersHandler(func(_ *nats.Conn) {
ch <- true
}))
if err != nil {
t.Fatalf("Error on connect")
}
s2Opts := DefaultOptions()
s2Opts.Host = "127.0.0.1"
s2Opts.Port = s1Opts.Port + 1
s2Opts.Cluster.Host = "127.0.0.1"
s2Opts.Cluster.Port = 6223
s2Opts.Routes = RoutesFromStr("nats://127.0.0.1:6222,nats://127.0.0.1:6224")
s2 := RunServer(s2Opts)
defer s2.Shutdown()
// Wait to be notified
if err := wait(ch); err != nil {
t.Fatal("New server callback was not invoked")
}
checkPool := func(expected []string) {
// Don't use discovered here, but Servers to have the full list.
// Also, there may be cases where the mesh is not formed yet,
// so try again on failure.
var (
ds []string
timeout = time.Now().Add(5 * time.Second)
)
for time.Now().Before(timeout) {
ds = nc.Servers()
if len(ds) == len(expected) {
m := make(map[string]struct{}, len(ds))
for _, url := range ds {
m[url] = struct{}{}
}
ok := true
for _, url := range expected {
if _, present := m[url]; !present {
ok = false
break
}
}
if ok {
return
}
}
time.Sleep(50 * time.Millisecond)
}
stackFatalf(t, "Expected %v, got %v", expected, ds)
}
// Verify that we now know about s2
checkPool([]string{s1Url, s2Url})
s3Opts := DefaultOptions()
s3Opts.Host = "127.0.0.1"
s3Opts.Port = s2Opts.Port + 1
s3Opts.Cluster.Host = "127.0.0.1"
s3Opts.Cluster.Port = 6224
s3Opts.Routes = RoutesFromStr("nats://127.0.0.1:6222,nats://127.0.0.1:6223")
s3 := RunServer(s3Opts)
defer s3.Shutdown()
// Wait to be notified
if err := wait(ch); err != nil {
t.Fatal("New server callback was not invoked")
}
// Verify that we now know about s3
checkPool([]string{s1Url, s2Url, s3Url})
// Stop s1. Since this was passed to the Connect() call, this one should
// still be present.
s1.Shutdown()
// Wait for reconnect
if err := wait(chch); err != nil {
t.Fatal("Reconnect handler not invoked")
}
checkPool([]string{s1Url, s2Url, s3Url})
// Check the server we reconnected to.
reConnectedTo := nc.ConnectedUrl()
expected := []string{s1Url}
if reConnectedTo == s2Url {
s2.Shutdown()
expected = append(expected, s3Url)
} else if reConnectedTo == s3Url {
s3.Shutdown()
expected = append(expected, s2Url)
} else {
t.Fatalf("Unexpected server client has reconnected to: %v", reConnectedTo)
}
// Wait for reconnect
if err := wait(chch); err != nil {
t.Fatal("Reconnect handler not invoked")
}
// The implicit server that we just shutdown should have been removed from the pool
checkPool(expected)
nc.Close()
}

View File

@@ -87,6 +87,7 @@ type Server struct {
debug int32
}
clientConnectURLs []string
lastCURLsUpdate int64
// These store the real client/cluster listen ports. They are
// required during config reload to reset the Options (after
@@ -814,24 +815,29 @@ func (s *Server) createClient(conn net.Conn) *client {
return c
}
// addClientConnectURLs adds the given array of urls to the server's
// INFO.ClientConnectURLs array. The server INFO JSON is regenerated.
// Adds the given array of urls to the server's INFO.ClientConnectURLs
// array. The server INFO JSON is regenerated.
// Note that a check is made to ensure that given URLs are not
// already present. So the INFO JSON is regenerated only if new ULRs
// were added.
func (s *Server) addClientConnectURLs(urls []string) {
s.updateServerINFO(urls, true)
// If there was a change, an INFO protocol is sent to registered clients
// that support async INFO protocols.
func (s *Server) addClientConnectURLsAndSendINFOToClients(urls []string) {
s.updateServerINFOAndSendINFOToClients(urls, true)
}
// removeClientConnectURLs removes the given array of urls from the server's
// INFO.ClientConnectURLs array. The server INFO JSON is regenerated.
func (s *Server) removeClientConnectURLs(urls []string) {
s.updateServerINFO(urls, false)
// Removes the given array of urls from the server's INFO.ClientConnectURLs
// array. The server INFO JSON is regenerated if needed.
// If there was a change, an INFO protocol is sent to registered clients
// that support async INFO protocols.
func (s *Server) removeClientConnectURLsAndSendINFOToClients(urls []string) {
s.updateServerINFOAndSendINFOToClients(urls, false)
}
// updateServerINFO updates the server's Info object with the given
// array of URLs and re-generate the infoJSON byte array.
func (s *Server) updateServerINFO(urls []string, add bool) {
// Updates the server's Info object with the given array of URLs and re-generate
// the infoJSON byte array, then send an (async) INFO protocol to clients that
// support it.
func (s *Server) updateServerINFOAndSendINFOToClients(urls []string, add bool) {
s.mu.Lock()
defer s.mu.Unlock()
@@ -857,6 +863,10 @@ func (s *Server) updateServerINFO(urls []string, add bool) {
s.info.ClientConnectURLs = append(s.info.ClientConnectURLs, url)
}
s.generateServerInfoJSON()
// Update the time of this update
s.lastCURLsUpdate = time.Now().UnixNano()
// Send to all registered clients that support async INFO protocols.
s.sendAsyncInfoToClients()
}
}