[ADDED] Reject clients connecting to route's listen port

This happens sometimes, and the latest occurence was today:
https://github.com/nats-io/java-nats/issues/96

When it happens, there is no error but subscribers would not receive
anything, etc...

This PR uses the fact that clients set the field Lang in the CONNECT
protocol that ROUTEs do not. I have checked that all Apcera supported
clients do set Lang in the CONNECT protocol.
If we plan to add Lang for routes, we need to find another field or
use a new one, in which case that would work only for new clients
(that would need to be updated).

With this change, when the server accepts a connection on the route
port and detects that this protocol field is present, it now closes
the client connection.

The nice thing is that newer clients, when incorrectly connecting
to the route port, get from the route's INFO the list of client URLs,
which means that on the initial connect error, they are able to
subsequently connect to the proper client port, so it is transparent
to the user (which may or may not be a good thing). However, it is not
guaranteed because if the client is not setting NoRandomize to true,
the client URL is added but the array shuffled, so it is possible that
the client library does not find the correct port in the connect loop.
This commit is contained in:
Ivan Kozlovic
2017-01-24 19:17:26 -07:00
parent 0d45f49d90
commit 7c59ce3dba
4 changed files with 77 additions and 3 deletions

View File

@@ -281,8 +281,12 @@ func (c *client) readLoop() {
c.cache.subs = 0
if err := c.parse(b[:n]); err != nil {
// handled inline
if err != ErrMaxPayload && err != ErrAuthorization {
// If client connection has been closed, simply return,
// otherwise report a generic parsing error.
c.mu.Lock()
closed := c.nc == nil
c.mu.Unlock()
if !closed {
c.Errorf("Error reading from client: %s", err.Error())
c.sendErr("Parser Error")
c.closeConnection()
@@ -446,6 +450,7 @@ func (c *client) processConnect(arg []byte) error {
// Capture these under lock
proto := c.opts.Protocol
verbose := c.opts.Verbose
lang := c.opts.Lang
c.mu.Unlock()
if srv != nil {
@@ -468,7 +473,15 @@ func (c *client) processConnect(arg []byte) error {
// Check client protocol request if it exists.
if typ == CLIENT && (proto < ClientProtoZero || proto > ClientProtoInfo) {
c.sendErr(ErrBadClientProtocol.Error())
c.closeConnection()
return ErrBadClientProtocol
} else if typ == ROUTER && lang != "" {
// Way to detect clients that incorrectly connect to the route listen
// port. Client provide Lang in the CONNECT protocol while ROUTEs don't.
c.sendErr(ErrClientConnectedToRoutePort.Error())
c.closeConnection()
return ErrClientConnectedToRoutePort
}
// Grab connection name of remote route.

View File

@@ -177,7 +177,7 @@ func TestClientConnect(t *testing.T) {
}
func TestClientConnectProto(t *testing.T) {
_, c, _ := setupClient()
_, c, r := setupClient()
// Basic Connect setting flags, proto should be zero (original proto)
connectOp := []byte("CONNECT {\"verbose\":true,\"pedantic\":true,\"ssl_required\":false}\r\n")
@@ -210,6 +210,19 @@ func TestClientConnectProto(t *testing.T) {
// Illegal Option
connectOp = []byte("CONNECT {\"protocol\":22}\r\n")
wg := sync.WaitGroup{}
wg.Add(1)
// The client here is using a pipe, we need to be dequeuing
// data otherwise the server would be blocked trying to send
// the error back to it.
go func() {
defer wg.Done()
for {
if _, _, err := r.ReadLine(); err != nil {
return
}
}
}()
err = c.parse(connectOp)
if err == nil {
t.Fatalf("Expected to receive an error\n")
@@ -217,6 +230,7 @@ func TestClientConnectProto(t *testing.T) {
if err != ErrBadClientProtocol {
t.Fatalf("Expected err of %q, got %q\n", ErrBadClientProtocol, err)
}
wg.Wait()
}
func TestClientPing(t *testing.T) {

View File

@@ -29,4 +29,8 @@ var (
// ErrTooManyConnections signals a client that the maximum number of connections supported by the
// server has been reached.
ErrTooManyConnections = errors.New("Maximum Connections Exceeded")
// ErrClientConnectedToRoutePort represents an error condition when a client
// attempted to connect to the route listen port.
ErrClientConnectedToRoutePort = errors.New("Attempted To Connect To Route Port")
)

View File

@@ -477,3 +477,46 @@ func TestRouteUseIPv6(t *testing.T) {
t.Fatal("Server failed to start route accept loop")
}
}
func TestClientConnectToRoutePort(t *testing.T) {
opts := DefaultOptions
opts.Cluster.NoAdvertise = true
s := RunServer(&opts)
defer s.Shutdown()
url := fmt.Sprintf("nats://%s:%d", opts.Cluster.Host, opts.Cluster.Port)
clientURL := fmt.Sprintf("nats://%s:%d", opts.Host, opts.Port)
total := 100
for i := 0; i < total; i++ {
nc, err := nats.Connect(url)
if err == nil {
// It is possible that the client reconnects because
// it gets from the initial connect to the route the
// connectedUrl array and may be able to reconnects
// to the client URL.
// If connected, it should be to the client URL
if nc.ConnectedUrl() != clientURL {
t.Fatalf("Expected client to be connected to %v, got %v", clientURL, nc.ConnectedUrl())
}
nc.Close()
}
// If error, it could be ErrClientConnectingToRoutePort or
// other (EOF, etc)... so not checking for specific one.
}
// When disabling randomization, the client URL is added to the server
// pool and so should be tried after the connection is closed trying
// to connect to the route port. Connect must always succeed and
// must be connected to client URL.
for i := 0; i < total; i++ {
nc, err := nats.Connect(url, nats.DontRandomize())
if err == nil {
if nc.ConnectedUrl() != clientURL {
t.Fatalf("Expected client to be connected to %v, got %v", clientURL, nc.ConnectedUrl())
}
nc.Close()
continue
}
t.Fatalf("Error on connect: %v", err)
}
}