From 7c59ce3dba5d54c3d3dff35c56aef4b8c6720213 Mon Sep 17 00:00:00 2001 From: Ivan Kozlovic Date: Tue, 24 Jan 2017 19:17:26 -0700 Subject: [PATCH 1/2] [ADDED] Reject clients connecting to route's listen port This happens sometimes, and the latest occurence was today: https://github.com/nats-io/java-nats/issues/96 When it happens, there is no error but subscribers would not receive anything, etc... This PR uses the fact that clients set the field Lang in the CONNECT protocol that ROUTEs do not. I have checked that all Apcera supported clients do set Lang in the CONNECT protocol. If we plan to add Lang for routes, we need to find another field or use a new one, in which case that would work only for new clients (that would need to be updated). With this change, when the server accepts a connection on the route port and detects that this protocol field is present, it now closes the client connection. The nice thing is that newer clients, when incorrectly connecting to the route port, get from the route's INFO the list of client URLs, which means that on the initial connect error, they are able to subsequently connect to the proper client port, so it is transparent to the user (which may or may not be a good thing). However, it is not guaranteed because if the client is not setting NoRandomize to true, the client URL is added but the array shuffled, so it is possible that the client library does not find the correct port in the connect loop. --- server/client.go | 17 +++++++++++++++-- server/client_test.go | 16 +++++++++++++++- server/errors.go | 4 ++++ server/routes_test.go | 43 +++++++++++++++++++++++++++++++++++++++++++ 4 files changed, 77 insertions(+), 3 deletions(-) diff --git a/server/client.go b/server/client.go index b3f5f1cf..c96de3c7 100644 --- a/server/client.go +++ b/server/client.go @@ -281,8 +281,12 @@ func (c *client) readLoop() { c.cache.subs = 0 if err := c.parse(b[:n]); err != nil { - // handled inline - if err != ErrMaxPayload && err != ErrAuthorization { + // If client connection has been closed, simply return, + // otherwise report a generic parsing error. + c.mu.Lock() + closed := c.nc == nil + c.mu.Unlock() + if !closed { c.Errorf("Error reading from client: %s", err.Error()) c.sendErr("Parser Error") c.closeConnection() @@ -446,6 +450,7 @@ func (c *client) processConnect(arg []byte) error { // Capture these under lock proto := c.opts.Protocol verbose := c.opts.Verbose + lang := c.opts.Lang c.mu.Unlock() if srv != nil { @@ -468,7 +473,15 @@ func (c *client) processConnect(arg []byte) error { // Check client protocol request if it exists. if typ == CLIENT && (proto < ClientProtoZero || proto > ClientProtoInfo) { + c.sendErr(ErrBadClientProtocol.Error()) + c.closeConnection() return ErrBadClientProtocol + } else if typ == ROUTER && lang != "" { + // Way to detect clients that incorrectly connect to the route listen + // port. Client provide Lang in the CONNECT protocol while ROUTEs don't. + c.sendErr(ErrClientConnectedToRoutePort.Error()) + c.closeConnection() + return ErrClientConnectedToRoutePort } // Grab connection name of remote route. diff --git a/server/client_test.go b/server/client_test.go index 630a4af5..46a48064 100644 --- a/server/client_test.go +++ b/server/client_test.go @@ -177,7 +177,7 @@ func TestClientConnect(t *testing.T) { } func TestClientConnectProto(t *testing.T) { - _, c, _ := setupClient() + _, c, r := setupClient() // Basic Connect setting flags, proto should be zero (original proto) connectOp := []byte("CONNECT {\"verbose\":true,\"pedantic\":true,\"ssl_required\":false}\r\n") @@ -210,6 +210,19 @@ func TestClientConnectProto(t *testing.T) { // Illegal Option connectOp = []byte("CONNECT {\"protocol\":22}\r\n") + wg := sync.WaitGroup{} + wg.Add(1) + // The client here is using a pipe, we need to be dequeuing + // data otherwise the server would be blocked trying to send + // the error back to it. + go func() { + defer wg.Done() + for { + if _, _, err := r.ReadLine(); err != nil { + return + } + } + }() err = c.parse(connectOp) if err == nil { t.Fatalf("Expected to receive an error\n") @@ -217,6 +230,7 @@ func TestClientConnectProto(t *testing.T) { if err != ErrBadClientProtocol { t.Fatalf("Expected err of %q, got %q\n", ErrBadClientProtocol, err) } + wg.Wait() } func TestClientPing(t *testing.T) { diff --git a/server/errors.go b/server/errors.go index fee22923..cc22bd1e 100644 --- a/server/errors.go +++ b/server/errors.go @@ -29,4 +29,8 @@ var ( // ErrTooManyConnections signals a client that the maximum number of connections supported by the // server has been reached. ErrTooManyConnections = errors.New("Maximum Connections Exceeded") + + // ErrClientConnectedToRoutePort represents an error condition when a client + // attempted to connect to the route listen port. + ErrClientConnectedToRoutePort = errors.New("Attempted To Connect To Route Port") ) diff --git a/server/routes_test.go b/server/routes_test.go index e0f5f90d..9cd0e30e 100644 --- a/server/routes_test.go +++ b/server/routes_test.go @@ -477,3 +477,46 @@ func TestRouteUseIPv6(t *testing.T) { t.Fatal("Server failed to start route accept loop") } } + +func TestClientConnectToRoutePort(t *testing.T) { + opts := DefaultOptions + opts.Cluster.NoAdvertise = true + s := RunServer(&opts) + defer s.Shutdown() + + url := fmt.Sprintf("nats://%s:%d", opts.Cluster.Host, opts.Cluster.Port) + clientURL := fmt.Sprintf("nats://%s:%d", opts.Host, opts.Port) + total := 100 + for i := 0; i < total; i++ { + nc, err := nats.Connect(url) + if err == nil { + // It is possible that the client reconnects because + // it gets from the initial connect to the route the + // connectedUrl array and may be able to reconnects + // to the client URL. + // If connected, it should be to the client URL + if nc.ConnectedUrl() != clientURL { + t.Fatalf("Expected client to be connected to %v, got %v", clientURL, nc.ConnectedUrl()) + } + nc.Close() + } + // If error, it could be ErrClientConnectingToRoutePort or + // other (EOF, etc)... so not checking for specific one. + } + + // When disabling randomization, the client URL is added to the server + // pool and so should be tried after the connection is closed trying + // to connect to the route port. Connect must always succeed and + // must be connected to client URL. + for i := 0; i < total; i++ { + nc, err := nats.Connect(url, nats.DontRandomize()) + if err == nil { + if nc.ConnectedUrl() != clientURL { + t.Fatalf("Expected client to be connected to %v, got %v", clientURL, nc.ConnectedUrl()) + } + nc.Close() + continue + } + t.Fatalf("Error on connect: %v", err) + } +} From c925e1c9bf66d9f970d559b1061440e328007dc0 Mon Sep 17 00:00:00 2001 From: Ivan Kozlovic Date: Wed, 25 Jan 2017 09:15:47 -0700 Subject: [PATCH 2/2] Restored behavior after c.parse() error --- server/client.go | 8 ++------ 1 file changed, 2 insertions(+), 6 deletions(-) diff --git a/server/client.go b/server/client.go index c96de3c7..5a64a79c 100644 --- a/server/client.go +++ b/server/client.go @@ -281,12 +281,8 @@ func (c *client) readLoop() { c.cache.subs = 0 if err := c.parse(b[:n]); err != nil { - // If client connection has been closed, simply return, - // otherwise report a generic parsing error. - c.mu.Lock() - closed := c.nc == nil - c.mu.Unlock() - if !closed { + // handled inline + if err != ErrMaxPayload && err != ErrAuthorization { c.Errorf("Error reading from client: %s", err.Error()) c.sendErr("Parser Error") c.closeConnection()