mirror of
https://github.com/gogrlx/nats-server.git
synced 2026-04-02 03:38:42 -07:00
[CHANGED] Server notifies clients when server rejoins cluster
When the option Cluster.NoAdvertise is false, a server will send an INFO protocol message to its client when a server has joined the cluster. Previously, the protocol would be sent only if the joining server's "client URLs" (the addresses where clients connect to) were new. It will now be sent regardless if the server joins (for the first time) or rejoins the cluster. Clients are still by default invoking the DiscoveredServersCB callback only if they themselves detect that new URLs were added. A separate PR may be filled to client libraries repo to be able to invoke the callback anytime an async INFO protocol is received. Based on @madgrenadier PR #597.
This commit is contained in:
@@ -142,6 +142,8 @@ func (c *client) processRouteInfo(info *Info) {
|
||||
c.Debugf("Registering remote route %q", info.ID)
|
||||
// Send our local subscriptions to this route.
|
||||
s.sendLocalSubsToRoute(c)
|
||||
// sendInfo will be false if the route that we just accepted
|
||||
// is the only route there is.
|
||||
if sendInfo {
|
||||
// Need to get the remote IP address.
|
||||
c.mu.Lock()
|
||||
@@ -156,9 +158,10 @@ func (c *client) processRouteInfo(info *Info) {
|
||||
// Now let the known servers know about this new route
|
||||
s.forwardNewRouteInfoToKnownServers(info)
|
||||
}
|
||||
// If the server Info did not have these URLs, update and send an INFO
|
||||
// protocol to all clients that support it (unless the feature is disabled).
|
||||
if s.updateServerINFO(info.ClientConnectURLs) {
|
||||
// Unless disabled, possibly update the server's INFO protcol
|
||||
// and send to clients that know how to handle async INFOs.
|
||||
if !s.getOpts().Cluster.NoAdvertise {
|
||||
s.updateServerINFO(info.ClientConnectURLs)
|
||||
s.sendAsyncInfoToClients()
|
||||
}
|
||||
} else {
|
||||
|
||||
@@ -767,18 +767,11 @@ func (s *Server) createClient(conn net.Conn) *client {
|
||||
|
||||
// updateServerINFO updates the server's Info object with the given
|
||||
// array of URLs and re-generate the infoJSON byte array, only if the
|
||||
// given URLs were not already recorded and if the feature is not
|
||||
// disabled.
|
||||
// Returns a boolean indicating if server's Info was updated.
|
||||
func (s *Server) updateServerINFO(urls []string) bool {
|
||||
// given URLs were not already recorded.
|
||||
func (s *Server) updateServerINFO(urls []string) {
|
||||
s.mu.Lock()
|
||||
defer s.mu.Unlock()
|
||||
|
||||
// Feature disabled, do not update.
|
||||
if s.getOpts().Cluster.NoAdvertise {
|
||||
return false
|
||||
}
|
||||
|
||||
// Will be set to true if we alter the server's Info object.
|
||||
wasUpdated := false
|
||||
for _, url := range urls {
|
||||
@@ -792,7 +785,6 @@ func (s *Server) updateServerINFO(urls []string) bool {
|
||||
if wasUpdated {
|
||||
s.generateServerInfoJSON()
|
||||
}
|
||||
return wasUpdated
|
||||
}
|
||||
|
||||
// Handle closing down a connection when the handshake has timedout.
|
||||
|
||||
@@ -717,15 +717,15 @@ func TestRouteSendAsyncINFOToClients(t *testing.T) {
|
||||
rc, routeSend, routeExpect = createRoute()
|
||||
defer rc.Close()
|
||||
|
||||
// Resend the same route INFO json, since there is no new URL,
|
||||
// no client should receive an INFO
|
||||
// Resend the same route INFO json. The server will now send
|
||||
// the INFO even when there is no change.
|
||||
sendRouteINFO(routeSend, routeExpect, routeConnectURLs)
|
||||
|
||||
// Expect nothing for old clients
|
||||
expectNothing(t, oldClient)
|
||||
|
||||
// Expect nothing for new clients as well (no real update)
|
||||
expectNothing(t, newClient)
|
||||
// Expect new client to receive an INFO (unless disabled)
|
||||
checkINFOReceived(newClient, newClientExpect, routeConnectURLs)
|
||||
|
||||
// Now stop the route and restart with an additional URL
|
||||
rc.Close()
|
||||
|
||||
Reference in New Issue
Block a user