mirror of
https://github.com/gogrlx/nats-server.git
synced 2026-04-15 18:50:41 -07:00
Additional fix to #631
This is the result of flapping tests in go-nats that were caused by a defect (see PR https://github.com/nats-io/go-nats/pull/348). However, during debugging, I realized that there were also things that were not quite right on the server side. This change should make the notification of cluster topology changes to clients more robust.
This commit is contained in:
@@ -55,7 +55,6 @@ type clientFlag byte
|
||||
const (
|
||||
connectReceived clientFlag = 1 << iota // The CONNECT proto has been received
|
||||
firstPongSent // The first PONG has been sent
|
||||
infoUpdated // The server's Info object has changed before first PONG was sent
|
||||
handshakeComplete // For TLS clients, indicate that the handshake is complete
|
||||
)
|
||||
|
||||
@@ -80,10 +79,12 @@ func (cf *clientFlag) setIfNotSet(c clientFlag) bool {
|
||||
return false
|
||||
}
|
||||
|
||||
// Commenting out for now otherwise megacheck complains.
|
||||
// We may need that in the future.
|
||||
// clear unsets the flag (equivalent to setting the boolean to false)
|
||||
func (cf *clientFlag) clear(c clientFlag) {
|
||||
*cf &= ^c
|
||||
}
|
||||
// func (cf *clientFlag) clear(c clientFlag) {
|
||||
// *cf &= ^c
|
||||
// }
|
||||
|
||||
type client struct {
|
||||
// Here first because of use of atomics, and memory alignment.
|
||||
@@ -579,37 +580,50 @@ func (c *client) processPing() {
|
||||
return
|
||||
}
|
||||
c.traceOutOp("PONG", nil)
|
||||
err := c.sendProto([]byte("PONG\r\n"), true)
|
||||
if err != nil {
|
||||
if err := c.sendProto([]byte("PONG\r\n"), true); err != nil {
|
||||
c.clearConnection()
|
||||
c.Debugf("Error on Flush, error %s", err.Error())
|
||||
c.mu.Unlock()
|
||||
return
|
||||
}
|
||||
srv := c.srv
|
||||
sendUpdateINFO := false
|
||||
// Check if this is the first PONG, if so...
|
||||
if c.flags.setIfNotSet(firstPongSent) {
|
||||
// Check if server should send an async INFO protocol to the client
|
||||
if c.opts.Protocol >= ClientProtoInfo &&
|
||||
srv != nil && c.flags.isSet(infoUpdated) {
|
||||
sendUpdateINFO = true
|
||||
}
|
||||
// We can now clear the flag
|
||||
c.flags.clear(infoUpdated)
|
||||
// The CONNECT should have been received, but make sure it
|
||||
// is so before proceeding
|
||||
if !c.flags.isSet(connectReceived) {
|
||||
c.mu.Unlock()
|
||||
return
|
||||
}
|
||||
// If we are here, the CONNECT has been received so we know
|
||||
// if this client supports async INFO or not.
|
||||
var (
|
||||
checkClusterChange bool
|
||||
srv = c.srv
|
||||
)
|
||||
// For older clients, just flip the firstPongSent flag if not already
|
||||
// set and we are done.
|
||||
if c.opts.Protocol < ClientProtoInfo || srv == nil {
|
||||
c.flags.setIfNotSet(firstPongSent)
|
||||
} else {
|
||||
// This is a client that supports async INFO protocols.
|
||||
// If this is the first PING (so firstPongSent is not set yet),
|
||||
// we will need to check if there was a change in cluster topology.
|
||||
checkClusterChange = !c.flags.isSet(firstPongSent)
|
||||
}
|
||||
c.mu.Unlock()
|
||||
|
||||
// Some clients send an initial PING as part of the synchronous connect process.
|
||||
// They can't be receiving anything until the first PONG is received.
|
||||
// So we delay the possible updated INFO after this point.
|
||||
if sendUpdateINFO {
|
||||
if checkClusterChange {
|
||||
srv.mu.Lock()
|
||||
// Use the cached protocol
|
||||
proto := srv.infoJSON
|
||||
srv.mu.Unlock()
|
||||
|
||||
c.mu.Lock()
|
||||
c.sendInfo(proto)
|
||||
// Now that we are under both locks, we can flip the flag.
|
||||
// This prevents sendAsyncInfoToClients() and the code here
|
||||
// to send a double INFO protocol.
|
||||
c.flags.set(firstPongSent)
|
||||
// If there was a cluster update since this client was created,
|
||||
// send an updated INFO protocol now.
|
||||
if srv.lastCURLsUpdate >= c.start.UnixNano() {
|
||||
c.sendInfo(srv.infoJSON)
|
||||
}
|
||||
c.mu.Unlock()
|
||||
srv.mu.Unlock()
|
||||
}
|
||||
}
|
||||
|
||||
@@ -1344,8 +1358,7 @@ func (c *client) closeConnection() {
|
||||
// Unless disabled, possibly update the server's INFO protocol
|
||||
// and send to clients that know how to handle async INFOs.
|
||||
if !srv.getOpts().Cluster.NoAdvertise {
|
||||
srv.removeClientConnectURLs(connectURLs)
|
||||
srv.sendAsyncInfoToClients()
|
||||
srv.removeClientConnectURLsAndSendINFOToClients(connectURLs)
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
@@ -168,8 +168,7 @@ func (c *client) processRouteInfo(info *Info) {
|
||||
// Unless disabled, possibly update the server's INFO protocol
|
||||
// and send to clients that know how to handle async INFOs.
|
||||
if !s.getOpts().Cluster.NoAdvertise {
|
||||
s.addClientConnectURLs(info.ClientConnectURLs)
|
||||
s.sendAsyncInfoToClients()
|
||||
s.addClientConnectURLsAndSendINFOToClients(info.ClientConnectURLs)
|
||||
}
|
||||
} else {
|
||||
c.Debugf("Detected duplicate remote route %q", info.ID)
|
||||
@@ -179,46 +178,24 @@ func (c *client) processRouteInfo(info *Info) {
|
||||
|
||||
// sendAsyncInfoToClients sends an INFO protocol to all
|
||||
// connected clients that accept async INFO updates.
|
||||
// The server lock is held on entry.
|
||||
func (s *Server) sendAsyncInfoToClients() {
|
||||
s.mu.Lock()
|
||||
// If there are no clients supporting async INFO protocols, we are done.
|
||||
// Also don't send if we are shutting down...
|
||||
if s.cproto == 0 || s.shutdown {
|
||||
s.mu.Unlock()
|
||||
return
|
||||
}
|
||||
|
||||
// Capture under lock
|
||||
proto := s.infoJSON
|
||||
|
||||
// Make a copy of ALL clients so we can release server lock while
|
||||
// sending the protocol to clients. We could check the conditions
|
||||
// (proto support, first PONG sent) here and so have potentially
|
||||
// a limited number of clients, but that would mean grabbing the
|
||||
// client's lock here, which we don't want since we would still
|
||||
// need it in the second loop.
|
||||
clients := make([]*client, 0, len(s.clients))
|
||||
for _, c := range s.clients {
|
||||
clients = append(clients, c)
|
||||
}
|
||||
s.mu.Unlock()
|
||||
|
||||
for _, c := range clients {
|
||||
c.mu.Lock()
|
||||
// If server did not yet receive the CONNECT protocol, check later
|
||||
// when sending the first PONG.
|
||||
if !c.flags.isSet(connectReceived) {
|
||||
c.flags.set(infoUpdated)
|
||||
} else if c.opts.Protocol >= ClientProtoInfo {
|
||||
// Send only if first PONG was sent
|
||||
if c.flags.isSet(firstPongSent) {
|
||||
// sendInfo takes care of checking if the connection is still
|
||||
// valid or not, so don't duplicate tests here.
|
||||
c.sendInfo(proto)
|
||||
} else {
|
||||
// Otherwise, notify that INFO has changed and check later.
|
||||
c.flags.set(infoUpdated)
|
||||
}
|
||||
// Here, we are going to send only to the clients that are fully
|
||||
// registered (server has received CONNECT and first PING). For
|
||||
// clients that are not at this stage, this will happen in the
|
||||
// processing of the first PING (see client.processPing)
|
||||
if c.opts.Protocol >= ClientProtoInfo && c.flags.isSet(firstPongSent) {
|
||||
// sendInfo takes care of checking if the connection is still
|
||||
// valid or not, so don't duplicate tests here.
|
||||
c.sendInfo(s.infoJSON)
|
||||
}
|
||||
c.mu.Unlock()
|
||||
}
|
||||
|
||||
@@ -729,57 +729,132 @@ func TestRoutesToEachOther(t *testing.T) {
|
||||
}
|
||||
}
|
||||
|
||||
func TestConnectULRsWithRoutesToEachOther(t *testing.T) {
|
||||
optsA := DefaultOptions()
|
||||
optsA.Host = "127.0.0.1"
|
||||
optsA.Cluster.Port = 7246
|
||||
optsA.Routes = RoutesFromStr("nats://127.0.0.1:7247")
|
||||
|
||||
optsB := DefaultOptions()
|
||||
optsB.Host = "127.0.0.1"
|
||||
optsB.Cluster.Port = 7247
|
||||
optsB.Routes = RoutesFromStr("nats://127.0.0.1:7246")
|
||||
|
||||
// Start servers with go routines to increase the chance of
|
||||
// each server connecting to each other at the same time.
|
||||
srvA := New(optsA)
|
||||
defer srvA.Shutdown()
|
||||
|
||||
srvB := New(optsB)
|
||||
defer srvB.Shutdown()
|
||||
|
||||
go srvA.Start()
|
||||
go srvB.Start()
|
||||
|
||||
// Wait for cluster to be formed
|
||||
checkClusterFormed(t, srvA, srvB)
|
||||
|
||||
// Connect to serverB
|
||||
url := fmt.Sprintf("nats://%s", srvB.Addr().String())
|
||||
nc, err := nats.Connect(url)
|
||||
if err != nil {
|
||||
t.Fatalf("Error on connect: %v", err)
|
||||
}
|
||||
defer nc.Close()
|
||||
ds := nc.Servers()
|
||||
if len(ds) != 2 {
|
||||
t.Fatalf("Expected 2 servers, got %v", ds)
|
||||
}
|
||||
|
||||
// Shutdown server A and make sure that we are notified
|
||||
// that server A is no longer running.
|
||||
srvA.Shutdown()
|
||||
timeout := time.Now().Add(5 * time.Second)
|
||||
ok := false
|
||||
for time.Now().Before(timeout) {
|
||||
ds = nc.Servers()
|
||||
if len(ds) == 1 {
|
||||
ok = true
|
||||
break
|
||||
}
|
||||
time.Sleep(50 * time.Millisecond)
|
||||
}
|
||||
if !ok {
|
||||
t.Fatalf("List of servers should be only 1, got %v", ds)
|
||||
func wait(ch chan bool) error {
|
||||
select {
|
||||
case <-ch:
|
||||
return nil
|
||||
case <-time.After(5 * time.Second):
|
||||
}
|
||||
return fmt.Errorf("timeout")
|
||||
}
|
||||
|
||||
func TestServerPoolUpdatedWhenRouteGoesAway(t *testing.T) {
|
||||
s1Opts := DefaultOptions()
|
||||
s1Opts.Host = "127.0.0.1"
|
||||
s1Opts.Port = 4222
|
||||
s1Opts.Cluster.Host = "127.0.0.1"
|
||||
s1Opts.Cluster.Port = 6222
|
||||
s1Opts.Routes = RoutesFromStr("nats://127.0.0.1:6223,nats://127.0.0.1:6224")
|
||||
s1 := RunServer(s1Opts)
|
||||
defer s1.Shutdown()
|
||||
|
||||
s1Url := "nats://127.0.0.1:4222"
|
||||
s2Url := "nats://127.0.0.1:4223"
|
||||
s3Url := "nats://127.0.0.1:4224"
|
||||
|
||||
ch := make(chan bool, 1)
|
||||
chch := make(chan bool, 1)
|
||||
connHandler := func(_ *nats.Conn) {
|
||||
chch <- true
|
||||
}
|
||||
nc, err := nats.Connect(s1Url,
|
||||
nats.ReconnectHandler(connHandler),
|
||||
nats.DiscoveredServersHandler(func(_ *nats.Conn) {
|
||||
ch <- true
|
||||
}))
|
||||
if err != nil {
|
||||
t.Fatalf("Error on connect")
|
||||
}
|
||||
|
||||
s2Opts := DefaultOptions()
|
||||
s2Opts.Host = "127.0.0.1"
|
||||
s2Opts.Port = s1Opts.Port + 1
|
||||
s2Opts.Cluster.Host = "127.0.0.1"
|
||||
s2Opts.Cluster.Port = 6223
|
||||
s2Opts.Routes = RoutesFromStr("nats://127.0.0.1:6222,nats://127.0.0.1:6224")
|
||||
s2 := RunServer(s2Opts)
|
||||
defer s2.Shutdown()
|
||||
|
||||
// Wait to be notified
|
||||
if err := wait(ch); err != nil {
|
||||
t.Fatal("New server callback was not invoked")
|
||||
}
|
||||
|
||||
checkPool := func(expected []string) {
|
||||
// Don't use discovered here, but Servers to have the full list.
|
||||
// Also, there may be cases where the mesh is not formed yet,
|
||||
// so try again on failure.
|
||||
var (
|
||||
ds []string
|
||||
timeout = time.Now().Add(5 * time.Second)
|
||||
)
|
||||
for time.Now().Before(timeout) {
|
||||
ds = nc.Servers()
|
||||
if len(ds) == len(expected) {
|
||||
m := make(map[string]struct{}, len(ds))
|
||||
for _, url := range ds {
|
||||
m[url] = struct{}{}
|
||||
}
|
||||
ok := true
|
||||
for _, url := range expected {
|
||||
if _, present := m[url]; !present {
|
||||
ok = false
|
||||
break
|
||||
}
|
||||
}
|
||||
if ok {
|
||||
return
|
||||
}
|
||||
}
|
||||
time.Sleep(50 * time.Millisecond)
|
||||
}
|
||||
stackFatalf(t, "Expected %v, got %v", expected, ds)
|
||||
}
|
||||
// Verify that we now know about s2
|
||||
checkPool([]string{s1Url, s2Url})
|
||||
|
||||
s3Opts := DefaultOptions()
|
||||
s3Opts.Host = "127.0.0.1"
|
||||
s3Opts.Port = s2Opts.Port + 1
|
||||
s3Opts.Cluster.Host = "127.0.0.1"
|
||||
s3Opts.Cluster.Port = 6224
|
||||
s3Opts.Routes = RoutesFromStr("nats://127.0.0.1:6222,nats://127.0.0.1:6223")
|
||||
s3 := RunServer(s3Opts)
|
||||
defer s3.Shutdown()
|
||||
|
||||
// Wait to be notified
|
||||
if err := wait(ch); err != nil {
|
||||
t.Fatal("New server callback was not invoked")
|
||||
}
|
||||
// Verify that we now know about s3
|
||||
checkPool([]string{s1Url, s2Url, s3Url})
|
||||
|
||||
// Stop s1. Since this was passed to the Connect() call, this one should
|
||||
// still be present.
|
||||
s1.Shutdown()
|
||||
// Wait for reconnect
|
||||
if err := wait(chch); err != nil {
|
||||
t.Fatal("Reconnect handler not invoked")
|
||||
}
|
||||
checkPool([]string{s1Url, s2Url, s3Url})
|
||||
|
||||
// Check the server we reconnected to.
|
||||
reConnectedTo := nc.ConnectedUrl()
|
||||
expected := []string{s1Url}
|
||||
if reConnectedTo == s2Url {
|
||||
s2.Shutdown()
|
||||
expected = append(expected, s3Url)
|
||||
} else if reConnectedTo == s3Url {
|
||||
s3.Shutdown()
|
||||
expected = append(expected, s2Url)
|
||||
} else {
|
||||
t.Fatalf("Unexpected server client has reconnected to: %v", reConnectedTo)
|
||||
}
|
||||
// Wait for reconnect
|
||||
if err := wait(chch); err != nil {
|
||||
t.Fatal("Reconnect handler not invoked")
|
||||
}
|
||||
// The implicit server that we just shutdown should have been removed from the pool
|
||||
checkPool(expected)
|
||||
nc.Close()
|
||||
}
|
||||
|
||||
@@ -87,6 +87,7 @@ type Server struct {
|
||||
debug int32
|
||||
}
|
||||
clientConnectURLs []string
|
||||
lastCURLsUpdate int64
|
||||
|
||||
// These store the real client/cluster listen ports. They are
|
||||
// required during config reload to reset the Options (after
|
||||
@@ -814,24 +815,29 @@ func (s *Server) createClient(conn net.Conn) *client {
|
||||
return c
|
||||
}
|
||||
|
||||
// addClientConnectURLs adds the given array of urls to the server's
|
||||
// INFO.ClientConnectURLs array. The server INFO JSON is regenerated.
|
||||
// Adds the given array of urls to the server's INFO.ClientConnectURLs
|
||||
// array. The server INFO JSON is regenerated.
|
||||
// Note that a check is made to ensure that given URLs are not
|
||||
// already present. So the INFO JSON is regenerated only if new URLs
|
||||
// were added.
|
||||
func (s *Server) addClientConnectURLs(urls []string) {
|
||||
s.updateServerINFO(urls, true)
|
||||
// If there was a change, an INFO protocol is sent to registered clients
|
||||
// that support async INFO protocols.
|
||||
func (s *Server) addClientConnectURLsAndSendINFOToClients(urls []string) {
|
||||
s.updateServerINFOAndSendINFOToClients(urls, true)
|
||||
}
|
||||
|
||||
// removeClientConnectURLs removes the given array of urls from the server's
|
||||
// INFO.ClientConnectURLs array. The server INFO JSON is regenerated.
|
||||
func (s *Server) removeClientConnectURLs(urls []string) {
|
||||
s.updateServerINFO(urls, false)
|
||||
// Removes the given array of urls from the server's INFO.ClientConnectURLs
|
||||
// array. The server INFO JSON is regenerated if needed.
|
||||
// If there was a change, an INFO protocol is sent to registered clients
|
||||
// that support async INFO protocols.
|
||||
func (s *Server) removeClientConnectURLsAndSendINFOToClients(urls []string) {
|
||||
s.updateServerINFOAndSendINFOToClients(urls, false)
|
||||
}
|
||||
|
||||
// updateServerINFO updates the server's Info object with the given
|
||||
// array of URLs and re-generate the infoJSON byte array.
|
||||
func (s *Server) updateServerINFO(urls []string, add bool) {
|
||||
// Updates the server's Info object with the given array of URLs and re-generates
|
||||
// the infoJSON byte array, then sends an (async) INFO protocol to clients that
|
||||
// support it.
|
||||
func (s *Server) updateServerINFOAndSendINFOToClients(urls []string, add bool) {
|
||||
s.mu.Lock()
|
||||
defer s.mu.Unlock()
|
||||
|
||||
@@ -857,6 +863,10 @@ func (s *Server) updateServerINFO(urls []string, add bool) {
|
||||
s.info.ClientConnectURLs = append(s.info.ClientConnectURLs, url)
|
||||
}
|
||||
s.generateServerInfoJSON()
|
||||
// Update the time of this update
|
||||
s.lastCURLsUpdate = time.Now().UnixNano()
|
||||
// Send to all registered clients that support async INFO protocols.
|
||||
s.sendAsyncInfoToClients()
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
Reference in New Issue
Block a user