Merge pull request #606 from nats-io/always_send_info

[CHANGED] Server notifies clients when server rejoins cluster
This commit is contained in:
Ivan Kozlovic
2017-11-27 13:37:21 -07:00
committed by GitHub
3 changed files with 12 additions and 17 deletions

View File

@@ -142,6 +142,8 @@ func (c *client) processRouteInfo(info *Info) {
c.Debugf("Registering remote route %q", info.ID)
// Send our local subscriptions to this route.
s.sendLocalSubsToRoute(c)
// sendInfo will be false if the route that we just accepted
// is the only route there is.
if sendInfo {
// Need to get the remote IP address.
c.mu.Lock()
@@ -156,9 +158,10 @@ func (c *client) processRouteInfo(info *Info) {
// Now let the known servers know about this new route
s.forwardNewRouteInfoToKnownServers(info)
}
// If the server Info did not have these URLs, update and send an INFO
// protocol to all clients that support it (unless the feature is disabled).
if s.updateServerINFO(info.ClientConnectURLs) {
// Unless disabled, possibly update the server's INFO protcol
// and send to clients that know how to handle async INFOs.
if !s.getOpts().Cluster.NoAdvertise {
s.updateServerINFO(info.ClientConnectURLs)
s.sendAsyncInfoToClients()
}
} else {

View File

@@ -770,18 +770,11 @@ func (s *Server) createClient(conn net.Conn) *client {
// updateServerINFO updates the server's Info object with the given
// array of URLs and re-generate the infoJSON byte array, only if the
// given URLs were not already recorded and if the feature is not
// disabled.
// Returns a boolean indicating if server's Info was updated.
func (s *Server) updateServerINFO(urls []string) bool {
// given URLs were not already recorded.
func (s *Server) updateServerINFO(urls []string) {
s.mu.Lock()
defer s.mu.Unlock()
// Feature disabled, do not update.
if s.getOpts().Cluster.NoAdvertise {
return false
}
// Will be set to true if we alter the server's Info object.
wasUpdated := false
for _, url := range urls {
@@ -795,7 +788,6 @@ func (s *Server) updateServerINFO(urls []string) bool {
if wasUpdated {
s.generateServerInfoJSON()
}
return wasUpdated
}
// Handle closing down a connection when the handshake has timedout.

View File

@@ -717,15 +717,15 @@ func TestRouteSendAsyncINFOToClients(t *testing.T) {
rc, routeSend, routeExpect = createRoute()
defer rc.Close()
// Resend the same route INFO json, since there is no new URL,
// no client should receive an INFO
// Resend the same route INFO json. The server will now send
// the INFO even when there is no change.
sendRouteINFO(routeSend, routeExpect, routeConnectURLs)
// Expect nothing for old clients
expectNothing(t, oldClient)
// Expect nothing for new clients as well (no real update)
expectNothing(t, newClient)
// Expect new client to receive an INFO (unless disabled)
checkINFOReceived(newClient, newClientExpect, routeConnectURLs)
// Now stop the route and restart with an additional URL
rc.Close()