diff --git a/go.mod b/go.mod index 96c06cb0..9735fb01 100644 --- a/go.mod +++ b/go.mod @@ -3,7 +3,7 @@ module github.com/nats-io/nats-server/v2 require ( github.com/minio/highwayhash v1.0.0 github.com/nats-io/jwt/v2 v2.0.0-20200602193336-473d698956ed - github.com/nats-io/nats.go v1.10.1-0.20200601214746-e93e18d0ed6f + github.com/nats-io/nats.go v1.10.1-0.20200606002146-fc6fed82929a github.com/nats-io/nkeys v0.2.0 github.com/nats-io/nuid v1.0.1 golang.org/x/crypto v0.0.0-20200323165209-0ec3e9974c59 diff --git a/go.sum b/go.sum index b5e683be..43fe0de5 100644 --- a/go.sum +++ b/go.sum @@ -20,8 +20,8 @@ github.com/nats-io/nats-server/v2 v2.1.8-0.20200524125952-51ebd92a9093/go.mod h1 github.com/nats-io/nats-server/v2 v2.1.8-0.20200601203034-f8d6dd992b71/go.mod h1:Nan/1L5Sa1JRW+Thm4HNYcIDcVRFc5zK9OpSZeI2kk4= github.com/nats-io/nats.go v1.10.0/go.mod h1:AjGArbfyR50+afOUotNX2Xs5SYHf+CoOa5HH1eEl2HE= github.com/nats-io/nats.go v1.10.1-0.20200531124210-96f2130e4d55/go.mod h1:ARiFsjW9DVxk48WJbO3OSZ2DG8fjkMi7ecLmXoY/n9I= -github.com/nats-io/nats.go v1.10.1-0.20200601214746-e93e18d0ed6f h1:ILS2coqvxP2B/pptNWB5xLu626mIiflILpKwEvma9eQ= -github.com/nats-io/nats.go v1.10.1-0.20200601214746-e93e18d0ed6f/go.mod h1:8eAIv96Mo9QW6Or40jUHejS7e4VwZ3VRYD6Sf0BTDp4= +github.com/nats-io/nats.go v1.10.1-0.20200606002146-fc6fed82929a h1:gzSKZOBlu/DpbuPbt34paXCOvA6+E+lVfU2BmomQ9HA= +github.com/nats-io/nats.go v1.10.1-0.20200606002146-fc6fed82929a/go.mod h1:8eAIv96Mo9QW6Or40jUHejS7e4VwZ3VRYD6Sf0BTDp4= github.com/nats-io/nkeys v0.1.3/go.mod h1:xpnFELMwJABBLVhffcfd1MZx6VsNRFpEugbxziKVo7w= github.com/nats-io/nkeys v0.1.4/go.mod h1:XdZpAbhgyyODYqjTawOnIOI7VlbKSarI9Gfy1tqEu/s= github.com/nats-io/nkeys v0.2.0 h1:WXKF7diOaPU9cJdLD7nuzwasQy9vT1tBqzXZZf3AMJM= diff --git a/server/client.go b/server/client.go index 47d9253b..4298b015 100644 --- a/server/client.go +++ b/server/client.go @@ -868,7 +868,7 @@ func (c *client) flushClients(budget time.Duration) time.Time { // readLoop is the main socket read functionality. // Runs in its own Go routine. -func (c *client) readLoop() { +func (c *client) readLoop(pre []byte) { // Grab the connection off the client, it will be cleared on a close. // We check for that after the loop, but want to avoid a nil dereference c.mu.Lock() @@ -918,6 +918,11 @@ func (c *client) readLoop() { wsr.init() } + // If we have a pre parse that first. + if len(pre) > 0 { + c.parse(pre) + } + for { n, err := nc.Read(b) // If we have any data we will try to parse and exit at the end. diff --git a/server/gateway.go b/server/gateway.go index acae75c1..08622c81 100644 --- a/server/gateway.go +++ b/server/gateway.go @@ -846,7 +846,7 @@ func (s *Server) createGateway(cfg *gatewayCfg, url *url.URL, conn net.Conn) { } // Spin up the read loop. - s.startGoRoutine(func() { c.readLoop() }) + s.startGoRoutine(func() { c.readLoop(nil) }) // Spin up the write loop. s.startGoRoutine(func() { c.writeLoop() }) diff --git a/server/leafnode.go b/server/leafnode.go index e72d4d37..2305dccf 100644 --- a/server/leafnode.go +++ b/server/leafnode.go @@ -813,7 +813,7 @@ func (s *Server) createLeafNode(conn net.Conn, remote *leafNodeCfg) *client { } // Spin up the read loop. - s.startGoRoutine(func() { c.readLoop() }) + s.startGoRoutine(func() { c.readLoop(nil) }) // Spin up the write loop. s.startGoRoutine(func() { c.writeLoop() }) diff --git a/server/opts.go b/server/opts.go index 0ee18590..aec8e839 100644 --- a/server/opts.go +++ b/server/opts.go @@ -208,6 +208,7 @@ type Options struct { TLSKey string `json:"-"` TLSCaCert string `json:"-"` TLSConfig *tls.Config `json:"-"` + AllowNonTLS bool `json:"-"` WriteDeadline time.Duration `json:"-"` MaxClosedClients int `json:"-"` LameDuckDuration time.Duration `json:"-"` @@ -714,6 +715,9 @@ func (o *Options) processConfigFileLine(k string, v interface{}, errors *[]error } o.TLSTimeout = tc.Timeout o.TLSMap = tc.Map + + case "allow_non_tls": + o.AllowNonTLS = v.(bool) case "write_deadline": o.WriteDeadline = parseDuration("write_deadline", tk, v, errors, warnings) case "lame_duck_duration": diff --git a/server/route.go b/server/route.go index 0a69a74b..5b20501d 100644 --- a/server/route.go +++ b/server/route.go @@ -1222,7 +1222,7 @@ func (s *Server) createRoute(conn net.Conn, rURL *url.URL) *client { } // Spin up the read loop. - s.startGoRoutine(func() { c.readLoop() }) + s.startGoRoutine(func() { c.readLoop(nil) }) // Spin up the write loop. s.startGoRoutine(func() { c.writeLoop() }) diff --git a/server/server.go b/server/server.go index 50fe1046..568362e1 100644 --- a/server/server.go +++ b/server/server.go @@ -73,6 +73,7 @@ type Info struct { AuthRequired bool `json:"auth_required,omitempty"` TLSRequired bool `json:"tls_required,omitempty"` TLSVerify bool `json:"tls_verify,omitempty"` + TLSAvailable bool `json:"tls_available,omitempty"` MaxPayload int32 `json:"max_payload"` JetStream bool `json:"jetstream,omitempty"` IP string `json:"ip,omitempty"` @@ -275,13 +276,17 @@ func NewServer(opts *Options) (*Server, error) { Host: opts.Host, Port: opts.Port, AuthRequired: false, - TLSRequired: tlsReq, + TLSRequired: tlsReq && !opts.AllowNonTLS, TLSVerify: verify, MaxPayload: opts.MaxPayload, JetStream: opts.JetStream, Headers: !opts.NoHeaderSupport, } + if tlsReq && !info.TLSRequired { + info.TLSAvailable = true + } + now := time.Now() s := &Server{ @@ -1856,6 +1861,24 @@ func (s *Server) copyInfo() Info { return info } +// tlsMixConn is used when we can receive both TLS and non-TLS connections on same port. +type tlsMixConn struct { + net.Conn + pre *bytes.Buffer +} + +// Read for our mixed multi-reader. +func (c *tlsMixConn) Read(b []byte) (int, error) { + if c.pre != nil { + n, err := c.pre.Read(b) + if c.pre.Len() == 0 { + c.pre = nil + } + return n, err + } + return c.Conn.Read(b) +} + func (s *Server) createClient(conn net.Conn, ws *websocket) *client { // Snapshot server options. opts := s.getOpts() @@ -1902,7 +1925,6 @@ func (s *Server) createClient(conn net.Conn, ws *websocket) *client { // TLS handshake is done (if applicable). c.sendProtoNow(c.generateClientInfoJSON(info)) - tlsRequired := ws == nil && info.TLSRequired // Unlock to register c.mu.Unlock() @@ -1930,9 +1952,34 @@ func (s *Server) createClient(conn net.Conn, ws *websocket) *client { // Re-Grab lock c.mu.Lock() + tlsRequired := ws == nil && info.TLSRequired + var pre []byte + // If we have both TLS and non-TLS allowed we need to see which + // one the client wants. + if opts.TLSConfig != nil && opts.AllowNonTLS { + pre = make([]byte, 8) + c.nc.SetReadDeadline(time.Now().Add(25 * time.Millisecond)) + n, err := c.nc.Read(pre[:]) + c.nc.SetReadDeadline(time.Time{}) + pre = pre[:n] + // Assume TLS unless we see nothing or CONNECT. + if err != nil || bytes.Contains(pre, []byte("CONNECT")) { + tlsRequired = false + } else { + tlsRequired = true + } + } + // Check for TLS if tlsRequired { c.Debugf("Starting TLS client connection handshake") + // If we have a prebuffer create a multi-reader. + if len(pre) > 0 { + c.nc = &tlsMixConn{c.nc, bytes.NewBuffer(pre)} + // Clear pre so it is not parsed. + pre = nil + } + c.nc = tls.Server(c.nc, opts.TLSConfig) conn := c.nc.(*tls.Conn) @@ -1983,7 +2030,7 @@ func (s *Server) createClient(conn net.Conn, ws *websocket) *client { c.setPingTimer() // Spin up the read loop. - s.startGoRoutine(func() { c.readLoop() }) + s.startGoRoutine(func() { c.readLoop(pre) }) // Spin up the write loop. s.startGoRoutine(func() { c.writeLoop() }) diff --git a/test/configs/tls_mixed.conf b/test/configs/tls_mixed.conf new file mode 100644 index 00000000..6b218fe8 --- /dev/null +++ b/test/configs/tls_mixed.conf @@ -0,0 +1,15 @@ +# Allow TLS and non TLS on same port. + +listen: 127.0.0.1:-1 + +tls { + # Server cert + cert_file: "./configs/certs/server-cert.pem" + # Server private key + key_file: "./configs/certs/server-key.pem" + # Specified time for handshake to complete + timeout: 2 +} + +# This allows non tls traffic on same port. +allow_non_tls: true diff --git a/test/tls_test.go b/test/tls_test.go index 6aa4e2b6..b5649198 100644 --- a/test/tls_test.go +++ b/test/tls_test.go @@ -859,6 +859,55 @@ func TestTLSAuthorizationShortTimeout(t *testing.T) { } } +func TestClientTLSAndNonTLSConnections(t *testing.T) { + s, opts := RunServerWithConfig("./configs/tls_mixed.conf") + defer s.Shutdown() + + surl := fmt.Sprintf("tls://%s:%d", opts.Host, opts.Port) + nc, err := nats.Connect(surl, nats.RootCAs("./configs/certs/ca.pem")) + if err != nil { + t.Fatalf("Failed to connect with TLS: %v", err) + } + defer nc.Close() + + // Now also make sure we can connect through plain text. + nurl := fmt.Sprintf("nats://%s:%d", opts.Host, opts.Port) + nc2, err := nats.Connect(nurl) + if err != nil { + t.Fatalf("Failed to connect without TLS: %v", err) + } + defer nc2.Close() + + // Make sure they can go back and forth. + sub, err := nc.SubscribeSync("foo") + if err != nil { + t.Fatalf("Error subscribing: %v", err) + } + sub2, err := nc2.SubscribeSync("bar") + if err != nil { + t.Fatalf("Error subscribing: %v", err) + } + nc.Flush() + nc2.Flush() + + nmsgs := 100 + for i := 0; i < nmsgs; i++ { + nc2.Publish("foo", []byte("HELLO FROM PLAINTEXT")) + nc.Publish("bar", []byte("HELLO FROM TLS")) + } + // Now wait for the messages. + checkFor(t, time.Second, 10*time.Millisecond, func() error { + if msgs, _, err := sub.Pending(); err != nil || msgs != nmsgs { + return fmt.Errorf("Did not receive the correct number of messages: %d", msgs) + } + if msgs, _, err := sub2.Pending(); err != nil || msgs != nmsgs { + return fmt.Errorf("Did not receive the correct number of messages: %d", msgs) + } + return nil + }) + +} + func stressConnect(t *testing.T, wg *sync.WaitGroup, errCh chan error, url string, index int) { defer wg.Done() diff --git a/vendor/github.com/nats-io/nats.go/context.go b/vendor/github.com/nats-io/nats.go/context.go index c921d6be..769f88a0 100644 --- a/vendor/github.com/nats-io/nats.go/context.go +++ b/vendor/github.com/nats-io/nats.go/context.go @@ -11,8 +11,6 @@ // See the License for the specific language governing permissions and // limitations under the License. -// +build go1.7 - // A Go client for the NATS messaging system (https://nats.io). package nats @@ -21,9 +19,33 @@ import ( "reflect" ) +// RequestMsgWithContext takes a context, a subject and payload +// in bytes and request expecting a single response. +func (nc *Conn) RequestMsgWithContext(ctx context.Context, msg *Msg) (*Msg, error) { + var hdr []byte + var err error + + if len(msg.Header) > 0 { + if !nc.info.Headers { + return nil, ErrHeadersNotSupported + } + + hdr, err = msg.headerBytes() + if err != nil { + return nil, err + } + } + + return nc.requestWithContext(ctx, msg.Subject, hdr, msg.Data) +} + // RequestWithContext takes a context, a subject and payload // in bytes and request expecting a single response. func (nc *Conn) RequestWithContext(ctx context.Context, subj string, data []byte) (*Msg, error) { + return nc.requestWithContext(ctx, subj, nil, data) +} + +func (nc *Conn) requestWithContext(ctx context.Context, subj string, hdr, data []byte) (*Msg, error) { if ctx == nil { return nil, ErrInvalidContext } @@ -40,10 +62,10 @@ func (nc *Conn) RequestWithContext(ctx context.Context, subj string, data []byte // If user wants the old style. if nc.Opts.UseOldRequestStyle { nc.mu.Unlock() - return nc.oldRequestWithContext(ctx, subj, data) + return nc.oldRequestWithContext(ctx, subj, hdr, data) } - mch, token, err := nc.createNewRequestAndSend(subj, data) + mch, token, err := nc.createNewRequestAndSend(subj, hdr, data) if err != nil { return nil, err } @@ -67,7 +89,7 @@ func (nc *Conn) RequestWithContext(ctx context.Context, subj string, data []byte } // oldRequestWithContext utilizes inbox and subscription per request. -func (nc *Conn) oldRequestWithContext(ctx context.Context, subj string, data []byte) (*Msg, error) { +func (nc *Conn) oldRequestWithContext(ctx context.Context, subj string, hdr, data []byte) (*Msg, error) { inbox := NewInbox() ch := make(chan *Msg, RequestChanLen) @@ -78,7 +100,7 @@ func (nc *Conn) oldRequestWithContext(ctx context.Context, subj string, data []b s.AutoUnsubscribe(1) defer s.Unsubscribe() - err = nc.PublishRequest(subj, inbox, data) + err = nc.publish(subj, inbox, hdr, data) if err != nil { return nil, err } diff --git a/vendor/github.com/nats-io/nats.go/nats.go b/vendor/github.com/nats-io/nats.go/nats.go index d3fa2b99..daa56fdb 100644 --- a/vendor/github.com/nats-io/nats.go/nats.go +++ b/vendor/github.com/nats-io/nats.go/nats.go @@ -510,6 +510,31 @@ type Msg struct { barrier *barrierInfo } +func (m *Msg) headerBytes() ([]byte, error) { + var hdr []byte + if len(m.Header) == 0 { + return hdr, nil + } + + var b bytes.Buffer + _, err := b.WriteString(hdrLine) + if err != nil { + return nil, ErrBadHeaderMsg + } + + err = m.Header.Write(&b) + if err != nil { + return nil, ErrBadHeaderMsg + } + + _, err = b.WriteString(crlf) + if err != nil { + return nil, ErrBadHeaderMsg + } + + return b.Bytes(), nil +} + type barrierInfo struct { refs int64 f func() @@ -543,6 +568,7 @@ type serverInfo struct { Version string `json:"version"` AuthRequired bool `json:"auth_required"` TLSRequired bool `json:"tls_required"` + TLSAvailable bool `json:"tls_available"` Headers bool `json:"headers"` MaxPayload int64 `json:"max_payload"` ConnectURLs []string `json:"connect_urls,omitempty"` @@ -1576,7 +1602,7 @@ func (nc *Conn) checkForSecure() error { o := nc.Opts // Check for mismatch in setups - if o.Secure && !nc.info.TLSRequired { + if o.Secure && !nc.info.TLSRequired && !nc.info.TLSAvailable { return ErrSecureConnWanted } else if nc.info.TLSRequired && !o.Secure { // Switch to Secure since server needs TLS. @@ -2687,18 +2713,21 @@ func (nc *Conn) PublishMsg(m *Msg) error { if m == nil { return ErrInvalidMsg } + var hdr []byte + var err error + if len(m.Header) > 0 { if !nc.info.Headers { return ErrHeadersNotSupported } - // FIXME(dlc) - Optimize - var b bytes.Buffer - b.WriteString(hdrLine) - m.Header.Write(&b) - b.WriteString(crlf) - hdr = b.Bytes() + + hdr, err = m.headerBytes() + if err != nil { + return err + } } + return nc.publish(m.Subject, m.Reply, hdr, m.Data) } @@ -2874,7 +2903,7 @@ func (nc *Conn) respHandler(m *Msg) { } // Helper to setup and send new request style requests. Return the chan to receive the response. -func (nc *Conn) createNewRequestAndSend(subj string, data []byte) (chan *Msg, string, error) { +func (nc *Conn) createNewRequestAndSend(subj string, hdr, data []byte) (chan *Msg, string, error) { // Do setup for the new style if needed. if nc.respMap == nil { nc.initNewResp() @@ -2898,28 +2927,55 @@ func (nc *Conn) createNewRequestAndSend(subj string, data []byte) (chan *Msg, st } nc.mu.Unlock() - if err := nc.PublishRequest(subj, respInbox, data); err != nil { + if err := nc.publish(subj, respInbox, hdr, data); err != nil { return nil, token, err } return mch, token, nil } +// RequestMsg will send a request payload including optional headers and deliver +// the response message, or an error, including a timeout if no message was received properly. +func (nc *Conn) RequestMsg(msg *Msg, timeout time.Duration) (*Msg, error) { + var hdr []byte + var err error + + if len(msg.Header) > 0 { + if !nc.info.Headers { + return nil, ErrHeadersNotSupported + } + + hdr, err = msg.headerBytes() + if err != nil { + return nil, err + } + } + + return nc.request(msg.Subject, hdr, msg.Data, timeout) +} + // Request will send a request payload and deliver the response message, // or an error, including a timeout if no message was received properly. func (nc *Conn) Request(subj string, data []byte, timeout time.Duration) (*Msg, error) { + return nc.request(subj, nil, data, timeout) +} + +func (nc *Conn) request(subj string, hdr, data []byte, timeout time.Duration) (*Msg, error) { if nc == nil { return nil, ErrInvalidConnection } nc.mu.Lock() - // If user wants the old style. if nc.Opts.UseOldRequestStyle { nc.mu.Unlock() - return nc.oldRequest(subj, data, timeout) + return nc.oldRequest(subj, hdr, data, timeout) } - mch, token, err := nc.createNewRequestAndSend(subj, data) + return nc.newRequest(subj, hdr, data, timeout) +} + +func (nc *Conn) newRequest(subj string, hdr, data []byte, timeout time.Duration) (*Msg, error) { + mch, token, err := nc.createNewRequestAndSend(subj, hdr, data) if err != nil { return nil, err } @@ -2948,7 +3004,7 @@ func (nc *Conn) Request(subj string, data []byte, timeout time.Duration) (*Msg, // oldRequest will create an Inbox and perform a Request() call // with the Inbox reply and return the first reply received. // This is optimized for the case of multiple responses. -func (nc *Conn) oldRequest(subj string, data []byte, timeout time.Duration) (*Msg, error) { +func (nc *Conn) oldRequest(subj string, hdr, data []byte, timeout time.Duration) (*Msg, error) { inbox := NewInbox() ch := make(chan *Msg, RequestChanLen) @@ -2959,10 +3015,11 @@ func (nc *Conn) oldRequest(subj string, data []byte, timeout time.Duration) (*Ms s.AutoUnsubscribe(1) defer s.Unsubscribe() - err = nc.PublishRequest(subj, inbox, data) + err = nc.publish(subj, inbox, hdr, data) if err != nil { return nil, err } + return s.NextMsg(timeout) } @@ -3653,6 +3710,21 @@ func (m *Msg) Respond(data []byte) error { return nc.Publish(m.Reply, data) } +// RespondMsg allows a convenient way to respond to requests in service based subscriptions that might include headers +func (m *Msg) RespondMsg(msg *Msg) error { + if m == nil || m.Sub == nil { + return ErrMsgNotBound + } + if m.Reply == "" { + return ErrMsgNoReply + } + m.Sub.mu.Lock() + nc := m.Sub.conn + m.Sub.mu.Unlock() + // No need to check the connection here since the call to publish will do all the checking. + return nc.PublishMsg(msg) +} + // FIXME: This is a hack // removeFlushEntry is needed when we need to discard queued up responses // for our pings as part of a flush call. This happens when we have a flush diff --git a/vendor/modules.txt b/vendor/modules.txt index f99068cb..6a05436b 100644 --- a/vendor/modules.txt +++ b/vendor/modules.txt @@ -2,7 +2,7 @@ github.com/minio/highwayhash # github.com/nats-io/jwt/v2 v2.0.0-20200602193336-473d698956ed github.com/nats-io/jwt/v2 -# github.com/nats-io/nats.go v1.10.1-0.20200601214746-e93e18d0ed6f +# github.com/nats-io/nats.go v1.10.1-0.20200606002146-fc6fed82929a github.com/nats-io/nats.go github.com/nats-io/nats.go/encoders/builtin github.com/nats-io/nats.go/util