mirror of
https://github.com/gogrlx/nats-server.git
synced 2026-04-16 11:04:42 -07:00
TLS timeout handling for routes
This commit is contained in:
@@ -481,6 +481,9 @@ func processOptions(opts *Options) {
|
||||
if opts.AuthTimeout == 0 {
|
||||
opts.AuthTimeout = float64(AUTH_TIMEOUT) / float64(time.Second)
|
||||
}
|
||||
if opts.ClusterTLSTimeout == 0 {
|
||||
opts.ClusterTLSTimeout = float64(TLS_TIMEOUT) / float64(time.Second)
|
||||
}
|
||||
if opts.ClusterAuthTimeout == 0 {
|
||||
opts.ClusterAuthTimeout = float64(AUTH_TIMEOUT) / float64(time.Second)
|
||||
}
|
||||
|
||||
@@ -23,6 +23,7 @@ func TestDefaultOptions(t *testing.T) {
|
||||
MaxPayload: MAX_PAYLOAD_SIZE,
|
||||
MaxPending: MAX_PENDING_SIZE,
|
||||
ClusterAuthTimeout: float64(AUTH_TIMEOUT) / float64(time.Second),
|
||||
ClusterTLSTimeout: float64(TLS_TIMEOUT) / float64(time.Second),
|
||||
BufSize: DEFAULT_BUF_SIZE,
|
||||
}
|
||||
|
||||
|
||||
@@ -160,14 +160,31 @@ func (s *Server) createRoute(conn net.Conn, rURL *url.URL) *client {
|
||||
}
|
||||
|
||||
conn := c.nc.(*tls.Conn)
|
||||
err := conn.Handshake()
|
||||
if err != nil {
|
||||
|
||||
// Setup the timeout
|
||||
ttl := secondsToDuration(s.opts.ClusterTLSTimeout)
|
||||
time.AfterFunc(ttl, func() { tlsTimeout(c, conn) })
|
||||
conn.SetReadDeadline(time.Now().Add(ttl))
|
||||
|
||||
c.mu.Unlock()
|
||||
if err := conn.Handshake(); err != nil {
|
||||
c.Debugf("TLS route handshake error: %v", err)
|
||||
c.sendErr("Secure Connection - TLS Required")
|
||||
c.closeConnection()
|
||||
return nil
|
||||
}
|
||||
// Reset the read deadline
|
||||
conn.SetReadDeadline(time.Time{})
|
||||
|
||||
// Re-Grab lock
|
||||
c.mu.Lock()
|
||||
|
||||
// Rewrap bw
|
||||
c.bw = bufio.NewWriterSize(c.nc, s.opts.BufSize)
|
||||
|
||||
c.Debugf("TLS handshake complete")
|
||||
cs := conn.ConnectionState()
|
||||
c.Debugf("TLS version %s, cipher suite %s", tlsVersion(cs.Version), tlsCipher(cs.CipherSuite))
|
||||
}
|
||||
|
||||
// Queue Connect proto if we solicited the connection.
|
||||
@@ -366,7 +383,7 @@ func (s *Server) routeAcceptLoop(ch chan struct{}) {
|
||||
continue
|
||||
}
|
||||
tmpDelay = ACCEPT_MIN_SLEEP
|
||||
s.createRoute(conn, nil)
|
||||
go s.createRoute(conn, nil)
|
||||
}
|
||||
Debugf("Router accept loop exiting..")
|
||||
s.done <- true
|
||||
@@ -415,7 +432,7 @@ func (s *Server) reConnectToRoute(rUrl *url.URL) {
|
||||
}
|
||||
|
||||
func (s *Server) connectToRoute(rUrl *url.URL) {
|
||||
for s.isRunning() {
|
||||
for s.isRunning() && rUrl != nil {
|
||||
Debugf("Trying to connect to route on %s", rUrl.Host)
|
||||
conn, err := net.DialTimeout("tcp", rUrl.Host, DEFAULT_ROUTE_DIAL)
|
||||
if err != nil {
|
||||
|
||||
@@ -467,6 +467,7 @@ func (s *Server) createClient(conn net.Conn) *client {
|
||||
|
||||
// Rewrap bw
|
||||
c.bw = bufio.NewWriterSize(c.nc, s.opts.BufSize)
|
||||
|
||||
c.Debugf("TLS handshake complete")
|
||||
cs := conn.ConnectionState()
|
||||
c.Debugf("TLS version %s, cipher suite %s", tlsVersion(cs.Version), tlsCipher(cs.CipherSuite))
|
||||
|
||||
@@ -147,7 +147,7 @@ func TestTLSConnectionTimeout(t *testing.T) {
|
||||
defer conn.Close()
|
||||
|
||||
// Read deadlines
|
||||
conn.SetReadDeadline(time.Now().Add(time.Second))
|
||||
conn.SetReadDeadline(time.Now().Add(2 * time.Second))
|
||||
|
||||
// Read the INFO string.
|
||||
br := bufio.NewReader(conn)
|
||||
@@ -161,7 +161,7 @@ func TestTLSConnectionTimeout(t *testing.T) {
|
||||
wait := time.Duration(opts.TLSTimeout * float64(time.Second))
|
||||
time.Sleep(wait)
|
||||
// Read deadlines
|
||||
conn.SetReadDeadline(time.Now().Add(time.Second))
|
||||
conn.SetReadDeadline(time.Now().Add(2 * time.Second))
|
||||
tlsErr, err := br.ReadString('\n')
|
||||
if err != nil {
|
||||
t.Fatalf("Error reading error response - %v\n", err)
|
||||
|
||||
Reference in New Issue
Block a user