diff --git a/TODO.md b/TODO.md index 5c507fcf..aa6aef46 100644 --- a/TODO.md +++ b/TODO.md @@ -1,6 +1,7 @@ # General +- [ ] Auth for queue groups? - [ ] Blacklist or ERR escalation to close connection for auth/permissions - [ ] Protocol updates, MAP, MPUB, etc - [ ] Multiple listen endpoints diff --git a/server/client.go b/server/client.go index bf5ffdd0..85b4c19f 100644 --- a/server/client.go +++ b/server/client.go @@ -13,6 +13,23 @@ import ( "time" ) +// Type of client connection. +const ( + // CLIENT is an end user. + CLIENT = iota + // ROUTER is another router in the cluster. + ROUTER +) + +const ( + // Original Client protocol from 2009. + // http://nats.io/documentation/internals/nats-protocol/ + ClientProtoZero = iota + // This signals a client can receive more then the original INFO block. + // This can be used to update clients on other cluster members, etc. + ClientProtoInfo +) + func init() { rand.Seed(time.Now().UnixNano()) } @@ -30,14 +47,42 @@ const ( maxBufSize = 65536 ) -// Type of client +// Represent client booleans with a bitmask +type clientFlag byte + +// Some client state represented as flags const ( - // CLIENT is an end user. - CLIENT = iota - // ROUTER is another router in the cluster. - ROUTER + connectReceived clientFlag = 1 << iota // The CONNECT proto has been received + firstPongSent // The first PONG has been sent + infoUpdated // The server's Info object has changed before first PONG was sent ) +// set the flag (would be equivalent to set the boolean to true) +func (cf *clientFlag) set(c clientFlag) { + *cf |= c +} + +// isSet returns true if the flag is set, false otherwise +func (cf clientFlag) isSet(c clientFlag) bool { + return cf&c != 0 +} + +// setIfNotSet will set the flag `c` only if that flag was not already +// set and return true to indicate that the flag has been set. Returns +// false otherwise. +func (cf *clientFlag) setIfNotSet(c clientFlag) bool { + if *cf&c == 0 { + *cf |= c + return true + } + return false +} + +// clear unset the flag (would be equivalent to set the boolean to false) +func (cf *clientFlag) clear(c clientFlag) { + *cf &= ^c +} + type client struct { // Here first because of use of atomics, and memory alignment. stats @@ -65,8 +110,9 @@ type client struct { parseState route *route - debug bool trace bool + + flags clientFlag // Compact booleans into a single field. Size will be increased when needed. } type permissions struct { @@ -118,6 +164,7 @@ type clientOpts struct { Name string `json:"name"` Lang string `json:"lang"` Version string `json:"version"` + Protocol int `json:"protocol"` } var defaultOpts = clientOpts{Verbose: true, Pedantic: true} @@ -132,7 +179,6 @@ func (c *client) initClient() { c.cid = atomic.AddUint64(&s.gcid, 1) c.bw = bufio.NewWriterSize(c.nc, startBufSize) c.subs = make(map[string]*subscription) - c.debug = (atomic.LoadInt32(&debug) != 0) c.trace = (atomic.LoadInt32(&trace) != 0) // This is a scratch buffer used for processMsg() @@ -371,13 +417,34 @@ func (c *client) processConnect(arg []byte) error { typ := c.typ r := c.route srv := c.srv - c.mu.Unlock() - + // Moved unmarshalling of clients' Options under the lock. + // The client has already been added to the server map, so it is possible + // that other routines lookup the client, and access its options under + // the client's lock, so unmarshalling the options outside of the lock + // would cause data RACEs. if err := json.Unmarshal(arg, &c.opts); err != nil { + c.mu.Unlock() return err } + // Indicate that the CONNECT protocol has been received, and that the + // server now knows which protocol this client supports. + c.flags.set(connectReceived) + // Capture these under lock + proto := c.opts.Protocol + verbose := c.opts.Verbose + c.mu.Unlock() if srv != nil { + // As soon as c.opts is unmarshalled and if the proto is at + // least ClientProtoInfo, we need to increment the following counter. + // This is decremented when client is removed from the server's + // clients map. + if proto >= ClientProtoInfo { + srv.mu.Lock() + srv.cproto++ + srv.mu.Unlock() + } + // Check for Auth if ok := srv.checkAuth(c); !ok { c.authViolation() @@ -385,6 +452,11 @@ func (c *client) processConnect(arg []byte) error { } } + // Check client protocol request if it exists. + if typ == CLIENT && (proto < ClientProtoZero || proto > ClientProtoInfo) { + return ErrBadClientProtocol + } + // Grab connection name of remote route. if typ == ROUTER && r != nil { c.mu.Lock() @@ -392,7 +464,7 @@ func (c *client) processConnect(arg []byte) error { c.mu.Unlock() } - if c.opts.Verbose { + if verbose { c.sendOK() } return nil @@ -449,12 +521,15 @@ func (c *client) sendInfo(info []byte) { func (c *client) sendErr(err string) { c.mu.Lock() + c.traceOutOp("-ERR", []byte(err)) c.sendProto([]byte(fmt.Sprintf("-ERR '%s'\r\n", err)), true) c.mu.Unlock() } func (c *client) sendOK() { c.mu.Lock() + c.traceOutOp("OK", nil) + // Can not autoflush this one, needs to be async. c.sendProto([]byte("+OK\r\n"), false) c.pcd[c] = needFlush c.mu.Unlock() @@ -473,7 +548,33 @@ func (c *client) processPing() { c.clearConnection() c.Debugf("Error on Flush, error %s", err.Error()) } + srv := c.srv + sendUpdateINFO := false + // Check if this is the first PONG, if so... + if c.flags.setIfNotSet(firstPongSent) { + // Check if server should send an async INFO protocol to the client + if c.opts.Protocol >= ClientProtoInfo && + srv != nil && c.flags.isSet(infoUpdated) { + sendUpdateINFO = true + } + // We can now clear the flag + c.flags.clear(infoUpdated) + } c.mu.Unlock() + + // Some clients send an initial PING as part of the synchronous connect process. + // They can't be receiving anything until the first PONG is received. + // So we delay the possible updated INFO after this point. + if sendUpdateINFO { + srv.mu.Lock() + // Use the cached protocol + proto := srv.infoJSON + srv.mu.Unlock() + + c.mu.Lock() + c.sendInfo(proto) + c.mu.Unlock() + } } func (c *client) processPong() { diff --git a/server/client_test.go b/server/client_test.go index 8c39e919..1260a9b7 100644 --- a/server/client_test.go +++ b/server/client_test.go @@ -16,6 +16,7 @@ import ( "time" "crypto/tls" + "github.com/nats-io/nats" ) @@ -167,6 +168,49 @@ func TestClientConnect(t *testing.T) { } } +func TestClientConnectProto(t *testing.T) { + _, c, _ := setupClient() + + // Basic Connect setting flags, proto should be zero (original proto) + connectOp := []byte("CONNECT {\"verbose\":true,\"pedantic\":true,\"ssl_required\":false}\r\n") + err := c.parse(connectOp) + if err != nil { + t.Fatalf("Received error: %v\n", err) + } + if c.state != OP_START { + t.Fatalf("Expected state of OP_START vs %d\n", c.state) + } + if !reflect.DeepEqual(c.opts, clientOpts{Verbose: true, Pedantic: true, Protocol: ClientProtoZero}) { + t.Fatalf("Did not parse connect options correctly: %+v\n", c.opts) + } + + // ProtoInfo + connectOp = []byte(fmt.Sprintf("CONNECT {\"verbose\":true,\"pedantic\":true,\"ssl_required\":false,\"protocol\":%d}\r\n", ClientProtoInfo)) + err = c.parse(connectOp) + if err != nil { + t.Fatalf("Received error: %v\n", err) + } + if c.state != OP_START { + t.Fatalf("Expected state of OP_START vs %d\n", c.state) + } + if !reflect.DeepEqual(c.opts, clientOpts{Verbose: true, Pedantic: true, Protocol: ClientProtoInfo}) { + t.Fatalf("Did not parse connect options correctly: %+v\n", c.opts) + } + if c.opts.Protocol != ClientProtoInfo { + t.Fatalf("Protocol should have been set to %v, but is set to %v", ClientProtoInfo, c.opts.Protocol) + } + + // Illegal Option + connectOp = []byte("CONNECT {\"protocol\":22}\r\n") + err = c.parse(connectOp) + if err == nil { + t.Fatalf("Expected to receive an error\n") + } + if err != ErrBadClientProtocol { + t.Fatalf("Expected err of %q, got %q\n", ErrBadClientProtocol, err) + } +} + func TestClientPing(t *testing.T) { _, c, cr := setupClient() diff --git a/server/errors.go b/server/errors.go index b8ebb11b..aa33f06f 100644 --- a/server/errors.go +++ b/server/errors.go @@ -22,4 +22,7 @@ var ( // ErrReservedPublishSubject represents an error condition when sending to a reserved subject, e.g. _SYS.> ErrReservedPublishSubject = errors.New("Reserved Internal Subject") + + // ErrBadClientProtocol signals a client requested an invalud client protocol. + ErrBadClientProtocol = errors.New("Invalid Client Protocol") ) diff --git a/server/route.go b/server/route.go index 29e7d248..06dd04a1 100644 --- a/server/route.go +++ b/server/route.go @@ -153,12 +153,63 @@ func (c *client) processRouteInfo(info *Info) { // Now let the known servers know about this new route s.forwardNewRouteInfoToKnownServers(info) } + // If the server Info did not have these URLs, update and send an INFO + // protocol to all clients that support it. + if s.updateServerINFO(info.ClientConnectURLs) { + s.sendAsyncInfoToClients() + } } else { c.Debugf("Detected duplicate remote route %q", info.ID) c.closeConnection() } } +// sendAsyncInfoToClients sends an INFO protocol to all +// connected clients that accept async INFO updates. +func (s *Server) sendAsyncInfoToClients() { + s.mu.Lock() + // If there are no clients supporting async INFO protocols, we are done. + if s.cproto == 0 { + s.mu.Unlock() + return + } + + // Capture under lock + proto := s.infoJSON + + // Make a copy of ALL clients so we can release server lock while + // sending the protocol to clients. We could check the conditions + // (proto support, first PONG sent) here and so have potentially + // a limited number of clients, but that would mean grabbing the + // client's lock here, which we don't want since we would still + // need it in the second loop. + clients := make([]*client, 0, len(s.clients)) + for _, c := range s.clients { + clients = append(clients, c) + } + s.mu.Unlock() + + for _, c := range clients { + c.mu.Lock() + // If server did not yet receive the CONNECT protocol, check later + // when sending the first PONG. + if !c.flags.isSet(connectReceived) { + c.flags.set(infoUpdated) + } else if c.opts.Protocol >= ClientProtoInfo { + // Send only if first PONG was sent + if c.flags.isSet(firstPongSent) { + // sendInfo takes care of checking if the connection is still + // valid or not, so don't duplicate tests here. + c.sendInfo(proto) + } else { + // Otherwise, notify that INFO has changed and check later. + c.flags.set(infoUpdated) + } + } + c.mu.Unlock() + } +} + // This will process implicit route information received from another server. // We will check to see if we have configured or are already connected, // and if so we will ignore. Otherwise we will attempt to connect. @@ -579,19 +630,31 @@ func (s *Server) routeAcceptLoop(ch chan struct{}) { // StartRouting will start the accept loop on the cluster host:port // and will actively try to connect to listed routes. -func (s *Server) StartRouting() { +func (s *Server) StartRouting(clientListenReady chan struct{}) { + defer s.grWG.Done() + + // Wait for the client listen port to be opened, and + // the possible ephemeral port to be selected. + <-clientListenReady + + // Get all possible URLs (when server listens to 0.0.0.0). + // This is going to be sent to other Servers, so that they can let their + // clients know about us. + clientConnectURLs := s.getClientConnectURLs() + // Check for TLSConfig tlsReq := s.opts.ClusterTLSConfig != nil info := Info{ - ID: s.info.ID, - Version: s.info.Version, - Host: s.opts.ClusterHost, - Port: s.opts.ClusterPort, - AuthRequired: false, - TLSRequired: tlsReq, - SSLRequired: tlsReq, - TLSVerify: tlsReq, - MaxPayload: s.info.MaxPayload, + ID: s.info.ID, + Version: s.info.Version, + Host: s.opts.ClusterHost, + Port: s.opts.ClusterPort, + AuthRequired: false, + TLSRequired: tlsReq, + SSLRequired: tlsReq, + TLSVerify: tlsReq, + MaxPayload: s.info.MaxPayload, + ClientConnectURLs: clientConnectURLs, } // Check for Auth items if s.opts.ClusterUsername != "" { diff --git a/server/server.go b/server/server.go index 683d2da1..254e833f 100644 --- a/server/server.go +++ b/server/server.go @@ -24,17 +24,21 @@ import ( // Info is the information sent to clients to help them understand information // about this server. type Info struct { - ID string `json:"server_id"` - Version string `json:"version"` - GoVersion string `json:"go"` - Host string `json:"host"` - Port int `json:"port"` - AuthRequired bool `json:"auth_required"` - SSLRequired bool `json:"ssl_required"` // DEPRECATED: ssl json used for older clients - TLSRequired bool `json:"tls_required"` - TLSVerify bool `json:"tls_verify"` - MaxPayload int `json:"max_payload"` - IP string `json:"ip,omitempty"` + ID string `json:"server_id"` + Version string `json:"version"` + GoVersion string `json:"go"` + Host string `json:"host"` + Port int `json:"port"` + AuthRequired bool `json:"auth_required"` + SSLRequired bool `json:"ssl_required"` // DEPRECATED: ssl json used for older clients + TLSRequired bool `json:"tls_required"` + TLSVerify bool `json:"tls_verify"` + MaxPayload int `json:"max_payload"` + IP string `json:"ip,omitempty"` + ClientConnectURLs []string `json:"connect_urls,omitempty"` // Contains URLs a client can connect to. + + // Used internally for quick look-ups. + clientConnectURLs map[string]struct{} } // Server is our main struct. @@ -69,6 +73,7 @@ type Server struct { grTmpClients map[uint64]*client grRunning bool grWG sync.WaitGroup // to wait on various go routines + cproto int64 // number of clients supporting async INFO } // Make sure all are 64bits for atomic use @@ -89,16 +94,17 @@ func New(opts *Options) *Server { verify := (tlsReq && opts.TLSConfig.ClientAuth == tls.RequireAnyClientCert) info := Info{ - ID: genID(), - Version: VERSION, - GoVersion: runtime.Version(), - Host: opts.Host, - Port: opts.Port, - AuthRequired: false, - TLSRequired: tlsReq, - SSLRequired: tlsReq, - TLSVerify: verify, - MaxPayload: opts.MaxPayload, + ID: genID(), + Version: VERSION, + GoVersion: runtime.Version(), + Host: opts.Host, + Port: opts.Port, + AuthRequired: false, + TLSRequired: tlsReq, + SSLRequired: tlsReq, + TLSVerify: verify, + MaxPayload: opts.MaxPayload, + clientConnectURLs: make(map[string]struct{}), } s := &Server{ @@ -236,9 +242,15 @@ func (s *Server) Start() { s.StartHTTPSMonitoring() } + // The Routing routine needs to wait for the client listen + // port to be opened and potential ephemeral port selected. + clientListenReady := make(chan struct{}) + // Start up routing as well if needed. if s.opts.ClusterPort != 0 { - s.StartRouting() + s.startGoRoutine(func() { + s.StartRouting(clientListenReady) + }) } // Pprof http endpoint for the profiler. @@ -247,7 +259,7 @@ func (s *Server) Start() { } // Wait for clients. - s.AcceptLoop() + s.AcceptLoop(clientListenReady) } // Shutdown will shutdown the server instance by kicking out the AcceptLoop @@ -329,7 +341,7 @@ func (s *Server) Shutdown() { } // AcceptLoop is exported for easier testing. -func (s *Server) AcceptLoop() { +func (s *Server) AcceptLoop(clientListenReady chan struct{}) { hp := net.JoinHostPort(s.opts.Host, strconv.Itoa(s.opts.Port)) Noticef("Listening for client connections on %s", hp) l, e := net.Listen("tcp", hp) @@ -370,6 +382,9 @@ func (s *Server) AcceptLoop() { } s.mu.Unlock() + // Let the caller know that we are ready + close(clientListenReady) + tmpDelay := ACCEPT_MIN_SLEEP for s.isRunning() { @@ -597,6 +612,30 @@ func (s *Server) createClient(conn net.Conn) *client { return c } +// updateServerINFO updates the server's Info object with the given +// array of URLs and re-generate the infoJSON byte array, only if the +// given URLs were not already recorded. +// Returns a boolean indicating if server's Info was updated. +func (s *Server) updateServerINFO(urls []string) bool { + s.mu.Lock() + defer s.mu.Unlock() + + // Will be set to true if we alter the server's Info object. + wasUpdated := false + for _, url := range urls { + if _, present := s.info.clientConnectURLs[url]; !present { + + s.info.clientConnectURLs[url] = struct{}{} + s.info.ClientConnectURLs = append(s.info.ClientConnectURLs, url) + wasUpdated = true + } + } + if wasUpdated { + s.generateServerInfoJSON() + } + return wasUpdated +} + // Handle closing down a connection when the handshake has timedout. func tlsTimeout(c *client, conn *tls.Conn) { c.mu.Lock() @@ -700,12 +739,19 @@ func (s *Server) removeClient(c *client) { if r != nil { rID = r.remoteID } + updateProtoInfoCount := false + if typ == CLIENT && c.opts.Protocol >= ClientProtoInfo { + updateProtoInfoCount = true + } c.mu.Unlock() s.mu.Lock() switch typ { case CLIENT: delete(s.clients, cid) + if updateProtoInfoCount { + s.cproto-- + } case ROUTER: delete(s.routes, cid) if r != nil { @@ -824,3 +870,45 @@ func (s *Server) startGoRoutine(f func()) { } s.grMu.Unlock() } + +// getClientConnectURLs returns suitable URLs for clients to connect to the listen +// port based on the server options' Host and Port. If the Host corresponds to +// "any" interfaces, this call returns the list of resolved IP addresses. +func (s *Server) getClientConnectURLs() []string { + s.mu.Lock() + defer s.mu.Unlock() + + sPort := strconv.Itoa(s.opts.Port) + urls := make([]string, 0, 1) + + ipAddr, err := net.ResolveIPAddr("ip", s.opts.Host) + // If the host is "any" (0.0.0.0 or ::), get specific IPs from available + // interfaces. + if err == nil && ipAddr.IP.IsUnspecified() { + var ip net.IP + ifaces, _ := net.Interfaces() + for _, i := range ifaces { + addrs, _ := i.Addrs() + for _, addr := range addrs { + switch v := addr.(type) { + case *net.IPNet: + ip = v.IP + case *net.IPAddr: + ip = v.IP + } + // Skip loopback/localhost + if ip.IsLoopback() { + ip = nil + continue + } + urls = append(urls, net.JoinHostPort(ip.String(), sPort)) + } + } + } + if err != nil || len(urls) == 0 { + // We are here if s.opts.Host is not "0.0.0.0" nor "::", or if for some + // reason we could not add any URL in the loop above. + urls = append(urls, net.JoinHostPort(s.opts.Host, sPort)) + } + return urls +} diff --git a/server/server_test.go b/server/server_test.go index c9625d41..c58fba70 100644 --- a/server/server_test.go +++ b/server/server_test.go @@ -1,4 +1,4 @@ -// Copyright 2015 Apcera Inc. All rights reserved. +// Copyright 2015-2016 Apcera Inc. All rights reserved. package server @@ -138,3 +138,36 @@ func TestTlsCipher(t *testing.T) { t.Fatalf("Expected an unknown cipher.") } } + +func TestGetConnectURLs(t *testing.T) { + opts := DefaultOptions + opts.Host = "0.0.0.0" + opts.Port = 4222 + s := New(&opts) + defer s.Shutdown() + + urls := s.getClientConnectURLs() + if len(urls) == 0 { + t.Fatal("Expected to get a list of urls, got none") + } + for _, u := range urls { + if strings.HasPrefix(u, opts.Host) { + t.Fatalf("This URL looks wrong: %v", u) + } + } + s.Shutdown() + + opts.Host = "localhost" + opts.Port = 4222 + s = New(&opts) + defer s.Shutdown() + + expectedURL := "localhost:4222" + urls = s.getClientConnectURLs() + if len(urls) == 0 { + t.Fatal("Expected to get a list of urls, got none") + } + if urls[0] != expectedURL { + t.Fatalf("Expected to get %v, got %v", expectedURL, urls[0]) + } +} diff --git a/test/routes_test.go b/test/routes_test.go index 683954e1..1f458037 100644 --- a/test/routes_test.go +++ b/test/routes_test.go @@ -14,8 +14,12 @@ import ( "time" "github.com/nats-io/gnatsd/server" + "reflect" + "strconv" ) +const clientProtoInfo = 1 + func shutdownServerAndWait(t *testing.T, s *server.Server) bool { listenSpec := s.GetListenEndpoint() routeListenSpec := s.GetRouteListenEndpoint() @@ -656,3 +660,120 @@ func TestRouteConnectOnShutdownRace(t *testing.T) { wg.Wait() } + +func TestRouteSendAsyncINFOToClients(t *testing.T) { + s, opts := runRouteServer(t) + defer s.Shutdown() + + clientURL := net.JoinHostPort(opts.Host, strconv.Itoa(opts.Port)) + + oldClient := createClientConn(t, opts.Host, opts.Port) + defer oldClient.Close() + + oldClientSend, oldClientExpect := setupConn(t, oldClient) + oldClientSend("PING\r\n") + oldClientExpect(pongRe) + + newClient := createClientConn(t, opts.Host, opts.Port) + defer newClient.Close() + + newClientSend, newClientExpect := setupConnWithProto(t, newClient, clientProtoInfo) + newClientSend("PING\r\n") + newClientExpect(pongRe) + + // Check that even a new client does not receive an async INFO at this point + // since there is no route created yet. + expectNothing(t, newClient) + + routeID := "Server-B" + + createRoute := func() (net.Conn, sendFun, expectFun) { + rc := createRouteConn(t, opts.ClusterHost, opts.ClusterPort) + routeSend, routeExpect := setupRouteEx(t, rc, opts, routeID) + + buf := routeExpect(infoRe) + info := server.Info{} + if err := json.Unmarshal(buf[4:], &info); err != nil { + t.Fatalf("Could not unmarshal route info: %v", err) + } + if len(info.ClientConnectURLs) == 0 { + t.Fatal("Expected a list of URLs, got none") + } + if info.ClientConnectURLs[0] != clientURL { + t.Fatalf("Expected ClientConnectURLs to be %q, got %q", clientURL, info.ClientConnectURLs[0]) + } + + return rc, routeSend, routeExpect + } + + sendRouteINFO := func(routeSend sendFun, routeExpect expectFun, urls []string) { + routeInfo := server.Info{} + routeInfo.ID = routeID + routeInfo.Host = "localhost" + routeInfo.Port = 5222 + routeInfo.ClientConnectURLs = urls + b, err := json.Marshal(routeInfo) + if err != nil { + t.Fatalf("Could not marshal test route info: %v", err) + } + infoJSON := fmt.Sprintf("INFO %s\r\n", b) + routeSend(infoJSON) + routeSend("PING\r\n") + routeExpect(pongRe) + } + + checkINFOReceived := func(clientExpect expectFun, expectedURLs []string) { + buf := clientExpect(infoRe) + info := server.Info{} + if err := json.Unmarshal(buf[4:], &info); err != nil { + t.Fatalf("Could not unmarshal route info: %v", err) + } + if !reflect.DeepEqual(info.ClientConnectURLs, expectedURLs) { + t.Fatalf("Expected ClientConnectURLs to be %v, got %v", expectedURLs, info.ClientConnectURLs) + } + } + + // Create a route + rc, routeSend, routeExpect := createRoute() + defer rc.Close() + + // Send an INFO with single URL + routeConnectURLs := []string{"localhost:5222"} + sendRouteINFO(routeSend, routeExpect, routeConnectURLs) + + // Expect nothing for old clients + expectNothing(t, oldClient) + + // Expect new client to receive an INFO + checkINFOReceived(newClientExpect, routeConnectURLs) + + // Disconnect and reconnect the route. + rc.Close() + rc, routeSend, routeExpect = createRoute() + defer rc.Close() + + // Resend the same route INFO json, since there is no new URL, + // no client should receive an INFO + sendRouteINFO(routeSend, routeExpect, routeConnectURLs) + + // Expect nothing for old clients + expectNothing(t, oldClient) + + // Expect nothing for new clients as well (no real update) + expectNothing(t, newClient) + + // Now stop the route and restart with an additional URL + rc.Close() + rc, routeSend, routeExpect = createRoute() + defer rc.Close() + + // The route now has an additional URL + routeConnectURLs = append(routeConnectURLs, "localhost:7777") + sendRouteINFO(routeSend, routeExpect, routeConnectURLs) + + // Expect nothing for old clients + expectNothing(t, oldClient) + + // Expect new client to receive an INFO, and verify content as expected. + checkINFOReceived(newClientExpect, routeConnectURLs) +} diff --git a/test/test.go b/test/test.go index e475129c..db9b59c3 100644 --- a/test/test.go +++ b/test/test.go @@ -244,6 +244,13 @@ func setupConn(t tLogger, c net.Conn) (sendFun, expectFun) { return sendCommand(t, c), expectCommand(t, c) } +func setupConnWithProto(t tLogger, c net.Conn, proto int) (sendFun, expectFun) { + checkInfoMsg(t, c) + cs := fmt.Sprintf("CONNECT {\"verbose\":%v,\"pedantic\":%v,\"ssl_required\":%v,\"protocol\":%d}\r\n", false, false, false, proto) + sendProto(t, c, cs) + return sendCommand(t, c), expectCommand(t, c) +} + type sendFun func(string) type expectFun func(*regexp.Regexp) []byte