Merge pull request #314 from nats-io/server_send_async_info

[ADDED] Server sends INFO with cluster URLs to clients with support
This commit is contained in:
Derek Collison
2016-08-01 16:53:50 -04:00
committed by GitHub
9 changed files with 552 additions and 43 deletions

View File

@@ -1,6 +1,7 @@
# General
- [ ] Auth for queue groups?
- [ ] Blacklist or ERR escalation to close connection for auth/permissions
- [ ] Protocol updates, MAP, MPUB, etc
- [ ] Multiple listen endpoints

View File

@@ -13,6 +13,23 @@ import (
"time"
)
// Type of client connection.
const (
// CLIENT is an end user.
CLIENT = iota
// ROUTER is another router in the cluster.
ROUTER
)
const (
// Original Client protocol from 2009.
// http://nats.io/documentation/internals/nats-protocol/
ClientProtoZero = iota
// This signals a client can receive more then the original INFO block.
// This can be used to update clients on other cluster members, etc.
ClientProtoInfo
)
func init() {
rand.Seed(time.Now().UnixNano())
}
@@ -30,14 +47,42 @@ const (
maxBufSize = 65536
)
// Type of client
// Represent client booleans with a bitmask
type clientFlag byte
// Some client state represented as flags
const (
// CLIENT is an end user.
CLIENT = iota
// ROUTER is another router in the cluster.
ROUTER
connectReceived clientFlag = 1 << iota // The CONNECT proto has been received
firstPongSent // The first PONG has been sent
infoUpdated // The server's Info object has changed before first PONG was sent
)
// set the flag (would be equivalent to set the boolean to true)
func (cf *clientFlag) set(c clientFlag) {
*cf |= c
}
// isSet returns true if the flag is set, false otherwise
func (cf clientFlag) isSet(c clientFlag) bool {
return cf&c != 0
}
// setIfNotSet will set the flag `c` only if that flag was not already
// set and return true to indicate that the flag has been set. Returns
// false otherwise.
func (cf *clientFlag) setIfNotSet(c clientFlag) bool {
if *cf&c == 0 {
*cf |= c
return true
}
return false
}
// clear unset the flag (would be equivalent to set the boolean to false)
func (cf *clientFlag) clear(c clientFlag) {
*cf &= ^c
}
type client struct {
// Here first because of use of atomics, and memory alignment.
stats
@@ -67,6 +112,8 @@ type client struct {
route *route
debug bool
trace bool
flags clientFlag // Compact booleans into a single field. Size will be increased when needed.
}
type permissions struct {
@@ -118,6 +165,7 @@ type clientOpts struct {
Name string `json:"name"`
Lang string `json:"lang"`
Version string `json:"version"`
Protocol int `json:"protocol"`
}
var defaultOpts = clientOpts{Verbose: true, Pedantic: true}
@@ -371,13 +419,34 @@ func (c *client) processConnect(arg []byte) error {
typ := c.typ
r := c.route
srv := c.srv
c.mu.Unlock()
// Moved unmarshalling of clients' Options under the lock.
// The client has already been added to the server map, so it is possible
// that other routines lookup the client, and access its options under
// the client's lock, so unmarshalling the options outside of the lock
// would cause data RACEs.
if err := json.Unmarshal(arg, &c.opts); err != nil {
c.mu.Unlock()
return err
}
// Indicate that the CONNECT protocol has been received, and that the
// server now knows which protocol this client supports.
c.flags.set(connectReceived)
// Capture these under lock
proto := c.opts.Protocol
verbose := c.opts.Verbose
c.mu.Unlock()
if srv != nil {
// As soon as c.opts is unmarshalled and if the proto is at
// least ClientProtoInfo, we need to increment the following counter.
// This is decremented when client is removed from the server's
// clients map.
if proto >= ClientProtoInfo {
srv.mu.Lock()
srv.cproto++
srv.mu.Unlock()
}
// Check for Auth
if ok := srv.checkAuth(c); !ok {
c.authViolation()
@@ -385,6 +454,11 @@ func (c *client) processConnect(arg []byte) error {
}
}
// Check client protocol request if it exists.
if typ == CLIENT && (proto < ClientProtoZero || proto > ClientProtoInfo) {
return ErrBadClientProtocol
}
// Grab connection name of remote route.
if typ == ROUTER && r != nil {
c.mu.Lock()
@@ -392,7 +466,7 @@ func (c *client) processConnect(arg []byte) error {
c.mu.Unlock()
}
if c.opts.Verbose {
if verbose {
c.sendOK()
}
return nil
@@ -449,12 +523,15 @@ func (c *client) sendInfo(info []byte) {
func (c *client) sendErr(err string) {
c.mu.Lock()
c.traceOutOp("-ERR", []byte(err))
c.sendProto([]byte(fmt.Sprintf("-ERR '%s'\r\n", err)), true)
c.mu.Unlock()
}
func (c *client) sendOK() {
c.mu.Lock()
c.traceOutOp("OK", nil)
// Can not autoflush this one, needs to be async.
c.sendProto([]byte("+OK\r\n"), false)
c.pcd[c] = needFlush
c.mu.Unlock()
@@ -473,7 +550,33 @@ func (c *client) processPing() {
c.clearConnection()
c.Debugf("Error on Flush, error %s", err.Error())
}
srv := c.srv
sendUpdateINFO := false
// Check if this is the first PONG, if so...
if c.flags.setIfNotSet(firstPongSent) {
// Check if server should send an async INFO protocol to the client
if c.opts.Protocol >= ClientProtoInfo &&
srv != nil && c.flags.isSet(infoUpdated) {
sendUpdateINFO = true
}
// We can now clear the flag
c.flags.clear(infoUpdated)
}
c.mu.Unlock()
// Some clients send an initial PING as part of the synchronous connect process.
// They can't be receiving anything until the first PONG is received.
// So we delay the possible updated INFO after this point.
if sendUpdateINFO {
srv.mu.Lock()
// Use the cached protocol
proto := srv.infoJSON
srv.mu.Unlock()
c.mu.Lock()
c.sendInfo(proto)
c.mu.Unlock()
}
}
func (c *client) processPong() {

View File

@@ -16,6 +16,7 @@ import (
"time"
"crypto/tls"
"github.com/nats-io/nats"
)
@@ -167,6 +168,49 @@ func TestClientConnect(t *testing.T) {
}
}
func TestClientConnectProto(t *testing.T) {
_, c, _ := setupClient()
// Basic Connect setting flags, proto should be zero (original proto)
connectOp := []byte("CONNECT {\"verbose\":true,\"pedantic\":true,\"ssl_required\":false}\r\n")
err := c.parse(connectOp)
if err != nil {
t.Fatalf("Received error: %v\n", err)
}
if c.state != OP_START {
t.Fatalf("Expected state of OP_START vs %d\n", c.state)
}
if !reflect.DeepEqual(c.opts, clientOpts{Verbose: true, Pedantic: true, Protocol: ClientProtoZero}) {
t.Fatalf("Did not parse connect options correctly: %+v\n", c.opts)
}
// ProtoInfo
connectOp = []byte(fmt.Sprintf("CONNECT {\"verbose\":true,\"pedantic\":true,\"ssl_required\":false,\"protocol\":%d}\r\n", ClientProtoInfo))
err = c.parse(connectOp)
if err != nil {
t.Fatalf("Received error: %v\n", err)
}
if c.state != OP_START {
t.Fatalf("Expected state of OP_START vs %d\n", c.state)
}
if !reflect.DeepEqual(c.opts, clientOpts{Verbose: true, Pedantic: true, Protocol: ClientProtoInfo}) {
t.Fatalf("Did not parse connect options correctly: %+v\n", c.opts)
}
if c.opts.Protocol != ClientProtoInfo {
t.Fatalf("Protocol should have been set to %v, but is set to %v", ClientProtoInfo, c.opts.Protocol)
}
// Illegal Option
connectOp = []byte("CONNECT {\"protocol\":22}\r\n")
err = c.parse(connectOp)
if err == nil {
t.Fatalf("Expected to receive an error\n")
}
if err != ErrBadClientProtocol {
t.Fatalf("Expected err of %q, got %q\n", ErrBadClientProtocol, err)
}
}
func TestClientPing(t *testing.T) {
_, c, cr := setupClient()

View File

@@ -22,4 +22,7 @@ var (
// ErrReservedPublishSubject represents an error condition when sending to a reserved subject, e.g. _SYS.>
ErrReservedPublishSubject = errors.New("Reserved Internal Subject")
// ErrBadClientProtocol signals a client requested an invalud client protocol.
ErrBadClientProtocol = errors.New("Invalid Client Protocol")
)

View File

@@ -153,12 +153,63 @@ func (c *client) processRouteInfo(info *Info) {
// Now let the known servers know about this new route
s.forwardNewRouteInfoToKnownServers(info)
}
// If the server Info did not have these URLs, update and send an INFO
// protocol to all clients that support it.
if s.updateServerINFO(info.ClientConnectURLs) {
s.sendAsyncInfoToClients()
}
} else {
c.Debugf("Detected duplicate remote route %q", info.ID)
c.closeConnection()
}
}
// sendAsyncInfoToClients sends an INFO protocol to all
// connected clients that accept async INFO updates.
func (s *Server) sendAsyncInfoToClients() {
s.mu.Lock()
// If there are no clients supporting async INFO protocols, we are done.
if s.cproto == 0 {
s.mu.Unlock()
return
}
// Capture under lock
proto := s.infoJSON
// Make a copy of ALL clients so we can release server lock while
// sending the protocol to clients. We could check the conditions
// (proto support, first PONG sent) here and so have potentially
// a limited number of clients, but that would mean grabbing the
// client's lock here, which we don't want since we would still
// need it in the second loop.
clients := make([]*client, 0, len(s.clients))
for _, c := range s.clients {
clients = append(clients, c)
}
s.mu.Unlock()
for _, c := range clients {
c.mu.Lock()
// If server did not yet receive the CONNECT protocol, check later
// when sending the first PONG.
if !c.flags.isSet(connectReceived) {
c.flags.set(infoUpdated)
} else if c.opts.Protocol >= ClientProtoInfo {
// Send only if first PONG was sent
if c.flags.isSet(firstPongSent) {
// sendInfo takes care of checking if the connection is still
// valid or not, so don't duplicate tests here.
c.sendInfo(proto)
} else {
// Otherwise, notify that INFO has changed and check later.
c.flags.set(infoUpdated)
}
}
c.mu.Unlock()
}
}
// This will process implicit route information received from another server.
// We will check to see if we have configured or are already connected,
// and if so we will ignore. Otherwise we will attempt to connect.
@@ -579,19 +630,31 @@ func (s *Server) routeAcceptLoop(ch chan struct{}) {
// StartRouting will start the accept loop on the cluster host:port
// and will actively try to connect to listed routes.
func (s *Server) StartRouting() {
func (s *Server) StartRouting(clientListenReady chan struct{}) {
defer s.grWG.Done()
// Wait for the client listen port to be opened, and
// the possible ephemeral port to be selected.
<-clientListenReady
// Get all possible URLs (when server listens to 0.0.0.0).
// This is going to be sent to other Servers, so that they can let their
// clients know about us.
clientConnectURLs := s.getClientConnectURLs()
// Check for TLSConfig
tlsReq := s.opts.ClusterTLSConfig != nil
info := Info{
ID: s.info.ID,
Version: s.info.Version,
Host: s.opts.ClusterHost,
Port: s.opts.ClusterPort,
AuthRequired: false,
TLSRequired: tlsReq,
SSLRequired: tlsReq,
TLSVerify: tlsReq,
MaxPayload: s.info.MaxPayload,
ID: s.info.ID,
Version: s.info.Version,
Host: s.opts.ClusterHost,
Port: s.opts.ClusterPort,
AuthRequired: false,
TLSRequired: tlsReq,
SSLRequired: tlsReq,
TLSVerify: tlsReq,
MaxPayload: s.info.MaxPayload,
ClientConnectURLs: clientConnectURLs,
}
// Check for Auth items
if s.opts.ClusterUsername != "" {

View File

@@ -24,17 +24,21 @@ import (
// Info is the information sent to clients to help them understand information
// about this server.
type Info struct {
ID string `json:"server_id"`
Version string `json:"version"`
GoVersion string `json:"go"`
Host string `json:"host"`
Port int `json:"port"`
AuthRequired bool `json:"auth_required"`
SSLRequired bool `json:"ssl_required"` // DEPRECATED: ssl json used for older clients
TLSRequired bool `json:"tls_required"`
TLSVerify bool `json:"tls_verify"`
MaxPayload int `json:"max_payload"`
IP string `json:"ip,omitempty"`
ID string `json:"server_id"`
Version string `json:"version"`
GoVersion string `json:"go"`
Host string `json:"host"`
Port int `json:"port"`
AuthRequired bool `json:"auth_required"`
SSLRequired bool `json:"ssl_required"` // DEPRECATED: ssl json used for older clients
TLSRequired bool `json:"tls_required"`
TLSVerify bool `json:"tls_verify"`
MaxPayload int `json:"max_payload"`
IP string `json:"ip,omitempty"`
ClientConnectURLs []string `json:"connect_urls,omitempty"` // Contains URLs a client can connect to.
// Used internally for quick look-ups.
clientConnectURLs map[string]struct{}
}
// Server is our main struct.
@@ -69,6 +73,7 @@ type Server struct {
grTmpClients map[uint64]*client
grRunning bool
grWG sync.WaitGroup // to wait on various go routines
cproto int64 // number of clients supporting async INFO
}
// Make sure all are 64bits for atomic use
@@ -89,16 +94,17 @@ func New(opts *Options) *Server {
verify := (tlsReq && opts.TLSConfig.ClientAuth == tls.RequireAnyClientCert)
info := Info{
ID: genID(),
Version: VERSION,
GoVersion: runtime.Version(),
Host: opts.Host,
Port: opts.Port,
AuthRequired: false,
TLSRequired: tlsReq,
SSLRequired: tlsReq,
TLSVerify: verify,
MaxPayload: opts.MaxPayload,
ID: genID(),
Version: VERSION,
GoVersion: runtime.Version(),
Host: opts.Host,
Port: opts.Port,
AuthRequired: false,
TLSRequired: tlsReq,
SSLRequired: tlsReq,
TLSVerify: verify,
MaxPayload: opts.MaxPayload,
clientConnectURLs: make(map[string]struct{}),
}
s := &Server{
@@ -236,9 +242,15 @@ func (s *Server) Start() {
s.StartHTTPSMonitoring()
}
// The Routing routine needs to wait for the client listen
// port to be opened and potential ephemeral port selected.
clientListenReady := make(chan struct{})
// Start up routing as well if needed.
if s.opts.ClusterPort != 0 {
s.StartRouting()
s.startGoRoutine(func() {
s.StartRouting(clientListenReady)
})
}
// Pprof http endpoint for the profiler.
@@ -247,7 +259,7 @@ func (s *Server) Start() {
}
// Wait for clients.
s.AcceptLoop()
s.AcceptLoop(clientListenReady)
}
// Shutdown will shutdown the server instance by kicking out the AcceptLoop
@@ -329,7 +341,7 @@ func (s *Server) Shutdown() {
}
// AcceptLoop is exported for easier testing.
func (s *Server) AcceptLoop() {
func (s *Server) AcceptLoop(clr chan struct{}) {
hp := net.JoinHostPort(s.opts.Host, strconv.Itoa(s.opts.Port))
Noticef("Listening for client connections on %s", hp)
l, e := net.Listen("tcp", hp)
@@ -370,6 +382,9 @@ func (s *Server) AcceptLoop() {
}
s.mu.Unlock()
// Let the caller know that we are ready
close(clr)
tmpDelay := ACCEPT_MIN_SLEEP
for s.isRunning() {
@@ -597,6 +612,30 @@ func (s *Server) createClient(conn net.Conn) *client {
return c
}
// updateServerINFO updates the server's Info object with the given
// array of URLs and re-generate the infoJSON byte array, only if the
// given URLs were not already recorded.
// Returns a boolean indicating if server's Info was updated.
func (s *Server) updateServerINFO(urls []string) bool {
s.mu.Lock()
defer s.mu.Unlock()
// Will be set to true if we alter the server's Info object.
wasUpdated := false
for _, url := range urls {
if _, present := s.info.clientConnectURLs[url]; !present {
s.info.clientConnectURLs[url] = struct{}{}
s.info.ClientConnectURLs = append(s.info.ClientConnectURLs, url)
wasUpdated = true
}
}
if wasUpdated {
s.generateServerInfoJSON()
}
return wasUpdated
}
// Handle closing down a connection when the handshake has timedout.
func tlsTimeout(c *client, conn *tls.Conn) {
c.mu.Lock()
@@ -700,12 +739,19 @@ func (s *Server) removeClient(c *client) {
if r != nil {
rID = r.remoteID
}
updateProtoInfoCount := false
if typ == CLIENT && c.opts.Protocol >= ClientProtoInfo {
updateProtoInfoCount = true
}
c.mu.Unlock()
s.mu.Lock()
switch typ {
case CLIENT:
delete(s.clients, cid)
if updateProtoInfoCount {
s.cproto--
}
case ROUTER:
delete(s.routes, cid)
if r != nil {
@@ -824,3 +870,45 @@ func (s *Server) startGoRoutine(f func()) {
}
s.grMu.Unlock()
}
// getClientConnectURLs returns suitable URLs for clients to connect to the listen
// port based on the server options' Host and Port. If the Host corresponds to
// "any" interfaces, this call returns the list of resolved IP addresses.
func (s *Server) getClientConnectURLs() []string {
s.mu.Lock()
defer s.mu.Unlock()
sPort := strconv.Itoa(s.opts.Port)
urls := make([]string, 0, 1)
ipAddr, err := net.ResolveIPAddr("ip", s.opts.Host)
// If the host is "any" (0.0.0.0 or ::), get specific IPs from available
// interfaces.
if err == nil && ipAddr.IP.IsUnspecified() {
var ip net.IP
ifaces, _ := net.Interfaces()
for _, i := range ifaces {
addrs, _ := i.Addrs()
for _, addr := range addrs {
switch v := addr.(type) {
case *net.IPNet:
ip = v.IP
case *net.IPAddr:
ip = v.IP
}
// Skip loopback/localhost
if ip.IsLoopback() {
ip = nil
continue
}
urls = append(urls, net.JoinHostPort(ip.String(), sPort))
}
}
}
if err != nil || len(urls) == 0 {
// We are here if s.opts.Host is not "0.0.0.0" nor "::", or if for some
// reason we could not add any URL in the loop above.
urls = append(urls, net.JoinHostPort(s.opts.Host, sPort))
}
return urls
}

View File

@@ -1,4 +1,4 @@
// Copyright 2015 Apcera Inc. All rights reserved.
// Copyright 2015-2016 Apcera Inc. All rights reserved.
package server
@@ -138,3 +138,36 @@ func TestTlsCipher(t *testing.T) {
t.Fatalf("Expected an unknown cipher.")
}
}
func TestGetConnectURLs(t *testing.T) {
opts := DefaultOptions
opts.Host = "0.0.0.0"
opts.Port = 4222
s := New(&opts)
defer s.Shutdown()
urls := s.getClientConnectURLs()
if len(urls) == 0 {
t.Fatal("Expected to get a list of urls, got none")
}
for _, u := range urls {
if strings.HasPrefix(u, opts.Host) {
t.Fatalf("This URL looks wrong: %v", u)
}
}
s.Shutdown()
opts.Host = "localhost"
opts.Port = 4222
s = New(&opts)
defer s.Shutdown()
expectedURL := "localhost:4222"
urls = s.getClientConnectURLs()
if len(urls) == 0 {
t.Fatal("Expected to get a list of urls, got none")
}
if urls[0] != expectedURL {
t.Fatalf("Expected to get %v, got %v", expectedURL, urls[0])
}
}

View File

@@ -14,8 +14,12 @@ import (
"time"
"github.com/nats-io/gnatsd/server"
"reflect"
"strconv"
)
const clientProtoInfo = 1
func shutdownServerAndWait(t *testing.T, s *server.Server) bool {
listenSpec := s.GetListenEndpoint()
routeListenSpec := s.GetRouteListenEndpoint()
@@ -656,3 +660,166 @@ func TestRouteConnectOnShutdownRace(t *testing.T) {
wg.Wait()
}
func TestRouteSendAsyncINFOToClients(t *testing.T) {
s, opts := runRouteServer(t)
defer s.Shutdown()
clientURL := net.JoinHostPort(opts.Host, strconv.Itoa(opts.Port))
oldClient := createClientConn(t, opts.Host, opts.Port)
defer oldClient.Close()
oldClientSend, oldClientExpect := setupConn(t, oldClient)
oldClientSend("PING\r\n")
oldClientExpect(pongRe)
newClient := createClientConn(t, opts.Host, opts.Port)
defer newClient.Close()
newClientSend, newClientExpect := setupConnWithProto(t, newClient, clientProtoInfo)
newClientSend("PING\r\n")
newClientExpect(pongRe)
// Check that even a new client does not receive an async INFO at this point
// since there is no route created yet.
expectNothing(t, newClient)
routeID := "Server-B"
createRoute := func() (net.Conn, sendFun, expectFun) {
rc := createRouteConn(t, opts.ClusterHost, opts.ClusterPort)
routeSend, routeExpect := setupRouteEx(t, rc, opts, routeID)
buf := routeExpect(infoRe)
info := server.Info{}
if err := json.Unmarshal(buf[4:], &info); err != nil {
t.Fatalf("Could not unmarshal route info: %v", err)
}
if len(info.ClientConnectURLs) == 0 {
t.Fatal("Expected a list of URLs, got none")
}
if info.ClientConnectURLs[0] != clientURL {
t.Fatalf("Expected ClientConnectURLs to be %q, got %q", clientURL, info.ClientConnectURLs[0])
}
return rc, routeSend, routeExpect
}
sendRouteINFO := func(routeSend sendFun, routeExpect expectFun, urls []string) {
routeInfo := server.Info{}
routeInfo.ID = routeID
routeInfo.Host = "localhost"
routeInfo.Port = 5222
routeInfo.ClientConnectURLs = urls
b, err := json.Marshal(routeInfo)
if err != nil {
t.Fatalf("Could not marshal test route info: %v", err)
}
infoJSON := fmt.Sprintf("INFO %s\r\n", b)
routeSend(infoJSON)
routeSend("PING\r\n")
routeExpect(pongRe)
}
checkINFOReceived := func(clientExpect expectFun, expectedURLs []string) {
buf := clientExpect(infoRe)
info := server.Info{}
if err := json.Unmarshal(buf[4:], &info); err != nil {
t.Fatalf("Could not unmarshal route info: %v", err)
}
if !reflect.DeepEqual(info.ClientConnectURLs, expectedURLs) {
t.Fatalf("Expected ClientConnectURLs to be %v, got %v", expectedURLs, info.ClientConnectURLs)
}
}
// Create a route
rc, routeSend, routeExpect := createRoute()
defer rc.Close()
// Send an INFO with single URL
routeConnectURLs := []string{"localhost:5222"}
sendRouteINFO(routeSend, routeExpect, routeConnectURLs)
// Expect nothing for old clients
expectNothing(t, oldClient)
// Expect new client to receive an INFO
checkINFOReceived(newClientExpect, routeConnectURLs)
// Disconnect and reconnect the route.
rc.Close()
rc, routeSend, routeExpect = createRoute()
defer rc.Close()
// Resend the same route INFO json, since there is no new URL,
// no client should receive an INFO
sendRouteINFO(routeSend, routeExpect, routeConnectURLs)
// Expect nothing for old clients
expectNothing(t, oldClient)
// Expect nothing for new clients as well (no real update)
expectNothing(t, newClient)
// Now stop the route and restart with an additional URL
rc.Close()
rc, routeSend, routeExpect = createRoute()
defer rc.Close()
// Create a client not sending the CONNECT until after route is added
clientNoConnect := createClientConn(t, opts.Host, opts.Port)
defer clientNoConnect.Close()
// Create a client that does not send the first PING yet
clientNoPing := createClientConn(t, opts.Host, opts.Port)
defer clientNoPing.Close()
clientNoPingSend, clientNoPingExpect := setupConnWithProto(t, clientNoPing, clientProtoInfo)
// The route now has an additional URL
routeConnectURLs = append(routeConnectURLs, "localhost:7777")
// This causes the server to add the route and send INFO to clients
sendRouteINFO(routeSend, routeExpect, routeConnectURLs)
// Expect nothing for old clients
expectNothing(t, oldClient)
// Expect new client to receive an INFO, and verify content as expected.
checkINFOReceived(newClientExpect, routeConnectURLs)
// Expect nothing yet for client that did not send the PING
expectNothing(t, clientNoPing)
// Now send the first PING
clientNoPingSend("PING\r\n")
// Should receive PONG followed by INFO
// Receive PONG only first
pongBuf := make([]byte, len("PONG\r\n"))
clientNoPing.SetReadDeadline(time.Now().Add(2 * time.Second))
n, err := clientNoPing.Read(pongBuf)
clientNoPing.SetReadDeadline(time.Time{})
if n <= 0 && err != nil {
t.Fatalf("Error reading from conn: %v\n", err)
}
if !pongRe.Match(pongBuf) {
t.Fatalf("Response did not match expected: \n\tReceived:'%q'\n\tExpected:'%s'\n", pongBuf, pongRe)
}
checkINFOReceived(clientNoPingExpect, routeConnectURLs)
// Have the client that did not send the connect do it now
clientNoConnectSend, clientNoConnectExpect := setupConnWithProto(t, clientNoConnect, clientProtoInfo)
// Send the PING
clientNoConnectSend("PING\r\n")
// Should receive PONG followed by INFO
// Receive PONG only first
clientNoConnect.SetReadDeadline(time.Now().Add(2 * time.Second))
n, err = clientNoConnect.Read(pongBuf)
clientNoConnect.SetReadDeadline(time.Time{})
if n <= 0 && err != nil {
t.Fatalf("Error reading from conn: %v\n", err)
}
if !pongRe.Match(pongBuf) {
t.Fatalf("Response did not match expected: \n\tReceived:'%q'\n\tExpected:'%s'\n", pongBuf, pongRe)
}
checkINFOReceived(clientNoConnectExpect, routeConnectURLs)
}

View File

@@ -244,6 +244,13 @@ func setupConn(t tLogger, c net.Conn) (sendFun, expectFun) {
return sendCommand(t, c), expectCommand(t, c)
}
func setupConnWithProto(t tLogger, c net.Conn, proto int) (sendFun, expectFun) {
checkInfoMsg(t, c)
cs := fmt.Sprintf("CONNECT {\"verbose\":%v,\"pedantic\":%v,\"ssl_required\":%v,\"protocol\":%d}\r\n", false, false, false, proto)
sendProto(t, c, cs)
return sendCommand(t, c), expectCommand(t, c)
}
type sendFun func(string)
type expectFun func(*regexp.Regexp) []byte